Skip to content

Commit ff15983

Browse files
authored
Fix pod_override serialization in DAG details and executor path (#65407)
Sanitize default_args["executor_config"]["pod_override"] in DAG details responses without changing other default_args values, and make V1Pod serialization force the Kubernetes import path so executor config no longer falls back to stringification. This fixes two related Kubernetes serialization bugs: one where the DAG details API could fail when pod_override was present in default_args, and another where V1Pod objects could be flattened into strings before reaching the Kubernetes executor. The updated tests cover both the API response behavior and the serializer regression with real Kubernetes pod objects.
1 parent 6f9fc54 commit ff15983

4 files changed

Lines changed: 143 additions & 2 deletions

File tree

airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
field_validator,
3434
)
3535

36+
from airflow._shared.module_loading import qualname
3637
from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel, make_partial_model
3738
from airflow.api_fastapi.core_api.datamodels.dag_tags import DagTagResponse
3839
from airflow.api_fastapi.core_api.datamodels.dag_versions import DagVersionResponse
@@ -44,6 +45,11 @@
4445
from airflow.serialization.definitions.param import SerializedParamsDict
4546

4647

48+
def _is_response_safe_pod_override(value: Any) -> bool:
49+
"""Whether a pod_override value is already safe to preserve in the response."""
50+
return value is None or isinstance(value, str | int | float | Mapping | list)
51+
52+
4753
@cache
4854
def _get_file_token_serializer() -> URLSafeSerializer:
4955
"""
@@ -216,6 +222,37 @@ def get_doc_md(cls, doc_md: str | None) -> str | None:
216222
return None
217223
return inspect.cleandoc(doc_md)
218224

225+
@field_validator("default_args", mode="before")
226+
@classmethod
227+
def get_default_args(cls, default_args: Mapping | None) -> Mapping | None:
228+
"""
229+
Sanitize default_args for the API response.
230+
231+
Targets the common case where ``executor_config["pod_override"]`` is a
232+
Kubernetes ``V1Pod``: when the value is not a JSON primitive
233+
(``None``/``str``/``int``/``float``) or a ``Mapping``/``list``, it is
234+
rewritten to a fully-qualified type-name string so the response stays
235+
valid JSON. The container check is shallow — a ``Mapping`` or ``list``
236+
whose contents are themselves non-serializable (e.g. nested ``V1Pod``)
237+
will still raise during response serialization, as will any other
238+
non-JSON values elsewhere in ``default_args``.
239+
"""
240+
if default_args is None:
241+
return None
242+
executor_config = default_args.get("executor_config")
243+
if not (isinstance(executor_config, Mapping) and "pod_override" in executor_config):
244+
return default_args
245+
246+
pod_override = executor_config["pod_override"]
247+
if _is_response_safe_pod_override(pod_override):
248+
return default_args
249+
250+
sanitized_executor_config = dict(executor_config)
251+
sanitized_executor_config["pod_override"] = qualname(pod_override)
252+
result = dict(default_args)
253+
result["executor_config"] = sanitized_executor_config
254+
return result
255+
219256
@field_validator("params", mode="before")
220257
@classmethod
221258
def get_params(cls, params: SerializedParamsDict | None) -> dict | None:

airflow-core/src/airflow/serialization/serialized_objects.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -491,7 +491,11 @@ def serialize(
491491
)
492492
elif isinstance(var, list):
493493
return [cls.serialize(v, strict=strict) for v in var]
494-
elif var.__class__.__name__ == "V1Pod" and _has_kubernetes() and isinstance(var, k8s.V1Pod):
494+
elif (
495+
var.__class__.__name__ == "V1Pod"
496+
and _has_kubernetes(attempt_import=True)
497+
and isinstance(var, k8s.V1Pod)
498+
):
495499
json_pod = PodGenerator.serialize_pod(var)
496500
return cls._encode(json_pod, type_=DAT.POD)
497501
elif isinstance(var, OutletEventAccessors):

airflow-core/tests/unit/api_fastapi/core_api/datamodels/test_dags.py

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,15 @@
1616
# under the License.
1717
from __future__ import annotations
1818

19+
from datetime import datetime, timedelta, timezone
20+
1921
import pytest
2022

21-
from airflow.api_fastapi.core_api.datamodels.dags import DAGResponse
23+
from airflow._shared.module_loading import qualname
24+
from airflow.api_fastapi.core_api.datamodels.dags import (
25+
DAGDetailsResponse,
26+
DAGResponse,
27+
)
2228
from airflow.utils.types import DagRunType
2329

2430

@@ -58,6 +64,77 @@ def _make_dag_response(**overrides) -> DAGResponse:
5864
return DAGResponse.model_validate(defaults)
5965

6066

67+
class TestGetDefaultArgsValidator:
68+
"""Test the get_default_args field_validator on DAGDetailsResponse."""
69+
70+
def _call_validator(self, value):
71+
"""Invoke the classmethod validator directly."""
72+
return DAGDetailsResponse.get_default_args(value)
73+
74+
def test_none_returns_none(self):
75+
assert self._call_validator(None) is None
76+
77+
def test_plain_dict_is_preserved(self):
78+
result = self._call_validator({"retries": 3, "depends_on_past": False})
79+
assert result == {"retries": 3, "depends_on_past": False}
80+
81+
def test_timedelta_values_are_preserved(self):
82+
td = timedelta(minutes=5)
83+
result = self._call_validator({"retry_delay": td})
84+
assert result == {"retry_delay": td}
85+
86+
def test_datetime_values_are_preserved(self):
87+
start_date = datetime(2024, 1, 1, tzinfo=timezone.utc)
88+
result = self._call_validator({"start_date": start_date})
89+
assert result == {"start_date": start_date}
90+
91+
def test_pod_override_is_replaced_with_type_name(self):
92+
k8s = pytest.importorskip("kubernetes.client.models")
93+
pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="test-pod"))
94+
result = self._call_validator({"executor_config": {"pod_override": pod, "namespace": "custom"}})
95+
assert result == {"executor_config": {"pod_override": qualname(pod), "namespace": "custom"}}
96+
97+
@pytest.mark.parametrize(
98+
"pod_override",
99+
[
100+
pytest.param(None, id="none"),
101+
pytest.param("already-serialized", id="string"),
102+
pytest.param({"metadata": {"name": "pod"}}, id="dict"),
103+
pytest.param([{"metadata": {"name": "pod"}}], id="list"),
104+
],
105+
)
106+
def test_serialized_pod_override_values_are_preserved(self, pod_override):
107+
result = self._call_validator({"executor_config": {"pod_override": pod_override}})
108+
assert result == {"executor_config": {"pod_override": pod_override}}
109+
110+
def test_serialized_pod_override_preserves_other_executor_config_keys(self):
111+
executor_config = {
112+
"pod_override": {"metadata": {"name": "pod"}},
113+
"KubernetesExecutor": {"image": "custom-image"},
114+
}
115+
116+
result = self._call_validator({"executor_config": executor_config})
117+
118+
assert result == {"executor_config": executor_config}
119+
120+
def test_non_serialized_pod_override_object_is_replaced_with_type_name(self):
121+
class Opaque:
122+
pass
123+
124+
value = Opaque()
125+
result = self._call_validator({"executor_config": {"pod_override": value}})
126+
assert result == {"executor_config": {"pod_override": qualname(value)}}
127+
128+
def test_non_pod_override_objects_are_left_unchanged(self):
129+
class Opaque:
130+
def to_dict(self):
131+
return {"password": "secret"}
132+
133+
value = Opaque()
134+
result = self._call_validator({"connection": value})
135+
assert result["connection"] is value
136+
137+
61138
class TestIsBackfillable:
62139
@pytest.mark.parametrize(
63140
"timetable_periodic",

airflow-core/tests/unit/serialization/test_serialized_objects.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
from airflow.sdk.definitions.param import Param
8686
from airflow.sdk.definitions.taskgroup import TaskGroup
8787
from airflow.sdk.execution_time.context import OutletEventAccessor, OutletEventAccessors
88+
from airflow.serialization import serialized_objects
8889
from airflow.serialization.definitions.assets import (
8990
SerializedAsset,
9091
SerializedAssetAlias,
@@ -1038,6 +1039,28 @@ def test_has_kubernetes_uses_existing_import(self):
10381039

10391040
assert result is True
10401041

1042+
def test_serialize_v1pod_attempts_import_before_serializing(self, monkeypatch):
1043+
"""Regression test: V1Pod serialization must call _has_kubernetes(attempt_import=True)."""
1044+
k8s = pytest.importorskip("kubernetes.client.models")
1045+
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
1046+
1047+
calls = []
1048+
1049+
def fake_has_kubernetes(*, attempt_import=False):
1050+
calls.append(attempt_import)
1051+
return True
1052+
1053+
monkeypatch.setattr(serialized_objects, "_has_kubernetes", fake_has_kubernetes)
1054+
monkeypatch.setattr(serialized_objects, "k8s", k8s, raising=False)
1055+
monkeypatch.setattr(serialized_objects, "PodGenerator", PodGenerator, raising=False)
1056+
1057+
pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="test-pod"))
1058+
result = BaseSerialization.serialize(pod)
1059+
1060+
assert isinstance(result, dict), "V1Pod should serialize to a dict, not a string"
1061+
assert result.get(Encoding.TYPE) == DAT.POD, "V1Pod should have type DAT.POD"
1062+
assert True in calls
1063+
10411064

10421065
@pytest.mark.db_test
10431066
def test_serialized_dag_getitem_returns_task(dag_maker):

0 commit comments

Comments
 (0)