Skip to content

Commit 78b9d53

Browse files
authored
Merge branch 'main' into feat/databricks-sql-query-tags
2 parents cd0ae67 + 120dbed commit 78b9d53

79 files changed

Lines changed: 2426 additions & 201 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/skills/airflow-translations/locales/de.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ that **must be used consistently**:
7474
| Dag Processor | Dag Prozessor | |
7575
| Heartbeat | Lebenszeichen | e.g., "Letztes Lebenszeichen" |
7676
| Upstream / Downstream | Vorgelagert / Nachgelagert | |
77+
| Deadline | Frist | |
7778

7879
## 3. Task/Run States
7980

airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,7 @@ def create_backfill_dry_run(
291291
to_date=to_date,
292292
reverse=body.run_backwards,
293293
reprocess_behavior=body.reprocess_behavior,
294+
dag_run_conf=body.dag_run_conf,
294295
session=session,
295296
)
296297
backfills = [

airflow-core/src/airflow/api_fastapi/execution_api/datamodels/variable.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,10 @@ class VariablePostBody(StrictBaseModel):
3434

3535
value: str | None = Field(alias="val")
3636
description: str | None = Field(default=None)
37+
38+
39+
class VariableKeysResponse(StrictBaseModel):
40+
"""Variable keys schema for list responses."""
41+
42+
keys: list[str]
43+
total_entries: int

airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@
2020
import logging
2121
from typing import Annotated
2222

23-
from fastapi import APIRouter, Depends, HTTPException, Path, Request, status
23+
from fastapi import APIRouter, Depends, HTTPException, Path, Query, Request, status
24+
from sqlalchemy import func, select
2425

26+
from airflow.api_fastapi.common.db.common import SessionDep
2527
from airflow.api_fastapi.execution_api.datamodels.variable import (
28+
VariableKeysResponse,
2629
VariablePostBody,
2730
VariableResponse,
2831
)
@@ -54,17 +57,55 @@ async def has_variable_access(
5457
return True
5558

5659

57-
router = APIRouter(
58-
responses={status.HTTP_404_NOT_FOUND: {"description": "Variable not found"}},
59-
dependencies=[Depends(has_variable_access)],
60-
)
60+
router = APIRouter()
6161

6262
log = logging.getLogger(__name__)
6363

6464

65+
# /keys must be declared before /{variable_key:path} so the static path is
66+
# matched first; otherwise the catch-all path param would swallow it.
67+
# has_variable_access is applied per-route below (not at router level) because
68+
# it requires a variable_key path parameter that /keys does not have.
69+
@router.get(
70+
"/keys",
71+
responses={
72+
status.HTTP_401_UNAUTHORIZED: {"description": "Unauthorized"},
73+
},
74+
)
75+
def get_variable_keys(
76+
session: SessionDep,
77+
team_name: Annotated[str | None, Depends(get_team_name_dep)] = None,
78+
prefix: Annotated[str | None, Query()] = None,
79+
limit: Annotated[int, Query(ge=1, le=10_000)] = 1000,
80+
offset: Annotated[int, Query(ge=0)] = 0,
81+
) -> VariableKeysResponse:
82+
"""
83+
Get Airflow Variable keys, optionally filtered by prefix.
84+
85+
.. note::
86+
This endpoint deliberately bypasses the per-variable ``has_variable_access``
87+
check, since access scoping requires a specific variable key. Any authenticated
88+
task within a team can therefore enumerate every variable key in that team —
89+
including keys for variables it would not be allowed to read. This is consistent
90+
with Airflow's security model (workers within a deployment trust each other),
91+
but the asymmetry between key enumeration and value access is intentional.
92+
"""
93+
stmt = select(Variable.key).order_by(Variable.key)
94+
if prefix is not None:
95+
stmt = stmt.where(Variable.key.startswith(prefix, autoescape=True))
96+
if team_name is not None:
97+
stmt = stmt.where(Variable.team_name == team_name)
98+
99+
total_entries = session.scalar(select(func.count()).select_from(stmt.subquery())) or 0
100+
keys = session.scalars(stmt.offset(offset).limit(limit)).all()
101+
return VariableKeysResponse(keys=list(keys), total_entries=total_entries)
102+
103+
65104
@router.get(
66105
"/{variable_key:path}",
106+
dependencies=[Depends(has_variable_access)],
67107
responses={
108+
status.HTTP_404_NOT_FOUND: {"description": "Variable not found"},
68109
status.HTTP_401_UNAUTHORIZED: {"description": "Unauthorized"},
69110
status.HTTP_403_FORBIDDEN: {"description": "Task does not have access to the variable"},
70111
},
@@ -90,8 +131,10 @@ def get_variable(
90131

91132
@router.put(
92133
"/{variable_key:path}",
134+
dependencies=[Depends(has_variable_access)],
93135
status_code=status.HTTP_201_CREATED,
94136
responses={
137+
status.HTTP_404_NOT_FOUND: {"description": "Variable not found"},
95138
status.HTTP_401_UNAUTHORIZED: {"description": "Unauthorized"},
96139
status.HTTP_403_FORBIDDEN: {"description": "Task does not have access to the variable"},
97140
},
@@ -108,8 +151,10 @@ def put_variable(
108151

109152
@router.delete(
110153
"/{variable_key:path}",
154+
dependencies=[Depends(has_variable_access)],
111155
status_code=status.HTTP_204_NO_CONTENT,
112156
responses={
157+
status.HTTP_404_NOT_FOUND: {"description": "Variable not found"},
113158
status.HTTP_401_UNAUTHORIZED: {"description": "Unauthorized"},
114159
status.HTTP_403_FORBIDDEN: {"description": "Task does not have access to the variable"},
115160
},

airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,11 @@
4646
AddStateEndpoints,
4747
AddTeamNameField,
4848
)
49+
from airflow.api_fastapi.execution_api.versions.v2026_06_30 import AddVariableKeysEndpoint
4950

5051
bundle = VersionBundle(
5152
HeadVersion(),
53+
Version("2026-06-30", AddVariableKeysEndpoint),
5254
Version(
5355
"2026-06-16",
5456
AddRetryPolicyFields,
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from __future__ import annotations
19+
20+
from cadwyn import VersionChange, endpoint
21+
22+
23+
class AddVariableKeysEndpoint(VersionChange):
24+
"""Add GET /variables/keys endpoint for listing variable keys with optional prefix filter."""
25+
26+
description = __doc__
27+
28+
instructions_to_migrate_to_previous_version = (endpoint("/variables/keys", ["GET"]).didnt_exist,)

airflow-core/src/airflow/dag_processing/processor.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
GetTaskStates,
5151
GetTICount,
5252
GetVariable,
53+
GetVariableKeys,
5354
GetXCom,
5455
GetXComCount,
5556
GetXComSequenceItem,
@@ -61,6 +62,7 @@
6162
PrevSuccessfulDagRunResult,
6263
PutVariable,
6364
TaskStatesResult,
65+
VariableKeysResult,
6466
VariableResult,
6567
XComCountResponse,
6668
XComResult,
@@ -128,6 +130,7 @@ class DagFileParsingResult(BaseModel):
128130
DagFileParsingResult
129131
| GetConnection
130132
| GetVariable
133+
| GetVariableKeys
131134
| PutVariable
132135
| GetTaskStates
133136
| GetTICount
@@ -147,6 +150,7 @@ class DagFileParsingResult(BaseModel):
147150
DagFileParseRequest
148151
| ConnectionResult
149152
| VariableResult
153+
| VariableKeysResult
150154
| TaskStatesResult
151155
| PreviousDagRunResult
152156
| PreviousTIResult
@@ -628,6 +632,10 @@ def _handle_request(self, msg: ToManager, log: FilteringBoundLogger, req_id: int
628632
dump_opts = {"exclude_unset": True}
629633
else:
630634
resp = var
635+
elif isinstance(msg, GetVariableKeys):
636+
from airflow.sdk.execution_time.request_handlers import handle_get_variable_keys
637+
638+
resp, dump_opts = handle_get_variable_keys(self.client, msg)
631639
elif isinstance(msg, PutVariable):
632640
self.client.variables.set(msg.key, msg.value, msg.description)
633641
elif isinstance(msg, DeleteVariable):

airflow-core/src/airflow/executors/base_executor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,15 +297,15 @@ def _get_workloads_to_schedule(self, open_slots: int) -> list[tuple[WorkloadKey,
297297

298298
return workloads_to_schedule
299299

300-
def _process_workloads(self, workloads: Sequence[ExecutorWorkload]) -> None:
300+
def _process_workloads(self, workload_items: Sequence[ExecutorWorkload]) -> None:
301301
"""
302302
Process the given workloads.
303303
304304
This method must be implemented by subclasses to define how they handle
305305
the execution of workloads (e.g., queuing them to workers, submitting to
306306
external systems, etc.).
307307
308-
:param workloads: List of workloads to process
308+
:param workload_items: List of workloads to process
309309
"""
310310
raise NotImplementedError(f"{type(self).__name__} must implement _process_workloads()")
311311

airflow-core/src/airflow/jobs/triggerer_job_runner.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
GetTaskStates,
7575
GetTICount,
7676
GetVariable,
77+
GetVariableKeys,
7778
GetXCom,
7879
MaskSecret,
7980
OKResponse,
@@ -82,6 +83,7 @@
8283
TaskStatesResult,
8384
TICount,
8485
UpdateHITLDetail,
86+
VariableKeysResult,
8587
VariableResult,
8688
XComResult,
8789
_new_encoder,
@@ -90,6 +92,7 @@
9092
from airflow.sdk.execution_time.request_handlers import (
9193
handle_get_connection,
9294
handle_get_variable,
95+
handle_get_variable_keys,
9396
handle_mask_secret,
9497
)
9598
from airflow.sdk.execution_time.supervisor import WatchedSubprocess, make_buffered_socket_reader
@@ -302,6 +305,7 @@ def from_api_response(cls, response: HITLDetailResponse) -> HITLDetailResponseRe
302305
| messages.TriggerStateSync
303306
| ConnectionResult
304307
| VariableResult
308+
| VariableKeysResult
305309
| XComResult
306310
| DagRunStateResult
307311
| DRCount
@@ -323,6 +327,7 @@ def from_api_response(cls, response: HITLDetailResponse) -> HITLDetailResponseRe
323327
| GetConnection
324328
| DeleteVariable
325329
| GetVariable
330+
| GetVariableKeys
326331
| PutVariable
327332
| DeleteXCom
328333
| GetXCom
@@ -534,6 +539,8 @@ def _handle_request(self, msg: ToTriggerSupervisor, log: FilteringBoundLogger, r
534539
resp = self.client.variables.delete(msg.key)
535540
elif isinstance(msg, GetVariable):
536541
resp, dump_opts = handle_get_variable(self.client, msg)
542+
elif isinstance(msg, GetVariableKeys):
543+
resp, dump_opts = handle_get_variable_keys(self.client, msg)
537544
elif isinstance(msg, PutVariable):
538545
self.client.variables.set(msg.key, msg.value, msg.description)
539546
elif isinstance(msg, DeleteXCom):

airflow-core/src/airflow/models/backfill.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,7 @@ def _do_dry_run(
295295
reverse: bool,
296296
reprocess_behavior: ReprocessBehavior,
297297
session: Session,
298+
dag_run_conf: dict | None = None,
298299
) -> Iterable[DagRunInfo]:
299300
from airflow.models.serialized_dag import SerializedDagModel
300301

@@ -310,7 +311,7 @@ def _do_dry_run(
310311
if dag.allowed_run_types is not None and DagRunType.BACKFILL_JOB not in dag.allowed_run_types:
311312
raise DagRunTypeNotAllowed(f"Dag with dag_id: '{dag_id}' does not allow backfill runs")
312313

313-
_validate_backfill_params(dag, reverse, from_date, to_date, reprocess_behavior)
314+
_validate_backfill_params(dag, reverse, from_date, to_date, reprocess_behavior, dag_run_conf)
314315

315316
dagrun_info_list = _get_info_list(
316317
dag=dag,

0 commit comments

Comments
 (0)