SQLAlchemy: Add documentation and tests for usage with Dask
amotl committed May 15, 2023
1 parent a6c64ab commit 7da9592
Showing 5 changed files with 165 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -7,6 +7,7 @@ Unreleased

- SQLAlchemy: Added ``insert_bulk`` fast-path ``INSERT`` method for pandas, in
order to support efficient batch inserts using CrateDB's bulk operations endpoint.
- SQLAlchemy: Add documentation and software tests for usage with Dask.


2023/04/18 0.31.1
121 changes: 118 additions & 3 deletions docs/by-example/sqlalchemy/dataframe.rst
@@ -5,6 +5,12 @@
SQLAlchemy: DataFrame operations
================================

.. rubric:: Table of Contents

.. contents::
:local:


About
=====

@@ -21,6 +27,8 @@ improving performance are also referred to as `chunking`_.
Introduction
============

pandas
------
The :ref:`pandas DataFrame <pandas:api.dataframe>` is a structure that contains
two-dimensional data and its corresponding labels. DataFrames are widely used
in data science, machine learning, scientific computing, and many other
@@ -34,11 +42,29 @@ powerful than tables or spreadsheets because they are an integral part of the
The :ref:`pandas I/O subsystem <pandas:api.io>` for `relational databases`_
using `SQL`_ is based on `SQLAlchemy`_.
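
As an illustration of that integration, a minimal sketch might look like the
following; the connection string and the table name ``testdrive-intro`` are
assumptions chosen for demonstration only::

    import pandas as pd
    import sqlalchemy as sa

    # Connect to CrateDB using its SQLAlchemy dialect.
    engine = sa.create_engine("crate://localhost:4200")

    # Create a small DataFrame, and store it into a database table.
    df = pd.DataFrame({"id": [1, 2, 3], "name": ["foo", "bar", "baz"]})
    df.to_sql(name="testdrive-intro", con=engine, if_exists="replace", index=False)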

Dask
----
`Dask`_ is a flexible library for parallel computing in Python, which scales
Python code from multi-core local machines to large distributed clusters in
the cloud. Dask provides a familiar user interface by mirroring the APIs of
other libraries in the PyData ecosystem, including `pandas`_, `scikit-learn`_,
and `NumPy`_.

A :doc:`dask:dataframe` is a large parallel DataFrame composed of many smaller
pandas DataFrames, split along the index. These pandas DataFrames may live on
disk for larger-than-memory computing on a single machine, or on many different
machines in a cluster. One Dask DataFrame operation triggers many operations on
the constituent pandas DataFrames.
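
For example, a small sketch illustrating that layout might look like this; the
figures are arbitrary and only serve demonstration purposes::

    import dask.dataframe as dd
    import pandas as pd

    # A regular pandas DataFrame with 100 rows.
    df = pd.DataFrame({"value": range(100)})

    # Split it into a Dask DataFrame with four partitions, i.e. four
    # smaller pandas DataFrames of 25 rows each.
    ddf = dd.from_pandas(df, npartitions=4)

    # Operations are evaluated lazily, per partition, and only run
    # when `compute()` is invoked.
    total = ddf["value"].sum().compute()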


Compatibility notes
===================

.. NOTE::

Please note that DataFrame support for pandas and Dask is only validated
with Python 3.8 and higher, and SQLAlchemy 1.4 and higher. We recommend
using the most recent versions of those libraries.


Efficient ``INSERT`` operations with pandas
@@ -118,6 +144,90 @@ workload across multiple batches, using a defined chunk size.
tutorial <wide-narrow-pandas-tutorial_>`_ about the same topic.


Efficient ``INSERT`` operations with Dask
=========================================

The same ``insert_bulk`` function presented in the previous section can also
be used with `Dask`_, in order to make the :func:`dask:dask.dataframe.to_sql`
method work more efficiently, based on the `CrateDB bulk operations`_ endpoint.

The example below partitions your insert workload into equal-sized parts, and
schedules them for execution on Dask cluster resources, using a defined number of
compute partitions. Each worker instance then inserts the records of its partition
in batches of a defined chunk size, effectively reusing the pandas implementation
introduced in the previous section.

>>> import dask.dataframe as dd
>>> from pandas._testing import makeTimeDataFrame
>>> from crate.client.sqlalchemy.support import insert_bulk
...
>>> # Define the number of records, the number of computing partitions,
>>> # and the chunk size of each database insert operation.
>>> INSERT_RECORDS = 100
>>> NPARTITIONS = 4
>>> CHUNK_SIZE = 25
...
>>> # Create a Dask DataFrame.
>>> df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S")
>>> ddf = dd.from_pandas(df, npartitions=NPARTITIONS)
...
>>> # Insert content of DataFrame using multiple workers on a
>>> # compute cluster, transferred using batches of records.
>>> ddf.to_sql(
... name="test-testdrive",
... uri=f"crate://{crate_host}",
... if_exists="replace",
... index=False,
... chunksize=CHUNK_SIZE,
... method=insert_bulk,
... parallel=True,
... )


.. TIP::

You will observe that optimizing your workload now also involves determining a
good value for the ``NPARTITIONS`` argument, based on the capacity and topology of
the available compute resources, and on workload characteristics or policies like
peak- vs. balanced- vs. shared-usage. For example, on a machine or cluster fully
dedicated to the problem at hand, you may want to use all available processor cores,
while on a shared system, this strategy may not be appropriate.

If you want to dedicate all available compute resources on your machine, you may want
to use the number of CPU cores as the value for the ``NPARTITIONS`` argument. You can
find out the number of available CPU cores on your machine, for example by running
the ``nproc`` command in your terminal.
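
From within Python, a similar figure can be obtained programmatically, for example
like this; whether it is a suitable value for ``NPARTITIONS`` still depends on your
workload, as outlined above::

    import os

    # Use the number of logical CPU cores as a starting point for
    # the number of Dask partitions.
    NPARTITIONS = os.cpu_count()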

Depending on the implementation and runtime behavior of the compute task, the optimal
number of worker processes, determined by the ``NPARTITIONS`` argument, also needs to be
figured out by running a few test iterations. For that purpose, you can use the
`insert_dask.py`_ program as a blueprint.

Adjusting this value in either direction is perfectly fine: If you observe that you are
overloading the machine, perhaps because other workloads are scheduled besides the one
you are running, try to reduce the value. If parts of your implementation involve
waiting for network or disk I/O, you may want to increase the number of workers beyond
the number of available CPU cores, to improve utilization. On the other hand, you
should be wary of over-committing resources.

Before getting more serious with Dask, you are welcome to read and watch the excellent
:doc:`dask:best-practices`, in order to learn what to avoid, and more. To find out
whether your compute workload scheduling is healthy, you can, for example, use Dask's
:doc:`dask:dashboard`.
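
When using Dask's distributed scheduler, a minimal sketch for reaching that dashboard
could look like this; it assumes the ``distributed`` package is installed, and that
starting a local ``Client`` without arguments is acceptable for your setup::

    from dask.distributed import Client

    # Start a local Dask cluster, and connect a client to it.
    client = Client()

    # The address of the diagnostics dashboard served by the scheduler.
    print(client.dashboard_link)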

.. WARNING::

Because the settings assigned in the example above fit together well, the ``to_sql()``
instruction will effectively run four insert operations, executed in parallel, and
scheduled optimally on the available cluster resources: 100 records across 4 partitions
yield 25 records per partition, which matches the chunk size of 25, so each partition
is inserted in a single batch.

However, if those settings are not chosen sensibly, you can easily misconfigure the
resource scheduling system, and overload the underlying hardware, virtualized or not.
This is why experimenting with different parameters, on a real dataset, is crucial.



.. hidden: Disconnect from database
>>> engine.dispose()
@@ -126,14 +236,19 @@
.. _batching: https://en.wikipedia.org/wiki/Batch_processing#Common_batch_processing_usage
.. _chunking: https://en.wikipedia.org/wiki/Chunking_(computing)
.. _CrateDB bulk operations: https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
.. _Dask: https://en.wikipedia.org/wiki/Dask_(software)
.. _DataFrame computing: https://realpython.com/pandas-dataframe/
.. _ETL: https://en.wikipedia.org/wiki/Extract,_transform,_load
.. _insert_dask.py: https://github.com/crate/cratedb-examples/blob/main/by-language/python-sqlalchemy/insert_dask.py
.. _insert_pandas.py: https://github.com/crate/cratedb-examples/blob/main/by-language/python-sqlalchemy/insert_pandas.py
.. _leveling up to 200_000: https://acepor.github.io/2017/08/03/using-chunksize/
.. _NumPy: https://en.wikipedia.org/wiki/NumPy
.. _pandas: https://en.wikipedia.org/wiki/Pandas_(software)
.. _pandas DataFrame: https://pandas.pydata.org/pandas-docs/stable/reference/frame.html
.. _Python: https://en.wikipedia.org/wiki/Python_(programming_language)
.. _relational databases: https://en.wikipedia.org/wiki/Relational_database
.. _scikit-learn: https://en.wikipedia.org/wiki/Scikit-learn
.. _SNAT port exhaustion: https://learn.microsoft.com/en-us/azure/load-balancer/troubleshoot-outbound-connection
.. _SQL: https://en.wikipedia.org/wiki/SQL
.. _SQLAlchemy: https://aosabook.org/en/v2/sqlalchemy.html
.. _the chunksize should not be too small: https://acepor.github.io/2017/08/03/using-chunksize/
1 change: 1 addition & 0 deletions docs/conf.py
@@ -13,6 +13,7 @@
'py': ('https://docs.python.org/3/', None),
'sa': ('https://docs.sqlalchemy.org/en/14/', None),
'urllib3': ('https://urllib3.readthedocs.io/en/1.26.13/', None),
'dask': ('https://docs.dask.org/en/stable/', None),
'pandas': ('https://pandas.pydata.org/docs/', None),
})

1 change: 1 addition & 0 deletions setup.py
@@ -68,6 +68,7 @@ def read(path):
'zope.testrunner>=5,<7',
'zc.customdoctests>=1.0.1,<2',
'createcoverage>=1,<2',
'dask',
'stopit>=1.1.2,<2',
'flake8>=4,<7',
'pandas',
44 changes: 44 additions & 0 deletions src/crate/client/sqlalchemy/tests/bulk_test.py
@@ -207,3 +207,47 @@ def test_bulk_save_pandas(self, mock_cursor):

# Verify number of batches.
self.assertEqual(effective_op_count, OPCOUNT)

@skipIf(sys.version_info < (3, 8), "SQLAlchemy/Dask is not supported on Python <3.8")
@skipIf(SA_VERSION < SA_1_4, "SQLAlchemy 1.3 is not supported by pandas/Dask")
@patch('crate.client.connection.Cursor', mock_cursor=FakeCursor)
def test_bulk_save_dask(self, mock_cursor):
"""
Verify bulk INSERT with Dask.
"""
import dask.dataframe as dd
from pandas._testing import makeTimeDataFrame
from crate.client.sqlalchemy.support import insert_bulk

# 42 records distributed over 4 partitions yields about 10.5 records per partition.
# Because the chunk size of 8 is smaller than the partition size, each partition
# will be split into two batches, so 4 partitions * 2 batches = 8 insert
# operations will be emitted to the database.
INSERT_RECORDS = 42
NPARTITIONS = 4
CHUNK_SIZE = 8
OPCOUNT = math.ceil(INSERT_RECORDS / NPARTITIONS / CHUNK_SIZE) * NPARTITIONS

# Create a DataFrame to feed into the database.
df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S")
ddf = dd.from_pandas(df, npartitions=NPARTITIONS)

dburi = "crate://localhost:4200"
retval = ddf.to_sql(
name="test-testdrive",
uri=dburi,
if_exists="replace",
index=False,
chunksize=CHUNK_SIZE,
method=insert_bulk,
parallel=True,
)
self.assertIsNone(retval)

# Each of the insert operations incurs another call to the cursor object. This is probably
# the initial connection from the DB-API driver, inquiring about the database version.
# This compensation formula has been determined empirically / by educated guessing.
effective_op_count = (mock_cursor.call_count - 2 * NPARTITIONS) - 2

# Verify number of batches.
self.assertEqual(effective_op_count, OPCOUNT)
