Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(data-warehouse): Ensure last synced at is always updated #24811

Merged
merged 4 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
FROM events LEFT JOIN (
SELECT person_static_cohort.person_id AS cohort_person_id, 1 AS matched, person_static_cohort.cohort_id AS cohort_id
FROM person_static_cohort
WHERE and(equals(person_static_cohort.team_id, 420), in(person_static_cohort.cohort_id, [1]))) AS __in_cohort ON equals(__in_cohort.cohort_person_id, events.person_id)
WHERE and(equals(person_static_cohort.team_id, 420), in(person_static_cohort.cohort_id, [2]))) AS __in_cohort ON equals(__in_cohort.cohort_person_id, events.person_id)
WHERE and(equals(events.team_id, 420), 1, ifNull(equals(__in_cohort.matched, 1), 0))
LIMIT 100
SETTINGS readonly=2, max_execution_time=60, allow_experimental_object_type=1, format_csv_allow_double_quotes=0, max_ast_elements=4000000, max_expanded_ast_elements=4000000, max_bytes_before_external_group_by=0
Expand All @@ -42,7 +42,7 @@
FROM events LEFT JOIN (
SELECT person_id AS cohort_person_id, 1 AS matched, cohort_id
FROM static_cohort_people
WHERE in(cohort_id, [1])) AS __in_cohort ON equals(__in_cohort.cohort_person_id, person_id)
WHERE in(cohort_id, [2])) AS __in_cohort ON equals(__in_cohort.cohort_person_id, person_id)
WHERE and(1, equals(__in_cohort.matched, 1))
LIMIT 100
'''
Expand All @@ -55,7 +55,7 @@
FROM events LEFT JOIN (
SELECT person_static_cohort.person_id AS cohort_person_id, 1 AS matched, person_static_cohort.cohort_id AS cohort_id
FROM person_static_cohort
WHERE and(equals(person_static_cohort.team_id, 420), in(person_static_cohort.cohort_id, [2]))) AS __in_cohort ON equals(__in_cohort.cohort_person_id, events.person_id)
WHERE and(equals(person_static_cohort.team_id, 420), in(person_static_cohort.cohort_id, [3]))) AS __in_cohort ON equals(__in_cohort.cohort_person_id, events.person_id)
WHERE and(equals(events.team_id, 420), 1, ifNull(equals(__in_cohort.matched, 1), 0))
LIMIT 100
SETTINGS readonly=2, max_execution_time=60, allow_experimental_object_type=1, format_csv_allow_double_quotes=0, max_ast_elements=4000000, max_expanded_ast_elements=4000000, max_bytes_before_external_group_by=0
Expand All @@ -66,7 +66,7 @@
FROM events LEFT JOIN (
SELECT person_id AS cohort_person_id, 1 AS matched, cohort_id
FROM static_cohort_people
WHERE in(cohort_id, [2])) AS __in_cohort ON equals(__in_cohort.cohort_person_id, person_id)
WHERE in(cohort_id, [3])) AS __in_cohort ON equals(__in_cohort.cohort_person_id, person_id)
WHERE and(1, equals(__in_cohort.matched, 1))
LIMIT 100
'''
Expand Down
7 changes: 6 additions & 1 deletion posthog/temporal/data_imports/pipelines/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from deltalake.exceptions import DeltaError
from collections import Counter

from posthog.warehouse.data_load.validate_schema import validate_schema_and_update_table
from posthog.warehouse.data_load.validate_schema import update_last_synced_at, validate_schema_and_update_table
from posthog.warehouse.models.external_data_job import ExternalDataJob, get_external_data_job
from posthog.warehouse.models.external_data_schema import ExternalDataSchema, aget_schema_by_id
from posthog.warehouse.models.external_data_source import ExternalDataSource
Expand Down Expand Up @@ -252,6 +252,11 @@ def _run(self) -> dict[str, int]:
else:
self.logger.info("No table_counts, skipping validate_schema_and_update_table")

# Always update last_synced_at on the schema, including when validate_schema_and_update_table was skipped
async_to_sync(update_last_synced_at)(
job_id=self.inputs.run_id, schema_id=str(self.inputs.schema_id), team_id=self.inputs.team_id
)

# Cleanup: delete local state from the file system
pipeline.drop()

Expand Down
2 changes: 2 additions & 0 deletions posthog/temporal/data_imports/pipelines/test/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def mock_create_pipeline(local_self: Any):
"posthog.temporal.data_imports.pipelines.pipeline.validate_schema_and_update_table"
) as mock_validate_schema_and_update_table,
patch("posthog.temporal.data_imports.pipelines.pipeline.get_delta_tables"),
patch("posthog.temporal.data_imports.pipelines.pipeline.update_last_synced_at"),
):
pipeline = await self._create_pipeline("Customer", False)
res = await pipeline.run()
Expand All @@ -99,6 +100,7 @@ def mock_create_pipeline(local_self: Any):
"posthog.temporal.data_imports.pipelines.pipeline.validate_schema_and_update_table"
) as mock_validate_schema_and_update_table,
patch("posthog.temporal.data_imports.pipelines.pipeline.get_delta_tables"),
patch("posthog.temporal.data_imports.pipelines.pipeline.update_last_synced_at"),
):
pipeline = await self._create_pipeline("Customer", True)
res = await pipeline.run()
Expand Down
5 changes: 4 additions & 1 deletion posthog/temporal/tests/data_imports/test_end_to_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,14 @@ async def _run(

await _execute_run(workflow_id, inputs, mock_data_response)

run = await get_latest_run_if_exists(team_id=team.pk, pipeline_id=source.pk)
run: ExternalDataJob = await get_latest_run_if_exists(team_id=team.pk, pipeline_id=source.pk)

assert run is not None
assert run.status == ExternalDataJob.Status.COMPLETED

await sync_to_async(schema.refresh_from_db)()
assert schema.last_synced_at == run.created_at

res = await sync_to_async(execute_hogql_query)(f"SELECT * FROM {table_name}", team)
assert len(res.results) == 1

Expand Down
9 changes: 8 additions & 1 deletion posthog/warehouse/data_load/validate_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ def dlt_to_hogql_type(dlt_type: TDataType | None) -> str:
return hogql_type.__name__


async def update_last_synced_at(job_id: str, schema_id: str, team_id: int) -> None:
    """Stamp a schema's ``last_synced_at`` with the creation time of a job.

    Looks up the job with ``get_external_data_job`` and the schema with
    ``aget_schema_by_id``, copies ``job.created_at`` onto the schema's
    ``last_synced_at`` field, and persists the schema through
    ``asave_external_data_schema``.

    Args:
        job_id: Identifier passed to ``get_external_data_job``.
        schema_id: Identifier of the schema to update.
        team_id: Team used together with ``schema_id`` to look up the schema.
    """
    external_job: ExternalDataJob = await get_external_data_job(job_id=job_id)
    schema_model = await aget_schema_by_id(schema_id=schema_id, team_id=team_id)

    # The recorded timestamp is the job's creation time, not the current time.
    schema_model.last_synced_at = external_job.created_at
    await asave_external_data_schema(schema_model)


async def validate_schema_and_update_table(
run_id: str,
team_id: int,
Expand Down Expand Up @@ -185,7 +193,6 @@ async def validate_schema_and_update_table(

if schema_model:
schema_model.table = table_created
schema_model.last_synced_at = job.created_at
await asave_external_data_schema(schema_model)

except ServerException as err:
Expand Down
Loading