Fix #3004: Sort Inputs by foreach index in join steps#3009
Fix #3004: Sort Inputs by foreach index in join steps#3009orbin123 wants to merge 4 commits intoNetflix:masterfrom
Conversation
The Inputs class now sorts flows by their foreach_stack index when constructing the inputs list for foreach join steps. This ensures inputs[i] always corresponds to the i-th foreach split. For non-foreach joins (static splits), the original order is preserved. Fixes Netflix#3004
Greptile SummaryThis PR resolves the long-standing
Confidence Score: 5/5
Important Files Changed
Flowchart%%{init: {'theme': 'neutral'}}%%
flowchart TD
A[Inputs.__init__ called with flows] --> B[self.flows = list of flows]
B --> C[call _sort_by_foreach_index]
C --> D{flows list empty?}
D -- yes --> E[return early, no-op]
D -- no --> F[Loop over each flow]
F --> G[load _foreach_stack from flow datastore]
G --> H{KeyError or AttributeError raised?}
H -- yes --> E
G --> I{stack is empty or None?}
I -- yes --> E
I -- no --> J[store stack in dict keyed by flow id]
J --> K{more flows?}
K -- yes --> F
K -- no --> L[sort self.flows by topmost frame index]
L --> M[return to __init__]
M --> N[setattr loop: bind step name to flow]
Last reviewed commit: 8e962f0 |
_foreach_stack was fetched once per flow during the guard loop but the results were discarded; the sort key then re-fetched the same artifact for every flow. Capture each stack in a dict keyed by flow id during the guard pass and reuse it as the sort key, halving datastore I/O for foreach joins backed by remote storage.
…/orbin123/metaflow into fix/sort-inputs-by-foreach-index
| ForeachFrame = namedtuple( | ||
| "ForeachFrame", ["step", "var", "num_splits", "index", "value"] | ||
| ) |
There was a problem hiding this comment.
Test ForeachFrame diverges from production definition
The test replicates ForeachFrame using a plain namedtuple, but the real definition in metaflow/tuple_util.py uses namedtuple_with_defaults — giving all fields default values of None. This divergence is harmless for the current tests (they all supply every field explicitly), but it means a future test that omits a field will fail with TypeError in the test harness while succeeding against the production type.
A more robust approach is to import the real ForeachFrame directly:
from metaflow.tuple_util import ForeachFrameThis also guards against any future changes to the field list in tuple_util.py.
| def test_iter_foreach_sorted(self): | ||
| """Iterating over foreach Inputs should yield sorted order.""" | ||
| flows = [_make_mock_flow("process", foreach_index=i) for i in [2, 0, 1]] | ||
|
|
||
| inputs = Inputs(flows) | ||
|
|
||
| indices = [f._datastore["_foreach_stack"][-1].index for f in inputs] | ||
| assert indices == [0, 1, 2] |
There was a problem hiding this comment.
No test coverage for nested (multi-level) foreach stacks
All tests use a single-frame _foreach_stack ([frame]). In practice, a nested foreach will have multiple frames — e.g., [outer_frame, inner_frame]. The sort key correctly reads stacks[id(f)][-1].index (the topmost frame), but this behaviour is never exercised in the test suite.
Consider adding a test case like:
def test_nested_foreach_sorted_by_innermost_index(self):
outer_frame = ForeachFrame(step="outer", var="a", num_splits=2, index=0, value="0")
flows = [
_make_mock_flow_with_stack("process", [outer_frame, ForeachFrame("inner","b",3,i,"v")])
for i in [2, 0, 1]
]
inputs = Inputs(flows)
inner_indices = [f._datastore["_foreach_stack"][-1].index for f in inputs]
assert inner_indices == [0, 1, 2]This documents and locks in the "sort by innermost index" contract.
|
Thanks for the contribution! This is addressed more robustly in #2974 which sorts at the datastore layer in |
Summary
Sort the
Inputs.flowslist by foreach index so thatinputs[i]deterministically corresponds to the i-th foreach split in join steps.
Context / Motivation
The
Inputs.__init__inmetaflow/datastore/inputs.pyhas a TODO comment:# TODO sort by foreach index. Without sorting,inputs[i]in a foreachjoin step returns whichever split the datastore resolved in arbitrary order,
making index-based access unreliable.
Fixes #3004
Changes Made
_sort_by_foreach_index()method toInputsclass that sortsself.flowsby_foreach_stack[-1].indexfor foreach joinsearly when
_foreach_stackis absent or emptytest/unit/test_inputs.pycovering:Testing
test/unit/test_inputs.pywith 8 test casescd test/unit && python -m pytest -vinputs[i].item == iafter the fixTrade-offs / Design Decisions
flow._datastore["_foreach_stack"]directly instead of goingthrough
flow.__getattr__to avoid caching side effects on the flow objecttry/exceptfor graceful handling of non-foreach joins rather thanchecking
join_type, since theInputsclass doesn't receive join type infolist.sort), so equal-index items (shouldn't happen inpractice) preserve their relative order
AI Disclosure
(Check if applicable — per Metaflow's AI Tool Usage Policy, you must be able
to explain every line of your code.)