Add async connection testing via workers for security isolation#62343
anishgiri wants to merge 41 commits into apache:main
Conversation
Force-pushed from 39ba192 to 3efcd26
airflow-core/src/airflow/api_fastapi/core_api/routes/public/connections.py (outdated)
airflow-core/src/airflow/api_fastapi/core_api/routes/public/connections.py
airflow-core/src/airflow/api_fastapi/execution_api/routes/connection_tests.py
@jason810496 Thanks for the thorough review! Addressed your feedback in the latest push:
Could you please take another look when you get a chance? Thanks!
Force-pushed from 33392ec to 59d2c88
```python
__table_args__ = (
    Index("idx_connection_test_request_state_created_at", state, created_at),
    # since mysql lacks filtered/partial indices, this creates a
    # duplicate index on mysql. Not the end of the world
    Index(
        "idx_connection_test_request_active_conn",
        "connection_id",
        unique=True,
        postgresql_where=text("state IN ('pending', 'queued', 'running')"),
        sqlite_where=text("state IN ('pending', 'queued', 'running')"),
    ),
```
The idx_connection_test_request_active_conn index is declared unique=True with postgresql_where/sqlite_where. On MySQL (which doesn't support partial indexes), this will compile as an unconditional UNIQUE index on connection_id, preventing any subsequent connection tests for the same connection_id even after earlier tests reach terminal states. Consider making this non-unique on MySQL (conditional DDL in the migration) or using a MySQL-compatible pattern (e.g., generated/nullable “active_connection_id” column with a unique index) to enforce uniqueness only for active states.
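For the nullable-column workaround the comment mentions, a minimal sketch of the idea, with illustrative table, column, and helper names: keep a nullable `active_connection_id` column that mirrors `connection_id` only while a test is in an active state, and put the UNIQUE index on that column. SQL unique indexes (in MySQL/InnoDB as well as SQLite and PostgreSQL) allow any number of NULLs, so completed tests never collide. The stdlib `sqlite3` module stands in for MySQL here:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE connection_test_request (
        id INTEGER PRIMARY KEY,
        connection_id TEXT NOT NULL,
        state TEXT NOT NULL,
        active_connection_id TEXT  -- equals connection_id while the test is active, else NULL
    );
    CREATE UNIQUE INDEX idx_active_conn
        ON connection_test_request (active_connection_id);
""")

def request_test(cur, connection_id):
    # An active request carries its connection_id in the uniquely-indexed column.
    cur.execute(
        "INSERT INTO connection_test_request (connection_id, state, active_connection_id) "
        "VALUES (?, 'pending', ?)",
        (connection_id, connection_id),
    )

def finish_test(cur, connection_id, final_state):
    # Reaching a terminal state clears the column, freeing the unique slot.
    cur.execute(
        "UPDATE connection_test_request SET state = ?, active_connection_id = NULL "
        "WHERE connection_id = ?",
        (final_state, connection_id),
    )

cur = conn.cursor()
request_test(cur, "my_postgres")          # first active test: accepted
duplicate_blocked = False
try:
    request_test(cur, "my_postgres")      # second active test for same connection: rejected
except sqlite3.IntegrityError:
    duplicate_blocked = True
finish_test(cur, "my_postgres", "success")
request_test(cur, "my_postgres")          # new test after completion: accepted again
```

This keeps uniqueness enforcement portable across all three dialects at the cost of one extra column to maintain on state transitions.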
```python
# since mysql lacks filtered/partial indices, this creates a
# duplicate index on mysql
op.create_index(
    op.f("idx_connection_test_request_active_conn"),
    "connection_test_request",
    ["connection_id"],
    unique=True,
    postgresql_where=sa.text("state IN ('pending', 'queued', 'running')"),
    sqlite_where=sa.text("state IN ('pending', 'queued', 'running')"),
)
```
This migration creates idx_connection_test_request_active_conn as unique=True with only postgresql_where/sqlite_where. On MySQL, that becomes an unconditional UNIQUE index on connection_id, which will block creating a second test request for the same connection after the first one completes. Please adjust the migration to avoid an unconditional unique index on MySQL (dialect-conditional index creation or a MySQL-compatible partial-uniqueness workaround).
Suggested change (replacing the unconditional `op.create_index` above):

```python
dialect_name = op.get_bind().dialect.name
if dialect_name in {"postgresql", "sqlite"}:
    op.create_index(
        op.f("idx_connection_test_request_active_conn"),
        "connection_test_request",
        ["connection_id"],
        unique=True,
        postgresql_where=sa.text("state IN ('pending', 'queued', 'running')"),
        sqlite_where=sa.text("state IN ('pending', 'queued', 'running')"),
    )
else:
    # MySQL does not support filtered/partial indexes. Create a plain
    # non-unique index instead of an unconditional unique index.
    op.create_index(
        op.f("idx_connection_test_request_active_conn"),
        "connection_test_request",
        ["connection_id"],
        unique=False,
    )
```
```python
ct.commit_to_connection_table(session=session)
session.flush()

from sqlalchemy import select
```
Avoid imports inside test functions; move `from sqlalchemy import select` to the module import section so the test file has a consistent import style and avoids per-test import side effects.
Was generative AI tooling used to co-author this PR?
Summary
Follows the direction proposed by @potiuk in #59643 to move connection testing off the API server and onto workers.
Connection testing has been disabled by default since Airflow 2.7.0 because executing user-supplied driver code (ODBC/JDBC) on the API server poses security risks, and workers typically have network access to external systems that API servers don't.
This moves the whole thing onto workers. A dedicated `TestConnection` workload goes through the scheduler, gets dispatched to a supporting executor, and the worker runs `test_connection()` with a proper timeout. Results come back through the Execution API. Design was discussed on dev@: "[DISCUSS] Move connection testing to workers" (Feb 2026).

Demo
breeze-e2e-rundown-compressed.mp4
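As context for the per-test timeout mentioned above (and the `signal.alarm` enforcement described under Overview below), here is a rough, Unix-only sketch of the pattern; `run_with_timeout` and `ConnectionTestTimeout` are illustrative names, not this PR's actual API:

```python
import signal

class ConnectionTestTimeout(Exception):
    """Raised when a connection test exceeds its allotted wall-clock time."""

def _on_alarm(signum, frame):
    raise ConnectionTestTimeout("connection test exceeded timeout")

def run_with_timeout(fn, timeout_seconds):
    """Run fn(); raise ConnectionTestTimeout if it runs past timeout_seconds.

    Uses SIGALRM, so it only works on Unix and only in the main thread.
    """
    old_handler = signal.signal(signal.SIGALRM, _on_alarm)
    signal.alarm(timeout_seconds)       # schedule SIGALRM in timeout_seconds
    try:
        return fn()
    finally:
        signal.alarm(0)                 # cancel any pending alarm
        signal.signal(signal.SIGALRM, old_handler)

result = run_with_timeout(lambda: "connected", timeout_seconds=5)
```

The `finally` block is the important part: the alarm must be cancelled and the previous handler restored even when the wrapped call raises, so a fast failure doesn't leave a stray SIGALRM armed for later code.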
Overview
- A dedicated workload type, separate from `ExecuteCallback`, so connection tests never compete with correctness-critical callbacks
- `max_connection_test_concurrency` (default 4). A reaper catches stuck tests after timeout + grace period
- `signal.alarm` enforcement in LocalExecutor, results reported back via Execution API
- `queue` field on the API, wired through to scheduler dispatch
- `supports_connection_test` flag on BaseExecutor, immediate FAILED if no executor supports it

Config
- `[scheduler] connection_test_timeout`: worker timeout, default 60s
- `[scheduler] max_connection_test_concurrency`: dispatch budget, default 4
- `[scheduler] connection_test_reaper_interval`: reaper frequency, default 30s

Not in this PR
References
`{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in airflow-core/newsfragments.