-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy path1.10.4.patch
More file actions
103 lines (95 loc) · 4.37 KB
/
1.10.4.patch
File metadata and controls
103 lines (95 loc) · 4.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
From bc90066396ae576fa339017f9dc1bd5a20eed268 Mon Sep 17 00:00:00 2001
From: Kostya Esmukov <kostya@esmukov.ru>
Date: Sun, 14 Jul 2019 13:15:54 +0300
Subject: [PATCH] Add support for declarative dags provided by
airflow-declarative project
---
airflow/models/dagbag.py | 15 +++++++++++++--
airflow/utils/dag_processing.py | 4 ++--
airflow/www/views.py | 8 ++++++--
setup.py | 1 +
4 files changed, 22 insertions(+), 6 deletions(-)
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index a789a80a..7a3ef67e 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -172,8 +172,9 @@ class DagBag(BaseDagBag, LoggingMixin):
mods = []
is_zipfile = zipfile.is_zipfile(filepath)
+ _, file_ext = os.path.splitext(os.path.split(filepath)[-1])
if not is_zipfile:
- if safe_mode:
+ if safe_mode and file_ext not in ('.yaml', '.yml'):
with open(filepath, 'rb') as f:
content = f.read()
if not all([s in content for s in (b'DAG', b'airflow')]):
@@ -197,7 +198,17 @@ class DagBag(BaseDagBag, LoggingMixin):
with timeout(configuration.conf.getint('core', "DAGBAG_IMPORT_TIMEOUT")):
try:
- m = imp.load_source(mod_name, filepath)
+ if file_ext in ('.yaml', '.yml'):
+ # Avoid the possibility of circular import error
+ # by importing Declarative here, in a function:
+ import airflow_declarative
+
+ declarative_dags_list = airflow_declarative.from_path(filepath)
+ m = imp.new_module(mod_name)
+ for i, dag in enumerate(declarative_dags_list):
+ setattr(m, "dag%s" % i, dag)
+ else:
+ m = imp.load_source(mod_name, filepath)
mods.append(m)
except Exception as e:
self.log.exception("Failed to import: %s", filepath)
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 33a54678..bf1ce166 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -347,7 +347,7 @@ def list_py_file_paths(directory, safe_mode=True,
continue
mod_name, file_ext = os.path.splitext(
os.path.split(file_path)[-1])
- if file_ext != '.py' and not zipfile.is_zipfile(file_path):
+ if file_ext not in ('.py', '.yaml', '.yml') and not zipfile.is_zipfile(file_path):
continue
if any([re.findall(p, file_path) for p in patterns]):
continue
@@ -361,7 +361,7 @@ def list_py_file_paths(directory, safe_mode=True,
might_contain_dag = all(
[s in content for s in (b'DAG', b'airflow')])
- if not might_contain_dag:
+ if not might_contain_dag and file_ext == '.py':
continue
file_paths.append(file_path)
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 32eccc02..6b627e9f 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -672,8 +672,12 @@ class Airflow(AirflowViewMixin, BaseView):
try:
with wwwutils.open_maybe_zipped(dag.fileloc, 'r') as f:
code = f.read()
- html_code = highlight(
- code, lexers.PythonLexer(), HtmlFormatter(linenos=True))
+ if dag.fileloc.endswith(('.yml', '.yaml')):
+ html_code = highlight(
+ code, lexers.YamlLexer(), HtmlFormatter(linenos=True))
+ else:
+ html_code = highlight(
+ code, lexers.PythonLexer(), HtmlFormatter(linenos=True))
except IOError as e:
html_code = str(e)
diff --git a/setup.py b/setup.py
index 57173230..1d78acff 100644
--- a/setup.py
+++ b/setup.py
@@ -304,6 +304,7 @@ def do_setup():
zip_safe=False,
scripts=['airflow/bin/airflow'],
install_requires=[
+ 'airflow-declarative',
'alembic>=1.0, <2.0',
'cached_property~=1.5',
'configparser>=3.5.0, <3.6.0',
--
2.22.0