Skip to content

Commit

Permalink
Move all full coding examples into individual scripts and clean up th…
Browse files Browse the repository at this point in the history
…eir formatting
  • Loading branch information
leorudczenko committed May 22, 2024
1 parent 19e773e commit dc2111e
Show file tree
Hide file tree
Showing 42 changed files with 652 additions and 547 deletions.
32 changes: 32 additions & 0 deletions docs/code_demos/connecting_to_databases/db_params.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""ETL Helper script to demonstrate DbParams."""
import etlhelper as etl

ORACLEDB = etl.DbParams(
dbtype="ORACLE",
host="localhost",
port=1521,
dbname="mydata",
user="oracle_user",
)

POSTGRESDB = etl.DbParams(
dbtype="PG",
host="localhost",
port=5432,
dbname="mydata",
user="postgres_user",
)

SQLITEDB = etl.DbParams(
dbtype="SQLITE",
filename="/path/to/file.db",
)

MSSQLDB = etl.DbParams(
dbtype="MSSQL",
host="localhost",
port=1433,
dbname="mydata",
user="mssql_user",
odbc_driver="ODBC Driver 17 for SQL Server",
)
2 changes: 2 additions & 0 deletions docs/code_demos/connecting_to_databases/oracle_env.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
import os
os.environ["ORACLE_PASSWORD"] = "some-secret-password"
14 changes: 14 additions & 0 deletions docs/code_demos/connecting_to_databases/oracle_lobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""ETL Helper script to demonstrate oracle handling Large Objects (LOBs)."""
import etlhelper as etl
import oracledb
from my_databases import ORACLEDB

select_sql = "SELECT my_clob, my_blob FROM my_table"

with ORACLEDB.connect("ORA_PASSWORD") as conn:
# By default, ETL Helper returns native types
result_as_native = etl.fetchall(select_sql, conn)

# Update oracledb settings to return LOBs
oracledb.defaults.fetch_lobs = True
result_as_lobs = etl.fetchall(select_sql, conn)
3 changes: 3 additions & 0 deletions docs/code_demos/connecting_to_databases/standalone_connect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import etlhelper as etl
from my_databases import ORACLEDB
oracle_conn = etl.connect(ORACLEDB, "ORACLE_PASSWORD")
18 changes: 9 additions & 9 deletions docs/demo_copy.py → docs/code_demos/copy/demo_copy_full.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""ETL Helper script to copy records between databases."""
import datetime as dt
import sqlite3
import datetime as dt
from typing import Generator
import etlhelper as etl

igneous_db_file = "igneous_rocks.db"
Expand All @@ -12,16 +13,15 @@
name TEXT UNIQUE,
category TEXT,
last_update DATETIME
)"""


)
"""
select_sql = "SELECT name FROM igneous_rock"


def transform(chunk):
def transform(chunk: list[dict]) -> Generator[dict, None, None]:
for row in chunk:
row['category'] = 'igneous'
row['last_update'] = dt.datetime.now()
row["category"] = "igneous"
row["last_update"] = dt.datetime.now()
yield row


Expand All @@ -34,8 +34,8 @@ def transform(chunk):

# Copy data
rows = etl.iter_rows(select_sql, src, transform=transform)
etl.load('rock', dest, rows)
etl.load("rock", dest, rows)

# Confirm transfer
for row in etl.fetchall('SELECT * FROM rock', dest):
for row in etl.fetchall("SELECT * FROM rock", dest):
print(row)
13 changes: 13 additions & 0 deletions docs/code_demos/copy/demo_copy_iter_rows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""ETL Helper script to demonstrate copying records between databases with iter_rows and load."""
import etlhelper as etl
from my_databases import POSTGRESDB, ORACLEDB

select_sql = """
SELECT id, name, value FROM my_table
WHERE value > :min_value
"""

with ORACLEDB.connect("ORA_PASSWORD") as src_conn:
with POSTGRESDB.connect("PG_PASSWORD") as dest_conn:
rows = etl.iter_rows(select_sql, src_conn, parameters={"min_value": 99})
etl.load("my_table", dest_conn, rows)
40 changes: 40 additions & 0 deletions docs/code_demos/copy/demo_copy_rows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""ETL Helper script to demonstrate copy_rows."""
import etlhelper as etl
from etlhelper.row_factories import namedtuple_row_factory
from my_databases import POSTGRESDB, ORACLEDB

select_sql = """
SELECT
customer_id,
SUM (amount) AS total_amount
FROM payment
WHERE id > 1000
GROUP BY customer_id
"""

# This insert query uses positional parameters, so a namedtuple_row_factory
# is used.
insert_sql = """
INSERT INTO dest (
customer_id,
total_amount,
loaded_by,
load_time
)
VALUES (
%s,
%s,
current_user,
now()
)
"""

with ORACLEDB.connect("ORA_PASSWORD") as src_conn:
with POSTGRESDB.connect("PG_PASSWORD") as dest_conn:
etl.copy_rows(
select_sql,
src_conn,
insert_sql,
dest_conn,
row_factory=namedtuple_row_factory,
)
7 changes: 7 additions & 0 deletions docs/code_demos/copy/demo_copy_table_rows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""ETL Helper script to demonstrate copy_table_rows."""
import etlhelper as etl
from my_databases import POSTGRESDB, ORACLEDB

with ORACLEDB.connect("ORA_PASSWORD") as src_conn:
with POSTGRESDB.connect("PG_PASSWORD") as dest_conn:
etl.copy_table_rows("my_table", src_conn, dest_conn)
File renamed without changes.
23 changes: 23 additions & 0 deletions docs/code_demos/error_handling/demo_log_error.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""ETL Helper script to demonstrate logging errors."""
import logging
import sqlite3
import etlhelper as etl

etl.log_to_console()
logger = logging.getLogger("etlhelper")

db_file = "igneous_rocks.db"

rows = [
{"name": "basalt", "grain_size": "fine"},
{"name": "granite", "grain_size": "coarse"}
]


def log_errors(failed_rows: list[tuple[dict, Exception]]) -> None:
for row, exception in failed_rows:
logger.error(exception)


with sqlite3.connect(db_file) as conn:
etl.load("igneous_rock", conn, rows, on_error=log_errors)
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import etlhelper as etl

db_file = "igneous_rocks.db"

create_sql = """
CREATE TABLE IF NOT EXISTS igneous_rock (
id INTEGER PRIMARY KEY,
Expand All @@ -29,5 +30,5 @@
etl.executemany(insert_sql, conn, rows=igneous_rocks)

# Confirm selection
for row in etl.fetchall('SELECT * FROM igneous_rock', conn):
for row in etl.fetchall("SELECT * FROM igneous_rock", conn):
print(row)
12 changes: 12 additions & 0 deletions docs/code_demos/extract/demo_fetch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""ETL Helper script to demonstrate using fetch functions."""
import sqlite3
import etlhelper as etl

db_file = "igneous_rocks.db"

with sqlite3.connect(db_file) as conn:
first_row = etl.fetchone("SELECT * FROM igneous_rock", conn)
all_rows = etl.fetchall("SELECT * FROM igneous_rock", conn)

print(first_row)
print(all_rows)
9 changes: 9 additions & 0 deletions docs/code_demos/extract/demo_iter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
"""ETL Helper script to demonstrate iter_rows."""
import sqlite3
import etlhelper as etl

db_file = "igneous_rocks.db"

with sqlite3.connect(db_file) as conn:
for row in etl.iter_rows("SELECT * FROM igneous_rock", conn):
print(row)
16 changes: 16 additions & 0 deletions docs/code_demos/extract/demo_row_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""ETL Helper script to demonstrate using fetch functions with a given row factory."""
import sqlite3
import etlhelper as etl
from etlhelper.row_factories import namedtuple_row_factory

db_file = "igneous_rocks.db"

with sqlite3.connect(db_file) as conn:
row = etl.fetchone(
"SELECT * FROM igneous_rock",
conn,
row_factory=namedtuple_row_factory,
)

print(row)
print(row.name)
20 changes: 20 additions & 0 deletions docs/code_demos/load/demo_executemany_named.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""ETL Helper script to demonstrate using executemany with a named placeholder query."""
import sqlite3
import etlhelper as etl

db_file = "igneous_rocks.db"

# Insert query changes case and adds update_at column
insert_sql = """
INSERT INTO igneous_rocks (name, grain_size, updated_at)
VALUES (:name, UPPER(:grainsize), datetime('now'))
"""

rows = [
{"name": "basalt", "grain_size": "fine"},
{"name": "granite", "grain_size": "coarse"}
]

with sqlite3.connect(db_file) as conn:
# Note that table must already exist
processed, failed = etl.executemany(insert_sql, conn, rows)
17 changes: 17 additions & 0 deletions docs/code_demos/load/demo_executemany_positional.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""ETL Helper script to demonstrate using executemany with a positional placeholder query."""
import sqlite3
import etlhelper as etl

db_file = "igneous_rocks.db"

# Positional placeholders for data in tuple format
insert_sql = """
INSERT INTO igneous_rocks (name, grain_size, updated_at)
VALUES (?, UPPER(?), datetime('now'))
"""

rows = [("basalt", "fine"), ("granite", "coarse")]

with sqlite3.connect(db_file) as conn:
# Note that table must already exist
processed, failed = etl.executemany(insert_sql, conn, rows)
8 changes: 5 additions & 3 deletions docs/demo_load.py → docs/code_demos/load/demo_load_full.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
import etlhelper as etl

db_file = "igneous_rocks.db"

create_sql = """
CREATE TABLE IF NOT EXISTS igneous_rock (
id INTEGER PRIMARY KEY,
name TEXT UNIQUE,
grain_size TEXT
)"""
)
"""

igneous_rocks = [
{"name": "basalt", "grain_size": "fine"},
Expand All @@ -21,8 +23,8 @@
etl.execute(create_sql, conn)

# Insert rows
etl.load('igneous_rock', conn, igneous_rocks)
etl.load("igneous_rock", conn, igneous_rocks)

# Confirm selection
for row in etl.fetchall('SELECT * FROM igneous_rock', conn):
for row in etl.fetchall("SELECT * FROM igneous_rock", conn):
print(row)
14 changes: 14 additions & 0 deletions docs/code_demos/load/demo_load_min.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""ETL Helper script to demonstrate load."""
import sqlite3
import etlhelper as etl

db_file = "igneous_rocks.db"

rows = [
{"name": "basalt", "grain_size": "fine"},
{"name": "granite", "grain_size": "coarse"}
]

with sqlite3.connect(db_file) as conn:
# Note that table must already exist
processed, failed = etl.load("igneous_rock", conn, rows)
27 changes: 27 additions & 0 deletions docs/code_demos/recipes/apache_airflow_integration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""ETL Helper script to demonstrate using Apache Airflow to schedule tasks."""
import datetime as dt
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from database_to_database import copy_readings


def copy_readings_with_args(**kwargs) -> None:
# Set arguments for copy_readings from context
start = kwargs.get("prev_execution_date")
end = kwargs.get("execution_date")
copy_readings(start, end)


dag = DAG(
"readings",
schedule_interval=dt.timedelta(days=1),
start_date=dt.datetime(2019, 8, 1),
catchup=True,
)

t1 = PythonOperator(
task_id="copy_readings",
python_callable=copy_readings_with_args,
provide_context=True,
dag=dag,
)
Loading

0 comments on commit dc2111e

Please sign in to comment.