While testing the triggerer, my task is stuck in the deferred state.
2025-02-24 08:52:41 [info ] 0 triggers currently running [airflow.jobs.triggerer_job_runner]
2025-02-24 08:53:41 [info ] 1 triggers currently running [airflow.jobs.triggerer_job_runner]
2025-02-24 08:54:15 [debug ] State change from async process [supervisor] state=TriggerStateChanges(kind='TriggerStateChanges', events=[(1, TriggerEvent<'2025-02-24T08:54:14.989934Z'>)], failures=None, finished=[1])
2025-02-24 08:54:42 [info ] 1 triggers currently running [airflow.jobs.triggerer_job_runner]
[2025-02-24 08:54:51 +0000] [186] [INFO] Handling signal: winch
{"timestamp":"2025-02-24T08:53:15.005196","level":"warning","event":"DateTimeSensorAsync.execute cannot be called outside TaskInstance!","logger":"airflow.task.operators.airflow.providers.standard.sensors.date_time.DateTimeSensorAsync"}
{"timestamp":"2025-02-24T08:54:13.878274","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"}
{"timestamp":"2025-02-24T08:54:13.878274","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"}
{"timestamp":"2025-02-24T08:53:15.005618","level":"info","event":"Pausing task as DEFERRED. ","dag_id":"example_deferrable_dag","task_id":"wait_for_time","run_id":"manual__2025-02-24T08:53:11.758586+00:00_ck7eYD9z","logger":"task"}
{"timestamp":"2025-02-24T08:53:15.005738","level":"debug","event":"Sending request","json":"{\\"state\\":\\"deferred\\",\\"classpath\\":\\"airflow.providers.standard.triggers.temporal.DateTimeTrigger\\",\\"trigger_kwargs\\":{\\"moment\\":\\"2025-02-24T08:54:14.989934Z\\",\\"end_from_trigger\\":false},\\"next_method\\":\\"execute_complete\\",\\"trigger_timeout\\":null,\\"type\\":\\"DeferTask\\"}\
","logger":"task"}
{"timestamp":"2025-02-24T08:53:15.005767","level":"debug","event":"Running finalizers","ti":"RuntimeTaskInstance(id=UUID(\'01953729-0bdf-7608-8a90-2db778f86789\'), task_id=\'wait_for_time\', dag_id=\'example_deferrable_dag\', run_id=\'manual__2025-02-24T08:53:11.758586+00:00_ck7eYD9z\', try_number=1, map_index=-1, hostname=\'1a890cf9f894\', task=<Task(DateTimeSensorAsync): wait_for_time>, max_tries=0, start_date=datetime.datetime(2025, 2, 24, 8, 53, 14, 540673, tzinfo=TzInfo(UTC)))","logger":"task"}
{"timestamp":"2025-02-24T08:53:15.005899","level":"debug","event":"Calling \'before_stopping\' with {\'component\': <airflow.sdk.execution_time.task_runner.TaskRunnerMarker object at 0xffff7e19e640>}","logger":"airflow.listeners.listener"}
{"timestamp":"2025-02-24T08:53:15.005934","level":"debug","event":"Hook impls: []","logger":"airflow.listeners.listener"}
{"timestamp":"2025-02-24T08:53:15.005967","level":"debug","event":"Result from \'before_stopping\': []","logger":"airflow.listeners.listener"}')]
from airflow import DAG
from airflow.providers.standard.sensors.date_time import DateTimeSensor, DateTimeSensorAsync
from airflow.providers.standard.operators.empty import EmptyOperator
from pendulum import datetime

with DAG(
    dag_id='example_deferrable_dag',
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=['example', 'deferrable'],
) as dag:
    start = EmptyOperator(
        task_id='start'
    )

    # Deferrable sensor: hands off to the triggerer until one minute from now.
    wait = DateTimeSensorAsync(
        task_id='wait_for_time',
        target_time="""{{ macros.datetime.utcnow() + macros.timedelta(minutes=1) }}"""
    )

    end = EmptyOperator(
        task_id='end'
    )

    # wait_for_time is the task that stays in the deferred state.
    start >> wait >> end
Apache Airflow version
3.0.0a4
If "Other Airflow 2 version" selected, which one?
No response
What happened?
While testing the triggerer, my task is stuck in the deferred state.
Triggerer logs
Scheduler logs
What you think should happen instead?
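The task should resume from the deferred state once the trigger fires. The triggerer log above shows the TriggerEvent being emitted and the trigger finishing, but the task never leaves deferred.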
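For reference, the flow I would expect is roughly the one sketched below. This is a toy model in plain Python, not Airflow code: `TaskDeferred`, `ToySensor`, `toy_trigger` and `run_task` are hypothetical names made up for illustration, and the state list is simplified. Only the `execute_complete` method name and the idea of waiting until a `moment` come from the logs above.

```python
import asyncio
from datetime import datetime, timedelta, timezone


class TaskDeferred(Exception):
    """Toy stand-in for the deferral signal (hypothetical, not Airflow's class)."""

    def __init__(self, moment, next_method):
        super().__init__(f"deferred until {moment.isoformat()}")
        self.moment = moment
        self.next_method = next_method


class ToySensor:
    """Hypothetical sensor: defers in execute, resumes in execute_complete."""

    def __init__(self, target_time):
        self.target_time = target_time

    def execute(self):
        # Hand control away until target_time, naming the method to resume in.
        raise TaskDeferred(self.target_time, "execute_complete")

    def execute_complete(self, event):
        return f"resumed after event {event}"


async def toy_trigger(moment):
    """Toy triggerer side: sleep until `moment`, then return an event payload."""
    remaining = (moment - datetime.now(timezone.utc)).total_seconds()
    await asyncio.sleep(max(remaining, 0))
    return moment.isoformat()


def run_task(sensor):
    """Walk through the (simplified) states a deferrable task should pass."""
    states = ["running"]
    try:
        sensor.execute()
    except TaskDeferred as deferral:
        states.append("deferred")
        event = asyncio.run(toy_trigger(deferral.moment))
        # Expected step that does not happen in this report: once the event
        # arrives, the task is picked up again and next_method is called.
        states.append("scheduled")
        result = getattr(sensor, deferral.next_method)(event)
        states.append("success")
        return states, result
    states.append("success")
    return states, None
```

In the run reported here, the equivalent of the `deferred` step is reached and the trigger emits its event, but nothing corresponding to the `scheduled` step follows.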
How to reproduce
Operating System
Linux
Versions of Apache Airflow Providers
No response
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
Anything else?
No response