fix(data-warehouse): Ensure last synced at is always updated (#24811)
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
Gilbert09 and github-actions[bot] authored Sep 13, 2024
1 parent 056b561 commit e26055b
Showing 4 changed files with 20 additions and 3 deletions.
7 changes: 6 additions & 1 deletion posthog/temporal/data_imports/pipelines/pipeline.py
@@ -17,7 +17,7 @@
 from deltalake.exceptions import DeltaError
 from collections import Counter
 
-from posthog.warehouse.data_load.validate_schema import validate_schema_and_update_table
+from posthog.warehouse.data_load.validate_schema import update_last_synced_at, validate_schema_and_update_table
 from posthog.warehouse.models.external_data_job import ExternalDataJob, get_external_data_job
 from posthog.warehouse.models.external_data_schema import ExternalDataSchema, aget_schema_by_id
 from posthog.warehouse.models.external_data_source import ExternalDataSource
@@ -252,6 +252,11 @@ def _run(self) -> dict[str, int]:
         else:
             self.logger.info("No table_counts, skipping validate_schema_and_update_table")
 
+        # Update last_synced_at on schema
+        async_to_sync(update_last_synced_at)(
+            job_id=self.inputs.run_id, schema_id=str(self.inputs.schema_id), team_id=self.inputs.team_id
+        )
+
         # Cleanup: delete local state from the file system
         pipeline.drop()

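A note on the wrapper: `_run` here is a synchronous method (plain `def`, per the hunk header), while the new helper is an `async def`, so the call is bridged with asgiref's `async_to_sync`. A minimal sketch of the pattern, with placeholder argument values:

from asgiref.sync import async_to_sync

async def update_last_synced_at(job_id: str, schema_id: str, team_id: int) -> None:
    ...  # real body added in validate_schema.py below

# async_to_sync wraps a coroutine function in a plain callable that drives it
# to completion on an event loop, letting synchronous pipeline code invoke
# the async helper before cleanup. Argument values here are placeholders.
async_to_sync(update_last_synced_at)(job_id="job-1", schema_id="schema-1", team_id=1)
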
2 changes: 2 additions & 0 deletions posthog/temporal/data_imports/pipelines/test/test_pipeline.py
@@ -78,6 +78,7 @@ def mock_create_pipeline(local_self: Any):
"posthog.temporal.data_imports.pipelines.pipeline.validate_schema_and_update_table"
) as mock_validate_schema_and_update_table,
patch("posthog.temporal.data_imports.pipelines.pipeline.get_delta_tables"),
patch("posthog.temporal.data_imports.pipelines.pipeline.update_last_synced_at"),
):
pipeline = await self._create_pipeline("Customer", False)
res = await pipeline.run()
@@ -99,6 +100,7 @@ def mock_create_pipeline(local_self: Any):
"posthog.temporal.data_imports.pipelines.pipeline.validate_schema_and_update_table"
) as mock_validate_schema_and_update_table,
patch("posthog.temporal.data_imports.pipelines.pipeline.get_delta_tables"),
patch("posthog.temporal.data_imports.pipelines.pipeline.update_last_synced_at"),
):
pipeline = await self._create_pipeline("Customer", True)
res = await pipeline.run()
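
Both tests stub the new helper at the path where pipeline.py looks it up rather than where it is defined; with unittest.mock, a name pulled in via `from ... import` must be patched in the importing module's namespace. A sketch of the idea:

from unittest.mock import patch

# pipeline.py did `from posthog.warehouse.data_load.validate_schema import
# update_last_synced_at`, so the reference to replace lives on the pipeline
# module, not on validate_schema:
target = "posthog.temporal.data_imports.pipelines.pipeline.update_last_synced_at"
with patch(target) as mock_update:
    ...  # run the pipeline under test; any calls land on the MagicMock
# afterwards the stub can be inspected, e.g. via mock_update.call_count
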
5 changes: 4 additions & 1 deletion posthog/temporal/tests/data_imports/test_end_to_end.py
@@ -131,11 +131,14 @@ async def _run(

     await _execute_run(workflow_id, inputs, mock_data_response)
 
-    run = await get_latest_run_if_exists(team_id=team.pk, pipeline_id=source.pk)
+    run: ExternalDataJob = await get_latest_run_if_exists(team_id=team.pk, pipeline_id=source.pk)
 
     assert run is not None
     assert run.status == ExternalDataJob.Status.COMPLETED
 
+    await sync_to_async(schema.refresh_from_db)()
+    assert schema.last_synced_at == run.created_at
+
     res = await sync_to_async(execute_hogql_query)(f"SELECT * FROM {table_name}", team)
     assert len(res.results) == 1

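These two assertions are the end-to-end check for the fix: the workflow wrote `last_synced_at` to the database row, so the in-memory `schema` instance is stale until reloaded, and `refresh_from_db` performs blocking ORM I/O, hence the `sync_to_async` wrapper inside the async test. The pattern in isolation (the same calls as above, annotated):

from asgiref.sync import sync_to_async

# Reload the row the workflow just updated; refresh_from_db does synchronous
# database I/O, so it is wrapped before being awaited in the async test.
await sync_to_async(schema.refresh_from_db)()
assert schema.last_synced_at == run.created_at
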
9 changes: 8 additions & 1 deletion posthog/warehouse/data_load/validate_schema.py
@@ -66,6 +66,14 @@ def dlt_to_hogql_type(dlt_type: TDataType | None) -> str:
     return hogql_type.__name__
 
 
+async def update_last_synced_at(job_id: str, schema_id: str, team_id: int) -> None:
+    job: ExternalDataJob = await get_external_data_job(job_id=job_id)
+    schema = await aget_schema_by_id(schema_id=schema_id, team_id=team_id)
+    schema.last_synced_at = job.created_at
+
+    await asave_external_data_schema(schema)
+
+
 async def validate_schema_and_update_table(
     run_id: str,
     team_id: int,
@@ -185,7 +193,6 @@ async def validate_schema_and_update_table(

         if schema_model:
             schema_model.table = table_created
-            schema_model.last_synced_at = job.created_at
             await asave_external_data_schema(schema_model)
 
     except ServerException as err:
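
Net effect of the change: `last_synced_at` was previously written only inside `validate_schema_and_update_table` (the line removed above), and the pipeline skips that function when there are no table counts, so such runs never refreshed the timestamp. The standalone helper now performs the write unconditionally at the end of `_run`. A condensed before/after sketch, with arguments elided:

# Before: the timestamp was set only on the conditional validation path.
if table_counts:
    validate_schema_and_update_table(...)  # used to set schema.last_synced_at

# After: the write happens on every run, whether or not validation executed.
async_to_sync(update_last_synced_at)(job_id=..., schema_id=..., team_id=...)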
