Skip to content

Commit 2748607

Browse files
ephraimbuddyfweilun
authored andcommitted
Refactor bundle view_url to not instaniate bundle on server components (apache#52876)
* Refactor bundle view_url to not instaniate bundle on server components This refactor introduces a view_url_template in bundle configuration which is saved in the db and rendered when needed. The url is signed with [api]secret_key for security. Also the view_url_template is also checked for safety before saving to the DB * fixup! Refactor bundle view_url to not instaniate bundle on server components * Rename bundle url to url_template and fix backcompat * Refactor render_url to not depend on dagbundlemodel version * Render view_url_template in view_url * fixup! Render view_url_template in view_url * Add deprecation warning and update s3 bundle with view_url_template * fixup! Add deprecation warning and update s3 bundle with view_url_template * Remove deprecation warning in provider's view_url * fixup! Remove deprecation warning in provider's view_url * fixup! fixup! Remove deprecation warning in provider's view_url * Add backcompat for bundles vs airflow releases * fixup! Add backcompat for bundles vs airflow releases * rename url_template to signed_url_template. Also return None when we can't unsign a url * refactor template signing * fixup! refactor template signing * Fix templating and conflict * Fix backcompat & refactor template signing * fixup! Fix backcompat & refactor template signing * fixup! fixup! Fix backcompat & refactor template signing * skip some test if not airflow 3.1+ * fixup! skip some test if not airflow 3.1+ * fixup! fixup! skip some test if not airflow 3.1+ * Resolve conflict * Add version to be removed for deprecated view_url * Remove template_fields and use regex to extract placeholders * Fix conflict * fixup! Remove template_fields and use regex to extract placeholders * Remove added deadline in dag details * Update docs * Remove ; from url safety check * Log url sanity check errors * compile re
1 parent 131eb8b commit 2748607

20 files changed

Lines changed: 2498 additions & 1438 deletions

File tree

airflow-core/docs/administration-and-deployment/dag-bundles.rst

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,30 @@ For example, adding multiple dag bundles to your ``airflow.cfg`` file:
8383
The whitespace, particularly on the last line, is important so a multi-line value works properly. More details can be found in the
8484
the `configparser docs <https://docs.python.org/3/library/configparser.html#supported-ini-file-structure>`_.
8585

86+
If you want a view url different from the default provided by the dag bundle, you can change the url in the kwargs of the dag bundle configuration.
87+
For example, if you want to use a custom URL for the git dag bundle:
88+
89+
.. code-block:: ini
90+
91+
[dag_processor]
92+
dag_bundle_config_list = [
93+
{
94+
"name": "my_git_repo",
95+
"classpath": "airflow.dag_processing.bundles.git.GitDagBundle",
96+
"kwargs": {
97+
"tracking_ref": "main",
98+
"git_conn_id": "my_git_conn",
99+
"view_url_template": "https://my.custom.git.repo/view/{subdir}",
100+
}
101+
}
102+
]
103+
104+
Above, the ``view_url_template`` is set to a custom URL that will be used to view the Dags in the ``my_git_repo`` bundle. The ``{subdir}`` placeholder will be replaced
105+
with the ``subdir`` attribute of the bundle. The placeholders are attributes of the bundle. You cannot use any placeholder outside of the bundle's attributes.
106+
When you specify a custom URL, it overrides the default URL provided by the dag bundle.
107+
108+
The url is verified for safety, and if it is not safe, the view url for the bundle will be set to ``None``. This is to prevent any potential security issues with unsafe URLs.
109+
86110
You can also override the :ref:`config:dag_processor__refresh_interval` per dag bundle by passing it in kwargs.
87111
This controls how often the dag processor refreshes, or looks for new files, in the dag bundles.
88112

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
d2e81695973bf8b6b30e1f4543627547330ef531e50be633cf589fbdf639b0e8
1+
efbae2f1de68413e5a6f671a306e748581fe454b81e25eeb2927567f11ebd59c

airflow-core/docs/img/airflow_erd.svg

Lines changed: 1387 additions & 1379 deletions
Loading

airflow-core/docs/migrations-ref.rst

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
3939
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
4040
| Revision ID | Revises ID | Airflow Version | Description |
4141
+=========================+==================+===================+==============================================================+
42-
| ``f56f68b9e02f`` (head) | ``09fa89ba1710`` | ``3.1.0`` | Add callback_state to deadline. |
42+
| ``3bda03debd04`` (head) | ``f56f68b9e02f`` | ``3.1.0`` | Add url template and template params to DagBundleModel. |
43+
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
44+
| ``f56f68b9e02f`` | ``09fa89ba1710`` | ``3.1.0`` | Add callback_state to deadline. |
4345
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
4446
| ``09fa89ba1710`` | ``40f7c30a228b`` | ``3.1.0`` | Add trigger_id to deadline. |
4547
+-------------------------+------------------+-------------------+--------------------------------------------------------------+

airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_versions.py

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from uuid import UUID
2121

2222
from pydantic import AliasPath, Field, computed_field
23+
from sqlalchemy import select
2324

2425
from airflow.api_fastapi.core_api.base import BaseModel
2526
from airflow.dag_processing.bundles.manager import DagBundlesManager
@@ -41,10 +42,23 @@ class DagVersionResponse(BaseModel):
4142
@property
4243
def bundle_url(self) -> str | None:
4344
if self.bundle_name:
44-
try:
45-
return DagBundlesManager().view_url(self.bundle_name, self.bundle_version)
46-
except ValueError:
47-
return None
45+
# Get the bundle model from the database and render the URL
46+
from airflow.models.dagbundle import DagBundleModel
47+
from airflow.utils.session import create_session
48+
49+
with create_session() as session:
50+
bundle_model = session.scalar(
51+
select(DagBundleModel).where(DagBundleModel.name == self.bundle_name)
52+
)
53+
54+
if bundle_model and hasattr(bundle_model, "signed_url_template"):
55+
return bundle_model.render_url(self.bundle_version)
56+
# fallback to the deprecated option if the bundle model does not have a signed_url_template
57+
# attribute
58+
try:
59+
return DagBundlesManager().view_url(self.bundle_name, self.bundle_version)
60+
except ValueError:
61+
return None
4862
return None
4963

5064

airflow-core/src/airflow/dag_processing/bundles/base.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import os
2323
import shutil
2424
import tempfile
25+
import warnings
2526
from abc import ABC, abstractmethod
2627
from contextlib import contextmanager
2728
from dataclasses import dataclass, field
@@ -35,7 +36,6 @@
3536
from sqlalchemy_utils.types.enriched_datetime.pendulum_datetime import pendulum
3637

3738
from airflow.configuration import conf
38-
from airflow.dag_processing.bundles.manager import DagBundlesManager
3939

4040
if TYPE_CHECKING:
4141
from pendulum import DateTime
@@ -217,7 +217,10 @@ def remove_stale_bundle_versions(self):
217217
This isn't really necessary on worker types that don't share storage
218218
with other processes.
219219
"""
220+
from airflow.dag_processing.bundles.manager import DagBundlesManager
221+
220222
log.info("checking for stale bundle versions locally")
223+
221224
bundles = list(DagBundlesManager().get_all_dag_bundles())
222225
for bundle in bundles:
223226
if not bundle.supports_versioning:
@@ -256,6 +259,7 @@ def __init__(
256259
name: str,
257260
refresh_interval: int = conf.getint("dag_processor", "refresh_interval"),
258261
version: str | None = None,
262+
view_url_template: str | None = None,
259263
) -> None:
260264
self.name = name
261265
self.version = version
@@ -268,6 +272,8 @@ def __init__(
268272
self.versions_dir = get_bundle_versions_base_folder(bundle_name=self.name)
269273
"""Where bundle versions are stored locally for this bundle."""
270274

275+
self._view_url_template = view_url_template
276+
271277
def initialize(self) -> None:
272278
"""
273279
Initialize the bundle.
@@ -316,10 +322,34 @@ def view_url(self, version: str | None = None) -> str | None:
316322
URL to view the bundle on an external website. This is shown to users in the Airflow UI, allowing them to navigate to this url for more details about that version of the bundle.
317323
318324
This needs to function without `initialize` being called.
319-
320325
:param version: Version to view
321326
:return: URL to view the bundle
322327
"""
328+
warnings.warn(
329+
"The 'view_url' method is deprecated and will be removed in a future version. "
330+
"Use 'view_url_template' instead.",
331+
DeprecationWarning,
332+
stacklevel=2,
333+
)
334+
return None
335+
336+
def view_url_template(self) -> str | None:
337+
"""
338+
URL template to view the bundle on an external website.
339+
340+
This is shown to users in the Airflow UI, allowing them to navigate to
341+
this url for more details about that version of the bundle.
342+
343+
The template should use format string placeholders like {version}, {subdir}, etc.
344+
Common placeholders:
345+
- {version}: The version identifier
346+
- {subdir}: The subdirectory within the bundle (if applicable)
347+
348+
This needs to function without `initialize` being called.
349+
350+
:return: URL template string or None if not applicable
351+
"""
352+
return self._view_url_template
323353

324354
@contextmanager
325355
def lock(self):

airflow-core/src/airflow/dag_processing/bundles/manager.py

Lines changed: 126 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616
# under the License.
1717
from __future__ import annotations
1818

19+
import warnings
1920
from typing import TYPE_CHECKING
2021

22+
from itsdangerous import URLSafeSerializer
2123
from sqlalchemy import delete
2224

2325
from airflow.configuration import conf
@@ -81,6 +83,61 @@ def _add_example_dag_bundle(config_list):
8183
)
8284

8385

86+
def _is_safe_bundle_url(url: str) -> bool:
87+
"""
88+
Check if a bundle URL is safe to use.
89+
90+
This function validates that the URL:
91+
- Uses HTTP or HTTPS schemes (no JavaScript, data, or other schemes)
92+
- Is properly formatted
93+
- Doesn't contain malicious content
94+
"""
95+
import logging
96+
from urllib.parse import urlparse
97+
98+
logger = logging.getLogger(__name__)
99+
100+
if not url:
101+
return False
102+
103+
try:
104+
parsed = urlparse(url)
105+
if parsed.scheme not in {"http", "https"}:
106+
logger.error(
107+
"Bundle URL uses unsafe scheme '%s'. Only 'http' and 'https' are allowed", parsed.scheme
108+
)
109+
return False
110+
111+
if not parsed.netloc:
112+
logger.error("Bundle URL '%s' has no network location", url)
113+
return False
114+
115+
if any(ord(c) < 32 for c in url):
116+
logger.error("Bundle URL '%s' contains control characters (ASCII < 32)", url)
117+
return False
118+
119+
return True
120+
except Exception as e:
121+
logger.error("Failed to parse bundle URL '%s': %s", url, str(e))
122+
return False
123+
124+
125+
def _sign_bundle_url(url: str, bundle_name: str) -> str:
126+
"""
127+
Sign a bundle URL for integrity verification.
128+
129+
:param url: The URL to sign
130+
:param bundle_name: The name of the bundle (used in the payload)
131+
:return: The signed URL token
132+
"""
133+
serializer = URLSafeSerializer(conf.get_mandatory_value("core", "fernet_key"))
134+
payload = {
135+
"url": url,
136+
"bundle_name": bundle_name,
137+
}
138+
return serializer.dumps(payload)
139+
140+
84141
class DagBundlesManager(LoggingMixin):
85142
"""Manager for DAG bundles."""
86143

@@ -124,12 +181,44 @@ def parse_config(self) -> None:
124181
@provide_session
125182
def sync_bundles_to_db(self, *, session: Session = NEW_SESSION) -> None:
126183
self.log.debug("Syncing DAG bundles to the database")
184+
185+
def _extract_and_sign_template(bundle_name: str) -> tuple[str | None, dict]:
186+
bundle_instance = self.get_bundle(name)
187+
new_template_ = bundle_instance.view_url_template()
188+
new_params_ = self._extract_template_params(bundle_instance)
189+
if new_template_:
190+
if not _is_safe_bundle_url(new_template_):
191+
self.log.warning(
192+
"Bundle %s has unsafe URL template '%s', skipping URL update",
193+
bundle_name,
194+
new_template_,
195+
)
196+
new_template_ = None
197+
else:
198+
# Sign the URL for integrity verification
199+
new_template_ = _sign_bundle_url(new_template_, bundle_name)
200+
self.log.debug("Signed URL template for bundle %s", bundle_name)
201+
return new_template_, new_params_
202+
127203
stored = {b.name: b for b in session.query(DagBundleModel).all()}
204+
128205
for name in self._bundle_config.keys():
129206
if bundle := stored.pop(name, None):
130207
bundle.active = True
208+
new_template, new_params = _extract_and_sign_template(name)
209+
if new_template != bundle.signed_url_template:
210+
bundle.signed_url_template = new_template
211+
self.log.debug("Updated URL template for bundle %s", name)
212+
if new_params != bundle.template_params:
213+
bundle.template_params = new_params
214+
self.log.debug("Updated template parameters for bundle %s", name)
131215
else:
132-
session.add(DagBundleModel(name=name))
216+
new_template, new_params = _extract_and_sign_template(name)
217+
new_bundle = DagBundleModel(name=name)
218+
new_bundle.signed_url_template = new_template
219+
new_bundle.template_params = new_params
220+
221+
session.add(new_bundle)
133222
self.log.info("Added new DAG bundle %s to the database", name)
134223

135224
for name, bundle in stored.items():
@@ -140,6 +229,35 @@ def sync_bundles_to_db(self, *, session: Session = NEW_SESSION) -> None:
140229
session.execute(delete(ParseImportError).where(ParseImportError.bundle_name == name))
141230
self.log.info("Deleted import errors for bundle %s which is no longer configured", name)
142231

232+
@staticmethod
233+
def _extract_template_params(bundle_instance: BaseDagBundle) -> dict:
234+
"""
235+
Extract template parameters from a bundle instance's view_url_template method.
236+
237+
:param bundle_instance: The bundle instance to extract parameters from
238+
:return: Dictionary of template parameters
239+
"""
240+
import re
241+
242+
params: dict[str, str] = {}
243+
template = bundle_instance.view_url_template()
244+
245+
if not template:
246+
return params
247+
248+
# Extract template placeholders using regex
249+
# This matches {placeholder} patterns in the template
250+
PLACEHOLDER_PATTERN = re.compile(r"\{([^}]+)\}")
251+
placeholders = PLACEHOLDER_PATTERN.findall(template)
252+
253+
# Extract values for each placeholder found in the template
254+
for placeholder in placeholders:
255+
field_value = getattr(bundle_instance, placeholder, None)
256+
if field_value:
257+
params[placeholder] = field_value
258+
259+
return params
260+
143261
def get_bundle(self, name: str, version: str | None = None) -> BaseDagBundle:
144262
"""
145263
Get a DAG bundle by name.
@@ -165,5 +283,12 @@ def get_all_dag_bundles(self) -> Iterable[BaseDagBundle]:
165283
yield class_(name=name, version=None, **kwargs)
166284

167285
def view_url(self, name: str, version: str | None = None) -> str | None:
286+
warnings.warn(
287+
"The 'view_url' method is deprecated and will be removed when providers "
288+
"have Airflow 3.1 as the minimum supported version. "
289+
"Use DagBundleModel.render_url() instead.",
290+
DeprecationWarning,
291+
stacklevel=2,
292+
)
168293
bundle = self.get_bundle(name, version)
169294
return bundle.view_url(version=version)
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
19+
"""
20+
Add url template and template params to DagBundleModel.
21+
22+
Revision ID: 3bda03debd04
23+
Revises: f56f68b9e02f
24+
Create Date: 2025-07-04 10:12:12.711292
25+
26+
"""
27+
28+
from __future__ import annotations
29+
30+
import sqlalchemy as sa
31+
from alembic import op
32+
from sqlalchemy_utils import JSONType
33+
34+
# revision identifiers, used by Alembic.
35+
revision = "3bda03debd04"
36+
down_revision = "f56f68b9e02f"
37+
branch_labels = None
38+
depends_on = None
39+
airflow_version = "3.1.0"
40+
41+
42+
def upgrade():
43+
"""Apply Add url and template params to DagBundleModel."""
44+
with op.batch_alter_table("dag_bundle", schema=None) as batch_op:
45+
batch_op.add_column(sa.Column("signed_url_template", sa.String(length=200), nullable=True))
46+
batch_op.add_column(sa.Column("template_params", JSONType(), nullable=True))
47+
48+
49+
def downgrade():
50+
"""Unapply Add url and template params to DagBundleModel."""
51+
with op.batch_alter_table("dag_bundle", schema=None) as batch_op:
52+
batch_op.drop_column("template_params")
53+
batch_op.drop_column("signed_url_template")

0 commit comments

Comments
 (0)