Teardown Waiting for All in-scope Tasks to Complete #64181
@huashi-st Please ask @jscheffl to review. Nice patch 👍
Nataneljpwd
left a comment
Looks great!
Some minor improvements could be made; otherwise looks great.
jscheffl
left a comment
Looks good! I'm a bit envious because I also attempted a fix (some time ago) and failed to make the pytests work. But it seems you mastered it! Thanks!
Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions.
* teardown run only when all the tasks between setup and teardown are complete
* calculate done in 1 line
* not needed task_id from query
* remove unnecessary if for rare cases.

(cherry picked from commit f7c5793)
statuses = list(_evaluate_direct_relatives())
yield from statuses
if not statuses:
    yield from _evaluate_teardown_scope()
list() eagerly materializes the entire generator, which means all side effects inside _evaluate_direct_relatives() (including ti.set_state() on line 460) execute immediately rather than lazily. Before this PR, yield from _evaluate_direct_relatives() was lazy. This does not cause bugs today since callers consume all statuses, but a sentinel-based approach avoids the behavioral change:
found_failure = False
for status in _evaluate_direct_relatives():
    found_failure = True
    yield status
if not found_failure:
    yield from _evaluate_teardown_scope()

    setup_obj = task.dag.get_task(setup_id)
    in_scope_ids.update(indirect_upstream_ids & setup_obj.get_flat_relative_ids(upstream=False))

in_scope_tasks = {tid: task.dag.get_task(tid) for tid in in_scope_ids}
If in_scope_ids ends up empty after the intersection (e.g., setup has no indirect downstream overlap with teardown's indirect upstreams), this still iterates all finished TIs on lines 642-646 to produce done=0, then expected=0, and exits without yielding. Adding if not in_scope_tasks: return here would skip that unnecessary work.
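To make the intersection and the suggested early return concrete, here is a minimal sketch on a toy graph rather than on Airflow objects. Everything in it is hypothetical: the DOWNSTREAM map, the helper names, and in particular the assumption that "indirect upstream" means all transitive upstream ids minus the direct ones. It is not the code under review.

```python
from collections import deque

# Hypothetical toy DAG: task_id -> direct downstream ids.
# Shape follows the PR description: setup >> [t_fail, t_slow] >> d >> td,
# plus the direct setup -> td edge.
DOWNSTREAM = {
    "setup": {"t_fail", "t_slow", "td"},
    "t_fail": {"d"},
    "t_slow": {"d"},
    "d": {"td"},
    "td": set(),
}


def reachable(start, edges):
    """Return every id reachable from start by following edges (start excluded)."""
    seen = set()
    queue = deque(edges.get(start, ()))
    while queue:
        node = queue.popleft()
        if node not in seen:
            seen.add(node)
            queue.extend(edges.get(node, ()))
    return seen


def invert(edges):
    """Build an upstream map (task_id -> direct upstream ids) from a downstream map."""
    upstream = {node: set() for node in edges}
    for node, children in edges.items():
        for child in children:
            upstream.setdefault(child, set()).add(node)
    return upstream


def pending_in_scope(teardown_id, setup_ids, finished_ids, edges=DOWNSTREAM):
    """Return in-scope ids that have not finished; an empty set means nothing blocks."""
    upstream = invert(edges)
    # Assumption: indirect upstream = transitive upstream minus direct upstream.
    indirect_upstream = reachable(teardown_id, upstream) - upstream[teardown_id]
    in_scope = set()
    for setup_id in setup_ids:
        in_scope |= indirect_upstream & reachable(setup_id, edges)
    if not in_scope:
        # Early return: no need to look at finished task instances at all.
        return set()
    return in_scope - set(finished_ids)
```

With t_fail finished and t_slow still running, pending_in_scope("td", {"setup"}, {"t_fail"}) reports t_slow as the remaining blocker; with no setups the function returns immediately.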
if not task.dag:
    return

setup_task_ids = {t.task_id for t in task.upstream_list if t.is_setup}
What happens when the teardown has no setup upstreams? setup_task_ids would be empty, so the for setup_id in setup_task_ids loop on line 636 never executes, in_scope_ids stays empty, and _evaluate_teardown_scope yields nothing. That's fine. But what about a teardown whose setups are not direct upstreams? upstream_list only returns direct upstream tasks. If a setup is an indirect upstream (connected through a chain rather than a direct edge), it won't be found here. Is that a valid DAG topology, or does as_teardown(setups=...) always create a direct edge?
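The topology this question is about can be illustrated with a toy graph. The sketch below does not say whether Airflow permits such a DAG; it only shows, under hypothetical names (UPSTREAM, IS_SETUP, direct_setups, transitive_setups), how a direct-only lookup differs from a transitive one when the setup is connected through a chain.

```python
# Hypothetical upstream map: task_id -> direct upstream ids.
# "setup" reaches "teardown" only through "work"; there is no direct edge.
UPSTREAM = {
    "setup": set(),
    "work": {"setup"},
    "teardown": {"work"},
}
IS_SETUP = {"setup"}  # ids flagged as setup tasks in this toy model


def direct_setups(task_id):
    """Setups among the direct upstreams only (analogous to scanning upstream_list)."""
    return {t for t in UPSTREAM[task_id] if t in IS_SETUP}


def transitive_setups(task_id):
    """Setups among all upstreams, direct or reached through a chain."""
    found, seen = set(), set()
    stack = list(UPSTREAM[task_id])
    while stack:
        node = stack.pop()
        if node in seen:
            continue
        seen.add(node)
        if node in IS_SETUP:
            found.add(node)
        stack.extend(UPSTREAM[node])
    return found
```

In this toy graph the direct lookup finds no setup for "teardown", while the transitive lookup finds "setup". Whether the second case can occur in practice is exactly what the question asks.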
t2 = EmptyOperator(task_id="t2")
t3 = EmptyOperator(task_id="t3")
teardown_task = EmptyOperator(task_id="teardown").as_teardown(setups=setup)
setup >> t1 >> t2 >> t3 >> teardown_task
The tests all use linear chains (setup >> t1 >> t2 >> t3 >> teardown), but the bug in #29332 is about parallel branches where one fails and the other is still running. The PR description DAG (setup >> [t_fail, t_slow] >> d >> td) would be the most direct regression test. Worth adding a test with that DAG shape to verify the actual reported scenario.
Also missing: a test where an in-scope task has FAILED state. The current tests only use SUCCESS or None. The teardown should still proceed when in-scope tasks are in any terminal state (FAILED, UPSTREAM_FAILED, SKIPPED), not just SUCCESS. Without that test, someone could accidentally filter on SUCCESS-only and these tests wouldn't catch it.
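The two missing cases can be sketched as plain-Python tests against a toy readiness check. This is not an Airflow test: the state strings, the TERMINAL set, and teardown_may_run are assumptions made for illustration, and a real regression test would build the setup >> [t_fail, t_slow] >> d >> td DAG with the project's own fixtures.

```python
# Assumed terminal states: the ones named in the review comment, plus success.
TERMINAL = {"success", "failed", "upstream_failed", "skipped"}


def teardown_may_run(in_scope_states):
    """True when every in-scope task is in a terminal state (hypothetical helper)."""
    return all(state in TERMINAL for state in in_scope_states.values())


def test_parallel_branch_still_running_blocks_teardown():
    # One branch failed while the other is still running: teardown must wait.
    states = {"t_fail": "failed", "t_slow": "running", "d": None}
    assert not teardown_may_run(states)


def test_non_success_terminal_states_allow_teardown():
    # Terminal does not mean successful: failed, upstream_failed and skipped all count.
    states = {"t_fail": "failed", "t_slow": "skipped", "d": "upstream_failed"}
    assert teardown_may_run(states)


def test_success_only_filter_would_be_wrong():
    # A SUCCESS-only check would report "not done" for this finished scope.
    states = {"t_fail": "failed", "t_slow": "success"}
    success_only = all(s == "success" for s in states.values())
    assert teardown_may_run(states) and not success_only
```

The third test is the guard the comment asks for: it fails if the readiness check is ever narrowed to SUCCESS only.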
)
assert len(dep_statuses) == 1
assert not dep_statuses[0].passed
assert "2 in-scope" in dep_statuses[0].reason
assert "2 in-scope" in dep_statuses[0].reason is fragile -- it couples the test to the exact error message format. If someone changes the message to say "2 tasks in-scope" or "in-scope: 2", this breaks. Testing not dep_statuses[0].passed is already sufficient for correctness; if you want to verify the count, parsing from structured data would be more durable.
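One way to read "structured data" here is a status object that carries the count as a field next to the human-readable reason. The sketch below assumes a hypothetical ScopeStatus type and evaluate_scope helper; the dep status type used in the PR may not have such a field, so treat this as a shape to aim for, not as the existing API.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ScopeStatus:
    """Hypothetical status record: the count lives in a field, not only in the message."""

    passed: bool
    pending_count: int
    reason: str


def evaluate_scope(pending_ids):
    """Build a status from the ids of in-scope tasks that have not finished."""
    count = len(pending_ids)
    return ScopeStatus(
        passed=count == 0,
        pending_count=count,
        reason=f"{count} in-scope task(s) have not finished",
    )


# The assertions survive any rewording of `reason`.
status = evaluate_scope({"t2", "t3"})
assert not status.passed
assert status.pending_count == 2
```

Asserting on pending_count keeps the test independent of the message wording, which can then change freely.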
Fix teardown tasks firing before all in-scope tasks complete.
Previously, teardown only checked direct upstream states, so it could run while cleared or parallel in-scope tasks were still executing. This PR adds _evaluate_teardown_scope() to verify that all tasks between setup and teardown have reached a terminal state before proceeding.

closes: #29332
Test Dag
Before (at the time task_fail failed) -
After (at the time task_fail failed) -

Was generative AI tooling used to co-author this PR?
Generated-by: Claude (Cursor) following the guidelines