Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions task-sdk/src/airflow/sdk/execution_time/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1164,10 +1164,12 @@ def final_state(self):
if self._exit_code != 0 and self._terminal_state == SERVER_TERMINATED:
return SERVER_TERMINATED

# Any negative exit code indicates a signal kill
# We consider all signal kills as potentially retryable
# since they're often transient issues that could succeed on retry
if self._exit_code < 0 and self._should_retry:
# Any non-zero exit code indicates a failure
# If retries are configured, mark as UP_FOR_RETRY
# Negative exit codes indicate signal kills (often transient)
# Positive exit codes can also be transient failures, such as network issues in a task that
# communicates with external services
if self._exit_code != 0 and self._should_retry:
return TaskInstanceState.UP_FOR_RETRY

return TaskInstanceState.FAILED
Expand Down
21 changes: 18 additions & 3 deletions task-sdk/tests/task_sdk/execution_time/test_supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2433,7 +2433,7 @@ def mock_upload_to_remote(process_log, ti):


class TestSignalRetryLogic:
"""Test signal based retry logic in ActivitySubprocess."""
"""Test retry logic for exit codes (signals and non-signal failures) in ActivitySubprocess."""

@pytest.mark.parametrize(
"signal",
Expand Down Expand Up @@ -2486,8 +2486,8 @@ def test_signals_without_retry_always_fail(self, mocker, signal):
result = mock_watched_subprocess.final_state
assert result == TaskInstanceState.FAILED

def test_non_signal_exit_code_goes_to_failed(self, mocker):
"""Test that non signal exit codes go to failed regardless of task retries."""
def test_non_signal_exit_code_with_retry_goes_to_up_for_retry(self, mocker):
"""Test that non-signal exit codes with retries enabled go to UP_FOR_RETRY."""
mock_watched_subprocess = ActivitySubprocess(
process_log=mocker.MagicMock(),
id=TI_ID,
Expand All @@ -2499,6 +2499,21 @@ def test_non_signal_exit_code_goes_to_failed(self, mocker):
mock_watched_subprocess._exit_code = 1
mock_watched_subprocess._should_retry = True

assert mock_watched_subprocess.final_state == TaskInstanceState.UP_FOR_RETRY

def test_non_signal_exit_code_without_retry_goes_to_failed(self, mocker):
    """A non-signal exit code must resolve to FAILED when retries are not enabled."""
    # Build the constructor arguments up front; every collaborator is a mock.
    subprocess_kwargs = {
        "process_log": mocker.MagicMock(),
        "id": TI_ID,
        "pid": 12345,
        "stdin": mocker.Mock(),
        "process": mocker.Mock(),
        "client": mocker.Mock(),
    }
    activity = ActivitySubprocess(**subprocess_kwargs)

    # Positive exit code (not a signal kill) with retrying switched off.
    activity._exit_code = 1
    activity._should_retry = False

    observed_state = activity.final_state
    assert observed_state == TaskInstanceState.FAILED


Expand Down