Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,8 @@ cython_debug/

# VS Code project files
.vscode/

# Data files
data/**/*.csv
data/**/*.md
*.db
102 changes: 91 additions & 11 deletions src/data_ingestion/get_data.py
Original file line number Diff line number Diff line change
@@ -1,46 +1,126 @@
"""Contains functions used to aquire the data from external sources"""

from typing import Any, Literal
import zipfile
import shutil
import os
import io
import pathlib
import requests
import pathlib
from ..utils import data_connections

DB_PATH = pathlib.Path(".db")
DATA_PATH = pathlib.Path(__file__).parent.parent.parent / "data"
INPUT_PATH = DATA_PATH / "input"
ARTIFICIAL_HES_BASE_URL = f"https://s3.eu-west-2.amazonaws.com/files.digital.nhs.uk/assets/Services/Artificial+data/Artificial+HES+final"


def download_zip_from_url(zip_file_url:str, overwrite:bool=False, output_path:str=None) -> str:
def download_zip_from_url(
zip_file_url: str, overwrite: bool = False, output_path: pathlib.Path = None
) -> str:
"""Downloads a zipfile from the specified URL

Parameters
----------
zip_file_url : str
The url string of where the zipfile is held
overwrite : bool
if True, then running this again will overwrite existing files of the same name, otherwise
if True, then running this again will overwrite existing files of the same name, otherwise
it will not.
output_path : str
Where you want the zip to be saved to - if left as "None" then it will be saved to
output_path : pathlib.Path
Where you want the zip to be saved to - if left as "None" then it will be saved to
"data/{filename}"

Returns
----------
output_path : str
output_path : pathlib.Path

"""
filename = pathlib.Path(zip_file_url).name
filename = pathlib.Path(zip_file_url).stem

if output_path is None:
output_path = f"data/{filename}"
output_path = INPUT_PATH / filename
if os.path.exists(output_path) and overwrite is True:
shutil.rmtree(output_path, ignore_errors=False, onerror=None)
elif os.path.exists(output_path) and overwrite is not True:
raise Exception(f"The zipfile already exists at: {output_path}")

response = requests.get(zip_file_url, stream=True,timeout=3600)
response = requests.get(zip_file_url, stream=True, timeout=3600)
downloaded_zip = zipfile.ZipFile(io.BytesIO(response.content))
downloaded_zip.extractall(output_path)
return output_path


def download_artificial_hes_zip(
dataset_name: Literal["ae", "apc", "op"],
version: str = "202302_v1",
size: Literal["sample", "full"] = "sample",
) -> pathlib.Path:
"""
Download and unpack artificial hes zip file.

Parameters
----------
dataset_name : Literal["ae", "apc", "op"]
Name of dataset to download.
version : str, optional
Version to download, by default "202302_v1"
size : str, optional
Size to download, by default "sample"

Returns
-------
pathlib.Path
Path to the downloaded file.
"""
zip_name = f"artificial_hes_{dataset_name}_{version}_{size}.zip"
zip_url = f"{ARTIFICIAL_HES_BASE_URL}/{zip_name}"
zip_path = download_zip_from_url(zip_url, overwrite=True)
return zip_path


def get_user_inputs() -> dict[str, Any]:
"""
Get user inputs to configure the main function.

Returns
-------
dict[str, Any]
User input variables.
"""
replace = input("Replace tables if they already exist? (y/n, default=n): ")
replace = replace == "y"

if not replace:
exists_ok = input(
"Continue without error if tables already exist? (y/n, default=y): "
)
exists_ok = exists_ok == "" or exists_ok == "y"
else:
exists_ok = True

return {
"replace": replace,
"exists_ok": exists_ok,
}


if __name__ == "__main__":
ARTIFICIAL_HES_URL = "https://s3.eu-west-2.amazonaws.com/files.digital.nhs.uk/assets/Services/Artificial+data/Artificial+HES+final/artificial_hes_ae_202302_v1_sample.zip"
download_zip_from_url(ARTIFICIAL_HES_URL,overwrite=True)
user_inputs = get_user_inputs()

conn = data_connections.get_duckdb_connection()
zip_path = download_artificial_hes_zip("ae")

for csv_path in INPUT_PATH.glob("**/*.csv"):
table_name = data_connections.create_table_from_csv(
conn,
csv_path,
replace=user_inputs["replace"],
exists_ok=user_inputs["exists_ok"],
)

df = data_connections.read_table_to_df(conn, table_name)

print(f"Printing results from table '{table_name}'")
print(df)
193 changes: 143 additions & 50 deletions src/utils/data_connections.py
Original file line number Diff line number Diff line change
@@ -1,77 +1,170 @@
"""
Purpose of script: handles reading data in and writing data back out.
"""
import sqlalchemy as sa
import os
import pandas as pd
import logging
import pathlib
from textwrap import dedent
import duckdb
import pandas as pd

DB_PATH = pathlib.Path(".db") # Default database path
logger = logging.getLogger(__name__)


def read_sql_file(sql_file_path: str, sql_file_name: str, database, schema, table) -> str:
"""Reads a SQL file and replaces placeholders with given fields.
def get_duckdb_connection(db_path: pathlib.Path = DB_PATH) -> duckdb.DuckDBPyConnection:
"""
Create a connection to the duckdb database at the given path.

Parameters:
sql_file_name: .sql file name
database: database name
table: table name
schema: schema name
sql folder: location of sql file (relative to root directory)
Parameters
----------
db_path : pathlib.Path, optional
Path to the database file, by default DB_PATH

Returns:
string
Returns
-------
duckdb.DuckDBPyConnection
Connection object.
"""
logger.info(f"Reading in SQL script: {sql_file_name} from: {sql_file_path}.")
conn = duckdb.connect(str(db_path.resolve()))
conn.execute("SET GLOBAL pandas_analyze_sample=100000")
return conn

with open(os.path.join(sql_file_path, sql_file_name), 'r') as sql_file:
sql_query = sql_file.read()

sql_params = {'database': database, 'schema': schema, 'table': table}
def create_table(
conn: duckdb.DuckDBPyConnection,
source: str,
table_name: str,
replace: bool = False,
exists_ok: bool = True,
) -> str:
"""
Create DuckDB table.

Parameters
----------
conn : duckdb.DuckDBPyConnection
Database connection.
source : str
Specifies the source in duckdb syntax. Can be a query string
or a file source (e.g. https://duckdb.org/docs/guides/import/csv_import).
table_name: str
Name of the table to be created.
replace : bool, optional
Should the table be replaced if it exists, by default False
exists_ok : bool, optional
Raise errors if the table already exists, by default True.
If replace=True then this parameter has no effect.

Returns
-------
str
Name of the table that was created.
"""
if replace:
create_expr = "CREATE OR REPLACE TABLE"
elif exists_ok:
create_expr = "CREATE TABLE IF NOT EXISTS"
else:
create_expr = "CREATE TABLE"

query = f"""
{create_expr} {table_name}
AS
SELECT * FROM {source}
"""
conn.sql(dedent(query))

return table_name

new_sql_query = sql_query.format(**sql_params)

return new_sql_query
def create_table_from_csv(
conn: duckdb.DuckDBPyConnection,
csv_path: pathlib.Path,
replace: bool = False,
exists_ok: bool = True,
) -> str:
"""
Create DuckDB table using a csv source.

Parameters
----------
conn : duckdb.DuckDBPyConnection
Database connection.
csv_path : pathlib.Path
Path to the csv file.
replace : bool, optional
Should the table be replaced if it exists, by default False
exists_ok : bool, optional
Raise errors if the table already exists, by default True.
If replace=True then this parameter has no effect.

def make_database_connection(server, database):
"""Creates SQL Server connection.
Returns
-------
str
Name of the table that was created.
"""
conn = sa.create_engine(f"mssql+pyodbc://{server}/{database}?driver=SQL+Server&trusted_connection=yes",
fast_executemany=True)

return conn
return create_table(
conn,
source=f"read_csv_auto('{csv_path.resolve()}')",
table_name=csv_path.stem,
replace=replace,
exists_ok=exists_ok,
)

def get_df_from_server(conn, server, database, query) -> pd.DataFrame:
"""Constructs a pandas DataFrame from running a SQL query on a given SQL server using SQL Alchemy .
Requires mssql and pyodbc packages.

Parameters:
server: server name
database: database name
query: string containing a sql query
def read_table_to_df(conn: duckdb.DuckDBPyConnection, table_name: str) -> pd.DataFrame:
"""
Read data from a duckdb table into a pandas dataframe.

Parameters
----------
conn : duckdb.DuckDBPyConnection
Database connection.
table_name : str
Name of table to query.

Returns:
pandas Dataframe
Returns
-------
pd.DataFrame
DataFrame from the query.
"""
logger.info("Reading in dataframe from SQL Server.")
conn.execution_options(autocommit=True)
logger.info(f"Getting dataframe from SQL database {database}")
logger.info(f"Running query:\n\n {query}")
df = pd.read_sql_query(query, conn)
query = f"SELECT * FROM {table_name}"
df = conn.sql(query).df()
return df

def write_df_to_server(conn, server, database, df_to_write, table_name) -> None:
"""Writes a pandas DataFrame to a table on a given SQL server using SQL Alchemy.
Requires mssql and pyodbc packages.

Parameters:
database: database name
df_to_write: df to write to a SQL Server table
table_name: SQL Server table name
def write_df_to_table(
conn: duckdb.DuckDBPyConnection,
df: pd.DataFrame,
table_name: str,
replace: bool = False,
exists_ok: bool = True,
) -> str:
"""
Create DuckDB table using a pandas dataframe.

Parameters
----------
conn : duckdb.DuckDBPyConnection
Database connection.
df : pd.DataFrame
Pandas dataframe to create the table with.
table_name : str
Name of the table to create.
replace : bool, optional
Should the table be replaced if it exists, by default False
exists_ok : bool, optional
Raise errors if the table already exists, by default True.
If replace=True then this parameter has no effect.

Returns
Write to a SQL Server table.
-------
str
Name of the table that was created.
"""
logger.info(f"Writing dataframe to SQL Server designated {table_name}.")
conn.execution_options(autocommit=True)
df_to_write.to_sql(name=table_name, con=conn, if_exists='fail', index=False)
# NOTE: duckdb can read 'df' based on referencing via a string, so it
# looks like df is unused from the arguments, but that is just the syntax
# highlighting not realsing!
return create_table(
conn, source="df", table_name=table_name, replace=replace, exists_ok=exists_ok
)