From dc2111e33aeebb9c69f4b1b4bdb026b0c04a2ee7 Mon Sep 17 00:00:00 2001 From: Leo Rudczenko Date: Wed, 22 May 2024 10:56:34 +0100 Subject: [PATCH] Move all full coding examples into individual scripts and clean up their formatting --- .../connecting_to_databases/db_params.py | 32 +++++ .../connecting_to_databases/oracle_env.py | 2 + .../connecting_to_databases/oracle_lobs.py | 14 +++ .../standalone_connect.py | 3 + .../copy/demo_copy_full.py} | 18 +-- docs/code_demos/copy/demo_copy_iter_rows.py | 13 ++ docs/code_demos/copy/demo_copy_rows.py | 40 +++++++ docs/code_demos/copy/demo_copy_table_rows.py | 7 ++ .../error_handling/demo_extract_error.py} | 0 .../error_handling/demo_log_error.py | 23 ++++ .../demo_sql_conflict_error.py} | 3 +- docs/code_demos/extract/demo_fetch.py | 12 ++ docs/code_demos/extract/demo_iter.py | 9 ++ docs/code_demos/extract/demo_row_factory.py | 16 +++ .../code_demos/load/demo_executemany_named.py | 20 ++++ .../load/demo_executemany_positional.py | 17 +++ .../load/demo_load_full.py} | 8 +- docs/code_demos/load/demo_load_min.py | 14 +++ .../recipes/apache_airflow_integration.py | 27 +++++ docs/code_demos/recipes/csv_files.py | 65 ++++++++++ docs/code_demos/recipes/csv_files_pandas.py | 11 ++ docs/code_demos/recipes/database_to_api.py | 113 ++++++++++++++++++ .../recipes/database_to_database.py | 57 +++++++++ docs/code_demos/transform/demo_return.py | 31 +++++ docs/code_demos/transform/demo_yield.py | 27 +++++ docs/code_demos/utilities/enable_logger.py | 3 + .../utilities/return_autogenerated.py | 10 ++ docs/code_demos/utilities/table_info.py | 6 + docs/code_demos/utilities/use_logger.py | 7 ++ docs/connecting_to_databases.rst | 46 ++----- docs/demo_namedtuple.py | 11 -- docs/etl_functions/copy.rst | 62 +--------- docs/etl_functions/error_handling.rst | 25 +--- docs/etl_functions/extract.rst | 25 +--- docs/etl_functions/load.rst | 54 +-------- docs/etl_functions/transform.rst | 45 +------ docs/index.rst | 4 +- docs/recipes/apache_airflow_integration.rst | 31 +---- docs/recipes/csv_files.rst | 82 +------------ docs/recipes/database_to_api.rst | 109 +---------------- docs/recipes/database_to_database.rst | 57 +-------- docs/utilities.rst | 40 ++----- 42 files changed, 652 insertions(+), 547 deletions(-) create mode 100644 docs/code_demos/connecting_to_databases/db_params.py create mode 100644 docs/code_demos/connecting_to_databases/oracle_env.py create mode 100644 docs/code_demos/connecting_to_databases/oracle_lobs.py create mode 100644 docs/code_demos/connecting_to_databases/standalone_connect.py rename docs/{demo_copy.py => code_demos/copy/demo_copy_full.py} (72%) create mode 100644 docs/code_demos/copy/demo_copy_iter_rows.py create mode 100644 docs/code_demos/copy/demo_copy_rows.py create mode 100644 docs/code_demos/copy/demo_copy_table_rows.py rename docs/{demo_error.py => code_demos/error_handling/demo_extract_error.py} (100%) create mode 100644 docs/code_demos/error_handling/demo_log_error.py rename docs/{demo_on_conflict.py => code_demos/error_handling/demo_sql_conflict_error.py} (92%) create mode 100644 docs/code_demos/extract/demo_fetch.py create mode 100644 docs/code_demos/extract/demo_iter.py create mode 100644 docs/code_demos/extract/demo_row_factory.py create mode 100644 docs/code_demos/load/demo_executemany_named.py create mode 100644 docs/code_demos/load/demo_executemany_positional.py rename docs/{demo_load.py => code_demos/load/demo_load_full.py} (81%) create mode 100644 docs/code_demos/load/demo_load_min.py create mode 100644 docs/code_demos/recipes/apache_airflow_integration.py create mode 100644 docs/code_demos/recipes/csv_files.py create mode 100644 docs/code_demos/recipes/csv_files_pandas.py create mode 100644 docs/code_demos/recipes/database_to_api.py create mode 100644 docs/code_demos/recipes/database_to_database.py create mode 100644 docs/code_demos/transform/demo_return.py create mode 100644 docs/code_demos/transform/demo_yield.py create mode 100644 docs/code_demos/utilities/enable_logger.py create mode 100644 docs/code_demos/utilities/return_autogenerated.py create mode 100644 docs/code_demos/utilities/table_info.py create mode 100644 docs/code_demos/utilities/use_logger.py delete mode 100644 docs/demo_namedtuple.py diff --git a/docs/code_demos/connecting_to_databases/db_params.py b/docs/code_demos/connecting_to_databases/db_params.py new file mode 100644 index 0000000..4c9ac13 --- /dev/null +++ b/docs/code_demos/connecting_to_databases/db_params.py @@ -0,0 +1,32 @@ +"""ETL Helper script to demonstrate DbParams.""" +import etlhelper as etl + +ORACLEDB = etl.DbParams( + dbtype="ORACLE", + host="localhost", + port=1521, + dbname="mydata", + user="oracle_user", +) + +POSTGRESDB = etl.DbParams( + dbtype="PG", + host="localhost", + port=5432, + dbname="mydata", + user="postgres_user", +) + +SQLITEDB = etl.DbParams( + dbtype="SQLITE", + filename="/path/to/file.db", +) + +MSSQLDB = etl.DbParams( + dbtype="MSSQL", + host="localhost", + port=1433, + dbname="mydata", + user="mssql_user", + odbc_driver="ODBC Driver 17 for SQL Server", +) diff --git a/docs/code_demos/connecting_to_databases/oracle_env.py b/docs/code_demos/connecting_to_databases/oracle_env.py new file mode 100644 index 0000000..241a564 --- /dev/null +++ b/docs/code_demos/connecting_to_databases/oracle_env.py @@ -0,0 +1,2 @@ +import os +os.environ["ORACLE_PASSWORD"] = "some-secret-password" diff --git a/docs/code_demos/connecting_to_databases/oracle_lobs.py b/docs/code_demos/connecting_to_databases/oracle_lobs.py new file mode 100644 index 0000000..f995797 --- /dev/null +++ b/docs/code_demos/connecting_to_databases/oracle_lobs.py @@ -0,0 +1,14 @@ +"""ETL Helper script to demonstrate oracle handling Large Objects (LOBs).""" +import etlhelper as etl +import oracledb +from my_databases import ORACLEDB + +select_sql = "SELECT my_clob, my_blob FROM my_table" + +with ORACLEDB.connect("ORA_PASSWORD") as conn: + # By default, ETL Helper returns native types + result_as_native = etl.fetchall(select_sql, conn) + + # Update oracledb settings to return LOBs + oracledb.defaults.fetch_lobs = True + result_as_lobs = etl.fetchall(select_sql, conn) diff --git a/docs/code_demos/connecting_to_databases/standalone_connect.py b/docs/code_demos/connecting_to_databases/standalone_connect.py new file mode 100644 index 0000000..a83473c --- /dev/null +++ b/docs/code_demos/connecting_to_databases/standalone_connect.py @@ -0,0 +1,3 @@ +import etlhelper as etl +from my_databases import ORACLEDB +oracle_conn = etl.connect(ORACLEDB, "ORACLE_PASSWORD") diff --git a/docs/demo_copy.py b/docs/code_demos/copy/demo_copy_full.py similarity index 72% rename from docs/demo_copy.py rename to docs/code_demos/copy/demo_copy_full.py index fe16939..41d7020 100644 --- a/docs/demo_copy.py +++ b/docs/code_demos/copy/demo_copy_full.py @@ -1,6 +1,7 @@ """ETL Helper script to copy records between databases.""" -import datetime as dt import sqlite3 +import datetime as dt +from typing import Generator import etlhelper as etl igneous_db_file = "igneous_rocks.db" @@ -12,16 +13,15 @@ name TEXT UNIQUE, category TEXT, last_update DATETIME - )""" - - + ) +""" select_sql = "SELECT name FROM igneous_rock" -def transform(chunk): +def transform(chunk: list[dict]) -> Generator[dict, None, None]: for row in chunk: - row['category'] = 'igneous' - row['last_update'] = dt.datetime.now() + row["category"] = "igneous" + row["last_update"] = dt.datetime.now() yield row @@ -34,8 +34,8 @@ def transform(chunk): # Copy data rows = etl.iter_rows(select_sql, src, transform=transform) - etl.load('rock', dest, rows) + etl.load("rock", dest, rows) # Confirm transfer - for row in etl.fetchall('SELECT * FROM rock', dest): + for row in etl.fetchall("SELECT * FROM rock", dest): print(row) diff --git a/docs/code_demos/copy/demo_copy_iter_rows.py b/docs/code_demos/copy/demo_copy_iter_rows.py new file mode 100644 index 0000000..d9b6dc1 --- /dev/null +++ b/docs/code_demos/copy/demo_copy_iter_rows.py @@ -0,0 +1,13 @@ +"""ETL Helper script to demonstrate copying records between databases with iter_rows and load.""" +import etlhelper as etl +from my_databases import POSTGRESDB, ORACLEDB + +select_sql = """ + SELECT id, name, value FROM my_table + WHERE value > :min_value +""" + +with ORACLEDB.connect("ORA_PASSWORD") as src_conn: + with POSTGRESDB.connect("PG_PASSWORD") as dest_conn: + rows = etl.iter_rows(select_sql, src_conn, parameters={"min_value": 99}) + etl.load("my_table", dest_conn, rows) diff --git a/docs/code_demos/copy/demo_copy_rows.py b/docs/code_demos/copy/demo_copy_rows.py new file mode 100644 index 0000000..569366e --- /dev/null +++ b/docs/code_demos/copy/demo_copy_rows.py @@ -0,0 +1,40 @@ +"""ETL Helper script to demonstrate copy_rows.""" +import etlhelper as etl +from etlhelper.row_factories import namedtuple_row_factory +from my_databases import POSTGRESDB, ORACLEDB + +select_sql = """ + SELECT + customer_id, + SUM (amount) AS total_amount + FROM payment + WHERE id > 1000 + GROUP BY customer_id +""" + +# This insert query uses positional parameters, so a namedtuple_row_factory +# is used. +insert_sql = """ + INSERT INTO dest ( + customer_id, + total_amount, + loaded_by, + load_time + ) + VALUES ( + %s, + %s, + current_user, + now() + ) +""" + +with ORACLEDB.connect("ORA_PASSWORD") as src_conn: + with POSTGRESDB.connect("PG_PASSWORD") as dest_conn: + etl.copy_rows( + select_sql, + src_conn, + insert_sql, + dest_conn, + row_factory=namedtuple_row_factory, + ) diff --git a/docs/code_demos/copy/demo_copy_table_rows.py b/docs/code_demos/copy/demo_copy_table_rows.py new file mode 100644 index 0000000..b1ef548 --- /dev/null +++ b/docs/code_demos/copy/demo_copy_table_rows.py @@ -0,0 +1,7 @@ +"""ETL Helper script to demonstrate copy_table_rows.""" +import etlhelper as etl +from my_databases import POSTGRESDB, ORACLEDB + +with ORACLEDB.connect("ORA_PASSWORD") as src_conn: + with POSTGRESDB.connect("PG_PASSWORD") as dest_conn: + etl.copy_table_rows("my_table", src_conn, dest_conn) diff --git a/docs/demo_error.py b/docs/code_demos/error_handling/demo_extract_error.py similarity index 100% rename from docs/demo_error.py rename to docs/code_demos/error_handling/demo_extract_error.py diff --git a/docs/code_demos/error_handling/demo_log_error.py b/docs/code_demos/error_handling/demo_log_error.py new file mode 100644 index 0000000..6e950f8 --- /dev/null +++ b/docs/code_demos/error_handling/demo_log_error.py @@ -0,0 +1,23 @@ +"""ETL Helper script to demonstrate logging errors.""" +import logging +import sqlite3 +import etlhelper as etl + +etl.log_to_console() +logger = logging.getLogger("etlhelper") + +db_file = "igneous_rocks.db" + +rows = [ + {"name": "basalt", "grain_size": "fine"}, + {"name": "granite", "grain_size": "coarse"} +] + + +def log_errors(failed_rows: list[tuple[dict, Exception]]) -> None: + for row, exception in failed_rows: + logger.error(exception) + + +with sqlite3.connect(db_file) as conn: + etl.load("igneous_rock", conn, rows, on_error=log_errors) diff --git a/docs/demo_on_conflict.py b/docs/code_demos/error_handling/demo_sql_conflict_error.py similarity index 92% rename from docs/demo_on_conflict.py rename to docs/code_demos/error_handling/demo_sql_conflict_error.py index f3aaca4..3ec435a 100644 --- a/docs/demo_on_conflict.py +++ b/docs/code_demos/error_handling/demo_sql_conflict_error.py @@ -3,6 +3,7 @@ import etlhelper as etl db_file = "igneous_rocks.db" + create_sql = """ CREATE TABLE IF NOT EXISTS igneous_rock ( id INTEGER PRIMARY KEY, @@ -29,5 +30,5 @@ etl.executemany(insert_sql, conn, rows=igneous_rocks) # Confirm selection - for row in etl.fetchall('SELECT * FROM igneous_rock', conn): + for row in etl.fetchall("SELECT * FROM igneous_rock", conn): print(row) diff --git a/docs/code_demos/extract/demo_fetch.py b/docs/code_demos/extract/demo_fetch.py new file mode 100644 index 0000000..942b11c --- /dev/null +++ b/docs/code_demos/extract/demo_fetch.py @@ -0,0 +1,12 @@ +"""ETL Helper script to demonstrate using fetch functions.""" +import sqlite3 +import etlhelper as etl + +db_file = "igneous_rocks.db" + +with sqlite3.connect(db_file) as conn: + first_row = etl.fetchone("SELECT * FROM igneous_rock", conn) + all_rows = etl.fetchall("SELECT * FROM igneous_rock", conn) + +print(first_row) +print(all_rows) diff --git a/docs/code_demos/extract/demo_iter.py b/docs/code_demos/extract/demo_iter.py new file mode 100644 index 0000000..3fecedc --- /dev/null +++ b/docs/code_demos/extract/demo_iter.py @@ -0,0 +1,9 @@ +"""ETL Helper script to demonstrate iter_rows.""" +import sqlite3 +import etlhelper as etl + +db_file = "igneous_rocks.db" + +with sqlite3.connect(db_file) as conn: + for row in etl.iter_rows("SELECT * FROM igneous_rock", conn): + print(row) diff --git a/docs/code_demos/extract/demo_row_factory.py b/docs/code_demos/extract/demo_row_factory.py new file mode 100644 index 0000000..6a5d21d --- /dev/null +++ b/docs/code_demos/extract/demo_row_factory.py @@ -0,0 +1,16 @@ +"""ETL Helper script to demonstrate using fetch functions with a given row factory.""" +import sqlite3 +import etlhelper as etl +from etlhelper.row_factories import namedtuple_row_factory + +db_file = "igneous_rocks.db" + +with sqlite3.connect(db_file) as conn: + row = etl.fetchone( + "SELECT * FROM igneous_rock", + conn, + row_factory=namedtuple_row_factory, + ) + +print(row) +print(row.name) diff --git a/docs/code_demos/load/demo_executemany_named.py b/docs/code_demos/load/demo_executemany_named.py new file mode 100644 index 0000000..0c791c7 --- /dev/null +++ b/docs/code_demos/load/demo_executemany_named.py @@ -0,0 +1,20 @@ +"""ETL Helper script to demonstrate using executemany with a named placeholder query.""" +import sqlite3 +import etlhelper as etl + +db_file = "igneous_rocks.db" + +# Insert query changes case and adds update_at column +insert_sql = """ + INSERT INTO igneous_rocks (name, grain_size, updated_at) + VALUES (:name, UPPER(:grainsize), datetime('now')) +""" + +rows = [ + {"name": "basalt", "grain_size": "fine"}, + {"name": "granite", "grain_size": "coarse"} +] + +with sqlite3.connect(db_file) as conn: + # Note that table must already exist + processed, failed = etl.executemany(insert_sql, conn, rows) diff --git a/docs/code_demos/load/demo_executemany_positional.py b/docs/code_demos/load/demo_executemany_positional.py new file mode 100644 index 0000000..ec3275d --- /dev/null +++ b/docs/code_demos/load/demo_executemany_positional.py @@ -0,0 +1,17 @@ +"""ETL Helper script to demonstrate using executemany with a positional placeholder query.""" +import sqlite3 +import etlhelper as etl + +db_file = "igneous_rocks.db" + +# Positional placeholders for data in tuple format +insert_sql = """ + INSERT INTO igneous_rocks (name, grain_size, updated_at) + VALUES (?, UPPER(?), datetime('now')) +""" + +rows = [("basalt", "fine"), ("granite", "coarse")] + +with sqlite3.connect(db_file) as conn: + # Note that table must already exist + processed, failed = etl.executemany(insert_sql, conn, rows) diff --git a/docs/demo_load.py b/docs/code_demos/load/demo_load_full.py similarity index 81% rename from docs/demo_load.py rename to docs/code_demos/load/demo_load_full.py index d4128b6..63e781d 100644 --- a/docs/demo_load.py +++ b/docs/code_demos/load/demo_load_full.py @@ -3,12 +3,14 @@ import etlhelper as etl db_file = "igneous_rocks.db" + create_sql = """ CREATE TABLE IF NOT EXISTS igneous_rock ( id INTEGER PRIMARY KEY, name TEXT UNIQUE, grain_size TEXT - )""" + ) +""" igneous_rocks = [ {"name": "basalt", "grain_size": "fine"}, @@ -21,8 +23,8 @@ etl.execute(create_sql, conn) # Insert rows - etl.load('igneous_rock', conn, igneous_rocks) + etl.load("igneous_rock", conn, igneous_rocks) # Confirm selection - for row in etl.fetchall('SELECT * FROM igneous_rock', conn): + for row in etl.fetchall("SELECT * FROM igneous_rock", conn): print(row) diff --git a/docs/code_demos/load/demo_load_min.py b/docs/code_demos/load/demo_load_min.py new file mode 100644 index 0000000..e54691d --- /dev/null +++ b/docs/code_demos/load/demo_load_min.py @@ -0,0 +1,14 @@ +"""ETL Helper script to demonstrate load.""" +import sqlite3 +import etlhelper as etl + +db_file = "igneous_rocks.db" + +rows = [ + {"name": "basalt", "grain_size": "fine"}, + {"name": "granite", "grain_size": "coarse"} +] + +with sqlite3.connect(db_file) as conn: + # Note that table must already exist + processed, failed = etl.load("igneous_rock", conn, rows) diff --git a/docs/code_demos/recipes/apache_airflow_integration.py b/docs/code_demos/recipes/apache_airflow_integration.py new file mode 100644 index 0000000..c6322e5 --- /dev/null +++ b/docs/code_demos/recipes/apache_airflow_integration.py @@ -0,0 +1,27 @@ +"""ETL Helper script to demonstrate using Apache Airflow to schedule tasks.""" +import datetime as dt +from airflow import DAG +from airflow.operators.python_operator import PythonOperator +from database_to_database import copy_readings + + +def copy_readings_with_args(**kwargs) -> None: + # Set arguments for copy_readings from context + start = kwargs.get("prev_execution_date") + end = kwargs.get("execution_date") + copy_readings(start, end) + + +dag = DAG( + "readings", + schedule_interval=dt.timedelta(days=1), + start_date=dt.datetime(2019, 8, 1), + catchup=True, +) + +t1 = PythonOperator( + task_id="copy_readings", + python_callable=copy_readings_with_args, + provide_context=True, + dag=dag, +) diff --git a/docs/code_demos/recipes/csv_files.py b/docs/code_demos/recipes/csv_files.py new file mode 100644 index 0000000..04324ae --- /dev/null +++ b/docs/code_demos/recipes/csv_files.py @@ -0,0 +1,65 @@ +""" +Script to create database and load observations data from csv file. It also +demonstrates how an `on_error` function can handle failed rows. + +Generate observations.csv with: +curl 'https://sensors.bgs.ac.uk/FROST-Server/v1.1/Observations?$select=@iot.id,result,phenomenonTime&$top=20000&$resultFormat=csv' -o observations.csv # noqa +""" +import csv +import sqlite3 +import datetime as dt +from typing import Iterable + +import etlhelper as etl + + +def load_observations(csv_file: str, conn: sqlite3.Connection) -> None: + """Load observations from csv_file to db_file.""" + # Drop table (helps with repeated test runs!) + drop_table_sql = """ + DROP TABLE IF EXISTS observations + """ + etl.execute(drop_table_sql, conn) + + # Create table (reject ids with no remainder when divided by 1000) + create_table_sql = """ + CREATE TABLE IF NOT EXISTS observations ( + id INTEGER PRIMARY KEY CHECK (id % 1000), + time TIMESTAMP, + result FLOAT + )""" + etl.execute(create_table_sql, conn) + + # Load data + with open(csv_file, "rt") as f: + reader = csv.DictReader(f) + etl.load("observations", conn, reader, transform=transform, on_error=on_error) + + +# A transform function that takes an iterable of rows and returns an iterable +# of rows. +def transform(rows: Iterable[dict]) -> Iterable[dict]: + """Rename time column and convert to Python datetime.""" + for row in rows: + # Dictionaries are mutable, so rows can be modified in place. + time_value = row.pop("phenomenonTime") + row["time"] = dt.datetime.strptime(time_value, "%Y-%m-%dT%H:%M:%S.%fZ") + + return rows + + +# The on_error function is called after each chunk with all the failed rows +def on_error(failed_rows: list[tuple[dict, Exception]]) -> None: + """Print the IDs of failed rows""" + rows, exceptions = zip(*failed_rows) + failed_ids = [row["id"] for row in rows] + print(f"Failed IDs: {failed_ids}") + + +if __name__ == "__main__": + from etlhelper import log_to_console + log_to_console() + + db = etl.DbParams(dbtype="SQLITE", filename="observations.sqlite") + with db.connect() as conn: + load_observations("observations.csv", conn) diff --git a/docs/code_demos/recipes/csv_files_pandas.py b/docs/code_demos/recipes/csv_files_pandas.py new file mode 100644 index 0000000..136c637 --- /dev/null +++ b/docs/code_demos/recipes/csv_files_pandas.py @@ -0,0 +1,11 @@ +"""ETL Helper script to demonstrate compatibility when creating an SQLAlchemy connection.""" +import pandas as pd +from sqlalchemy import create_engine + +from my_databases import ORACLEDB + +engine = create_engine(ORACLEDB.get_sqlalchemy_connection_string("ORACLE_PASSWORD")) + +sql = "SELECT * FROM my_table" +df = pd.read_sql(sql, engine) +df.to_csv("my_data.csv", header=True, index=False, float_format="%.3f") diff --git a/docs/code_demos/recipes/database_to_api.py b/docs/code_demos/recipes/database_to_api.py new file mode 100644 index 0000000..8534f32 --- /dev/null +++ b/docs/code_demos/recipes/database_to_api.py @@ -0,0 +1,113 @@ +"""ETL Helper script to demonstrate creating an API to a database using aiohttp.""" +import asyncio +import json +import logging +import datetime as dt + +import aiohttp +from etlhelper import iter_chunks + +from db import ORACLE_DB + +logger = logging.getLogger("copy_sensors_async") + +SELECT_SENSORS = """ + SELECT CODE, DESCRIPTION + FROM BGS.DIC_SEN_SENSOR + WHERE date_updated BETWEEN :startdate AND :enddate + ORDER BY date_updated + """ +BASE_URL = "http://localhost:9200/" +HEADERS = {"Content-Type": "application/json"} + + +def copy_sensors(startdate: dt.datetime, enddate: dt.datetime) -> None: + """Read sensors from Oracle and post to REST API.""" + logger.info("Copying sensors with timestamps from %s to %s", + startdate.isoformat(), enddate.isoformat()) + row_count = 0 + + with ORACLE_DB.connect("ORACLE_PASSWORD") as conn: + # chunks is a generator that yields lists of dictionaries + chunks = iter_chunks( + SELECT_SENSORS, + conn, + parameters={"startdate": startdate, "enddate": enddate}, + transform=transform_sensors, + ) + + for chunk in chunks: + result = asyncio.run(post_chunk(chunk)) + row_count += len(result) + logger.info("%s items transferred", row_count) + + logger.info("Transfer complete") + + +def transform_sensors(chunk: list[tuple]) -> list[tuple]: + """Transform rows to dictionaries suitable for converting to JSON.""" + new_chunk = [] + + for row in chunk: + new_row = { + "sample_code": row.CODE, + "description": row.DESCRIPTION, + "metadata": { + "source": "ORACLE_DB", # fixed value + "transferred_at": dt.datetime.now().isoformat(), # dynamic value + } + } + logger.debug(new_row) + new_chunk.append(new_row) + + return new_chunk + + +async def post_chunk(chunk: list[tuple]) -> list: + """Post multiple items to API asynchronously.""" + async with aiohttp.ClientSession() as session: + # Build list of tasks + tasks = [] + for item in chunk: + tasks.append(post_one(item, session)) + + # Process tasks in parallel. An exception in any will be raised. + result = await asyncio.gather(*tasks) + + return result + + +async def post_one(item: tuple, session: aiohttp.ClientSession) -> int: + """Post a single item to API using existing aiohttp Session.""" + # Post the item + response = await session.post( + BASE_URL + "sensors/_doc", + headers=HEADERS, + data=json.dumps(item), + ) + + # Log responses before throwing errors because error info is not included + # in generated Exceptions and so cannot otherwise be seen for debugging. + if response.status >= 400: + response_text = await response.text() + logger.error( + "The following item failed: %s\nError message:\n(%s)", + item, + response_text, + ) + await response.raise_for_status() + + return response.status + + +if __name__ == "__main__": + # Configure logging + handler = logging.StreamHandler() + formatter = logging.Formatter("%(asctime)s %(levelname)-8s %(message)s") + handler.setFormatter(formatter) + logger.setLevel(logging.INFO) + logger.addHandler(handler) + + # Copy data from 1 January 2000 to 00:00:00 today + today = dt.datetime.combine(dt.date.today(), dt.time.min) + copy_sensors(dt.datetime(2000, 1, 1), today) diff --git a/docs/code_demos/recipes/database_to_database.py b/docs/code_demos/recipes/database_to_database.py new file mode 100644 index 0000000..d053430 --- /dev/null +++ b/docs/code_demos/recipes/database_to_database.py @@ -0,0 +1,57 @@ +"""ETL Helper script to demonstrate copying data from an Oracle database into a PostgreSQL database.""" +import datetime as dt +from textwrap import dedent +import etlhelper as etl +from my_databases import ORACLEDB, POSTGRESDB + +CREATE_SQL = dedent(""" + CREATE TABLE IF NOT EXISTS sensordata.readings + ( + sensor_data_id bigint PRIMARY KEY, + measure_id bigint, + time_stamp timestamp without time zone, + meas_value double precision + ) + """).strip() + +DELETE_SQL = dedent(""" + DELETE FROM sensordata.readings + WHERE time_stamp BETWEEN %(startdate)s AND %(enddate)s + """).strip() + +SELECT_SQL = dedent(""" + SELECT id, measure_id, time_stamp, reading + FROM sensor_data + WHERE time_stamp BETWEEN :startdate AND :enddate + ORDER BY time_stamp + """).strip() + +INSERT_SQL = dedent(""" + INSERT INTO sensordata.readings (sensor_data_id, measure_id, time_stamp, + meas_value) + VALUES (%s, %s, %s, %s) + """).strip() + + +def copy_readings(startdate: dt.datetime, enddate: dt.datetime) -> None: + params = {"startdate": startdate, "enddate": enddate} + + with ORACLEDB.connect("ORA_PASSWORD") as src_conn: + with POSTGRESDB.connect("PG_PASSWORD") as dest_conn: + etl.execute(CREATE_SQL, dest_conn) + etl.execute(DELETE_SQL, dest_conn, parameters=params) + etl.copy_rows( + SELECT_SQL, + src_conn, + INSERT_SQL, + dest_conn, + parameters=params, + ) + + +if __name__ == "__main__": + # Copy data from 00:00:00 yesterday to 00:00:00 today + today = dt.combine(dt.date.today(), dt.time.min) + yesterday = today - dt.timedelta(1) + + copy_readings(yesterday, today) diff --git a/docs/code_demos/transform/demo_return.py b/docs/code_demos/transform/demo_return.py new file mode 100644 index 0000000..a91a332 --- /dev/null +++ b/docs/code_demos/transform/demo_return.py @@ -0,0 +1,31 @@ +"""ETL Helper script to demonstrate using a transform function which returns a list of rows.""" +import random +import sqlite3 +from typing import Iterator +import etlhelper as etl +from etlhelper.row_factories import namedtuple_row_factory + +db_file = "igneous_rocks.db" +select_sql = "SELECT * FROM igneous_rock" + + +def my_transform(chunk: Iterator[tuple]) -> list[tuple]: + # Append random integer (1-10), filter if <5. + + new_chunk = [] + for row in chunk: # each row is a namedtuple (immutable) + extra_value = random.randrange(10) + if extra_value >= 5: # some rows are dropped + new_row = (*row, extra_value) # new rows have extra column + new_chunk.append(new_row) + + return new_chunk + + +with sqlite3.connect(db_file) as conn: + rows = etl.fetchall( + select_sql, + conn, + row_factory=namedtuple_row_factory, + transform=my_transform, + ) diff --git a/docs/code_demos/transform/demo_yield.py b/docs/code_demos/transform/demo_yield.py new file mode 100644 index 0000000..3c83b38 --- /dev/null +++ b/docs/code_demos/transform/demo_yield.py @@ -0,0 +1,27 @@ +"""ETL Helper script to demonstrate using a transform function which yields individual rows.""" +import sqlite3 +from typing import Iterator +import etlhelper as etl +from etlhelper.row_factories import dict_row_factory + +db_file = "igneous_rocks.db" +select_sql = "SELECT * FROM igneous_rock" + + +def my_transform(chunk: Iterator[dict]) -> Iterator[dict]: + # Add prefix to id, remove newlines, set lower case names + + for row in chunk: # each row is a dictionary (mutable) + row["id"] += 1000 + row["description"] = row["description"].replace("\n", " ") + row["name"] = row["name"].lower() + yield row + + +with sqlite3.connect(db_file) as conn: + rows = etl.fetchall( + select_sql, + conn, + row_factory=dict_row_factory, + transform=my_transform, + ) diff --git a/docs/code_demos/utilities/enable_logger.py b/docs/code_demos/utilities/enable_logger.py new file mode 100644 index 0000000..e49b6d0 --- /dev/null +++ b/docs/code_demos/utilities/enable_logger.py @@ -0,0 +1,3 @@ +import etlhelper as etl + +etl.log_to_console() diff --git a/docs/code_demos/utilities/return_autogenerated.py b/docs/code_demos/utilities/return_autogenerated.py new file mode 100644 index 0000000..cc67a15 --- /dev/null +++ b/docs/code_demos/utilities/return_autogenerated.py @@ -0,0 +1,10 @@ +"""ETL Helper script to demonstrate autogenerated field values being returned.""" +import etlhelper as etl +from my_databases import POSTGRESDB + +insert_sql = "INSERT INTO my_table (message) VALUES ('hello') RETURNING id" + +with POSTGRESDB.connect("PGPASSWORD") as conn: + result = etl.fetchone(insert_sql, conn) + +print(result["id"]) diff --git a/docs/code_demos/utilities/table_info.py b/docs/code_demos/utilities/table_info.py new file mode 100644 index 0000000..271b35d --- /dev/null +++ b/docs/code_demos/utilities/table_info.py @@ -0,0 +1,6 @@ +"""ETL Helper script to demonstrate table_info.""" +from etlhelper.utils import table_info +from my_databases import ORACLEDB + +with ORACLEDB.connect("ORA_PASSWORD") as conn: + columns = table_info("my_table", conn, schema="my_schema") diff --git a/docs/code_demos/utilities/use_logger.py b/docs/code_demos/utilities/use_logger.py new file mode 100644 index 0000000..920b556 --- /dev/null +++ b/docs/code_demos/utilities/use_logger.py @@ -0,0 +1,7 @@ +import logging + +import etlhelper as etl + +etl.log_to_console() +etl_logger = logging.getLogger("etlhelper") +etl_logger.info("Hello world!") diff --git a/docs/connecting_to_databases.rst b/docs/connecting_to_databases.rst index e8cc084..a60b0b4 100644 --- a/docs/connecting_to_databases.rst +++ b/docs/connecting_to_databases.rst @@ -25,22 +25,8 @@ The instantiation checks that the correct attributes have been provided for the specified ``dbtype``. See :ref:`passwords ` section for how to provide passwords. -.. code:: python - - import etlhelper as etl - - ORACLEDB = etl.DbParams(dbtype='ORACLE', host="localhost", port=1521, - dbname="mydata", user="oracle_user") - - POSTGRESDB = etl.DbParams(dbtype='PG', host="localhost", port=5432, - dbname="mydata", user="postgres_user") - - SQLITEDB = etl.DbParams(dbtype='SQLITE', filename='/path/to/file.db') - - MSSQLDB = etl.DbParams(dbtype='MSSQL', host="localhost", port=1433, - dbname="mydata", user="mssql_user", - odbc_driver="ODBC Driver 17 for SQL Server") - +.. literalinclude:: code_demos/connecting_to_databases/db_params.py + :language: python :class:`DbParams ` objects can also be created from environment variables, using the :func:`from_environment() ` function. @@ -84,10 +70,8 @@ Using context-manager syntax as below ensures that the connection is closed afte A standalone :func:`etlhelper.connect() ` function provides backwards-compatibility with previous releases of ``etlhelper``: -.. code:: python - - import etlhelper as etl - oracle_conn = etl.connect(ORACLEDB, 'ORACLE_PASSWORD') +.. literalinclude:: code_demos/connecting_to_databases/standalone_connect.py + :language: python Both versions accept additional keyword arguments that are passed to the `DB API 2.0-compatible connect function `_ @@ -118,10 +102,8 @@ Environment variables can be set on the command line via: Or in a Python terminal via: -.. code:: python - - import os - os.environ['ORACLE_PASSWORD'] = 'some-secret-password' +.. literalinclude:: code_demos/connecting_to_databases/oracle_env.py + :language: python No password is required for SQLite databases. @@ -142,20 +124,8 @@ data transfer. However, it is not suitable for LOBs larger than 1 Gb. To return CLOB and BLOB columns as LOBs, configure the driver as follows: -.. code:: python - - import etlhelper as etl - import oracledb - - select_sql = "SELECT my_clob, my_blob FROM my_table" - - with ORACLEDB.connect("ORA_PASSWORD") as conn: - # By default, ETL Helper returns native types - result_as_native = etl.fetchall(select_sql, conn) - - # Update oracledb settings to return LOBs - oracledb.defaults.fetch_lobs = True - result_as_lobs = etl.fetchall(select_sql, conn) +.. literalinclude:: code_demos/connecting_to_databases/oracle_lobs.py + :language: python See the `oracledb docs `__ diff --git a/docs/demo_namedtuple.py b/docs/demo_namedtuple.py deleted file mode 100644 index cef0748..0000000 --- a/docs/demo_namedtuple.py +++ /dev/null @@ -1,11 +0,0 @@ -"""Extract script using namedtuple_row_factory""" -import sqlite3 -import etlhelper as etl -from etlhelper.row_factories import namedtuple_row_factory - -with sqlite3.connect("igneous_rocks.db") as conn: - row = etl.fetchone('SELECT * FROM igneous_rock', conn, - row_factory=namedtuple_row_factory) - -print(row) -print(row.name) diff --git a/docs/etl_functions/copy.rst b/docs/etl_functions/copy.rst index e63f00f..d0faebe 100644 --- a/docs/etl_functions/copy.rst +++ b/docs/etl_functions/copy.rst @@ -12,14 +12,8 @@ to copy all the data from one table to another. It can take a ``transform`` function in case some modification of the data, e.g. change of case of column names, is required. -.. code:: python - - import etlhelper as etl - from my_databases import POSTGRESDB, ORACLEDB - - with ORACLEDB.connect("ORA_PASSWORD") as src_conn: - with POSTGRESDB.connect("PG_PASSWORD") as dest_conn: - etl.copy_table_rows('my_table', src_conn, dest_conn) +.. literalinclude:: ../code_demos/copy/demo_copy_table_rows.py + :language: python The ``chunk_size``, ``commit_chunks`` and ``on_error`` parameters can all be set. A tuple with counts of rows processed and failed is @@ -35,20 +29,8 @@ Combining ``iter_rows`` with ``load`` For extra control selecting the data to be transferred, ``iter_rows`` can be combined with ``load``. -.. code:: python - - import etlhelper as etl - from my_databases import POSTGRESDB, ORACLEDB - - select_sql = """ - SELECT id, name, value FROM my_table - WHERE value > :min_value - """ - - with ORACLEDB.connect("ORA_PASSWORD") as src_conn: - with POSTGRESDB.connect("PG_PASSWORD") as dest_conn: - rows = etl.iter_rows(select_sql, src_conn, parameters={'min_value': 99}) - etl.load('my_table', dest_conn, rows) +.. literalinclude:: ../code_demos/copy/demo_copy_iter_rows.py + :language: python copy_rows --------- @@ -61,40 +43,8 @@ The source and destination tables must already exist. For example, here we use GROUP BY and WHERE in the SELECT query and insert extra auto-generated values via the INSERT query. -.. code:: python - - import etlhelper as etl - from my_databases import POSTGRESDB, ORACLEDB - - select_sql = """ - SELECT - customer_id, - SUM (amount) AS total_amount - FROM payment - WHERE id > 1000 - GROUP BY customer_id - """ - - # This insert query uses positional parameters, so a namedtuple_row_factory - # is used. - insert_sql = """ - INSERT INTO dest ( - customer_id, - total_amount, - loaded_by, - load_time) - VALUES ( - %s, - %s, - current_user, - now() - ) - """ - - with ORACLEDB.connect("ORA_PASSWORD") as src_conn: - with POSTGRESDB.connect("PG_PASSWORD") as dest_conn: - copy_rows(select_sql, src_conn, insert_sql, dest_conn, - row_factory=namedtuple_row_factory) +.. literalinclude:: ../code_demos/copy/demo_copy_rows.py + :language: python ``parameters`` can be passed to the SELECT query as before and the ``commit_chunks``, ``chunk_size`` and ``on_error`` options can be set. diff --git a/docs/etl_functions/error_handling.rst b/docs/etl_functions/error_handling.rst index dcb733b..36dabc5 100644 --- a/docs/etl_functions/error_handling.rst +++ b/docs/etl_functions/error_handling.rst @@ -18,7 +18,7 @@ the :class:`ETLHelperQueryError `, classes print the SQL query and the required paramstyle as well as the error message returned by the database. -.. literalinclude:: ../demo_error.py +.. literalinclude:: ../code_demos/error_handling/demo_extract_error.py :language: python The output is: @@ -53,28 +53,15 @@ all the errors into a list to process at the end. .. code:: python errors = [] - executemany(sql, conn, rows, on_error=errors.extend) + etl.executemany(sql, conn, rows, on_error=errors.extend) if errors: do_something() Errors can be logged to the ``etlhelper`` logger. -.. code:: python - - import logging - - import etlhelper as etl - - etl.log_to_console() - logger = logging.getLogger("etlhelper") - - - def log_errors(failed_rows): - for row, exception in failed_rows: - logger.error(exception) - - executemany(sql, conn, rows, on_error=log_errors) +.. literalinclude:: ../code_demos/error_handling/demo_log_error.py + :language: python The IDs of failed rows can be written to a file. @@ -85,7 +72,7 @@ The IDs of failed rows can be written to a file. for row, exception in failed_rows: out_file.write(f"{row.id}\n") - executemany(sql, conn, rows, on_error=write_bad_ids) + etl.executemany(sql, conn, rows, on_error=write_bad_ids) ``executemany``, ``load``, ``copy_rows`` and ``copy_table_rows`` can all take an ``on_error`` parameter. They each return a tuple containing the @@ -110,7 +97,7 @@ The following example for SQLite will ignore duplicate rows. Different databases have different syntax and capabilities, including ``upsert`` and ``merge``. -.. literalinclude:: ../demo_on_conflict.py +.. literalinclude:: ../code_demos/error_handling/demo_sql_conflict_error.py :language: python The output is: diff --git a/docs/etl_functions/extract.rst b/docs/etl_functions/extract.rst index 4834c59..fe3eb88 100644 --- a/docs/etl_functions/extract.rst +++ b/docs/etl_functions/extract.rst @@ -13,17 +13,8 @@ The ``fetch*`` functions *return* results once they have finished with the datab - :func:`fetchall() `: returns all results as a list. This function returns once all rows have been fetched into memory. -.. code:: python - - import sqlite3 - import etlhelper as etl - - with sqlite3.connect('igneous_rocks.db') as conn: - first_row = etl.fetchone('SELECT * FROM igneous_rock', conn) - all_rows = etl.fetchall('SELECT * FROM igneous_rock', conn): - - print(first_row) - print(all_rows) +.. literalinclude:: ../code_demos/extract/demo_fetch.py + :language: python returns @@ -48,14 +39,8 @@ The database connection must remain open until all results have been processed. The following is an example of :func:`iter_rows() `: -.. code:: python - - import sqlite3 - import etlhelper as etl - - with sqlite3.connect('igneous_rocks.db') as conn: - for row in etl.iter_rows('SELECT * FROM igneous_rock', conn) - print(row) +.. literalinclude:: ../code_demos/extract/demo_iter.py + :language: python returns @@ -114,7 +99,7 @@ Row factories control the output format of returned rows. The default row factory for ETL Helper is a dictionary, but this can be changed with the ``row_factory`` argument. -.. literalinclude:: ../demo_namedtuple.py +.. literalinclude:: ../code_demos/extract/demo_row_factory.py :language: python The output is: diff --git a/docs/etl_functions/load.rst b/docs/etl_functions/load.rst index f88b783..362df95 100644 --- a/docs/etl_functions/load.rst +++ b/docs/etl_functions/load.rst @@ -10,19 +10,8 @@ ETL Helper provides two functions for loading of data. - :func:`load() `: Inserts data from an iterable of dictionaries or namedtuples into an existing target table. -.. code:: python - - import sqlite3 - import etlhelper as etl - - rows = [ - {"name": "basalt", "grain_size": "fine"}, - {"name": "granite", "grain_size": "coarse"} - ] - - with sqlite3.connect('igneous_rocks.db') as conn: - # Note that table must already exist - processed, failed = etl.load('igneous_rock', conn, rows) +.. literalinclude:: ../code_demos/load/demo_load_min.py + :language: python NOTE: the ``load`` function uses the first row of data to generate the list of column for the insert query. If later items in the data contain @@ -32,45 +21,14 @@ raised. - :func:`executemany() `: Applies SQL query with parameters supplied by iterable of data. Customising the SQL query allows fine control. -.. code:: python - - import sqlite3 - import etlhelper as etl - - rows = [ - {"name": "basalt", "grain_size": "fine"}, - {"name": "granite", "grain_size": "coarse"} - ] - - # Insert query changes case and adds update_at column - insert_sql = """ - INSERT INTO igneous_rocks (name, grain_size, updated_at) - VALUES (:name, UPPER(:grainsize), datetime('now')) - """ - - with sqlite3.connect('igneous_rocks.db') as conn: - # Note that table must already exist - processed, failed = etl.executemany(insert_sql, conn, rows) +.. literalinclude:: ../code_demos/load/demo_executemany_named.py + :language: python The INSERT query must container placeholders with an appropriate format for the input data e.g. positional for tuples, named for dictionaries. -.. code:: python - - from etlhelper import executemany - - rows = [("basalt", "fine"), ("granite", "coarse")] - - # Positional placeholders for data in tuple format - insert_sql = """ - INSERT INTO igneous_rocks (name, grain_size, updated_at) - VALUES (?, UPPER(?), datetime('now')) - """ - - with sqlite3.connect('igneous_rocks.db') as conn: - # Note that table must already exist - processed, failed = etl.executemany(insert_sql, conn, rows) - +.. literalinclude:: ../code_demos/load/demo_executemany_positional.py + :language: python :func:`executemany() ` can also be used with UPDATE commands. diff --git a/docs/etl_functions/transform.rst b/docs/etl_functions/transform.rst index 4f7ed9a..6a71152 100644 --- a/docs/etl_functions/transform.rst +++ b/docs/etl_functions/transform.rst @@ -21,25 +21,8 @@ factories e.g., ``dict_row_factory`` in-place. The ``yield`` keyword makes ``my_transform`` a generator function that returns an ``Iterator`` that can loop over the rows. -.. code:: python - - from typing import Iterator - import etlhelper as etl - from etlhelper.row_factories import dict_row_factory - - - def my_transform(chunk: Iterator[dict]) -> Iterator[dict]: - # Add prefix to id, remove newlines, set lower case email addresses - - for row in chunk: # each row is a dictionary (mutable) - row['id'] += 1000 - row['description'] = row['description'].replace('\n', ' ') - row['email'] = row['email'].lower() - yield row - - - etl.fetchall(select_sql, src_conn, row_factory=dict_row_factory, - transform=my_transform) +.. literalinclude:: ../code_demos/transform/demo_yield.py + :language: python It is also possible to assemble the complete transformed chunk and return it. @@ -48,28 +31,8 @@ different number of rows, and be of different length, to the input. Because ``namedtuple``\ s are immutable, we have to create a ``new_row`` from each input ``row``. -.. code:: python - - import random - from typing import Iterator - import etlhelper as etl - from etlhelper.row_factories import namedtuple_row_factory - - - def my_transform(chunk: Iterator[tuple]) -> list[tuple]: - # Append random integer (1-10), filter if <5. - - new_chunk = [] - for row in chunk: # each row is a namedtuple (immutable) - extra_value = random.randrange(10) - if extra_value >= 5: # some rows are dropped - new_row = (*row, extra_value) # new rows have extra column - new_chunk.append(new_row) - - return new_chunk - - etl.fetchall(select_sql, src_conn, row_factory=namedtuple_row_factory, - transform=my_transform) +.. literalinclude:: ../code_demos/transform/demo_return.py + :language: python Any Python code can be used within the function and extra data can result from a calculation, a call to a webservice or a query against diff --git a/docs/index.rst b/docs/index.rst index 789e9e8..534ee1d 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -83,7 +83,7 @@ Loading data The following script uses the ``execute``, ``load`` and ``fetchall`` functions to create a database table and populate it with data. -.. literalinclude:: demo_load.py +.. literalinclude:: code_demos/load/demo_load_full.py :language: python The output is: @@ -98,7 +98,7 @@ Copying data This script copies data to another database, with transformation and logging. -.. literalinclude:: demo_copy.py +.. literalinclude:: code_demos/copy/demo_copy_full.py :language: python The output is: diff --git a/docs/recipes/apache_airflow_integration.rst b/docs/recipes/apache_airflow_integration.rst index 016898f..1a948f7 100644 --- a/docs/recipes/apache_airflow_integration.rst +++ b/docs/recipes/apache_airflow_integration.rst @@ -4,33 +4,10 @@ Calling ETL Helper scripts from Apache Airflow The following is an `Apache Airflow DAG `__ that uses -the ``copy_readings`` function defined in the script above. The Airflow +the ``copy_readings`` function defined in the `Database to database +`__ script. The Airflow scheduler will create tasks for each day since 1 August 2019 and call ``copy_readings`` with the appropriate start and end times. -.. code:: python - - # readings_dag.py - - import datetime as dt - from airflow import DAG - from airflow.operators.python_operator import PythonOperator - import copy_readings - - - def copy_readings_with_args(**kwargs): - # Set arguments for copy_readings from context - start = kwargs.get('prev_execution_date') - end = kwargs.get('execution_date') - copy_readings.copy_readings(start, end) - - dag = DAG('readings', - schedule_interval=dt.timedelta(days=1), - start_date=dt.datetime(2019, 8, 1), - catchup=True) - - t1 = PythonOperator( - task_id='copy_readings', - python_callable=copy_readings_with_args, - provide_context=True, - dag=dag) \ No newline at end of file +.. literalinclude:: ../code_demos/recipes/apache_airflow_integration.py + :language: python diff --git a/docs/recipes/csv_files.rst b/docs/recipes/csv_files.rst index 6ae7efb..45f784b 100644 --- a/docs/recipes/csv_files.rst +++ b/docs/recipes/csv_files.rst @@ -10,72 +10,8 @@ has a ``CHECK`` constraint that rejects any rows with an ID divisible by 1000. An example ``on_error`` function prints the IDs of rows that fail to insert. -.. code:: python - - """ - Script to create database and load observations data from csv file. It also - demonstrates how an `on_error` function can handle failed rows. - - Generate observations.csv with: - curl 'https://sensors.bgs.ac.uk/FROST-Server/v1.1/Observations?$select=@iot.id,result,phenomenonTime&$top=20000&$resultFormat=csv' -o observations.csv - """ - import csv - import datetime as dt - from typing import Iterable, List, Tuple - - from etlhelper import execute, load, DbParams - - - def load_observations(csv_file, conn): - """Load observations from csv_file to db_file.""" - # Drop table (helps with repeated test runs!) - drop_table_sql = """ - DROP TABLE IF EXISTS observations - """ - execute(drop_table_sql, conn) - - # Create table (reject ids with no remainder when divided by 1000) - create_table_sql = """ - CREATE TABLE IF NOT EXISTS observations ( - id INTEGER PRIMARY KEY CHECK (id % 1000), - time TIMESTAMP, - result FLOAT - )""" - execute(create_table_sql, conn) - - # Load data - with open(csv_file, 'rt') as f: - reader = csv.DictReader(f) - load('observations', conn, reader, transform=transform, on_error=on_error) - - - # A transform function that takes an iterable of rows and returns an iterable - # of rows. - def transform(rows: Iterable[dict]) -> Iterable[dict]: - """Rename time column and convert to Python datetime.""" - for row in rows: - # Dictionaries are mutable, so rows can be modified in place. - time_value = row.pop('phenomenonTime') - row['time'] = dt.datetime.strptime(time_value, "%Y-%m-%dT%H:%M:%S.%fZ") - - return rows - - - # The on_error function is called after each chunk with all the failed rows - def on_error(failed_rows: List[Tuple[dict, Exception]]) -> None: - """Print the IDs of failed rows""" - rows, exceptions = zip(*failed_rows) - failed_ids = [row['id'] for row in rows] - print(f"Failed IDs: {failed_ids}") - - - if __name__ == "__main__": - from etlhelper import log_to_console - log_to_console() - - db = DbParams(dbtype="SQLITE", filename="observations.sqlite") - with db.connect() as conn: - load_observations('observations.csv', conn) +.. literalinclude:: ../code_demos/recipes/csv_files.py + :language: python Export data to CSV ^^^^^^^^^^^^^^^^^^ @@ -86,15 +22,5 @@ library can connect to databases via SQLAlchemy. It has powerful tools for manipulating tabular data. ETL Helper makes it easy to prepare the SQL Alchemy connection. -.. code:: python - - import pandas as pd - from sqlalchemy import create_engine - - from my_databases import ORACLEDB - - engine = create_engine(ORACLEDB.get_sqlalchemy_connection_string("ORACLE_PASSWORD")) - - sql = "SELECT * FROM my_table" - df = pd.read_sql(sql, engine) - df.to_csv('my_data.csv', header=True, index=False, float_format='%.3f') +.. literalinclude:: ../code_demos/recipes/csv_files_pandas.py + :language: python diff --git a/docs/recipes/database_to_api.rst b/docs/recipes/database_to_api.rst index 7111278..903341b 100644 --- a/docs/recipes/database_to_api.rst +++ b/docs/recipes/database_to_api.rst @@ -18,113 +18,8 @@ asynchronously. The API is often the bottleneck in such pipelines and we have seen significant speed increases (e.g. 10x) using asynchronous transfer as opposed to posting records in series. -.. code:: python - - # copy_sensors_async.py - import asyncio - import datetime as dt - import json - import logging - - import aiohttp - from etlhelper import iter_chunks - - from db import ORACLE_DB - - logger = logging.getLogger("copy_sensors_async") - - SELECT_SENSORS = """ - SELECT CODE, DESCRIPTION - FROM BGS.DIC_SEN_SENSOR - WHERE date_updated BETWEEN :startdate AND :enddate - ORDER BY date_updated - """ - BASE_URL = "http://localhost:9200/" - HEADERS = {'Content-Type': 'application/json'} - - - def copy_sensors(startdate, enddate): - """Read sensors from Oracle and post to REST API.""" - logger.info("Copying sensors with timestamps from %s to %s", - startdate.isoformat(), enddate.isoformat()) - row_count = 0 - - with ORACLE_DB.connect('ORACLE_PASSWORD') as conn: - # chunks is a generator that yields lists of dictionaries - chunks = iter_chunks(SELECT_SENSORS, conn, - parameters={"startdate": startdate, - "enddate": enddate}, - transform=transform_sensors) - - for chunk in chunks: - result = asyncio.run(post_chunk(chunk)) - row_count += len(result) - logger.info("%s items transferred", row_count) - - logger.info("Transfer complete") - - - def transform_sensors(chunk): - """Transform rows to dictionaries suitable for converting to JSON.""" - new_chunk = [] - - for row in chunk: - new_row = { - 'sample_code': row.CODE, - 'description': row.DESCRIPTION, - 'metadata': { - 'source': 'ORACLE_DB', # fixed value - 'transferred_at': dt.datetime.now().isoformat() # dynamic value - } - } - logger.debug(new_row) - new_chunk.append(new_row) - - return new_chunk - - - async def post_chunk(chunk): - """Post multiple items to API asynchronously.""" - async with aiohttp.ClientSession() as session: - # Build list of tasks - tasks = [] - for item in chunk: - tasks.append(post_one(item, session)) - - # Process tasks in parallel. An exception in any will be raised. - result = await asyncio.gather(*tasks) - - return result - - - async def post_one(item, session): - """Post a single item to API using existing aiohttp Session.""" - # Post the item - response = await session.post(BASE_URL + 'sensors/_doc', headers=HEADERS, - data=json.dumps(item)) - - # Log responses before throwing errors because error info is not included - # in generated Exceptions and so cannot otherwise be seen for debugging. - if response.status >= 400: - response_text = await response.text() - logger.error('The following item failed: %s\nError message:\n(%s)', - item, response_text) - await response.raise_for_status() - - return response.status - - - if __name__ == "__main__": - # Configure logging - handler = logging.StreamHandler() - formatter = logging.Formatter('%(asctime)s %(levelname)-8s %(message)s') - handler.setFormatter(formatter) - logger.setLevel(logging.INFO) - logger.addHandler(handler) - - # Copy data from 1 January 2000 to 00:00:00 today - today = dt.datetime.combine(dt.date.today(), dt.time.min) - copy_sensors(dt.datetime(2000, 1, 1), today) +.. literalinclude:: ../code_demos/recipes/database_to_api.py + :language: python In this example, failed rows will fail the whole job. Removing the ``raise_for_status()`` call will let them just be logged instead. diff --git a/docs/recipes/database_to_database.rst b/docs/recipes/database_to_database.rst index 397ce7b..b2fc46f 100644 --- a/docs/recipes/database_to_database.rst +++ b/docs/recipes/database_to_database.rst @@ -5,61 +5,8 @@ The following is a template for an ETL script. It copies copy all the sensor readings from the previous day from an Oracle source to PostgreSQL destination. -.. code:: python - - # copy_readings.py - - import datetime as dt - from etl_helper import copy_rows - from my_databases import ORACLEDB, POSTGRESDB - - CREATE_SQL = dedent(""" - CREATE TABLE IF NOT EXISTS sensordata.readings - ( - sensor_data_id bigint PRIMARY KEY, - measure_id bigint, - time_stamp timestamp without time zone, - meas_value double precision - ) - """).strip() - - DELETE_SQL = dedent(""" - DELETE FROM sensordata.readings - WHERE time_stamp BETWEEN %(startdate)s AND %(enddate)s - """).strip() - - SELECT_SQL = dedent(""" - SELECT id, measure_id, time_stamp, reading - FROM sensor_data - WHERE time_stamp BETWEEN :startdate AND :enddate - ORDER BY time_stamp - """).strip() - - INSERT_SQL = dedent(""" - INSERT INTO sensordata.readings (sensor_data_id, measure_id, time_stamp, - meas_value) - VALUES (%s, %s, %s, %s) - """).strip() - - - def copy_readings(startdate, enddate): - params = {'startdate': startdate, 'enddate': enddate} - - with ORACLEDB.connect("ORA_PASSWORD") as src_conn: - with POSTGRESDB.connect("PG_PASSWORD") as dest_conn: - execute(CREATE_SQL dest_conn) - execute(DELETE_SQL, dest_conn, parameters=params) - copy_rows(SELECT_SQL, src_conn, - INSERT_SQL, dest_conn, - parameters=params) - - - if __name__ == "__main__": - # Copy data from 00:00:00 yesterday to 00:00:00 today - today = dt.combine(dt.date.today(), dt.time.min) - yesterday = today - dt.timedelta(1) - - copy_readings(yesterday, today) +.. literalinclude:: ../code_demos/recipes/database_to_database.py + :language: python It is valuable to create `idempotent `__ diff --git a/docs/utilities.rst b/docs/utilities.rst index d067a76..3e60856 100644 --- a/docs/utilities.rst +++ b/docs/utilities.rst @@ -16,17 +16,8 @@ Some database engines can return autogenerated values (e.g. primary key IDs) after INSERT statements. To capture these values, use the ``fetchone`` method to execute the SQL command instead. -.. code:: python - - import etlhelper as etl - - insert_sql = "INSERT INTO my_table (message) VALUES ('hello') RETURNING id" - - with POSTGRESDB.connect('PGPASSWORD') as conn: - result = etl.fetchone(insert_sql, conn) - - print(result['id']) - +.. literalinclude:: code_demos/utilities/return_autogenerated.py + :language: python Log to console ^^^^^^^^^^^^^^ @@ -37,11 +28,8 @@ log level to ``INFO``. Setting the level to ``DEBUG`` provides information on the query that was run, example data and the database connection. To enable the logger, use: -.. code:: python - - import etlhelper as etl - - etl.log_to_console() +.. literalinclude:: code_demos/utilities/enable_logger.py + :language: python Output from a call to ``copy_rows`` will look like: @@ -59,16 +47,8 @@ credentials in clear text. To use the etlhelper logger directly, access it via: -.. code:: python - - import logging - - import etlhelper as etl - - etl.log_to_console() - etl_logger = logging.getLogger("etlhelper") - etl_logger.info("Hello world!") - +.. literalinclude:: code_demos/utilities/use_logger.py + :language: python Table info ^^^^^^^^^^ @@ -77,12 +57,8 @@ The ``table_info`` function provides basic metadata for a table. An optional schema can be used. Note that for ``sqlite`` the schema value is currently ignored. -.. code:: python - - from etlhelper.utils import table_info - - with ORACLEDB.connect("ORA_PASSWORD") as conn: - columns = table_info('my_table', conn, schema='my_schema') +.. literalinclude:: code_demos/utilities/table_info.py + :language: python The returned value is a list of named tuples of four values. Each tuple represents one column in the table, giving its name, type, if it has a