Add session-level query tags to Databricks SQL operators (fixes #66839)#66895
Open
SkastVnT wants to merge 1 commit into
Open
Add session-level query tags to Databricks SQL operators (fixes #66839)#66895SkastVnT wants to merge 1 commit into
SkastVnT wants to merge 1 commit into
Conversation
|
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide
|
ae3f2a6 to
333417a
Compare
Author
3b0cc58 to
cd0ae67
Compare
Author
78b9d53 to
a6557d8
Compare
2 tasks
Contributor
|
cc @alexott @mwojtyczka @moomindani can you help with review? |
a6557d8 to
d8a3970
Compare
Author
|
Fixed the CI failure. The issue was caused by the new I updated the sensors to pass |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.









Summary
Closes #66839.
When Databricks SQL operators execute queries, those queries are invisible in
system.query.historybecause no session-level tags are attached. This PR injects Airflow context metadata into the DatabricksQUERY_TAGSsession parameter so every query can be traced back to the DAG/task that triggered it.Approach
Tags are serialised into the
key:value,key:valueformat required by Databricks and injected viasession_configuration={"QUERY_TAGS": "..."}insideDatabricksSqlHook.get_conn(). This is the correct mechanism — the connector-levelquery_tags=parameter does not propagate tosystem.query.history.Key design decisions
query_tags={"key": "value"}— readable, type-safe, mergeable_format_query_tags) owns the escaping/truncation rules\,,,:are escaped; values truncated at 128 charsQUERY_TAGSsession_configurationinclude_airflow_query_tags=FalseChanges
providers/databricks/src/airflow/providers/databricks/hooks/databricks_sql.py_format_query_tag_value(value)— escape special chars, truncate at 128 chars._format_query_tags(tags)— convertdict[str, str | None]→"key:value,...".get_conn()now builds asession_configdict, merges any incomingQUERY_TAGSstring with the formatted operator tags, and passes it assession_configurationtosql.connect().providers/databricks/src/airflow/providers/databricks/operators/databricks_sql.py_get_airflow_query_tags(context)that returns a dict of Airflow context keys (airflow_dag_id,airflow_task_id,airflow_run_id,airflow_try_number,airflow_map_index).DatabricksSqlOperatorandDatabricksCopyIntoOperatoracceptquery_tags: dict[str, str | None]andinclude_airflow_query_tags: bool = True.execute(), each operator merges its ownquery_tagswith Airflow context tags (if enabled) and assigns the result tohook.query_tags.Tests
test_databricks_sql.py(hooks): replaced oldquery_tags=connector-kwarg tests with newsession_configuration["QUERY_TAGS"]assertions; addedTestFormatQueryTagsclass covering value escaping, truncation, None-omission, and round-trip correctness.test_databricks_sql.py(operators): existing tests unchanged — they already test the dict-level API.Verification
Tested against a real Databricks workspace: all 4 tasks in the test DAG completed successfully, and
QUERY_TAGSappeared insystem.query.historyas aMAP<STRING,STRING>with the expected Airflow keys.Checklist
ruff formatandruff check --fix— all checks pass.