diff --git a/posthog/hogql/transforms/test/__snapshots__/test_in_cohort.ambr b/posthog/hogql/transforms/test/__snapshots__/test_in_cohort.ambr index f40b7c49597da..369f18ab9d118 100644 --- a/posthog/hogql/transforms/test/__snapshots__/test_in_cohort.ambr +++ b/posthog/hogql/transforms/test/__snapshots__/test_in_cohort.ambr @@ -31,7 +31,7 @@ FROM events LEFT JOIN ( SELECT person_static_cohort.person_id AS cohort_person_id, 1 AS matched, person_static_cohort.cohort_id AS cohort_id FROM person_static_cohort - WHERE and(equals(person_static_cohort.team_id, 420), in(person_static_cohort.cohort_id, [1]))) AS __in_cohort ON equals(__in_cohort.cohort_person_id, events.person_id) + WHERE and(equals(person_static_cohort.team_id, 420), in(person_static_cohort.cohort_id, [2]))) AS __in_cohort ON equals(__in_cohort.cohort_person_id, events.person_id) WHERE and(equals(events.team_id, 420), 1, ifNull(equals(__in_cohort.matched, 1), 0)) LIMIT 100 SETTINGS readonly=2, max_execution_time=60, allow_experimental_object_type=1, format_csv_allow_double_quotes=0, max_ast_elements=4000000, max_expanded_ast_elements=4000000, max_bytes_before_external_group_by=0 @@ -42,7 +42,7 @@ FROM events LEFT JOIN ( SELECT person_id AS cohort_person_id, 1 AS matched, cohort_id FROM static_cohort_people - WHERE in(cohort_id, [1])) AS __in_cohort ON equals(__in_cohort.cohort_person_id, person_id) + WHERE in(cohort_id, [2])) AS __in_cohort ON equals(__in_cohort.cohort_person_id, person_id) WHERE and(1, equals(__in_cohort.matched, 1)) LIMIT 100 ''' @@ -55,7 +55,7 @@ FROM events LEFT JOIN ( SELECT person_static_cohort.person_id AS cohort_person_id, 1 AS matched, person_static_cohort.cohort_id AS cohort_id FROM person_static_cohort - WHERE and(equals(person_static_cohort.team_id, 420), in(person_static_cohort.cohort_id, [2]))) AS __in_cohort ON equals(__in_cohort.cohort_person_id, events.person_id) + WHERE and(equals(person_static_cohort.team_id, 420), in(person_static_cohort.cohort_id, [3]))) AS __in_cohort ON equals(__in_cohort.cohort_person_id, events.person_id) WHERE and(equals(events.team_id, 420), 1, ifNull(equals(__in_cohort.matched, 1), 0)) LIMIT 100 SETTINGS readonly=2, max_execution_time=60, allow_experimental_object_type=1, format_csv_allow_double_quotes=0, max_ast_elements=4000000, max_expanded_ast_elements=4000000, max_bytes_before_external_group_by=0 @@ -66,7 +66,7 @@ FROM events LEFT JOIN ( SELECT person_id AS cohort_person_id, 1 AS matched, cohort_id FROM static_cohort_people - WHERE in(cohort_id, [2])) AS __in_cohort ON equals(__in_cohort.cohort_person_id, person_id) + WHERE in(cohort_id, [3])) AS __in_cohort ON equals(__in_cohort.cohort_person_id, person_id) WHERE and(1, equals(__in_cohort.matched, 1)) LIMIT 100 ''' diff --git a/posthog/temporal/data_imports/pipelines/pipeline.py b/posthog/temporal/data_imports/pipelines/pipeline.py index 03e57174f0527..a183074972d47 100644 --- a/posthog/temporal/data_imports/pipelines/pipeline.py +++ b/posthog/temporal/data_imports/pipelines/pipeline.py @@ -17,7 +17,7 @@ from deltalake.exceptions import DeltaError from collections import Counter -from posthog.warehouse.data_load.validate_schema import validate_schema_and_update_table +from posthog.warehouse.data_load.validate_schema import update_last_synced_at, validate_schema_and_update_table from posthog.warehouse.models.external_data_job import ExternalDataJob, get_external_data_job from posthog.warehouse.models.external_data_schema import ExternalDataSchema, aget_schema_by_id from posthog.warehouse.models.external_data_source import ExternalDataSource @@ -252,6 +252,11 @@ def _run(self) -> dict[str, int]: else: self.logger.info("No table_counts, skipping validate_schema_and_update_table") + # Update last_synced_at on schema + async_to_sync(update_last_synced_at)( + job_id=self.inputs.run_id, schema_id=str(self.inputs.schema_id), team_id=self.inputs.team_id + ) + # Cleanup: delete local state from the file system pipeline.drop() diff --git a/posthog/temporal/data_imports/pipelines/test/test_pipeline.py b/posthog/temporal/data_imports/pipelines/test/test_pipeline.py index 542309727d4e6..965b77ca5f9ae 100644 --- a/posthog/temporal/data_imports/pipelines/test/test_pipeline.py +++ b/posthog/temporal/data_imports/pipelines/test/test_pipeline.py @@ -78,6 +78,7 @@ def mock_create_pipeline(local_self: Any): "posthog.temporal.data_imports.pipelines.pipeline.validate_schema_and_update_table" ) as mock_validate_schema_and_update_table, patch("posthog.temporal.data_imports.pipelines.pipeline.get_delta_tables"), + patch("posthog.temporal.data_imports.pipelines.pipeline.update_last_synced_at"), ): pipeline = await self._create_pipeline("Customer", False) res = await pipeline.run() @@ -99,6 +100,7 @@ def mock_create_pipeline(local_self: Any): "posthog.temporal.data_imports.pipelines.pipeline.validate_schema_and_update_table" ) as mock_validate_schema_and_update_table, patch("posthog.temporal.data_imports.pipelines.pipeline.get_delta_tables"), + patch("posthog.temporal.data_imports.pipelines.pipeline.update_last_synced_at"), ): pipeline = await self._create_pipeline("Customer", True) res = await pipeline.run() diff --git a/posthog/temporal/tests/data_imports/test_end_to_end.py b/posthog/temporal/tests/data_imports/test_end_to_end.py index 8556ff7bf1c5c..aa6108ff6e0ae 100644 --- a/posthog/temporal/tests/data_imports/test_end_to_end.py +++ b/posthog/temporal/tests/data_imports/test_end_to_end.py @@ -131,11 +131,14 @@ async def _run( await _execute_run(workflow_id, inputs, mock_data_response) - run = await get_latest_run_if_exists(team_id=team.pk, pipeline_id=source.pk) + run: ExternalDataJob = await get_latest_run_if_exists(team_id=team.pk, pipeline_id=source.pk) assert run is not None assert run.status == ExternalDataJob.Status.COMPLETED + await sync_to_async(schema.refresh_from_db)() + assert schema.last_synced_at == run.created_at + res = await sync_to_async(execute_hogql_query)(f"SELECT * FROM {table_name}", team) assert len(res.results) == 1 diff --git a/posthog/warehouse/data_load/validate_schema.py b/posthog/warehouse/data_load/validate_schema.py index 1ac51e511b460..9917f2e3f7e45 100644 --- a/posthog/warehouse/data_load/validate_schema.py +++ b/posthog/warehouse/data_load/validate_schema.py @@ -66,6 +66,14 @@ def dlt_to_hogql_type(dlt_type: TDataType | None) -> str: return hogql_type.__name__ +async def update_last_synced_at(job_id: str, schema_id: str, team_id: int) -> None: + job: ExternalDataJob = await get_external_data_job(job_id=job_id) + schema = await aget_schema_by_id(schema_id=schema_id, team_id=team_id) + schema.last_synced_at = job.created_at + + await asave_external_data_schema(schema) + + async def validate_schema_and_update_table( run_id: str, team_id: int, @@ -185,7 +193,6 @@ async def validate_schema_and_update_table( if schema_model: schema_model.table = table_created - schema_model.last_synced_at = job.created_at await asave_external_data_schema(schema_model) except ServerException as err: