
Correctly support resuming tasks after triggers #47061

Merged
ashb merged 3 commits into main from resume-deferred-operator-tasksdk
Feb 25, 2025

Conversation

@ashb
Member

@ashb ashb commented Feb 25, 2025

There were a number of issues here that prevented us from correctly resuming a
task after its trigger fired.

First off, we only ever ran the execute method and didn't respect next_method.
So we added that info to the TI context the server sends in response to the
`.../run` endpoint.

Once we'd fixed that, we ran into the next problem: we were incorrectly
setting the same value for trigger_kwargs (the kwargs we pass to the trigger
constructor) and next_kwargs (the kwargs to use when resuming the task) --
they needed to be different. This involved adding a new field (`kwargs`) to
the TIDeferredStatePayload.
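To make the distinction concrete, here is a minimal sketch of the payload shape described above. The field names follow the description, but this is illustrative, not the exact server schema:

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class TIDeferredStatePayloadSketch:
    """Illustrative sketch, not the real Airflow class."""

    classpath: str                  # trigger class to instantiate
    trigger_kwargs: dict[str, Any]  # passed to the trigger's constructor
    next_method: str                # task method to call on resume
    # The new, separate field: kwargs passed to next_method on resume.
    kwargs: dict[str, Any] = field(default_factory=dict)


payload = TIDeferredStatePayloadSketch(
    classpath="airflow.providers.standard.triggers.temporal.DateTimeTrigger",
    trigger_kwargs={"moment": "2025-02-25T14:45:01+00:00"},
    next_method="execute_complete",
    kwargs={},  # distinct from trigger_kwargs -- the bug was conflating the two
)
```

The bug was that a single value was written to both slots; keeping two fields means the trigger's constructor arguments and the resume arguments can diverge.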

The next complication was the "ExtendedJSON" type on the next_kwargs column of
TI: this is a type decorator that automatically applies the BaseSerialization
encode/decode step (`__var` and `__type`, etc.). The problem with that is that
we need to do the serialization on the client in order to send a JSON HTTP
request, so ideally we don't want to encode _again_ on the server. I was able
to do that easily on the write/update side but not so easily on the read side
-- there I left a comment, and for now we will have SQLAlchemy decode it for
us and then encode it again. Not the best, but not a disaster either.
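A toy example of why re-encoding on the server is a problem. The `encode` function below is a hypothetical stand-in for the BaseSerialization step, just to show the `__var`/`__type` envelope; it is not the real implementation:

```python
from datetime import datetime, timezone


def encode(value):
    """Toy stand-in for the BaseSerialization encode step (illustrative)."""
    if isinstance(value, datetime):
        return {"__var": value.timestamp(), "__type": "datetime"}
    if isinstance(value, dict):
        return {"__var": {k: encode(v) for k, v in value.items()}, "__type": "dict"}
    return value


# The client encodes once so the payload is JSON-safe over HTTP.
once = encode({"moment": datetime(2025, 2, 25, 14, 45, 1, tzinfo=timezone.utc)})

# If the server's column type then encodes again, the envelope nests:
twice = encode(once)
```

After the second pass the reader would have to decode twice to get the original value back, which is why the write path skips the server-side encode.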

The other change I made here was to have the DeferTask message automatically
apply the serde encoding when serializing, just so that fewer places in the
code need to be aware of that detail (the Task subprocess will encode it
before making the request to the Supervisor, and the Supervisor will keep it
encoded and pass it along as-is in its HTTP request). This means you can once
again pass datetime objects to a trigger "natively", not only strings.
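A rough sketch of that idea: the message serializer applies the encoding itself, so callers can pass a datetime natively. Function name and the envelope shape are illustrative assumptions, not the real Task SDK API:

```python
import json
from datetime import datetime, timezone


def serialize_defer_task(classpath, trigger_kwargs, method_name):
    """Hypothetical sketch: encode values at serialization time so
    callers never see the serde envelope."""

    def enc(v):
        if isinstance(v, datetime):
            return {"__var": v.timestamp(), "__type": "datetime"}
        return v

    return json.dumps({
        "type": "DeferTask",
        "classpath": classpath,
        "trigger_kwargs": {k: enc(v) for k, v in trigger_kwargs.items()},
        "method_name": method_name,
    })


# The caller passes a datetime directly; the wire format gets the envelope.
msg = serialize_defer_task(
    "airflow.providers.standard.triggers.temporal.DateTimeTrigger",
    {"moment": datetime(2025, 2, 25, 14, 45, 1, tzinfo=timezone.utc)},
    "execute_complete",
)
```

This matches the trade-off described above: encoding happens in one place (serialization), and everything downstream passes the already-encoded payload along unchanged.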

For consistency with the user-facing code I renamed `next_method` on the
`DeferTask` message to `method_name`. I'm not sure this rename really makes
sense in the API request to the API server, though.

Closes #47013

Testing:

I tested with this DAG:

from airflow import DAG
from airflow.providers.standard.sensors.date_time import DateTimeSensorAsync

with DAG("trigger_test"):
    wait = DateTimeSensorAsync(
        task_id="wait_for_time",
        target_time="""{{ macros.datetime.utcnow() + macros.timedelta(minutes=1) }}""",
    )

The task state now goes to success. Here are snippets of the task logs.

The initial start + defer:

{"timestamp":"2025-02-25T14:44:01.823199","level":"info","event":"DAG bundles loaded: dags-folder","logger":"airflow.dag_processing.bundles.manager.DagBundlesManager"}
{"timestamp":"2025-02-25T14:44:01.823360","level":"info","event":"Filling up the DagBag from /files/dags/kafka_test.py","logger":"airflow.models.dagbag.DagBag"}
{"timestamp":"2025-02-25T14:44:01.823732","level":"debug","event":"Importing /files/dags/kafka_test.py","logger":"airflow.models.dagbag.DagBag"}
{"timestamp":"2025-02-25T14:44:01.828547","level":"debug","event":"Loaded DAG <DAG: trigger_test>","logger":"airflow.models.dagbag.DagBag"}
{"timestamp":"2025-02-25T14:44:01.828767","level":"debug","event":"DAG file parsed","file":"kafka_test.py","logger":"task"}
{"timestamp":"2025-02-25T14:44:01.853442","level":"info","event":"Pausing task as DEFERRED. ","dag_id":"trigger_test","task_id":"wait_for_time","run_id":"manual__2025-02-25T14:44:01.007863+00:00_xeVPbb1o","logger":"task"}
{"timestamp":"2025-02-25T14:44:01.853690","level":"debug","event":"Sending request","json":"{\"state\":\"deferred\",\"classpath\":\"airflow.providers.standard.triggers.temporal.DateTimeTrigger\",\"trigger_kwargs\":{\"__var\":{\"moment\":{\"__var\":1740494701.834626,\"__type\":\"datetime\"},\"end_from_trigger\":false},\"__type\":\"dict\"},\"trigger_timeout\":null,\"method_name\":\"execute_complete\",\"kwargs\":{\"__var\":{},\"__type\":\"dict\"},\"type\":\"DeferTask\"}\n","logger":"task"}

Here's the bit from the trigger:

{"timestamp":"2025-02-25T14:44:03.133222","level":"info","event":"trigger trigger_test/manual__2025-02-25T14:44:01.007863+00:00_xeVPbb1o/wait_for_time/-1/1 (ID 6) starting"}
{"timestamp":"2025-02-25T14:44:03.133478","level":"info","event":"trigger starting","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"}
{"timestamp":"2025-02-25T14:44:03.135572","level":"info","event":"58 seconds remaining; sleeping 10 seconds","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"}
{"timestamp":"2025-02-25T14:44:13.138648","level":"info","event":"48 seconds remaining; sleeping 10 seconds","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"}
{"timestamp":"2025-02-25T14:44:23.140705","level":"info","event":"38 seconds remaining; sleeping 10 seconds","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"}
{"timestamp":"2025-02-25T14:44:33.145052","level":"info","event":"28 seconds remaining; sleeping 10 seconds","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"}
{"timestamp":"2025-02-25T14:44:43.151867","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"}
{"timestamp":"2025-02-25T14:44:44.158527","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"}
{"timestamp":"2025-02-25T14:44:45.164749","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"}
{"timestamp":"2025-02-25T14:44:46.166918","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"}
{"timestamp":"2025-02-25T14:44:47.173182","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"}
{"timestamp":"2025-02-25T14:44:48.175339","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"}
{"timestamp":"2025-02-25T14:44:49.180447","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"}
{"timestamp":"2025-02-25T14:44:50.186073","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"}
{"timestamp":"2025-02-25T14:44:51.190796","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"}
{"timestamp":"2025-02-25T14:44:52.194850","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"}
{"timestamp":"2025-02-25T14:44:53.196601","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"}
{"timestamp":"2025-02-25T14:44:54.203250","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"}
{"timestamp":"2025-02-25T14:44:55.209000","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"}
{"timestamp":"2025-02-25T14:44:56.213201","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"}
{"timestamp":"2025-02-25T14:44:57.215168","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"}
{"timestamp":"2025-02-25T14:44:58.218028","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"}
{"timestamp":"2025-02-25T14:44:59.221094","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"}
{"timestamp":"2025-02-25T14:45:00.221974","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"}
{"timestamp":"2025-02-25T14:45:01.223881","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"}
{"timestamp":"2025-02-25T14:45:02.227491","level":"info","event":"yielding event with payload DateTime(2025, 2, 25, 14, 45, 1, 834626, tzinfo=Timezone('UTC'))","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"}
{"timestamp":"2025-02-25T14:45:02.228959","level":"info","event":"Trigger fired event","name":"trigger_test/manual__2025-02-25T14:44:01.007863+00:00_xeVPbb1o/wait_for_time/-1/1 (ID 6)","result":"TriggerEvent<DateTime(2025, 2, 25, 14, 45, 1, 834626, tzinfo=Timezone('UTC'))>"}
{"timestamp":"2025-02-25T14:45:02.230003","level":"info","event":"trigger completed","name":"trigger_test/manual__2025-02-25T14:44:01.007863+00:00_xeVPbb1o/wait_for_time/-1/1 (ID 6)"}

And back on the worker:

{"timestamp":"2025-02-25T14:45:03.180834","level":"info","event":"DAG bundles loaded: dags-folder","logger":"airflow.dag_processing.bundles.manager.DagBundlesManager"}
{"timestamp":"2025-02-25T14:45:03.181018","level":"info","event":"Filling up the DagBag from /files/dags/kafka_test.py","logger":"airflow.models.dagbag.DagBag"}
{"timestamp":"2025-02-25T14:45:03.181425","level":"debug","event":"Importing /files/dags/kafka_test.py","logger":"airflow.models.dagbag.DagBag"}
{"timestamp":"2025-02-25T14:45:03.187019","level":"debug","event":"Loaded DAG <DAG: trigger_test>","logger":"airflow.models.dagbag.DagBag"}
{"timestamp":"2025-02-25T14:45:03.187235","level":"debug","event":"DAG file parsed","file":"kafka_test.py","logger":"task"}
{"timestamp":"2025-02-25T14:45:03.194373","level":"debug","event":"Sending request","json":"{\"rendered_fields\":{\"target_time\":\"2025-02-25 14:46:03.194255\"},\"type\":\"SetRenderedFields\"}\n","logger":"task"}
{"timestamp":"2025-02-25T14:45:03.194493","level":"debug","event":"Calling 'on_task_instance_running' with {'previous_state': <TaskInstanceState.QUEUED: 'queued'>, 'task_instance': RuntimeTaskInstance(id=UUID('01953d90-9785-790b-99e0-535a92b6f179'), task_id='wait_for_time', dag_id='trigger_test', run_id='manual__2025-02-25T14:44:01.007863+00:00_xeVPbb1o', try_number=1, map_index=-1, hostname='d250d05894d8', task=<Task(DateTimeSensorAsync): wait_for_time>, max_tries=0, start_date=datetime.datetime(2025, 2, 25, 14, 45, 3, 164729, tzinfo=TzInfo(UTC)))}","logger":"airflow.listeners.listener"}
{"timestamp":"2025-02-25T14:45:03.194523","level":"debug","event":"Hook impls: []","logger":"airflow.listeners.listener"}
{"timestamp":"2025-02-25T14:45:03.194548","level":"debug","event":"Result from 'on_task_instance_running': []","logger":"airflow.listeners.listener"}
{"timestamp":"2025-02-25T14:45:03.194723","level":"debug","event":"Sending request","json":"{\"state\":\"success\",\"end_date\":\"2025-02-25T14:45:03.194658Z\",\"task_outlets\":[],\"outlet_events\":[],\"type\":\"SucceedTask\"}\n","logger":"task"}


@boring-cyborg boring-cyborg bot added area:API Airflow's REST/HTTP API area:task-sdk labels Feb 25, 2025
@ashb ashb requested a review from amoghrajesh February 25, 2025 14:29
@ashb ashb force-pushed the resume-deferred-operator-tasksdk branch from cecb510 to 62bcbea Compare February 25, 2025 14:51
Contributor

@amoghrajesh amoghrajesh left a comment


Good catch! I have some comments/questions, but it's looking nice generally.

@ashb ashb force-pushed the resume-deferred-operator-tasksdk branch from 62bcbea to b890aa3 Compare February 25, 2025 16:29
@ashb ashb requested a review from uranusjr as a code owner February 25, 2025 16:29
@ashb
Member Author

ashb commented Feb 25, 2025

I have also moved the self.defer method over from models.baseoperator to the SDK's BaseOperator.

@ashb ashb force-pushed the resume-deferred-operator-tasksdk branch from 19277ff to 857b22a Compare February 25, 2025 17:06
ashb and others added 2 commits February 25, 2025 21:02
Co-authored-by: Amogh Desai <amoghrajesh1999@gmail.com>
@ashb ashb force-pushed the resume-deferred-operator-tasksdk branch from 857b22a to f292056 Compare February 25, 2025 21:02
Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
@ashb ashb merged commit fbbe59a into main Feb 25, 2025
63 checks passed
@ashb ashb deleted the resume-deferred-operator-tasksdk branch February 25, 2025 22:11
ambika-garg pushed a commit to ambika-garg/airflow that referenced this pull request Feb 28, 2025
nailo2c pushed a commit to nailo2c/airflow that referenced this pull request Apr 4, 2025

Labels

area:API Airflow's REST/HTTP API area:task-sdk

Projects

None yet

Development

Successfully merging this pull request may close these issues.

DateTimeSensorAsync task stuck in deferred mode

3 participants