Skip to content

Commit

Permalink
SQLAlchemy: Add documentation and tests for usage with Dask
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Jul 6, 2023
1 parent 061efb0 commit 4aeb98c
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 4 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ Unreleased
- SQLAlchemy: Added ``insert_bulk`` fast-path ``INSERT`` method for pandas, in
order to support efficient batch inserts using CrateDB's "bulk operations" endpoint.

- SQLAlchemy: Add documentation and software tests for usage with Dask


2023/04/18 0.31.1
=================
Expand Down
124 changes: 120 additions & 4 deletions docs/by-example/sqlalchemy/dataframe.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,20 @@
SQLAlchemy: DataFrame operations
================================

.. rubric:: Table of Contents

.. contents::
:local:


About
=====

This section of the documentation demonstrates support for efficient batch/bulk
``INSERT`` operations with `pandas`_ and `Dask`_, using the CrateDB SQLAlchemy dialect.

Efficient bulk operations are needed for typical `ETL`_ batch processing and
data streaming workloads, for example to move data in- and out of OLAP data
data streaming workloads, for example to move data in and out of OLAP data
warehouses, as contrasted to interactive online transaction processing (OLTP)
applications. The strategies of `batching`_ together series of records for
improving performance are also referred to as `chunking`_.
Expand All @@ -21,6 +27,8 @@ improving performance are also referred to as `chunking`_.
Introduction
============

pandas
------
The :ref:`pandas DataFrame <pandas:api.dataframe>` is a structure that contains
two-dimensional data and its corresponding labels. DataFrames are widely used
in data science, machine learning, scientific computing, and many other
Expand All @@ -34,11 +42,29 @@ powerful than tables or spreadsheets because they are an integral part of the
The :ref:`pandas I/O subsystem <pandas:api.io>` for `relational databases`_
using `SQL`_ is based on `SQLAlchemy`_.

Dask
----
`Dask`_ is a flexible library for parallel computing in Python, which scales
Python code from multi-core local machines to large distributed clusters in
the cloud. Dask provides a familiar user interface by mirroring the APIs of
other libraries in the PyData ecosystem, including `pandas`_, `scikit-learn`_,
and `NumPy`_.

.. rubric:: Table of Contents
A :doc:`dask:dataframe` is a large parallel DataFrame composed of many smaller
pandas DataFrames, split along the index. These pandas DataFrames may live on
disk for larger-than-memory computing on a single machine, or on many different
machines in a cluster. One Dask DataFrame operation triggers many operations on
the constituent pandas DataFrames.

.. contents::
:local:

Compatibility notes
===================

.. NOTE::

Please note that DataFrame support for pandas and Dask is only validated
with Python 3.8 and higher, and SQLAlchemy 1.4 and higher. We recommend
to use the most recent versions of those libraries.


Efficient ``INSERT`` operations with pandas
Expand Down Expand Up @@ -118,6 +144,91 @@ workload across multiple batches, using a defined chunk size.
tutorial <wide-narrow-pandas-tutorial_>`_ about the same topic.


Efficient ``INSERT`` operations with Dask
=========================================

The same ``bulk_insert`` function presented in the previous section will also
be used in the context of `Dask`_, in order to make the
:func:`dask:dask.dataframe.to_sql` method more efficiently, based on the
`CrateDB bulk operations`_ endpoint.

The example below will partition your insert workload into equal-sized parts, and
schedule it to be executed on Dask cluster resources, using a defined number of
compute partitions. Each worker instance will then insert its partition's records
in a batched/chunked manner, using a defined chunk size, effectively using the
pandas implementation introduced in the previous section.

>>> import dask.dataframe as dd
>>> from pandas._testing import makeTimeDataFrame
>>> from crate.client.sqlalchemy.support import insert_bulk
...
>>> # Define the number of records, the number of computing partitions,
>>> # and the chunk size of each database insert operation.
>>> INSERT_RECORDS = 100
>>> NPARTITIONS = 4
>>> CHUNK_SIZE = 25
...
>>> # Create a Dask DataFrame.
>>> df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S")
>>> ddf = dd.from_pandas(df, npartitions=NPARTITIONS)
...
>>> # Insert content of DataFrame using multiple workers on a
>>> # compute cluster, transferred using batches of records.
>>> ddf.to_sql(
... name="test-testdrive",
... uri=f"crate://{crate_host}",
... if_exists="replace",
... index=False,
... chunksize=CHUNK_SIZE,
... method=insert_bulk,
... parallel=True,
... )


.. TIP::

You will observe that optimizing your workload will now also involve determining a
good value for the ``NPARTITIONS`` argument, based on the capacity and topology of
the available compute resources, and based on workload characteristics or policies
like peak- vs. balanced- vs. shared-usage. For example, on a machine or cluster fully
dedicated to the problem at hand, you may want to use all available processor cores,
while on a shared system, this strategy may not be appropriate.

If you want to dedicate all available compute resources on your machine, you may want
to use the number of CPU cores as a value to the ``NPARTITIONS`` argument. You can find
out about the available CPU cores on your machine, for example by running the ``nproc``
command in your terminal.

Depending on the implementation and runtime behavior of the compute task, the optimal
number of worker processes, determined by the ``NPARTITIONS`` argument, also needs to be
figured out by running a few test iterations. For that purpose, you can use the
`insert_dask.py`_ program as a blueprint.

Adjusting this value in both directions is perfectly fine: If you observe that you are
overloading the machine, maybe because there are workloads scheduled other than the one
you are running, try to reduce the value. If fragments/steps of your implementation
involve waiting for network or disk I/O, you may want to increase the number of workers
beyond the number of available CPU cores, to increase utilization. On the other hand,
you should be wary about not over-committing resources too much, as it may slow your
system down.

Before getting more serious with Dask, you are welcome to read and watch the excellent
:doc:`dask:best-practices` and :ref:`dask:dataframe.performance` resources, in order to
learn about things to avoid, and beyond. For finding out if your compute workload
scheduling is healthy, you can, for example, use Dask's :doc:`dask:dashboard`.

.. WARNING::

Because the settings assigned in the example above fit together well, the ``to_sql()``
instruction will effectively run four insert operations, executed in parallel, and
scheduled optimally on the available cluster resources.

However, not using those settings sensibly, you can easily misconfigure the resource
scheduling system, and overload the underlying hardware or operating system, virtualized
or not. This is why experimenting with different parameters, and a real dataset, is crucial.



.. hidden: Disconnect from database
>>> engine.dispose()
Expand All @@ -126,14 +237,19 @@ workload across multiple batches, using a defined chunk size.
.. _batching: https://en.wikipedia.org/wiki/Batch_processing#Common_batch_processing_usage
.. _chunking: https://en.wikipedia.org/wiki/Chunking_(computing)
.. _CrateDB bulk operations: https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
.. _Dask: https://en.wikipedia.org/wiki/Dask_(software)
.. _DataFrame computing: https://realpython.com/pandas-dataframe/
.. _ETL: https://en.wikipedia.org/wiki/Extract,_transform,_load
.. _insert_dask.py: https://github.com/crate/cratedb-examples/blob/main/by-language/python-sqlalchemy/insert_dask.py
.. _insert_pandas.py: https://github.com/crate/cratedb-examples/blob/main/by-language/python-sqlalchemy/insert_pandas.py
.. _leveling up to 200_000: https://acepor.github.io/2017/08/03/using-chunksize/
.. _NumPy: https://en.wikipedia.org/wiki/NumPy
.. _pandas: https://en.wikipedia.org/wiki/Pandas_(software)
.. _pandas DataFrame: https://pandas.pydata.org/pandas-docs/stable/reference/frame.html
.. _Python: https://en.wikipedia.org/wiki/Python_(programming_language)
.. _relational databases: https://en.wikipedia.org/wiki/Relational_database
.. _scikit-learn: https://en.wikipedia.org/wiki/Scikit-learn
.. _SNAT port exhaustion: https://learn.microsoft.com/en-us/azure/load-balancer/troubleshoot-outbound-connection
.. _SQL: https://en.wikipedia.org/wiki/SQL
.. _SQLAlchemy: https://aosabook.org/en/v2/sqlalchemy.html
.. _the chunksize should not be too small: https://acepor.github.io/2017/08/03/using-chunksize/
Expand Down
1 change: 1 addition & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
'py': ('https://docs.python.org/3/', None),
'sa': ('https://docs.sqlalchemy.org/en/14/', None),
'urllib3': ('https://urllib3.readthedocs.io/en/1.26.13/', None),
'dask': ('https://docs.dask.org/en/stable/', None),
'pandas': ('https://pandas.pydata.org/docs/', None),
})

Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def read(path):
'zope.testrunner>=5,<7',
'zc.customdoctests>=1.0.1,<2',
'createcoverage>=1,<2',
'dask',
'stopit>=1.1.2,<2',
'flake8>=4,<7',
'pandas',
Expand Down
47 changes: 47 additions & 0 deletions src/crate/client/sqlalchemy/tests/bulk_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,3 +207,50 @@ def test_bulk_save_pandas(self, mock_cursor):

# Verify number of batches.
self.assertEqual(effective_op_count, OPCOUNT)

@skipIf(sys.version_info < (3, 8), "SQLAlchemy/Dask is not supported on Python <3.8")
@skipIf(SA_VERSION < SA_1_4, "SQLAlchemy 1.3 is not supported by pandas")
@patch('crate.client.connection.Cursor', mock_cursor=FakeCursor)
def test_bulk_save_dask(self, mock_cursor):
"""
Verify bulk INSERT with Dask.
"""
import dask.dataframe as dd
from pandas._testing import makeTimeDataFrame
from crate.client.sqlalchemy.support import insert_bulk

# 42 records / 4 partitions means each partition has a size of 10.5 elements.
# Because the chunk size 8 is slightly smaller than 10, the partition will not
# fit into it, so two batches will be emitted to the database for each data
# partition. 4 partitions * 2 batches = 8 insert operations will be emitted.
# Those settings are a perfect example of non-optimal settings, and have been
# made so on purpose, in order to demonstrate that using optimal settings
# is crucial.
INSERT_RECORDS = 42
NPARTITIONS = 4
CHUNK_SIZE = 8
OPCOUNT = math.ceil(INSERT_RECORDS / NPARTITIONS / CHUNK_SIZE) * NPARTITIONS

# Create a DataFrame to feed into the database.
df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S")
ddf = dd.from_pandas(df, npartitions=NPARTITIONS)

dburi = "crate://localhost:4200"
retval = ddf.to_sql(
name="test-testdrive",
uri=dburi,
if_exists="replace",
index=False,
chunksize=CHUNK_SIZE,
method=insert_bulk,
parallel=True,
)
self.assertIsNone(retval)

# Each of the insert operation incurs another call to the cursor object. This is probably
# the initial connection from the DB-API driver, to inquire the database version.
# This compensation formula has been determined empirically / by educated guessing.
effective_op_count = (mock_cursor.call_count - 2 * NPARTITIONS) - 2

# Verify number of batches.
self.assertEqual(effective_op_count, OPCOUNT)

0 comments on commit 4aeb98c

Please sign in to comment.