Fix execution timeout enforcement in task supervisor (#57174)#59657
Fix execution timeout enforcement in task supervisor (#57174)#59657qwe-kev wants to merge 7 commits intoapache:mainfrom
Conversation
|
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
|
|
This looks really good - can you please add test cases covering the timeout handling in supervisor ? |
|
Added a comprehensive test suite for timeout handling as requested. All tests are passing. Test Coverage:
|
|
Hi, I wanted to follow up on this contribution. Is there anything I can do to help move this forward or any additional information needed? |
Turning that PR green might be a good start. |
|
Hi, I wanted to follow up on this contribution. |
|
@qwe-kev can you help fixing the CI and resolving merge conflicts on this one? We could take a look once that's done. |
|
@qwe-kev This PR has been converted to draft because it does not yet meet our Pull Request quality criteria. Issues found:
What to do next:
Converting a PR to draft is not a rejection — it is an invitation to bring the PR up to the project's standards so that maintainer review time is spent productively. There is no rush — take your time and work at your own pace. We appreciate your contribution and are happy to wait for updates. If you have questions, feel free to ask on the Airflow Slack. |
closes: apache#53337 related: apache#57174 This PR implements proper execution timeout handling for Airflow 3.0 by moving timeout enforcement from the task process to the supervisor process. Previously, execution_timeout was handled inside the task process using a timeout decorator. This approach failed when: - Task process encountered SIGSEGV or other signals (apache#57174) - Native code ran in tight loops without handling Python signals - Process was killed before timeout could be enforced Changes: - Added TaskExecutionTimeout message for worker-to-supervisor communication - Supervisor monitors execution time and enforces timeout with SIGTERM/SIGKILL - Removed in-process timeout decorator from task execution - Timeout measurement starts after DAG parsing (excludes startup overhead) Implementation: 1. Worker sends timeout_seconds to supervisor after DAG parsing 2. Supervisor tracks elapsed time using monotonic clock 3. On timeout: sends SIGTERM, then SIGKILL after 5-second grace period This ensures reliable timeout enforcement at the supervisor level, preventing runaway tasks even when the task process encounters errors.
- Add 9 unit tests for timeout handling logic - Test SIGTERM/SIGKILL escalation behavior - Test grace period enforcement - Test monotonic clock usage - Test message serialization - Add 4 integration tests with real subprocesses - Test actual timeout enforcement - Test SIGKILL escalation when task ignores SIGTERM - Test tasks completing before/without timeout - Add client_with_ti_start fixture for mocking API client - Tests account for MIN_HEARTBEAT_INTERVAL in timing assertions All 13 tests passing. No existing tests broken.
defined on BaseOperator. Addresses review feedback
- Changed falsy checks to 'is None' checks in supervisor.py to handle edge case where timeout values could be 0.0 - Added validation in task_runner.py to only send positive timeouts - Prevents 0.0 (falsy) from being incorrectly treated as None Addresses reviewer feedback
- Add test coverage for TaskExecutionTimeout message in test_supervisor.py - Remove deprecated test_run_task_timeout and test_execution_timeout from test_task_runner.py (timeout now handled by supervisor, covered by TestExecutionTimeoutIntegration tests) - Remove unused imports (AirflowTaskTimeout, _execute_task, time) - Set expected_body=None for TaskExecutionTimeout as it's a one-way message All tests now pass (689 passed).
6081148 to
bee59e7
Compare
closes: #53337
related: #57174
This PR implements proper execution timeout handling for Airflow 3.0 by moving timeout enforcement from the task process to the supervisor process.
Previously, execution_timeout was handled inside the task process using a timeout decorator. This approach failed when:
Changes
Implementation
This ensures reliable timeout enforcement at the supervisor level, preventing runaway tasks even when the task process encounters errors.