
Share sparkey readers across DoFns with memory-aware heap fallback#5903

Draft
spkrka wants to merge 1 commit into spotify:main from spkrka:krka/shared-reader

Conversation


@spkrka spkrka commented Mar 27, 2026

Summary

Sparkey side inputs on Dataflow suffer from two problems:

  1. Per-DoFn reader duplication: Beam creates one DoFn instance per vCPU (e.g. 80 on an n4-standard-80). Each instance independently opens the same sparkey files via mmap, wasting file descriptors and mmap regions.

  2. Page cache thrashing: Dataflow hardcodes the JVM heap to ~70% of worker memory. On a 320 GB machine, only ~96 GB remains for the OS page cache — but sparkey side inputs can easily exceed 200 GB. The result is severe thrashing and I/O-bound jobs (10-14h runtimes).
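The heap-sizing arithmetic behind problem 2 can be checked directly (values from the description above; `HeapSizing` is just an illustrative name):

```scala
object HeapSizing {
  // Dataflow pins the JVM heap to ~70% of worker memory; the remainder
  // is all the OS page cache gets. Values are from the description above.
  val workerMemGiB = 320.0                  // n4-standard-80
  val heapGiB = workerMemGiB * 0.70         // ~224 GiB JVM heap
  val pageCacheGiB = workerMemGiB - heapGiB // ~96 GiB left for page cache
}
```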

This PR addresses both:

  • Shared reader cache — A static ConcurrentHashMap<path, CompletableFuture<SparkeyReader>> deduplicates readers across all DoFn instances on a worker. The first thread to request a reader loads it; others block on future.join(). Failed futures stay cached so subsequent callers fast-fail (reader failures are deterministic and fatal).

  • Memory-aware heap fallback — New HostMemoryTracker estimates two budgets at JVM startup:

    • Off-heap: totalPhysicalMemory - maxHeap - 2 GB reserve (for OS, kernel, Beam shuffle, etc.)
    • Heap: maxHeap - max(4 GB, 10% of maxHeap) (GC headroom and non-sparkey allocations)

    Each shard atomically claims budget via getAndAccumulate (lock-free CAS). Strategy per shard:

    1. Try off-heap → open via mmap (default, zero-copy)
    2. If off-heap exhausted → try heap → open via Sparkey.reader().useHeap(true) (reads into byte[])
    3. If neither fits → fall back to mmap with a warning (best-effort, may thrash)

    This lets the otherwise-wasted JVM heap absorb overflow, achieving ~100% memory utilization.
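A minimal sketch of the shared-cache pattern described above, assuming a generic loader function (`ReaderCache` and `Reader` are illustrative stand-ins; the real cache stores `CompletableFuture[SparkeyReader]` keyed by path):

```scala
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, Executors}

object ReaderCache {
  // Stand-in for SparkeyReader so the sketch is self-contained.
  type Reader = String

  // Dedicated daemon pool for blocking loads, per the design above.
  private val loaderPool = Executors.newFixedThreadPool(4, (r: Runnable) => {
    val t = new Thread(r)
    t.setDaemon(true)
    t
  })

  private val cache = new ConcurrentHashMap[String, CompletableFuture[Reader]]()

  // computeIfAbsent makes the insert atomic: exactly one loader task runs
  // per path; every other thread blocks on join() of the same future.
  // A failed future stays cached, so later callers fast-fail with the
  // original exception instead of retrying a deterministic failure.
  def get(path: String)(load: String => Reader): Reader =
    cache.computeIfAbsent(path, p =>
      CompletableFuture.supplyAsync(() => load(p), loaderPool)
    ).join()
}
```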
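The budget bookkeeping can be sketched with `AtomicLong.getAndAccumulate`, following the formulas and the off-heap → heap → mmap order described above (the `Placement` type and method names are illustrative, not the PR's actual API):

```scala
import java.util.concurrent.atomic.AtomicLong

object HostMemoryTracker {
  private val GiB: Long = 1L << 30
  // Budget formulas from the description; the 2 GiB / 4 GiB / 10%
  // constants are the PR's stated reserves.
  def offHeapBudget(totalPhysical: Long, maxHeap: Long): Long =
    math.max(0L, totalPhysical - maxHeap - 2 * GiB)
  def heapBudget(maxHeap: Long): Long =
    math.max(0L, maxHeap - math.max(4 * GiB, maxHeap / 10))
}

sealed trait Placement
case object OffHeap extends Placement
case object Heap extends Placement
case object MmapFallback extends Placement

class HostMemoryTracker(offHeapBudget: Long, heapBudget: Long) {
  private val offHeapUsed = new AtomicLong(0)
  private val heapUsed = new AtomicLong(0)

  // getAndAccumulate applies the operator atomically (a CAS loop); the
  // operator refuses an over-budget claim by returning the old value
  // unchanged, so the claim succeeds iff the returned value moved.
  private def tryClaim(used: AtomicLong, budget: Long, bytes: Long): Boolean = {
    val before = used.getAndAccumulate(bytes,
      (cur: Long, req: Long) => if (cur + req <= budget) cur + req else cur)
    before + bytes <= budget
  }

  // Strategy per shard: off-heap first, then heap, then best-effort mmap.
  def place(bytes: Long): Placement =
    if (tryClaim(offHeapUsed, offHeapBudget, bytes)) OffHeap
    else if (tryClaim(heapUsed, heapBudget, bytes)) Heap
    else MmapFallback
}
```

Note that nothing here ever decrements `used`, matching the "no budget release" design choice below.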

Design choices

  • Dedicated 4-thread loader pool instead of ForkJoinPool.commonPool(): reader loading involves blocking I/O (GCS download, disk reads, heap copying). Using the common pool risks starving other JVM work. 4 threads balances parallelism for multiple side inputs without overwhelming disk/network. Threads are daemon (won't block JVM shutdown) and named sparkey-reader-loader-N for debuggability.

  • No budget release: Readers are cached for the JVM lifetime and never closed (Beam side inputs have no teardown lifecycle). Budget is similarly never released. If a sharded reader fails partway through (some shards claimed), that budget is leaked — acceptable since the failure is fatal to the pipeline.

  • No cache eviction on failure: If getReader throws, the failed CompletableFuture stays in the cache. Reader failures are deterministic (corrupt files, missing shards), so retrying would just repeat the same failure. Fast-failing is preferable.

  • Heap reserve minimum of 4 GB: Workers with <4 GB heap get zero heap budget, effectively disabling the heap fallback. This is intentional — small heaps shouldn't absorb sparkey data.

  • Sparkey 3.5.1 → 3.7.0: Required for Sparkey.reader().useHeap(true) builder API.
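The loader-pool choice above can be sketched as a small thread factory (a sketch following the stated naming and daemon conventions, not the PR's actual code):

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

object LoaderPool {
  private val counter = new AtomicInteger(0)

  // Daemon threads (won't block JVM shutdown) with stable names for
  // debuggability, as described above.
  val factory: ThreadFactory = (r: Runnable) => {
    val t = new Thread(r, s"sparkey-reader-loader-${counter.incrementAndGet()}")
    t.setDaemon(true)
    t
  }

  // Fixed at 4 threads: enough parallelism for multiple side inputs
  // without overwhelming disk/network, and isolated from commonPool().
  val pool = Executors.newFixedThreadPool(4, factory)
}
```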

Impact

Tested on a pipeline processing 1.6B user entities with ~200 GB of sparkey side inputs on n4-standard-80 (320 GB):

  • Runtime: 10-13h → 33 min (20x)
  • Throughput: 8-15K → 1.4M elem/s (100x)
  • Cost per run: ~$1,300 → ~$70
  • CPU utilization: low (I/O bound) → 100% (compute bound)

Test plan

  • Existing sparkey unit tests pass
  • MiMa binary compatibility check passes
  • Manual validation on Dataflow with sharded sparkey side inputs
  • Verify log output shows budget claims and heap/mmap decisions

🤖 Generated with Claude Code

@codecov

codecov bot commented Mar 27, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 61.73%. Comparing base (d59fbba) to head (47495bd).

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #5903      +/-   ##
==========================================
+ Coverage   61.54%   61.73%   +0.19%     
==========================================
  Files         317      318       +1     
  Lines       11653    11709      +56     
  Branches      822      833      +11     
==========================================
+ Hits         7172     7229      +57     
+ Misses       4481     4480       -1     

@spkrka force-pushed the krka/shared-reader branch from 247d2dd to 97c986a on March 27, 2026 at 15:46
* claimed), the budget for those shards is leaked. This is acceptable since a reader failure is
* fatal to the pipeline.
*/
private[sparkey] class HostMemoryTracker(offHeapBudget: Long, heapBudget: Long) {
Member Author


I guess this is technically not purely a sparkey thing; it could also be used for other resource/side-input types.

@spkrka force-pushed the krka/shared-reader branch from 97c986a to 8166f8b on March 27, 2026 at 16:14
Share sparkey readers across DoFns with memory-aware heap fallback

- Shared reader cache: static ConcurrentHashMap deduplicates readers across
  all DoFn instances on a worker (was: one reader per vCPU thread).
  Loaded concurrently via a dedicated 4-thread daemon pool.

- HostMemoryTracker: estimates off-heap budget (totalPhysical - maxHeap - 2GB)
  and heap budget (maxHeap - max(4GB, 10%)). Each shard atomically claims
  from off-heap first; if exhausted, claims heap and opens with
  Sparkey.reader().useHeap(true); if neither fits, falls back to mmap
  with a warning.

- Bump sparkey 3.5.1 → 3.7.0 for the heap-backed reader API.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Fix Scala 2.12 SLF4J logger ambiguity
