Correctly support resuming tasks after triggers #47061
Merged
Conversation
cecb510 to
62bcbea
Compare
amoghrajesh
reviewed
Feb 25, 2025
Contributor
amoghrajesh
left a comment
Good catch! I have some comments / questions but it's looking nice generally.
62bcbea to
b890aa3
Compare
Member
Author
I have also moved over the
19277ff to
857b22a
Compare
Co-authored-by: Amogh Desai <amoghrajesh1999@gmail.com>
857b22a to
f292056
Compare
jedcunningham
approved these changes
Feb 25, 2025
Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
ambika-garg
pushed a commit
to ambika-garg/airflow
that referenced
this pull request
Feb 28, 2025
nailo2c
pushed a commit
to nailo2c/airflow
that referenced
this pull request
Apr 4, 2025
There were a number of issues here that prevented us from correctly resuming a
task after its trigger fired.
First off, we only ever ran the execute method and didn't respect next_method.
So we added that info to the TI context the server sends in response
to the `.../run` endpoint.
Once we'd fixed that, we then ran into the next problem: we were
incorrectly setting the same value for trigger_kwargs (which are the kwargs we
pass to the trigger constructor) and for next_kwargs (which are the kwargs
to use when resuming the task) -- they needed to be different. This involved
adding a new field (`kwargs`) onto the TIDeferredStatePayload.
The next complication after that was the "ExtendedJSON" type on the
next_kwargs column of TI: this is a type decorator that automatically applies
the BaseSerialization encode/decode step (__var and __type etc). The problem
with that is that we need to do the serialization on the client in order to
send a JSON HTTP request, so we don't want to encode again on the server
ideally. I was able to do that easily on the write/update side but not so
easily on the read side -- there I left a comment and for now we will have SQLA
decode it for us, and then we have to encode it again. Not the best, but not a
disaster either.
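The double-encoding concern above can be sketched in plain Python. This is an illustrative stand-in, not Airflow's actual BaseSerialization: `encode`/`decode` here are hypothetical simplifications of the `__var`/`__type` envelope, showing why the decode-then-re-encode workaround on the read side is lossless, if inelegant.

```python
from datetime import datetime, timezone

# Illustrative sketch (NOT Airflow's real BaseSerialization) of the
# "__var"/"__type" envelope that the ExtendedJSON column type applies.

def encode(value):
    """Wrap a value in a {"__var": ..., "__type": ...} envelope."""
    if isinstance(value, datetime):
        return {"__var": value.timestamp(), "__type": "datetime"}
    if isinstance(value, dict):
        return {"__var": {k: encode(v) for k, v in value.items()}, "__type": "dict"}
    return value  # plain JSON scalars pass through unchanged

def decode(value):
    """Undo the envelope, restoring native Python objects."""
    if isinstance(value, dict) and "__type" in value:
        if value["__type"] == "datetime":
            return datetime.fromtimestamp(value["__var"], tz=timezone.utc)
        if value["__type"] == "dict":
            return {k: decode(v) for k, v in value["__var"].items()}
    return value

# The client encodes before making the JSON HTTP request; the server's column
# type decodes on read, so we must encode again -- the round trip is lossless.
payload = encode({"moment": datetime(2025, 2, 25, 14, 45, 1, tzinfo=timezone.utc)})
assert encode(decode(payload)) == payload
```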
The other change I did here was to have the DeferTask automatically apply the
serde encoding when serializing, just so that there are fewer places in the
code that need to be aware of that detail (so the Task subprocess will encode
it before making the request to the Supervisor, and in the Supervisor it will be
kept encoded and passed along as-is to its HTTP request). This means you can
once again pass datetime objects to a trigger "natively", not only strings.
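A minimal sketch of that design choice, using stand-in names (`DeferTaskMessage` and `serde_encode` are hypothetical simplifications, not the real task SDK classes): the message applies the encoding itself at serialization time, so callers pass datetimes natively and the Supervisor can forward the payload unchanged.

```python
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any

def serde_encode(value: Any) -> Any:
    """Stand-in for the serde encoding step (hypothetical simplification)."""
    if isinstance(value, datetime):
        return {"__var": value.isoformat(), "__type": "datetime"}
    if isinstance(value, dict):
        return {"__var": {k: serde_encode(v) for k, v in value.items()}, "__type": "dict"}
    return value

@dataclass
class DeferTaskMessage:
    """Illustrative stand-in for the DeferTask message, not the real class."""
    classpath: str
    trigger_kwargs: dict = field(default_factory=dict)
    method_name: str = "execute_complete"
    next_kwargs: dict = field(default_factory=dict)

    def to_json(self) -> str:
        # Encoding happens here, at serialization time, so callers can pass
        # datetimes natively and the Supervisor forwards the payload as-is.
        return json.dumps({
            "classpath": self.classpath,
            "trigger_kwargs": serde_encode(self.trigger_kwargs),
            "method_name": self.method_name,
            "kwargs": serde_encode(self.next_kwargs),
            "type": "DeferTask",
        })

msg = DeferTaskMessage(
    classpath="airflow.providers.standard.triggers.temporal.DateTimeTrigger",
    trigger_kwargs={"moment": datetime(2025, 2, 25, 14, 45, 1, tzinfo=timezone.utc)},
)
print(msg.to_json())  # the datetime arrives wrapped and already JSON-safe
```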
For consistency with the user-facing code I renamed
`next_method` on the `DeferTask` message to `method_name`. I'm not sure this really makes sense in the API request to the API server though.
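The dispatch fix from the first paragraph can be sketched as follows (illustrative names, not the actual task runner code): on resume we call the method named by next_method with next_kwargs, instead of unconditionally calling execute.

```python
# Hedged sketch of the resume dispatch this PR fixes. If the TI context the
# server returns carries a next_method, dispatch to it; otherwise run the
# normal execute path. Class and function names here are illustrative.

class Operator:
    def execute(self, context):
        return "first start"

    def execute_complete(self, context, event=None):
        return f"resumed with {event!r}"

def run_task(task, context, next_method=None, next_kwargs=None):
    if next_method:
        # Resuming after the trigger fired: look up the named method on the
        # task and call it with the kwargs stored at defer time.
        method = getattr(task, next_method)
        return method(context, **(next_kwargs or {}))
    # Fresh start: the ordinary execute path.
    return task.execute(context)

assert run_task(Operator(), {}) == "first start"
assert run_task(Operator(), {}, "execute_complete", {"event": "done"}) == "resumed with 'done'"
```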
Closes #47013
Testing:
I tested with this dag
The task state now goes to success; here's a snippet of the task logs.
The initial start + defer:
{"timestamp":"2025-02-25T14:44:01.823199","level":"info","event":"DAG bundles loaded: dags-folder","logger":"airflow.dag_processing.bundles.manager.DagBundlesManager"} {"timestamp":"2025-02-25T14:44:01.823360","level":"info","event":"Filling up the DagBag from /files/dags/kafka_test.py","logger":"airflow.models.dagbag.DagBag"} {"timestamp":"2025-02-25T14:44:01.823732","level":"debug","event":"Importing /files/dags/kafka_test.py","logger":"airflow.models.dagbag.DagBag"} {"timestamp":"2025-02-25T14:44:01.828547","level":"debug","event":"Loaded DAG <DAG: trigger_test>","logger":"airflow.models.dagbag.DagBag"} {"timestamp":"2025-02-25T14:44:01.828767","level":"debug","event":"DAG file parsed","file":"kafka_test.py","logger":"task"} {"timestamp":"2025-02-25T14:44:01.853442","level":"info","event":"Pausing task as DEFERRED. ","dag_id":"trigger_test","task_id":"wait_for_time","run_id":"manual__2025-02-25T14:44:01.007863+00:00_xeVPbb1o","logger":"task"} {"timestamp":"2025-02-25T14:44:01.853690","level":"debug","event":"Sending request","json":"{\"state\":\"deferred\",\"classpath\":\"airflow.providers.standard.triggers.temporal.DateTimeTrigger\",\"trigger_kwargs\":{\"__var\":{\"moment\":{\"__var\":1740494701.834626,\"__type\":\"datetime\"},\"end_from_trigger\":false},\"__type\":\"dict\"},\"trigger_timeout\":null,\"method_name\":\"execute_complete\",\"kwargs\":{\"__var\":{},\"__type\":\"dict\"},\"type\":\"DeferTask\"}\n","logger":"task"}Here's the bit from the trigger:
{"timestamp":"2025-02-25T14:44:03.133222","level":"info","event":"trigger trigger_test/manual__2025-02-25T14:44:01.007863+00:00_xeVPbb1o/wait_for_time/-1/1 (ID 6) starting"} {"timestamp":"2025-02-25T14:44:03.133478","level":"info","event":"trigger starting","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"} {"timestamp":"2025-02-25T14:44:03.135572","level":"info","event":"58 seconds remaining; sleeping 10 seconds","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"} {"timestamp":"2025-02-25T14:44:13.138648","level":"info","event":"48 seconds remaining; sleeping 10 seconds","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"} {"timestamp":"2025-02-25T14:44:23.140705","level":"info","event":"38 seconds remaining; sleeping 10 seconds","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"} {"timestamp":"2025-02-25T14:44:33.145052","level":"info","event":"28 seconds remaining; sleeping 10 seconds","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"} {"timestamp":"2025-02-25T14:44:43.151867","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"} {"timestamp":"2025-02-25T14:44:44.158527","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"} {"timestamp":"2025-02-25T14:44:45.164749","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"} {"timestamp":"2025-02-25T14:44:46.166918","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"} {"timestamp":"2025-02-25T14:44:47.173182","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"} {"timestamp":"2025-02-25T14:44:48.175339","level":"info","event":"sleeping 1 
second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"} {"timestamp":"2025-02-25T14:44:49.180447","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"} {"timestamp":"2025-02-25T14:44:50.186073","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"} {"timestamp":"2025-02-25T14:44:51.190796","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"} {"timestamp":"2025-02-25T14:44:52.194850","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"} {"timestamp":"2025-02-25T14:44:53.196601","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"} {"timestamp":"2025-02-25T14:44:54.203250","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"} {"timestamp":"2025-02-25T14:44:55.209000","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"} {"timestamp":"2025-02-25T14:44:56.213201","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"} {"timestamp":"2025-02-25T14:44:57.215168","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"} {"timestamp":"2025-02-25T14:44:58.218028","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"} {"timestamp":"2025-02-25T14:44:59.221094","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"} {"timestamp":"2025-02-25T14:45:00.221974","level":"info","event":"sleeping 1 
second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"} {"timestamp":"2025-02-25T14:45:01.223881","level":"info","event":"sleeping 1 second...","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"} {"timestamp":"2025-02-25T14:45:02.227491","level":"info","event":"yielding event with payload DateTime(2025, 2, 25, 14, 45, 1, 834626, tzinfo=Timezone('UTC'))","logger":"airflow.providers.standard.triggers.temporal.DateTimeTrigger"} {"timestamp":"2025-02-25T14:45:02.228959","level":"info","event":"Trigger fired event","name":"trigger_test/manual__2025-02-25T14:44:01.007863+00:00_xeVPbb1o/wait_for_time/-1/1 (ID 6)","result":"TriggerEvent<DateTime(2025, 2, 25, 14, 45, 1, 834626, tzinfo=Timezone('UTC'))>"} {"timestamp":"2025-02-25T14:45:02.230003","level":"info","event":"trigger completed","name":"trigger_test/manual__2025-02-25T14:44:01.007863+00:00_xeVPbb1o/wait_for_time/-1/1 (ID 6)"}And back on the worker:
{"timestamp":"2025-02-25T14:45:03.180834","level":"info","event":"DAG bundles loaded: dags-folder","logger":"airflow.dag_processing.bundles.manager.DagBundlesManager"} {"timestamp":"2025-02-25T14:45:03.181018","level":"info","event":"Filling up the DagBag from /files/dags/kafka_test.py","logger":"airflow.models.dagbag.DagBag"} {"timestamp":"2025-02-25T14:45:03.181425","level":"debug","event":"Importing /files/dags/kafka_test.py","logger":"airflow.models.dagbag.DagBag"} {"timestamp":"2025-02-25T14:45:03.187019","level":"debug","event":"Loaded DAG <DAG: trigger_test>","logger":"airflow.models.dagbag.DagBag"} {"timestamp":"2025-02-25T14:45:03.187235","level":"debug","event":"DAG file parsed","file":"kafka_test.py","logger":"task"} {"timestamp":"2025-02-25T14:45:03.194373","level":"debug","event":"Sending request","json":"{\"rendered_fields\":{\"target_time\":\"2025-02-25 14:46:03.194255\"},\"type\":\"SetRenderedFields\"}\n","logger":"task"} {"timestamp":"2025-02-25T14:45:03.194493","level":"debug","event":"Calling 'on_task_instance_running' with {'previous_state': <TaskInstanceState.QUEUED: 'queued'>, 'task_instance': RuntimeTaskInstance(id=UUID('01953d90-9785-790b-99e0-535a92b6f179'), task_id='wait_for_time', dag_id='trigger_test', run_id='manual__2025-02-25T14:44:01.007863+00:00_xeVPbb1o', try_number=1, map_index=-1, hostname='d250d05894d8', task=<Task(DateTimeSensorAsync): wait_for_time>, max_tries=0, start_date=datetime.datetime(2025, 2, 25, 14, 45, 3, 164729, tzinfo=TzInfo(UTC)))}","logger":"airflow.listeners.listener"} {"timestamp":"2025-02-25T14:45:03.194523","level":"debug","event":"Hook impls: []","logger":"airflow.listeners.listener"} {"timestamp":"2025-02-25T14:45:03.194548","level":"debug","event":"Result from 'on_task_instance_running': []","logger":"airflow.listeners.listener"} {"timestamp":"2025-02-25T14:45:03.194723","level":"debug","event":"Sending 
request","json":"{\"state\":\"success\",\"end_date\":\"2025-02-25T14:45:03.194658Z\",\"task_outlets\":[],\"outlet_events\":[],\"type\":\"SucceedTask\"}\n","logger":"task"}
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
`{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in newsfragments.