fix(data-warehouse): Exclude rows missing the cursor path of incremental value #24782

Merged (3 commits) on Sep 4, 2024
38 changes: 20 additions & 18 deletions posthog/temporal/data_imports/pipelines/sql_database/__init__.py
@@ -173,24 +173,26 @@ def sql_database(
        # and pass them in here to get empty table materialization
        binary_columns_to_drop = get_binary_columns(engine, schema or "", table.name)

-        yield dlt.resource(
-            table_rows,
-            name=table.name,
-            primary_key=get_primary_key(table),
-            merge_key=get_primary_key(table),
-            write_disposition={
-                "disposition": "merge",
-                "strategy": "upsert",
-            }
-            if incremental
-            else "replace",
-            spec=SqlDatabaseTableConfiguration,
-            table_format="delta",
-            columns=get_column_hints(engine, schema or "", table.name),
-        ).add_map(remove_columns(binary_columns_to_drop, team_id))(
-            engine=engine,
-            table=table,
-            incremental=incremental,
+        yield (
+            dlt.resource(
+                table_rows,
+                name=table.name,
+                primary_key=get_primary_key(table),
+                merge_key=get_primary_key(table),
+                write_disposition={
+                    "disposition": "merge",
+                    "strategy": "upsert",
+                }
+                if incremental
+                else "replace",
+                spec=SqlDatabaseTableConfiguration,
+                table_format="delta",
+                columns=get_column_hints(engine, schema or "", table.name),
+            ).add_map(remove_columns(binary_columns_to_drop, team_id))(
+                engine=engine,
+                table=table,
+                incremental=incremental,
+            )
        )
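
For readers unfamiliar with dlt's `add_map`: it registers a per-row transform that runs before rows reach the destination. The `remove_columns` helper itself isn't shown in this diff; below is a minimal sketch of what such a mapper could look like, assuming rows arrive as plain dicts (the real helper lives elsewhere in PostHog's pipeline code and presumably uses `team_id` for logging or metrics).

```python
from typing import Any


def remove_columns(columns_to_drop: list[str], team_id: int):
    """Build a per-row transform for dlt's add_map.

    Hypothetical sketch only: the actual PostHog helper may do more,
    e.g. per-team logging keyed by team_id.
    """

    def _transform(row: dict[str, Any]) -> dict[str, Any]:
        # Strip binary columns that can't be materialized in the warehouse
        return {key: value for key, value in row.items() if key not in columns_to_drop}

    return _transform
```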


10 changes: 8 additions & 2 deletions posthog/temporal/data_imports/pipelines/sql_database/helpers.py
@@ -54,10 +54,16 @@ def make_query(self) -> Select[Any]:
        last_value_func = self.incremental.last_value_func
        if last_value_func is max:  # Query ordered and filtered according to last_value function
            order_by = self.cursor_column.asc()  # type: ignore
-            filter_op = operator.gt
+            if self.last_value == self.incremental.initial_value:
+                filter_op = operator.ge
+            else:
+                filter_op = operator.gt
        elif last_value_func is min:
            order_by = self.cursor_column.desc()  # type: ignore
-            filter_op = operator.lt
+            if self.last_value == self.incremental.initial_value:
+                filter_op = operator.le
+            else:
+                filter_op = operator.lt
        else:  # Custom last_value, load everything and let incremental handle filtering
            return query
        query = query.order_by(order_by)
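
This hunk carries the behavioral fix exercised by the second test below: on the very first incremental run, `last_value` still equals `initial_value`, and a strict `>` / `<` bound would permanently skip rows whose cursor sits exactly at that initial value. A self-contained sketch of the operator selection (names shortened; the real code feeds the chosen operator into a SQLAlchemy `Select`):

```python
import operator
from typing import Any, Callable


def pick_filter_op(last_value: Any, initial_value: Any, last_value_func: Callable) -> Callable[[Any, Any], Any]:
    """Choose the cursor comparison for an incremental query.

    First run (last_value == initial_value): inclusive bound, so rows
    sitting exactly at the initial value are synced. Later runs:
    exclusive bound, so the row that produced last_value isn't re-fetched.
    """
    if last_value_func is max:  # ascending cursor
        return operator.ge if last_value == initial_value else operator.gt
    if last_value_func is min:  # descending cursor
        return operator.le if last_value == initial_value else operator.lt
    raise ValueError("custom last_value_func: no SQL-side filtering")


# First run over an integer cursor starting at 0: id >= 0 keeps the id=0 row.
assert pick_filter_op(0, 0, max) is operator.ge
# A later run after seeing id=41: id > 41 avoids re-fetching that row.
assert pick_filter_op(41, 0, max) is operator.gt
```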
90 changes: 90 additions & 0 deletions posthog/temporal/tests/data_imports/test_end_to_end.py
@@ -687,6 +687,96 @@ async def test_postgres_schema_evolution(team, postgres_config, postgres_connection):
    assert any(x == "_dlt_load_id" for x in columns)


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_sql_database_missing_incremental_values(team, postgres_config, postgres_connection):
    await postgres_connection.execute(
        "CREATE TABLE IF NOT EXISTS {schema}.test_table (id integer)".format(schema=postgres_config["schema"])
    )
    await postgres_connection.execute(
        "INSERT INTO {schema}.test_table (id) VALUES (1)".format(schema=postgres_config["schema"])
    )
    await postgres_connection.execute(
        "INSERT INTO {schema}.test_table (id) VALUES (null)".format(schema=postgres_config["schema"])
    )
    await postgres_connection.commit()

    await _run(
        team=team,
        schema_name="test_table",
        table_name="postgres_test_table",
        source_type="Postgres",
        job_inputs={
            "host": postgres_config["host"],
            "port": postgres_config["port"],
            "database": postgres_config["database"],
            "user": postgres_config["user"],
            "password": postgres_config["password"],
            "schema": postgres_config["schema"],
            "ssh_tunnel_enabled": "False",
        },
        mock_data_response=[],
        sync_type=ExternalDataSchema.SyncType.INCREMENTAL,
        sync_type_config={"incremental_field": "id", "incremental_field_type": "integer"},
    )

    res = await sync_to_async(execute_hogql_query)("SELECT * FROM postgres_test_table", team)
    columns = res.columns

    assert columns is not None
    assert len(columns) == 3
    assert any(x == "id" for x in columns)
    assert any(x == "_dlt_id" for x in columns)
    assert any(x == "_dlt_load_id" for x in columns)

    # Exclude rows that don't have the incremental cursor key set
    assert len(res.results) == 1
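
Why the NULL row drops out: the incremental query filters on the cursor column (`filter_op(cursor_column, last_value)` compiles to something like `WHERE id >= 0`), and in SQL a comparison against NULL is never true, so rows missing the cursor value are excluded at the source. A quick illustration with SQLAlchemy against in-memory SQLite, standing in for the Postgres setup above:

```python
import sqlalchemy as sa

# Illustrative only: NULL never satisfies a comparison, so a cursor
# filter silently excludes rows with a missing incremental value.
metadata = sa.MetaData()
test_table = sa.Table("test_table", metadata, sa.Column("id", sa.Integer))

engine = sa.create_engine("sqlite://")  # in-memory stand-in for Postgres
metadata.create_all(engine)
with engine.begin() as conn:
    conn.execute(sa.insert(test_table), [{"id": 1}, {"id": None}])
    rows = conn.execute(sa.select(test_table).where(test_table.c.id >= 0)).fetchall()

assert rows == [(1,)]  # the NULL row is excluded by the cursor filter
```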


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_sql_database_incremental_initial_value(team, postgres_config, postgres_connection):
    await postgres_connection.execute(
        "CREATE TABLE IF NOT EXISTS {schema}.test_table (id integer)".format(schema=postgres_config["schema"])
    )
    # Setting `id` to `0` - the same as an `integer` incremental initial value
    await postgres_connection.execute(
        "INSERT INTO {schema}.test_table (id) VALUES (0)".format(schema=postgres_config["schema"])
    )
    await postgres_connection.commit()

    await _run(
        team=team,
        schema_name="test_table",
        table_name="postgres_test_table",
        source_type="Postgres",
        job_inputs={
            "host": postgres_config["host"],
            "port": postgres_config["port"],
            "database": postgres_config["database"],
            "user": postgres_config["user"],
            "password": postgres_config["password"],
            "schema": postgres_config["schema"],
            "ssh_tunnel_enabled": "False",
        },
        mock_data_response=[],
        sync_type=ExternalDataSchema.SyncType.INCREMENTAL,
        sync_type_config={"incremental_field": "id", "incremental_field_type": "integer"},
    )

    res = await sync_to_async(execute_hogql_query)("SELECT * FROM postgres_test_table", team)
    columns = res.columns

    assert columns is not None
    assert len(columns) == 3
    assert any(x == "id" for x in columns)
    assert any(x == "_dlt_id" for x in columns)
    assert any(x == "_dlt_load_id" for x in columns)

    # Include rows that have the same incremental value as the `initial_value`
    assert len(res.results) == 1
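
The boundary case this test guards: for an `integer` cursor the initial value is 0 (per the comment above), so the previous strict filter would have issued `id > 0` on the first sync and silently dropped this row forever. The inclusive first-run bound from the `helpers.py` change keeps it:

```python
rows = [0, 1, 2]
initial_value = 0

# Old behavior: strict comparison on the first run drops the boundary row.
assert [r for r in rows if r > initial_value] == [1, 2]

# New behavior: inclusive comparison on the first run keeps it; later runs
# return to a strict bound so already-synced rows aren't re-fetched.
assert [r for r in rows if r >= initial_value] == [0, 1, 2]
```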


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_billing_limits(team, stripe_customer):
4 changes: 2 additions & 2 deletions posthog/warehouse/types.py
@@ -1,8 +1,8 @@
-from enum import Enum
+from enum import StrEnum
from typing import TypedDict


-class IncrementalFieldType(Enum):
+class IncrementalFieldType(StrEnum):
    Integer = "integer"
    Numeric = "numeric"  # For snowflake
    DateTime = "datetime"

Review comment from the PR author on this change: "This was the real change here! We weren't doing proper comparisons on IncrementalFieldType without it."
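
The pitfall the author is calling out, in isolation: a plain `Enum` member never compares equal to its backing string, while a `StrEnum` member (Python 3.11+) is itself a `str`:

```python
from enum import Enum, StrEnum  # StrEnum requires Python 3.11+


class OldFieldType(Enum):
    Integer = "integer"


class NewFieldType(StrEnum):
    Integer = "integer"


# A plain Enum member is not equal to its value, so comparisons against
# strings coming out of config/JSON silently fail:
assert OldFieldType.Integer != "integer"

# StrEnum members are real strings, so the same comparison works as intended:
assert NewFieldType.Integer == "integer"
assert isinstance(NewFieldType.Integer, str)
```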