From 8b3aeb236eb37c99db72389c10af36f03cd5d023 Mon Sep 17 00:00:00 2001
From: Tom Owers
Date: Wed, 4 Sep 2024 17:06:29 +0100
Subject: [PATCH] fix(data-warehouse): Exclude rows missing the cursor path of
 incremental value (#24782)

---
 .../pipelines/sql_database/__init__.py       | 38 ++++----
 .../pipelines/sql_database/helpers.py        | 10 ++-
 .../tests/data_imports/test_end_to_end.py    | 90 +++++++++++++++++++
 posthog/warehouse/types.py                   |  4 +-
 4 files changed, 120 insertions(+), 22 deletions(-)

diff --git a/posthog/temporal/data_imports/pipelines/sql_database/__init__.py b/posthog/temporal/data_imports/pipelines/sql_database/__init__.py
index 0fc7f7394b6ad..900c70d3b56c6 100644
--- a/posthog/temporal/data_imports/pipelines/sql_database/__init__.py
+++ b/posthog/temporal/data_imports/pipelines/sql_database/__init__.py
@@ -173,24 +173,26 @@ def sql_database(
             # and pass them in here to get empty table materialization
             binary_columns_to_drop = get_binary_columns(engine, schema or "", table.name)
 
-            yield dlt.resource(
-                table_rows,
-                name=table.name,
-                primary_key=get_primary_key(table),
-                merge_key=get_primary_key(table),
-                write_disposition={
-                    "disposition": "merge",
-                    "strategy": "upsert",
-                }
-                if incremental
-                else "replace",
-                spec=SqlDatabaseTableConfiguration,
-                table_format="delta",
-                columns=get_column_hints(engine, schema or "", table.name),
-            ).add_map(remove_columns(binary_columns_to_drop, team_id))(
-                engine=engine,
-                table=table,
-                incremental=incremental,
+            yield (
+                dlt.resource(
+                    table_rows,
+                    name=table.name,
+                    primary_key=get_primary_key(table),
+                    merge_key=get_primary_key(table),
+                    write_disposition={
+                        "disposition": "merge",
+                        "strategy": "upsert",
+                    }
+                    if incremental
+                    else "replace",
+                    spec=SqlDatabaseTableConfiguration,
+                    table_format="delta",
+                    columns=get_column_hints(engine, schema or "", table.name),
+                ).add_map(remove_columns(binary_columns_to_drop, team_id))(
+                    engine=engine,
+                    table=table,
+                    incremental=incremental,
+                )
             )
diff --git a/posthog/temporal/data_imports/pipelines/sql_database/helpers.py b/posthog/temporal/data_imports/pipelines/sql_database/helpers.py
index 894407beda8a0..d877effb3e374 100644
--- a/posthog/temporal/data_imports/pipelines/sql_database/helpers.py
+++ b/posthog/temporal/data_imports/pipelines/sql_database/helpers.py
@@ -54,10 +54,16 @@ def make_query(self) -> Select[Any]:
         last_value_func = self.incremental.last_value_func
         if last_value_func is max:  # Query ordered and filtered according to last_value function
             order_by = self.cursor_column.asc()  # type: ignore
-            filter_op = operator.gt
+            if self.last_value == self.incremental.initial_value:
+                filter_op = operator.ge
+            else:
+                filter_op = operator.gt
         elif last_value_func is min:
             order_by = self.cursor_column.desc()  # type: ignore
-            filter_op = operator.lt
+            if self.last_value == self.incremental.initial_value:
+                filter_op = operator.le
+            else:
+                filter_op = operator.lt
         else:  # Custom last_value, load everything and let incremental handle filtering
             return query
         query = query.order_by(order_by)
diff --git a/posthog/temporal/tests/data_imports/test_end_to_end.py b/posthog/temporal/tests/data_imports/test_end_to_end.py
index 054f5f4a3c471..8556ff7bf1c5c 100644
--- a/posthog/temporal/tests/data_imports/test_end_to_end.py
+++ b/posthog/temporal/tests/data_imports/test_end_to_end.py
@@ -687,6 +687,96 @@ async def test_postgres_schema_evolution(team, postgres_config, postgres_connect
     assert any(x == "_dlt_load_id" for x in columns)
 
 
+@pytest.mark.django_db(transaction=True)
+@pytest.mark.asyncio
+async def test_sql_database_missing_incremental_values(team, postgres_config, postgres_connection):
+    await postgres_connection.execute(
+        "CREATE TABLE IF NOT EXISTS {schema}.test_table (id integer)".format(schema=postgres_config["schema"])
+    )
+    await postgres_connection.execute(
+        "INSERT INTO {schema}.test_table (id) VALUES (1)".format(schema=postgres_config["schema"])
+    )
+    await postgres_connection.execute(
+        "INSERT INTO {schema}.test_table (id) VALUES (null)".format(schema=postgres_config["schema"])
+    )
+    await postgres_connection.commit()
+
+    await _run(
+        team=team,
+        schema_name="test_table",
+        table_name="postgres_test_table",
+        source_type="Postgres",
+        job_inputs={
+            "host": postgres_config["host"],
+            "port": postgres_config["port"],
+            "database": postgres_config["database"],
+            "user": postgres_config["user"],
+            "password": postgres_config["password"],
+            "schema": postgres_config["schema"],
+            "ssh_tunnel_enabled": "False",
+        },
+        mock_data_response=[],
+        sync_type=ExternalDataSchema.SyncType.INCREMENTAL,
+        sync_type_config={"incremental_field": "id", "incremental_field_type": "integer"},
+    )
+
+    res = await sync_to_async(execute_hogql_query)("SELECT * FROM postgres_test_table", team)
+    columns = res.columns
+
+    assert columns is not None
+    assert len(columns) == 3
+    assert any(x == "id" for x in columns)
+    assert any(x == "_dlt_id" for x in columns)
+    assert any(x == "_dlt_load_id" for x in columns)
+
+    # Exclude rows that don't have the incremental cursor key set
+    assert len(res.results) == 1
+
+
+@pytest.mark.django_db(transaction=True)
+@pytest.mark.asyncio
+async def test_sql_database_incremental_initial_value(team, postgres_config, postgres_connection):
+    await postgres_connection.execute(
+        "CREATE TABLE IF NOT EXISTS {schema}.test_table (id integer)".format(schema=postgres_config["schema"])
+    )
+    # Setting `id` to `0` - the same as an `integer` incremental initial value
+    await postgres_connection.execute(
+        "INSERT INTO {schema}.test_table (id) VALUES (0)".format(schema=postgres_config["schema"])
+    )
+    await postgres_connection.commit()
+
+    await _run(
+        team=team,
+        schema_name="test_table",
+        table_name="postgres_test_table",
+        source_type="Postgres",
+        job_inputs={
+            "host": postgres_config["host"],
+            "port": postgres_config["port"],
+            "database": postgres_config["database"],
+            "user": postgres_config["user"],
+            "password": postgres_config["password"],
+            "schema": postgres_config["schema"],
+            "ssh_tunnel_enabled": "False",
+        },
+        mock_data_response=[],
+        sync_type=ExternalDataSchema.SyncType.INCREMENTAL,
+        sync_type_config={"incremental_field": "id", "incremental_field_type": "integer"},
+    )
+
+    res = await sync_to_async(execute_hogql_query)("SELECT * FROM postgres_test_table", team)
+    columns = res.columns
+
+    assert columns is not None
+    assert len(columns) == 3
+    assert any(x == "id" for x in columns)
+    assert any(x == "_dlt_id" for x in columns)
+    assert any(x == "_dlt_load_id" for x in columns)
+
+    # Include rows that have the same incremental value as the `initial_value`
+    assert len(res.results) == 1
+
+
 @pytest.mark.django_db(transaction=True)
 @pytest.mark.asyncio
 async def test_billing_limits(team, stripe_customer):
diff --git a/posthog/warehouse/types.py b/posthog/warehouse/types.py
index 57455ac361232..910367854f23c 100644
--- a/posthog/warehouse/types.py
+++ b/posthog/warehouse/types.py
@@ -1,8 +1,8 @@
-from enum import Enum
+from enum import StrEnum
 from typing import TypedDict
 
 
-class IncrementalFieldType(Enum):
+class IncrementalFieldType(StrEnum):
     Integer = "integer"
"integer" Numeric = "numeric" # For snowflake DateTime = "datetime"