Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions engine/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ RUN --mount=type=cache,id=pip-cache,target=/root/.cache/pip \
pip install "PyAthena>=2.4.1" && \
pip install "pandas>=1.0.0"

# Install Google's Big Query SQLAlchemy dialect lib
RUN --mount=type=cache,id=pip-cache,target=/root/.cache/pip \
pip install "sqlalchemy-bigquery"

ENV PATH "${PATH}:/splitgraph/bin"
ENV PYTHONPATH "${PYTHONPATH}:/splitgraph:/pg_es_fdw"

Expand Down
4 changes: 4 additions & 0 deletions engine/Dockerfile.debug
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ RUN --mount=type=cache,id=pip-cache,target=/root/.cache/pip \
pip install "PyAthena>=2.4.1" && \
pip install "pandas>=1.0.0"

# Install Google's Big Query SQLAlchemy dialect lib
RUN --mount=type=cache,id=pip-cache,target=/root/.cache/pip \
pip install "sqlalchemy-bigquery"

ENV PATH "${PATH}:/splitgraph/bin"
ENV PYTHONPATH "${PYTHONPATH}:/splitgraph:/pg_es_fdw"

Expand Down
1 change: 1 addition & 0 deletions splitgraph/config/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
"snowflake": "splitgraph.ingestion.snowflake.SnowflakeDataSource",
"dbt": "splitgraph.ingestion.dbt.data_source.DBTDataSource",
"athena": "splitgraph.ingestion.athena.AmazonAthenaDataSource",
"bigquery": "splitgraph.ingestion.bigquery.BigQueryDataSource",
},
}

Expand Down
6 changes: 6 additions & 0 deletions splitgraph/ingestion/bigquery/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
python_sources(
skip_black=True,
dependencies=[
"src/py/splitgraph/splitgraph/resources/icons",
],
)
140 changes: 140 additions & 0 deletions splitgraph/ingestion/bigquery/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import base64
from copy import deepcopy
from typing import TYPE_CHECKING, Any, Dict, Optional

from splitgraph.core.types import Credentials, Params, TableInfo
from splitgraph.hooks.data_source.fdw import ForeignDataWrapperDataSource
from splitgraph.ingestion.common import build_commandline_help

if TYPE_CHECKING:
from splitgraph.engine.postgres.engine import PostgresEngine


class BigQueryDataSource(ForeignDataWrapperDataSource):
credentials_schema: Dict[str, Any] = {
"type": "object",
"properties": {
"credentials": {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's (currently) no point in letting users of the JSONSchema (which is used in form generation) to pass credentials via a path. I think this could be simplified to treat the commandline-passed credential string as a path and the one passed via __init__ as a JSON-serialized credential.

JSONSchema:

"credentials": {
    "type": "string",
    "title": "GCP credentials",
    "description": "GCP credentials in JSON format",
}

commandline:

$ sgr mount bigquery bq -o@- <<EOF
{
    "credentials": "/path/to/my/creds.json",
    "project": "my-project-name",
    "dataset_name": "my_dataset"
}
EOF

...

credentials = Credentials({})

with open(params.pop("credentials"), "r") as credentials_file:
    credentials_str = credentials_file.read()

params.pop("credentials")
credentials["credentials"] = credentials_str

"type": "string",
"title": "GCP credentials",
"description": "GCP credentials in JSON format",
},
},
}

params_schema = {
"type": "object",
"properties": {
"project": {
"type": "string",
"title": "GCP project name",
"description": "Name of the GCP project to use",
},
"dataset_name": {
"type": "string",
"title": "Big Query dataset",
"description": "Name of the dataset in Big Query",
},
},
"required": ["project", "dataset_name"],
}

supports_mount = True
supports_load = True
supports_sync = False

commandline_help = """Mount a GCP Big Query project/dataset.

This will mount a Big Query dataset:

\b
```
$ sgr mount bigquery bq -o@- <<EOF
{
"credentials": "/path/to/my/creds.json",
"project": "my-project-name",
"dataset_name": "my_dataset"
}
EOF
```
"""

commandline_kwargs_help: str = (
build_commandline_help(credentials_schema) + "\n" + build_commandline_help(params_schema)
)

_icon_file = "bigquery.svg"

def __init__(
self,
engine: "PostgresEngine",
credentials: Credentials,
params: Params,
tables: Optional[TableInfo] = None,
):
super().__init__(engine, credentials, params, tables)

def get_fdw_name(self):
return "multicorn"

@classmethod
def get_name(cls) -> str:
return "Google BigQuery"

@classmethod
def get_description(cls) -> str:
return "Query data in GCP BigQuery datasets"

@classmethod
def from_commandline(cls, engine, commandline_kwargs) -> "BigQueryDataSource":
params = deepcopy(commandline_kwargs)
credentials = Credentials({})

if "credentials" in params:
with open(params["credentials"], "r") as credentials_file:
credentials_str = credentials_file.read()

params.pop("credentials")
credentials["credentials"] = credentials_str

return cls(engine, credentials, params)

def get_table_options(
self, table_name: str, tables: Optional[TableInfo] = None
) -> Dict[str, str]:
result = super().get_table_options(table_name, tables)
result["tablename"] = result.get("tablename", table_name)
return result

def get_server_options(self):
options: Dict[str, Optional[str]] = {
"wrapper": "multicorn.sqlalchemyfdw.SqlAlchemyFdw",
"db_url": self._build_db_url(),
}

# For some reason, in SQLAlchemy, if this is not passed
# to the FDW params (even if it is in the DB URL), it doesn't
# schema-qualify tables and server-side cursors don't work for scanning
# (loads the whole table instead of scrolling through it).
if "schema" in self.params:
options["schema"] = self.params["schema"]

return options

def _build_db_url(self) -> str:
"""Construct the SQLAlchemy GCP Big Query db_url"""

db_url = f"bigquery://{self.params['project']}/{self.params['dataset_name']}"

if "credentials" in self.credentials:
# base64 encode the credentials
credentials_str = self.credentials["credentials"]
credentials_base64 = base64.urlsafe_b64encode(credentials_str.encode()).decode()
db_url += f"?credentials_base64={credentials_base64}"

return db_url

def get_remote_schema_name(self) -> str:
if "dataset_name" not in self.params:
raise ValueError("Cannot IMPORT FOREIGN SCHEMA without a dataset_name!")
return str(self.params["dataset_name"])
6 changes: 6 additions & 0 deletions splitgraph/resources/icons/biqquery.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
12 changes: 12 additions & 0 deletions test/resources/ingestion/bigquery/dummy_credentials.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"type": "service_account",
"project_id": "project_id",
"private_key_id": "private_key_id",
"private_key": "private_key",
"client_email": "client_email",
"client_id": "client_id",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "client_x509_cert_url"
}
77 changes: 77 additions & 0 deletions test/splitgraph/ingestion/test_bigquery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import base64
from unittest.mock import Mock

import pytest
from psycopg2 import DatabaseError

from splitgraph.core.types import Credentials, Params
from splitgraph.hooks.mount_handlers import mount
from splitgraph.ingestion.bigquery import BigQueryDataSource


def test_bigquery_data_source_options_creds_file(local_engine_empty):
source = BigQueryDataSource.from_commandline(
local_engine_empty,
{
"credentials": "test/resources/ingestion/bigquery/dummy_credentials.json",
"project": "bigquery-public-data",
"dataset_name": "hacker_news",
},
)

with open("test/resources/ingestion/bigquery/dummy_credentials.json", "r") as credentials_file:
credentials_str = credentials_file.read()
credentials_base64 = base64.urlsafe_b64encode(credentials_str.encode()).decode()

assert source.get_server_options() == {
"db_url": f"bigquery://bigquery-public-data/hacker_news?credentials_base64={credentials_base64}",
"wrapper": "multicorn.sqlalchemyfdw.SqlAlchemyFdw",
}


def test_bigquery_data_source_options_creds_raw():
source = BigQueryDataSource(
Mock(),
credentials=Credentials({"credentials": "test-raw-creds"}),
params=Params(
{
"project": "bigquery-public-data",
"dataset_name": "hacker_news",
}
),
)

credentials_base64 = base64.urlsafe_b64encode("test-raw-creds".encode()).decode()

assert source.get_server_options() == {
"db_url": f"bigquery://bigquery-public-data/hacker_news?credentials_base64={credentials_base64}",
"wrapper": "multicorn.sqlalchemyfdw.SqlAlchemyFdw",
}


def test_bigquery_data_source_options_no_creds_file():
source = BigQueryDataSource(
Mock(),
credentials=Credentials({}),
params=Params(
{
"project": "bigquery-public-data",
"dataset_name": "hacker_news",
}
),
)

assert source.get_server_options() == {
"db_url": "bigquery://bigquery-public-data/hacker_news",
"wrapper": "multicorn.sqlalchemyfdw.SqlAlchemyFdw",
}


@pytest.mark.mounting
def test_bigquery_mount_expected_error():
with pytest.raises(DatabaseError, match="Could not automatically determine credentials"):
mount(
"bq",
"bigquery",
{"project": "bigquery-public-data", "dataset_name": "hacker_news"},
)