Skip to content

Commit

Permalink
fix(data-warehouse): Upgraded DLT and added test for schema evolution (
Browse files Browse the repository at this point in the history
  • Loading branch information
Gilbert09 authored Aug 29, 2024
1 parent 863525e commit 9bcda37
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import Any, Optional
import dlt
from dlt.sources.helpers.rest_client.paginators import BasePaginator
from dlt.sources.helpers.requests import Response, Request
Expand Down Expand Up @@ -157,7 +158,7 @@ def __init__(self, instance_url):
super().__init__()
self.instance_url = instance_url

def update_state(self, response: Response) -> None:
def update_state(self, response: Response, data: Optional[list[Any]] = None) -> None:
res = response.json()

self._next_page = None
Expand Down
4 changes: 2 additions & 2 deletions posthog/temporal/data_imports/pipelines/stripe/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Optional
from typing import Any, Optional
import dlt
from dlt.sources.helpers.rest_client.paginators import BasePaginator
from dlt.sources.helpers.requests import Response, Request
Expand Down Expand Up @@ -264,7 +264,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:


class StripePaginator(BasePaginator):
def update_state(self, response: Response) -> None:
def update_state(self, response: Response, data: Optional[list[Any]] = None) -> None:
res = response.json()

self._starting_after = None
Expand Down
5 changes: 3 additions & 2 deletions posthog/temporal/data_imports/pipelines/zendesk/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import base64
from typing import Any, Optional
import dlt
from dlt.sources.helpers.rest_client.paginators import BasePaginator, JSONLinkPaginator
from dlt.sources.helpers.requests import Response, Request
Expand Down Expand Up @@ -235,7 +236,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:


class ZendeskTicketsIncrementalEndpointPaginator(BasePaginator):
def update_state(self, response: Response) -> None:
def update_state(self, response: Response, data: Optional[list[Any]] = None) -> None:
res = response.json()

self._next_start_time = None
Expand All @@ -260,7 +261,7 @@ def update_request(self, request: Request) -> None:


class ZendeskIncrementalEndpointPaginator(BasePaginator):
def update_state(self, response: Response) -> None:
def update_state(self, response: Response, data: Optional[list[Any]] = None) -> None:
res = response.json()

self._next_page = None
Expand Down
73 changes: 72 additions & 1 deletion posthog/temporal/tests/data_imports/test_end_to_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,14 @@ async def minio_client():


async def _run(
team: Team, schema_name: str, table_name: str, source_type: str, job_inputs: dict[str, str], mock_data_response: Any
team: Team,
schema_name: str,
table_name: str,
source_type: str,
job_inputs: dict[str, str],
mock_data_response: Any,
sync_type: Optional[ExternalDataSchema.SyncType] = None,
sync_type_config: Optional[dict] = None,
):
source = await sync_to_async(ExternalDataSource.objects.create)(
source_id=uuid.uuid4(),
Expand All @@ -98,6 +105,8 @@ async def _run(
name=schema_name,
team_id=team.pk,
source_id=source.pk,
sync_type=sync_type,
sync_type_config=sync_type_config or {},
)

workflow_id = str(uuid.uuid4())
Expand Down Expand Up @@ -560,3 +569,65 @@ def get_jobs():
)

assert len(s3_objects["Contents"]) != 0


# NOTE(review): reconstructed from a diff view with stripped indentation;
# standard 4-space layout restored, tokens unchanged.
@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_postgres_schema_evolution(team, postgres_config, postgres_connection):
    """End-to-end check that an incremental Postgres import picks up columns
    added to the source table after the initial sync (schema evolution).

    Flow:
      1. Create a one-column table and run an incremental import keyed on ``id``.
      2. Assert the warehouse table exposes ``id`` plus dlt's bookkeeping
         columns (``_dlt_id``, ``_dlt_load_id``) — 3 columns total.
      3. ALTER the source table to add ``new_col``, insert a row, re-run the
         same schema, and assert the new column now appears — 4 columns total.

    Fixtures ``team``, ``postgres_config`` and ``postgres_connection`` are
    project-provided — presumably an asyncpg-style connection and a config
    dict; TODO confirm against conftest.
    """
    # Seed the source table with a single integer column and one row.
    await postgres_connection.execute(
        "CREATE TABLE IF NOT EXISTS {schema}.test_table (id integer)".format(schema=postgres_config["schema"])
    )
    await postgres_connection.execute(
        "INSERT INTO {schema}.test_table (id) VALUES (1)".format(schema=postgres_config["schema"])
    )
    await postgres_connection.commit()

    # First import: incremental sync keyed on the integer "id" column.
    _workflow_id, inputs = await _run(
        team=team,
        schema_name="test_table",
        table_name="postgres_test_table",
        source_type="Postgres",
        job_inputs={
            "host": postgres_config["host"],
            "port": postgres_config["port"],
            "database": postgres_config["database"],
            "user": postgres_config["user"],
            "password": postgres_config["password"],
            "schema": postgres_config["schema"],
            "ssh_tunnel_enabled": "False",
        },
        mock_data_response=[],
        sync_type=ExternalDataSchema.SyncType.INCREMENTAL,
        sync_type_config={"incremental_field": "id", "incremental_field_type": "integer"},
    )

    res = await sync_to_async(execute_hogql_query)("SELECT * FROM postgres_test_table", team)
    columns = res.columns

    # Expect the source column plus dlt's two bookkeeping columns.
    assert columns is not None
    assert len(columns) == 3
    assert any(x == "id" for x in columns)
    assert any(x == "_dlt_id" for x in columns)
    assert any(x == "_dlt_load_id" for x in columns)

    # Evolve schema: add a column on the source side and a row populating it.
    await postgres_connection.execute(
        "ALTER TABLE {schema}.test_table ADD new_col integer".format(schema=postgres_config["schema"])
    )
    await postgres_connection.execute(
        "INSERT INTO {schema}.test_table (id, new_col) VALUES (2, 2)".format(schema=postgres_config["schema"])
    )
    await postgres_connection.commit()

    # Execute the same schema again - load
    await _execute_run(str(uuid.uuid4()), inputs, [])

    res = await sync_to_async(execute_hogql_query)("SELECT * FROM postgres_test_table", team)
    columns = res.columns

    # The evolved column should now be present alongside the originals.
    assert columns is not None
    assert len(columns) == 4
    assert any(x == "id" for x in columns)
    assert any(x == "new_col" for x in columns)
    assert any(x == "_dlt_id" for x in columns)
    assert any(x == "_dlt_load_id" for x in columns)
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ def mock_to_object_store_rs_credentials(class_self):
Bucket=BUCKET_NAME, Prefix=f"{folder_path}/customer/"
)

assert len(job_1_customer_objects["Contents"]) == 3
assert len(job_1_customer_objects["Contents"]) == 2

with (
mock.patch.object(RESTClient, "paginate", mock_charges_paginate),
Expand All @@ -536,7 +536,7 @@ def mock_to_object_store_rs_credentials(class_self):
job_2_charge_objects = await minio_client.list_objects_v2(
Bucket=BUCKET_NAME, Prefix=f"{job_2.folder_path()}/charge/"
)
assert len(job_2_charge_objects["Contents"]) == 3
assert len(job_2_charge_objects["Contents"]) == 2


@pytest.mark.django_db(transaction=True)
Expand Down Expand Up @@ -749,7 +749,7 @@ def mock_to_object_store_rs_credentials(class_self):
Bucket=BUCKET_NAME, Prefix=f"{folder_path}/customer/"
)

assert len(job_1_customer_objects["Contents"]) == 3
assert len(job_1_customer_objects["Contents"]) == 2

await sync_to_async(job_1.refresh_from_db)()
assert job_1.rows_synced == 1
Expand Down Expand Up @@ -914,7 +914,7 @@ def mock_to_object_store_rs_credentials(class_self):
job_1_team_objects = await minio_client.list_objects_v2(
Bucket=BUCKET_NAME, Prefix=f"{folder_path}/posthog_test/"
)
assert len(job_1_team_objects["Contents"]) == 3
assert len(job_1_team_objects["Contents"]) == 2


@pytest.mark.django_db(transaction=True)
Expand Down
4 changes: 2 additions & 2 deletions requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ djangorestframework==3.15.1
djangorestframework-csv==2.1.1
djangorestframework-dataclasses==1.2.0
django-fernet-encrypted-fields==0.1.3
dlt==0.5.3
dlt[deltalake]==0.5.3
dlt==0.5.4
dlt[deltalake]==0.5.4
dnspython==2.2.1
drf-exceptions-hog==0.4.0
drf-extensions==0.7.0
Expand Down
10 changes: 3 additions & 7 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ defusedxml==0.6.0
# via
# python3-openid
# social-auth-core
deltalake==0.18.2
deltalake==0.19.1
# via dlt
distro==1.9.0
# via openai
Expand Down Expand Up @@ -206,15 +206,15 @@ djangorestframework-csv==2.1.1
# via -r requirements.in
djangorestframework-dataclasses==1.2.0
# via -r requirements.in
dlt==0.5.3
dlt==0.5.4
# via -r requirements.in
dnspython==2.2.1
# via -r requirements.in
drf-exceptions-hog==0.4.0
# via -r requirements.in
drf-extensions==0.7.0
# via -r requirements.in
drf-spectacular==0.27.1
drf-spectacular==0.27.2
# via -r requirements.in
et-xmlfile==1.1.0
# via openpyxl
Expand Down Expand Up @@ -258,8 +258,6 @@ googleapis-common-protos==1.60.0
# via
# google-api-core
# grpcio-status
greenlet==3.0.3
# via sqlalchemy
grpcio==1.57.0
# via
# google-api-core
Expand Down Expand Up @@ -434,8 +432,6 @@ pyarrow==17.0.0
# -r requirements.in
# deltalake
# dlt
pyarrow-hotfix==0.6
# via deltalake
pyasn1==0.5.0
# via
# pyasn1-modules
Expand Down

0 comments on commit 9bcda37

Please sign in to comment.