
Commit

chore(data-warehouse): remove heartbeating and resume logic for now (#19293)

* remove heartbeating and resume logic for now

* don't flatten metadata

* reduce number of retries

* Update query snapshots

* Update query snapshots

---------

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
EDsCODE and github-actions[bot] authored Dec 13, 2023
1 parent 255f6b1 commit 7c29838
Showing 3 changed files with 10 additions and 32 deletions.
5 changes: 2 additions & 3 deletions posthog/temporal/data_imports/external_data_job.py
@@ -194,9 +194,8 @@ async def run(self, inputs: ExternalDataWorkflowInputs):
         await workflow.execute_activity(
             run_external_data_job,
             job_inputs,
-            start_to_close_timeout=dt.timedelta(minutes=120),
-            retry_policy=RetryPolicy(maximum_attempts=10),
-            heartbeat_timeout=dt.timedelta(seconds=60),
+            start_to_close_timeout=dt.timedelta(minutes=90),
+            retry_policy=RetryPolicy(maximum_attempts=5),
         )
 
         # check schema first
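A `heartbeat_timeout` is only meaningful while the activity actually calls `activity.heartbeat()`; with the heartbeating removed from the pipeline below, keeping it would fail long-running imports. A minimal sketch of that relationship, assuming a hypothetical `run_import` activity:

import asyncio
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.common import RetryPolicy


@activity.defn
async def run_import(job_id: str) -> None:
    # No activity.heartbeat(...) calls here, so the caller must not set
    # heartbeat_timeout, or Temporal would fail the activity for silence.
    await asyncio.sleep(60)  # stand-in for the actual import work


@workflow.defn
class ImportWorkflow:
    @workflow.run
    async def run(self, job_id: str) -> None:
        await workflow.execute_activity(
            run_import,
            job_id,
            # Mirrors the new settings above: shorter deadline, fewer
            # retries, and no heartbeat_timeout since nothing heartbeats.
            start_to_close_timeout=timedelta(minutes=90),
            retry_policy=RetryPolicy(maximum_attempts=5),
        )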
6 changes: 6 additions & 0 deletions posthog/temporal/data_imports/pipelines/stripe/helpers.py
@@ -98,6 +98,12 @@ def stripe_source(
         stripe_pagination,
         name=endpoint,
         write_disposition="append",
+        columns={
+            "metadata": {
+                "data_type": "complex",
+                "nullable": True,
+            }
+        },
     )(
         api_key=api_key,
         endpoint=endpoint,
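The `columns` argument is dlt's per-column schema hint: typing Stripe's free-form `metadata` as `complex` keeps it as a single JSON-typed column instead of flattening every key into its own `metadata__<key>` child column. A rough standalone sketch of the same hint (resource name and sample row are illustrative):

import dlt


@dlt.resource(
    name="customers",
    write_disposition="append",
    # Without this hint dlt would flatten the nested dict into
    # metadata__plan, metadata__seats, ... columns.
    columns={"metadata": {"data_type": "complex", "nullable": True}},
)
def customers():
    yield {"id": "cus_123", "metadata": {"plan": "scale", "seats": 4}}


if __name__ == "__main__":
    pipeline = dlt.pipeline(destination="duckdb", dataset_name="stripe_demo")
    pipeline.run(customers())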
31 changes: 2 additions & 29 deletions posthog/temporal/data_imports/pipelines/stripe/stripe_pipeline.py
@@ -12,12 +12,6 @@
 from posthog.temporal.common.logger import bind_temporal_worker_logger
 
 import os
-from temporalio import activity
-from posthog.temporal.common.utils import (
-    DataImportHeartbeatDetails,
-    should_resume_from_activity_heartbeat,
-)
-import asyncio
 
 
 @dataclass
@@ -73,34 +67,13 @@ async def run_stripe_pipeline(inputs: StripeJobInputs) -> None:
logger.info(f"No schemas found for source id {inputs.source_id}")
return

should_resume, details = await should_resume_from_activity_heartbeat(activity, DataImportHeartbeatDetails, logger)

if should_resume and details:
schemas = schemas[schemas.index(details.endpoint) :]
logger.info(f"Resuming from {details.endpoint} with cursor {details.cursor}")

endpoint = schemas[0]
cursor = None

async def worker_shutdown_handler():
"""Handle the Worker shutting down by heart-beating our latest status."""
await activity.wait_for_worker_shutdown()
activity.heartbeat(endpoint, cursor)

asyncio.create_task(worker_shutdown_handler())

for endpoint in schemas:
if should_resume and details and endpoint == details.endpoint:
starting_after = details.cursor
else:
starting_after = None

pipeline = create_pipeline(inputs)
try:
source = stripe_source(inputs.stripe_secret_key, endpoint, starting_after=starting_after)
source = stripe_source(inputs.stripe_secret_key, endpoint)
pipeline.run(source, table_name=endpoint.lower(), loader_file_format="parquet")
except PipelineStepFailed:
logger.error(f"Data import failed for endpoint {endpoint} with cursor {cursor}")
logger.error(f"Data import failed for endpoint {endpoint}")
raise


Expand Down
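For reference, the resumption pattern being deleted is Temporal's heartbeat idiom: the activity periodically records progress with `activity.heartbeat(...)`, and on retry reads those details back to skip completed work. A generic sketch of that idiom with illustrative names (not PostHog's actual helpers):

from temporalio import activity


@activity.defn
async def import_endpoints(endpoints: list[str]) -> None:
    # On retry, Temporal hands back the payload from the last heartbeat,
    # so the activity can resume at the endpoint it was working on.
    details = activity.info().heartbeat_details
    if details:
        endpoints = endpoints[endpoints.index(details[0]) :]

    for endpoint in endpoints:
        activity.heartbeat(endpoint)  # record progress before the slow work
        ...  # run the import for this endpoint

With this removed, every retry of `run_stripe_pipeline` simply starts again from the first schema.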
