Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions task-sdk/src/airflow/sdk/execution_time/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1271,6 +1271,7 @@ def _on_term(signum, frame):

try:
result = _execute_task(context=context, ti=ti, log=log)
log.info("::group::Post Execute")
except Exception:
import jinja2

Expand All @@ -1290,22 +1291,24 @@ def _on_term(signum, frame):
# Send update only if value changed (e.g., user set context variables during execution)
if ti.rendered_map_index and ti.rendered_map_index != previous_rendered_map_index:
SUPERVISOR_COMMS.send(msg=SetRenderedMapIndex(rendered_map_index=ti.rendered_map_index))
finally:
log.info("::group::Post Execute")

_push_xcom_if_needed(result, ti, log)

msg, state = _handle_current_task_success(context, ti)
except DownstreamTasksSkipped as skip:
log.info("::group::Post Execute")
log.info("Skipping downstream tasks.")
tasks_to_skip = skip.tasks if isinstance(skip.tasks, list) else [skip.tasks]
SUPERVISOR_COMMS.send(msg=SkipDownstreamTasks(tasks=tasks_to_skip))
msg, state = _handle_current_task_success(context, ti)
except DagRunTriggerException as drte:
log.info("::group::Post Execute")
msg, state = _handle_trigger_dag_run(drte, context, ti, log)
except TaskDeferred as defer:
log.info("::group::Post Execute")
msg, state = _defer_task(defer, ti, log)
except AirflowSkipException as e:
log.info("::group::Post Execute")
if e.args:
log.info("Skipping task.", reason=e.args[0])
msg = TaskState(
Expand All @@ -1315,6 +1318,7 @@ def _on_term(signum, frame):
)
state = TaskInstanceState.SKIPPED
except AirflowRescheduleException as reschedule:
log.info("::group::Post Execute")
log.info("Rescheduling task, marking task as UP_FOR_RESCHEDULE")
msg = RescheduleTask(
reschedule_date=reschedule.reschedule_date, end_date=datetime.now(tz=timezone.utc)
Expand All @@ -1324,6 +1328,7 @@ def _on_term(signum, frame):
# If AirflowFailException is raised, task should not retry.
# If a sensor in reschedule mode reaches timeout, task should not retry.
log.exception("Task failed with exception")
log.info("::group::Post Execute")
ti.end_date = datetime.now(tz=timezone.utc)
msg = TaskState(
state=TaskInstanceState.FAILED,
Expand All @@ -1335,13 +1340,15 @@ def _on_term(signum, frame):
except (AirflowTaskTimeout, AirflowException, AirflowRuntimeError) as e:
# We should allow retries if the task has defined it.
log.exception("Task failed with exception")
log.info("::group::Post Execute")
msg, state = _handle_current_task_failed(ti)
error = e
except AirflowTaskTerminated as e:
# External state updates are already handled with `ti_heartbeat` and will be
# updated already be another UI API. So, these exceptions should ideally never be thrown.
# If these are thrown, we should mark the TI state as failed.
log.exception("Task failed with exception")
log.info("::group::Post Execute")
ti.end_date = datetime.now(tz=timezone.utc)
msg = TaskState(
state=TaskInstanceState.FAILED,
Expand All @@ -1353,10 +1360,12 @@ def _on_term(signum, frame):
except SystemExit as e:
# SystemExit needs to be retried if they are eligible.
log.error("Task exited", exit_code=e.code)
log.info("::group::Post Execute")
msg, state = _handle_current_task_failed(ti)
error = e
except BaseException as e:
log.exception("Task failed with exception")
log.info("::group::Post Execute")
msg, state = _handle_current_task_failed(ti)
error = e
finally:
Expand Down
Loading