Skip to content

Guard against null trigger in asset watcher cleanup which cause dag processor crash#64659

Merged
vatsrahul1001 merged 1 commit intomainfrom
fix/asset-watcher-null-trigger-guard
Apr 3, 2026
Merged

Guard against null trigger in asset watcher cleanup which cause dag processor crash#64659
vatsrahul1001 merged 1 commit intomainfrom
fix/asset-watcher-null-trigger-guard

Conversation

@vatsrahul1001
Copy link
Copy Markdown
Contributor

Summary

Guard against watcher.trigger being None in add_asset_trigger_references when filtering
watchers during the removal phase.
This can happen when Trigger.clean_unused() (in the triggerer) deletes a trigger row between
parsing loops, leaving an AssetWatcherModel with a dangling reference. The SQLAlchemy
relationship resolves to None, causing AttributeError: 'NoneType' object has no attribute 'classpath'
and crashing the dag processor.
Watchers with deleted triggers are correctly dropped by the filter since they're orphaned.


Traceback (most recent call last):
  File "/usr/python/bin/airflow", line 10, in <module>
    sys.exit(main())
  File "/opt/airflow/airflow-core/src/airflow/__main__.py", line 55, in main
    args.func(args)
  File "/opt/airflow/airflow-core/src/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/opt/airflow/airflow-core/src/airflow/utils/memray_utils.py", line 59, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow-core/src/airflow/utils/cli.py", line 113, in wrapper
    return f(*args, **kwargs)
  File "/opt/airflow/airflow-core/src/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
    return func(*args, **kwargs)
  File "/opt/airflow/airflow-core/src/airflow/cli/commands/dag_processor_command.py", line 64, in dag_processor
    run_command_with_daemon_option(
  File "/opt/airflow/airflow-core/src/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/opt/airflow/airflow-core/src/airflow/cli/commands/dag_processor_command.py", line 67, in <lambda>
    callback=lambda: run_job(job=job_runner.job, execute_callable=job_runner._execute),
  File "/opt/airflow/airflow-core/src/airflow/utils/session.py", line 100, in wrapper
    return func(*args, session=session, **kwargs)  # type: ignore[arg-type]
  File "/opt/airflow/airflow-core/src/airflow/jobs/job.py", line 355, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/opt/airflow/airflow-core/src/airflow/jobs/job.py", line 384, in execute_job
    ret = execute_callable()
  File "/opt/airflow/airflow-core/src/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
    self.processor.run()
  File "/opt/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 334, in run
    return self._run_parsing_loop()
  File "/opt/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 441, in _run_parsing_loop
    self._collect_results()
  File "/opt/airflow/airflow-core/src/airflow/utils/session.py", line 100, in wrapper
    return func(*args, session=session, **kwargs)  # type: ignore[arg-type]
  File "/opt/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 948, in _collect_results
    self._file_stats[file] = process_parse_results(
  File "/opt/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 1347, in process_parse_results
    update_dag_parsing_results_in_db(
  File "/opt/airflow/airflow-core/src/airflow/dag_processing/collection.py", line 463, in update_dag_parsing_results_in_db
    for attempt in run_with_db_retries(logger=log):
  File "/usr/python/lib/python3.10/site-packages/tenacity/__init__.py", line 438, in __iter__
    do = self.iter(retry_state=retry_state)
  File "/usr/python/lib/python3.10/site-packages/tenacity/__init__.py", line 371, in iter
    result = action(retry_state)
  File "/usr/python/lib/python3.10/site-packages/tenacity/__init__.py", line 393, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
  File "/usr/python/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/usr/python/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/opt/airflow/airflow-core/src/airflow/dag_processing/collection.py", line 473, in update_dag_parsing_results_in_db
    SerializedDAG.bulk_write_to_db(
  File "/opt/airflow/airflow-core/src/airflow/utils/session.py", line 98, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow-core/src/airflow/serialization/definitions/dag.py", line 221, in bulk_write_to_db
    asset_op.add_asset_trigger_references(orm_assets, session=session)
  File "/opt/airflow/airflow-core/src/airflow/dag_processing/collection.py", line 1101, in add_asset_trigger_references
    asset_model.watchers = [
  File "/opt/airflow/airflow-core/src/airflow/dag_processing/collection.py", line 1104, in <listcomp>
    if BaseEventTrigger.hash(watcher.trigger.classpath, watcher.trigger.kwargs)
AttributeError: 'NoneType' object has no attribute 'classpath'
  • Yes (please specify the tool below)

  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

@vatsrahul1001 vatsrahul1001 added the backport-to-v3-2-test Mark PR with this label to backport to v3-2-test branch label Apr 3, 2026
@vatsrahul1001 vatsrahul1001 added this to the Airflow 3.2.0 milestone Apr 3, 2026
Copy link
Copy Markdown
Member

@jason810496 jason810496 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch, thanks!

@vatsrahul1001 vatsrahul1001 merged commit 1ec9d8a into main Apr 3, 2026
41 checks passed
@vatsrahul1001 vatsrahul1001 deleted the fix/asset-watcher-null-trigger-guard branch April 3, 2026 07:01
github-actions bot pushed a commit that referenced this pull request Apr 3, 2026
(cherry picked from commit 1ec9d8a)

Co-authored-by: Rahul Vats <43964496+vatsrahul1001@users.noreply.github.com>
@github-actions
Copy link
Copy Markdown

github-actions bot commented Apr 3, 2026

Backport successfully created: v3-2-test

Note: As of Merging PRs targeted for Airflow 3.X
the committer who merges the PR is responsible for backporting the PRs that are bug fixes (generally speaking) to the maintenance branches.

In matter of doubt please ask in #release-management Slack channel.

Status Branch Result
v3-2-test PR Link

github-actions bot pushed a commit to aws-mwaa/upstream-to-airflow that referenced this pull request Apr 3, 2026
…he#64659)

(cherry picked from commit 1ec9d8a)

Co-authored-by: Rahul Vats <43964496+vatsrahul1001@users.noreply.github.com>
vatsrahul1001 added a commit that referenced this pull request Apr 3, 2026
…) (#64660)

(cherry picked from commit 1ec9d8a)

Co-authored-by: Rahul Vats <43964496+vatsrahul1001@users.noreply.github.com>
vatsrahul1001 added a commit that referenced this pull request Apr 8, 2026
…) (#64660)

(cherry picked from commit 1ec9d8a)

Co-authored-by: Rahul Vats <43964496+vatsrahul1001@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:DAG-processing backport-to-v3-2-test Mark PR with this label to backport to v3-2-test branch

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants