Skip to content

Commit

Permalink
fix(data-warehouse): Exclude rows missing the cursor path of incremental value (#24782)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gilbert09 authored Sep 4, 2024
1 parent 04ef403 commit 8b3aeb2
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 22 deletions.
38 changes: 20 additions & 18 deletions posthog/temporal/data_imports/pipelines/sql_database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,24 +173,26 @@ def sql_database(
# and pass them in here to get empty table materialization
binary_columns_to_drop = get_binary_columns(engine, schema or "", table.name)

yield dlt.resource(
table_rows,
name=table.name,
primary_key=get_primary_key(table),
merge_key=get_primary_key(table),
write_disposition={
"disposition": "merge",
"strategy": "upsert",
}
if incremental
else "replace",
spec=SqlDatabaseTableConfiguration,
table_format="delta",
columns=get_column_hints(engine, schema or "", table.name),
).add_map(remove_columns(binary_columns_to_drop, team_id))(
engine=engine,
table=table,
incremental=incremental,
yield (
dlt.resource(
table_rows,
name=table.name,
primary_key=get_primary_key(table),
merge_key=get_primary_key(table),
write_disposition={
"disposition": "merge",
"strategy": "upsert",
}
if incremental
else "replace",
spec=SqlDatabaseTableConfiguration,
table_format="delta",
columns=get_column_hints(engine, schema or "", table.name),
).add_map(remove_columns(binary_columns_to_drop, team_id))(
engine=engine,
table=table,
incremental=incremental,
)
)


Expand Down
10 changes: 8 additions & 2 deletions posthog/temporal/data_imports/pipelines/sql_database/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,16 @@ def make_query(self) -> Select[Any]:
last_value_func = self.incremental.last_value_func
if last_value_func is max: # Query ordered and filtered according to last_value function
order_by = self.cursor_column.asc() # type: ignore
filter_op = operator.gt
if self.last_value == self.incremental.initial_value:
filter_op = operator.ge
else:
filter_op = operator.gt
elif last_value_func is min:
order_by = self.cursor_column.desc() # type: ignore
filter_op = operator.lt
if self.last_value == self.incremental.initial_value:
filter_op = operator.le
else:
filter_op = operator.lt
else: # Custom last_value, load everything and let incremental handle filtering
return query
query = query.order_by(order_by)
Expand Down
90 changes: 90 additions & 0 deletions posthog/temporal/tests/data_imports/test_end_to_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,96 @@ async def test_postgres_schema_evolution(team, postgres_config, postgres_connect
assert any(x == "_dlt_load_id" for x in columns)


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_sql_database_missing_incremental_values(team, postgres_config, postgres_connection):
    """Incremental syncs should skip source rows whose cursor column is NULL.

    Seeds a Postgres table with one row that has an `id` (the incremental
    cursor) and one row where `id` is NULL, runs an incremental sync, and
    verifies only the row with a cursor value is materialized.
    """
    schema = postgres_config["schema"]

    # One valid row and one row with a NULL cursor value.
    await postgres_connection.execute(f"CREATE TABLE IF NOT EXISTS {schema}.test_table (id integer)")
    await postgres_connection.execute(f"INSERT INTO {schema}.test_table (id) VALUES (1)")
    await postgres_connection.execute(f"INSERT INTO {schema}.test_table (id) VALUES (null)")
    await postgres_connection.commit()

    # Connection settings are passed through verbatim from the fixture.
    job_inputs = {key: postgres_config[key] for key in ("host", "port", "database", "user", "password", "schema")}
    job_inputs["ssh_tunnel_enabled"] = "False"

    await _run(
        team=team,
        schema_name="test_table",
        table_name="postgres_test_table",
        source_type="Postgres",
        job_inputs=job_inputs,
        mock_data_response=[],
        sync_type=ExternalDataSchema.SyncType.INCREMENTAL,
        sync_type_config={"incremental_field": "id", "incremental_field_type": "integer"},
    )

    res = await sync_to_async(execute_hogql_query)("SELECT * FROM postgres_test_table", team)
    columns = res.columns

    # The synced table carries the source column plus dlt bookkeeping columns.
    assert columns is not None
    assert len(columns) == 3
    assert set(columns) >= {"id", "_dlt_id", "_dlt_load_id"}

    # Exclude rows that don't have the incremental cursor key set
    assert len(res.results) == 1


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_sql_database_incremental_initual_value(team, postgres_config, postgres_connection):
    """Rows whose cursor equals the incremental initial value must be included.

    For an `integer` incremental field the initial value is `0`; a row with
    `id = 0` would be dropped by a strict `>` filter, so this verifies the
    first sync uses an inclusive comparison.

    NOTE(review): "initual" in the test name is a typo for "initial"; kept
    as-is to avoid renaming the collected test.
    """
    schema = postgres_config["schema"]

    await postgres_connection.execute(f"CREATE TABLE IF NOT EXISTS {schema}.test_table (id integer)")
    # Setting `id` to `0` - the same as an `integer` incremental initial value
    await postgres_connection.execute(f"INSERT INTO {schema}.test_table (id) VALUES (0)")
    await postgres_connection.commit()

    # Connection settings are passed through verbatim from the fixture.
    job_inputs = {key: postgres_config[key] for key in ("host", "port", "database", "user", "password", "schema")}
    job_inputs["ssh_tunnel_enabled"] = "False"

    await _run(
        team=team,
        schema_name="test_table",
        table_name="postgres_test_table",
        source_type="Postgres",
        job_inputs=job_inputs,
        mock_data_response=[],
        sync_type=ExternalDataSchema.SyncType.INCREMENTAL,
        sync_type_config={"incremental_field": "id", "incremental_field_type": "integer"},
    )

    res = await sync_to_async(execute_hogql_query)("SELECT * FROM postgres_test_table", team)
    columns = res.columns

    # The synced table carries the source column plus dlt bookkeeping columns.
    assert columns is not None
    assert len(columns) == 3
    assert set(columns) >= {"id", "_dlt_id", "_dlt_load_id"}

    # Include rows that have the same incremental value as the `initial_value`
    assert len(res.results) == 1


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_billing_limits(team, stripe_customer):
Expand Down
4 changes: 2 additions & 2 deletions posthog/warehouse/types.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from enum import Enum
from enum import StrEnum
from typing import TypedDict


class IncrementalFieldType(Enum):
class IncrementalFieldType(StrEnum):
Integer = "integer"
Numeric = "numeric" # For snowflake
DateTime = "datetime"
Expand Down

0 comments on commit 8b3aeb2

Please sign in to comment.