diff --git a/posthog/temporal/data_imports/pipelines/pipeline.py b/posthog/temporal/data_imports/pipelines/pipeline.py index 0dbfd8f5c3fd7..c7485738890c2 100644 --- a/posthog/temporal/data_imports/pipelines/pipeline.py +++ b/posthog/temporal/data_imports/pipelines/pipeline.py @@ -17,7 +17,7 @@ from deltalake.exceptions import DeltaError from collections import Counter -from posthog.warehouse.data_load.validate_schema import validate_schema_and_update_table +from posthog.warehouse.data_load.validate_schema import update_last_synced_at, validate_schema_and_update_table from posthog.warehouse.models.external_data_job import ExternalDataJob, get_external_data_job from posthog.warehouse.models.external_data_schema import ExternalDataSchema, aget_schema_by_id from posthog.warehouse.models.external_data_source import ExternalDataSource @@ -254,6 +254,11 @@ def _run(self) -> dict[str, int]: else: self.logger.info("No table_counts, skipping validate_schema_and_update_table") + # Update last_synced_at on schema + async_to_sync(update_last_synced_at)( + job_id=self.inputs.run_id, schema_id=str(self.inputs.schema_id), team_id=self.inputs.team_id + ) + # Delete local state from the file system pipeline.drop() diff --git a/posthog/temporal/tests/data_imports/test_end_to_end.py b/posthog/temporal/tests/data_imports/test_end_to_end.py index 054f5f4a3c471..b4eab2ed156ea 100644 --- a/posthog/temporal/tests/data_imports/test_end_to_end.py +++ b/posthog/temporal/tests/data_imports/test_end_to_end.py @@ -131,11 +131,14 @@ async def _run( await _execute_run(workflow_id, inputs, mock_data_response) - run = await get_latest_run_if_exists(team_id=team.pk, pipeline_id=source.pk) + run: ExternalDataJob = await get_latest_run_if_exists(team_id=team.pk, pipeline_id=source.pk) assert run is not None assert run.status == ExternalDataJob.Status.COMPLETED + await sync_to_async(schema.refresh_from_db)() + assert schema.last_synced_at == run.created_at + res = await sync_to_async(execute_hogql_query)(f"SELECT * FROM {table_name}", team) assert len(res.results) == 1 diff --git a/posthog/warehouse/data_load/validate_schema.py b/posthog/warehouse/data_load/validate_schema.py index 1ac51e511b460..9917f2e3f7e45 100644 --- a/posthog/warehouse/data_load/validate_schema.py +++ b/posthog/warehouse/data_load/validate_schema.py @@ -66,6 +66,14 @@ def dlt_to_hogql_type(dlt_type: TDataType | None) -> str: return hogql_type.__name__ +async def update_last_synced_at(job_id: str, schema_id: str, team_id: int) -> None: + job: ExternalDataJob = await get_external_data_job(job_id=job_id) + schema = await aget_schema_by_id(schema_id=schema_id, team_id=team_id) + schema.last_synced_at = job.created_at + + await asave_external_data_schema(schema) + + async def validate_schema_and_update_table( run_id: str, team_id: int, @@ -185,7 +193,6 @@ async def validate_schema_and_update_table( if schema_model: schema_model.table = table_created - schema_model.last_synced_at = job.created_at await asave_external_data_schema(schema_model) except ServerException as err: