Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## dbt-databricks 1.7.x (TBD)

### Features

- Added support for getting freshness from metadata ([481](https://github.com/databricks/dbt-databricks/pull/481))

## dbt-databricks 1.7.0rc1 (October 13, 2023)

### Fixes
Expand Down
5 changes: 5 additions & 0 deletions dbt/adapters/databricks/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from dbt.adapters.base.impl import catch_as_completed
from dbt.adapters.base.meta import available
from dbt.adapters.base.relation import BaseRelation, InformationSchema
from dbt.adapters.capability import CapabilityDict, CapabilitySupport, Support, Capability
from dbt.adapters.spark.impl import (
SparkAdapter,
GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME,
Expand Down Expand Up @@ -107,6 +108,10 @@ class DatabricksAdapter(SparkAdapter):

AdapterSpecificConfigs = DatabricksConfig

_capabilities = CapabilityDict(
{Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full)}
)

@available.parse(lambda *a, **k: 0)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the first time our dialect has needed to use _capabilities? Or had we just inherited the _capabilities from dbt-spark? Not a blocker for this PR but I wonder if more capabilities should be reflected explicitly here.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is new in 1.7

def compare_dbr_version(self, major: int, minor: int) -> int:
"""
Expand Down
22 changes: 21 additions & 1 deletion dbt/adapters/databricks/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from dbt.contracts.relation import (
ComponentName,
)
from dbt.adapters.base.relation import BaseRelation, Policy
from dbt.adapters.base.relation import BaseRelation, Policy, InformationSchema
from dbt.adapters.spark.impl import KEY_TABLE_OWNER, KEY_TABLE_STATISTICS
from dbt.dataclass_schema import StrEnum

Expand Down Expand Up @@ -37,6 +37,16 @@ class DatabricksRelationType(StrEnum):
StreamingTable = "streamingtable"


@dataclass(frozen=True, eq=False, repr=False)
class DatabricksInformationSchema(InformationSchema):
quote_policy: Policy = field(default_factory=lambda: DatabricksQuotePolicy())
include_policy: Policy = field(default_factory=lambda: DatabricksIncludePolicy())
quote_character: str = "`"

def is_hive_metastore(self) -> bool:
return self.database is None or self.database == "hive_metastore"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just confirming: does dbt's database == Databricks' schema?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

database == catalog.



@dataclass(frozen=True, eq=False, repr=False)
class DatabricksRelation(BaseRelation):
type: Optional[DatabricksRelationType] = None # type: ignore
Expand Down Expand Up @@ -115,3 +125,13 @@ def matches(
@classproperty
def get_relation_type(cls) -> Type[DatabricksRelationType]:
return DatabricksRelationType

def information_schema(self, view_name: Optional[str] = None) -> InformationSchema:
# some of our data comes from jinja, where things can be `Undefined`.
if not isinstance(view_name, str):
view_name = None

# Kick the user-supplied schema out of the information schema relation
# Instead address this as <database>.information_schema by default
info_schema = DatabricksInformationSchema.from_relation(self, view_name)
return info_schema.incorporate(path={"schema": None})
32 changes: 32 additions & 0 deletions dbt/include/databricks/macros/metadata.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{% macro databricks__get_relation_last_modified(information_schema, relations) -%}

{%- call statement('last_modified', fetch_result=True) -%}
{% if information_schema.is_hive_metastore %}
{%- for relation in relations -%}
select '{{ relation.schema }}' as schema,
'{{ relation.identifier }}' as identifier,
max(timestamp) as last_modified,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just confirming: the output of a Databricks SQL query that includes a max() aggregation is never more than a single row, yes? That's why there is no GROUP BY present here?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with the absence of group by, it will be a single row.

{{ current_timestamp() }} as snapshotted_at
from (describe history {{ relation.schema }}.{{ relation.identifier }})
{% if not loop.last %}
union all
{% endif %}
{%- endfor -%}
{% else %}
select table_schema as schema,
table_name as identifier,
last_altered as last_modified,
{{ current_timestamp() }} as snapshotted_at
from {{ information_schema }}.tables
where (
{%- for relation in relations -%}
(table_schema = '{{ relation.schema }}' and
table_name = '{{ relation.identifier }}'){%- if not loop.last %} or {% endif -%}
{%- endfor -%}
)
{% endif %}
{%- endcall -%}

{{ return(load_result('last_modified')) }}

{% endmacro %}
53 changes: 53 additions & 0 deletions tests/functional/adapter/test_source_freshness.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import os
import pytest

from dbt.tests.util import get_artifact, run_dbt

freshness_via_metadata_schema_yml = """
version: 2
sources:
- name: test_source
freshness:
warn_after: {count: 10, period: hour}
error_after: {count: 1, period: day}
schema: "{{ env_var('DBT_GET_RELATION_TEST_SCHEMA') }}"
tables:
- name: test_table
"""


class TestGetRelationLastModified:
@pytest.fixture(scope="class", autouse=True)
def set_env_vars(self, project):
os.environ["DBT_GET_RELATION_TEST_SCHEMA"] = project.test_schema
yield
del os.environ["DBT_GET_RELATION_TEST_SCHEMA"]

@pytest.fixture(scope="class")
def models(self):
return {"schema.yml": freshness_via_metadata_schema_yml}

@pytest.fixture(scope="class")
def custom_schema(self, project, set_env_vars):
with project.adapter.connection_named("__test"):
relation = project.adapter.Relation.create(
database=project.database, schema=os.environ["DBT_GET_RELATION_TEST_SCHEMA"]
)
project.adapter.drop_schema(relation)
project.adapter.create_schema(relation)

yield relation.schema

with project.adapter.connection_named("__test"):
project.adapter.drop_schema(relation)

def test_get_relation_last_modified(self, project, custom_schema):
project.run_sql(
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is currently hand copied from dbt-snowflake. Nothing I do in the test is specific to databricks, so hopefully it gets pulled into core.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Essentially the test just says, were we able to run the metadata commands and not warn/error. We definitely need another test that asserts that something useful happens. Will add after I'm back.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the test to ensure the freshness tests passed, as opposed to not having a warning or error. @dataders might be worth considering this form for core; to me at least its more obvious what the test is doing (as opposed to the probe function). Also, this file did not match the format I found in the documentation for sources.json, btw.

f"create table {custom_schema}.test_table (id integer, name varchar(100) not null);"
)

run_dbt(["source", "freshness"])

sources = get_artifact("target/sources.json")

assert sources["results"][0]["status"] == "pass"