Apache Airflow Provider(s)
core (triggerer)
Versions of Apache Airflow Providers
Observed in Apache Airflow 3.1.8 (Astronomer Runtime 3.1-14).
Apache Airflow version
3.1.8
Operating System
Debian GNU/Linux 12 (bookworm)
Deployment
Astronomer (managed)
Deployment details
Managed Astronomer deployment with triggerer enabled, using deferrable operators including CloudDataTransferServiceS3ToGCSOperator (Google Cloud Storage Transfer Service) and BigQuery deferrable operators. Single triggerer replica, DEFAULT_CAPACITY=1000, 0.5 vCPU / 1.92 GiB memory.
What happened
The triggerer's TriggerRunner subprocess crashes with RuntimeError: Response read out of order! Got frame.id=N, expect_id=N+1 raised from TriggerCommsDecoder._aget_response. When this exception propagates up through TriggerRunner.arun() → sync_state_to_supervisor(), it kills the entire TriggerRunner subprocess rather than just the individual failing trigger. After this fatal crash, the triggerer pod remains alive (its HTTP server continues serving requests) but the TriggerRunner subprocess does not restart, leaving all deferred tasks stuck indefinitely.
The error was observed repeatedly — at least a dozen times over an 11-hour window — across multiple TriggerRunner subprocess restarts. The final fatal crash propagated to arun() itself and the subprocess did not recover:
```
2026-04-02T05:43:00.310571Z [error] Trigger runner failed [airflow.jobs.triggerer_job_runner]
RuntimeError: Response read out of order! Got frame.id=18314, expect_id=18315
  File triggerer_job_runner.py:880, in arun
  File triggerer_job_runner.py:1090, in sync_state_to_supervisor
  File triggerer_job_runner.py:1101, in asend
  File triggerer_job_runner.py:801, in asend
  File triggerer_job_runner.py:791, in _aget_response
```
An earlier instance at 02:37:35 UTC shows the full call chain, originating from a BigQuery trigger calling a synchronous method from within an async context:
```
RuntimeError: Response read out of order! Got frame.id=25902, expect_id=25903
task: <Task finished name='kingpd-dimensions/manual__2026-04-01T00:00:00+00:00/d_kingpd_flavour_f_act_sum.insert-step-1/-1/1 (ID 158353)'
exception=RuntimeError('Response read out of order! Got frame.id=25902, expect_id=25903')>
```
Full traceback:
```
greenback/_impl.py:116           greenback_shim
greenback/_impl.py:201           _greenback_shim
greenback/_impl.py:81            trampoline
outcome/_impl.py:185             send
triggerer_job_runner.py:1152     run_trigger
providers/google/cloud/triggers/bigquery.py:199  run
providers/google/cloud/triggers/bigquery.py:157  safe_to_cancel
providers/google/cloud/triggers/bigquery.py:131  get_task_state
asgiref/sync.py:439              __call__          <- sync_to_async wrapping a sync fn
greenback/_impl.py:210           _greenback_shim
concurrent/futures/thread.py:59  run               <- runs in thread pool
asgiref/sync.py:491              thread_handler
sdk/execution_time/task_runner.py:514  get_task_states
triggerer_job_runner.py:772      send              <- TriggerCommsDecoder.send()
asgiref/sync.py:262              __call__          <- async_to_sync()
concurrent/futures/_base.py:449  result
concurrent/futures/_base.py:401  __get_result
asgiref/sync.py:300              main_wrap
triggerer_job_runner.py:801      asend
triggerer_job_runner.py:791      _aget_response    <- frame.id mismatch raised here
```
After the fatal 05:43 crash, the triggerer pod served only HTTP 404 Not Found responses for trigger log requests — confirming no triggers were being executed — for the remainder of the observation window (07:30–07:53+ UTC).
Root Cause Analysis
The bug is a thread-safety violation in TriggerCommsDecoder.asend().
```python
# triggerer_job_runner.py
async def asend(self, msg: ToTriggerSupervisor) -> ToTriggerRunner | None:
    frame = _RequestFrame(id=next(self.id_counter), body=msg.model_dump())
    bytes = frame.as_bytes()
    async with self._async_lock:  # <- asyncio.Lock: only safe within ONE event loop
        self._async_writer.write(bytes)
        return await self._aget_response(frame.id)
```
TriggerCommsDecoder.send() calls async_to_sync(self.asend)(msg). When async_to_sync is called from a thread (e.g., from asgiref.sync_to_async running a synchronous method of a trigger), it spins up a new asyncio event loop in that thread. The asyncio.Lock (self._async_lock) is bound to a single event loop and provides no mutual exclusion across threads — it only serializes coroutines within the same event loop.
When two concurrent callers invoke TriggerCommsDecoder.send():
- Caller A: sync_state_to_supervisor() from the main TriggerRunner.arun() event loop
- Caller B: a BigQuery trigger's get_task_state() via sync_to_async → thread → async_to_sync(asend) on a separate event loop
Both callers write their request frames and then await the response for their own frame ID. Because the writes and reads are not mutually exclusive across threads, Caller A reads the response intended for Caller B (frame IDs arrive out of order), raising RuntimeError: Response read out of order! Got frame.id=N, expect_id=N+1.
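The interleaving can be sketched in miniature with plain threads and queues. All names below are hypothetical stand-ins for the supervisor pipe, not the actual Airflow classes; the point is only that when neither caller holds a cross-thread lock across its write and its read, each one consumes the other's response frame:

```python
import itertools
import queue
import threading

id_counter = itertools.count(1)
requests: "queue.Queue[int]" = queue.Queue()
responses: "queue.Queue[int]" = queue.Queue()
errors: list[str] = []
a_wrote = threading.Event()
b_read = threading.Event()

def supervisor() -> None:
    # echoes each request frame id back as its response, in arrival order
    for _ in range(2):
        responses.put(requests.get())

def caller_a() -> None:
    frame_id = next(id_counter)   # id=1
    requests.put(frame_id)        # writes its request frame first...
    a_wrote.set()
    b_read.wait()                 # ...but is forced to read the stream last
    got = responses.get()         # consumes B's response (id=2)
    if got != frame_id:
        errors.append(f"Got frame.id={got}, expect_id={frame_id}")

def caller_b() -> None:
    a_wrote.wait()
    frame_id = next(id_counter)   # id=2
    requests.put(frame_id)
    got = responses.get()         # consumes A's response (id=1)
    if got != frame_id:
        errors.append(f"Got frame.id={got}, expect_id={frame_id}")
    b_read.set()

threads = [threading.Thread(target=f) for f in (supervisor, caller_a, caller_b)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(errors)  # ['Got frame.id=1, expect_id=2', 'Got frame.id=2, expect_id=1']
```

The events only make the unlucky scheduling deterministic; in the real triggerer the same interleaving happens by chance under load.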
The specific trigger type that initiates the cross-thread send is any trigger that:
- calls a synchronous method from its async def run() loop (via asgiref.sync_to_async or greenback)
- and that synchronous method calls TriggerCommsDecoder.send() (e.g., via task_runner.get_task_states)
In the observed incidents, BigQueryTableExistenceTrigger calling safe_to_cancel → get_task_state → task_runner.get_task_states is the initiating trigger. But the victim can be any trigger running concurrently in the same TriggerRunner subprocess.
What you think should happen instead
TriggerCommsDecoder.asend() should be safe to call from multiple threads simultaneously. The asyncio.Lock should be replaced with a threading.Lock (or a threading.RLock) that provides mutual exclusion across threads, not just within a single event loop. Alternatively, the communication channel could be restructured so that cross-thread sends use a different mechanism (e.g., asyncio.run_coroutine_threadsafe with the parent event loop rather than async_to_sync).
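The run_coroutine_threadsafe alternative can be sketched as follows. Everything here (asend_stub, sync_send) is a hypothetical stand-in for the real comms decoder; what it shows is that a worker thread hands its coroutine to the one parent loop instead of creating a second loop, so an asyncio.Lock awaited inside that coroutine would genuinely serialize all senders:

```python
import asyncio
import threading

async def asend_stub(msg: str) -> str:
    # hypothetical stand-in for asend(): always executes on the one parent
    # event loop, so an asyncio.Lock awaited here would serialize all senders
    await asyncio.sleep(0)
    return f"response:{msg}"

def sync_send(loop: asyncio.AbstractEventLoop, msg: str) -> str:
    # called from a worker thread: submit the coroutine to the parent loop
    # instead of spinning up a second event loop the way async_to_sync does
    future = asyncio.run_coroutine_threadsafe(asend_stub(msg), loop)
    return future.result(timeout=5)

async def main() -> str:
    loop = asyncio.get_running_loop()
    out: list[str] = []
    worker = threading.Thread(target=lambda: out.append(sync_send(loop, "ping")))
    worker.start()
    await asyncio.to_thread(worker.join)  # keep the loop free to run asend_stub
    return out[0]

print(asyncio.run(main()))  # response:ping
```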
A minimal fix would be:
```python
async def asend(self, msg: ToTriggerSupervisor) -> ToTriggerRunner | None:
    frame = _RequestFrame(id=next(self.id_counter), body=msg.model_dump())
    bytes = frame.as_bytes()
    with self._thread_lock:  # threading.Lock: cross-thread safe
        self._async_writer.write(bytes)
        return await self._aget_response(frame.id)
```
where self._thread_lock = threading.Lock() is added alongside the existing asyncio.Lock.
Note: self.id_counter (an itertools.count) is also shared across threads; calling next() on it should be verified as thread-safe (it is in CPython due to the GIL, but worth noting).
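For illustration, the same toy request/response exchange with a threading.Lock held across the write and the matching read (hypothetical stand-ins, not the Airflow classes) no longer interleaves responses, even with many concurrent senders:

```python
import itertools
import queue
import threading

id_counter = itertools.count(1)
requests: "queue.Queue[int]" = queue.Queue()
responses: "queue.Queue[int]" = queue.Queue()
comms_lock = threading.Lock()              # cross-thread, unlike asyncio.Lock
mismatches: list[int] = []

def supervisor() -> None:
    # echoes each request frame id back as its response, in arrival order
    for _ in range(100):
        responses.put(requests.get())

def sender() -> None:
    for _ in range(50):
        with comms_lock:                   # write + read as one atomic unit
            frame_id = next(id_counter)
            requests.put(frame_id)
            got = responses.get()
            if got != frame_id:
                mismatches.append(got)

threads = [threading.Thread(target=f) for f in (supervisor, sender, sender)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(mismatches)  # []  (every sender read its own response)
```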
How to reproduce
- Run a deployment with multiple deferrable operators active concurrently, including at least one trigger type that calls task_runner.get_task_states (or any synchronous SDK method) from inside async def run() via sync_to_async or greenback.
- Set AIRFLOW__TRIGGERER__DEFAULT_CAPACITY high (e.g., 1000) to maximize concurrency.
- Observe RuntimeError: Response read out of order! in triggerer logs when the race condition fires.
- Observe Trigger runner failed with the same error logged from arun when it propagates to the main loop.
- After the fatal crash, all deferred tasks remain stuck in DEFERRED state indefinitely; the triggerer pod is alive but serves only 404 Not Found for trigger log requests.
Relationship to existing issues
This is related to but distinct from #64213, which covers a different RuntimeError from the same TriggerCommsDecoder.send() path (Task got Future attached to a different loop). Both issues share the same root cause (thread-unsafe asyncio.Lock in asend), but produce different error messages depending on which async/thread boundary is crossed first.
Anything else
- The error repeats across multiple TriggerRunner subprocess restarts (observed ~12 times in 11 hours before the fatal crash).
- The fatal variant (Trigger runner failed propagating through arun) is more severe than non-fatal variants: the TriggerRunner subprocess does not restart, requiring a full triggerer pod restart to recover.
- AIRFLOW__TRIGGERER__DEFAULT_CAPACITY=1000 on a 0.5 vCPU pod was a contributing factor — higher concurrency increases the probability of the race condition.
- Confirmed on Airflow 3.1.8 / Python 3.12 / asgiref 3.x / greenback installed.