Conversation

I'm struggling to understand how the problem has been solved from reading the code. Can you explain your solution for preventing the starvation?

Sure, assume dags a, b A
```python
if dag_model.exceeds_max_non_backfill:
    self.log.warning(
        "Dag run cannot be created; max active runs exceeded.",
        dag_id=dag_model.dag_id,
        max_active_runs=dag_model.max_active_runs,
        active_runs=active_runs_of_dags.get(dag_model.dag_id),
    )
    continue
```
is it right to remove this check?
Backfill has a special case as explained in the docs
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/backfill.html#concurrency-control
You can set max_active_runs on a backfill and it will control how many Dag runs in the backfill can run concurrently. Backfill max_active_runs is applied independently of the Dag max_active_runs setting.
The idea is to give the backfill run the power to limit concurrency even if the scheduler can schedule it all at once.
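The two independent caps described above can be sketched with a toy helper (hypothetical function, not Airflow's implementation): backfill runs are gated only by the backfill's own max_active_runs, while non-backfill runs are gated by the DAG's.

```python
# Hypothetical helper (NOT Airflow code): a queued run may be promoted to
# RUNNING only if it fits under the cap that applies to it. Backfill runs
# count against the backfill's max_active_runs; non-backfill runs count
# against the DAG's max_active_runs. The two caps are independent.
def can_promote(
    is_backfill_run: bool,
    active_backfill_runs: int,
    backfill_max_active_runs: int,
    active_non_backfill_runs: int,
    dag_max_active_runs: int,
) -> bool:
    if is_backfill_run:
        # the backfill cap is applied independently of the DAG cap
        return active_backfill_runs < backfill_max_active_runs
    return active_non_backfill_runs < dag_max_active_runs

# the DAG-level cap is saturated, but the backfill can still start runs
print(can_promote(True, 1, 3, 16, 16))   # True
print(can_promote(False, 1, 3, 16, 16))  # False
```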
It is checked in the window query that was added, so the data that gets here has already passed the query's validation of the non-backfill runs.
This will have to wait until 3.2.0 -- this touches the core and I don't want to rush it in before 3.2.0 is out. We have 1200+ commits in 3.2.0.
Pull request overview
This PR addresses scheduler DAG run starvation by changing how queued DagRuns are selected for transition to RUNNING when max_active_runs is low and there are large backlogs of queued runs.
Changes:
- Update `DagRun.get_queued_dag_runs_to_set_running()` to limit queued candidates per DAG/backfill using a `row_number()` window so the scheduler doesn't spend its per-loop budget examining non-runnable runs from a single DAG.
- Adjust scheduler unit tests to validate the improved fairness/scheduling behavior under backlog conditions.
- Remove a max-active guard/logging in `_create_dag_runs()` and add a TODO note around `_set_exceeds_max_active_runs`.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `airflow-core/src/airflow/models/dagrun.py` | Changes queued dagrun selection query to avoid starvation by limiting per DAG/backfill candidates. |
| `airflow-core/src/airflow/jobs/scheduler_job_runner.py` | Removes a creation-time max-active skip and adds a TODO comment near `_set_exceeds_max_active_runs`. |
| `airflow-core/tests/unit/jobs/test_scheduler_job.py` | Updates/reshapes scheduling tests to assert non-starved behavior and updated run counts. |
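The per-DAG/backfill candidate cap summarized above can be sketched with a plain SQL window function. This is a minimal illustration using stdlib `sqlite3`, not the actual Airflow SQLAlchemy query; the table and column names mirror the Airflow model only loosely.

```python
import sqlite3
from collections import Counter

# Minimal sketch of the technique (NOT the actual Airflow query): a
# row_number() window over (dag_id, backfill_id) caps how many QUEUED runs
# per partition survive the candidate query, so one DAG's huge backlog
# cannot monopolize the scheduler's per-loop examination budget.
# Requires SQLite >= 3.25 for window-function support.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag_run "
    "(id INTEGER PRIMARY KEY, dag_id TEXT, backfill_id INTEGER, state TEXT, logical_date INTEGER)"
)
# dag "a" has a huge queued backlog; dag "b" has only a few queued runs
rows = [("a", None, "queued", i) for i in range(100)]
rows += [("b", None, "queued", i) for i in range(3)]
conn.executemany(
    "INSERT INTO dag_run (dag_id, backfill_id, state, logical_date) VALUES (?, ?, ?, ?)",
    rows,
)

max_active_runs = 4  # illustrative cap
candidates = conn.execute(
    """
    SELECT dag_id, id FROM (
        SELECT dag_id, id,
               row_number() OVER (
                   PARTITION BY dag_id, backfill_id
                   ORDER BY logical_date, id  -- id as a deterministic tiebreaker
               ) AS rn
        FROM dag_run
        WHERE state = 'queued'
    )
    WHERE rn <= ?
    """,
    (max_active_runs,),
).fetchall()

# at most 4 candidates per DAG survive, so dag "b" is no longer crowded out
counts = Counter(dag_id for dag_id, _ in candidates)
print(counts)  # Counter({'a': 4, 'b': 3})
```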
```python
# now we finish all lower priority backfill tasks, and observe new higher priority tasks are started
session.execute(
    update(DagRun)
    .where(DagRun.dag_id == "test_dag2", DagRun.state == DagRunState.RUNNING)
    .values(state=DagRunState.SUCCESS)
)
```
This comment says you’re finishing “lower priority backfill tasks”, but the code updates DAG runs for test_dag2 (non-backfill) from RUNNING→SUCCESS. Please adjust the comment to match what the test is actually doing (finishing test_dag2 runs) to avoid confusion when maintaining the test expectations.
```python
.over(partition_by=[DagRun.dag_id, DagRun.backfill_id], order_by=DagRun.logical_date)
.label("rn"),
)
.where(DagRun.state == DagRunState.QUEUED)
```
available_dagruns_rn uses row_number(... order_by=DagRun.logical_date), but logical_date is nullable and does not reflect the scheduler’s actual priority ordering (run_after, and for backfills BackfillDagRun.sort_ordinal, which can differ e.g. reverse backfills). This can cause the rn-capacity filter to pick a non-runnable/future run_after row (or wrong backfill ordinal) as rn=1 and then filter out runnable candidates, reintroducing starvation or breaking backfill ordering. Consider basing the window order_by on run_after with a deterministic tiebreaker (e.g. id), and for backfills incorporate BackfillDagRun.sort_ordinal (likely via computing the row_number after joining that table), and/or apply run_after <= now() in the window subquery so rn is computed only among runnable queued runs.
Suggested change:

```diff
-            .over(partition_by=[DagRun.dag_id, DagRun.backfill_id], order_by=DagRun.logical_date)
-            .label("rn"),
-        )
-        .where(DagRun.state == DagRunState.QUEUED)
+            .over(
+                partition_by=[DagRun.dag_id, DagRun.backfill_id],
+                order_by=[BackfillDagRun.sort_ordinal, DagRun.run_after, DagRun.id],
+            )
+            .label("rn"),
+        )
+        .join(
+            BackfillDagRun,
+            DagRun.backfill_id == BackfillDagRun.id,
+            isouter=True,
+        )
+        .where(
+            and_(
+                DagRun.state == DagRunState.QUEUED,
+                DagRun.run_after <= func.now(),
+            )
+        )
```
kaxil
left a comment
The core idea — using row_number() to cap how many queued runs per DAG/backfill pass through the query — is a reasonable approach to solving the starvation problem in get_queued_dag_runs_to_set_running. However, this PR bundles in unrelated and incorrect changes to the DagRun creation path (_create_dag_runs), which is a separate concern from the QUEUED→RUNNING promotion path.
See inline comments for details.
Sure, no problem
```python
.where(DagRun.dag_id == "test_dag2", DagRun.state == DagRunState.RUNNING)
.values(state=DagRunState.SUCCESS)
)
session.commit()
```
Calling session.commit() inside a unit test can break transactional test isolation (fixtures that rely on nested transactions/rollbacks) and is usually unnecessary here since the updated rows are read again within the same session. Prefer removing the commit and relying on flush() (or keep it as a flush() only) so the state transition is visible without finalizing the transaction.
Suggested change:

```diff
- session.commit()
```
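The point of the review comment can be illustrated with stdlib `sqlite3` (the review concerns a SQLAlchemy session, but the underlying principle is the same): within one connection/transaction, an uncommitted write is already visible to later reads on that same connection, so a test that reads back through the same session does not need `commit()` to observe the state change.

```python
import sqlite3

# Illustration (NOT the test code under review): the same connection sees
# its own uncommitted UPDATE, so finalizing the transaction with commit()
# is unnecessary just to make the new state readable. In SQLAlchemy terms,
# a flush() suffices and keeps the test's transaction rollback intact.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_run (id INTEGER PRIMARY KEY, state TEXT)")
conn.execute("INSERT INTO dag_run (state) VALUES ('running')")
conn.execute("UPDATE dag_run SET state = 'success' WHERE state = 'running'")
# no commit() has been issued, yet the same connection reads the new state
state = conn.execute("SELECT state FROM dag_run").fetchone()[0]
print(state)  # success
```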
```python
dag_run = dag_maker.create_dagrun(state=State.RUNNING, session=session, run_type=DagRunType.SCHEDULED)

for _ in range(5):
    # create a bunch of dagruns in queued state, to make sure they are filtered by max_active_runs
```
The comment says these are created in queued state, but the code sets state=State.RUNNING. If the intent is to pre-fill/exceed max_active_runs with RUNNING dagruns (which makes sense for this test), update the comment to match the actual setup to avoid confusion for future maintainers.
Suggested change:

```diff
- # create a bunch of dagruns in queued state, to make sure they are filtered by max_active_runs
+ # create a bunch of dagruns in running state, to exceed max_active_runs
```
```python
DagRun.dag_id,
DagRun.id,
func.row_number()
.over(partition_by=[DagRun.dag_id, DagRun.backfill_id], order_by=DagRun.logical_date)
```
The window order_by uses only DagRun.logical_date, which may not be a total ordering (ties can occur), making the chosen dagruns nondeterministic across DB engines/plans and potentially causing flaky behavior. Add a stable tie-breaker (e.g., also order by DagRun.id or DagRun.run_after) so row-number assignment is deterministic within each (dag_id, backfill_id) partition.
Suggested change:

```diff
- .over(partition_by=[DagRun.dag_id, DagRun.backfill_id], order_by=DagRun.logical_date)
+ .over(
+     partition_by=[DagRun.dag_id, DagRun.backfill_id],
+     order_by=[DagRun.logical_date, DagRun.id],
+ )
```
```python
func.row_number()
.over(partition_by=[DagRun.dag_id, DagRun.backfill_id], order_by=DagRun.logical_date)
.label("rn"),
)
```
This computes row_number() across all queued dagruns before applying later eligibility filters (DagModel/Backfill joins, paused checks, etc.). On large installations with many queued dagruns, that full-table window can become a costly bottleneck. Consider pushing more predicates/joins into the same subquery/CTE used for the window (so the window runs only on eligible candidates), or otherwise narrowing the queued set prior to the window calculation.
Suggested change:

```diff
      )
  )
+ .join(
+     DagModel,
+     and_(
+         DagModel.dag_id == DagRun.dag_id,
+         DagModel.is_paused == false(),
+         DagModel.is_stale == false(),
+     ),
+ )
```
```python
# this is because there are 30 dags, most of which get filtered due to max_active_runs
# and so due to the default dagruns to examine, we look at the first 20 dags which CAN be run
# according to the max_active_runs parameter, meaning 3 backfill runs will start, 1 non backfill and
# all dagruns of dag2
# any runs for dag2 get started
```
These explanatory comments repeatedly say 'dags' where they appear to mean 'dagruns' (e.g., '30 dags', 'first 20 dags'), which makes the rationale hard to follow. Clarifying the terminology here (dag vs dagrun) would prevent misunderstanding when debugging scheduler selection behavior.
Suggested change:

```diff
- # this is because there are 30 dags, most of which get filtered due to max_active_runs
- # and so due to the default dagruns to examine, we look at the first 20 dags which CAN be run
- # according to the max_active_runs parameter, meaning 3 backfill runs will start, 1 non backfill and
- # all dagruns of dag2
- # any runs for dag2 get started
+ # this is because there are 30 queued dagruns, many of which get filtered because their DAGs
+ # have already reached max_active_runs
+ # and so due to the default dagruns-to-examine limit, we look at the first 20 dagruns that CAN be run
+ # according to the max_active_runs parameter, meaning 3 backfill runs will start, 1 non-backfill,
+ # and all runnable dagruns for dag2
```
We have been experiencing severe dagrun starvation on our cluster: when there were a lot of dagruns and a low max_active_runs limit (hundreds to thousands of runs with a limit in the 10s), many dags got stuck in the queued state without ever moving to running, causing those dagruns to time out.

After investigation, we found the cause in the _start_queued_dagruns method: the query was returning dagruns which cannot be set to running due to the max_active_runs limit, meaning that other dagruns were starved.

A similar issue occurs when new dagruns are created in large batches (due to the nulls-first ordering), but that is out of scope for this PR; I will submit an additional PR soon.
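The starvation mechanism described above can be demonstrated with a toy simulation (illustrative names, not Airflow code): the scheduler examines at most a fixed budget of queued runs per loop, and without a per-DAG cap on candidates, one DAG's non-runnable backlog exhausts that budget before other DAGs' runnable runs are ever looked at.

```python
from collections import Counter

# Toy simulation (NOT Airflow code) of the starvation this PR fixes. The
# scheduler examines at most `budget` queued-run candidates per loop. With
# per_dag_cap set (mimicking the row_number() window filter in the query),
# a DAG's excess candidates are filtered out before consuming the budget.
def promote(queued, active, max_active_runs, budget, per_dag_cap=None):
    """Return dag_ids of runs promoted to RUNNING during one scheduler loop."""
    examined, promoted = 0, []
    seen_per_dag = Counter()
    for dag_id in queued:
        if examined >= budget:
            break
        if per_dag_cap is not None:
            if seen_per_dag[dag_id] >= per_dag_cap:
                continue  # filtered by the window query; doesn't consume budget
            seen_per_dag[dag_id] += 1
        examined += 1
        if active[dag_id] < max_active_runs[dag_id]:
            active[dag_id] += 1
            promoted.append(dag_id)
    return promoted

queued = ["dag_a"] * 1000 + ["dag_b"] * 5   # huge dag_a backlog, then dag_b
max_active = {"dag_a": 10, "dag_b": 10}

# without a cap, dag_a's non-runnable runs (it is already at its limit)
# exhaust the budget: dag_b is starved even though its runs could start
got = promote(queued, {"dag_a": 10, "dag_b": 0}, max_active, budget=20)
print(Counter(got))  # Counter() -- nothing promoted

# with a per-DAG candidate cap, dag_b's runs make it into the budget
got = promote(queued, {"dag_a": 10, "dag_b": 0}, max_active, budget=20, per_dag_cap=10)
print(Counter(got))  # Counter({'dag_b': 5})
```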
closes #49508