-
Notifications
You must be signed in to change notification settings - Fork 202
Implementing metadata freshness checks #481
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
9712ab2
5f9d507
92373ec
823397f
8204482
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,7 +3,7 @@ | |
| from dbt.contracts.relation import ( | ||
| ComponentName, | ||
| ) | ||
| from dbt.adapters.base.relation import BaseRelation, Policy | ||
| from dbt.adapters.base.relation import BaseRelation, Policy, InformationSchema | ||
| from dbt.adapters.spark.impl import KEY_TABLE_OWNER, KEY_TABLE_STATISTICS | ||
| from dbt.dataclass_schema import StrEnum | ||
|
|
||
|
|
@@ -37,6 +37,16 @@ class DatabricksRelationType(StrEnum): | |
| StreamingTable = "streamingtable" | ||
|
|
||
|
|
||
| @dataclass(frozen=True, eq=False, repr=False) | ||
| class DatabricksInformationSchema(InformationSchema): | ||
| quote_policy: Policy = field(default_factory=lambda: DatabricksQuotePolicy()) | ||
| include_policy: Policy = field(default_factory=lambda: DatabricksIncludePolicy()) | ||
| quote_character: str = "`" | ||
|
|
||
| def is_hive_metastore(self) -> bool: | ||
| return self.database is None or self.database == "hive_metastore" | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just confirming: does dbt's
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. database == catalog. |
||
|
|
||
|
|
||
| @dataclass(frozen=True, eq=False, repr=False) | ||
| class DatabricksRelation(BaseRelation): | ||
| type: Optional[DatabricksRelationType] = None # type: ignore | ||
|
|
@@ -115,3 +125,13 @@ def matches( | |
| @classproperty | ||
| def get_relation_type(cls) -> Type[DatabricksRelationType]: | ||
| return DatabricksRelationType | ||
|
|
||
| def information_schema(self, view_name: Optional[str] = None) -> InformationSchema: | ||
| # some of our data comes from jinja, where things can be `Undefined`. | ||
| if not isinstance(view_name, str): | ||
| view_name = None | ||
|
|
||
| # Kick the user-supplied schema out of the information schema relation | ||
| # Instead address this as <database>.information_schema by default | ||
| info_schema = DatabricksInformationSchema.from_relation(self, view_name) | ||
| return info_schema.incorporate(path={"schema": None}) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,32 @@ | ||
| {% macro databricks__get_relation_last_modified(information_schema, relations) -%} | ||
|
|
||
| {%- call statement('last_modified', fetch_result=True) -%} | ||
| {% if information_schema.is_hive_metastore %} | ||
| {%- for relation in relations -%} | ||
| select '{{ relation.schema }}' as schema, | ||
| '{{ relation.identifier }}' as identifier, | ||
| max(timestamp) as last_modified, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just confirming: the output of a Databricks SQL query that includes a
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. with the absence of group by, it will be a single row. |
||
| {{ current_timestamp() }} as snapshotted_at | ||
| from (describe history {{ relation.schema }}.{{ relation.identifier }}) | ||
| {% if not loop.last %} | ||
| union all | ||
| {% endif %} | ||
| {%- endfor -%} | ||
| {% else %} | ||
| select table_schema as schema, | ||
| table_name as identifier, | ||
| last_altered as last_modified, | ||
| {{ current_timestamp() }} as snapshotted_at | ||
| from {{ information_schema }}.tables | ||
| where ( | ||
| {%- for relation in relations -%} | ||
| (table_schema = '{{ relation.schema }}' and | ||
| table_name = '{{ relation.identifier }}'){%- if not loop.last %} or {% endif -%} | ||
| {%- endfor -%} | ||
| ) | ||
| {% endif %} | ||
| {%- endcall -%} | ||
|
|
||
| {{ return(load_result('last_modified')) }} | ||
|
|
||
| {% endmacro %} | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,53 @@ | ||
| import os | ||
| import pytest | ||
|
|
||
| from dbt.tests.util import get_artifact, run_dbt | ||
|
|
||
| freshness_via_metadata_schema_yml = """ | ||
| version: 2 | ||
| sources: | ||
| - name: test_source | ||
| freshness: | ||
| warn_after: {count: 10, period: hour} | ||
| error_after: {count: 1, period: day} | ||
| schema: "{{ env_var('DBT_GET_RELATION_TEST_SCHEMA') }}" | ||
| tables: | ||
| - name: test_table | ||
| """ | ||
|
|
||
|
|
||
| class TestGetRelationLastModified: | ||
| @pytest.fixture(scope="class", autouse=True) | ||
| def set_env_vars(self, project): | ||
| os.environ["DBT_GET_RELATION_TEST_SCHEMA"] = project.test_schema | ||
| yield | ||
| del os.environ["DBT_GET_RELATION_TEST_SCHEMA"] | ||
|
|
||
| @pytest.fixture(scope="class") | ||
| def models(self): | ||
| return {"schema.yml": freshness_via_metadata_schema_yml} | ||
|
|
||
| @pytest.fixture(scope="class") | ||
| def custom_schema(self, project, set_env_vars): | ||
| with project.adapter.connection_named("__test"): | ||
| relation = project.adapter.Relation.create( | ||
| database=project.database, schema=os.environ["DBT_GET_RELATION_TEST_SCHEMA"] | ||
| ) | ||
| project.adapter.drop_schema(relation) | ||
| project.adapter.create_schema(relation) | ||
|
|
||
| yield relation.schema | ||
|
|
||
| with project.adapter.connection_named("__test"): | ||
| project.adapter.drop_schema(relation) | ||
|
|
||
| def test_get_relation_last_modified(self, project, custom_schema): | ||
| project.run_sql( | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test is currently hand copied from dbt-snowflake. Nothing I do in the test is specific to databricks, so hopefully it gets pulled into core.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Essentially the test just says, were we able to run the metadata commands and not warn/error. We definitely need another test that asserts that something useful happens. Will add after I'm back.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated the test to ensure the freshness tests passed, as opposed to not having a warning or error. @dataders might be worth considering this form for core; to me at least its more obvious what the test is doing (as opposed to the probe function). Also, this file did not match the format I found in the documentation for sources.json, btw. |
||
| f"create table {custom_schema}.test_table (id integer, name varchar(100) not null);" | ||
| ) | ||
|
|
||
| run_dbt(["source", "freshness"]) | ||
|
|
||
| sources = get_artifact("target/sources.json") | ||
|
|
||
| assert sources["results"][0]["status"] == "pass" | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the first time our dialect has needed to use
_capabilities? Or had we just inherited the_capabilitiesfrom dbt-spark? Not a blocker for this PR but I wonder if more capabilities should be reflected explicitly here.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is new in 1.7