Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions engine/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ COPY ./engine/src/postgres-elasticsearch-fdw/pg_es_fdw /pg_es_fdw/pg_es_fdw
RUN --mount=type=cache,id=pip-cache,target=/root/.cache/pip \
pip install "git+https://github.com/splitgraph/snowflake-sqlalchemy.git@14e64cc0ef7374df0cecc91923ff6901b0d721b7"

# Install PyAthena for Amazon Athena SQLAlchemy-based FDW, as well as pandas.
# Both requirements go through a single pip invocation so that the resolver
# sees them together (a second, separate `pip install` could otherwise change
# packages the first one had just pinned) and pip only starts up once.
RUN --mount=type=cache,id=pip-cache,target=/root/.cache/pip \
    pip install "PyAthena>=2.4.1" "pandas>=1.0.0"

ENV PATH "${PATH}:/splitgraph/bin"
ENV PYTHONPATH "${PYTHONPATH}:/splitgraph:/pg_es_fdw"

Expand Down
2 changes: 1 addition & 1 deletion engine/src/Multicorn
6 changes: 6 additions & 0 deletions splitgraph/ingestion/athena/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Build target covering the Python sources of this package.
# NOTE(review): `skip_black=True` presumably opts these sources out of Black
# formatting — confirm against the build tool's configuration.
python_sources(
    skip_black=True,
    # Declares a dependency on the shared icon resources.
    dependencies=[
        "src/py/splitgraph/splitgraph/resources/icons",
    ],
)
136 changes: 136 additions & 0 deletions splitgraph/ingestion/athena/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import base64
import json
import urllib.parse
from copy import deepcopy
from typing import TYPE_CHECKING, Any, Dict, Optional

from splitgraph.core.types import Credentials, Params, TableInfo
from splitgraph.hooks.data_source.fdw import ForeignDataWrapperDataSource
from splitgraph.ingestion.common import build_commandline_help

if TYPE_CHECKING:
from splitgraph.engine.postgres.engine import PostgresEngine


class AmazonAthenaDataSource(ForeignDataWrapperDataSource):
# Schema describing the secret part of the configuration: the AWS key pair
# used to authenticate against Athena.
credentials_schema: Dict[str, Any] = {
    "type": "object",
    "properties": {
        "aws_access_key_id": {"type": "string", "title": "AWS Access Key Id"},
        "aws_secret_access_key": {"type": "string", "title": "AWS Secret Access Key"},
    },
    # Both keys are read unconditionally when the connection URL is built,
    # so a configuration missing either of them should be rejected up front
    # by schema validation rather than fail later with a KeyError.
    "required": ["aws_access_key_id", "aws_secret_access_key"],
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These should be required, e.g.:

Suggested change
}
"required": ["aws_access_key_id", "aws_secret_access_key"],
}


# Schema describing the non-secret mount parameters. All three properties
# are listed as required.
params_schema: Dict[str, Any] = {
    "type": "object",
    "properties": {
        # Interpolated into the Athena endpoint hostname when the db_url is built.
        "region_name": {
            "type": "string",
            "title": "S3 region",
            "description": "Region of the S3 bucket",
        },
        # Used as the database segment of the db_url and as the remote schema name.
        "schema_name": {
            "type": "string",
            "title": "Schema",
            "description": "Athena database name",
        },
        # Passed to the driver as the `s3_staging_dir` query parameter of the db_url.
        "s3_staging_dir": {
            "title": "S3 results folder",
            "type": "string",
            "description": "Folder for storing query output",
        },
    },
    "required": ["region_name", "schema_name", "s3_staging_dir"],
}

# Capability flags for this data source: mounting and loading are enabled,
# syncing is not.
# NOTE(review): presumably read by the data source framework in the base
# class to decide which operations to offer — confirm.
supports_mount = True
supports_load = True
supports_sync = False

# Command-line help text with a copy-pasteable mount example. The JSON in the
# example must stay strictly valid (no trailing commas), since users paste it
# straight into `-o@-`.
# NOTE(review): the lone `\b` line is kept verbatim; it is presumably the CLI
# library's "do not re-wrap the next paragraph" marker — confirm.
commandline_help = """Mount an Amazon Athena database.

This will mount an Athena schema or a table:

\b
```
$ sgr mount athena s3 -o@- <<EOF
{
    "aws_access_key_id": "ABCD",
    "aws_secret_access_key": "abcd",
    "region_name": "eu-west-3",
    "schema_name": "mydatabase",
    "s3_staging_dir": "s3://my-bucket/output/"
}
EOF
```
"""

# Per-option help text: the section generated from the credentials schema
# comes first, then the one generated from the params schema, separated by a
# single newline.
commandline_kwargs_help: str = "\n".join(
    (
        build_commandline_help(credentials_schema),
        build_commandline_help(params_schema),
    )
)

# Icon file name for this data source.
# NOTE(review): this names the Snowflake icon although the class is the Amazon
# Athena data source — looks like a copy-paste leftover; confirm whether a
# dedicated Athena icon exists in the icon resources and should be used here.
_icon_file = "snowflake.svg"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


def __init__(
    self,
    engine: "PostgresEngine",
    credentials: Credentials,
    params: Params,
    tables: Optional[TableInfo] = None,
):
    """Initialise the data source.

    All arguments are forwarded unchanged, in the same order, to the
    base-class constructor; no Athena-specific state is set up here.
    """
    super().__init__(engine, credentials, params, tables)

def get_fdw_name(self) -> str:
    """Return the foreign data wrapper name, which is always "multicorn"."""
    return "multicorn"

@classmethod
def get_name(cls) -> str:
    """Return the name of this data source: "Amazon Athena"."""
    return "Amazon Athena"

@classmethod
def get_description(cls) -> str:
    """Return a one-line description of what this data source offers."""
    return "Query data in Amazon S3 files and folders"

def get_table_options(
    self, table_name: str, tables: Optional[TableInfo] = None
) -> Dict[str, str]:
    """Return the base-class table options with a "tablename" entry ensured.

    Whatever the base class produces is kept as-is; only when it carries no
    "tablename" key is one added, set to ``table_name``.
    """
    table_options = super().get_table_options(table_name, tables)
    # Keep an existing "tablename" untouched, otherwise fall back to the
    # name of the table being mounted.
    table_options.setdefault("tablename", table_name)
    return table_options

def get_server_options(self):
    """Build the option dict for the foreign server.

    Returns:
        A dict holding the wrapper class path, the SQLAlchemy ``db_url``
        and, when a schema is configured in the params, a ``schema`` entry.
    """
    options: Dict[str, Optional[str]] = {
        "wrapper": "multicorn.sqlalchemyfdw.SqlAlchemyFdw",
        "db_url": self._build_db_url(),
    }

    # For some reason, in SQLAlchemy, if this is not passed
    # to the FDW params (even if it is in the DB URL), it doesn't
    # schema-qualify tables and server-side cursors don't work for scanning
    # (loads the whole table instead of scrolling through it).
    #
    # This data source's params schema names the key "schema_name"; looking
    # only for "schema" meant the option was never passed. The old "schema"
    # key is still honoured as a fallback for existing callers.
    for key in ("schema_name", "schema"):
        if key in self.params:
            options["schema"] = self.params[key]
            break

    return options

def _build_db_url(self) -> str:
"""Construct the SQLAlchemy Amazon Athena db_url"""

aws_access_key_id = self.params["aws_access_key_id"]
aws_secret_access_key = self.params["aws_secret_access_key"]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this comes from credentials (not merged into params)

Suggested change
aws_access_key_id = self.params["aws_access_key_id"]
aws_secret_access_key = self.params["aws_secret_access_key"]
aws_access_key_id = self.credentials["aws_access_key_id"]
aws_secret_access_key = self.credentials["aws_secret_access_key"]

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curiously, it actually works properly either way.

region_name = self.params["region_name"]
schema_name = self.params["schema_name"]
s3_staging_dir = self.params["s3_staging_dir"]

db_url = (
f"awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@"
f"athena.{region_name}.amazonaws.com:443/"
f"{schema_name}?s3_staging_dir={s3_staging_dir}"
)

return db_url

def get_remote_schema_name(self) -> str:
    """Return the configured Athena schema name as a string.

    Raises:
        ValueError: if the params carry no "schema_name" entry.
    """
    if "schema_name" in self.params:
        return str(self.params["schema_name"])
    raise ValueError("Cannot IMPORT FOREIGN SCHEMA without a schema_name!")
30 changes: 18 additions & 12 deletions splitgraph/ingestion/csv/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,23 +241,29 @@ class CSVDataSource(ForeignDataWrapperDataSource):

If passed an URL, this will live query a CSV file on an HTTP server. If passed
S3 access credentials, this will scan a bucket for CSV files, infer their schema
and make them available to query over SQL.
and make them available to query over SQL.

For example:
For example:

\b
```
sgr mount csv target_schema -o@- <<EOF
{
"s3_endpoint": "cdn.mycompany.com:9000",
"s3_access_key": "ABCDEF",
"s3_secret_key": "GHIJKL",
"s3_bucket": "data",
"s3_object_prefix": "csv_files/current/",
"autodetect_header": true,
"autodetect_dialect": true,
"autodetect_encoding": true
}
{
"s3_access_key": "ABCD",
"s3_secret_key": "abcd",
"connection":
{
"connection_type": "s3",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation

"s3_bucket": "my-bucket-name",
"s3_endpoint": "s3.amazonaws.com",
"s3_region": "eu-west-3",
"s3_object_prefix": "",
"s3_object": "iris/iris.csv"
},
"autodetect_header": true,
"autodetect_dialect": true,
"autodetect_encoding": true
}
EOF
```
"""
Expand Down
2 changes: 2 additions & 0 deletions splitgraph/ingestion/snowflake/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ def get_server_options(self):

if "batch_size" in self.params:
options["batch_size"] = str(self.params["batch_size"])
else:
options["batch_size"] = "10000"

if self.credentials["secret"]["secret_type"] == "private_key":
options["connect_args"] = json.dumps(
Expand Down