
SQLAlchemy backlog #74

Open · 6 tasks
amotl opened this issue Jun 1, 2022 · 3 comments

Comments

@amotl
Member

amotl commented Jun 1, 2022

Hi there,

while working on crate/crate-python#391, I accumulated a number of backlog items. I will gather them in this ticket.

Internals

We've identified a few shortcomings in the internal implementation of the CrateDB SQLAlchemy dialect. While it works in general, these spots can be improved to better align with SQLAlchemy's internal API hooks and with how the CrateDB dialect interacts with them.

More

With kind regards,
Andreas.

@robd003

robd003 commented Jun 20, 2022

Getting support for async SQLAlchemy would be super useful

I have a few queries that take slightly over 1 second to execute, and being able to avoid blocking on them would be HUGE.

@amotl
Member Author

amotl commented Mar 30, 2023

Dear Robert,

support for asynchronous communication with SQLAlchemy, based on the asyncpg and psycopg3 drivers, is being evaluated at crate/crate-python#532. Please note that this is experimental, and we currently have no schedule for when or how it will be released.
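
For illustration, here is a minimal sketch of what asynchronous usage could look like once such a dialect becomes available. The `crate+asyncpg://` URL is an assumption for illustration only; `create_async_engine()` and friends are standard SQLAlchemy asyncio API.

```python
# Sketch only: assumes a hypothetical async-capable CrateDB dialect registered
# under "crate+asyncpg://"; the final dialect name and URL may differ.
import asyncio

import sqlalchemy as sa
from sqlalchemy.ext.asyncio import create_async_engine


async def main():
    # asyncpg speaks the PostgreSQL wire protocol, which CrateDB serves on port 5432.
    engine = create_async_engine("crate+asyncpg://crate@localhost:5432/")
    async with engine.connect() as conn:
        result = await conn.execute(sa.text("SELECT 42"))
        print(result.scalar_one())
    await engine.dispose()


asyncio.run(main())
```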

With kind regards,
Andreas.

@amotl
Member Author

amotl commented Jul 5, 2023

Coming from crate/crate-python#553 (comment) and crate/crate-python#553 (comment), there are a few additional backlog items for SQLAlchemy/pandas/Dask:

  • Expand support for fast-path bulk inserts to the other DML operations, UPDATE and DELETE, which the CrateDB bulk operations endpoint also offers, where this is sensible within a pandas/Dask environment (a sketch follows below this list).
  • Don't stop at to_sql(); also cover optimal read_sql() techniques in the documentation where possible (see the read_sql() sketch below this list).
  • For determining the number of available CPU cores on a machine, it may also make sense to refer to the os.cpu_count() or multiprocessing.cpu_count() functions, and/or the cpu-count package. [1]
  • Another detail could be to elaborate a bit on the other main argument to the dask.dataframe.from_pandas() function, chunksize=, which is mutually exclusive with the npartitions= argument. [2]
    ```python
    import dask.dataframe as dd
    ddf = dd.from_pandas(df, chunksize=CHUNKSIZE)
    ```
    In this way, you don't specify the number of partitions, but instead use the chunk size as the parameter that configures the workload scheduler. This concept might fit even better with typical ETL tasks from/to database systems, which we are exploring here to make more efficient with CrateDB.
    You'd probably use the same chunksize value here that also configures the outbound batching towards the database, but I am not sure about that yet. If that is the case, it would make usage even easier across different scenarios: a user would only need to pick a good chunk size, no different from basic pandas usage, and would not need to spend much thought on compute resources at all (see the chunk-size sketch below this list).
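
Regarding the bulk operations item, here is a rough sketch of how CrateDB's HTTP endpoint accepts a parameterized UPDATE statement together with bulk_args; the same pattern applies to DELETE. The table and column names are made up for illustration, and the exact response layout should be double-checked against the CrateDB documentation.

```python
# Sketch: submit one parameterized UPDATE statement with "bulk_args" to the
# CrateDB HTTP endpoint; the statement is executed once per parameter row.
# Table "testdrive.foo" with columns (id, name) is hypothetical.
import requests

CRATEDB_SQL_ENDPOINT = "http://localhost:4200/_sql"

payload = {
    "stmt": "UPDATE testdrive.foo SET name = ? WHERE id = ?",
    "bulk_args": [
        ["apple", 1],
        ["banana", 2],
        ["cherry", 3],
    ],
}

response = requests.post(CRATEDB_SQL_ENDPOINT, json=payload, timeout=10)
response.raise_for_status()
# The response reports one result entry per bulk_args row.
print(response.json()["results"])
```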
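
Regarding read_sql(), a minimal sketch of chunked reading with pandas; the connection URL and table name are assumptions, and it presumes the CrateDB SQLAlchemy dialect is installed so that the crate:// URL resolves.

```python
# Sketch: stream a large result set in chunks instead of materializing it at once.
import pandas as pd
import sqlalchemy as sa

# Hypothetical connection URL and table name.
engine = sa.create_engine("crate://localhost:4200")

# chunksize= makes read_sql() return an iterator of smaller DataFrames,
# so the whole result set never needs to fit into memory at once.
for frame in pd.read_sql("SELECT * FROM testdrive_demo", con=engine, chunksize=5_000):
    print(len(frame))
```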
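
Regarding the core count and chunk size items, a sketch of how a single CHUNKSIZE value could drive both the Dask partitioning and the outbound batching of to_sql(). The connection URL, the table name, and the idea of reusing CHUNKSIZE for to_sql() are assumptions, and the exact to_sql() parameters may differ between Dask versions.

```python
# Sketch: derive parallelism from the available cores and reuse one CHUNKSIZE
# value for both Dask partitioning and the outbound batches of to_sql().
import os

import dask.dataframe as dd
import pandas as pd

CHUNKSIZE = 5_000
NCORES = os.cpu_count() or 1  # fall back to 1 if the count cannot be determined

df = pd.DataFrame({"id": range(50_000), "value": range(50_000)})

# Either partition by chunk size ...
ddf = dd.from_pandas(df, chunksize=CHUNKSIZE)
# ... or by the number of cores (mutually exclusive with chunksize=):
# ddf = dd.from_pandas(df, npartitions=NCORES)

# Hypothetical CrateDB connection URL; to_sql() batches rows per chunksize.
ddf.to_sql(
    "testdrive_demo",
    uri="crate://localhost:4200",
    if_exists="replace",
    index=False,
    chunksize=CHUNKSIZE,
    parallel=True,
)
```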

Footnotes

  1. Rationale: If you are professionally scheduling cluster workloads, you will probably know the number of cores in advance. Still, inquiring the number of available cores and using that figure on demand makes sense when your program is meant to run in different environments, for example in Jupyter notebooks which don't reach out to a cluster and only process fractions of the whole workload on smaller workstations, but still aim to utilize their resources as well as possible, i.e. to avoid using only 4 cores while 16 are available.

  2. dask.dataframe.from_pandas() function

    def from_pandas(
        data: pd.DataFrame | pd.Series,
        npartitions: int | None = None,
        chunksize: int | None = None,
        sort: bool = True,
        name: str | None = None,
    ) -> DataFrame | Series:
        """
        Construct a Dask DataFrame from a Pandas DataFrame
    
        This splits an in-memory Pandas dataframe into several parts and constructs
        a dask.dataframe from those parts on which Dask.dataframe can operate in
        parallel.  By default, the input dataframe will be sorted by the index to
        produce cleanly-divided partitions (with known divisions).  To preserve the
        input ordering, make sure the input index is monotonically-increasing. The
        ``sort=False`` option will also avoid reordering, but will not result in
        known divisions.
    
        Note that, despite parallelism, Dask.dataframe may not always be faster
        than Pandas.  We recommend that you stay with Pandas for as long as
        possible before switching to Dask.dataframe.
    
        npartitions : int, optional
            The number of partitions of the index to create. Note that if there
            are duplicate values or insufficient elements in ``data.index``, the
            output may have fewer partitions than requested.
        chunksize : int, optional
            The desired number of rows per index partition to use. Note that
            depending on the size and index of the dataframe, actual partition
            sizes may vary.
        """
    
