Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions airflow-core/src/airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1603,6 +1603,33 @@ workers:
type: float
example: ~
default: "60.0"
# NOTE(review): option name contains a typo — "retires" should be "retries".
# Renaming it now would be a breaking config change; flagging instead of fixing.
missing_dag_retires:
description: |
Maximum number of times a task will be rescheduled if the worker fails to
load the Dag or task definition during startup.

This situation can occur due to transient infrastructure issues such as
missing Dag files, temporary filesystem or network problems, or bundle
synchronization delays. Rescheduling in this case does not count as a
task retry.

Set this value to 0 to disable rescheduling and fail the task immediately
on startup failures.
version_added: 3.1.7
type: integer
example: ~
default: "3"
missing_dag_retry_delay:
description: |
Delay in seconds before a task is rescheduled after a worker startup
failure caused by an inability to load the Dag or task definition.

This delay is applied when the task runner requests the scheduler to
reschedule the task instance in UP_FOR_RESCHEDULE state.
version_added: 3.1.7
type: integer
example: ~
default: "60"
api_auth:
description: Settings relating to authentication on the Airflow APIs
options:
Expand Down
12 changes: 12 additions & 0 deletions airflow-core/src/airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
from airflow.stats import Stats
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import REQUEUEABLE_DEPS, RUNNING_DEPS
from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep
from airflow.utils.helpers import prune_dict
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
Expand Down Expand Up @@ -898,6 +899,17 @@ def are_dependencies_met(
:param verbose: whether log details on failed dependencies on info or debug log level
"""
dep_context = dep_context or DepContext()
if self.state == TaskInstanceState.UP_FOR_RESCHEDULE:
# This DepContext is used when a task instance is in UP_FOR_RESCHEDULE state.
#
# Tasks can be put into UP_FOR_RESCHEDULE by the task runner itself (e.g. when
# the worker cannot load the Dag or task). In this case, the scheduler must respect
# the task instance's reschedule_date before scheduling it again.
#
# ReadyToRescheduleDep is the only dependency that enforces this time-based gating.
# We therefore extend the normal scheduling dependency set with it, instead of
# modifying the global scheduler dependencies.
dep_context.deps.add(ReadyToRescheduleDep())
failed = False
verbose_aware_logger = self.log.info if verbose else self.log.debug
for dep_status in self.get_failed_dep_statuses(dep_context=dep_context, session=session):
Expand Down
3 changes: 2 additions & 1 deletion airflow-core/src/airflow/ti_deps/deps/base_ti_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
# under the License.
from __future__ import annotations

from collections.abc import Iterator
from typing import TYPE_CHECKING, NamedTuple

from airflow.ti_deps.dep_context import DepContext
from airflow.utils.session import provide_session

if TYPE_CHECKING:
from collections.abc import Iterator

from sqlalchemy.orm import Session

from airflow.models.taskinstance import TaskInstance
Expand Down
36 changes: 20 additions & 16 deletions airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,23 @@
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING

from airflow._shared.timezones import timezone
from airflow.executors.executor_loader import ExecutorLoader
from airflow.models.taskreschedule import TaskReschedule
from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
from airflow.utils.session import provide_session
from airflow.utils.state import TaskInstanceState

if TYPE_CHECKING:
from collections.abc import Iterator

from sqlalchemy.orm import Session

from airflow.models.taskinstance import TaskInstance
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.deps.base_ti_dep import TIDepStatus


class ReadyToRescheduleDep(BaseTIDep):
"""Determines whether a task is ready to be rescheduled."""
Expand All @@ -34,27 +44,22 @@ class ReadyToRescheduleDep(BaseTIDep):
RESCHEDULEABLE_STATES = {TaskInstanceState.UP_FOR_RESCHEDULE, None}

@provide_session
def _get_dep_statuses(self, ti, session, dep_context):
def _get_dep_statuses(
self,
ti: TaskInstance,
session: Session,
dep_context: DepContext,
) -> Iterator[TIDepStatus]:
"""
Determine whether a task is ready to be rescheduled.

Only tasks in NONE state with at least one row in task_reschedule table are
Only tasks in NONE or UP_FOR_RESCHEDULE state with at least one row in task_reschedule table are
handled by this dependency class, otherwise this dependency is considered as passed.
This dependency fails if the latest reschedule request's reschedule date is still
in the future.
"""
from airflow.models.mappedoperator import MappedOperator

is_mapped = isinstance(ti.task, MappedOperator)
executor, _ = ExecutorLoader.import_default_executor_cls()
if (
# Mapped sensors don't have the reschedule property (it can only be calculated after unmapping),
# so we don't check them here. They are handled below by checking TaskReschedule instead.
not is_mapped and not getattr(ti.task, "reschedule", False)
):
yield self._passing_status(reason="Task is not in reschedule mode.")
return

if dep_context.ignore_in_reschedule_period:
yield self._passing_status(
reason="The context specified that being in a reschedule period was permitted."
Expand All @@ -75,14 +80,13 @@ def _get_dep_statuses(self, ti, session, dep_context):
if not next_reschedule_date:
# Because mapped sensors don't have the reschedule property, here's the last resort
# and we need a slightly different passing reason
if is_mapped:
if isinstance(ti.task, MappedOperator):
yield self._passing_status(reason="The task is mapped and not in reschedule mode")
return
yield self._passing_status(reason="There is no reschedule request for this task instance.")
return

now = timezone.utcnow()
if now >= next_reschedule_date:
if (now := timezone.utcnow()) >= next_reschedule_date:
yield self._passing_status(reason="Task instance is ready for reschedule.")
return

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,6 @@ def test_should_pass_if_ignore_in_reschedule_period_is_set(self, not_expected_tr
dep_context = DepContext(ignore_in_reschedule_period=True)
assert ReadyToRescheduleDep().is_met(ti=ti, dep_context=dep_context)

def test_should_pass_if_not_reschedule_mode(self, not_expected_tr_db_call):
ti = self._get_task_instance(State.UP_FOR_RESCHEDULE)
del ti.task.reschedule
assert ReadyToRescheduleDep().is_met(ti=ti)

def test_should_pass_if_not_in_none_state(self, not_expected_tr_db_call):
    # A state outside RESCHEDULEABLE_STATES (UP_FOR_RETRY here) is not handled
    # by this dep, so it passes without touching the task_reschedule table
    # (enforced by the not_expected_tr_db_call fixture).
    ti = self._get_task_instance(State.UP_FOR_RETRY)
    assert ReadyToRescheduleDep().is_met(ti=ti)
Expand All @@ -126,6 +121,17 @@ def test_should_pass_after_reschedule_date_multiple(self):
self._create_task_reschedule(ti, [-21, -11, -1])
assert ReadyToRescheduleDep().is_met(ti=ti)

def test_should_fail_before_reschedule_date_even_if_task_is_not_reschedule_mode(self):
    """
    When a task is in UP_FOR_RESCHEDULE state but the operator itself is not in reschedule mode
    (i.e. reschedule was triggered by infrastructure/startup failure), we still must respect the
    TaskReschedule.reschedule_date.
    """
    # Deleting the ``reschedule`` attribute mimics a task whose UP_FOR_RESCHEDULE
    # state came from the task runner (startup failure), not a reschedule-mode sensor.
    ti = self._get_task_instance(State.UP_FOR_RESCHEDULE)
    del ti.task.reschedule
    # Reschedule request dated 1 second in the future: the dep must NOT pass yet.
    self._create_task_reschedule(ti, 1)
    assert not ReadyToRescheduleDep().is_met(ti=ti)

def test_should_fail_before_reschedule_date_one(self):
ti = self._get_task_instance(State.UP_FOR_RESCHEDULE)
self._create_task_reschedule(ti, 1)
Expand All @@ -142,11 +148,6 @@ def test_mapped_task_should_pass_if_ignore_in_reschedule_period_is_set(self, not
dep_context = DepContext(ignore_in_reschedule_period=True)
assert ReadyToRescheduleDep().is_met(ti=ti, dep_context=dep_context)

def test_mapped_task_should_pass_if_not_reschedule_mode(self, not_expected_tr_db_call):
ti = self._get_task_instance(State.UP_FOR_RESCHEDULE, map_index=42)
del ti.task.reschedule
assert ReadyToRescheduleDep().is_met(ti=ti)

def test_mapped_task_should_pass_if_not_in_none_state(self, not_expected_tr_db_call):
    # Same as the unmapped variant: UP_FOR_RETRY is outside RESCHEDULEABLE_STATES,
    # so the dep passes for a mapped TI (map_index=42) without any DB query.
    ti = self._get_task_instance(State.UP_FOR_RETRY, map_index=42)
    assert ReadyToRescheduleDep().is_met(ti=ti)
Expand Down
53 changes: 48 additions & 5 deletions task-sdk/src/airflow/sdk/execution_time/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import time
from collections.abc import Callable, Iterable, Iterator, Mapping
from contextlib import suppress
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from itertools import product
from pathlib import Path
from typing import TYPE_CHECKING, Annotated, Any, Literal
Expand All @@ -41,7 +41,11 @@
from airflow.configuration import conf
from airflow.dag_processing.bundles.base import BaseDagBundle, BundleVersionLock
from airflow.dag_processing.bundles.manager import DagBundlesManager
from airflow.exceptions import AirflowInactiveAssetInInletOrOutletException, AirflowTaskTimeout
from airflow.exceptions import (
AirflowInactiveAssetInInletOrOutletException,
AirflowRescheduleException,
AirflowTaskTimeout,
)
from airflow.listeners.listener import get_listener_manager
from airflow.sdk.api.client import get_hostname, getuser
from airflow.sdk.api.datamodels._generated import (
Expand Down Expand Up @@ -604,6 +608,33 @@ def _xcom_push_to_db(ti: RuntimeTaskInstance, key: str, value: Any) -> None:
)


def _maybe_reschedule_startup_failure(
    *,
    ti_context: TIRunContext,
    log: Logger,
) -> None:
    """
    Attempt to reschedule the task when a startup failure occurs.

    Raises ``AirflowRescheduleException`` (handled by the caller) to request that the
    task instance be put into UP_FOR_RESCHEDULE; this does not count as a task retry.
    If the reschedule limit has been reached — or rescheduling is disabled with a
    limit of 0 — the function logs an error and returns, and the caller should fail
    the task.

    :param ti_context: Runtime context of the task instance; its
        ``task_reschedule_count`` tracks how many reschedules have already happened.
    :param log: Structured logger used to report an exceeded limit.
    """
    # NOTE(review): the config option name is misspelled ("retires" instead of
    # "retries"); it is read as-is here because renaming the key would be a
    # breaking configuration change. The local names below use correct spelling.
    max_reschedules = conf.getint("workers", "missing_dag_retires", fallback=3)
    retry_delay_seconds = conf.getint("workers", "missing_dag_retry_delay", fallback=60)

    # Older contexts may lack the field or carry None — treat both as zero.
    reschedule_count = int(getattr(ti_context, "task_reschedule_count", 0) or 0)
    if max_reschedules > 0 and reschedule_count < max_reschedules:
        raise AirflowRescheduleException(
            reschedule_date=datetime.now(tz=timezone.utc) + timedelta(seconds=retry_delay_seconds)
        )

    log.error(
        "Startup reschedule limit exceeded",
        reschedule_count=reschedule_count,
        max_reschedules=max_reschedules,
    )


def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance:
# TODO: Task-SDK:
# Using DagBag here is about 98% wrong, but it'll do for now
Expand Down Expand Up @@ -638,6 +669,7 @@ def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance:
log.error(
"Dag not found during start up", dag_id=what.ti.dag_id, bundle=bundle_info, path=what.dag_rel_path
)
_maybe_reschedule_startup_failure(ti_context=what.ti_context, log=log)
sys.exit(1)

# install_loader()
Expand All @@ -652,6 +684,7 @@ def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance:
bundle=bundle_info,
path=what.dag_rel_path,
)
_maybe_reschedule_startup_failure(ti_context=what.ti_context, log=log)
sys.exit(1)

if not isinstance(task, (BaseOperator, MappedOperator)):
Expand Down Expand Up @@ -1547,7 +1580,17 @@ def main():
SUPERVISOR_COMMS = CommsDecoder[ToTask, ToSupervisor](log=log)

try:
ti, context, log = startup()
try:
ti, context, log = startup()
except AirflowRescheduleException as reschedule:
log.warning("Rescheduling task during startup, marking task as UP_FOR_RESCHEDULE")
SUPERVISOR_COMMS.send(
msg=RescheduleTask(
reschedule_date=reschedule.reschedule_date,
end_date=datetime.now(tz=timezone.utc),
)
)
sys.exit(0)
with BundleVersionLock(
bundle_name=ti.bundle_instance.name,
bundle_version=ti.bundle_instance.version,
Expand All @@ -1557,10 +1600,10 @@ def main():
finalize(ti, state, context, log, error)
except KeyboardInterrupt:
log.exception("Ctrl-c hit")
exit(2)
sys.exit(2)
except Exception:
log.exception("Top level error")
exit(1)
sys.exit(1)
finally:
# Ensure the request socket is closed on the child side in all circumstances
# before the process fully terminates.
Expand Down
Loading