From 62fd630aab15f8e411839200e8fd84b8365c9cf2 Mon Sep 17 00:00:00 2001 From: Tom Owers Date: Thu, 19 Dec 2024 18:42:43 +0100 Subject: [PATCH] Upgrade deltalake and reduce chunk size --- .../data_imports/pipelines/sql_database/helpers.py | 2 +- .../data_imports/pipelines/sql_database_v2/__init__.py | 3 ++- .../data_imports/pipelines/sql_database_v2/helpers.py | 6 ++++-- .../data_imports/pipelines/sql_database_v2/settings.py | 1 + requirements.in | 2 +- requirements.txt | 7 ++----- 6 files changed, 11 insertions(+), 10 deletions(-) create mode 100644 posthog/temporal/data_imports/pipelines/sql_database_v2/settings.py diff --git a/posthog/temporal/data_imports/pipelines/sql_database/helpers.py b/posthog/temporal/data_imports/pipelines/sql_database/helpers.py index 0400a60b32fd5..9bf72a26f3c1e 100644 --- a/posthog/temporal/data_imports/pipelines/sql_database/helpers.py +++ b/posthog/temporal/data_imports/pipelines/sql_database/helpers.py @@ -24,7 +24,7 @@ def __init__( self, engine: Engine, table: Table, - chunk_size: int = 1000, + chunk_size: int = DEFAULT_CHUNK_SIZE, incremental: Optional[dlt.sources.incremental[Any]] = None, connect_args: Optional[list[str]] = None, db_incremental_field_last_value: Optional[Any] = None, diff --git a/posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py b/posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py index a3fc1c6b2838b..de868d62ee51d 100644 --- a/posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py +++ b/posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py @@ -15,6 +15,7 @@ from dlt.sources.credentials import ConnectionStringCredentials from posthog.settings.utils import get_from_env +from posthog.temporal.data_imports.pipelines.sql_database_v2.settings import DEFAULT_CHUNK_SIZE from posthog.temporal.data_imports.pipelines.sql_database_v2._json import BigQueryJSON from posthog.utils import str_to_bool from posthog.warehouse.models import ExternalDataSource @@ -252,7 +253,7 @@ def sql_database( schema: Optional[str] = dlt.config.value, metadata: Optional[MetaData] = None, table_names: Optional[list[str]] = dlt.config.value, - chunk_size: int = 50000, + chunk_size: int = DEFAULT_CHUNK_SIZE, backend: TableBackend = "pyarrow", detect_precision_hints: Optional[bool] = False, reflection_level: Optional[ReflectionLevel] = "full", diff --git a/posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py b/posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py index acd64c97aae99..74a79650caa15 100644 --- a/posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py +++ b/posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py @@ -18,6 +18,8 @@ from dlt.sources.credentials import ConnectionStringCredentials +from posthog.temporal.data_imports.pipelines.sql_database_v2.settings import DEFAULT_CHUNK_SIZE + from .arrow_helpers import row_tuples_to_arrow from .schema_types import ( default_table_adapter, @@ -44,7 +46,7 @@ def __init__( backend: TableBackend, table: Table, columns: TTableSchemaColumns, - chunk_size: int = 1000, + chunk_size: int = DEFAULT_CHUNK_SIZE, incremental: Optional[dlt.sources.incremental[Any]] = None, db_incremental_field_last_value: Optional[Any] = None, query_adapter_callback: Optional[TQueryAdapter] = None, @@ -302,7 +304,7 @@ class SqlTableResourceConfiguration(BaseConfiguration): table: Optional[str] = None schema: Optional[str] = None incremental: Optional[dlt.sources.incremental] = None # type: ignore[type-arg] - chunk_size: int = 50000 + chunk_size: int = DEFAULT_CHUNK_SIZE backend: TableBackend = "sqlalchemy" detect_precision_hints: Optional[bool] = None defer_table_reflect: Optional[bool] = False diff --git a/posthog/temporal/data_imports/pipelines/sql_database_v2/settings.py b/posthog/temporal/data_imports/pipelines/sql_database_v2/settings.py new file mode 100644 index 0000000000000..d730961c096e8 --- /dev/null +++ b/posthog/temporal/data_imports/pipelines/sql_database_v2/settings.py @@ -0,0 +1 @@ +DEFAULT_CHUNK_SIZE = 10_000 diff --git a/requirements.in b/requirements.in index faefd16d9294d..8c0b98d16587b 100644 --- a/requirements.in +++ b/requirements.in @@ -16,6 +16,7 @@ clickhouse-driver==0.2.7 clickhouse-pool==0.5.3 conditional-cache==1.2 cryptography==39.0.2 +deltalake==0.22.3 dj-database-url==0.5.0 Django~=4.2.15 django-axes==5.9.0 @@ -34,7 +35,6 @@ djangorestframework==3.15.1 djangorestframework-csv==2.1.1 djangorestframework-dataclasses==1.2.0 dlt==1.3.0 -dlt[deltalake]==1.3.0 dnspython==2.2.1 drf-exceptions-hog==0.4.0 drf-extensions==0.7.0 diff --git a/requirements.txt b/requirements.txt index 639c98066ccd4..e4a1521f8dd7a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -138,8 +138,8 @@ defusedxml==0.6.0 # via # python3-openid # social-auth-core -deltalake==0.19.1 - # via dlt +deltalake==0.22.3 + # via -r requirements.in distro==1.9.0 # via openai dj-database-url==0.5.0 @@ -273,8 +273,6 @@ googleapis-common-protos==1.60.0 # via # google-api-core # grpcio-status -greenlet==3.1.1 - # via sqlalchemy grpcio==1.63.2 # via # -r requirements.in @@ -505,7 +503,6 @@ pyarrow==17.0.0 # via # -r requirements.in # deltalake - # dlt # sqlalchemy-bigquery pyasn1==0.5.0 # via