Etl post type hint docs #187

Merged · 2 commits · Sep 15, 2023
165 changes: 96 additions & 69 deletions etlhelper/etl.py
@@ -46,8 +46,6 @@ class FailedRow(NamedTuple):
exception: Exception


# iter_chunks is where data are retrieved from source database
# All data extraction processes call this function.
def iter_chunks(
select_query: str,
conn: Connection,
@@ -66,14 +64,19 @@ def iter_chunks(
The transform function is applied to chunks of data as they are extracted
from the database.

:param select_query: str, SQL query to execute
All data extraction functions call this function, directly or indirectly.

:param select_query: SQL query to execute
:param conn: dbapi connection
:param parameters: sequence or dict of bind variables to insert in the query
:param parameters: bind variables to insert in the query
:param row_factory: function that accepts a cursor and returns a function
for parsing each row
:param transform: function that accepts an iterable (e.g. list) of rows and
returns an iterable of rows (possibly of different shape)
:param chunk_size: int, size of chunks to group data by
:param transform: function that accepts a list of rows and
returns a list of rows (possibly of different shape)
:param chunk_size: size of chunks to group data by
:return: generator yielding lists of objects which each
represent a row of data using the given row_factory
:raises ETLHelperExtractError: if SQL raises an error
"""
logger.info("Fetching rows (chunk_size=%s)", chunk_size)
logger.debug(f"Fetching:\n\n{select_query}\n\nwith parameters:\n\n"
@@ -144,14 +147,16 @@ def iter_rows(
Run SQL query against connection and return iterator object to loop over
results, row-by-row.

:param select_query: str, SQL query to execute
:param select_query: SQL query to execute
:param conn: dbapi connection
:param parameters: sequence or dict of bind variables to insert in the query
:param parameters: bind variables to insert in the query
:param row_factory: function that accepts a cursor and returns a function
for parsing each row
:param transform: function that accepts an iterable (e.g. list) of rows and
returns an iterable of rows (possibly of different shape)
:param chunk_size: int, size of chunks to group data by
:param transform: function that accepts a list of rows and
returns a list of rows (possibly of different shape)
:param chunk_size: size of chunks to group data by
:return: generator yielding objects which each
represent a row of data using the given row_factory
"""
for chunk in iter_chunks(select_query, conn, row_factory=row_factory,
parameters=parameters, transform=transform,
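A sketch of row-by-row iteration with a `transform` function; the table, data, and the doubling transform are illustrative. The transform here preserves the number of rows, though it may also change their shape or count:

```python
import sqlite3

from etlhelper.etl import iter_rows

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE reading (sensor TEXT, value REAL)")
conn.executemany("INSERT INTO reading VALUES (?, ?)", [("a", 1.0), ("b", 2.5)])

def double_values(chunk):
    # Accepts a list of rows and returns a list of rows.
    return [(sensor, value * 2) for sensor, value in chunk]

for row in iter_rows("SELECT sensor, value FROM reading", conn,
                     transform=double_values):
    print(row)  # ('a', 2.0), then ('b', 5.0)
```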
@@ -172,13 +177,15 @@ def fetchone(
Get first result of query. See iter_rows for details. Note: iter_rows is
recommended for looping over rows individually.

:param select_query: str, SQL query to execute
:param select_query: SQL query to execute
:param conn: dbapi connection
:param parameters: sequence or dict of bind variables to insert in the query
:param parameters: bind variables to insert in the query
:param row_factory: function that accepts a cursor and returns a function
for parsing each row
:param transform: function that accepts an iterable (e.g. list) of rows and
returns an iterable of rows (possibly of different shape)
:param transform: function that accepts a list of rows and
returns a list of rows (possibly of different shape)
:param chunk_size: size of chunks to group data by
:return: None or a row of data using the given row_factory
"""
try:
result = next(iter_rows(select_query, conn, row_factory=row_factory,
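A sketch of `fetchone` against an in-memory SQLite database, showing the `None` return when the query matches nothing; names and data are illustrative:

```python
import sqlite3

from etlhelper.etl import fetchone

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user (id INTEGER, name TEXT)")
conn.execute("INSERT INTO user VALUES (1, 'alice')")

row = fetchone("SELECT name FROM user WHERE id = ?", conn, parameters=(1,))
print(row)  # the first row of the result set

missing = fetchone("SELECT name FROM user WHERE id = ?", conn, parameters=(99,))
print(missing)  # None
```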
@@ -203,13 +210,16 @@ def fetchall(
) -> Chunk:
"""
Get all results of query as a list. See iter_rows for details.
:param select_query: str, SQL query to execute

:param select_query: SQL query to execute
:param conn: dbapi connection
:param parameters: sequence or dict of bind variables to insert in the query
:param row_factory: function that accepts a cursor and returns a function for parsing each row
:param transform: function that accepts an iterable (e.g. list) of rows and returns an iterable
of rows (possibly of different shape)
:param chunk_size: int, size of chunks to group data by
:param parameters: bind variables to insert in the query
:param row_factory: function that accepts a cursor and returns a function
for parsing each row
:param transform: function that accepts a list of rows and
returns a list of rows (possibly of different shape)
:param chunk_size: size of chunks to group data by
:return: a list of rows of data using the given row_factory
"""
return list(iter_rows(select_query, conn, row_factory=row_factory,
parameters=parameters, transform=transform,
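A sketch of `fetchall`, again assuming an in-memory SQLite database and illustrative names:

```python
import sqlite3

from etlhelper.etl import fetchall

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE city (name TEXT, population INTEGER)")
conn.executemany("INSERT INTO city VALUES (?, ?)",
                 [("Oslo", 700000), ("Bergen", 290000)])

rows = fetchall("SELECT name, population FROM city ORDER BY population DESC", conn)
print(rows)  # a list of all rows, built by the default row_factory
```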
@@ -244,15 +254,16 @@ def executemany(
disadvantage is that investigation may be required to determine exactly
which records have been successfully transferred.

:param query: str, SQL insert command with placeholders for data
:param query: SQL insert command with placeholders for data
:param conn: dbapi connection
:param rows: List of tuples containing data to be inserted/updated
:param transform: function that accepts an iterable (e.g. list) of rows and
returns an iterable of rows (possibly of different shape)
:param on_error: Function to be applied to failed rows in each chunk
:param commit_chunks: bool, commit after each chunk has been inserted/updated
:param chunk_size: int, size of chunks to group data by
:return processed, failed: (int, int) number of rows processed, failed
:param rows: an iterable of rows containing data to be inserted/updated
:param transform: function that accepts a list of rows and
returns a list of rows (possibly of different shape)
:param on_error: function to be applied to failed rows in each chunk
:param commit_chunks: commit after each chunk has been inserted/updated
:param chunk_size: size of chunks to group data by
:return: the number of rows processed and the number of rows that failed
:raises ETLHelperInsertError: if SQL raises an error
"""
logger.info("Executing many (chunk_size=%s)", chunk_size)
logger.debug("Executing:\n\n%s\n\nagainst:\n\n%s", query, conn)
@@ -331,13 +342,12 @@ def _execute_by_row(
) -> list[FailedRow]:
"""
Retry execution of rows individually and return failed rows along with
their errors. Successful inserts are committed. This is because
(and other?)
their errors. Successful inserts are committed.

:param query: str, SQL command with placeholders for data
:param chunk: list, list of row parameters
:param conn: open dbapi connection, used for transactions
:returns failed_rows: list of (row, exception) tuples
:param query: SQL query with placeholders for data
:param conn: dbapi connection
:param chunk: list of rows
:return: a list of failed rows
"""
failed_rows: list[FailedRow] = []

@@ -382,19 +392,19 @@ def copy_rows(
tuples. on_error is a function that is called at the end of each chunk,
with the list as the only argument.

:param select_query: str, select rows from Oracle.
:param source_conn: open dbapi connection
:param insert_query:
:param dest_conn: open dbapi connection
:param parameters: sequence or dict of bind variables for select query
:param select_query: SQL query to select data
:param source_conn: dbapi connection
:param insert_query: SQL query to insert data
:param dest_conn: dbapi connection
:param parameters: bind variables to insert in the select query
:param row_factory: function that accepts a cursor and returns a function
for parsing each row
:param transform: function that accepts an iterable (e.g. list) of rows and
returns an iterable of rows (possibly of different shape)
:param on_error: Function to be applied to failed rows in each chunk
:param commit_chunks: bool, commit after each chunk (see executemany)
:param chunk_size: int, size of chunks to group data by
:return processed, failed: (int, int) number of rows processed, failed
:param transform: function that accepts a list of rows and
returns a list of rows (possibly of different shape)
:param on_error: function to be applied to failed rows in each chunk
:param commit_chunks: commit after each chunk (see executemany)
:param chunk_size: size of chunks to group data by
:return: the number of rows processed and the number of rows that failed
"""
rows_generator = iter_rows(select_query, source_conn,
parameters=parameters, row_factory=row_factory,
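A sketch of `copy_rows` between two SQLite connections; in practice the source and destination are usually different databases. Tables, columns, and queries are illustrative:

```python
import sqlite3

from etlhelper.etl import copy_rows

src = sqlite3.connect(":memory:")
dest = sqlite3.connect(":memory:")
src.execute("CREATE TABLE measurement (id INTEGER, value REAL)")
src.executemany("INSERT INTO measurement VALUES (?, ?)", [(1, 1.5), (2, 2.5)])
dest.execute("CREATE TABLE measurement (id INTEGER, value REAL)")

select_sql = "SELECT id, value FROM measurement"
insert_sql = "INSERT INTO measurement (id, value) VALUES (?, ?)"
processed, failed = copy_rows(select_sql, src, insert_sql, dest)
print(processed, failed)  # 2 0
```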
@@ -415,9 +425,10 @@ def execute(
"""
Run SQL query against connection.

:param query: str, SQL query to execute
:param query: SQL query to execute
:param conn: dbapi connection
:param parameters: sequence or dict of bind variables to insert in the query
:param parameters: bind variables to insert in the query
:raises ETLHelperQueryError: if SQL raises an error
"""
logger.info("Executing query")
logger.debug(f"Executing:\n\n{query}\n\nwith parameters:\n\n"
@@ -465,19 +476,18 @@ def copy_table_rows(
tuples. on_error is a function that is called at the end of each chunk,
with the list as the only argument.

:param source_conn: open dbapi connection
:param dest_conn: open dbapi connection
:param table: name of table
:param source_conn: dbapi connection
:param dest_conn: dbapi connection
:param target: name of target table, if different from source
:param row_factory: function that accepts a cursor and returns a function
for parsing each row
:param transform: function that accepts an iterable (e.g. list) of rows and
returns an iterable of rows (possibly of different shape)
:param on_error: Function to be applied to failed rows in each chunk
:param commit_chunks: bool, commit after each chunk (see executemany)
:param chunk_size: int, size of chunks to group data by
:param select_sql_suffix: str, SQL clause(s) to append to select statement
e.g. WHERE, ORDER BY, LIMIT
:return processed, failed: (int, int) number of rows processed, failed
:param transform: function that accepts a list of rows and
returns a list of rows (possibly of different shape)
:param on_error: function to be applied to failed rows in each chunk
:param commit_chunks: commit after each chunk (see executemany)
:param chunk_size: size of chunks to group data by
:return: the number of rows processed and the number of rows that failed
"""
validate_identifier(table)
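A sketch of `copy_table_rows` between two SQLite databases, using the `select_sql_suffix` described above; the table, columns, and filter are illustrative, and the destination table is assumed to exist with matching columns:

```python
import sqlite3

from etlhelper.etl import copy_table_rows

src = sqlite3.connect(":memory:")
dest = sqlite3.connect(":memory:")
src.execute("CREATE TABLE borehole (id INTEGER, depth REAL)")
src.executemany("INSERT INTO borehole VALUES (?, ?)", [(1, 55.0), (2, 120.0)])
dest.execute("CREATE TABLE borehole (id INTEGER, depth REAL)")

processed, failed = copy_table_rows(
    "borehole", src, dest, select_sql_suffix="WHERE depth > 60"
)
print(processed, failed)  # 1 0
```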

@@ -513,14 +523,14 @@ def load(
with the list as the only argument.

:param table: name of table
:param conn: open dbapi connection
:param conn: dbapi connection
:param rows: iterable of named tuples or dictionaries of data
:param transform: function that accepts an iterable (e.g. list) of rows and
returns an iterable of rows (possibly of different shape)
:param on_error: Function to be applied to failed rows in each chunk
:param commit_chunks: bool, commit after each chunk (see executemany)
:param chunk_size: int, size of chunks to group data by
:return processed, failed: (int, int) number of rows processed, failed
:param transform: function that accepts a list of rows and
returns a list of rows (possibly of different shape)
:param on_error: function to be applied to failed rows in each chunk
:param commit_chunks: commit after each chunk (see executemany)
:param chunk_size: size of chunks to group data by
:return: the number of rows processed and the number of rows that failed
"""
# Return early if rows is empty
if not rows:
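A sketch of `load` with named tuples, whose field names supply the column names; dictionaries are also accepted per the docstring. Table and data are illustrative:

```python
import sqlite3
from collections import namedtuple

from etlhelper.etl import load

Observation = namedtuple("Observation", ["id", "value"])

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE observation (id INTEGER, value REAL)")

rows = [Observation(1, 0.5), Observation(2, 1.25)]
processed, failed = load("observation", conn, rows)
print(processed, failed)  # 2 0
```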
@@ -566,8 +576,18 @@ def generate_insert_sql(
conn: Connection
) -> str:
"""Generate insert SQL for table, getting column names from row and the
Generate insert SQL for table, getting column names from row and the
placeholder style from the connection. `row` is either a namedtuple or
a dictionary."""
a dictionary.

:param table: name of table
:param row: a single row as a namedtuple or dict
:param conn: dbapi connection
:return: SQL statement to insert data into the given table
:raises ETLHelperInsertError: if 'row' is not a namedtuple or a dict,
or if the database connection encounters a
parameter error
"""
helper = DB_HELPER_FACTORY.from_conn(conn)
paramstyles = {
"qmark": "?",
@@ -621,7 +641,9 @@ def validate_identifier(identifier: str) -> None:
Identifiers must comprise alpha-numeric characters, plus `_` or `$` and
cannot start with `$`, or numbers.

:raises ETLHelperBadIdentifierError:
:param identifier: a database identifier
:raises ETLHelperBadIdentifierError: if the 'identifier' contains invalid
characters
"""
# Identifier rules are based on PostgreSQL specifications, defined here:
# https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS
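A sketch of the pass/fail behaviour; the exception is assumed to live in `etlhelper.exceptions`, and the identifiers are illustrative:

```python
from etlhelper.etl import validate_identifier
from etlhelper.exceptions import ETLHelperBadIdentifierError  # assumed location

validate_identifier("my_table")  # valid: returns None, no exception raised

try:
    validate_identifier("1; DROP TABLE users")  # starts with a number, has invalid chars
except ETLHelperBadIdentifierError as exc:
    print(f"Rejected: {exc}")
```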
@@ -644,7 +666,12 @@ def _chunker(
) -> Iterator[tuple[Union[Row, None], ...]]:
"""Collect data into fixed-length chunks or blocks.
Code from recipe at https://docs.python.org/3.6/library/itertools.html

:param iterable: an iterable object
:param n_chunks: the number of values in each chunk
:return: generator returning tuples of rows, of length n_chunks,
where empty values are filled using None
"""
# _chunker('ABCDEFG', 3) --> ABC DEF G"
# _chunker((A,B,C,D,E,F,G), 3) --> (A,B,C) (D,E,F) (G,None,None)
args = [iter(iterable)] * n_chunks
return zip_longest(*args, fillvalue=None)
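A small demonstration of the padding behaviour, matching the comments above; `_chunker` is a private helper, shown here only for illustration:

```python
from etlhelper.etl import _chunker

print(list(_chunker("ABCDEFG", 3)))
# [('A', 'B', 'C'), ('D', 'E', 'F'), ('G', None, None)]
```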