diff --git a/etlhelper/etl.py b/etlhelper/etl.py index 1808026..57c865f 100644 --- a/etlhelper/etl.py +++ b/etlhelper/etl.py @@ -46,8 +46,6 @@ class FailedRow(NamedTuple): exception: Exception -# iter_chunks is where data are retrieved from source database -# All data extraction processes call this function. def iter_chunks( select_query: str, conn: Connection, @@ -66,14 +64,19 @@ def iter_chunks( The transform function is applied to chunks of data as they are extracted from the database. - :param select_query: str, SQL query to execute + All data extraction functions call this function, directly or indirectly. + + :param select_query: SQL query to execute :param conn: dbapi connection - :param parameters: sequence or dict of bind variables to insert in the query + :param parameters: bind variables to insert in the query :param row_factory: function that accepts a cursor and returns a function for parsing each row - :param transform: function that accepts an iterable (e.g. list) of rows and - returns an iterable of rows (possibly of different shape) - :param chunk_size: int, size of chunks to group data by + :param transform: function that accepts a list of rows and + returns an list of rows (possibly of different shape) + :param chunk_size: size of chunks to group data by + :return: generator returning a list of objects which each + represent a row of data using the given row_factory + :raises ETLHelperExtractError: if SQL raises an error """ logger.info("Fetching rows (chunk_size=%s)", chunk_size) logger.debug(f"Fetching:\n\n{select_query}\n\nwith parameters:\n\n" @@ -144,14 +147,16 @@ def iter_rows( Run SQL query against connection and return iterator object to loop over results, row-by-row. - :param select_query: str, SQL query to execute + :param select_query: SQL query to execute :param conn: dbapi connection - :param parameters: sequence or dict of bind variables to insert in the query + :param parameters: bind variables to insert in the query :param row_factory: function that accepts a cursor and returns a function for parsing each row - :param transform: function that accepts an iterable (e.g. list) of rows and - returns an iterable of rows (possibly of different shape) - :param chunk_size: int, size of chunks to group data by + :param transform: function that accepts a list of rows and + returns an list of rows (possibly of different shape) + :param chunk_size: size of chunks to group data by + :return: generator returning a list of objects which each + represent a row of data using the given row_factory """ for chunk in iter_chunks(select_query, conn, row_factory=row_factory, parameters=parameters, transform=transform, @@ -172,13 +177,15 @@ def fetchone( Get first result of query. See iter_rows for details. Note: iter_rows is recommended for looping over rows individually. - :param select_query: str, SQL query to execute + :param select_query: SQL query to execute :param conn: dbapi connection - :param parameters: sequence or dict of bind variables to insert in the query + :param parameters: bind variables to insert in the query :param row_factory: function that accepts a cursor and returns a function for parsing each row - :param transform: function that accepts an iterable (e.g. list) of rows and - returns an iterable of rows (possibly of different shape) + :param transform: function that accepts a list of rows and + returns an list of rows (possibly of different shape) + :param chunk_size: size of chunks to group data by + :return: None or a row of data using the given row_factory """ try: result = next(iter_rows(select_query, conn, row_factory=row_factory, @@ -203,13 +210,16 @@ def fetchall( ) -> Chunk: """ Get all results of query as a list. See iter_rows for details. - :param select_query: str, SQL query to execute + + :param select_query: SQL query to execute :param conn: dbapi connection - :param parameters: sequence or dict of bind variables to insert in the query - :param row_factory: function that accepts a cursor and returns a function for parsing each row - :param transform: function that accepts an iterable (e.g. list) of rows and returns an iterable - of rows (possibly of different shape) - :param chunk_size: int, size of chunks to group data by + :param parameters: bind variables to insert in the query + :param row_factory: function that accepts a cursor and returns a function + for parsing each row + :param transform: function that accepts a list of rows and + returns an list of rows (possibly of different shape) + :param chunk_size: size of chunks to group data by + :return: a row of data using the given row_factory """ return list(iter_rows(select_query, conn, row_factory=row_factory, parameters=parameters, transform=transform, @@ -244,15 +254,16 @@ def executemany( disadvantage is that investigation may be required to determine exactly which records have been successfully transferred. - :param query: str, SQL insert command with placeholders for data + :param query: SQL insert command with placeholders for data :param conn: dbapi connection - :param rows: List of tuples containing data to be inserted/updated - :param transform: function that accepts an iterable (e.g. list) of rows and - returns an iterable of rows (possibly of different shape) - :param on_error: Function to be applied to failed rows in each chunk - :param commit_chunks: bool, commit after each chunk has been inserted/updated - :param chunk_size: int, size of chunks to group data by - :return processed, failed: (int, int) number of rows processed, failed + :param rows: an iterable of rows containing data to be inserted/updated + :param transform: function that accepts a list of rows and + returns an list of rows (possibly of different shape) + :param on_error: function to be applied to failed rows in each chunk + :param commit_chunks: commit after each chunk has been inserted/updated + :param chunk_size: size of chunks to group data by + :return: the number of rows processed and the number of rows failed + :raises ETLHelperInsertError: if SQL raises an error """ logger.info("Executing many (chunk_size=%s)", chunk_size) logger.debug("Executing:\n\n%s\n\nagainst:\n\n%s", query, conn) @@ -331,13 +342,12 @@ def _execute_by_row( ) -> list[FailedRow]: """ Retry execution of rows individually and return failed rows along with - their errors. Successful inserts are committed. This is because - (and other?) + their errors. Successful inserts are committed. - :param query: str, SQL command with placeholders for data - :param chunk: list, list of row parameters - :param conn: open dbapi connection, used for transactions - :returns failed_rows: list of (row, exception) tuples + :param query: SQL query with placeholders for data + :param conn: dbapi connection + :param chunk: list of rows + :return: a list failed rows """ failed_rows: list[FailedRow] = [] @@ -382,19 +392,19 @@ def copy_rows( tuples. on_error is a function that is called at the end of each chunk, with the list as the only argument. - :param select_query: str, select rows from Oracle. - :param source_conn: open dbapi connection - :param insert_query: - :param dest_conn: open dbapi connection - :param parameters: sequence or dict of bind variables for select query + :param select_query: SQL query to select data + :param source_conn: dbapi connection + :param insert_query: SQL query to insert data + :param dest_conn: dbapi connection + :param parameters: bind variables to insert in the select query :param row_factory: function that accepts a cursor and returns a function for parsing each row - :param transform: function that accepts an iterable (e.g. list) of rows and - returns an iterable of rows (possibly of different shape) - :param on_error: Function to be applied to failed rows in each chunk - :param commit_chunks: bool, commit after each chunk (see executemany) - :param chunk_size: int, size of chunks to group data by - :return processed, failed: (int, int) number of rows processed, failed + :param transform: function that accepts a list of rows and + returns an list of rows (possibly of different shape) + :param on_error: function to be applied to failed rows in each chunk + :param commit_chunks: commit after each chunk (see executemany) + :param chunk_size: size of chunks to group data by + :return: the number of rows processed and the number of rows failed """ rows_generator = iter_rows(select_query, source_conn, parameters=parameters, row_factory=row_factory, @@ -415,9 +425,10 @@ def execute( """ Run SQL query against connection. - :param query: str, SQL query to execute + :param query: SQL query to execute :param conn: dbapi connection - :param parameters: sequence or dict of bind variables to insert in the query + :param parameters: bind variables to insert in the query + :raises ETLHelperQueryError: if SQL raises an error """ logger.info("Executing query") logger.debug(f"Executing:\n\n{query}\n\nwith parameters:\n\n" @@ -465,19 +476,18 @@ def copy_table_rows( tuples. on_error is a function that is called at the end of each chunk, with the list as the only argument. - :param source_conn: open dbapi connection - :param dest_conn: open dbapi connection + :param table: name of table + :param source_conn: dbapi connection + :param dest_conn: dbapi connection :param target: name of target table, if different from source :param row_factory: function that accepts a cursor and returns a function for parsing each row - :param transform: function that accepts an iterable (e.g. list) of rows and - returns an iterable of rows (possibly of different shape) - :param on_error: Function to be applied to failed rows in each chunk - :param commit_chunks: bool, commit after each chunk (see executemany) - :param chunk_size: int, size of chunks to group data by - :param select_sql_suffix: str, SQL clause(s) to append to select statement - e.g. WHERE, ORDER BY, LIMIT - :return processed, failed: (int, int) number of rows processed, failed + :param transform: function that accepts a list of rows and + returns an list of rows (possibly of different shape) + :param on_error: function to be applied to failed rows in each chunk + :param commit_chunks: commit after each chunk (see executemany) + :param chunk_size: size of chunks to group data by + :return: the number of rows processed and the number of rows failed """ validate_identifier(table) @@ -513,14 +523,14 @@ def load( with the list as the only argument. :param table: name of table - :param conn: open dbapi connection + :param conn: dbapi connection :param rows: iterable of named tuples or dictionaries of data - :param transform: function that accepts an iterable (e.g. list) of rows and - returns an iterable of rows (possibly of different shape) - :param on_error: Function to be applied to failed rows in each chunk - :param commit_chunks: bool, commit after each chunk (see executemany) - :param chunk_size: int, size of chunks to group data by - :return processed, failed: (int, int) number of rows processed, failed + :param transform: function that accepts a list of rows and + returns an list of rows (possibly of different shape) + :param on_error: function to be applied to failed rows in each chunk + :param commit_chunks: commit after each chunk (see executemany) + :param chunk_size: size of chunks to group data by + :return: the number of rows processed and the number of rows failed """ # Return early if rows is empty if not rows: @@ -566,8 +576,18 @@ def generate_insert_sql( conn: Connection ) -> str: """Generate insert SQL for table, getting column names from row and the + Generate insert SQL for table, getting column names from row and the placeholder style from the connection. `row` is either a namedtuple or - a dictionary.""" + a dictionary. + + :param table: name of table + :param row: a single row as a namedtuple or dict + :param conn: dbapi connection + :return: SQL statement to insert data into the given table + :raises ETLHelperInsertError: if 'row' is not a namedtuple or a dict, + or if the database connection encounters a + parameter error + """ helper = DB_HELPER_FACTORY.from_conn(conn) paramstyles = { "qmark": "?", @@ -621,7 +641,9 @@ def validate_identifier(identifier: str) -> None: Identifiers must comprise alpha-numeric characters, plus `_` or `$` and cannot start with `$`, or numbers. - :raises ETLHelperBadIdentifierError: + :param identifier: a database identifier + :raises ETLHelperBadIdentifierError: if the 'identifier' contains invalid + characters """ # Identifier rules are based on PostgreSQL specifications, defined here: # https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS @@ -644,7 +666,12 @@ def _chunker( ) -> Iterator[tuple[Union[Row, None], ...]]: """Collect data into fixed-length chunks or blocks. Code from recipe at https://docs.python.org/3.6/library/itertools.html + + :param iterable: an iterable object + :param n_chunks: the number of values in each chunk + :return: generator returning tuples of rows, of length n_chunks, + where empty values are filled using None """ - # _chunker('ABCDEFG', 3) --> ABC DEF G" + # _chunker((A,B,C,D,E,F,G), 3) --> (A,B,C) (D,E,F) (G,None,None) args = [iter(iterable)] * n_chunks return zip_longest(*args, fillvalue=None)