From c65bce33b2f8a352dbdb9a9e6102c9b695ab5e2d Mon Sep 17 00:00:00 2001 From: John A Stevenson Date: Mon, 25 Sep 2023 15:43:22 +0100 Subject: [PATCH] Replace docs with summary and draft v1.0 release notes --- README.md | 1129 ++--------------------------------------------------- 1 file changed, 43 insertions(+), 1086 deletions(-) diff --git a/README.md b/README.md index 3b822a1..d7203ec 100644 --- a/README.md +++ b/README.md @@ -2,1108 +2,81 @@ > etlhelper is a Python ETL library to simplify data transfer into and out of databases. -> Note: There are a number of breaking changes planned for `etlhelper` version 1.0. -> Please pin the version number in your dependency list to avoid disruption and -> watch the project on GitHub for notification of new releases (in Custom section). - - -## Overview - -`etlhelper` makes it easy to run a SQL query via Python and return the results. +`etlhelper` makes it easy to run a SQL queries via Python and return the results. It is built upon the [DBAPI2 -specification](https://www.python.org/dev/peps/pep-0249/) and takes care of -importing drivers, formatting connection strings and cursor management. -This reduces the amount of boilerplate code required to query a relational -database with Python. - -### Features - -+ `DbParams` objects provide consistent way to connect to different database types (currently Oracle, PostgreSQL, SQLite and MS SQL Server) -+ `fetchall`, `iter_rows`, `fetchone` and other functions for querying database -+ `execute`, `executemany`, and `load` functions to insert data -+ `copy_rows` and `copy_table_rows` to transfer data from one database to another -+ `on_error` function to process rows that fail to insert -+ Support for parameterised queries and in-flight transformation of data -+ Output results as namedtuple or dictionary -+ Timestamped log messages for tracking long-running data transfers -+ Helpful error messages display the failed query SQL - -These tools can create easy-to-understand, lightweight, versionable and testable Extract-Transform-Load (ETL) workflows. -`etlhelper` is not a tool for coordinating ETL jobs (use [Apache Airflow](https://airflow.apache.org)), for -converting GIS data formats (use [ogr2ogr](https://gdal.org/programs/ogr2ogr.html) or [fiona](https://pypi.org/project/Fiona/)), for translating between SQL dialects or providing Object Relation Mapping (use [SQLAlchemy](https://www.sqlalchemy.org/)). -However, it can be used in conjunction with each of these. +specification](https://www.python.org/dev/peps/pep-0249/) and takes care of cursor management, importing drivers and formatting connection strings, while providing memory-efficient functions to read, write and transform data. +This reduces the amount of boilerplate code required to manipulate data within relational +databases with Python. -![screencast](https://github.com/BritishGeologicalSurvey/etlhelper/blob/main/docs/media/screencast.gif?raw=true) +--- -The documentation below explains how the main features are used. -See the individual function docstrings for full details of parameters and -options. ++ **Documentation**: https://britishgeologicalsurvey.github.io/etlhelper/ ++ **Source code**: https://github.com/BritishGeologicalSurvey/etlhelper -For a high level introduction to `etlhelper`, see the FOSS4GUK 2019 presentation _Open Source Spatial ETL with Python and Apache Airflow_: [video](https://www.youtube.com/watch?v=12rzUW4ps74&feature=youtu.be&t=6238) (20 mins), -[slides](https://volcan01010.github.io/FOSS4G2019-talk). 
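As a quick first look at the library, the sketch below reads rows from a SQLite database with `fetchall`. The file and table names (`igneous_rocks.db`, `igneous_rock`) are invented for the example, and the dictionary-style access assumes the version 1.0 default `dict_row_factory`.

```python
import etlhelper as etl
from etlhelper import DbParams

# Hypothetical SQLite database and table, for illustration only
db = DbParams(dbtype="SQLITE", filename="igneous_rocks.db")

with db.connect() as conn:
    # fetchall returns all rows at once; iter_rows yields them lazily
    rows = etl.fetchall("SELECT id, name FROM igneous_rock", conn)

for row in rows:
    # With the default dict_row_factory, each row behaves like a dict
    print(row["name"])
```

Passing `row_factory=namedtuple_row_factory` to the fetch functions restores the pre-1.0 namedtuple-style rows.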
+--- +## Version 1.0 coming soon! -### Documentation - - + [Installation](#installation) - + [Connect to databases](#connect-to-databases) - + [Transfer data](#transfer-data) - + [Utilities](#utilities) - + [Recipes](#recipes) - + [Development](#development) - + [References](#references) - - -## Installation - -### Python packages +The code for ETL Helper version 1.0 has now been merged into `main`, although not yet deployed to PyPI. +It contains many breaking changes. +The [documentation pages](https://britishgeologicalsurvey.github.io/etlhelper/) now correspond to the upcoming version 1.0. +To use this now, install directly from GitHub: ```bash -pip install etlhelper -``` - -Database driver packages are not included by default and should be specified in -square brackets. -Options are `oracle` (installs oracledb), `mssql` (installs pyodbc) and `postgres` (installs psycopg2). -Multiple values can be separated by commas. - -``` -pip install etlhelper[oracle,postgres] -``` - -The `sqlite3` driver is included within Python's Standard Library. - - -### Database driver dependencies - -Some database drivers have additional dependencies. -On Linux, these can be installed via the system package manager. - -pyodbc (for MS SQL Server): - -+ Follow instructions on [Microsoft SQL Docs website](https://docs.microsoft.com/en-us/sql/connect/odbc/linux-mac/installing-the-microsoft-odbc-driver-for-sql-server?view=sql-server-2017) - - -## Connect to databases - -### DbParams - -Database connection details are defined by `DbParams` objects. -Connections are made via their `connect` functions (see below). -`DbParams` objects are created as follows or from environment variables using the -`from_environment()` function. -The class initialisation function checks that the correct attributes have been provided for -a given `dbtype`. - -```python -from etlhelper import DbParams - -ORACLEDB = DbParams(dbtype='ORACLE', host="localhost", port=1521, - dbname="mydata", user="oracle_user") - -POSTGRESDB = DbParams(dbtype='PG', host="localhost", port=5432, - dbname="mydata", user="postgres_user") - -SQLITEDB = DbParams(dbtype='SQLITE', filename='/path/to/file.db') - -MSSQLDB = DbParams(dbtype='MSSQL', host="localhost", port=1433, - dbname="mydata", user="mssql_user", - odbc_driver="ODBC Driver 17 for SQL Server") -``` - -DbParams objects have a function to check if a given database can be reached -over the network. This does not require a username or password. - -```python -if not ORACLEDB.is_reachable(): - raise ETLHelperError("Network problems") -``` - -Other methods/properties are `get_connection_string`, -`get_sqlalchemy_connection_string`, `paramstyle` and `copy`. -See function docstrings for details. - -### `connect` function - -The `DbParams.connect()` function returns a DBAPI2 connection as provided by the -underlying driver. -Using context-manager syntax as below ensures that the connection is closed -after use. - -```python -with SQLITEDB.connect() as conn1: - with POSTGRESDB.connect('PGPASSWORD') as conn2: - do_something() -``` - -A standalone `connect` function provides backwards-compatibility with -previous releases of `etlhelper`: - -```python -from etlhelper import connect -conn3 = connect(ORACLEDB, 'ORACLE_PASSWORD') -``` - -Both versions accept additional keyword arguments that are passed to the `connect` -function of the underlying driver. 
For example, the following sets the character -encoding used by oracledb to ensure that values are returned as UTF-8: - -```python -conn4 = connect(ORACLEDB, 'ORACLE_PASSWORD', encoding="UTF-8", nencoding="UTF8") -``` - -The above is a solution when special characters are scrambled in the returned data. - -#### Handling of LOBs for Oracle connections - -Oracle databases have special column types for Character Large Object (CLOB) and -Binary Large Object (BLOB). -In ETLHelper, the `oracledb` driver has been configured to return these as native -Python `str` and `bytes` objects respectively. -This is comparable to the behaviour of other database drivers e.g. SQLite, -PostgreSQL and avoids the user having to take the extra step of reading the LOB -and results in faster data transfer. -However, it is not suitable for LOBs larger than 1 Gb. - -To return CLOB and BLOB columns as LOBs, configure the driver as follows: - -```python -import etlhelper as etl -import oracledb - -select_sql = "SELECT my_clob, my_blob FROM my_table" - -with ORACLEDB.connect("ORA_PASSWORD") as conn: - # By default, ETL Helper returns native types - result_as_native = etl.fetchall(select_sql, conn) - - # Update oracledb settings to return LOBs - oracledb.defaults.fetch_lobs = True - result_as_lobs = etl.fetchall(select_sql, conn) -``` - -See the [oracledb docs](https://python-oracledb.readthedocs.io/en/latest/user_guide/lob_data.html#fetching-lobs-as-strings-and-bytes) for more information. - - -#### Disabling fast_executemany for SQL Server and other pyODBC connections - -By default an `etlhelper` pyODBC connection uses a cursor with its -`fast_executemany` attribute set to `True`. This setting improves the -performance of the `executemany` when performing bulk inserts to a -SQL Server database. However, this overides the default behaviour -of pyODBC and there are some limitations in doing this. Importantly, -it is only recommended for applications that use Microsoft's ODBC Driver for -SQL Server. See [pyODBC fast_executemany](https://github.com/mkleehammer/pyodbc/wiki/Features-beyond-the-DB-API#fast_executemany). - -Using `fast_executemany` may raise a `MemoryError` if query involves columns of types -`TEXT` and `NTEXT`, which are now deprecated. -Under these circumstances, `etlhelper` falls back on `fast_executemany` being set to -`False` and produces a warning output. See [Inserting into SQL server with -fast_executemany results in MemoryError](https://github.com/mkleehammer/pyodbc/issues/547). - -If required, the `fast_executemany` attribute can be set to `False` via the -`connect` function: - -```python -conn = connect(MSSQLDB, 'MSSQL_PASSWORD', fast_executemany=False) -``` - -This keyword argument is used by `etlhelper`, any further keyword arguments are -passed to the `connect` function of the underlying driver. - -#### Connecting to servers with self-signed certificates with SQL Server - -Since the ODBC Driver 18 for SQL Server, the default setting has been to fail -certificate validation for servers with self-signed certificates. -It is possible to override this setting within the connection string. - -ETLHelper provides an optional argument to the `connect` function to apply the -override and trust the server's self-signed certificate. - -```python -conn = connect(MSSQLDB, 'MSSQL_PASSWORD', trust_server_certificate=True) -``` - -### Passwords - -Database passwords must be specified via an environment variable. -This reduces the temptation to store them within scripts. 
-This can be done on the command line via: - -+ `export ORACLE_PASSWORD=some-secret-password` on Linux -+ `set ORACLE_PASSWORD=some-secret-password` on Windows - -Or in a Python terminal via: - -```python -import os -os.environ['ORACLE_PASSWORD'] = 'some-secret-password' -``` - -No password is required for SQLite databases. - - -## Transfer data - -### Fetch all - -The `fetchall` function returns a list of named tuples containing data as -native Python objects. - -```python -from my_databases import ORACLEDB -from etlhelper import fetchall - -sql = "SELECT * FROM src" - -with ORACLEDB.connect("ORA_PASSWORD") as conn: - fetchall(sql, conn) +pip install git+https://github.com/BritishGeologicalSurvey/etlhelper.git@main ``` -returns - -``` -[Row(id=1, value=1.234, simple_text='text', utf8_text='Öæ°\nz', - day=datetime.date(2018, 12, 7), - date_time=datetime.datetime(2018, 12, 7, 13, 1, 59)), - Row(id=2, value=2.234, simple_text='text', utf8_text='Öæ°\nz', - day=datetime.date(2018, 12, 8), - date_time=datetime.datetime(2018, 12, 8, 13, 1, 59)), - Row(id=3, value=2.234, simple_text='text', utf8_text='Öæ°\nz', - day=datetime.date(2018, 12, 9), - date_time=datetime.datetime(2018, 12, 9, 13, 1, 59))] -``` - -Data are accessible via index (`row[4]`) or name (`row.day`). - -Other functions are provided to select data. `fetchone` and -`fetchall` are equivalent to the cursor methods specified in the DBAPI v2.0. -ETL Helper does not include a `fetchmany` function - instead use `iter_chunks` -to loop over a result set in batches of multiple rows. - - -#### iter_rows - -It is recommended to use `iter_rows` for looping over large result sets. It -is a generator function that only yields data as requested. This ensures that -the data are not all loaded into memory at once. - -``` -with ORACLEDB.connect("ORA_PASSWORD") as conn: - for row in iter_rows(sql, conn): - do_something(row) -``` - - -#### Parameters - -Variables can be inserted into queries by passing them as parameters. -These "bind variables" are sanitised by the underlying drivers to prevent [SQL -injection attacks](https://xkcd.com/327/). -The required [paramstyle](https://www.python.org/dev/peps/pep-0249/#paramstyle) -can be checked with `MY_DB.paramstyle`. -A tuple is used for positional placeholders, or a dictionary for named -placeholders. - -```python -select_sql = "SELECT * FROM src WHERE id = :id" - -with ORACLEDB.connect("ORA_PASSWORD") as conn: - fetchall(sql, conn, parameters={'id': 1}) -``` -#### Row factories +### Draft release notes for v1 -Row factories control the output format of returned rows. +ETL Helper version 1.0 contains many breaking changes and new ways of working. -For example return each row as a dictionary, use the following: +#### Breaking changes -```python -from etlhelper import fetchall -from etlhelper.row_factories import dict_row_factory ++ The `cxOracle` driver has been replaced with the `python-oracledb` driver. `python-oracledb` does not depend on Oracle Instant Client and all related code `setup_oracle_client` has been removed as it is no longer required. This is a great sign of progress, as one of the initial purposes of ETL Helper was to give us an easy way to install Oracle Instant Client. The `read_lob` flag has been deprecated as part of this change. See the Oracle driver documentation for details. ++ The default row factory has been changed from `namedtuple_row_factory`, to `dict_row_factory`. This is because dictionaries are mutable and so easier to use in transform functions. 
They are also more widely understood. To recreate the current behaviour in existing scripts, set `row_factory=namedtuple_row_factory`. ++ `get_rows` is deprecated; use `fetchall` instead. This does the same thing, but is better aligned with the DB API 2.0. ++ `fetchmany` is deprecated; use `iter_chunks` instead. The behaviour of ETL Helper's `fetchmany` was different to `cursor.fetchmany` as defined by the DB API 2.0 and this was confusing. It is also possible to use `chunks = etl.iter_chunks(...); do_something(next(chunk))`. ++ `dump_rows` is deprecated; use `for row in iter_rows(...); my_func(row)` instead. ++ The ETL Helper logger is no longer given a handler by default. This is the correct behaviour for a library. Activate logging with `etl.log_to_console()`. ++ The minimum supported Python version is 3.9. This allows us to benefit from security and typing improvements. -sql = "SELECT * FROM my_table" +#### Non-breaking changes -with ORACLEDB.connect('ORACLE_PASSWORD') as conn: - for row in fetchall(sql, conn, row_factory=dict_row_factory): - print(row['id']) -``` - -The `dict_row_factory` is useful when data are to be serialised to JSON/YAML, -as those formats use dictionaries as input. - -Four different row_factories are included, based in built-in Python types: - -|Row Factory|Attribute access|Mutable|Parameter placeholder| -|---|---|---|---| -|dict_row_factory (default)| `row["id"]`| Yes | Named | -|tuple_row_factory| `row[0]`| No | Positional | -|list_row_factory| `row[0]`| Yes | Positional | -|namedtuple_row_factory| `row.id` or `row[0]` | No | Positional | - -The choice of row factory depends on the use case. In general named tuples -and dictionaries are best for readable code, while using tuples or lists can -give a slight increase in performance. -Mutable rows are convenient when used with transform functions because they -can be modified without need to create a whole new output row. - -When using `copy_rows`, it is necessary to use appropriate parameter placeholder -style for the chosen row factory in the INSERT query. -Using the `dict_row_factory` requires a switch from named to positional -parameter placeholders (e.g. `%(id)s` instead of `%s` for PostgreSQL, `:id` -instead of `:1` for Oracle). -The `pyodbc` driver for MSSQL only supports positional placeholders. - -When using the `load` function in conjuction with `iter_chunks` data must be -either named tuples or dictionaries. - - -#### Transform - -The `transform` parameter allows passing of a function to transform the data -before returning it. -The function must take a list of rows and return a list of modified rows. -Rows of mutable types (dict, list) can be modified in-place, while rows of immutable types (tuples, namedtuples) must be created as new objects from the input rows. -See `transform` for more details. - - -#### Chunk size - -All data extraction functions use `iter_chunks` behind the scenes. -This reads rows from the database in chunks to prevent them all being loaded -into memory at once. -The default `chunk_size` is 5000 and this can be set via keyword argument. - - -### Insert rows - -`execute` can be used to insert a single row or to execute other single -statements e.g. "CREATE TABLE ...". -The `executemany` function is used to insert multiple rows of data. -Large datasets are broken into chunks and inserted in batches to reduce the -number of queries. -The INSERT query must container placeholders with an appropriate format for -the input data e.g. positional for tuples, named for dictionaries. 
-The number of rows that were processed and the number that failed is returned. - -```python -from etlhelper import executemany - -rows = [(1, 'value'), (2, 'another value')] -insert_sql = "INSERT INTO some_table (col1, col2) VALUES (%s, %s)" - -with POSTGRESDB.connect('PGPASSWORD') as conn: - processed, failed = executemany(insert_sql, conn, rows, chunk_size=1000) -``` ++ There is a new Sphinx-based documentation webpage at https://britishgeologicalsurvey.github.io/etlhelper. ++ There is a new preferred syntax, using `import etlhelper as etl` to provide easy access to the public functions. ++ `executemany` and `load` functions can now take a transform function. ++ Type hints have been added to the ETL functions ++ The build system uses `pyproject.toml` instead of `setup.py` and `versioneer`. ++ The CI pipeline has moved from Travis to GitHub actions ++ The CI pipeline runs the PostgreSQL integration tests (which cover the bulk of the application logic) -The `chunk_size` default is 5,000 and it can be set with a keyword argument. -The `commit_chunks` flag defaults to `True`. -This ensures that an error during a large data transfer doesn't require all the -records to be sent again. -Some work may be required to determine which records remain to be sent. -Setting `commit_chunks` to `False` will roll back the entire transfer in case -of an error. -Some database engines can return autogenerated values (e.g. primary key IDs) -after INSERT statements. -To capture these values, use the `fetchone` method to execute the SQL command -instead. - -```python -insert_sql = "INSERT INTO my_table (message) VALUES ('hello') RETURNING id" - -with POSTGRESDB.connect('PGPASSWORD') as conn: - result = fetchone(insert_sql, conn) - -print(result.id) -``` - -The `load` function is similar to `executemany` except that it autogenerates -an insert query based on the data provided. It uses `generate_insert_query` -to remove the need to explicitly write the query for simple cases. By -calling this function manually, users can create a base insert query that can -be extended with clauses such as `ON CONFLICT DO NOTHING`. - -NOTE: the `load` function uses the first row of data to generate the list of column for the insert query. If later items in the data contain extra columns, those columns will not be inserted and no error will be raised. - -As `generate_insert_query` creates SQL statements from user-provided input, -it checks the table and column names to ensure that they only contain valid -characters. - - -#### Handling insert errors - -The default behaviour of `etlhelper` is to raise an exception on the first -error and abort the transfer. -Sometimes it is desirable to ignore the errors and to do something else with -the failed rows. -The `on_error` parameter allows a function to be passed that is applied to the -failed rows of each chunk. -The input is a list of (row, exception) tuples. - -Different examples are given here. The simplest approach is to collect all the -errors into a list to process at the end. - -```python -errors = [] -executemany(sql, conn, rows, on_error=errors.extend) - -if errors: - do_something() -``` - -Errors can be logged to the `etlhelper` logger. - -```python -import logging - -import etlhelper as etl - -etl.log_to_console() -logger = logging.getLogger("etlhelper") - - -def log_errors(failed_rows): - for row, exception in failed_rows: - logger.error(exception) - -executemany(sql, conn, rows, on_error=log_errors) -``` - -The IDs of failed rows can be written to a file. 
- -```python -def write_bad_ids(failed_rows): - with open('bad_ids.txt', 'at') as out_file: - for row, exception in failed_rows: - out_file.write(f"{row.id}\n") - -executemany(sql, conn, rows, on_error=write_bad_ids) -``` - -`executemany`, `load`, `copy_rows` and `copy_table_rows` can all take an -`on_error` parameter. They each return a tuple containing the number of rows -processed and the number of rows that failed. - -### Copy table rows - -`copy_table_rows` provides a simple way to copy all the data from one table to -another. -It can take a `transform` function in case some modification of the data, e.g. -change of case of column names, is required. - -```python -from my_databases import POSTGRESDB, ORACLEDB -from etlhelper import copy_table_rows - -with ORACLEDB.connect("ORA_PASSWORD") as src_conn: - with POSTGRESDB.connect("PG_PASSWORD") as dest_conn: - copy_table_rows('my_table', src_conn, dest_conn) -``` - -The `chunk_size`, `commit_chunks` and `on_error` parameters can all be set. -A tuple with counts of rows processed and failed is returned. - -Note that the target table must already exist. -If it doesn't, you can use `execute` with a `CREATE TABLE IF NOT EXISTS ...` -statement to create it first. -See the recipes for examples. - -### Combining `iter_rows` with `load` - -For extra control selecting the data to be transferred, `iter_rows` can be -combined with `load`. - -```python -from my_databases import POSTGRESDB, ORACLEDB -from etlhelper import iter_rows, load - -select_sql = """ - SELECT id, name, value FROM my_table - WHERE value > :min_value -""" - -with ORACLEDB.connect("ORA_PASSWORD") as src_conn: - with POSTGRESDB.connect("PG_PASSWORD") as dest_conn: - rows = iter_rows(select_sql, src_conn, parameters={'min_value': 99}) - load('my_table', dest_conn, rows) -``` - -### Copy rows - -Customising both queries gives the greatest control on data selection and loading. -`copy_rows` takes the results from a SELECT query and applies them as parameters -to an INSERT query. -The source and destination tables must already exist. -For example, here we use GROUP BY and WHERE in the SELECT query and insert extra -auto-generated values via the INSERT query. - -```python -from my_databases import POSTGRESDB, ORACLEDB -from etlhelper import copy_rows - -select_sql = """ - SELECT - customer_id, - SUM (amount) AS total_amount - FROM payment - WHERE id > 1000 - GROUP BY customer_id -""" -insert_sql = """ - INSERT INTO dest (customer_id, total_amount, loaded_by, load_time) - VALUES (%s, %s, current_user, now()) -""" - -with ORACLEDB.connect("ORA_PASSWORD") as src_conn: - with POSTGRESDB.connect("PG_PASSWORD") as dest_conn: - copy_rows(select_sql, src_conn, insert_sql, dest_conn) -``` - -`parameters` can be passed to the SELECT query as before and the -`commit_chunks`, `chunk_size` and `on_error` options can be set. - -A tuple of rows processed and failed is returned. - - -### Transform - -Data can be transformed in-flight by applying a transform function. This is -any Python callable (e.g. function or class) that takes an iterator and returns -another iterator (e.g. list or generator via the `yield` statement). -Transform functions are applied to data as they are read from the database (in the case of data fetching -functions and `copy_rows`), or before they are passed as query parameters (to `executemany` or `load`). -When used with `copy_rows` or `executemany` the INSERT query must contain the correct placeholders for the -transform result. 
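As a minimal sketch of the second case, a transform function can be passed to `load` (or `executemany`) so that rows are modified just before they are inserted. The in-memory SQLite database, table name and row fields below are assumptions made for illustration only.

```python
from typing import Iterator

import etlhelper as etl
from etlhelper import DbParams

# Hypothetical in-memory SQLite database, for illustration only
db = DbParams(dbtype="SQLITE", filename=":memory:")

rows = [{"id": 1, "name": "  Basalt "}, {"id": 2, "name": "GRANITE"}]


def clean(chunk: Iterator[dict]) -> Iterator[dict]:
    """Tidy each dictionary row before it is inserted."""
    for row in chunk:
        # dict rows are mutable, so they can be edited in place
        row["name"] = row["name"].strip().lower()
        yield row


with db.connect() as conn:
    etl.execute("CREATE TABLE rocks (id INTEGER PRIMARY KEY, name TEXT)", conn)
    etl.load("rocks", conn, rows, transform=clean)
```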
- -The `iter_chunks` and `iter_rows` functions that are used internally return -generators. Each chunk or row of data is only accessed when it is required. -This allows data transformation to be performed via [memory-efficient -iterator-chains](https://dbader.org/blog/python-iterator-chains). - -The simplest transform functions modify data returned mutable row factories e.g., -`dict_row_factory` in-place. -The `yield` keyword makes `my_transform` a generator function that returns an `Iterator` -that can loop over the rows. - -```python -from typing import Iterator -from etlhelper.row_factories import dict_row_factory - - -def my_transform(chunk: Iterator[dict]) -> Iterator[dict]: - # Add prefix to id, remove newlines, set lower case email addresses - - for row in chunk: # each row is a dictionary (mutable) - row['id'] += 1000 - row['description'] = row['description'].replace('\n', ' ') - row['email'] = row['email'].lower() - yield row - - -fetchall(select_sql, src_conn, row_factory=dict_row_factory, - transform=my_transform) -``` - -It is also possible to assemble the complete transformed chunk and return it. -This code demonstrates that the returned chunk can have a different number -of rows, and be of different length, to the input. -Because `namedtuple`s are immutable, we have to create a `new_row` from each -input `row`. - -```python -import random -from typing import Iterator -from etlhelper.row_factories import namedtuple_row_factory - - -def my_transform(chunk: Iterator[tuple]) -> list[tuple]: - # Append random integer (1-10), filter if <5. - - new_chunk = [] - for row in chunk: # each row is a namedtuple (immutable) - extra_value = random.randrange(10) - if extra_value >= 5: # some rows are dropped - new_row = (*row, extra_value) # new rows have extra column - new_chunk.append(new_row) - - return new_chunk - -fetchall(select_sql, src_conn, row_factory=namedtuple_row_factory, - transform=my_transform) -``` - -Any Python code can be used within the function and extra data can result from a calculation, -a call to a webservice or a query against another database. -As a standalone function with known inputs and outputs, the transform functions are -also easy to test. - - -### Aborting running jobs - -When running as a script, `etlhelper` jobs can be stopped by pressing _CTRL-C_. -This option is not available when the job is running as a background process, -e.g. in a GUI application. -The `abort_etlhelper_threads()` function is provided to cancel jobs running in -a separate thread by raising an `ETLHelperAbort` exception within the thread. - -The state of the data when the job is cancelled (or crashes) depends on the -arguments passed to `executemany` (or the functions that call it e.g. `load`, -`copy_rows`). - -+ If `commit_chunks` is `True` (default), all chunks up to the one where the - error occured are committed. -+ If `commit_chunks` is `False`, everything is rolled back and the database is - unchanged. -+ If an `on_error` function is defined, all rows without errors are committed. - - -## Utilities - -The following utility functions provide useful database metadata. - - -### Table info - - -The `table_info` function provides basic metadata for a table. An optional schema -can be used. Note that for `sqlite` the schema value is currently ignored. - -```python -from etlhelper.utils import table_info - -with ORACLEDB.connect("ORA_PASSWORD") as conn: - columns = table_info('my_table', conn, schema='my_schema') -``` - -The returned value is a list of named tuples of four values. 
Each tuple represents -one column in the table, giving its name, type, if it has a NOT NULL constraint -and if is has a DEFAULT value constraint. For example, - -```python -[ - Column(name='ID', type='NUMBER', not_null=1, has_default=0), - Column(name='VALUE', type='VARCHAR2', not_null=0, has_default=1), -] -``` - -the ID column is of type NUMBER and has a NOT NULL constraint but not a DEFAULT value, -while the VALUE column is of type VARCHAR2, can be NULL but does have a DEFAULT value. - - -## Recipes - -The following recipes demonstrate how `etlhelper` can be used. - - -### Debug SQL and monitor progress with logging - -ETL Helper provides a custom logging handler. -Time-stamped messages indicating the number of rows processed can be enabled by -setting the log level to `INFO`. -Setting the level to `DEBUG` provides information on the query that was run, -example data and the database connection. -To enable the logger, use: - -```python -import etlhelper as etl - -etl.log_to_console() -``` - -Output from a call to `copy_rows` will look like: - -``` -2019-10-07 15:06:22,411 iter_chunks: Fetching rows -2019-10-07 15:06:22,413 executemany: 1 rows processed -2019-10-07 15:06:22,416 executemany: 2 rows processed -2019-10-07 15:06:22,419 executemany: 3 rows processed -2019-10-07 15:06:22,420 iter_chunks: 3 rows returned -2019-10-07 15:06:22,420 executemany: 3 rows processed in total -``` - -Note: errors on database connections output messages may include login -credentials in clear text. - -To use the etlhelper logger directly, access it via: - -```python -import logging - -import etlhelper as etl - -etl.log_to_console() -etl_logger = logging.getLogger("etlhelper") -etl_logger.info("Hello world!") -``` - - -### Database to database copy ETL script template - -The following is a template for an ETL script. -It copies copy all the sensor readings from the previous day from an Oracle -source to PostgreSQL destination. - -```python -# copy_readings.py - -import datetime as dt -from etl_helper import copy_rows -from my_databases import ORACLEDB, POSTGRESDB - -CREATE_SQL = dedent(""" - CREATE TABLE IF NOT EXISTS sensordata.readings - ( - sensor_data_id bigint PRIMARY KEY, - measure_id bigint, - time_stamp timestamp without time zone, - meas_value double precision - ) - """).strip() - -DELETE_SQL = dedent(""" - DELETE FROM sensordata.readings - WHERE time_stamp BETWEEN %(startdate)s AND %(enddate)s - """).strip() - -SELECT_SQL = dedent(""" - SELECT id, measure_id, time_stamp, reading - FROM sensor_data - WHERE time_stamp BETWEEN :startdate AND :enddate - ORDER BY time_stamp - """).strip() - -INSERT_SQL = dedent(""" - INSERT INTO sensordata.readings (sensor_data_id, measure_id, time_stamp, - meas_value) - VALUES (%s, %s, %s, %s) - """).strip() - - -def copy_readings(startdate, enddate): - params = {'startdate': startdate, 'enddate': enddate} - - with ORACLEDB.connect("ORA_PASSWORD") as src_conn: - with POSTGRESDB.connect("PG_PASSWORD") as dest_conn: - execute(CREATE_SQL dest_conn) - execute(DELETE_SQL, dest_conn, parameters=params) - copy_rows(SELECT_SQL, src_conn, - INSERT_SQL, dest_conn, - parameters=params) - - -if __name__ == "__main__": - # Copy data from 00:00:00 yesterday to 00:00:00 today - today = dt.combine(dt.date.today(), dt.time.min) - yesterday = today - dt.timedelta(1) - - copy_readings(yesterday, today) -``` - -It is valuable to create [idempotent](https://stackoverflow.com/questions/1077412/what-is-an-idempotent-operation) scripts to ensure that they can be rerun without problems. 
-In this example, the "CREATE TABLE IF NOT EXISTS" command can be called repeatedly. -The DELETE_SQL command clears existing data prior to insertion to prevent duplicate key errors. -SQL syntax such as "INSERT OR UPDATE", "UPSERT" or "INSERT ... ON CONFLICT" may be more efficient, but the the exact commands depend on the target database type. - - -### Calling ETL Helper scripts from Apache Airflow - -The following is an [Apache Airflow -DAG](https://airflow.apache.org/docs/stable/concepts.html) that uses the `copy_readings` function -defined in the script above. -The Airflow scheduler will create tasks for each day since 1 August 2019 and -call `copy_readings` with the appropriate start and end times. - -```python -# readings_dag.py - -import datetime as dt -from airflow import DAG -from airflow.operators.python_operator import PythonOperator -import copy_readings - - -def copy_readings_with_args(**kwargs): - # Set arguments for copy_readings from context - start = kwargs.get('prev_execution_date') - end = kwargs.get('execution_date') - copy_readings.copy_readings(start, end) - -dag = DAG('readings', - schedule_interval=dt.timedelta(days=1), - start_date=dt.datetime(2019, 8, 1), - catchup=True) - -t1 = PythonOperator( - task_id='copy_readings', - python_callable=copy_readings_with_args, - provide_context=True, - dag=dag) -``` - - -### Spatial ETL - -No specific drivers are required for spatial data if they are transferred as -Well Known Text. - -```python -select_sql_oracle = """ - SELECT - id, - SDO_UTIL.TO_WKTGEOMETRY(geom) - FROM src - """ - -insert_sql_postgis = """ - INSERT INTO dest (id, geom) VALUES ( - %s, - ST_Transform(ST_GeomFromText(%s, 4326), 27700) - ) - """ -``` - -Other spatial operations e.g. coordinate transforms, intersections and -buffering can be carried out in the SQL. -Transform functions can manipulate geometries using the [Shapely](https://pypi.org/project/Shapely/) library. - - -### Database to API / NoSQL copy ETL script template - -`etlhelper` can be combined with Python's -[aiohttp](https://docs.aiohttp.org/en/stable/) library to create an ETL -for posting data from a database into an HTTP API. -The API could be a NoSQL document store (e.g. ElasticSearch, Cassandra) or some other -web service. - -This example transfers data from Oracle to ElasticSearch. -It uses `iter_chunks` to fetch data from the database without loading it all into -memory at once. -A custom transform function creates a dictionary structure from each row -of data. -This is "dumped" into JSON and posted to the API via `aiohttp`. - -`aiohttp` allows the records in each chunk to be posted to the API -asynchronously. -The API is often the bottleneck in such pipelines and we have seen significant -speed increases (e.g. 10x) using asynchronous transfer as opposed to posting -records in series. 
- - -```python -# copy_sensors_async.py -import asyncio -import datetime as dt -import json -import logging - -import aiohttp -from etlhelper import iter_chunks - -from db import ORACLE_DB - -logger = logging.getLogger("copy_sensors_async") - -SELECT_SENSORS = """ - SELECT CODE, DESCRIPTION - FROM BGS.DIC_SEN_SENSOR - WHERE date_updated BETWEEN :startdate AND :enddate - ORDER BY date_updated - """ -BASE_URL = "http://localhost:9200/" -HEADERS = {'Content-Type': 'application/json'} - - -def copy_sensors(startdate, enddate): - """Read sensors from Oracle and post to REST API.""" - logger.info("Copying sensors with timestamps from %s to %s", - startdate.isoformat(), enddate.isoformat()) - row_count = 0 - - with ORACLE_DB.connect('ORACLE_PASSWORD') as conn: - # chunks is a generator that yields lists of dictionaries - chunks = iter_chunks(SELECT_SENSORS, conn, - parameters={"startdate": startdate, - "enddate": enddate}, - transform=transform_sensors) - - for chunk in chunks: - result = asyncio.run(post_chunk(chunk)) - row_count += len(result) - logger.info("%s items transferred", row_count) - - logger.info("Transfer complete") - - -def transform_sensors(chunk): - """Transform rows to dictionaries suitable for converting to JSON.""" - new_chunk = [] - - for row in chunk: - new_row = { - 'sample_code': row.CODE, - 'description': row.DESCRIPTION, - 'metadata': { - 'source': 'ORACLE_DB', # fixed value - 'transferred_at': dt.datetime.now().isoformat() # dynamic value - } - } - logger.debug(new_row) - new_chunk.append(new_row) - - return new_chunk - - -async def post_chunk(chunk): - """Post multiple items to API asynchronously.""" - async with aiohttp.ClientSession() as session: - # Build list of tasks - tasks = [] - for item in chunk: - tasks.append(post_one(item, session)) - - # Process tasks in parallel. An exception in any will be raised. - result = await asyncio.gather(*tasks) - - return result - - -async def post_one(item, session): - """Post a single item to API using existing aiohttp Session.""" - # Post the item - response = await session.post(BASE_URL + 'sensors/_doc', headers=HEADERS, - data=json.dumps(item)) - - # Log responses before throwing errors because error info is not included - # in generated Exceptions and so cannot otherwise be seen for debugging. - if response.status >= 400: - response_text = await response.text() - logger.error('The following item failed: %s\nError message:\n(%s)', - item, response_text) - await response.raise_for_status() - - return response.status +## Installation -if __name__ == "__main__": - # Configure logging - handler = logging.StreamHandler() - formatter = logging.Formatter('%(asctime)s %(levelname)-8s %(message)s') - handler.setFormatter(formatter) - logger.setLevel(logging.INFO) - logger.addHandler(handler) +ETL Helper is available on PyPI (version 0.14.3). +Documentation for the most recent version on PyPI is [here](https://github.com/BritishGeologicalSurvey/etlhelper/tree/v0.14.3#readme). - # Copy data from 1 January 2000 to 00:00:00 today - today = dt.datetime.combine(dt.date.today(), dt.time.min) - copy_sensors(dt.datetime(2000, 1, 1), today) +```bash +pip install etlhelper ``` -In this example, failed rows will fail the whole job. Removing the -`raise_for_status()` call will let them just be logged instead. - - -### CSV load script template - -The following script is an example of using the `load` function to import data -from a CSV file into a database. 
-It shows how a `transform` function can perform common parsing tasks such as -renaming columns and converting timestamps into datetime objects. -The database has a `CHECK` constraint that rejects any rows with an ID -divisible by 1000. -An example `on_error` function prints the IDs of rows that fail to insert. - -```python -""" -Script to create database and load observations data from csv file. It also -demonstrates how an `on_error` function can handle failed rows. - -Generate observations.csv with: -curl 'https://sensors.bgs.ac.uk/FROST-Server/v1.1/Observations?$select=@iot.id,result,phenomenonTime&$top=20000&$resultFormat=csv' -o observations.csv -""" -import csv -import datetime as dt -from typing import Iterable, List, Tuple - -from etlhelper import execute, load, DbParams - - -def load_observations(csv_file, conn): - """Load observations from csv_file to db_file.""" - # Drop table (helps with repeated test runs!) - drop_table_sql = """ - DROP TABLE IF EXISTS observations - """ - execute(drop_table_sql, conn) - - # Create table (reject ids with no remainder when divided by 1000) - create_table_sql = """ - CREATE TABLE IF NOT EXISTS observations ( - id INTEGER PRIMARY KEY CHECK (id % 1000), - time TIMESTAMP, - result FLOAT - )""" - execute(create_table_sql, conn) +Database driver packages are not included by default and should be specified in square brackets. Options are oracle (installs oracledb), mssql (installs pyodbc) and postgres (installs psycopg2). Multiple values can be separated by commas. - # Load data - with open(csv_file, 'rt') as f: - reader = csv.DictReader(f) - load('observations', conn, reader, transform=transform, on_error=on_error) - - -# A transform function that takes an iterable of rows and returns an iterable -# of rows. -def transform(rows: Iterable[dict]) -> Iterable[dict]: - """Rename time column and convert to Python datetime.""" - for row in rows: - # Dictionaries are mutable, so rows can be modified in place. - time_value = row.pop('phenomenonTime') - row['time'] = dt.datetime.strptime(time_value, "%Y-%m-%dT%H:%M:%S.%fZ") - - return rows - - -# The on_error function is called after each chunk with all the failed rows -def on_error(failed_rows: List[Tuple[dict, Exception]]) -> None: - """Print the IDs of failed rows""" - rows, exceptions = zip(*failed_rows) - failed_ids = [row['id'] for row in rows] - print(f"Failed IDs: {failed_ids}") - - -if __name__ == "__main__": - from etlhelper import log_to_console - log_to_console() - - db = DbParams(dbtype="SQLITE", filename="observations.sqlite") - with db.connect() as conn: - load_observations('observations.csv', conn) +```bash +pip install etlhelper[oracle,postgres] ``` +See the individual database driver configuration pages for any OS-level dependencies. -### Export data to CSV -The [Pandas](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_sql.html) library can connect to databases via SQLAlchemy. -It has powerful tools for manipulating tabular data. -ETL Helper makes it easy to prepare the SQL Alchemy connection. +TODO: Add simple example script. 
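One possible shape for that example, assuming a local SQLite file and a small `igneous_rock` table that the script itself creates, is sketched below; it is illustrative only and not part of the released documentation.

```python
"""Minimal ETL Helper example: create a table, load rows, read them back."""
import etlhelper as etl
from etlhelper import DbParams

db = DbParams(dbtype="SQLITE", filename="igneous_rocks.db")  # hypothetical file

create_sql = """
    CREATE TABLE IF NOT EXISTS igneous_rock (
        id INTEGER,
        name TEXT
    )
"""
rows = [{"id": 1, "name": "basalt"}, {"id": 2, "name": "granite"}]

etl.log_to_console()  # timestamped progress messages

with db.connect() as conn:
    etl.execute(create_sql, conn)          # create the table
    etl.load("igneous_rock", conn, rows)   # insert the rows
    for row in etl.iter_rows("SELECT * FROM igneous_rock", conn):
        print(row)
```

With `etl.log_to_console()` enabled, the script also prints timestamped messages showing how many rows were processed.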
-```python -import pandas as pd -from sqlalchemy import create_engine - -from my_databases import ORACLEDB - -engine = create_engine(ORACLEDB.get_sqlalchemy_connection_string("ORACLE_PASSWORD")) - -sql = "SELECT * FROM my_table" -df = pd.read_sql(sql, engine) -df.to_csv('my_data.csv', header=True, index=False, float_format='%.3f') -``` ## Development -### Maintainers - ETL Helper was created by and is maintained by British Geological Survey Informatics. + John A Stevenson ([volcan01010](https://github.com/volcan01010)) @@ -1113,25 +86,9 @@ ETL Helper was created by and is maintained by British Geological Survey Informa + Daniel Sutton ([kerberpolis](https://github.com/kerberpolis)) + Leo Rudczenko ([leorudczenko](https://github.com/leorudczenko)) -### Development status - -The code is still under active development and breaking changes are possible. -Users should pin the version in their dependency lists and -[watch](https://docs.github.com/en/github/managing-subscriptions-and-notifications-on-github/viewing-your-subscriptions#configuring-your-watch-settings-for-an-individual-repository) -the repository for new releases. See [CONTRIBUTING.md](CONTRIBUTING.md) for details on how to contribute. - -### Licence +## Licence ETL Helper is distributed under the [LGPL v3.0 licence](LICENSE). Copyright: © BGS / UKRI 2019 - - -## References - -+ [PEP249 DB API2](https://www.python.org/dev/peps/pep-0249/#cursor-objects) -+ [psycopg2](http://initd.org/psycopg/docs/cursor.html) -+ [oracledb](https://python-oracledb.readthedocs.io/en/latest/index.html) -+ [pyodbc](https://pypi.org/project/pyodbc/) -+ [sqlite3](https://docs.python.org/3/library/sqlite3.html)