diff --git a/.gitignore b/.gitignore index 84b3cd2..03316eb 100644 --- a/.gitignore +++ b/.gitignore @@ -155,3 +155,8 @@ cython_debug/ # VS Code project files .vscode/ + +# Data files +data/**/*.csv +data/**/*.md +*.db \ No newline at end of file diff --git a/src/data_ingestion/get_data.py b/src/data_ingestion/get_data.py index bf8f8e5..5dae283 100644 --- a/src/data_ingestion/get_data.py +++ b/src/data_ingestion/get_data.py @@ -1,46 +1,126 @@ """Contains functions used to aquire the data from external sources""" +from typing import Any, Literal import zipfile import shutil import os import io import pathlib import requests +import pathlib +from ..utils import data_connections + +DB_PATH = pathlib.Path(".db") +DATA_PATH = pathlib.Path(__file__).parent.parent.parent / "data" +INPUT_PATH = DATA_PATH / "input" +ARTIFICIAL_HES_BASE_URL = f"https://s3.eu-west-2.amazonaws.com/files.digital.nhs.uk/assets/Services/Artificial+data/Artificial+HES+final" -def download_zip_from_url(zip_file_url:str, overwrite:bool=False, output_path:str=None) -> str: +def download_zip_from_url( + zip_file_url: str, overwrite: bool = False, output_path: pathlib.Path = None +) -> str: """Downloads a zipfile from the specified URL - + Parameters ---------- zip_file_url : str The url string of where the zipfile is held overwrite : bool - if True, then running this again will overwrite existing files of the same name, otherwise + if True, then running this again will overwrite existing files of the same name, otherwise it will not. 
- output_path : str - Where you want the zip to be saved to - if left as "None" then it will be saved to + output_path : pathlib.Path + Where you want the zip to be saved to - if left as "None" then it will be saved to "data/{filename}" Returns ---------- - output_path : str + output_path : pathlib.Path """ - filename = pathlib.Path(zip_file_url).name + filename = pathlib.Path(zip_file_url).stem + if output_path is None: - output_path = f"data/{filename}" + output_path = INPUT_PATH / filename if os.path.exists(output_path) and overwrite is True: shutil.rmtree(output_path, ignore_errors=False, onerror=None) elif os.path.exists(output_path) and overwrite is not True: raise Exception(f"The zipfile already exists at: {output_path}") - response = requests.get(zip_file_url, stream=True,timeout=3600) + response = requests.get(zip_file_url, stream=True, timeout=3600) downloaded_zip = zipfile.ZipFile(io.BytesIO(response.content)) downloaded_zip.extractall(output_path) return output_path +def download_artificial_hes_zip( + dataset_name: Literal["ae", "apc", "op"], + version: str = "202302_v1", + size: Literal["sample", "full"] = "sample", +) -> pathlib.Path: + """ + Download and unpack artificial hes zip file. + + Parameters + ---------- + dataset_name : Literal["ae", "apc", "op"] + Name of dataset to download. + version : str, optional + Version to download, by default "202302_v1" + size : str, optional + Size to download, by default "sample" + + Returns + ------- + pathlib.Path + Path to the downloaded file. + """ + zip_name = f"artificial_hes_{dataset_name}_{version}_{size}.zip" + zip_url = f"{ARTIFICIAL_HES_BASE_URL}/{zip_name}" + zip_path = download_zip_from_url(zip_url, overwrite=True) + return zip_path + + +def get_user_inputs() -> dict[str, Any]: + """ + Get user inputs to configure the main function. + + Returns + ------- + dict[str, Any] + User input variables. + """ + replace = input("Replace tables if they already exist? 
(y/n, default=n): ") + replace = replace == "y" + + if not replace: + exists_ok = input( + "Continue without error if tables already exist? (y/n, default=y): " + ) + exists_ok = exists_ok == "" or exists_ok == "y" + else: + exists_ok = True + + return { + "replace": replace, + "exists_ok": exists_ok, + } + + if __name__ == "__main__": - ARTIFICIAL_HES_URL = "https://s3.eu-west-2.amazonaws.com/files.digital.nhs.uk/assets/Services/Artificial+data/Artificial+HES+final/artificial_hes_ae_202302_v1_sample.zip" - download_zip_from_url(ARTIFICIAL_HES_URL,overwrite=True) + user_inputs = get_user_inputs() + + conn = data_connections.get_duckdb_connection() + zip_path = download_artificial_hes_zip("ae") + + for csv_path in INPUT_PATH.glob("**/*.csv"): + table_name = data_connections.create_table_from_csv( + conn, + csv_path, + replace=user_inputs["replace"], + exists_ok=user_inputs["exists_ok"], + ) + + df = data_connections.read_table_to_df(conn, table_name) + + print(f"Printing results from table '{table_name}'") + print(df) diff --git a/src/utils/data_connections.py b/src/utils/data_connections.py index 7429b0b..d506c63 100644 --- a/src/utils/data_connections.py +++ b/src/utils/data_connections.py @@ -1,77 +1,170 @@ """ Purpose of script: handles reading data in and writing data back out. """ -import sqlalchemy as sa -import os -import pandas as pd import logging +import pathlib +from textwrap import dedent +import duckdb +import pandas as pd +DB_PATH = pathlib.Path(".db") # Default database path logger = logging.getLogger(__name__) -def read_sql_file(sql_file_path: str, sql_file_name: str, database, schema, table) -> str: - """Reads a SQL file and replaces placeholders with given fields. +def get_duckdb_connection(db_path: pathlib.Path = DB_PATH) -> duckdb.DuckDBPyConnection: + """ + Create a connection to the duckdb database at the given path. 
- Parameters: - sql_file_name: .sql file name - database: database name - table: table name - schema: schema name - sql folder: location of sql file (relative to root directory) + Parameters + ---------- + db_path : pathlib.Path, optional + Path to the database file, by default DB_PATH - Returns: - string + Returns + ------- + duckdb.DuckDBPyConnection + Connection object. """ - logger.info(f"Reading in SQL script: {sql_file_name} from: {sql_file_path}.") + conn = duckdb.connect(str(db_path.resolve())) + conn.execute("SET GLOBAL pandas_analyze_sample=100000") + return conn - with open(os.path.join(sql_file_path, sql_file_name), 'r') as sql_file: - sql_query = sql_file.read() - sql_params = {'database': database, 'schema': schema, 'table': table} +def create_table( + conn: duckdb.DuckDBPyConnection, + source: str, + table_name: str, + replace: bool = False, + exists_ok: bool = True, +) -> str: + """ + Create DuckDB table. + + Parameters + ---------- + conn : duckdb.DuckDBPyConnection + Database connection. + source : str + Specifies the source in duckdb syntax. Can be a query string + or a file source (e.g. https://duckdb.org/docs/guides/import/csv_import). + table_name : str + Name of the table to be created. + replace : bool, optional + Should the table be replaced if it exists, by default False + exists_ok : bool, optional + If True, do not raise an error when the table already exists, by default True. + If replace=True then this parameter has no effect. + + Returns + ------- + str + Name of the table that was created.
+ """ + if replace: + create_expr = "CREATE OR REPLACE TABLE" + elif exists_ok: + create_expr = "CREATE TABLE IF NOT EXISTS" + else: + create_expr = "CREATE TABLE" + + query = f""" + {create_expr} {table_name} + AS + SELECT * FROM {source} + """ + conn.sql(dedent(query)) + + return table_name - new_sql_query = sql_query.format(**sql_params) - return new_sql_query +def create_table_from_csv( + conn: duckdb.DuckDBPyConnection, + csv_path: pathlib.Path, + replace: bool = False, + exists_ok: bool = True, +) -> str: + """ + Create DuckDB table using a csv source. + + Parameters + ---------- + conn : duckdb.DuckDBPyConnection + Database connection. + csv_path : pathlib.Path + Path to the csv file. + replace : bool, optional + Should the table be replaced if it exists, by default False + exists_ok : bool, optional + If True, do not raise an error when the table already exists, by default True. + If replace=True then this parameter has no effect. -def make_database_connection(server, database): - """Creates SQL Server connection. + Returns + ------- + str + Name of the table that was created. """ - conn = sa.create_engine(f"mssql+pyodbc://{server}/{database}?driver=SQL+Server&trusted_connection=yes", - fast_executemany=True) - - return conn + return create_table( + conn, + source=f"read_csv_auto('{csv_path.resolve()}')", + table_name=csv_path.stem, + replace=replace, + exists_ok=exists_ok, + ) -def get_df_from_server(conn, server, database, query) -> pd.DataFrame: - """Constructs a pandas DataFrame from running a SQL query on a given SQL server using SQL Alchemy . - Requires mssql and pyodbc packages. - Parameters: - server: server name - database: database name - query: string containing a sql query +def read_table_to_df(conn: duckdb.DuckDBPyConnection, table_name: str) -> pd.DataFrame: + """ + Read data from a duckdb table into a pandas dataframe. + + Parameters + ---------- + conn : duckdb.DuckDBPyConnection + Database connection. + table_name : str + Name of table to query.
- Returns: - pandas Dataframe + Returns + ------- + pd.DataFrame + DataFrame from the query. """ - logger.info("Reading in dataframe from SQL Server.") - conn.execution_options(autocommit=True) - logger.info(f"Getting dataframe from SQL database {database}") - logger.info(f"Running query:\n\n {query}") - df = pd.read_sql_query(query, conn) + query = f"SELECT * FROM {table_name}" + df = conn.sql(query).df() return df -def write_df_to_server(conn, server, database, df_to_write, table_name) -> None: - """Writes a pandas DataFrame to a table on a given SQL server using SQL Alchemy. - Requires mssql and pyodbc packages. - Parameters: - database: database name - df_to_write: df to write to a SQL Server table - table_name: SQL Server table name +def write_df_to_table( + conn: duckdb.DuckDBPyConnection, + df: pd.DataFrame, + table_name: str, + replace: bool = False, + exists_ok: bool = True, +) -> str: + """ + Create DuckDB table using a pandas dataframe. + + Parameters + ---------- + conn : duckdb.DuckDBPyConnection + Database connection. + df : pd.DataFrame + Pandas dataframe to create the table with. + table_name : str + Name of the table to create. + replace : bool, optional + Should the table be replaced if it exists, by default False + exists_ok : bool, optional + If True, do not raise an error when the table already exists, by default True. + If replace=True then this parameter has no effect. Returns - Write to a SQL Server table. + ------- + str + Name of the table that was created. """ - logger.info(f"Writing dataframe to SQL Server designated {table_name}.") - conn.execution_options(autocommit=True) - df_to_write.to_sql(name=table_name, con=conn, if_exists='fail', index=False) + # NOTE: duckdb can read 'df' based on referencing via a string, so it + # looks like df is unused from the arguments, but that is just the syntax + # highlighting not realising it is used! + return create_table( + conn, source="df", table_name=table_name, replace=replace, exists_ok=exists_ok + )