Skip to content

Commit 6c4cee7

Browse files
authored
fix: Handle Kubernetes API responses with non-JSON bodies (#55107)
- Add robust JSON parsing with fallback for API exception bodies
- Handle 429 responses and other cases where response body is plain text
- Update KubernetesExecutor to safely parse ApiException.body
- Update KubernetesInstallKueueOperator to handle non-JSON error bodies
1 parent d63d39c commit 6c4cee7

4 files changed

Lines changed: 76 additions & 6 deletions

File tree

providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -368,20 +368,31 @@ def sync(self) -> None:
368368
)
369369
self.fail(task[0], e)
370370
except ApiException as e:
371-
body = json.loads(e.body)
371+
try:
372+
if e.body:
373+
body = json.loads(e.body)
374+
else:
375+
# If no body content, use reason as the message
376+
body = {"message": e.reason}
377+
except (json.JSONDecodeError, ValueError, TypeError):
378+
# If the body is a string (e.g., in a 429 error), it can't be parsed as JSON.
379+
# Use the body directly as the message instead.
380+
body = {"message": e.body}
381+
372382
retries = self.task_publish_retries[key]
373383
# In case of exceeded quota or conflict errors, requeue the task as per the task_publish_max_retries
384+
message = body.get("message", "")
374385
if (
375-
(str(e.status) == "403" and "exceeded quota" in body["message"])
376-
or (str(e.status) == "409" and "object has been modified" in body["message"])
386+
(str(e.status) == "403" and "exceeded quota" in message)
387+
or (str(e.status) == "409" and "object has been modified" in message)
377388
) and (self.task_publish_max_retries == -1 or retries < self.task_publish_max_retries):
378389
self.log.warning(
379390
"[Try %s of %s] Kube ApiException for Task: (%s). Reason: %r. Message: %s",
380391
self.task_publish_retries[key] + 1,
381392
self.task_publish_max_retries,
382393
key,
383394
e.reason,
384-
body["message"],
395+
message,
385396
)
386397
self.task_queue.put(task)
387398
self.task_publish_retries[key] = retries + 1

providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/kueue.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,27 @@ def execute(self, context):
6565
try:
6666
self.hook.apply_from_yaml_file(yaml_objects=yaml_objects)
6767
except FailToCreateError as ex:
68-
error_bodies = [json.loads(e.body) for e in ex.api_exceptions]
68+
error_bodies = []
69+
for e in ex.api_exceptions:
70+
try:
71+
if e.body:
72+
error_bodies.append(json.loads(e.body))
73+
else:
74+
# If no body content, use reason as the message
75+
reason = getattr(e, "reason", "Unknown")
76+
error_bodies.append({"message": reason, "reason": reason})
77+
except (json.JSONDecodeError, ValueError, TypeError):
78+
# If the body is a string (e.g., in a 429 error), it can't be parsed as JSON.
79+
# Use the body directly as the message instead.
80+
error_bodies.append({"message": e.body, "reason": getattr(e, "reason", "Unknown")})
6981
if next((e for e in error_bodies if e.get("reason") == "AlreadyExists"), None):
7082
self.log.info("Kueue is already enabled for the cluster")
7183

7284
if errors := [e for e in error_bodies if e.get("reason") != "AlreadyExists"]:
73-
error_message = "\n".join(e.get("body") for e in errors)
85+
error_message = "\n".join(
86+
e.get("message") or e.get("body") or f"Unknown error: {e.get('reason', 'Unknown')}"
87+
for e in errors
88+
)
7489
raise AirflowException(error_message)
7590
return
7691

providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,27 @@ def setup_method(self) -> None:
386386
State.SUCCESS,
387387
id="409 conflict",
388388
),
389+
pytest.param(
390+
HTTPResponse(body="Too many requests, please try again later.", status=429),
391+
0,
392+
False,
393+
State.FAILED,
394+
id="429 Too Many Requests (non-JSON body)",
395+
),
396+
pytest.param(
397+
HTTPResponse(body="Too many requests, please try again later.", status=429),
398+
1,
399+
False,
400+
State.FAILED,
401+
id="429 Too Many Requests (non-JSON body) (task_publish_max_retries=1)",
402+
),
403+
pytest.param(
404+
HTTPResponse(body="", status=429),
405+
0,
406+
False,
407+
State.FAILED,
408+
id="429 Too Many Requests (empty body)",
409+
),
389410
],
390411
)
391412
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher")

providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_kueue.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,29 @@ def test_execute_error(self, mock_hook, mock_log):
115115
mock_hook.return_value.check_kueue_deployment_running.assert_not_called()
116116
mock_log.info.assert_called_once_with("Kueue is already enabled for the cluster")
117117

118+
@mock.patch(KUEUE_OPERATORS_PATH.format("KubernetesInstallKueueOperator.log"))
119+
@mock.patch(KUEUE_OPERATORS_PATH.format("KubernetesHook"))
120+
def test_execute_non_json_response(self, mock_hook, mock_log):
121+
"""Test handling of non-JSON API response bodies (e.g., 429 errors)."""
122+
mock_get_yaml_content_from_file = mock_hook.return_value.get_yaml_content_from_file
123+
mock_yaml_objects = mock_get_yaml_content_from_file.return_value
124+
mock_apply_from_yaml_file = mock_hook.return_value.apply_from_yaml_file
125+
126+
# Create mock exceptions with non-JSON bodies (simulating 429 errors)
127+
api_exceptions = [
128+
mock.MagicMock(body="Too many requests, please try again later.", reason="TooManyRequests"),
129+
mock.MagicMock(body="", reason="RateLimited"), # Empty body case
130+
]
131+
mock_apply_from_yaml_file.side_effect = FailToCreateError(api_exceptions)
132+
expected_error_message = "Too many requests, please try again later.\nRateLimited"
133+
134+
with pytest.raises(AirflowException, match=expected_error_message):
135+
self.operator.execute(context=mock.MagicMock())
136+
137+
mock_get_yaml_content_from_file.assert_called_once_with(kueue_yaml_url=KUEUE_YAML_URL)
138+
mock_apply_from_yaml_file.assert_called_once_with(yaml_objects=mock_yaml_objects)
139+
mock_hook.return_value.check_kueue_deployment_running.assert_not_called()
140+
118141

119142
class TestKubernetesStartKueueJobOperator:
120143
def test_template_fields(self):

0 commit comments

Comments
 (0)