-
Notifications
You must be signed in to change notification settings - Fork 19
Amazon Athena data source #634
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
93755ca
f9c1940
cc011c2
98b7f5f
0044f7b
437e4ba
132a946
ccefd89
778684d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
# Build target for the Python sources in this directory. The icon resources
# are listed as a dependency, and skip_black is switched on for this target.
python_sources(
    dependencies=["src/py/splitgraph/splitgraph/resources/icons"],
    skip_black=True,
)
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,136 @@ | ||||||||||
| import base64 | ||||||||||
| import json | ||||||||||
| import urllib.parse | ||||||||||
| from copy import deepcopy | ||||||||||
| from typing import TYPE_CHECKING, Any, Dict, Optional | ||||||||||
|
|
||||||||||
| from splitgraph.core.types import Credentials, Params, TableInfo | ||||||||||
| from splitgraph.hooks.data_source.fdw import ForeignDataWrapperDataSource | ||||||||||
| from splitgraph.ingestion.common import build_commandline_help | ||||||||||
|
|
||||||||||
| if TYPE_CHECKING: | ||||||||||
| from splitgraph.engine.postgres.engine import PostgresEngine | ||||||||||
|
|
||||||||||
|
|
||||||||||
| class AmazonAthenaDataSource(ForeignDataWrapperDataSource): | ||||||||||
# JSON Schema describing the credentials accepted by this data source:
# the AWS access key id and the matching secret access key.
credentials_schema: Dict[str, Any] = {
    "type": "object",
    "properties": {
        "aws_access_key_id": {
            "type": "string",
            "title": "AWS Access Key Id",
        },
        "aws_secret_access_key": {
            "type": "string",
            "title": "AWS Secret Access Key",
        },
    },
}
|
|
||||||||||
# JSON Schema describing the non-secret parameters of this data source.
# All three parameters are mandatory.
params_schema = {
    "type": "object",
    "properties": {
        # AWS region, interpolated into the Athena endpoint host name.
        "region_name": {
            "type": "string",
            "title": "S3 region",
            "description": "Region of the S3 bucket",
        },
        # Athena database; also used as the remote schema name.
        "schema_name": {
            "type": "string",
            "title": "Schema",
            "description": "Athena database name",
        },
        # S3 location passed to the driver as s3_staging_dir.
        "s3_staging_dir": {
            "title": "S3 results folder",
            "type": "string",
            "description": "Folder for storing query output",
        },
    },
    "required": [
        "region_name",
        "schema_name",
        "s3_staging_dir",
    ],
}
|
|
||||||||||
| supports_mount = True | ||||||||||
| supports_load = True | ||||||||||
| supports_sync = False | ||||||||||
|
|
||||||||||
# Help text shown for the mount command. The line holding only "\b" is kept
# as-is (it precedes the fenced example block). The example payload is valid
# JSON (no trailing comma) so that it can be copy-pasted verbatim.
commandline_help = """Mount an Amazon Athena database.

This will mount an Athena schema or a table:

\b
```
$ sgr mount athena s3 -o@- <<EOF
{
    "aws_access_key_id": "ABCD",
    "aws_secret_access_key": "abcd",
    "region_name": "eu-west-3",
    "schema_name": "mydatabase",
    "s3_staging_dir": "s3://my-bucket/output/"
}
EOF
```
"""
|
|
||||||||||
| commandline_kwargs_help: str = ( | ||||||||||
| build_commandline_help(credentials_schema) + "\n" + build_commandline_help(params_schema) | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| _icon_file = "snowflake.svg" | ||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Use something like https://symbols.getvecta.com/stencil_5/0_aws-athena.bb0d0ced14.svg? |
||||||||||
|
|
||||||||||
def __init__(
    self,
    engine: "PostgresEngine",
    credentials: Credentials,
    params: Params,
    tables: Optional[TableInfo] = None,
):
    """Create the data source.

    All four arguments are handed to the parent class initializer
    unchanged; nothing Athena-specific is set up here.
    """
    super().__init__(engine, credentials, params, tables)
|
|
||||||||||
def get_fdw_name(self):
    """Return the name of the foreign data wrapper used by this data source."""
    fdw_name = "multicorn"
    return fdw_name
|
|
||||||||||
| @classmethod | ||||||||||
| def get_name(cls) -> str: | ||||||||||
| return "Amazon Athena" | ||||||||||
|
|
||||||||||
| @classmethod | ||||||||||
| def get_description(cls) -> str: | ||||||||||
| return "Query data in Amazon S3 files and folders" | ||||||||||
|
|
||||||||||
def get_table_options(
    self, table_name: str, tables: Optional[TableInfo] = None
) -> Dict[str, str]:
    """Return the foreign-table options for ``table_name``.

    Starts from the options computed by the parent class and makes sure a
    ``tablename`` entry exists, defaulting it to ``table_name`` when the
    parent did not supply one.
    """
    table_options = super().get_table_options(table_name, tables)
    # Only fills the key in when it is absent; an existing value is kept.
    table_options.setdefault("tablename", table_name)
    return table_options
|
|
||||||||||
def get_server_options(self):
    """Return the options used to create the foreign server.

    Always contains the Multicorn wrapper class and the SQLAlchemy
    ``db_url``. When a schema is configured, it is also passed as the
    separate ``schema`` option.

    The schema is read from the ``schema_name`` parameter (the name declared
    in ``params_schema``). A legacy ``schema`` parameter, which is what this
    method used to look for, still takes precedence when present.
    """
    options: Dict[str, Optional[str]] = {
        "wrapper": "multicorn.sqlalchemyfdw.SqlAlchemyFdw",
        "db_url": self._build_db_url(),
    }

    # For some reason, in SQLAlchemy, if this is not passed
    # to the FDW params (even if it is in the DB URL), it doesn't
    # schema-qualify tables and server-side cursors don't work for scanning
    # (loads the whole table instead of scrolling through it).
    for schema_key in ("schema", "schema_name"):
        if schema_key in self.params:
            options["schema"] = self.params[schema_key]
            break

    return options
|
|
||||||||||
| def _build_db_url(self) -> str: | ||||||||||
| """Construct the SQLAlchemy Amazon Athena db_url""" | ||||||||||
|
|
||||||||||
| aws_access_key_id = self.params["aws_access_key_id"] | ||||||||||
| aws_secret_access_key = self.params["aws_secret_access_key"] | ||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this comes from credentials (not merged into params)
Suggested change
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Curiously, it actually works properly either way. |
||||||||||
| region_name = self.params["region_name"] | ||||||||||
| schema_name = self.params["schema_name"] | ||||||||||
| s3_staging_dir = self.params["s3_staging_dir"] | ||||||||||
|
|
||||||||||
| db_url = ( | ||||||||||
| f"awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@" | ||||||||||
| f"athena.{region_name}.amazonaws.com:443/" | ||||||||||
| f"{schema_name}?s3_staging_dir={s3_staging_dir}" | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| return db_url | ||||||||||
|
|
||||||||||
def get_remote_schema_name(self) -> str:
    """Return the remote schema name to import foreign tables from.

    Raises:
        ValueError: if the ``schema_name`` parameter is not set.
    """
    if "schema_name" in self.params:
        return str(self.params["schema_name"])
    raise ValueError("Cannot IMPORT FOREIGN SCHEMA without a schema_name!")
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -241,23 +241,29 @@ class CSVDataSource(ForeignDataWrapperDataSource): | |
|
|
||
| If passed an URL, this will live query a CSV file on an HTTP server. If passed | ||
| S3 access credentials, this will scan a bucket for CSV files, infer their schema | ||
| and make them available to query over SQL. | ||
| and make them available to query over SQL. | ||
|
|
||
| For example: | ||
| For example: | ||
|
|
||
| \b | ||
| ``` | ||
| sgr mount csv target_schema -o@- <<EOF | ||
| { | ||
| "s3_endpoint": "cdn.mycompany.com:9000", | ||
| "s3_access_key": "ABCDEF", | ||
| "s3_secret_key": "GHIJKL", | ||
| "s3_bucket": "data", | ||
| "s3_object_prefix": "csv_files/current/", | ||
| "autodetect_header": true, | ||
| "autodetect_dialect": true, | ||
| "autodetect_encoding": true | ||
| } | ||
| { | ||
| "s3_access_key": "ABCD", | ||
| "s3_secret_key": "abcd", | ||
| "connection": | ||
| { | ||
| "connection_type": "s3", | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. indentation |
||
| "s3_bucket": "my-bucket-name", | ||
| "s3_endpoint": "s3.amazonaws.com", | ||
| "s3_region": "eu-west-3", | ||
| "s3_object_prefix": "", | ||
| "s3_object": "iris/iris.csv" | ||
| }, | ||
| "autodetect_header": true, | ||
| "autodetect_dialect": true, | ||
| "autodetect_encoding": true | ||
| } | ||
| EOF | ||
| ``` | ||
| """ | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These should be required, e.g.: