Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/src/airflow/api_fastapi/common/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ class MenuItem(Enum):
CONFIG = "Config"
CONNECTIONS = "Connections"
DAGS = "Dags"
DEADLINES = "Deadlines"
Comment thread
bbovenzi marked this conversation as resolved.
Outdated
DOCS = "Docs"
JOBS = "Jobs"
PLUGINS = "Plugins"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ class DeadlineResponse(BaseModel):
dag_id: str = Field(validation_alias=AliasPath("dagrun", "dag_id"))
dag_run_id: str = Field(validation_alias=AliasPath("dagrun", "run_id"))
alert_name: str | None = Field(validation_alias=AliasPath("deadline_alert", "name"), default=None)
alert_description: str | None = Field(
validation_alias=AliasPath("deadline_alert", "description"), default=None
)


class DeadlineCollectionResponse(BaseModel):
Expand All @@ -53,7 +50,6 @@ class DeadlineAlertResponse(BaseModel):

id: UUID
name: str | None = None
description: str | None = None
Comment thread
pierrejeambrun marked this conversation as resolved.
reference_type: str = Field(validation_alias=AliasPath("reference", "reference_type"))
interval: float = Field(description="Interval in seconds between deadline evaluations.")
Comment thread
imrichardwu marked this conversation as resolved.
created_at: datetime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2309,11 +2309,6 @@ components:
- type: string
- type: 'null'
title: Name
description:
anyOf:
- type: string
- type: 'null'
title: Description
reference_type:
type: string
title: Reference Type
Expand Down Expand Up @@ -2377,11 +2372,6 @@ components:
- type: string
- type: 'null'
title: Alert Name
alert_description:
anyOf:
- type: string
- type: 'null'
title: Alert Description
type: object
required:
- id
Expand Down Expand Up @@ -2896,6 +2886,7 @@ components:
- Config
- Connections
- Dags
- Deadlines
- Docs
- Jobs
- Plugins
Expand Down
14 changes: 12 additions & 2 deletions airflow-core/src/airflow/models/serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,7 @@ def _create_deadline_alert_records(
for uuid_str, deadline_data in uuid_mapping.items():
alert = DeadlineAlertModel(
id=UUID(uuid_str),
name=deadline_data.get(DeadlineAlertFields.NAME),
reference=deadline_data[DeadlineAlertFields.REFERENCE],
interval=deadline_data[DeadlineAlertFields.INTERVAL],
callback_def=deadline_data[DeadlineAlertFields.CALLBACK],
Expand Down Expand Up @@ -577,8 +578,17 @@ def write_dag(

if deadline_uuid_mapping is not None:
# All deadlines matched — reuse the UUIDs to preserve hash.
# Clear the mapping since the alert rows already exist in the DB;
# no need to delete and recreate identical records.
# Update name in case it changed (it doesn't affect the definition
# match or the hash, so existing rows won't be recreated, but it
# must stay current in the DB).
for uuid_str, deadline_data in deadline_uuid_mapping.items():
session.execute(
update(DeadlineAlertModel)
.where(DeadlineAlertModel.id == UUID(uuid_str))
.values(
name=deadline_data.get(DeadlineAlertFields.NAME),
)
)
Comment thread
imrichardwu marked this conversation as resolved.
Outdated
dag.data["dag"]["deadline"] = existing_deadline_uuids
Comment thread
imrichardwu marked this conversation as resolved.
Outdated
deadline_uuid_mapping = {}
else:
Expand Down
1 change: 1 addition & 0 deletions airflow-core/src/airflow/serialization/decoders.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ def decode_deadline_alert(encoded_data: dict):
reference=reference,
interval=datetime.timedelta(seconds=data[DeadlineAlertFields.INTERVAL]),
callback=deserialize(data[DeadlineAlertFields.CALLBACK]),
name=data.get(DeadlineAlertFields.NAME),
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class DeadlineAlertFields:
serializing DeadlineAlert instances to and from their dictionary representation.
"""

NAME = "name"
REFERENCE = "reference"
INTERVAL = "interval"
CALLBACK = "callback"
Expand Down Expand Up @@ -367,3 +368,4 @@ class SerializedDeadlineAlert:
reference: SerializedReferenceModels.SerializedBaseDeadlineReference
interval: timedelta
callback: Any
name: str | None = None
1 change: 1 addition & 0 deletions airflow-core/src/airflow/serialization/encoders.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ def encode_deadline_alert(d: DeadlineAlert | SerializedDeadlineAlert) -> dict[st
from airflow.sdk.serde import serialize

return {
"name": getattr(d, "name", None),
Comment thread
imrichardwu marked this conversation as resolved.
Outdated
"reference": encode_deadline_reference(d.reference),
"interval": d.interval.total_seconds(),
"callback": serialize(d.callback),
Expand Down
Comment thread
imrichardwu marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -7996,17 +7996,6 @@ export const $DeadlineAlertResponse = {
],
title: 'Name'
},
description: {
anyOf: [
{
type: 'string'
},
{
type: 'null'
}
],
title: 'Description'
},
reference_type: {
type: 'string',
title: 'Reference Type'
Expand Down Expand Up @@ -8087,17 +8076,6 @@ export const $DeadlineResponse = {
}
],
title: 'Alert Name'
},
alert_description: {
anyOf: [
{
type: 'string'
},
{
type: 'null'
}
],
title: 'Alert Description'
}
},
type: 'object',
Expand Down Expand Up @@ -8556,7 +8534,7 @@ export const $LightGridTaskInstanceSummary = {

export const $MenuItem = {
type: 'string',
enum: ['Required Actions', 'Assets', 'Audit Log', 'Config', 'Connections', 'Dags', 'Docs', 'Jobs', 'Plugins', 'Pools', 'Providers', 'Variables', 'XComs'],
enum: ['Required Actions', 'Assets', 'Audit Log', 'Config', 'Connections', 'Dags', 'Deadlines', 'Docs', 'Jobs', 'Plugins', 'Pools', 'Providers', 'Variables', 'XComs'],
title: 'MenuItem',
description: 'Define all menu items defined in the menu.'
} as const;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1951,7 +1951,6 @@ export type DeadlineAlertCollectionResponse = {
export type DeadlineAlertResponse = {
id: string;
name?: string | null;
description?: string | null;
reference_type: string;
/**
* Interval in seconds between deadline evaluations.
Expand Down Expand Up @@ -1979,7 +1978,6 @@ export type DeadlineResponse = {
dag_id: string;
dag_run_id: string;
alert_name?: string | null;
alert_description?: string | null;
};

/**
Expand Down Expand Up @@ -2101,7 +2099,7 @@ export type LightGridTaskInstanceSummary = {
/**
* Define all menu items defined in the menu.
*/
export type MenuItem = 'Required Actions' | 'Assets' | 'Audit Log' | 'Config' | 'Connections' | 'Dags' | 'Docs' | 'Jobs' | 'Plugins' | 'Pools' | 'Providers' | 'Variables' | 'XComs';
export type MenuItem = 'Required Actions' | 'Assets' | 'Audit Log' | 'Config' | 'Connections' | 'Dags' | 'Deadlines' | 'Docs' | 'Jobs' | 'Plugins' | 'Pools' | 'Providers' | 'Variables' | 'XComs';

/**
* Menu Item Collection serializer for responses.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
RUN_DAG2 = "run_dag2" # belongs to DAG_ID_2, which is used for cross-dag filter tests

ALERT_NAME = "SLA Breach Alert"
ALERT_DESCRIPTION = "Fires when SLA is breached"

_CALLBACK_PATH = "tests.unit.api_fastapi.core_api.routes.ui.test_deadlines._noop_callback"

Expand Down Expand Up @@ -156,7 +155,6 @@ def setup(dag_maker, session):
alert = DeadlineAlert(
serialized_dag_id=serialized_dag.id,
name=ALERT_NAME,
description=ALERT_DESCRIPTION,
reference=DeadlineReference.DAGRUN_QUEUED_AT.serialize_reference(),
interval=3600.0,
callback_def={"path": _CALLBACK_PATH},
Expand Down Expand Up @@ -246,7 +244,6 @@ def test_single_deadline_without_alert(self, test_client):
assert deadline1["deadline_time"] == "2025-01-01T12:00:00Z"
assert deadline1["missed"] is False
assert deadline1["alert_name"] is None
assert deadline1["alert_description"] is None
assert deadline1["dag_id"] == DAG_ID
assert deadline1["dag_run_id"] == RUN_SINGLE
assert "id" in deadline1
Expand All @@ -259,14 +256,13 @@ def test_missed_deadline_is_reflected(self, test_client):
assert data["total_entries"] == 1
assert data["deadlines"][0]["missed"] is True

def test_deadline_with_alert_name_and_description(self, test_client):
def test_deadline_with_alert_name(self, test_client):
with assert_queries_count(4):
response = test_client.get(f"/dags/{DAG_ID}/dagRuns/{RUN_ALERT}/deadlines")
assert response.status_code == 200
data = response.json()
assert data["total_entries"] == 1
assert data["deadlines"][0]["alert_name"] == ALERT_NAME
assert data["deadlines"][0]["alert_description"] == ALERT_DESCRIPTION

def test_deadlines_ordered_by_deadline_time_ascending(self, test_client):
with assert_queries_count(4):
Expand Down Expand Up @@ -437,14 +433,13 @@ def test_pagination(self, test_client):
assert [dl["id"] for dl in page2] == all_ids[3:6]

def test_alert_name_present_when_linked(self, test_client):
"""Deadlines linked to a DeadlineAlert include alert_name and alert_description."""
"""Deadlines linked to a DeadlineAlert include alert_name."""
response = test_client.get(f"/dags/{DAG_ID}/dagRuns/~/deadlines")
assert response.status_code == 200
deadlines = response.json()["deadlines"]
alerts = [dl for dl in deadlines if dl["alert_name"] is not None]
assert len(alerts) == 1
assert alerts[0]["alert_name"] == ALERT_NAME
assert alerts[0]["alert_description"] == ALERT_DESCRIPTION

def test_filter_nonexistent_dag_returns_empty(self, test_client):
"""Filtering by a dag_id that doesn't exist returns an empty list."""
Expand Down Expand Up @@ -483,7 +478,6 @@ def test_alert_response_fields(self, test_client):
assert response.status_code == 200
alert = response.json()["deadline_alerts"][0]
assert alert["name"] == ALERT_NAME
assert alert["description"] == ALERT_DESCRIPTION
assert alert["interval"] == 3600.0
assert alert["reference_type"] == "DagRunQueuedAtDeadline"
assert "id" in alert
Expand Down
4 changes: 0 additions & 4 deletions airflow-core/tests/unit/models/test_deadline_alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

DAG_ID = "test_deadline_alert_dag"
DEADLINE_NAME = "Test Alert"
DEADLINE_DESCRIPTION = "This is a test alert description"
DEADLINE_INTERVAL = 60
DEADLINE_CALLBACK = {"path": "test.callback"}
SERIALIZED_DAG_ID = "serialized_dag_uuid"
Expand Down Expand Up @@ -62,7 +61,6 @@ def deadline_alert_orm(dag_maker, session, deadline_reference):
alert = DeadlineAlert(
serialized_dag_id=serialized_dag.id,
name=DEADLINE_NAME,
description=DEADLINE_DESCRIPTION,
reference=deadline_reference,
interval=DEADLINE_INTERVAL,
callback_def=DEADLINE_CALLBACK,
Expand All @@ -86,7 +84,6 @@ def test_deadline_alert_creation(self, deadline_alert_orm):
assert deadline_alert_orm.id is not None
assert deadline_alert_orm.created_at == DEFAULT_DATE
assert deadline_alert_orm.name == DEADLINE_NAME
assert deadline_alert_orm.description == DEADLINE_DESCRIPTION

def test_minimal_deadline_alert_creation(self, dag_maker, session, deadline_reference):
with dag_maker(DAG_ID, session=session):
Expand All @@ -109,7 +106,6 @@ def test_minimal_deadline_alert_creation(self, dag_maker, session, deadline_refe
assert deadline_alert.id is not None
assert deadline_alert.created_at == DEFAULT_DATE
assert deadline_alert.name is None
assert deadline_alert.description is None

def test_deadline_alert_repr(self, deadline_alert_orm, deadline_reference):
repr_str = repr(deadline_alert_orm)
Expand Down
2 changes: 2 additions & 0 deletions task-sdk/src/airflow/sdk/definitions/deadline.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,11 @@ def __init__(
reference: DeadlineReferenceType,
interval: timedelta,
callback: Callback,
name: str | None = None,
):
self.reference = reference
self.interval = interval
self.name = name

if not isinstance(callback, (AsyncCallback, SyncCallback)):
raise ValueError(f"Callbacks of type {type(callback).__name__} are not currently supported")
Expand Down
Loading