Skip to content

Commit d843441

Browse files
Dev-iLAbhijeet Raj Singh
authored andcommitted
K8s: use joinable manager queues (apache#63789)
The executor already treats both queues as joinable queues. It calls: - task_done() - join() - flush logic that assumes task accounting is tracked A plain manager Queue() does not match that contract. On Python 3.14 this showed up in teardown/error paths as: `ValueError: task_done() called too many times`
1 parent 4bec7fb commit d843441

1 file changed

Lines changed: 2 additions & 2 deletions

File tree

providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,8 @@ def __init__(self, *args, **kwargs):
103103
self.parallelism = self.kube_config.parallelism
104104

105105
self._manager = multiprocessing.Manager()
106-
self.task_queue: Queue[KubernetesJob] = self._manager.Queue()
107-
self.result_queue: Queue[KubernetesResults] = self._manager.Queue()
106+
self.task_queue: Queue[KubernetesJob] = self._manager.JoinableQueue()
107+
self.result_queue: Queue[KubernetesResults] = self._manager.JoinableQueue()
108108
self.kube_scheduler: AirflowKubernetesScheduler | None = None
109109
self.kube_client: client.CoreV1Api | None = None
110110
self.scheduler_job_id: str | None = None

0 commit comments

Comments
 (0)