Add async connection testing via workers for security isolation#62343

Open
anishgirianish wants to merge 41 commits into apache:main from
anishgirianish:async-connection-test-worker

Conversation

@anishgirianish
Contributor

@anishgirianish anishgirianish commented Feb 23, 2026


Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Summary

Follows the direction proposed by @potiuk in #59643 to move connection testing off the API server and onto workers.

Connection testing has been disabled by default since Airflow 2.7.0 because executing user-supplied driver code (ODBC/JDBC) on the API server poses security risks. Workers are also a better fit for the job, since they typically have network access to external systems that API servers lack.

This PR moves connection testing onto workers. A dedicated TestConnection workload goes through the scheduler, is dispatched to a supporting executor, and the worker runs test_connection() with an enforced timeout. Results come back through the Execution API. The design was discussed on dev@: "[DISCUSS] Move connection testing to workers" (Feb 2026).
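
As an illustration of the worker-side timeout, here is a minimal sketch of bounding a test call with signal.alarm, the mechanism the overview names for LocalExecutor. It is not the code from this PR: run_with_timeout, ConnectionTestTimeout, and the two sample test functions are hypothetical names, and the (bool, str) return shape is assumed from the usual hook test_connection() convention. It only works on Unix and only from the main thread of the process.

```python
import signal
import time


class ConnectionTestTimeout(Exception):
    """Raised when the test exceeds its time budget (hypothetical name)."""


def run_with_timeout(test_fn, timeout_seconds):
    """Run test_fn(), aborting it via SIGALRM after timeout_seconds.

    Returns a (success, message) tuple. Unix-only, main thread only.
    """

    def _on_alarm(signum, frame):
        raise ConnectionTestTimeout(f"timed out after {timeout_seconds}s")

    previous = signal.signal(signal.SIGALRM, _on_alarm)
    signal.alarm(timeout_seconds)  # signal.alarm accepts whole seconds only
    try:
        return test_fn()
    except ConnectionTestTimeout as exc:
        return False, str(exc)
    finally:
        signal.alarm(0)  # cancel any alarm that is still pending
        signal.signal(signal.SIGALRM, previous)  # restore the old handler


def fast_test():
    # Stand-in for a hook's test_connection() that answers immediately.
    return True, "Connection successfully tested"


def slow_test():
    # Stand-in for a driver call that hangs longer than the budget.
    time.sleep(5)
    return True, "never reached within the budget"
```

Calling run_with_timeout(slow_test, 1) returns a failure tuple after about one second instead of blocking for five, while run_with_timeout(fast_test, 1) passes the hook's result through unchanged.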

Demo

breeze-e2e-rundown-compressed.mp4

Overview

  • Dedicated workload type: not piggybacking on ExecuteCallback, so connection tests never compete with correctness-critical callbacks
  • Scheduler dispatch + reaper: PENDING tests get dispatched to a supporting executor, capped by max_connection_test_concurrency (default 4). A reaper catches stuck tests after timeout + grace period.
  • Worker-side timeout: signal.alarm enforcement in LocalExecutor, results reported back via Execution API
  • Request Buffer Mechanism: <TODO:add details>
  • Queue parameter: optional queue field on the API, wired through to scheduler dispatch.
  • Fail-fast: supports_connection_test flag on BaseExecutor, immediate FAILED if no executor supports it
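
The dispatch budget and fail-fast rules above can be sketched roughly as follows. This is an in-memory model, not the scheduler code from this PR: ConnectionTestRequest, Executor, State, and dispatch_pending are hypothetical stand-ins, and only the supports_connection_test flag, the state names, and the default of 4 come from the description.

```python
from dataclasses import dataclass
from enum import Enum


class State(str, Enum):
    PENDING = "pending"
    QUEUED = "queued"
    RUNNING = "running"
    FAILED = "failed"


@dataclass
class ConnectionTestRequest:
    connection_id: str
    state: State = State.PENDING


@dataclass
class Executor:
    name: str
    supports_connection_test: bool = False


def dispatch_pending(requests, executors, max_concurrency=4):
    """Queue PENDING requests without exceeding the concurrency budget.

    If no executor supports connection tests, every PENDING request is
    failed immediately and nothing is dispatched.
    """
    executor = next((e for e in executors if e.supports_connection_test), None)
    pending = [r for r in requests if r.state is State.PENDING]
    if executor is None:
        for request in pending:
            request.state = State.FAILED  # fail fast, nothing can run it
        return []

    # Tests already queued or running count against the budget.
    active = sum(r.state in (State.QUEUED, State.RUNNING) for r in requests)
    budget = max(0, max_concurrency - active)
    dispatched = pending[:budget]
    for request in dispatched:
        request.state = State.QUEUED
    return dispatched
```

With one test already running and five pending, a budget of 4 lets three more through and leaves two PENDING for the next scheduler loop.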

Config

  • [scheduler] connection_test_timeout: worker timeout, default 60s
  • [scheduler] max_connection_test_concurrency: dispatch budget, default 4
  • [scheduler] connection_test_reaper_interval: reaper frequency, default 30s
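
To show how the timeout and reaper settings could interact, here is a small sketch of the check a reaper might run on each interval. The function name find_stuck and the 30-second grace value are assumptions made for illustration; the PR description mentions a grace period but does not state its length.

```python
from datetime import datetime, timedelta, timezone


def find_stuck(started_at_by_id, now, timeout_seconds=60, grace_seconds=30):
    """Return ids of tests that have run longer than timeout + grace.

    grace_seconds is a hypothetical value; only the 60s timeout default
    comes from the config list above.
    """
    deadline = timedelta(seconds=timeout_seconds + grace_seconds)
    return sorted(
        request_id
        for request_id, started_at in started_at_by_id.items()
        if now - started_at > deadline
    )


now = datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
started = {
    "a": now - timedelta(seconds=100),  # past timeout + grace
    "b": now - timedelta(seconds=80),   # past timeout, still within grace
    "c": now - timedelta(seconds=10),   # healthy
}
stuck = find_stuck(started, now)
```

Only request "a" is reported here. Keeping the grace period separate from the worker timeout gives a worker that timed out on its own a chance to report its result before the reaper intervenes.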

Not in this PR

  • UI changes (a separate PR will follow)

References


  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

Member

@jason810496 jason810496 left a comment

Nice! LGTM overall.

@anishgirianish
Contributor Author

@jason810496 Thanks for the thorough review! Addressed your feedback in the latest push:

  • Removed result_status column — state is sufficient
  • Moved _ImportPathCallbackDef to connection_test.py with a create_callback() factory method

Could you please take another look when you get a chance? Thanks!

@anishgirianish anishgirianish force-pushed the async-connection-test-worker branch 2 times, most recently from 33392ec to 59d2c88 on February 24, 2026 21:31
Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 42 out of 42 changed files in this pull request and generated 4 comments.

Comment on lines +95 to +105
__table_args__ = (
    Index("idx_connection_test_request_state_created_at", state, created_at),
    # since mysql lacks filtered/partial indices, this creates a
    # duplicate index on mysql. Not the end of the world
    Index(
        "idx_connection_test_request_active_conn",
        "connection_id",
        unique=True,
        postgresql_where=text("state IN ('pending', 'queued', 'running')"),
        sqlite_where=text("state IN ('pending', 'queued', 'running')"),
    ),

Copilot AI Apr 10, 2026

The idx_connection_test_request_active_conn index is declared unique=True with postgresql_where/sqlite_where. On MySQL (which doesn't support partial indexes), this will compile as an unconditional UNIQUE index on connection_id, preventing any subsequent connection tests for the same connection_id even after earlier tests reach terminal states. Consider making this non-unique on MySQL (conditional DDL in the migration) or using a MySQL-compatible pattern (e.g., generated/nullable “active_connection_id” column with a unique index) to enforce uniqueness only for active states.
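
The behaviour described in this comment can be reproduced with the standard-library sqlite3 module. This is a sketch under assumptions: the table is reduced to three columns, the helper name can_retest_after_completion is hypothetical, and the unconditional variant stands in for what MySQL would end up with, since MySQL itself is not used here.

```python
import sqlite3

ACTIVE_STATES = "state IN ('pending', 'queued', 'running')"


def can_retest_after_completion(partial):
    """Insert a test, finish it, then try to insert a second one.

    partial=True builds the filtered unique index (PostgreSQL/SQLite case);
    partial=False builds an unconditional unique index, which is what the
    same definition degrades to on a database without partial indexes.
    """
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "CREATE TABLE connection_test_request ("
            "id INTEGER PRIMARY KEY, "
            "connection_id TEXT NOT NULL, "
            "state TEXT NOT NULL)"
        )
        where = f" WHERE {ACTIVE_STATES}" if partial else ""
        conn.execute(
            "CREATE UNIQUE INDEX idx_active_conn "
            f"ON connection_test_request (connection_id){where}"
        )
        insert = (
            "INSERT INTO connection_test_request (connection_id, state) "
            "VALUES ('my_db', 'pending')"
        )
        conn.execute(insert)
        # The first test reaches a terminal state.
        conn.execute(
            "UPDATE connection_test_request SET state = 'failed' "
            "WHERE connection_id = 'my_db'"
        )
        try:
            conn.execute(insert)  # second test for the same connection
            return True
        except sqlite3.IntegrityError:
            return False
    finally:
        conn.close()
```

With the filtered index the second insert succeeds because the finished row has dropped out of the index. With the unconditional index it raises an IntegrityError, which is the failure mode the comment warns about.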

Comment on lines +72 to +83
# since mysql lacks filtered/partial indices, this creates a
# duplicate index on mysql
op.create_index(
    op.f("idx_connection_test_request_active_conn"),
    "connection_test_request",
    ["connection_id"],
    unique=True,
    postgresql_where=sa.text("state IN ('pending', 'queued', 'running')"),
    sqlite_where=sa.text("state IN ('pending', 'queued', 'running')"),
)


Copilot AI Apr 10, 2026

This migration creates idx_connection_test_request_active_conn as unique=True with only postgresql_where/sqlite_where. On MySQL, that becomes an unconditional UNIQUE index on connection_id, which will block creating a second test request for the same connection after the first one completes. Please adjust the migration to avoid an unconditional unique index on MySQL (dialect-conditional index creation or a MySQL-compatible partial-uniqueness workaround).

Suggested change
- # since mysql lacks filtered/partial indices, this creates a
- # duplicate index on mysql
- op.create_index(
-     op.f("idx_connection_test_request_active_conn"),
-     "connection_test_request",
-     ["connection_id"],
-     unique=True,
-     postgresql_where=sa.text("state IN ('pending', 'queued', 'running')"),
-     sqlite_where=sa.text("state IN ('pending', 'queued', 'running')"),
- )
+ dialect_name = op.get_bind().dialect.name
+ if dialect_name in {"postgresql", "sqlite"}:
+     op.create_index(
+         op.f("idx_connection_test_request_active_conn"),
+         "connection_test_request",
+         ["connection_id"],
+         unique=True,
+         postgresql_where=sa.text("state IN ('pending', 'queued', 'running')"),
+         sqlite_where=sa.text("state IN ('pending', 'queued', 'running')"),
+     )
+ else:
+     # MySQL does not support filtered/partial indexes. Create a plain
+     # non-unique index instead of an unconditional unique index.
+     op.create_index(
+         op.f("idx_connection_test_request_active_conn"),
+         "connection_test_request",
+         ["connection_id"],
+         unique=False,
+     )

ct.commit_to_connection_table(session=session)
session.flush()

from sqlalchemy import select

Copilot AI Apr 10, 2026

Avoid imports inside test functions; move from sqlalchemy import select to the module import section so the test file has a consistent import style and avoids per-test import side effects.

Labels

  • area:airflow-ctl
  • area:API (Airflow's REST/HTTP API)
  • area:db-migrations (PRs with DB migration)
  • area:task-sdk
  • area:UI (Related to UI/UX. For Frontend Developers.)
  • kind:documentation
  • ready for maintainer review (Set after triaging when all criteria pass.)

10 participants