Commit: Refactor load section

volcan01010 committed May 14, 2024
1 parent 3890bdb commit e019121
Showing 5 changed files with 146 additions and 54 deletions.
16 changes: 8 additions & 8 deletions docs/demo_copy.py
@@ -15,14 +15,14 @@
)"""


select_sql = "SELECT name FROM igneous_rock"


def transform(chunk):
for row in chunk:
new_row = {
"name": row["name"],
"category": "igneous",
"last_update": dt.datetime.now()
}
yield new_row
row['category'] = 'igneous'
row['last_update'] = dt.datetime.now()
yield row


etl.log_to_console()
Expand All @@ -33,8 +33,8 @@ def transform(chunk):
etl.execute(create_sql, dest)

# Copy data
rows = etl.copy_table_rows('igneous_rock', src, dest,
target='rock', transform=transform)
rows = etl.iter_rows(select_sql, src, transform=transform)
etl.load('rock', dest, rows)

# Confirm transfer
for row in etl.fetchall('SELECT * FROM rock', dest):
5 changes: 0 additions & 5 deletions docs/etl_functions/extract.rst
@@ -174,8 +174,3 @@ This reads rows from the database in *chunks* to prevent them all being
loaded into memory at once.
The ``chunk_size`` argument sets the number of rows in each chunk.
The default ``chunk_size`` is 5000.

-Return values
--------------
-
-TODO!
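A minimal sketch of the chunked behaviour described above, assuming a local SQLite file ``igneous_rocks.db`` containing the ``igneous_rock`` table used elsewhere in these docs:

.. code:: python

    import sqlite3

    import etlhelper as etl

    with sqlite3.connect('igneous_rocks.db') as conn:
        # Each chunk is a list of up to 1,000 rows; only one chunk
        # is held in memory at a time
        for chunk in etl.iter_chunks('SELECT * FROM igneous_rock', conn,
                                     chunk_size=1000):
            print(len(chunk))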
136 changes: 98 additions & 38 deletions docs/etl_functions/load.rst
@@ -2,60 +2,120 @@
Load
^^^^

-Insert rows
------------
-
-``execute`` can be used to insert a single row or to execute other
-single statements e.g. “CREATE TABLE …”. The ``executemany`` function is
-used to insert multiple rows of data. Large datasets are broken into
-chunks and inserted in batches to reduce the number of queries. The
-INSERT query must contain placeholders with an appropriate format for
-the input data e.g. positional for tuples, named for dictionaries. The
-number of rows that were processed and the number that failed is
-returned.
+Functions
+---------
+
+ETL Helper provides two functions for loading data.
+
+- :func:`load() <etlhelper.load>`: Inserts data from an iterable of dictionaries
+  or namedtuples into an existing target table.
+
+  .. code:: python
+
+      import sqlite3
+      import etlhelper as etl
+
+      rows = [
+          {"name": "basalt", "grain_size": "fine"},
+          {"name": "granite", "grain_size": "coarse"}
+      ]
+
+      with sqlite3.connect('igneous_rocks.db') as conn:
+          # Note that table must already exist
+          processed, failed = etl.load('igneous_rock', conn, rows)
+
+  NOTE: the ``load`` function uses the first row of data to generate the
+  list of columns for the insert query. If later items in the data contain
+  extra columns, those columns will not be inserted and no error will be
+  raised.

+- :func:`executemany() <etlhelper.executemany>`: Applies an SQL query with parameters
+  supplied by an iterable of data. Customising the SQL query allows fine control.
+
+  .. code:: python
+
+      import sqlite3
+      import etlhelper as etl
+
+      rows = [
+          {"name": "basalt", "grain_size": "fine"},
+          {"name": "granite", "grain_size": "coarse"}
+      ]
+
+      # Insert query changes case and adds updated_at column
+      insert_sql = """
+          INSERT INTO igneous_rocks (name, grain_size, updated_at)
+          VALUES (:name, UPPER(:grain_size), datetime('now'))
+      """
+
+      with sqlite3.connect('igneous_rocks.db') as conn:
+          # Note that table must already exist
+          processed, failed = etl.executemany(insert_sql, conn, rows)
+
+  The INSERT query must contain placeholders with an appropriate format for
+  the input data e.g. positional for tuples, named for dictionaries.

  .. code:: python

-    from etlhelper import executemany
-
-    rows = [(1, 'value'), (2, 'another value')]
-    insert_sql = "INSERT INTO some_table (col1, col2) VALUES (%s, %s)"
+      rows = [("basalt", "fine"), ("granite", "coarse")]
+
+      # Positional placeholders for data in tuple format
+      insert_sql = """
+          INSERT INTO igneous_rocks (name, grain_size, updated_at)
+          VALUES (?, UPPER(?), datetime('now'))
+      """
+
+      with sqlite3.connect('igneous_rocks.db') as conn:
+          # Note that table must already exist
+          processed, failed = etl.executemany(insert_sql, conn, rows)
+  :func:`executemany() <etlhelper.executemany>` can also be used with UPDATE commands.
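A minimal sketch of the UPDATE case, assuming the ``igneous_rocks`` table from the examples above:

.. code:: python

    import sqlite3

    import etlhelper as etl

    rows = [
        {"name": "basalt", "grain_size": "very fine"},
        {"name": "granite", "grain_size": "very coarse"}
    ]

    # Named placeholders supply values to the UPDATE statement
    update_sql = """
        UPDATE igneous_rocks
        SET grain_size = :grain_size
        WHERE name = :name
    """

    with sqlite3.connect('igneous_rocks.db') as conn:
        processed, failed = etl.executemany(update_sql, conn, rows)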

-with POSTGRESDB.connect('PGPASSWORD') as conn:
-    processed, failed = executemany(insert_sql, conn, rows, chunk_size=1000)
+Keyword arguments
+-----------------
+
+chunk_size
+""""""""""

+:func:`load() <etlhelper.load>` uses :func:`executemany() <etlhelper.executemany>`
+behind the scenes.
+Large datasets are broken into chunks and inserted in batches to reduce the number
+of queries, and only one chunk of data is held in memory at any time.
+Within a data processing pipeline, the next step can begin as soon as the first
+chunk has been processed.
+The database connection must remain open until all data have been processed.
+
The ``chunk_size`` default is 5,000 and it can be set with a keyword
-argument. The ``commit_chunks`` flag defaults to ``True``. This ensures
+argument.
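For example, a sketch of overriding the default, assuming an existing ``igneous_rock`` table with a ``name`` column:

.. code:: python

    import sqlite3

    import etlhelper as etl

    # A generator provides rows without holding them all in memory
    rows = ({"name": f"rock {i}"} for i in range(10000))

    with sqlite3.connect('igneous_rocks.db') as conn:
        # Insert in batches of 1,000 rows instead of the default 5,000
        processed, failed = etl.load('igneous_rock', conn, rows,
                                     chunk_size=1000)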

+commit_chunks
+"""""""""""""
+
+The ``commit_chunks`` flag defaults to ``True``. This ensures
that an error during a large data transfer doesn’t require all the
records to be sent again. Some work may be required to determine which
records remain to be sent. Setting ``commit_chunks`` to ``False`` will
roll back the entire transfer in case of an error.
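A minimal sketch of the all-or-nothing setting, reusing the ``insert_sql`` and ``rows`` from the examples above:

.. code:: python

    with sqlite3.connect('igneous_rocks.db') as conn:
        # Any failure rolls back every chunk in the transfer
        processed, failed = etl.executemany(insert_sql, conn, rows,
                                            commit_chunks=False)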

-Some database engines can return autogenerated values (e.g. primary key
-IDs) after INSERT statements. To capture these values, use the
-``fetchone`` method to execute the SQL command instead.

-.. code:: python
+transform
+"""""""""

-    insert_sql = "INSERT INTO my_table (message) VALUES ('hello') RETURNING id"
+The ``transform`` parameter takes a callable (e.g. function) that
+transforms the data before returning it.
+See the :ref:`Transform <transform>` section for details.
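A minimal sketch of a transform callable passed to :func:`load() <etlhelper.load>`; as in the demo script above, it receives an iterable chunk of rows and yields modified rows:

.. code:: python

    import sqlite3

    import etlhelper as etl

    def transform(chunk):
        # Uppercase each name before it is loaded
        for row in chunk:
            row['name'] = row['name'].upper()
            yield row

    rows = [{"name": "basalt"}, {"name": "granite"}]

    with sqlite3.connect('igneous_rocks.db') as conn:
        processed, failed = etl.load('igneous_rock', conn, rows,
                                     transform=transform)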

-    with POSTGRESDB.connect('PGPASSWORD') as conn:
-        result = fetchone(insert_sql, conn)
+on_error
+""""""""

-    print(result.id)
+Accepts a Python function that will be applied to failed rows.
+See the on_error section for details. ADD LINK
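A minimal sketch, assuming the ``on_error`` function receives a list of (row, exception) tuples for the rows that failed to insert — here ``errors.extend`` simply collects them:

.. code:: python

    import sqlite3

    import etlhelper as etl

    rows = [{"name": "basalt"}, {"name": "basalt"}]  # duplicate name may fail

    errors = []

    with sqlite3.connect('igneous_rocks.db') as conn:
        # Failed rows are passed to on_error instead of aborting the job
        processed, failed = etl.load('igneous_rock', conn, rows,
                                     on_error=errors.extend)

    for row, exception in errors:
        print(row, exception)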

-The ``load`` function is similar to ``executemany`` except that it
-autogenerates an insert query based on the data provided. It uses
-``generate_insert_query`` to remove the need to explicitly write the
-query for simple cases. By calling this function manually, users can
-create a base insert query that can be extended with clauses such as
-``ON CONFLICT DO NOTHING``.
-
-NOTE: the ``load`` function uses the first row of data to generate the
-list of columns for the insert query. If later items in the data contain
-extra columns, those columns will not be inserted and no error will be
-raised.
+Return values
+-------------

-As ``generate_insert_query`` creates SQL statements from user-provided
-input, it checks the table and column names to ensure that they only
-contain valid characters.
+The number of rows that were processed and the number that failed is
+returned as a tuple.
8 changes: 5 additions & 3 deletions docs/index.rst
@@ -30,8 +30,7 @@ ETL Helper is a Python ETL (Extract, Transform, Load) library to simplify data t

ETL Helper makes it easy to run SQL queries via Python and return the
results.
-It is built upon the `DBAPI2 specification <https://www.python.org/dev/peps/pep-0249/>`__
-and takes care of cursor management, importing drivers and formatting connection strings,
+It takes care of cursor management, importing drivers and formatting connection strings,
while providing memory-efficient functions to read, write and transform data.
This reduces the amount of boilerplate code required to manipulate data within relational
databases with Python.
@@ -42,15 +41,18 @@ Features
- ``fetchall``, ``iter_rows``, ``fetchone`` functions for
querying databases
- Data transfer uses memory-efficient generators (``iter_chunks`` and ``iter_rows``)
-- ``execute``, ``executemany``, and ``load`` functions to insert or update data
+- ``executemany`` and ``load`` functions to insert or update data
- ``copy_rows`` and ``copy_table_rows`` to transfer data between databases
- User-defined transform functions transform data in-flight
+- ``execute`` function for one-off commands
- Helpful error messages display the failed query SQL
- ``on_error`` function to process rows that fail to insert
- ``DbParams`` objects provide consistent way to connect to different
database types (currently Oracle, PostgreSQL, SQLite and MS SQL
Server)
- Timestamped log messages for tracking long-running data transfers
+- Built upon the `DBAPI2 specification <https://www.python.org/dev/peps/pep-0249/>`__
+  for database drivers in Python

These tools can create easy-to-understand, lightweight, versionable and
testable Extract-Transform-Load (ETL) workflows.
35 changes: 35 additions & 0 deletions docs/utilities.rst
@@ -6,6 +6,27 @@ Utilities
ETL Helper provides utility functions for logging information, table metadata
and flow control in threaded workflows.

+Execute
+^^^^^^^
+
+:func:`execute() <etlhelper.execute>` can be used to insert a single row or to execute other
+single statements e.g. “CREATE TABLE …”.
+
+Some database engines can return autogenerated values (e.g. primary key
+IDs) after INSERT statements. To capture these values, use the
+``fetchone`` function to execute the SQL command instead.
+
+.. code:: python
+
+    import etlhelper as etl
+
+    insert_sql = "INSERT INTO my_table (message) VALUES ('hello') RETURNING id"
+
+    with POSTGRESDB.connect('PGPASSWORD') as conn:
+        result = etl.fetchone(insert_sql, conn)
+
+    print(result['id'])
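A minimal sketch of a one-off command with :func:`execute() <etlhelper.execute>`, assuming a local SQLite file:

.. code:: python

    import sqlite3

    import etlhelper as etl

    create_sql = """
        CREATE TABLE IF NOT EXISTS igneous_rock (
            name TEXT UNIQUE,
            grain_size TEXT
        )
    """

    with sqlite3.connect('igneous_rocks.db') as conn:
        # Runs a single statement; no rows are returned
        etl.execute(create_sql, conn)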
Log to console
^^^^^^^^^^^^^^
@@ -80,6 +101,20 @@ DEFAULT value, while the VALUE column is of type VARCHAR2, can be NULL
but does have a DEFAULT value.


+Generate INSERT SQL
+^^^^^^^^^^^^^^^^^^^
+
+The ``generate_insert_query`` function is used by the ``load`` function to remove
+the need to explicitly write the INSERT query for simple cases.
+By calling this function manually, users can create a base insert query
+that can be extended with clauses such as ``ON CONFLICT DO NOTHING`` (see
+Error Handling for more info).
+
+As ``generate_insert_query`` creates SQL statements from user-provided
+input, it checks the table and column names to ensure that they only
+contain valid characters.
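The signature is not shown on this page; as a hypothetical sketch, assuming it takes a table name, an example row and a connection and returns the INSERT statement as a string:

.. code:: python

    import sqlite3

    import etlhelper as etl

    row = {"name": "basalt", "grain_size": "fine"}

    with sqlite3.connect('igneous_rocks.db') as conn:
        # Hypothetical signature: table name, example row, connection
        insert_sql = etl.generate_insert_query('igneous_rock', row, conn)

        # Extend the base query; ON CONFLICT DO NOTHING is SQLite/PostgreSQL syntax
        processed, failed = etl.executemany(
            insert_sql + " ON CONFLICT DO NOTHING", conn, [row])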


Aborting running jobs
^^^^^^^^^^^^^^^^^^^^^
