Fix #3004: Sort Inputs by foreach index in join steps #3009

Closed
orbin123 wants to merge 4 commits into Netflix:master from orbin123:fix/sort-inputs-by-foreach-index

orbin123 commented Mar 12, 2026

Summary

Sort the Inputs.flows list by foreach index so that inputs[i]
deterministically corresponds to the i-th foreach split in join steps.

Context / Motivation

Inputs.__init__ in metaflow/datastore/inputs.py carries a TODO comment:
# TODO sort by foreach index. Without sorting, inputs[i] in a foreach
join step returns whichever split the datastore happened to resolve at
position i, so index-based access is unreliable.

Fixes #3004

Changes Made

  • Added _sort_by_foreach_index() method to Inputs class that sorts
    self.flows by _foreach_stack[-1].index for foreach joins
  • Non-foreach joins (static splits) are left unchanged — the method returns
    early when _foreach_stack is absent or empty
  • Added unit tests in test/unit/test_inputs.py covering:
    • Foreach inputs sorted from reverse/shuffled order
    • Index-based access correctness after sorting
    • Static split order preservation
    • Empty inputs edge case
    • Iteration order correctness
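
The behaviour described in the list above can be sketched as follows. This is a minimal illustration, not the PR's actual code: the free function sort_by_foreach_index and the fake_flow helper are hypothetical names, and a flow is modelled as an object whose _datastore is a plain dict.

```python
from collections import namedtuple
from types import SimpleNamespace

# Field list as shown in the review comments on this PR.
ForeachFrame = namedtuple(
    "ForeachFrame", ["step", "var", "num_splits", "index", "value"]
)


def fake_flow(name, stack=None):
    """Hypothetical stand-in for a flow; only name and _datastore are modelled."""
    datastore = {} if stack is None else {"_foreach_stack": stack}
    return SimpleNamespace(name=name, _datastore=datastore)


def sort_by_foreach_index(flows):
    """Sort flows in place by the index of each flow's topmost foreach frame.

    The list is left untouched if any flow has no usable _foreach_stack,
    which is how this sketch recognises a static-split join.
    """
    stacks = {}
    for flow in flows:
        try:
            stack = flow._datastore["_foreach_stack"]
        except (KeyError, AttributeError):
            return  # static split: no foreach stack recorded
        if not stack:
            return  # empty or None stack: nothing to sort by
        stacks[id(flow)] = stack
    flows.sort(key=lambda f: stacks[id(f)][-1].index)


# Foreach join: inputs arrive shuffled and end up ordered by split index.
foreach_flows = [
    fake_flow("process", [ForeachFrame("start", "x", 3, i, str(i))])
    for i in (2, 0, 1)
]
sort_by_foreach_index(foreach_flows)
sorted_indices = [f._datastore["_foreach_stack"][-1].index for f in foreach_flows]

# Static split: no _foreach_stack, so the original order is preserved.
static_flows = [fake_flow("b"), fake_flow("a")]
sort_by_foreach_index(static_flows)
static_names = [f.name for f in static_flows]
```

After the calls, sorted_indices holds the split indices in ascending order and static_names is unchanged from the order in which the flows were passed in.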

Testing

  • Added test/unit/test_inputs.py with 8 test cases
  • All existing unit tests pass: cd test/unit && python -m pytest -v
  • Manually tested with a foreach flow (5 splits) confirming
    inputs[i].item == i after the fix

Trade-offs / Design Decisions

  • Accesses flow._datastore["_foreach_stack"] directly instead of going
    through flow.__getattr__ to avoid caching side effects on the flow object
  • Uses try/except for graceful handling of non-foreach joins rather than
    checking join_type, since the Inputs class doesn't receive join type info
  • Sorting is stable (list.sort), so equal-index items (shouldn't happen in
    practice) preserve their relative order
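
The stability point can be checked with a short standalone example. The (label, index) pairs are made up for illustration; two of them deliberately share an index.

```python
# Hypothetical (label, index) pairs; "c" and "b" share index 1.
items = [("c", 1), ("a", 0), ("b", 1)]

# list.sort is stable: entries with equal keys keep their relative order.
items.sort(key=lambda pair: pair[1])

labels = [label for label, _ in items]
```

Here "c" still precedes "b" after sorting, because both have the same key and "c" came first in the input.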

AI Disclosure

  • I used AI tools in preparing this contribution

(Check if applicable — per Metaflow's AI Tool Usage Policy, you must be able
to explain every line of your code.)

The Inputs class now sorts flows by their foreach_stack index when
constructing the inputs list for foreach join steps. This ensures
inputs[i] always corresponds to the i-th foreach split.

For non-foreach joins (static splits), the original order is preserved.

Fixes Netflix#3004
greptile-apps bot (Contributor) commented Mar 12, 2026

Greptile Summary

This PR resolves the long-standing # TODO sort by foreach index in Inputs.__init__ by introducing _sort_by_foreach_index(), ensuring inputs[i] deterministically maps to the i-th foreach split in join steps.

  • Core fix (metaflow/datastore/inputs.py): The new method fetches _foreach_stack once per flow into a stacks dict during the validation pass, then reuses those values as the sort key — correctly avoiding the double-datastore-read pattern flagged in prior review threads.
  • Early-return guards correctly skip sorting for empty inputs, non-foreach (static-split) joins where _foreach_stack is absent, and joins where the stack is empty.
  • Stable sort (list.sort) preserves relative order for any hypothetical duplicate-index flows.
  • Tests (test/unit/test_inputs.py): 8 cases cover reverse, shuffled, and already-sorted foreach inputs; index-based access; static-split preservation; empty inputs; and iteration order. Two minor gaps: the test-local ForeachFrame duplicates the production definition from tuple_util.py, and there is no test for nested (multi-level) foreach stacks where multiple frames are present in the stack.

Confidence Score: 5/5

  • Safe to merge — addresses a well-scoped correctness bug with no regressions to existing behaviour.
  • The change is small and self-contained. The previous double-load concern was addressed by caching stacks in a dict. The id()-keyed dict is safe because self.flows keeps every flow alive during the sort. Both non-foreach and empty-input paths are guarded with early returns. Tests are thorough for the primary use cases. Minor style concerns in the test file do not affect correctness.
  • No files require special attention.

Important Files Changed

| Filename | Overview |
| --- | --- |
| metaflow/datastore/inputs.py | Adds _sort_by_foreach_index() that loads _foreach_stack once per flow into a stacks dict (avoiding the double-load concern from earlier review threads) and sorts self.flows in-place before the setattr loop. Logic is correct: early-return for empty inputs, non-foreach joins (empty stack or missing key), and stable sort for the foreach case. |
| test/unit/test_inputs.py | New test file covering reverse/shuffled order, index access, static-split preservation, empty inputs, and iteration. Minor issues: ForeachFrame is duplicated instead of imported from production source, and there is no test for multi-level (nested) foreach stacks. |

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[Inputs.__init__ called with flows] --> B[self.flows = list of flows]
    B --> C[call _sort_by_foreach_index]
    C --> D{flows list empty?}
    D -- yes --> E[return early, no-op]
    D -- no --> F[Loop over each flow]
    F --> G[load _foreach_stack from flow datastore]
    G --> H{KeyError or AttributeError raised?}
    H -- yes --> E
    H -- no --> I{stack is empty or None?}
    I -- yes --> E
    I -- no --> J[store stack in dict keyed by flow id]
    J --> K{more flows?}
    K -- yes --> F
    K -- no --> L[sort self.flows by topmost frame index]
    L --> M[return to __init__]
    M --> N[setattr loop: bind step name to flow]

Last reviewed commit: 8e962f0

_foreach_stack was fetched once per flow during the guard loop but the
results were discarded; the sort key then re-fetched the same artifact
for every flow. Capture each stack in a dict keyed by flow id during the
guard pass and reuse it as the sort key, halving datastore I/O for
foreach joins backed by remote storage.
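
The effect on read counts can be illustrated with a small experiment. Everything here is an assumption made for the demonstration: CountingStore is a hypothetical dict subclass standing in for a remote datastore, and Frame is reduced to the one field the sort key uses.

```python
from collections import namedtuple
from types import SimpleNamespace

Frame = namedtuple("Frame", ["index"])  # hypothetical, reduced to one field


class CountingStore(dict):
    """Dict that counts lookups, standing in for a remote datastore."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.reads = 0

    def __getitem__(self, key):
        self.reads += 1
        return super().__getitem__(key)


def make_flows(order):
    return [
        SimpleNamespace(_datastore=CountingStore(_foreach_stack=[Frame(i)]))
        for i in order
    ]


def total_reads(flows):
    return sum(f._datastore.reads for f in flows)


# Before: the guard loop reads each stack, then the sort key reads it again.
refetch = make_flows([2, 0, 1])
for f in refetch:
    f._datastore["_foreach_stack"]  # guard pass, result discarded
refetch.sort(key=lambda f: f._datastore["_foreach_stack"][-1].index)

# After: the guard pass keeps each stack and the sort key reuses it.
cached = make_flows([2, 0, 1])
stacks = {id(f): f._datastore["_foreach_stack"] for f in cached}
cached.sort(key=lambda f: stacks[id(f)][-1].index)

refetch_reads = total_reads(refetch)  # two reads per flow
cached_reads = total_reads(cached)    # one read per flow
```

Both lists end up in the same order; only the number of datastore lookups differs. Keying the cache by id(f) is safe here for the reason the review gives: the list keeps every flow alive for the duration of the sort.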
Comment on lines +9 to +11
ForeachFrame = namedtuple(
    "ForeachFrame", ["step", "var", "num_splits", "index", "value"]
)

Test ForeachFrame diverges from production definition

The test replicates ForeachFrame using a plain namedtuple, but the real definition in metaflow/tuple_util.py uses namedtuple_with_defaults — giving all fields default values of None. This divergence is harmless for the current tests (they all supply every field explicitly), but it means a future test that omits a field will fail with TypeError in the test harness while succeeding against the production type.

A more robust approach is to import the real ForeachFrame directly:

from metaflow.tuple_util import ForeachFrame

This also guards against any future changes to the field list in tuple_util.py.
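
The divergence can be reproduced without Metaflow installed. In this sketch the production type is approximated with the standard library's namedtuple and its defaults argument; that substitution is an assumption for illustration, since the comment above says the real code uses namedtuple_with_defaults. StrictFrame and LenientFrame are hypothetical names.

```python
from collections import namedtuple

FIELDS = ["step", "var", "num_splits", "index", "value"]

# Plain namedtuple, as in the test file: every field is required.
StrictFrame = namedtuple("ForeachFrame", FIELDS)

# Stand-in for the production type: every field defaults to None.
LenientFrame = namedtuple("ForeachFrame", FIELDS, defaults=(None,) * len(FIELDS))

# Omitting fields works against the lenient definition...
lenient = LenientFrame(step="start", index=0)

# ...but raises TypeError against the strict one.
try:
    StrictFrame(step="start", index=0)
    strict_error = None
except TypeError as exc:
    strict_error = exc
```

A test that builds a frame with only some fields would therefore pass or fail depending on which definition it happens to use, which is the risk the comment describes.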

Comment on lines +113 to +120
def test_iter_foreach_sorted(self):
    """Iterating over foreach Inputs should yield sorted order."""
    flows = [_make_mock_flow("process", foreach_index=i) for i in [2, 0, 1]]

    inputs = Inputs(flows)

    indices = [f._datastore["_foreach_stack"][-1].index for f in inputs]
    assert indices == [0, 1, 2]

No test coverage for nested (multi-level) foreach stacks

All tests use a single-frame _foreach_stack ([frame]). In practice, a nested foreach will have multiple frames — e.g., [outer_frame, inner_frame]. The sort key correctly reads stacks[id(f)][-1].index (the topmost frame), but this behaviour is never exercised in the test suite.

Consider adding a test case like:

def test_nested_foreach_sorted_by_innermost_index(self):
    outer_frame = ForeachFrame(step="outer", var="a", num_splits=2, index=0, value="0")
    flows = [
        _make_mock_flow_with_stack(
            "process",
            [outer_frame, ForeachFrame(step="inner", var="b", num_splits=3, index=i, value="v")],
        )
        for i in [2, 0, 1]
    ]
    inputs = Inputs(flows)
    inner_indices = [f._datastore["_foreach_stack"][-1].index for f in inputs]
    assert inner_indices == [0, 1, 2]

This documents and locks in the "sort by innermost index" contract.

npow (Collaborator) commented Mar 12, 2026

Thanks for the contribution! This is addressed more robustly in #2974, which sorts at the datastore layer in task.py, where the join type is already known, avoiding the need for defensive checks. Closing in favor of that PR.

npow closed this Mar 12, 2026

Development

Successfully merging this pull request may close these issues.

Sort input flows by foreach index in Inputs class for join steps
