17 commits
75ef8b5  feat(scheduler): add INFO logging for dataset-triggered DagRun creation (leossantos, Mar 13, 2026)
7b7e6a8  feat(scheduler): add deep diagnostic logging for dataset trigger deci… (leossantos, Mar 16, 2026)
5fbf772  chore(scheduler): Add [DEBUG DATASETS] tag to logs (leossantos, Mar 16, 2026)
735e145  fix(scheduler): guard against condition bypass when SerializedDagMode… (leossantos, Mar 20, 2026)
bed9938  fix(dag): remove missing_from_serialized after processing serialized … (leossantos, Mar 20, 2026)
e427302  test(models): add tests for DDRQ entries without SerializedDagModel (leossantos, Mar 20, 2026)
f59cec3  chore(scheduler): remove dataset debug logging (leossantos, Mar 26, 2026)
c626f65  fix(datasets): demote missing SerializedDagModel DDRQ log to debug (leossantos, Mar 26, 2026)
d089870  chore(datasets): debug-log when dataset condition passes in dags_need… (leossantos, Mar 26, 2026)
36105b1  docs: add bugfix newsfragment and dags_needing_dagruns dataset docstring (leossantos, Mar 26, 2026)
684fd2c  test(dag): persist DagModel before DatasetDagRunQueue in unit tests (leossantos, Mar 27, 2026)
2c70fa3  test(dag): pass serialized=True in dataset dags_needing_dagruns tests (leossantos, Mar 30, 2026)
7f25115  Apply suggestions from code review (leossantos, Mar 31, 2026)
3c67b9f  style(dag): normalize line endings in DDRQ serialized-DAG guard block (leossantos, Mar 31, 2026)
8561190  test(dag): align DDRQ missing-serialized log assertions with dag.py (leossantos, Apr 1, 2026)
e3198c9  test(dag): assert full DDRQ missing-serialized log line in caplog (leossantos, Apr 1, 2026)
e0425b5  chore: change missing serialized DAG log level from debug to info and… (leossantos, Apr 4, 2026)
airflow/models/dag.py (26 additions, 1 deletion)
@@ -4069,6 +4069,11 @@ def dags_needing_dagruns(cls, session: Session) -> tuple[Query, dict[str, tuple[
         This will return a resultset of rows that is row-level-locked with a "SELECT ... FOR UPDATE" query,
         you should ensure that any scheduling decisions are made in a single transaction -- as soon as the
         transaction is committed it will be unlocked.
+
+        For dataset-triggered scheduling, Dags that have ``DatasetDagRunQueue`` rows but no matching
+        ``SerializedDagModel`` row are omitted from the returned ``dataset_triggered_dag_info`` until
+        serialization exists; DDRQs are **not** deleted here so the scheduler can re-evaluate on a
+        later run.
         """
         from airflow.models.serialized_dag import SerializedDagModel
 
@@ -4094,13 +4099,33 @@ def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None:
         ser_dags = session.scalars(
             select(SerializedDagModel).where(SerializedDagModel.dag_id.in_(dag_statuses.keys()))
         ).all()
+        ser_dag_ids = {s.dag_id for s in ser_dags}
+        missing_from_serialized = set(by_dag.keys()) - ser_dag_ids
+        if missing_from_serialized:
+            log.info(
+                "Dags have queued dataset events (DDRQs), but are not found in the serialized_dag table."
+                " — skipping Dag run creation: %s",
+                sorted(missing_from_serialized),
+            )
+            for dag_id in missing_from_serialized:
+                del by_dag[dag_id]
+                del dag_statuses[dag_id]
         for ser_dag in ser_dags:
             dag_id = ser_dag.dag_id
             statuses = dag_statuses[dag_id]
+            dataset_condition = ser_dag.dag.timetable.dataset_condition
 
-            if not dag_ready(dag_id, cond=ser_dag.dag.timetable.dataset_condition, statuses=statuses):
+            if not dag_ready(dag_id, cond=dataset_condition, statuses=statuses):
                 del by_dag[dag_id]
                 del dag_statuses[dag_id]
+            else:
+                log.debug(
+                    "Dataset condition satisfied: dag_id=%s, condition=%s, ddrq_uris=%s, ddrq_count=%d",
+                    dag_id,
+                    dataset_condition,
+                    sorted(statuses.keys()),
+                    len(statuses),
+                )
         del dag_statuses
         dataset_triggered_dag_info = {}
         for dag_id, records in by_dag.items():
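Note (not part of the diff): a minimal sketch of how a scheduler-side caller is expected to consume dags_needing_dagruns, following the "SELECT ... FOR UPDATE" note in the docstring above. create_session is the real airflow.utils.session helper; the loop body and local variable names are assumptions for illustration.

    from airflow.models.dag import DagModel
    from airflow.utils.session import create_session

    # All scheduling decisions happen inside one session scope: the rows
    # returned by the query stay row-level locked until the commit.
    with create_session() as session:
        query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session)
        for dag_model in query.all():
            # Dag ids without a SerializedDagModel never appear in the dict;
            # their DatasetDagRunQueue rows are retained for a later heartbeat.
            queued_times = dataset_triggered_dag_info.get(dag_model.dag_id)
            if queued_times is not None:
                first_queued_at, last_queued_at = queued_times
                # ... create the dataset-triggered DagRun while the row is locked ...
    # create_session() commits on exit, releasing the FOR UPDATE locks.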
tests/models/test_dag.py (111 additions, 0 deletions)
@@ -3030,6 +3030,7 @@ def test_dags_needing_dagruns_datasets(self, dag_maker, session):
             max_active_runs=1,
             schedule=[dataset],
             start_date=pendulum.now().add(days=-2),
+            serialized=True,
         ) as dag:
             EmptyOperator(task_id="dummy")
 
@@ -3064,6 +3065,115 @@ def test_dags_needing_dagruns_datasets(self, dag_maker, session):
         dag_models = query.all()
         assert dag_models == [dag_model]
 
+    def test_dags_needing_dagruns_skips_ddrq_when_serialized_dag_missing(self, session, caplog):
+        """DDRQ rows for a Dag without SerializedDagModel must be skipped (no dataset_triggered info).
+
+        Rows must remain in ``dataset_dag_run_queue`` so the scheduler can re-evaluate on a later
+        heartbeat once ``SerializedDagModel`` exists (``dags_needing_dagruns`` only drops them from
+        the in-memory candidate set, it does not delete ORM rows).
+        """
+        orphan_dag_id = "ddr_q_no_serialized_dag"
+        session.add(DatasetModel(uri="dataset_for_orphan_ddrq"))
+        session.flush()
+        dataset_id = session.query(DatasetModel.id).filter_by(uri="dataset_for_orphan_ddrq").scalar()
+        session.add(
+            DagModel(
+                dag_id=orphan_dag_id,
+                max_active_tasks=1,
+                has_task_concurrency_limits=False,
+                next_dagrun=timezone.datetime(2038, 1, 1),
+                next_dagrun_create_after=timezone.datetime(2038, 1, 2),
+                is_active=True,
+                has_import_errors=False,
+                is_paused=False,
+            )
+        )
+        session.flush()
+        session.add(DatasetDagRunQueue(dataset_id=dataset_id, target_dag_id=orphan_dag_id))
+        session.flush()
+
+        with caplog.at_level(logging.DEBUG, logger="airflow.models.dag"):
+            _query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session)
+
+        assert orphan_dag_id not in dataset_triggered_dag_info
+        assert (
+            "Dags have queued dataset events (DDRQs), but are not found in the serialized_dag table."
+            in caplog.text
+        )
+        assert orphan_dag_id in caplog.text
+        assert (
+            session.query(DatasetDagRunQueue)
+            .filter(DatasetDagRunQueue.target_dag_id == orphan_dag_id)
+            .count()
+            == 1
+        )
+
+    def test_dags_needing_dagruns_missing_serialized_warning_lists_sorted_dag_ids(self, session, caplog):
+        """When multiple dags lack SerializedDagModel, the warning lists dag_ids sorted."""
+        session.add_all(
+            [
+                DatasetModel(uri="ds_ghost_z"),
+                DatasetModel(uri="ds_ghost_a"),
+            ]
+        )
+        session.flush()
+        ds_z_id = session.query(DatasetModel.id).filter_by(uri="ds_ghost_z").scalar()
+        ds_a_id = session.query(DatasetModel.id).filter_by(uri="ds_ghost_a").scalar()
+        far = timezone.datetime(2038, 1, 1)
+        far_after = timezone.datetime(2038, 1, 2)
+        session.add_all(
+            [
+                DagModel(
+                    dag_id="ghost_z",
+                    max_active_tasks=1,
+                    has_task_concurrency_limits=False,
+                    next_dagrun=far,
+                    next_dagrun_create_after=far_after,
+                    is_active=True,
+                    has_import_errors=False,
+                    is_paused=False,
+                ),
+                DagModel(
+                    dag_id="ghost_a",
+                    max_active_tasks=1,
+                    has_task_concurrency_limits=False,
+                    next_dagrun=far,
+                    next_dagrun_create_after=far_after,
+                    is_active=True,
+                    has_import_errors=False,
+                    is_paused=False,
+                ),
+            ]
+        )
+        session.flush()
+
+        session.add_all(
+            [
+                DatasetDagRunQueue(dataset_id=ds_z_id, target_dag_id="ghost_z"),
+                DatasetDagRunQueue(dataset_id=ds_a_id, target_dag_id="ghost_a"),
+            ]
+        )
+        session.flush()
+
+        with caplog.at_level(logging.DEBUG, logger="airflow.models.dag"):
+            _query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session)
+
+        assert "ghost_a" not in dataset_triggered_dag_info
+        assert "ghost_z" not in dataset_triggered_dag_info
+        msg = next(
+            r.message
+            for r in caplog.records
+            if "Dags have queued dataset events (DDRQs), but are not found in the serialized_dag table."
+            in r.message
+        )
+        assert msg.index("ghost_a") < msg.index("ghost_z")
+        assert (
+            session.query(DatasetDagRunQueue)
+            .filter(DatasetDagRunQueue.target_dag_id.in_(("ghost_a", "ghost_z")))
+            .count()
+            == 2
+        )
+
     def test_dags_needing_dagruns_dataset_aliases(self, dag_maker, session):
         # link dataset_alias hello_alias to dataset hello
         dataset_model = DatasetModel(uri="hello")
@@ -3078,6 +3188,7 @@ def test_dags_needing_dagruns_dataset_aliases(self, dag_maker, session):
             max_active_runs=1,
             schedule=[DatasetAlias(name="hello_alias")],
             start_date=pendulum.now().add(days=-2),
+            serialized=True,
         ):
             EmptyOperator(task_id="dummy")
 
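Note (not part of the PR): a hedged sketch of the complementary case the new docstrings promise, namely that a retained DatasetDagRunQueue row is picked up once a SerializedDagModel row exists. It mirrors test_dags_needing_dagruns_datasets above; the test name, dag_id, and dataset URI are invented, and it assumes dag_maker(serialized=True) persists the SerializedDagModel row as in the other tests.

    def test_retained_ddrq_is_picked_up_once_serialized_dag_exists(self, dag_maker, session):
        # Sketch only: with serialized=True a SerializedDagModel row exists, so
        # the queued dataset event should now yield a dataset_triggered_dag_info
        # entry instead of being skipped by the missing-serialized guard.
        dataset = Dataset(uri="ds_late_serialization")
        with dag_maker(
            dag_id="late_serialized_dag",
            schedule=[dataset],
            start_date=pendulum.now().add(days=-2),
            serialized=True,
            session=session,
        ):
            EmptyOperator(task_id="dummy")

        dataset_id = session.query(DatasetModel.id).filter_by(uri="ds_late_serialization").scalar()
        session.add(DatasetDagRunQueue(dataset_id=dataset_id, target_dag_id="late_serialized_dag"))
        session.flush()

        _query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session)
        assert "late_serialized_dag" in dataset_triggered_dag_info
        # The consumed DDRQ row is still left in place here; deleting it is the
        # scheduler's job once it actually creates the run.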