Skip to content

Commit

Permalink
Ensure last synced at is always updated
Browse files (browse the repository at this point in the history)
  • Loading branch information
Gilbert09 committed Sep 5, 2024
1 parent 04ef403 commit c2f76bd
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 3 deletions.
7 changes: 6 additions & 1 deletion posthog/temporal/data_imports/pipelines/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from deltalake.exceptions import DeltaError
from collections import Counter

from posthog.warehouse.data_load.validate_schema import validate_schema_and_update_table
from posthog.warehouse.data_load.validate_schema import update_last_synced_at, validate_schema_and_update_table
from posthog.warehouse.models.external_data_job import ExternalDataJob, get_external_data_job
from posthog.warehouse.models.external_data_schema import ExternalDataSchema, aget_schema_by_id
from posthog.warehouse.models.external_data_source import ExternalDataSource
Expand Down Expand Up @@ -254,6 +254,11 @@ def _run(self) -> dict[str, int]:
else:
self.logger.info("No table_counts, skipping validate_schema_and_update_table")

# Update last_synced_at on schema
async_to_sync(update_last_synced_at)(
job_id=self.inputs.run_id, schema_id=str(self.inputs.schema_id), team_id=self.inputs.team_id
)

# Delete local state from the file system
pipeline.drop()

Expand Down
5 changes: 4 additions & 1 deletion posthog/temporal/tests/data_imports/test_end_to_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,14 @@ async def _run(

await _execute_run(workflow_id, inputs, mock_data_response)

run = await get_latest_run_if_exists(team_id=team.pk, pipeline_id=source.pk)
run: ExternalDataJob = await get_latest_run_if_exists(team_id=team.pk, pipeline_id=source.pk)

assert run is not None
assert run.status == ExternalDataJob.Status.COMPLETED

await sync_to_async(schema.refresh_from_db)()
assert schema.last_synced_at == run.created_at

res = await sync_to_async(execute_hogql_query)(f"SELECT * FROM {table_name}", team)
assert len(res.results) == 1

Expand Down
9 changes: 8 additions & 1 deletion posthog/warehouse/data_load/validate_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ def dlt_to_hogql_type(dlt_type: TDataType | None) -> str:
return hogql_type.__name__


async def update_last_synced_at(job_id: str, schema_id: str, team_id: int) -> None:
    """Stamp a schema's ``last_synced_at`` with its job's creation time.

    Fetches the job identified by ``job_id`` and the schema identified by
    ``schema_id`` / ``team_id``, copies the job's ``created_at`` onto the
    schema's ``last_synced_at``, and saves the schema.
    """
    external_job: ExternalDataJob = await get_external_data_job(job_id=job_id)
    schema_model = await aget_schema_by_id(schema_id=schema_id, team_id=team_id)

    # The recorded sync time is the job's creation time, not the current time.
    schema_model.last_synced_at = external_job.created_at
    await asave_external_data_schema(schema_model)


async def validate_schema_and_update_table(
run_id: str,
team_id: int,
Expand Down Expand Up @@ -185,7 +193,6 @@ async def validate_schema_and_update_table(

if schema_model:
schema_model.table = table_created
schema_model.last_synced_at = job.created_at
await asave_external_data_schema(schema_model)

except ServerException as err:
Expand Down

0 comments on commit c2f76bd

Please sign in to comment.