diff --git a/posthog/temporal/data_imports/pipelines/salesforce/__init__.py b/posthog/temporal/data_imports/pipelines/salesforce/__init__.py
index c1e457ece8e55..a918077af63f5 100644
--- a/posthog/temporal/data_imports/pipelines/salesforce/__init__.py
+++ b/posthog/temporal/data_imports/pipelines/salesforce/__init__.py
@@ -1,3 +1,4 @@
+from typing import Any, Optional
 import dlt
 from dlt.sources.helpers.rest_client.paginators import BasePaginator
 from dlt.sources.helpers.requests import Response, Request
@@ -157,7 +158,7 @@ def __init__(self, instance_url):
         super().__init__()
         self.instance_url = instance_url
 
-    def update_state(self, response: Response) -> None:
+    def update_state(self, response: Response, data: Optional[list[Any]] = None) -> None:
         res = response.json()
 
         self._next_page = None
diff --git a/posthog/temporal/data_imports/pipelines/stripe/__init__.py b/posthog/temporal/data_imports/pipelines/stripe/__init__.py
index 51f3f7283f064..5b2e6aa65d4c0 100644
--- a/posthog/temporal/data_imports/pipelines/stripe/__init__.py
+++ b/posthog/temporal/data_imports/pipelines/stripe/__init__.py
@@ -1,4 +1,4 @@
-from typing import Optional
+from typing import Any, Optional
 import dlt
 from dlt.sources.helpers.rest_client.paginators import BasePaginator
 from dlt.sources.helpers.requests import Response, Request
@@ -264,7 +264,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
 
 
 class StripePaginator(BasePaginator):
-    def update_state(self, response: Response) -> None:
+    def update_state(self, response: Response, data: Optional[list[Any]] = None) -> None:
         res = response.json()
 
         self._starting_after = None
diff --git a/posthog/temporal/data_imports/pipelines/zendesk/__init__.py b/posthog/temporal/data_imports/pipelines/zendesk/__init__.py
index 0bf2510cce8f3..36d842e4d3889 100644
--- a/posthog/temporal/data_imports/pipelines/zendesk/__init__.py
+++ b/posthog/temporal/data_imports/pipelines/zendesk/__init__.py
@@ -1,4 +1,5 @@
 import base64
+from typing import Any, Optional
 import dlt
 from dlt.sources.helpers.rest_client.paginators import BasePaginator, JSONLinkPaginator
 from dlt.sources.helpers.requests import Response, Request
@@ -235,7 +236,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
 
 
 class ZendeskTicketsIncrementalEndpointPaginator(BasePaginator):
-    def update_state(self, response: Response) -> None:
+    def update_state(self, response: Response, data: Optional[list[Any]] = None) -> None:
         res = response.json()
 
         self._next_start_time = None
@@ -260,7 +261,7 @@ def update_request(self, request: Request) -> None:
 
 
 class ZendeskIncrementalEndpointPaginator(BasePaginator):
-    def update_state(self, response: Response) -> None:
+    def update_state(self, response: Response, data: Optional[list[Any]] = None) -> None:
         res = response.json()
 
         self._next_page = None
diff --git a/posthog/temporal/tests/data_imports/test_end_to_end.py b/posthog/temporal/tests/data_imports/test_end_to_end.py
index 98a316458e76d..6ce2ef6e83e02 100644
--- a/posthog/temporal/tests/data_imports/test_end_to_end.py
+++ b/posthog/temporal/tests/data_imports/test_end_to_end.py
@@ -82,7 +82,14 @@ async def minio_client():
 
 
 async def _run(
-    team: Team, schema_name: str, table_name: str, source_type: str, job_inputs: dict[str, str], mock_data_response: Any
+    team: Team,
+    schema_name: str,
+    table_name: str,
+    source_type: str,
+    job_inputs: dict[str, str],
+    mock_data_response: Any,
+    sync_type: Optional[ExternalDataSchema.SyncType] = None,
+    sync_type_config: Optional[dict] = None,
 ):
     source = await sync_to_async(ExternalDataSource.objects.create)(
         source_id=uuid.uuid4(),
@@ -98,6 +105,8 @@ async def _run(
         name=schema_name,
         team_id=team.pk,
         source_id=source.pk,
+        sync_type=sync_type,
+        sync_type_config=sync_type_config or {},
     )
 
     workflow_id = str(uuid.uuid4())
@@ -560,3 +569,65 @@ def get_jobs():
     )
 
     assert len(s3_objects["Contents"]) != 0
+
+
+@pytest.mark.django_db(transaction=True)
+@pytest.mark.asyncio
+async def test_postgres_schema_evolution(team, postgres_config, postgres_connection):
+    await postgres_connection.execute(
+        "CREATE TABLE IF NOT EXISTS {schema}.test_table (id integer)".format(schema=postgres_config["schema"])
+    )
+    await postgres_connection.execute(
+        "INSERT INTO {schema}.test_table (id) VALUES (1)".format(schema=postgres_config["schema"])
+    )
+    await postgres_connection.commit()
+
+    _workflow_id, inputs = await _run(
+        team=team,
+        schema_name="test_table",
+        table_name="postgres_test_table",
+        source_type="Postgres",
+        job_inputs={
+            "host": postgres_config["host"],
+            "port": postgres_config["port"],
+            "database": postgres_config["database"],
+            "user": postgres_config["user"],
+            "password": postgres_config["password"],
+            "schema": postgres_config["schema"],
+            "ssh_tunnel_enabled": "False",
+        },
+        mock_data_response=[],
+        sync_type=ExternalDataSchema.SyncType.INCREMENTAL,
+        sync_type_config={"incremental_field": "id", "incremental_field_type": "integer"},
+    )
+
+    res = await sync_to_async(execute_hogql_query)("SELECT * FROM postgres_test_table", team)
+    columns = res.columns
+
+    assert columns is not None
+    assert len(columns) == 3
+    assert any(x == "id" for x in columns)
+    assert any(x == "_dlt_id" for x in columns)
+    assert any(x == "_dlt_load_id" for x in columns)
+
+    # Evolve schema
+    await postgres_connection.execute(
+        "ALTER TABLE {schema}.test_table ADD new_col integer".format(schema=postgres_config["schema"])
+    )
+    await postgres_connection.execute(
+        "INSERT INTO {schema}.test_table (id, new_col) VALUES (2, 2)".format(schema=postgres_config["schema"])
+    )
+    await postgres_connection.commit()
+
+    # Execute the same schema again - load
+    await _execute_run(str(uuid.uuid4()), inputs, [])
+
+    res = await sync_to_async(execute_hogql_query)("SELECT * FROM postgres_test_table", team)
+    columns = res.columns
+
+    assert columns is not None
+    assert len(columns) == 4
+    assert any(x == "id" for x in columns)
+    assert any(x == "new_col" for x in columns)
+    assert any(x == "_dlt_id" for x in columns)
+    assert any(x == "_dlt_load_id" for x in columns)
diff --git a/posthog/temporal/tests/external_data/test_external_data_job.py b/posthog/temporal/tests/external_data/test_external_data_job.py
index aa16aad42cba4..0c3e1b0001e57 100644
--- a/posthog/temporal/tests/external_data/test_external_data_job.py
+++ b/posthog/temporal/tests/external_data/test_external_data_job.py
@@ -511,7 +511,7 @@ def mock_to_object_store_rs_credentials(class_self):
             Bucket=BUCKET_NAME, Prefix=f"{folder_path}/customer/"
         )
 
-        assert len(job_1_customer_objects["Contents"]) == 3
+        assert len(job_1_customer_objects["Contents"]) == 2
 
         with (
             mock.patch.object(RESTClient, "paginate", mock_charges_paginate),
@@ -536,7 +536,7 @@ def mock_to_object_store_rs_credentials(class_self):
             job_2_charge_objects = await minio_client.list_objects_v2(
                 Bucket=BUCKET_NAME, Prefix=f"{job_2.folder_path()}/charge/"
             )
-            assert len(job_2_charge_objects["Contents"]) == 3
+            assert len(job_2_charge_objects["Contents"]) == 2
 
 
 @pytest.mark.django_db(transaction=True)
@@ -749,7 +749,7 @@ def mock_to_object_store_rs_credentials(class_self):
             Bucket=BUCKET_NAME, Prefix=f"{folder_path}/customer/"
         )
 
-        assert len(job_1_customer_objects["Contents"]) == 3
+        assert len(job_1_customer_objects["Contents"]) == 2
 
         await sync_to_async(job_1.refresh_from_db)()
         assert job_1.rows_synced == 1
@@ -914,7 +914,7 @@ def mock_to_object_store_rs_credentials(class_self):
         job_1_team_objects = await minio_client.list_objects_v2(
             Bucket=BUCKET_NAME, Prefix=f"{folder_path}/posthog_test/"
         )
-        assert len(job_1_team_objects["Contents"]) == 3
+        assert len(job_1_team_objects["Contents"]) == 2
 
 
 @pytest.mark.django_db(transaction=True)
diff --git a/requirements.in b/requirements.in
index 4ffa9af65ca47..4311e8243859e 100644
--- a/requirements.in
+++ b/requirements.in
@@ -33,8 +33,8 @@ djangorestframework==3.15.1
 djangorestframework-csv==2.1.1
 djangorestframework-dataclasses==1.2.0
 django-fernet-encrypted-fields==0.1.3
-dlt==0.5.3
-dlt[deltalake]==0.5.3
+dlt==0.5.4
+dlt[deltalake]==0.5.4
 dnspython==2.2.1
 drf-exceptions-hog==0.4.0
 drf-extensions==0.7.0
diff --git a/requirements.txt b/requirements.txt
index e485c3b6e5b98..67cb72032dcc3 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -129,7 +129,7 @@ defusedxml==0.6.0
     # via
     #   python3-openid
     #   social-auth-core
-deltalake==0.18.2
+deltalake==0.19.1
     # via dlt
 distro==1.9.0
     # via openai
@@ -206,7 +206,7 @@ djangorestframework-csv==2.1.1
     # via -r requirements.in
 djangorestframework-dataclasses==1.2.0
     # via -r requirements.in
-dlt==0.5.3
+dlt==0.5.4
     # via -r requirements.in
 dnspython==2.2.1
     # via -r requirements.in
@@ -214,7 +214,7 @@ drf-exceptions-hog==0.4.0
     # via -r requirements.in
 drf-extensions==0.7.0
     # via -r requirements.in
-drf-spectacular==0.27.1
+drf-spectacular==0.27.2
     # via -r requirements.in
 et-xmlfile==1.1.0
     # via openpyxl
@@ -258,8 +258,6 @@ googleapis-common-protos==1.60.0
     # via
     #   google-api-core
     #   grpcio-status
-greenlet==3.0.3
-    # via sqlalchemy
 grpcio==1.57.0
     # via
     #   google-api-core
@@ -434,8 +432,6 @@ pyarrow==17.0.0
     # via
     #   -r requirements.in
     #   deltalake
     #   dlt
-pyarrow-hotfix==0.6
-    # via deltalake
 pyasn1==0.5.0
     # via
     #   pyasn1-modules