Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(data-warehouse): Update rows synced for all integrations #21077

Merged
merged 3 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion posthog/temporal/data_imports/external_data_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

# TODO: remove dependency
from posthog.temporal.batch_exports.base import PostHogWorkflow
from posthog.temporal.data_imports.pipelines.helpers import aupdate_job_count
from posthog.temporal.data_imports.pipelines.zendesk.credentials import ZendeskCredentialsToken
from posthog.warehouse.data_load.source_templates import create_warehouse_templates_for_source

Expand Down Expand Up @@ -251,7 +252,8 @@ async def heartbeat() -> None:
heartbeat_task = asyncio.create_task(heartbeat())

try:
await DataImportPipeline(job_inputs, source, logger).run()
total_rows_synced = await DataImportPipeline(job_inputs, source, logger).run()
await aupdate_job_count(inputs.run_id, inputs.team_id, total_rows_synced)
finally:
heartbeat_task.cancel()
await asyncio.wait([heartbeat_task])
Expand Down
3 changes: 1 addition & 2 deletions posthog/temporal/data_imports/pipelines/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ async def check_limit(
model = await aget_external_data_job(team_id, job_id)

if new_count >= CHUNK_SIZE:
await aupdate_job_count(job_id, team_id, new_count)
new_count = 0

status = model.status
Expand All @@ -27,5 +26,5 @@ def aget_external_data_job(team_id, job_id):


@database_sync_to_async
def aupdate_job_count(job_id, team_id, count):
def aupdate_job_count(job_id: str, team_id: int, count: int) -> None:
    """Increment the rows_synced counter of one ExternalDataJob.

    The increment is expressed with an F() expression, so the addition is
    executed inside the database in a single UPDATE rather than via a
    read-modify-write in Python — concurrent callers cannot lose updates.
    Filtering on both id and team_id scopes the update to the caller's team.

    Wrapped by @database_sync_to_async (decorator on the preceding line),
    so callers await it from async code.

    Args:
        job_id: Primary key of the ExternalDataJob row to update.
        team_id: Team that owns the job; part of the filter, not the update.
        count: Number of newly synced rows to add to the running total.
    """
    ExternalDataJob.objects.filter(id=job_id, team_id=team_id).update(rows_synced=F("rows_synced") + count)
15 changes: 11 additions & 4 deletions posthog/temporal/data_imports/pipelines/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,24 @@ def _get_schemas(self):

return self.inputs.schemas

def _run(self):
def _run(self) -> int:
pipeline = self._create_pipeline()
pipeline.run(self.source, loader_file_format=self.loader_file_format)

async def run(self) -> None:
row_counts = pipeline.last_trace.last_normalize_info.row_counts
# Remove any DLT tables from the counts
filtered_rows = filter(lambda pair: not pair[0].startswith("_dlt"), row_counts.items())
total_rows_synced = sum(map(lambda pair: pair[1], filtered_rows))

return total_rows_synced

async def run(self) -> int:
schemas = self._get_schemas()
if not schemas:
return
return 0

try:
await asyncio.to_thread(self._run)
return await asyncio.to_thread(self._run)
except PipelineStepFailed:
self.logger.error(f"Data import failed for endpoint")
raise
4 changes: 1 addition & 3 deletions posthog/temporal/data_imports/pipelines/stripe/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from dlt.sources import DltResource
from pendulum import DateTime
from asgiref.sync import sync_to_async
from posthog.temporal.data_imports.pipelines.helpers import check_limit, aupdate_job_count
from posthog.temporal.data_imports.pipelines.helpers import check_limit
from posthog.warehouse.models import ExternalDataJob

stripe.api_version = "2022-11-15"
Expand Down Expand Up @@ -96,8 +96,6 @@ async def stripe_pagination(
if not response["has_more"] or status == ExternalDataJob.Status.CANCELLED:
break

await aupdate_job_count(job_id, team_id, count)


@dlt.source(max_table_nesting=0)
def stripe_source(
Expand Down
Loading