Skip to content
7 changes: 5 additions & 2 deletions task-sdk/src/airflow/sdk/execution_time/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1101,8 +1101,11 @@ def _upload_logs(self):
"""
from airflow.sdk.log import upload_to_remote

with _remote_logging_conn(self.client):
upload_to_remote(self.process_log, self.ti)
try:
with _remote_logging_conn(self.client):
upload_to_remote(self.process_log, self.ti)
except Exception:
self.process_log.exception("Failed to upload remote logs", ti_id=self.id, pid=self.pid)

def _monitor_subprocess(self):
"""
Expand Down
25 changes: 25 additions & 0 deletions task-sdk/tests/task_sdk/execution_time/test_supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2889,6 +2889,31 @@ def mock_upload_to_remote(process_log, ti):
assert connection_available["conn_uri"] is not None, "Connection URI was None during upload"


def test_log_upload_failures_are_non_fatal(mocker):
proc = ActivitySubprocess(
process_log=mocker.MagicMock(),
id=TI_ID,
pid=12345,
stdin=mocker.MagicMock(),
client=mocker.MagicMock(),
process=mocker.MagicMock(),
)
proc.ti = mocker.MagicMock()

mocker.patch(
"airflow.sdk.execution_time.supervisor._remote_logging_conn",
side_effect=RuntimeError("upload failed"),
)

proc._upload_logs()

proc.process_log.exception.assert_called_once_with(
"Failed to upload remote logs",
ti_id=TI_ID,
pid=12345,
)


def test_remote_logging_conn_sets_process_context(monkeypatch, mocker):
"""
Test that _remote_logging_conn sets _AIRFLOW_PROCESS_CONTEXT=client.
Expand Down
Loading