From ad84119cad05cf5cb85b75e575bf316dd0d318f2 Mon Sep 17 00:00:00 2001
From: eric
Date: Tue, 5 Dec 2023 15:31:03 -0500
Subject: [PATCH] simple batching for now

---
 .../pipelines/stripe/stripe_pipeline.py      | 41 ++++++++++++++-----
 1 file changed, 30 insertions(+), 11 deletions(-)

diff --git a/posthog/temporal/data_imports/pipelines/stripe/stripe_pipeline.py b/posthog/temporal/data_imports/pipelines/stripe/stripe_pipeline.py
index 8f3c1defd56d0..1bc8a504841d4 100644
--- a/posthog/temporal/data_imports/pipelines/stripe/stripe_pipeline.py
+++ b/posthog/temporal/data_imports/pipelines/stripe/stripe_pipeline.py
@@ -85,25 +85,44 @@ async def worker_shutdown_handler():
 
     asyncio.create_task(worker_shutdown_handler())
 
+    def flush_to_s3(records, write_to_endpoint):
+        # init pipeline and run data import
+        try:
+            logger.debug(
+                "Loading %s records",
+                len(records),
+            )
+            pipeline = create_pipeline(inputs)
+            pipeline.run(records, table_name=write_to_endpoint.lower(), loader_file_format="parquet")
+
+            # clear everything from pipeline
+            pipeline.drop()
+            pipeline.deactivate()
+
+        except PipelineStepFailed:
+            logger.error(f"Data import failed for endpoint {endpoint} with cursor {cursor}")
+            raise
+
     for endpoint in ordered_endpoints:
         if should_resume and details and endpoint == details.endpoint:
             starting_after = details.cursor
         else:
             starting_after = None
 
+        data_to_import = []
         async for item, cursor in stripe_pagination(inputs.stripe_secret_key, endpoint, starting_after=starting_after):
-            try:
-                # init pipeline and run data import
-                pipeline = create_pipeline(inputs)
-                pipeline.run(item, table_name=endpoint.lower(), loader_file_format="parquet")
-
-                # clear everything from pipeline
-                pipeline.drop()
-                pipeline.deactivate()
+            data_to_import.extend(item)
+
+            if len(data_to_import) >= 1000:
+                flush_to_s3(data_to_import, endpoint)
                 activity.heartbeat(endpoint, cursor)
-            except PipelineStepFailed:
-                logger.error(f"Data import failed for endpoint {endpoint} with cursor {cursor}")
-                raise
+                data_to_import = []
+
+        if len(data_to_import) > 0:
+            flush_to_s3(data_to_import, endpoint)
+            activity.heartbeat(endpoint, cursor)
+
+            data_to_import = []
 
 
 PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING = {ExternalDataSource.Type.STRIPE: ENDPOINTS}
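
The control flow this patch introduces is a standard batch-and-flush loop: accumulate paginated records into a buffer, flush once the buffer crosses a threshold, heartbeat only after a durable flush, and do a final flush for the remainder. The sketch below is a minimal, self-contained illustration of that pattern under stated assumptions — `fetch_pages`, `flush`, and `heartbeat` are hypothetical stand-ins for `stripe_pagination`, `flush_to_s3`, and `activity.heartbeat`, not the PostHog or Temporal APIs themselves.

```python
import asyncio
from typing import AsyncIterator

BATCH_SIZE = 1000  # same threshold the patch uses


async def fetch_pages() -> AsyncIterator[tuple[list[dict], str]]:
    # Hypothetical paginator yielding (page_of_records, cursor),
    # in the shape stripe_pagination yields in the patch.
    for page in range(3):
        yield [{"id": f"obj_{page}_{i}"} for i in range(700)], f"cursor_{page}"


def flush(records: list[dict], table: str) -> None:
    # Stand-in for flush_to_s3: in the patch this creates a pipeline,
    # runs it with loader_file_format="parquet", then drops and deactivates it.
    print(f"flushing {len(records)} records to {table}")


def heartbeat(endpoint: str, cursor: str) -> None:
    # Stand-in for activity.heartbeat: records the resume position
    # (endpoint, cursor) that a retried activity reads back as `details`.
    print(f"heartbeat at {endpoint} / {cursor}")


async def import_endpoint(endpoint: str) -> None:
    buffer: list[dict] = []
    cursor = None
    async for page, cursor in fetch_pages():
        buffer.extend(page)
        if len(buffer) >= BATCH_SIZE:
            flush(buffer, endpoint.lower())
            heartbeat(endpoint, cursor)  # advance cursor only after the flush
            buffer = []
    if buffer:  # final partial batch
        flush(buffer, endpoint.lower())
        heartbeat(endpoint, cursor)


asyncio.run(import_endpoint("Invoice"))
```

The ordering is the point of the patch: since the resume path (`should_resume and details`) restarts from the last heartbeated cursor, heartbeating after each flush rather than after each page means a retried activity never skips records that were buffered but not yet written. Note also that `flush_to_s3` closes over the loop variables `endpoint` and `cursor` for its error log; that works because it is only called from inside the loop, but passing them in explicitly would be more robust.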