
Teardown Waiting for All in-scope Tasks to Complete#64181

Merged
jscheffl merged 6 commits into apache:main from huashi-st:feature/teardown-waiting-for-all-in-scope-task-complete
Mar 25, 2026

Conversation

@huashi-st (Contributor) commented Mar 24, 2026

Fix teardown tasks firing before all in-scope tasks complete.

Previously, teardown only checked direct upstream states, so it could run while cleared or parallel in-scope tasks were still executing. This PR adds _evaluate_teardown_scope() to verify that all tasks between setup and teardown have reached a terminal state before the teardown proceeds.

closes: #29332
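The core idea can be sketched in plain Python. This is a simplified illustration, not the actual Airflow implementation: the set operations mirror the "intersection of the setup's downstreams and the teardown's upstreams" approach visible in the review comments below, and `TERMINAL_STATES` and both helper names are hypothetical.

```python
# Terminal states a task instance can settle into (illustrative subset).
TERMINAL_STATES = {"success", "failed", "upstream_failed", "skipped", "removed"}


def in_scope_task_ids(downstream_of_setup: set, upstream_of_teardown: set) -> set:
    """Tasks 'between' setup and teardown: downstream of the setup AND
    upstream of the teardown."""
    return downstream_of_setup & upstream_of_teardown


def teardown_can_run(in_scope_ids: set, task_states: dict) -> bool:
    """True only when every in-scope task has reached a terminal state,
    whether that is SUCCESS, FAILED, UPSTREAM_FAILED, or SKIPPED."""
    return all(task_states.get(tid) in TERMINAL_STATES for tid in in_scope_ids)
```

With the test DAG below, `task_fail` failing while `task_slow` is still running leaves the scope non-terminal, so the teardown must wait.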


Test DAG

import time
from datetime import datetime

from airflow.sdk import DAG, task


@task
def setup_resource():
    print("Setting up resource")


@task
def task_fail():
    raise ValueError("I'm designed to fail quickly")


@task
def task_slow():
    print("Starting slow task...")
    time.sleep(120)
    print("Slow task done")


@task
def dummy():
    print("Dummy task")


@task
def teardown_resource():
    print("Tearing down resource")


with DAG(
    dag_id="test_teardown_wait",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    s = setup_resource()
    t_fail = task_fail()
    t_slow = task_slow()
    d = dummy()
    td = teardown_resource().as_teardown(setups=s)
    s >> [t_fail, t_slow] >> d >> td

Before (at the time task_fail failed):

[screenshot]

After (at the time task_fail failed):

[screenshot]
Was generative AI tooling used to co-author this PR?
  • Yes — Claude (Cursor)

Generated-by: Claude (Cursor) following the guidelines



@morooshka (Contributor) commented:

@huashi-st please ask @jscheffl to review. Nice patch 👍

@Nataneljpwd (Contributor) left a comment:

Looks great! Some minor improvements could be made, but otherwise it looks great.

@Nataneljpwd (Contributor) left a comment:

Looks great!

@jscheffl (Contributor) left a comment:

Looks good! I'm a bit envious because I attempted a fix some time ago and failed to make the pytests work. But it seems you mastered it! Thanks!

@jscheffl jscheffl added this to the Airflow 3.2.0 milestone Mar 25, 2026
@jscheffl jscheffl added area:Scheduler including HA (high availability) scheduler type:bug-fix Changelog: Bug Fixes labels Mar 25, 2026
@jscheffl jscheffl merged commit f7c5793 into apache:main Mar 25, 2026
81 of 82 checks passed

boring-cyborg bot commented Mar 25, 2026

Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions.

nailo2c pushed a commit to nailo2c/airflow that referenced this pull request Mar 30, 2026
* teardown runs only when all the tasks between setup and teardown are complete

* calculate done in 1 line

* drop task_id from query (not needed)

* remove unnecessary if for rare cases
vatsrahul1001 pushed a commit that referenced this pull request Mar 30, 2026
(cherry picked from commit f7c5793)
statuses = list(_evaluate_direct_relatives())
yield from statuses
if not statuses:
    yield from _evaluate_teardown_scope()
Review comment (Member):

list() eagerly materializes the entire generator, which means all side effects inside _evaluate_direct_relatives() (including ti.set_state() on line 460) execute immediately rather than lazily. Before this PR, yield from _evaluate_direct_relatives() was lazy. This does not cause bugs today since callers consume all statuses, but a sentinel-based approach avoids the behavioral change:

found_failure = False
for status in _evaluate_direct_relatives():
    found_failure = True
    yield status
if not found_failure:
    yield from _evaluate_teardown_scope()
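The eager-vs-lazy difference the comment describes can be demonstrated with a toy generator. This is illustrative only; `side_effects` stands in for the `ti.set_state()` calls inside `_evaluate_direct_relatives()`.

```python
side_effects = []


def evaluate():
    # Stand-in for _evaluate_direct_relatives(): each yielded status
    # carries a side effect, mimicking ti.set_state().
    for s in ("dep-1", "dep-2"):
        side_effects.append(f"set_state({s})")
        yield s


# Eager: list() drains the generator, so every side effect runs up front.
list(evaluate())
assert side_effects == ["set_state(dep-1)", "set_state(dep-2)"]

# Lazy sentinel pattern: side effects fire only as statuses are consumed.
side_effects.clear()
gen = evaluate()
next(gen)  # pull just the first status
assert side_effects == ["set_state(dep-1)"]
```

As the reviewer notes, the two behave identically when callers consume every status, so this is about preserving the pre-PR laziness rather than fixing a present-day bug.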

setup_obj = task.dag.get_task(setup_id)
in_scope_ids.update(indirect_upstream_ids & setup_obj.get_flat_relative_ids(upstream=False))

in_scope_tasks = {tid: task.dag.get_task(tid) for tid in in_scope_ids}
Review comment (Member):

If in_scope_ids ends up empty after the intersection (e.g., setup has no indirect downstream overlap with teardown's indirect upstreams), this still iterates all finished TIs on line 642-646 to produce done=0, then expected=0, and exits without yielding. Adding if not in_scope_tasks: return here would skip that unnecessary work.

if not task.dag:
    return

setup_task_ids = {t.task_id for t in task.upstream_list if t.is_setup}
Review comment (Member):

What happens when the teardown has no setup upstreams? setup_task_ids would be empty, so the for setup_id in setup_task_ids loop on line 636 never executes, in_scope_ids stays empty, and _evaluate_teardown_scope yields nothing. That's fine. But what about a teardown whose setups are not direct upstreams? upstream_list only returns direct upstream tasks. If a setup is an indirect upstream (connected through a chain rather than a direct edge), it won't be found here. Is that a valid DAG topology, or does as_teardown(setups=...) always create a direct edge?

t2 = EmptyOperator(task_id="t2")
t3 = EmptyOperator(task_id="t3")
teardown_task = EmptyOperator(task_id="teardown").as_teardown(setups=setup)
setup >> t1 >> t2 >> t3 >> teardown_task
Review comment (Member):

The tests all use linear chains (setup >> t1 >> t2 >> t3 >> teardown), but the bug in #29332 is about parallel branches where one fails and the other is still running. The PR description DAG (setup >> [t_fail, t_slow] >> d >> td) would be the most direct regression test. Worth adding a test with that DAG shape to verify the actual reported scenario.

Also missing: a test where an in-scope task has FAILED state. The current tests only use SUCCESS or None. The teardown should still proceed when in-scope tasks are in any terminal state (FAILED, UPSTREAM_FAILED, SKIPPED), not just SUCCESS. Without that test, someone could accidentally filter on SUCCESS-only and these tests wouldn't catch it.

)
assert len(dep_statuses) == 1
assert not dep_statuses[0].passed
assert "2 in-scope" in dep_statuses[0].reason
Review comment (Member):

assert "2 in-scope" in dep_statuses[0].reason is fragile -- it couples the test to the exact error message format. If someone changes the message to say "2 tasks in-scope" or "in-scope: 2", this breaks. Testing not dep_statuses[0].passed is already sufficient for correctness; if you want to verify the count, parsing from structured data would be more durable.
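A minimal illustration of that point. `DepStatus` here is a hypothetical stand-in for the real dep-status type, with `passed` and `reason` fields as shown in the test above.

```python
from typing import NamedTuple


class DepStatus(NamedTuple):
    # Hypothetical stand-in for the dep-status record under test.
    passed: bool
    reason: str


status = DepStatus(passed=False, reason="Not all 2 in-scope tasks are done")

# Durable: asserts the contract, independent of message wording.
assert status.passed is False

# Fragile: breaks the moment the message is reworded
# (e.g. to "2 tasks in-scope" or "in-scope: 2").
assert "2 in-scope" in status.reason
```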

Suraj-kumar00 pushed a commit to Suraj-kumar00/airflow that referenced this pull request Apr 7, 2026
abhijeets25012-tech pushed a commit to abhijeets25012-tech/airflow that referenced this pull request Apr 9, 2026


Development

Successfully merging this pull request may close these issues.

Ensure teardown doesn't run until all other tasks complete

5 participants