Update docs code demos #201

Merged
merged 4 commits on May 22, 2024
5 changes: 4 additions & 1 deletion .flake8
@@ -3,7 +3,9 @@ max-line-length = 120
extend-ignore =
    # Do not require self and cls to have type hints
    # See: https://github.com/sco1/flake8-annotations/issues/75
-    ANN101, ANN102
+    ANN101, ANN102,
+    # Do not require **kwargs to have type hints
+    ANN003
per-file-ignores =
    # Do not check annotations in these files
    abort.py: A,
@@ -14,3 +16,4 @@ per-file-ignores =
    utils.py: A,
    etlhelper/db_helpers/*: A,
    test/**: A
+    docs/**: A
2 changes: 1 addition & 1 deletion bin/run_tests_for_developer.sh
@@ -1,6 +1,6 @@
#! /bin/sh
echo "Flake8 checks"
-flake8 etlhelper test || exit 1
+flake8 etlhelper test docs/code_demos || exit 1

echo "Building container"
docker build \
32 changes: 32 additions & 0 deletions docs/code_demos/connecting_to_databases/db_params.py
@@ -0,0 +1,32 @@
"""ETL Helper script to demonstrate DbParams."""
import etlhelper as etl

ORACLEDB = etl.DbParams(
    dbtype="ORACLE",
    host="localhost",
    port=1521,
    dbname="mydata",
    user="oracle_user",
)

POSTGRESDB = etl.DbParams(
    dbtype="PG",
    host="localhost",
    port=5432,
    dbname="mydata",
    user="postgres_user",
)

SQLITEDB = etl.DbParams(
    dbtype="SQLITE",
    filename="/path/to/file.db",
)

MSSQLDB = etl.DbParams(
    dbtype="MSSQL",
    host="localhost",
    port=1433,
    dbname="mydata",
    user="mssql_user",
    odbc_driver="ODBC Driver 17 for SQL Server",
)
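
The demos below import these objects from a my_databases module; a minimal usage sketch, assuming the password is held in an ORACLE_PASSWORD environment variable:

import etlhelper as etl
from my_databases import ORACLEDB

# connect() reads the password from the named environment variable
with ORACLEDB.connect("ORACLE_PASSWORD") as conn:
    rows = etl.fetchall("SELECT 1 FROM dual", conn)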
2 changes: 2 additions & 0 deletions docs/code_demos/connecting_to_databases/oracle_env.py
@@ -0,0 +1,2 @@
import os
os.environ["ORACLE_PASSWORD"] = "some-secret-password"
14 changes: 14 additions & 0 deletions docs/code_demos/connecting_to_databases/oracle_lobs.py
@@ -0,0 +1,14 @@
"""ETL Helper script to demonstrate oracle handling Large Objects (LOBs)."""
import etlhelper as etl
import oracledb
from my_databases import ORACLEDB

select_sql = "SELECT my_clob, my_blob FROM my_table"

with ORACLEDB.connect("ORA_PASSWORD") as conn:
    # By default, ETL Helper returns native types
    result_as_native = etl.fetchall(select_sql, conn)

    # Update oracledb settings to return LOBs
    oracledb.defaults.fetch_lobs = True
    result_as_lobs = etl.fetchall(select_sql, conn)
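
A hedged follow-on to the demo above: with fetch_lobs enabled, each value is an oracledb LOB object whose content must be read explicitly (the column access assumes the default dict rows):

# Hypothetical follow-on: LOB objects expose their content via read()
first_row = result_as_lobs[0]
clob_text = first_row["my_clob"].read()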
3 changes: 3 additions & 0 deletions docs/code_demos/connecting_to_databases/standalone_connect.py
@@ -0,0 +1,3 @@
import etlhelper as etl
from my_databases import ORACLEDB
oracle_conn = etl.connect(ORACLEDB, "ORACLE_PASSWORD")
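
Unlike the with-block demos, a standalone connection is not closed automatically; a minimal cleanup sketch (the query is illustrative only):

# The caller is responsible for closing a standalone connection
try:
    rows = etl.fetchall("SELECT 1 FROM dual", oracle_conn)
finally:
    oracle_conn.close()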
18 changes: 9 additions & 9 deletions docs/demo_copy.py → docs/code_demos/copy/demo_copy_full.py
@@ -1,6 +1,7 @@
"""ETL Helper script to copy records between databases."""
-import datetime as dt
import sqlite3
+import datetime as dt
+from typing import Generator
import etlhelper as etl

igneous_db_file = "igneous_rocks.db"
@@ -12,16 +13,15 @@
        name TEXT UNIQUE,
        category TEXT,
        last_update DATETIME
-    )"""
-
-
+    )
+"""
select_sql = "SELECT name FROM igneous_rock"


-def transform(chunk):
+def transform(chunk: list[dict]) -> Generator[dict, None, None]:
    for row in chunk:
-        row['category'] = 'igneous'
-        row['last_update'] = dt.datetime.now()
+        row["category"] = "igneous"
+        row["last_update"] = dt.datetime.now()
        yield row


@@ -34,8 +34,8 @@ def transform(chunk):

    # Copy data
    rows = etl.iter_rows(select_sql, src, transform=transform)
-    etl.load('rock', dest, rows)
+    etl.load("rock", dest, rows)

    # Confirm transfer
-    for row in etl.fetchall('SELECT * FROM rock', dest):
+    for row in etl.fetchall("SELECT * FROM rock", dest):
        print(row)
13 changes: 13 additions & 0 deletions docs/code_demos/copy/demo_copy_iter_rows.py
@@ -0,0 +1,13 @@
"""ETL Helper script to demonstrate copying records between databases with iter_rows and load."""
import etlhelper as etl
from my_databases import POSTGRESDB, ORACLEDB

select_sql = """
    SELECT id, name, value FROM my_table
    WHERE value > :min_value
"""

with ORACLEDB.connect("ORA_PASSWORD") as src_conn:
    with POSTGRESDB.connect("PG_PASSWORD") as dest_conn:
        rows = etl.iter_rows(select_sql, src_conn, parameters={"min_value": 99})
        etl.load("my_table", dest_conn, rows)
40 changes: 40 additions & 0 deletions docs/code_demos/copy/demo_copy_rows.py
@@ -0,0 +1,40 @@
"""ETL Helper script to demonstrate copy_rows."""
import etlhelper as etl
from etlhelper.row_factories import namedtuple_row_factory
from my_databases import POSTGRESDB, ORACLEDB

select_sql = """
    SELECT
        customer_id,
        SUM (amount) AS total_amount
    FROM payment
    WHERE id > 1000
    GROUP BY customer_id
"""

# This insert query uses positional parameters, so a namedtuple_row_factory
# is used.
insert_sql = """
    INSERT INTO dest (
        customer_id,
        total_amount,
        loaded_by,
        load_time
    )
    VALUES (
        %s,
        %s,
        current_user,
        now()
    )
"""

with ORACLEDB.connect("ORA_PASSWORD") as src_conn:
    with POSTGRESDB.connect("PG_PASSWORD") as dest_conn:
        etl.copy_rows(
            select_sql,
            src_conn,
            insert_sql,
            dest_conn,
            row_factory=namedtuple_row_factory,
        )
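
For comparison, a sketch of the same insert using named parameters, in which case the default dict rows work without a row_factory override (the %(name)s placeholder style assumes a psycopg2 destination):

# Hypothetical named-parameter variant of insert_sql
insert_sql_named = """
    INSERT INTO dest (customer_id, total_amount, loaded_by, load_time)
    VALUES (%(customer_id)s, %(total_amount)s, current_user, now())
"""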
7 changes: 7 additions & 0 deletions docs/code_demos/copy/demo_copy_table_rows.py
@@ -0,0 +1,7 @@
"""ETL Helper script to demonstrate copy_table_rows."""
import etlhelper as etl
from my_databases import POSTGRESDB, ORACLEDB

with ORACLEDB.connect("ORA_PASSWORD") as src_conn:
    with POSTGRESDB.connect("PG_PASSWORD") as dest_conn:
        etl.copy_table_rows("my_table", src_conn, dest_conn)
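
A sketch of copying into a table with a different name, assuming copy_table_rows accepts a target argument as described in the etlhelper documentation:

# Hypothetical variant writing to a differently named destination table
etl.copy_table_rows("my_table", src_conn, dest_conn, target="my_table_copy")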
File renamed without changes.
23 changes: 23 additions & 0 deletions docs/code_demos/error_handling/demo_log_error.py
@@ -0,0 +1,23 @@
"""ETL Helper script to demonstrate logging errors."""
import logging
import sqlite3
import etlhelper as etl

etl.log_to_console()
logger = logging.getLogger("etlhelper")

db_file = "igneous_rocks.db"

rows = [
    {"name": "basalt", "grain_size": "fine"},
    {"name": "granite", "grain_size": "coarse"}
]


def log_errors(failed_rows: list[tuple[dict, Exception]]) -> None:
    for row, exception in failed_rows:
        logger.error(exception)


with sqlite3.connect(db_file) as conn:
    etl.load("igneous_rock", conn, rows, on_error=log_errors)
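
The same on_error hook could collect failures for later processing instead of logging them; a minimal sketch:

# Hypothetical variant: accumulate failed rows rather than logging them
failures = []


def collect_errors(failed_rows: list[tuple[dict, Exception]]) -> None:
    failures.extend(failed_rows)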
@@ -3,6 +3,7 @@
import etlhelper as etl

db_file = "igneous_rocks.db"
+
create_sql = """
    CREATE TABLE IF NOT EXISTS igneous_rock (
        id INTEGER PRIMARY KEY,
@@ -29,5 +30,5 @@
    etl.executemany(insert_sql, conn, rows=igneous_rocks)

    # Confirm selection
-    for row in etl.fetchall('SELECT * FROM igneous_rock', conn):
+    for row in etl.fetchall("SELECT * FROM igneous_rock", conn):
        print(row)
12 changes: 12 additions & 0 deletions docs/code_demos/extract/demo_fetch.py
@@ -0,0 +1,12 @@
"""ETL Helper script to demonstrate using fetch functions."""
import sqlite3
import etlhelper as etl

db_file = "igneous_rocks.db"

with sqlite3.connect(db_file) as conn:
    first_row = etl.fetchone("SELECT * FROM igneous_rock", conn)
    all_rows = etl.fetchall("SELECT * FROM igneous_rock", conn)

print(first_row)
print(all_rows)
9 changes: 9 additions & 0 deletions docs/code_demos/extract/demo_iter.py
@@ -0,0 +1,9 @@
"""ETL Helper script to demonstrate iter_rows."""
import sqlite3
import etlhelper as etl

db_file = "igneous_rocks.db"

with sqlite3.connect(db_file) as conn:
    for row in etl.iter_rows("SELECT * FROM igneous_rock", conn):
        print(row)
16 changes: 16 additions & 0 deletions docs/code_demos/extract/demo_row_factory.py
@@ -0,0 +1,16 @@
"""ETL Helper script to demonstrate using fetch functions with a given row factory."""
import sqlite3
import etlhelper as etl
from etlhelper.row_factories import namedtuple_row_factory

db_file = "igneous_rocks.db"

with sqlite3.connect(db_file) as conn:
    row = etl.fetchone(
        "SELECT * FROM igneous_rock",
        conn,
        row_factory=namedtuple_row_factory,
    )

print(row)
print(row.name)
20 changes: 20 additions & 0 deletions docs/code_demos/load/demo_executemany_named.py
@@ -0,0 +1,20 @@
"""ETL Helper script to demonstrate using executemany with a named placeholder query."""
import sqlite3
import etlhelper as etl

db_file = "igneous_rocks.db"

# Insert query changes case and adds updated_at column
insert_sql = """
    INSERT INTO igneous_rocks (name, grain_size, updated_at)
    VALUES (:name, UPPER(:grain_size), datetime('now'))
"""

rows = [
    {"name": "basalt", "grain_size": "fine"},
    {"name": "granite", "grain_size": "coarse"}
]

with sqlite3.connect(db_file) as conn:
    # Note that table must already exist
    processed, failed = etl.executemany(insert_sql, conn, rows)
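
The demo assumes the igneous_rocks table already exists; a hypothetical DDL statement that would satisfy it (the column types are assumptions):

# Hypothetical table definition for this demo; execute once before inserting
create_sql = """
    CREATE TABLE IF NOT EXISTS igneous_rocks (
        id INTEGER PRIMARY KEY,
        name TEXT UNIQUE,
        grain_size TEXT,
        updated_at DATETIME
    )
"""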
17 changes: 17 additions & 0 deletions docs/code_demos/load/demo_executemany_positional.py
@@ -0,0 +1,17 @@
"""ETL Helper script to demonstrate using executemany with a positional placeholder query."""
import sqlite3
import etlhelper as etl

db_file = "igneous_rocks.db"

# Positional placeholders for data in tuple format
insert_sql = """
    INSERT INTO igneous_rocks (name, grain_size, updated_at)
    VALUES (?, UPPER(?), datetime('now'))
"""

rows = [("basalt", "fine"), ("granite", "coarse")]

with sqlite3.connect(db_file) as conn:
    # Note that table must already exist
    processed, failed = etl.executemany(insert_sql, conn, rows)
8 changes: 5 additions & 3 deletions docs/demo_load.py → docs/code_demos/load/demo_load_full.py
@@ -3,12 +3,14 @@
import etlhelper as etl

db_file = "igneous_rocks.db"
+
create_sql = """
    CREATE TABLE IF NOT EXISTS igneous_rock (
        id INTEGER PRIMARY KEY,
        name TEXT UNIQUE,
        grain_size TEXT
-    )"""
+    )
+"""

igneous_rocks = [
    {"name": "basalt", "grain_size": "fine"},
@@ -21,8 +23,8 @@
    etl.execute(create_sql, conn)

    # Insert rows
-    etl.load('igneous_rock', conn, igneous_rocks)
+    etl.load("igneous_rock", conn, igneous_rocks)

    # Confirm selection
-    for row in etl.fetchall('SELECT * FROM igneous_rock', conn):
+    for row in etl.fetchall("SELECT * FROM igneous_rock", conn):
        print(row)
14 changes: 14 additions & 0 deletions docs/code_demos/load/demo_load_min.py
@@ -0,0 +1,14 @@
"""ETL Helper script to demonstrate load."""
import sqlite3
import etlhelper as etl

db_file = "igneous_rocks.db"

rows = [
    {"name": "basalt", "grain_size": "fine"},
    {"name": "granite", "grain_size": "coarse"}
]

with sqlite3.connect(db_file) as conn:
    # Note that table must already exist
    processed, failed = etl.load("igneous_rock", conn, rows)
27 changes: 27 additions & 0 deletions docs/code_demos/recipes/apache_airflow_integration.py
@@ -0,0 +1,27 @@
"""ETL Helper script to demonstrate using Apache Airflow to schedule tasks."""
import datetime as dt
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from database_to_database import copy_readings


def copy_readings_with_args(**kwargs) -> None:
    # Set arguments for copy_readings from context
    start = kwargs.get("prev_execution_date")
    end = kwargs.get("execution_date")
    copy_readings(start, end)


dag = DAG(
    "readings",
    schedule_interval=dt.timedelta(days=1),
    start_date=dt.datetime(2019, 8, 1),
    catchup=True,
)

t1 = PythonOperator(
    task_id="copy_readings",
    python_callable=copy_readings_with_args,
    provide_context=True,
    dag=dag,
)
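
The copy_readings function itself is imported from the database_to_database recipe; a hedged sketch of the signature this DAG assumes:

import datetime as dt


# Hypothetical signature expected by copy_readings_with_args: start and end
# datetimes bounding the readings to copy between databases
def copy_readings(startdate: dt.datetime, enddate: dt.datetime) -> None:
    ...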