From b41a4947314d79acb9d4cf30186675b6d806501d Mon Sep 17 00:00:00 2001
From: Tom Owers
Date: Wed, 4 Sep 2024 14:33:23 +0100
Subject: [PATCH 1/2] Exclude rows missing the cursor path of incremental value

---
 .../pipelines/sql_database/__init__.py    | 56 ++++++++----
 .../pipelines/sql_database/helpers.py     | 10 ++-
 .../tests/data_imports/test_end_to_end.py | 90 +++++++++++++++++++
 posthog/warehouse/types.py                |  4 +-
 4 files changed, 138 insertions(+), 22 deletions(-)

diff --git a/posthog/temporal/data_imports/pipelines/sql_database/__init__.py b/posthog/temporal/data_imports/pipelines/sql_database/__init__.py
index 0fc7f7394b6ad..1beb348147ee4 100644
--- a/posthog/temporal/data_imports/pipelines/sql_database/__init__.py
+++ b/posthog/temporal/data_imports/pipelines/sql_database/__init__.py
@@ -133,6 +133,19 @@ def internal_remove(doc: dict) -> dict:
     return internal_remove
 
 
+def replace_incremental_fields(incremental: Optional[dlt.sources.incremental]):
+    def internal_replace(doc: dict) -> dict:
+        if incremental is None:
+            return doc
+
+        if doc.get(incremental.cursor_path, None) is None:
+            doc[incremental.cursor_path] = incremental.initial_value
+
+        return doc
+
+    return internal_replace
+
+
 @dlt.source(max_table_nesting=0)
 def sql_database(
     credentials: Union[ConnectionStringCredentials, Engine, str] = dlt.secrets.value,
@@ -173,24 +186,31 @@ def sql_database(
         # and pass them in here to get empty table materialization
         binary_columns_to_drop = get_binary_columns(engine, schema or "", table.name)
 
-        yield dlt.resource(
-            table_rows,
-            name=table.name,
-            primary_key=get_primary_key(table),
-            merge_key=get_primary_key(table),
-            write_disposition={
-                "disposition": "merge",
-                "strategy": "upsert",
-            }
-            if incremental
-            else "replace",
-            spec=SqlDatabaseTableConfiguration,
-            table_format="delta",
-            columns=get_column_hints(engine, schema or "", table.name),
-        ).add_map(remove_columns(binary_columns_to_drop, team_id))(
-            engine=engine,
-            table=table,
-            incremental=incremental,
+        yield (
+            dlt.resource(
+                table_rows,
+                name=table.name,
+                primary_key=get_primary_key(table),
+                merge_key=get_primary_key(table),
+                write_disposition={
+                    "disposition": "merge",
+                    "strategy": "upsert",
+                }
+                if incremental
+                else "replace",
+                spec=SqlDatabaseTableConfiguration,
+                table_format="delta",
+                columns=get_column_hints(engine, schema or "", table.name),
+            )
+            .add_map(
+                replace_incremental_fields(incremental),
+                insert_at=1,  # Adds this map func before incremental processing
+            )
+            .add_map(remove_columns(binary_columns_to_drop, team_id))(
+                engine=engine,
+                table=table,
+                incremental=incremental,
+            )
         )
diff --git a/posthog/temporal/data_imports/pipelines/sql_database/helpers.py b/posthog/temporal/data_imports/pipelines/sql_database/helpers.py
index 894407beda8a0..d877effb3e374 100644
--- a/posthog/temporal/data_imports/pipelines/sql_database/helpers.py
+++ b/posthog/temporal/data_imports/pipelines/sql_database/helpers.py
@@ -54,10 +54,16 @@ def make_query(self) -> Select[Any]:
         last_value_func = self.incremental.last_value_func
         if last_value_func is max:  # Query ordered and filtered according to last_value function
             order_by = self.cursor_column.asc()  # type: ignore
-            filter_op = operator.gt
+            if self.last_value == self.incremental.initial_value:
+                filter_op = operator.ge
+            else:
+                filter_op = operator.gt
         elif last_value_func is min:
             order_by = self.cursor_column.desc()  # type: ignore
-            filter_op = operator.lt
+            if self.last_value == self.incremental.initial_value:
+                filter_op = operator.le
+            else:
+                filter_op = operator.lt
         else:  # Custom last_value, load everything and let incremental handle filtering
             return query
         query = query.order_by(order_by)
diff --git a/posthog/temporal/tests/data_imports/test_end_to_end.py b/posthog/temporal/tests/data_imports/test_end_to_end.py
index 6ce2ef6e83e02..7e422a148ac26 100644
--- a/posthog/temporal/tests/data_imports/test_end_to_end.py
+++ b/posthog/temporal/tests/data_imports/test_end_to_end.py
@@ -631,3 +631,93 @@ async def test_postgres_schema_evolution(team, postgres_config, postgres_connect
     assert any(x == "new_col" for x in columns)
     assert any(x == "_dlt_id" for x in columns)
     assert any(x == "_dlt_load_id" for x in columns)
+
+
+@pytest.mark.django_db(transaction=True)
+@pytest.mark.asyncio
+async def test_sql_database_missing_incremental_values(team, postgres_config, postgres_connection):
+    await postgres_connection.execute(
+        "CREATE TABLE IF NOT EXISTS {schema}.test_table (id integer)".format(schema=postgres_config["schema"])
+    )
+    await postgres_connection.execute(
+        "INSERT INTO {schema}.test_table (id) VALUES (1)".format(schema=postgres_config["schema"])
+    )
+    await postgres_connection.execute(
+        "INSERT INTO {schema}.test_table (id) VALUES (null)".format(schema=postgres_config["schema"])
+    )
+    await postgres_connection.commit()
+
+    await _run(
+        team=team,
+        schema_name="test_table",
+        table_name="postgres_test_table",
+        source_type="Postgres",
+        job_inputs={
+            "host": postgres_config["host"],
+            "port": postgres_config["port"],
+            "database": postgres_config["database"],
+            "user": postgres_config["user"],
+            "password": postgres_config["password"],
+            "schema": postgres_config["schema"],
+            "ssh_tunnel_enabled": "False",
+        },
+        mock_data_response=[],
+        sync_type=ExternalDataSchema.SyncType.INCREMENTAL,
+        sync_type_config={"incremental_field": "id", "incremental_field_type": "integer"},
+    )
+
+    res = await sync_to_async(execute_hogql_query)("SELECT * FROM postgres_test_table", team)
+    columns = res.columns
+
+    assert columns is not None
+    assert len(columns) == 3
+    assert any(x == "id" for x in columns)
+    assert any(x == "_dlt_id" for x in columns)
+    assert any(x == "_dlt_load_id" for x in columns)
+
+    # Exclude rows that don't have the incremental cursor key set
+    assert len(res.results) == 1
+
+
+@pytest.mark.django_db(transaction=True)
+@pytest.mark.asyncio
+async def test_sql_database_incremental_initial_value(team, postgres_config, postgres_connection):
+    await postgres_connection.execute(
+        "CREATE TABLE IF NOT EXISTS {schema}.test_table (id integer)".format(schema=postgres_config["schema"])
+    )
+    # Setting `id` to `0` - the same as an `integer` incremental initial value
+    await postgres_connection.execute(
+        "INSERT INTO {schema}.test_table (id) VALUES (0)".format(schema=postgres_config["schema"])
+    )
+    await postgres_connection.commit()
+
+    await _run(
+        team=team,
+        schema_name="test_table",
+        table_name="postgres_test_table",
+        source_type="Postgres",
+        job_inputs={
+            "host": postgres_config["host"],
+            "port": postgres_config["port"],
+            "database": postgres_config["database"],
+            "user": postgres_config["user"],
+            "password": postgres_config["password"],
+            "schema": postgres_config["schema"],
+            "ssh_tunnel_enabled": "False",
+        },
+        mock_data_response=[],
+        sync_type=ExternalDataSchema.SyncType.INCREMENTAL,
+        sync_type_config={"incremental_field": "id", "incremental_field_type": "integer"},
+    )
+
+    res = await sync_to_async(execute_hogql_query)("SELECT * FROM postgres_test_table", team)
+    columns = res.columns
+
+    assert columns is not None
+    assert len(columns) == 3
+    assert any(x == "id" for x in columns)
+    assert any(x == "_dlt_id" for x in columns)
+    assert any(x == "_dlt_load_id" for x in columns)
+
+    # Include rows that have the same incremental value as the `initial_value`
+    assert len(res.results) == 1
diff --git a/posthog/warehouse/types.py b/posthog/warehouse/types.py
index 57455ac361232..910367854f23c 100644
--- a/posthog/warehouse/types.py
+++ b/posthog/warehouse/types.py
@@ -1,8 +1,8 @@
-from enum import Enum
+from enum import StrEnum
 from typing import TypedDict
 
 
-class IncrementalFieldType(Enum):
+class IncrementalFieldType(StrEnum):
     Integer = "integer"
     Numeric = "numeric"  # For snowflake
     DateTime = "datetime"

From b45bd900a6f623483a7568403a41b123d696d619 Mon Sep 17 00:00:00 2001
From: Tom Owers
Date: Wed, 4 Sep 2024 14:39:07 +0100
Subject: [PATCH 2/2] Remove unneeded func

---
 .../pipelines/sql_database/__init__.py | 20 +------------------
 1 file changed, 1 insertion(+), 19 deletions(-)

diff --git a/posthog/temporal/data_imports/pipelines/sql_database/__init__.py b/posthog/temporal/data_imports/pipelines/sql_database/__init__.py
index 1beb348147ee4..900c70d3b56c6 100644
--- a/posthog/temporal/data_imports/pipelines/sql_database/__init__.py
+++ b/posthog/temporal/data_imports/pipelines/sql_database/__init__.py
@@ -133,19 +133,6 @@ def internal_remove(doc: dict) -> dict:
     return internal_remove
 
 
-def replace_incremental_fields(incremental: Optional[dlt.sources.incremental]):
-    def internal_replace(doc: dict) -> dict:
-        if incremental is None:
-            return doc
-
-        if doc.get(incremental.cursor_path, None) is None:
-            doc[incremental.cursor_path] = incremental.initial_value
-
-        return doc
-
-    return internal_replace
-
-
 @dlt.source(max_table_nesting=0)
 def sql_database(
     credentials: Union[ConnectionStringCredentials, Engine, str] = dlt.secrets.value,
@@ -201,12 +188,7 @@ def sql_database(
                 spec=SqlDatabaseTableConfiguration,
                 table_format="delta",
                 columns=get_column_hints(engine, schema or "", table.name),
-            )
-            .add_map(
-                replace_incremental_fields(incremental),
-                insert_at=1,  # Adds this map func before incremental processing
-            )
-            .add_map(remove_columns(binary_columns_to_drop, team_id))(
+            ).add_map(remove_columns(binary_columns_to_drop, team_id))(
                 engine=engine,
                 table=table,
                 incremental=incremental,