Share sparkey readers across DoFns with memory-aware heap fallback #5903
Draft
spkrka wants to merge 1 commit into spotify:main
Conversation
Codecov Report ✅ All modified and coverable lines are covered by tests.

@@ Coverage Diff @@
##             main    #5903      +/-   ##
==========================================
+ Coverage   61.54%   61.73%   +0.19%
==========================================
  Files         317      318       +1
  Lines       11653    11709      +56
  Branches      822      833      +11
==========================================
+ Hits         7172     7229      +57
+ Misses       4481     4480       -1
spkrka force-pushed from 247d2dd to 97c986a
spkrka commented Mar 27, 2026:
 * claimed), the budget for those shards is leaked. This is acceptable since a reader failure is
 * fatal to the pipeline.
 */
private[sparkey] class HostMemoryTracker(offHeapBudget: Long, heapBudget: Long) {
I guess this is technically not purely a sparkey thing; it could also be used for other resources/side-input types.
spkrka force-pushed from 97c986a to 8166f8b
…llback

- Shared reader cache: static ConcurrentHashMap deduplicates readers across all DoFn instances on a worker (was: one reader per vCPU thread). Loaded concurrently via a dedicated 4-thread daemon pool.
- HostMemoryTracker: estimates off-heap budget (totalPhysical - maxHeap - 2 GB) and heap budget (maxHeap - max(4 GB, 10%)). Each shard atomically claims from off-heap first; if exhausted, claims heap and opens with Sparkey.reader().useHeap(true); if neither fits, falls back to mmap with a warning.
- Bump sparkey 3.5.1 → 3.7.0 for the heap-backed reader API.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Fix Scala 2.12 SLF4J logger ambiguity
spkrka force-pushed from 8166f8b to 47495bd
This was referenced Mar 28, 2026
Summary
Sparkey side inputs on Dataflow suffer from two problems:
Per-DoFn reader duplication: Beam creates one DoFn per vCPU (e.g. 80 on n4-standard-80). Each independently opens the same sparkey files via mmap, wasting file descriptors and mmap regions.
Page cache thrashing: Dataflow hardcodes the JVM heap to ~70% of worker memory. On a 320 GB machine, only ~96 GB remains for the OS page cache — but sparkey side inputs can easily exceed 200 GB. The result is severe thrashing and I/O-bound jobs (10-14h runtimes).
This PR addresses both:
Shared reader cache: A static `ConcurrentHashMap<path, CompletableFuture<SparkeyReader>>` deduplicates readers across all DoFn instances on a worker. The first thread to request a reader loads it; others block on `future.join()`. Failed futures stay cached so subsequent callers fast-fail (reader failures are deterministic and fatal).

Memory-aware heap fallback: New `HostMemoryTracker` estimates two budgets at JVM startup:

- Off-heap: `totalPhysicalMemory - maxHeap - 2 GB reserve` (for OS, kernel, Beam shuffle, etc.)
- Heap: `maxHeap - max(4 GB, 10% of maxHeap)` (GC headroom and non-sparkey allocations)

Each shard atomically claims budget via `getAndAccumulate` (lock-free CAS). Strategy per shard:

1. Claim off-heap budget and open via mmap.
2. If off-heap is exhausted, claim heap budget and open with `Sparkey.reader().useHeap(true)` (reads into `byte[]`).
3. If neither fits, fall back to mmap with a warning.

This lets the otherwise-wasted JVM heap absorb overflow, achieving ~100% memory utilization.
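The per-shard claim strategy can be sketched as follows (in Java, since sparkey's API is Java). This is an illustrative sketch only: `BudgetTracker`, `tryClaim`, and the `Strategy` names are hypothetical stand-ins, not the actual scio implementation.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the budget-claim logic described above.
class BudgetTracker {
    enum Strategy { MMAP, HEAP, MMAP_OVER_BUDGET }

    private final AtomicLong offHeapRemaining;
    private final AtomicLong heapRemaining;

    BudgetTracker(long offHeapBudget, long heapBudget) {
        this.offHeapRemaining = new AtomicLong(offHeapBudget);
        this.heapRemaining = new AtomicLong(heapBudget);
    }

    // getAndAccumulate applies the function atomically (a CAS loop) and
    // returns the previous value; the function only subtracts when the full
    // request fits, so concurrent claims can never drive the budget negative.
    private static boolean tryClaim(AtomicLong remaining, long bytes) {
        long before = remaining.getAndAccumulate(bytes,
            (cur, req) -> cur >= req ? cur - req : cur);
        return before >= bytes;
    }

    // Per-shard strategy: off-heap first, then heap, else unbudgeted mmap.
    Strategy claimShard(long bytes) {
        if (tryClaim(offHeapRemaining, bytes)) return Strategy.MMAP;
        if (tryClaim(heapRemaining, bytes)) return Strategy.HEAP;
        return Strategy.MMAP_OVER_BUDGET; // would log a warning
    }
}
```

The key property is that the claim is lock-free: a shard either atomically reserves its full size from a budget or leaves that budget untouched and falls through to the next tier.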
Design choices
Dedicated 4-thread loader pool instead of `ForkJoinPool.commonPool()`: reader loading involves blocking I/O (GCS download, disk reads, heap copying), and using the common pool risks starving other JVM work. Four threads balance parallelism across multiple side inputs without overwhelming disk/network. Threads are daemon threads (they won't block JVM shutdown) and are named `sparkey-reader-loader-N` for debuggability.

No budget release: Readers are cached for the JVM lifetime and never closed (Beam side inputs have no teardown lifecycle), so budget is likewise never released. If a sharded reader fails partway through (some shards claimed), that budget is leaked; this is acceptable since the failure is fatal to the pipeline.
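A minimal sketch of the worker-wide cache plus the dedicated daemon loader pool described above. It is hypothetical: `ReaderCache` and `loadReader` are illustrative names, and the real `SparkeyReader` is stubbed out as a `String` so the sketch stays self-contained.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch of the shared reader cache; not the scio source.
class ReaderCache {
    private static final AtomicInteger COUNTER = new AtomicInteger();

    // 4 named daemon threads: blocking I/O stays off the common pool, and
    // daemon threads never hold up JVM shutdown.
    private static final ExecutorService LOADER =
        Executors.newFixedThreadPool(4, r -> {
            Thread t = new Thread(r, "sparkey-reader-loader-" + COUNTER.incrementAndGet());
            t.setDaemon(true);
            return t;
        });

    // One future per path for the whole JVM: computeIfAbsent guarantees the
    // first caller triggers the load and later callers join the same future.
    // Failed futures stay cached, so repeated requests for a bad path fast-fail.
    private static final ConcurrentHashMap<String, CompletableFuture<String>> CACHE =
        new ConcurrentHashMap<>();

    static String getReader(String path) {
        return CACHE.computeIfAbsent(path,
            p -> CompletableFuture.supplyAsync(() -> loadReader(p), LOADER)).join();
    }

    // Stand-in for actually opening the sparkey files at `path`.
    private static String loadReader(String path) {
        return "reader:" + path;
    }
}
```

Because every caller joins the same cached future, all DoFn instances on a worker observe the identical reader object rather than opening their own.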
No cache eviction on failure: If `getReader` throws, the failed `CompletableFuture` stays in the cache. Reader failures are deterministic (corrupt files, missing shards), so retrying would just repeat the same failure; fast-failing is preferable.

Heap reserve minimum of 4 GB: Workers with less than 4 GB of heap get zero heap budget, effectively disabling the heap fallback. This is intentional: small heaps shouldn't absorb sparkey data.
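As a worked example of the two budget formulas, here is a hypothetical calculation for an n4-standard-80-class worker (320 GB physical, with Dataflow's ~70% heap sizing giving a 224 GB heap). `BudgetMath` and its method names are illustrative only.

```java
// Hypothetical worked example of the budget formulas described above.
class BudgetMath {
    static final long GB = 1L << 30;

    // Off-heap budget: totalPhysical - maxHeap - 2 GB reserve
    // (reserve covers OS, kernel, Beam shuffle, etc.).
    static long offHeapBudget(long totalPhysical, long maxHeap) {
        return totalPhysical - maxHeap - 2 * GB;
    }

    // Heap budget: maxHeap - max(4 GB, 10% of maxHeap). Clamped at zero, so
    // a heap under the reserve yields no budget and the fallback is disabled.
    static long heapBudget(long maxHeap) {
        return Math.max(0, maxHeap - Math.max(4 * GB, maxHeap / 10));
    }
}
```

With 320 GB physical and a 224 GB heap this gives roughly 94 GB of off-heap budget and about 202 GB of heap budget (224 GB minus the 22.4 GB 10% headroom), which matches the "~100% memory utilization" goal stated in the summary.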
Sparkey 3.5.1 → 3.7.0: Required for the `Sparkey.reader().useHeap(true)` builder API.

Impact
Tested on a pipeline processing 1.6B user entities with ~200 GB of sparkey side inputs on n4-standard-80 (320 GB):
Test plan
🤖 Generated with Claude Code