
amazon provider: deferrable AWS hook in triggerer can lose connection when TriggerCommsDecoder.send() hits cross-loop RuntimeError #64213

@hkc-8010

Description

Apache Airflow Provider(s)

amazon

Versions of Apache Airflow Providers

I observed this in a managed Airflow 3.1.8 deployment (Astronomer Runtime 3.1-14).

I could not reliably extract the exact provider package version from the running managed image via kubectl exec. However, the code path involved matches the current providers/amazon source on main (currently apache-airflow-providers-amazon 9.24.0), and the runtime image does include the Task SDK fallback added for triggerer connection access.

Apache Airflow version

3.1.8

Operating System

Debian GNU/Linux 12 (bookworm)

Deployment

Astronomer

Deployment details

Managed Astronomer deployment using triggerer + deferrable operators.

The failing operator path was a deferrable S3KeySensor / S3KeyTrigger using an AWS Airflow connection whose extra contains role_arn and region_name.

The same connection works in non-deferrable/reschedule mode and also resolves correctly through the triggerer supervisor's in-process execution API client.

What happened

A deferrable S3KeySensor worked until it deferred. Once execution moved into the triggerer path, the AWS hook behaved as if the configured Airflow connection did not exist and logged:

Unable to find AWS Connection ID 'aws_quadrant', switching to empty.
No connection ID provided. Fallback on boto3 credential strategy (region_name=None). If you have boto3 credentials configured, they will be used directly.

and then failed against S3 with:

An error occurred (403) when calling the HeadObject operation: Forbidden

Important detail: the Airflow connection itself was valid. Worker-side execution could use it, and the triggerer supervisor's in-process API client could also fetch it successfully.

After tracing the triggerer-side code path, I believe the failure happens during the synchronous connection lookup performed by the AWS hook.

The relevant path is:

  • S3KeyTrigger.run() -> await self.hook.get_async_conn()
  • AwsBaseHook.get_async_conn() wraps _get_async_conn() in sync_to_async(...)
  • _get_async_conn() calls self.get_client_type(...)
  • self.region_name / self.conn_config call self.get_connection(...)
  • Task SDK's ExecutionAPISecretsBackend.get_connection() uses SUPERVISOR_COMMS.send(GetConnection(...))
  • In triggerer context, SUPERVISOR_COMMS is a TriggerCommsDecoder

I found that TriggerCommsDecoder.send() can raise the following RuntimeError when called from the worker thread created by sync_to_async:

RuntimeError: Task <Task pending name='Task-3' coro=<AsyncToSync.__call__.<locals>.new_loop_wrap() running at ...>> got Future <Future pending> attached to a different loop
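The cross-loop failure can be modelled without Airflow. This is a simplified stand-in, not the real TriggerCommsDecoder: it assumes the decoder's pending response is an asyncio Future bound to the triggerer's main event loop, and that the sync send() path ends up driving a brand-new event loop in the worker thread (as the new_loop_wrap frame in the traceback suggests). The names trigger_run, pending_response, send_sync and lookup_connection_sync are hypothetical.

```python
import asyncio


async def trigger_run() -> str:
    """Hypothetical model of S3KeyTrigger.run() -> sync_to_async -> sync send()."""
    main_loop = asyncio.get_running_loop()
    # Assumption: the real decoder keeps state (a Future, stream, ...) bound
    # to the triggerer's main loop. Here that state is a plain Future.
    pending_response = main_loop.create_future()

    async def async_send() -> str:
        # Awaiting a Future owned by main_loop from a task on ANOTHER loop.
        return await pending_response

    def send_sync() -> str:
        # A fresh event loop in the worker thread, similar in spirit to
        # AsyncToSync's new_loop_wrap when there is no outer loop to reuse.
        return asyncio.run(async_send())

    def lookup_connection_sync() -> str:
        # Stand-in for hook.conn_config -> get_connection() -> SUPERVISOR_COMMS.send()
        try:
            return send_sync()
        except RuntimeError as exc:
            return f"RuntimeError: {exc}"

    # Stand-in for sync_to_async(...): run the sync code in a worker thread.
    return await asyncio.to_thread(lookup_connection_sync)


message = asyncio.run(trigger_run())
print(message)  # message ends with "attached to a different loop"
```

asyncio refuses to let a task on one loop wait on a Future owned by another loop, so any sync wrapper that spins up its own loop and then touches main-loop state will hit this error.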

ExecutionAPISecretsBackend.get_connection() currently special-cases only one RuntimeError message:

"You cannot use AsyncToSync in the same thread as an async event loop"

When it gets the different-loop error above, it falls through to the generic exception handler and returns None. That makes AwsBaseHook.conn_config think the connection is missing, which triggers the warning and empty-credentials fallback.
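The fall-through can be illustrated with a simplified model of the handler. This is not the Task SDK source: get_connection_model, cross_loop_send and async_getter are hypothetical names, and the shape of the except clause is an assumption based on the description above. It shows how any RuntimeError whose message differs from the one special-cased string ends up as None.

```python
from typing import Callable, Optional

# The one message the Task SDK fallback is described as special-casing.
ASYNC_TO_SYNC_MSG = "You cannot use AsyncToSync in the same thread as an async event loop"


def get_connection_model(
    conn_id: str,
    send: Callable[[str], dict],
    async_getter: Callable[[str], dict],
) -> Optional[dict]:
    """Hypothetical model of the sync lookup; not the real Task SDK code."""
    try:
        return send(conn_id)
    except Exception as exc:
        if isinstance(exc, RuntimeError) and ASYNC_TO_SYNC_MSG in str(exc):
            # Special-cased message: retry through an async-safe path.
            return async_getter(conn_id)
        # Everything else, including the different-loop RuntimeError, is
        # reported as "no connection", which the AWS hook reads as "missing".
        return None


def cross_loop_send(conn_id: str) -> dict:
    # Simulates TriggerCommsDecoder.send() failing from the worker thread.
    raise RuntimeError("Task <Task pending> got Future <Future pending> attached to a different loop")


def async_getter(conn_id: str) -> dict:
    # Simulates a successful async lookup through the supervisor.
    return {"conn_id": conn_id}


print(get_connection_model("aws_quadrant", cross_loop_send, async_getter))  # None
```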

What you think should happen instead

Deferrable AWS hooks in triggerer context should resolve the same Airflow connection that non-deferrable execution resolves.

At minimum, the triggerer-side connection lookup should not silently degrade to None when TriggerCommsDecoder.send() fails with the different-loop RuntimeError shown above.

Instead, one of these should happen:

  1. The sync triggerer connection path should work correctly from the sync_to_async worker thread.
  2. The AWS hook / Task SDK path should use the async connection getter in triggerer contexts.
  3. The Task SDK fallback should handle this different-loop runtime error as well, rather than returning None and causing silent credential fallback.
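For option 1, one standard asyncio pattern for calling back into a loop from a worker thread is asyncio.run_coroutine_threadsafe(): it schedules the coroutine on the loop that owns the shared state and blocks only the worker thread on a concurrent.futures.Future, instead of starting a second loop. This is a sketch of one possible direction, not how Airflow implements or fixes the lookup; it assumes the sync path can obtain a reference to the triggerer's main loop, and fetch_connection, lookup_connection_sync and the "us-east-1" value are hypothetical.

```python
import asyncio


async def trigger_run() -> dict:
    main_loop = asyncio.get_running_loop()

    async def fetch_connection(conn_id: str) -> dict:
        # Hypothetical async getter. It runs ON main_loop, so it could safely
        # await Futures or streams bound to that loop.
        await asyncio.sleep(0)
        return {"conn_id": conn_id, "region_name": "us-east-1"}

    def lookup_connection_sync(conn_id: str) -> dict:
        # Called from the worker thread: hand the coroutine back to main_loop
        # and block this thread (not the loop) until the result is ready.
        future = asyncio.run_coroutine_threadsafe(fetch_connection(conn_id), main_loop)
        return future.result(timeout=5)

    # Stand-in for sync_to_async(...): main_loop stays free while the worker
    # thread waits, so it can execute fetch_connection().
    return await asyncio.to_thread(lookup_connection_sync, "aws_quadrant")


print(asyncio.run(trigger_run()))  # {'conn_id': 'aws_quadrant', 'region_name': 'us-east-1'}
```

The blocking future.result() call must never run on the main loop's own thread, or it would wait on a coroutine that loop can no longer execute and deadlock.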

How to reproduce

I have two reproductions.

End-user reproduction

  1. Create an AWS Airflow connection whose extra contains a role_arn and region_name.
  2. Use that connection in a deferrable S3KeySensor / S3KeyTrigger.
  3. Make sure the worker-side path can defer successfully.
  4. Once the triggerer resumes polling, it logs that it cannot find the AWS connection and falls back to empty boto3 credentials.
  5. S3 calls then fail with 403 Forbidden because the role assumption information from the Airflow connection was lost.
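For step 1, one way to define such a connection is an AIRFLOW_CONN_<CONN_ID> environment variable in Airflow's JSON connection format. The sketch below only builds that value with the standard library; the role ARN and region are placeholders, not the values from the affected deployment, and a managed deployment may instead define the connection through the UI or a secrets backend.

```python
import json

# Placeholder values: substitute a real role ARN and region.
conn = {
    "conn_type": "aws",
    "extra": {
        "role_arn": "arn:aws:iam::123456789012:role/example-s3-reader",
        "region_name": "us-east-1",
    },
}

# Airflow looks up conn_id "aws_quadrant" as AIRFLOW_CONN_AWS_QUADRANT.
env_name = "AIRFLOW_CONN_AWS_QUADRANT"
env_value = json.dumps(conn)
print(f"{env_name}='{env_value}'")
```

The sensor in step 2 would then reference it with aws_conn_id="aws_quadrant" and deferrable=True.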

This did not happen when the same sensor was switched back to non-deferrable / reschedule mode.

Minimal triggerer-side code-path reproduction

In a triggerer-like context where:

  • task_runner.SUPERVISOR_COMMS is a TriggerCommsDecoder
  • the supervisor side can successfully answer GetConnection(conn_id) using the in-process execution API client
  • the AWS hook resolves its connection via the sync path from inside sync_to_async

calling:

import asyncio
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
hook = S3Hook(aws_conn_id="aws_quadrant")
role, region = await asyncio.to_thread(lambda: (hook.conn_config.role_arn, hook.conn_config.region_name))

can reproduce the customer-facing warning:

Unable to find AWS Connection ID 'aws_quadrant', switching to empty.

and yields:

role None
region None

while a lower-level reproduction that calls TriggerCommsDecoder.send(GetConnection(...)) from the worker thread surfaces:

RuntimeError: ... got Future <Future pending> attached to a different loop

Anything else

A few notes that may help narrow this down:

  • The triggerer supervisor's in-process execution API client can fetch the connection successfully.
  • The Airflow connection record itself is valid and includes the expected extra payload.
  • This does not look like a serialization issue in ConnectionResponse / ConnectionResult.
  • This also does not look like simply a case of #57154 ("Fix connection access in triggerer for deferrable operators") being missing: the runtime image I checked already included the Task SDK fallback for the AsyncToSync event-loop error. The problem is that the actual runtime error here is a different one (Future attached to a different loop), so the current special-case logic does not catch it.
