From bc553938e3c68830d82321bd0ac16bb09f81aa6d Mon Sep 17 00:00:00 2001 From: Brett Hoerner Date: Thu, 9 Nov 2023 09:22:25 -0700 Subject: [PATCH] chore(batch-exports): add some Prometheus metrics for batch exports (#18467) * chore(batch-exports): add some Prometheus metrics for batch exports * make metrics port an option/setting * get destination from workflow_type --- bin/temporal-django-worker | 2 +- posthog/batch_exports/service.py | 2 +- .../commands/start_temporal_worker.py | 14 ++++++- posthog/settings/__init__.py | 1 + posthog/settings/metrics.py | 3 ++ .../workflows/backfill_batch_export.py | 12 +++--- posthog/temporal/workflows/batch_exports.py | 21 ++++++++-- .../workflows/bigquery_batch_export.py | 32 +++++++-------- .../workflows/postgres_batch_export.py | 41 +++++++++---------- .../workflows/redshift_batch_export.py | 12 +++++- posthog/temporal/workflows/s3_batch_export.py | 41 +++++++++---------- .../workflows/snowflake_batch_export.py | 18 ++++++-- 12 files changed, 119 insertions(+), 80 deletions(-) create mode 100644 posthog/settings/metrics.py diff --git a/bin/temporal-django-worker b/bin/temporal-django-worker index 4f7462879e285..ab10a959a8709 100755 --- a/bin/temporal-django-worker +++ b/bin/temporal-django-worker @@ -4,6 +4,6 @@ set -e trap "trap - SIGTERM && kill -- -$$" SIGINT SIGTERM EXIT -python3 manage.py start_temporal_worker +python3 manage.py start_temporal_worker "$@" wait diff --git a/posthog/batch_exports/service.py b/posthog/batch_exports/service.py index 547255fabd9f1..6abe24f075349 100644 --- a/posthog/batch_exports/service.py +++ b/posthog/batch_exports/service.py @@ -199,7 +199,7 @@ def unpause_batch_export( note: str | None = None, backfill: bool = False, ) -> None: - """Pause this BatchExport. + """Unpause this BatchExport. We pass the call to the underlying Temporal Schedule. Additionally, we can trigger a backfill to backfill runs missed while paused. 
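The worker command in the next hunk gains a --metrics-port option and starts the Prometheus exporter before handing control to the Temporal worker loop. A minimal sketch of that startup pattern, assuming prometheus_client is available and reusing the 8001 default from the new PROMETHEUS_METRICS_EXPORT_PORT setting:

import os

from prometheus_client import start_http_server

# Default mirrors posthog/settings/metrics.py; override via env var or --metrics-port.
metrics_port = int(os.getenv("PROMETHEUS_METRICS_EXPORT_PORT", "8001"))

# Serves /metrics from a background thread, so the worker event loop can run afterwards.
start_http_server(port=metrics_port)
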
diff --git a/posthog/management/commands/start_temporal_worker.py b/posthog/management/commands/start_temporal_worker.py index 6e10a28b31b7d..17299f64fe920 100644 --- a/posthog/management/commands/start_temporal_worker.py +++ b/posthog/management/commands/start_temporal_worker.py @@ -7,6 +7,8 @@ from django.core.management.base import BaseCommand from django.conf import settings +from prometheus_client import start_http_server + from posthog.temporal.worker import start_worker @@ -15,12 +17,12 @@ class Command(BaseCommand): def add_arguments(self, parser): parser.add_argument( - "--temporal_host", + "--temporal-host", default=settings.TEMPORAL_HOST, help="Hostname for Temporal Scheduler", ) parser.add_argument( - "--temporal_port", + "--temporal-port", default=settings.TEMPORAL_PORT, help="Port for Temporal Scheduler", ) @@ -49,6 +51,11 @@ def add_arguments(self, parser): default=settings.TEMPORAL_CLIENT_KEY, help="Optional client key", ) + parser.add_argument( + "--metrics-port", + default=settings.PROMETHEUS_METRICS_EXPORT_PORT, + help="Port to export Prometheus metrics on", + ) def handle(self, *args, **options): temporal_host = options["temporal_host"] @@ -63,6 +70,9 @@ def handle(self, *args, **options): options["client_key"] = "--SECRET--" logging.info(f"Starting Temporal Worker with options: {options}") + metrics_port = int(options["metrics_port"]) + start_http_server(port=metrics_port) + asyncio.run( start_worker( temporal_host, diff --git a/posthog/settings/__init__.py b/posthog/settings/__init__.py index 58010175ea580..d1e13759ee074 100644 --- a/posthog/settings/__init__.py +++ b/posthog/settings/__init__.py @@ -30,6 +30,7 @@ from posthog.settings.ingestion import * from posthog.settings.feature_flags import * from posthog.settings.geoip import * +from posthog.settings.metrics import * from posthog.settings.schedules import * from posthog.settings.sentry import * from posthog.settings.shell_plus import * diff --git a/posthog/settings/metrics.py b/posthog/settings/metrics.py new file mode 100644 index 0000000000000..2eea182492736 --- /dev/null +++ b/posthog/settings/metrics.py @@ -0,0 +1,3 @@ +import os + +PROMETHEUS_METRICS_EXPORT_PORT = os.getenv("PROMETHEUS_METRICS_EXPORT_PORT", "8001") diff --git a/posthog/temporal/workflows/backfill_batch_export.py b/posthog/temporal/workflows/backfill_batch_export.py index 17f55ae1d8b54..5e510b07b01be 100644 --- a/posthog/temporal/workflows/backfill_batch_export.py +++ b/posthog/temporal/workflows/backfill_batch_export.py @@ -36,14 +36,14 @@ class HeartbeatDetails(typing.NamedTuple): def make_activity_heartbeat_while_running( self, function_to_run: collections.abc.Callable, heartbeat_every: dt.timedelta ) -> collections.abc.Callable[..., collections.abc.Coroutine]: - """Return a callable that returns a coroutine that hearbeats with these HeartbeatDetails. + """Return a callable that returns a coroutine that heartbeats with these HeartbeatDetails. - The returned callable wraps 'function_to_run' while heartbeatting 'factor' times for every - 'heartbeat_timeout'. + The returned callable wraps 'function_to_run' while heartbeating every 'heartbeat_every' + seconds. 
""" async def heartbeat() -> None: - """Heartbeat factor times every heartbeat_timeout.""" + """Heartbeat every 'heartbeat_every' seconds.""" while True: await asyncio.sleep(heartbeat_every.total_seconds()) temporalio.activity.heartbeat(self) @@ -99,8 +99,8 @@ class BackfillScheduleInputs: async def backfill_schedule(inputs: BackfillScheduleInputs) -> None: """Temporal Activity to backfill a Temporal Schedule. - The backfill is broken up into batches of inputs.buffer_limit size. After a backfill batch is requested, - we wait for it to be done before continuing with the next. + The backfill is broken up into batches of inputs.buffer_limit size. After a backfill batch is + requested, we wait for it to be done before continuing with the next. This activity heartbeats while waiting to allow cancelling an ongoing backfill. """ diff --git a/posthog/temporal/workflows/batch_exports.py b/posthog/temporal/workflows/batch_exports.py index 063069388801f..6efba93df5373 100644 --- a/posthog/temporal/workflows/batch_exports.py +++ b/posthog/temporal/workflows/batch_exports.py @@ -14,6 +14,7 @@ import brotli from asgiref.sync import sync_to_async +from prometheus_client import Counter from temporalio import activity, exceptions, workflow from temporalio.common import RetryPolicy @@ -47,6 +48,15 @@ """ ) +ROWS_EXPORTED = Counter("batch_export_rows_exported", "Number of rows exported.", labelnames=("destination",)) +BYTES_EXPORTED = Counter("batch_export_bytes_exported", "Number of bytes exported.", labelnames=("destination",)) +EXPORT_STARTED = Counter("batch_export_started", "Number of batch exports started.", labelnames=("destination",)) +EXPORT_FINISHED = Counter( + "batch_export_finished", + "Number of batch exports finished, for any reason (including failure).", + labelnames=("destination", "status"), +) + async def get_rows_count( client, @@ -246,7 +256,7 @@ def get_data_interval(interval: str, data_interval_end: str | None) -> tuple[dt. msg = ( "Expected 'TemporalScheduledStartTime' of type 'list[str]' or 'list[datetime], found 'NoneType'." "This should be set by the Temporal Schedule unless triggering workflow manually." - "In the latter case, ensure 'S3BatchExportInputs.data_interval_end' is set." + "In the latter case, ensure '{Type}BatchExportInputs.data_interval_end' is set." ) raise TypeError(msg) @@ -260,7 +270,7 @@ def get_data_interval(interval: str, data_interval_end: str | None) -> tuple[dt. else: msg = ( - f"Expected search attribute to be of type 'str' or 'datetime' found '{data_interval_end_search_attr[0]}' " + f"Expected search attribute to be of type 'str' or 'datetime' but found '{data_interval_end_search_attr[0]}' " f"of type '{type(data_interval_end_search_attr[0])}'." ) raise TypeError(msg) @@ -670,8 +680,8 @@ class CreateBatchExportBackfillInputs: async def create_batch_export_backfill_model(inputs: CreateBatchExportBackfillInputs) -> str: """Activity that creates an BatchExportBackfill. - Intended to be used in all export workflows, usually at the start, to create a model - instance to represent them in our database. + Intended to be used in all batch export backfill workflows, usually at the start, to create a + model instance to represent them in our database. """ logger = get_batch_exports_logger(inputs=inputs) logger.info(f"Creating BatchExportBackfill model instance in team {inputs.team_id}.") @@ -735,6 +745,7 @@ async def execute_batch_export_insert_activity( maximum_retry_interval_seconds: Maximum interval in seconds between retries. 
""" logger = get_batch_exports_logger(inputs=inputs) + destination = workflow.info().workflow_type.lower() retry_policy = RetryPolicy( initial_interval=dt.timedelta(seconds=initial_retry_interval_seconds), @@ -743,6 +754,7 @@ async def execute_batch_export_insert_activity( non_retryable_error_types=non_retryable_error_types, ) try: + EXPORT_STARTED.labels(destination=destination).inc() await workflow.execute_activity( activity, inputs, @@ -773,6 +785,7 @@ async def execute_batch_export_insert_activity( ) finally: + EXPORT_FINISHED.labels(destination=destination, status=update_inputs.status.lower()).inc() await workflow.execute_activity( update_export_run_status, update_inputs, diff --git a/posthog/temporal/workflows/bigquery_batch_export.py b/posthog/temporal/workflows/bigquery_batch_export.py index d9557d31bb07b..531b3c7445baa 100644 --- a/posthog/temporal/workflows/bigquery_batch_export.py +++ b/posthog/temporal/workflows/bigquery_batch_export.py @@ -21,6 +21,8 @@ get_data_interval, get_results_iterator, get_rows_count, + ROWS_EXPORTED, + BYTES_EXPORTED, ) from posthog.temporal.workflows.clickhouse import get_client @@ -162,6 +164,17 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs): ) with BatchExportTemporaryFile() as jsonl_file: + + def flush_to_bigquery(): + logger.info( + "Copying %s records of size %s bytes to BigQuery", + jsonl_file.records_since_last_reset, + jsonl_file.bytes_since_last_reset, + ) + load_jsonl_file_to_bigquery_table(jsonl_file, bigquery_table, table_schema, bq_client) + ROWS_EXPORTED.labels(destination="bigquery").inc(jsonl_file.records_since_last_reset) + BYTES_EXPORTED.labels(destination="bigquery").inc(jsonl_file.bytes_since_last_reset) + for result in results_iterator: row = { field.name: json.dumps(result[field.name]) if field.name in json_columns else result[field.name] @@ -173,26 +186,11 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs): jsonl_file.write_records_to_jsonl([row]) if jsonl_file.tell() > settings.BATCH_EXPORT_BIGQUERY_UPLOAD_CHUNK_SIZE_BYTES: - logger.info( - "Copying %s records of size %s bytes to BigQuery", - jsonl_file.records_since_last_reset, - jsonl_file.bytes_since_last_reset, - ) - load_jsonl_file_to_bigquery_table( - jsonl_file, - bigquery_table, - table_schema, - bq_client, - ) + flush_to_bigquery() jsonl_file.reset() if jsonl_file.tell() > 0: - logger.info( - "Copying %s records of size %s bytes to BigQuery", - jsonl_file.records_since_last_reset, - jsonl_file.bytes_since_last_reset, - ) - load_jsonl_file_to_bigquery_table(jsonl_file, bigquery_table, table_schema, bq_client) + flush_to_bigquery() @workflow.defn(name="bigquery-export") diff --git a/posthog/temporal/workflows/postgres_batch_export.py b/posthog/temporal/workflows/postgres_batch_export.py index 8b66cfb0abb2c..a7ff877e0f1a7 100644 --- a/posthog/temporal/workflows/postgres_batch_export.py +++ b/posthog/temporal/workflows/postgres_batch_export.py @@ -23,6 +23,8 @@ get_data_interval, get_results_iterator, get_rows_count, + ROWS_EXPORTED, + BYTES_EXPORTED, ) from posthog.temporal.workflows.clickhouse import get_client @@ -218,29 +220,8 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs): with BatchExportTemporaryFile() as pg_file: with postgres_connection(inputs) as connection: - for result in results_iterator: - row = { - key: json.dumps(result[key]) if key in json_columns and result[key] is not None else result[key] - for key in schema_columns - } - pg_file.write_records_to_tsv([row], 
fieldnames=schema_columns) - if pg_file.tell() > settings.BATCH_EXPORT_POSTGRES_UPLOAD_CHUNK_SIZE_BYTES: - logger.info( - "Copying %s records of size %s bytes to Postgres", - pg_file.records_since_last_reset, - pg_file.bytes_since_last_reset, - ) - copy_tsv_to_postgres( - pg_file, - connection, - inputs.schema, - inputs.table_name, - schema_columns, - ) - pg_file.reset() - - if pg_file.tell() > 0: + def flush_to_postgres(): logger.info( "Copying %s records of size %s bytes to Postgres", pg_file.records_since_last_reset, @@ -253,6 +234,22 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs): inputs.table_name, schema_columns, ) + ROWS_EXPORTED.labels(destination="postgres").inc(pg_file.records_since_last_reset) + BYTES_EXPORTED.labels(destination="postgres").inc(pg_file.bytes_since_last_reset) + + for result in results_iterator: + row = { + key: json.dumps(result[key]) if key in json_columns and result[key] is not None else result[key] + for key in schema_columns + } + pg_file.write_records_to_tsv([row], fieldnames=schema_columns) + + if pg_file.tell() > settings.BATCH_EXPORT_POSTGRES_UPLOAD_CHUNK_SIZE_BYTES: + flush_to_postgres() + pg_file.reset() + + if pg_file.tell() > 0: + flush_to_postgres() @workflow.defn(name="postgres-export") diff --git a/posthog/temporal/workflows/redshift_batch_export.py b/posthog/temporal/workflows/redshift_batch_export.py index 74c1fb52662cc..572ce243196d8 100644 --- a/posthog/temporal/workflows/redshift_batch_export.py +++ b/posthog/temporal/workflows/redshift_batch_export.py @@ -22,6 +22,7 @@ get_data_interval, get_results_iterator, get_rows_count, + ROWS_EXPORTED, ) from posthog.temporal.workflows.clickhouse import get_client from posthog.temporal.workflows.postgres_batch_export import ( @@ -69,17 +70,24 @@ def insert_records_to_redshift( ) template = sql.SQL("({})").format(sql.SQL(", ").join(map(sql.Placeholder, columns))) + def flush_to_redshift(): + psycopg2.extras.execute_values(cursor, query, batch, template) + ROWS_EXPORTED.labels(destination="redshift").inc(len(batch)) + # It would be nice to record BYTES_EXPORTED for Redshift, but it's not worth estimating + # the byte size of each batch the way things are currently written. We can revisit this + # in the future if we decide it's useful enough. 
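The BigQuery, Postgres, S3, and (partially) Redshift hunks converge on the same shape: buffer rows in a temporary file, and when the upload threshold is crossed, call a small flush helper that performs the copy and increments ROWS_EXPORTED / BYTES_EXPORTED from the buffer's per-chunk tallies before reset() clears them. A generic sketch of that pattern, with export_rows, upload_buffer, and CHUNK_SIZE_BYTES as placeholder names and the counters imported the same way the destination modules do:

from posthog.temporal.workflows.batch_exports import BYTES_EXPORTED, ROWS_EXPORTED

CHUNK_SIZE_BYTES = 100 * 1024 * 1024  # placeholder; each destination has its own setting

def export_rows(rows, buffer, upload_buffer, destination: str) -> None:
    """Hypothetical driver showing where the counters are incremented.

    'buffer' stands in for BatchExportTemporaryFile and 'upload_buffer' for the
    destination-specific load call (load_jsonl_file_to_bigquery_table,
    copy_tsv_to_postgres, upload_part, ...).
    """

    def flush() -> None:
        upload_buffer(buffer)
        # Increment before reset(), while *_since_last_reset still describe this chunk.
        ROWS_EXPORTED.labels(destination=destination).inc(buffer.records_since_last_reset)
        BYTES_EXPORTED.labels(destination=destination).inc(buffer.bytes_since_last_reset)

    for row in rows:
        buffer.write_records_to_jsonl([row])
        if buffer.tell() > CHUNK_SIZE_BYTES:
            flush()
            buffer.reset()

    if buffer.tell() > 0:
        flush()
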
+ for record in records: batch.append(record) if len(batch) < batch_size: continue - psycopg2.extras.execute_values(cursor, query, batch, template) + flush_to_redshift() batch = [] if len(batch) > 0: - psycopg2.extras.execute_values(cursor, query, batch, template) + flush_to_redshift() @dataclass diff --git a/posthog/temporal/workflows/s3_batch_export.py b/posthog/temporal/workflows/s3_batch_export.py index 6a81aeeb93a77..0d06c8abde9b4 100644 --- a/posthog/temporal/workflows/s3_batch_export.py +++ b/posthog/temporal/workflows/s3_batch_export.py @@ -24,6 +24,8 @@ get_data_interval, get_results_iterator, get_rows_count, + ROWS_EXPORTED, + BYTES_EXPORTED, ) from posthog.temporal.workflows.clickhouse import get_client @@ -425,6 +427,22 @@ async def worker_shutdown_handler(): async with s3_upload as s3_upload: with BatchExportTemporaryFile(compression=inputs.compression) as local_results_file: + + async def flush_to_s3(last_uploaded_part_timestamp: str, last=False): + logger.info( + "Uploading %spart %s containing %s records with size %s bytes to S3", + "last " if last else "", + s3_upload.part_number + 1, + local_results_file.records_since_last_reset, + local_results_file.bytes_since_last_reset, + ) + + await s3_upload.upload_part(local_results_file) + ROWS_EXPORTED.labels(destination="s3").inc(local_results_file.records_since_last_reset) + BYTES_EXPORTED.labels(destination="s3").inc(local_results_file.bytes_since_last_reset) + + activity.heartbeat(last_uploaded_part_timestamp, s3_upload.to_state()) + for result in results_iterator: record = { "created_at": result["created_at"], @@ -442,32 +460,13 @@ async def worker_shutdown_handler(): local_results_file.write_records_to_jsonl([record]) if local_results_file.tell() > settings.BATCH_EXPORT_S3_UPLOAD_CHUNK_SIZE_BYTES: - logger.info( - "Uploading part %s containing %s records with size %s bytes to S3", - s3_upload.part_number + 1, - local_results_file.records_since_last_reset, - local_results_file.bytes_since_last_reset, - ) - - await s3_upload.upload_part(local_results_file) - last_uploaded_part_timestamp = result["inserted_at"] - activity.heartbeat(last_uploaded_part_timestamp, s3_upload.to_state()) - + await flush_to_s3(last_uploaded_part_timestamp) local_results_file.reset() if local_results_file.tell() > 0 and result is not None: - logger.info( - "Uploading last part %s containing %s records with size %s bytes to S3", - s3_upload.part_number + 1, - local_results_file.records_since_last_reset, - local_results_file.bytes_since_last_reset, - ) - - await s3_upload.upload_part(local_results_file) - last_uploaded_part_timestamp = result["inserted_at"] - activity.heartbeat(last_uploaded_part_timestamp, s3_upload.to_state()) + await flush_to_s3(last_uploaded_part_timestamp, last=True) await s3_upload.complete() diff --git a/posthog/temporal/workflows/snowflake_batch_export.py b/posthog/temporal/workflows/snowflake_batch_export.py index ec556e527192a..800991a42a4cf 100644 --- a/posthog/temporal/workflows/snowflake_batch_export.py +++ b/posthog/temporal/workflows/snowflake_batch_export.py @@ -20,6 +20,8 @@ get_data_interval, get_results_iterator, get_rows_count, + ROWS_EXPORTED, + BYTES_EXPORTED, ) from posthog.temporal.workflows.clickhouse import get_client @@ -174,6 +176,14 @@ async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs): ) result = None local_results_file = tempfile.NamedTemporaryFile(suffix=".jsonl") + rows_in_file = 0 + + def flush_to_snowflake(lrf: tempfile._TemporaryFileWrapper, rows_in_file: int): + lrf.flush() + 
put_file_to_snowflake_table(cursor, lrf.name, inputs.table_name) + ROWS_EXPORTED.labels(destination="snowflake").inc(rows_in_file) + BYTES_EXPORTED.labels(destination="snowflake").inc(lrf.tell()) + try: while True: try: @@ -212,6 +222,7 @@ async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs): # Write the results to a local file local_results_file.write(json.dumps(result).encode("utf-8")) local_results_file.write("\n".encode("utf-8")) + rows_in_file += 1 # Write results to Snowflake when the file reaches 50MB and # reset the file, or if there is nothing else to write. @@ -222,16 +233,15 @@ async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs): logger.info("Uploading to Snowflake") # Flush the file to make sure everything is written - local_results_file.flush() - put_file_to_snowflake_table(cursor, local_results_file.name, inputs.table_name) + flush_to_snowflake(local_results_file, rows_in_file) # Delete the temporary file and create a new one local_results_file.close() local_results_file = tempfile.NamedTemporaryFile(suffix=".jsonl") + rows_in_file = 0 # Flush the file to make sure everything is written - local_results_file.flush() - put_file_to_snowflake_table(cursor, local_results_file.name, inputs.table_name) + flush_to_snowflake(local_results_file, rows_in_file) # We don't need the file anymore, close (and delete) it. local_results_file.close()
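
The Snowflake activity writes to a plain NamedTemporaryFile rather than a BatchExportTemporaryFile, so the patch tracks rows_in_file explicitly and uses the file's tell() as the byte count at flush time. A trimmed sketch of that bookkeeping, with put_file standing in for put_file_to_snowflake_table and results as a plain iterable:

import json
import tempfile

from posthog.temporal.workflows.batch_exports import BYTES_EXPORTED, ROWS_EXPORTED

def export_to_snowflake(results, put_file, max_bytes: int = 50 * 1024 * 1024) -> None:
    """Hypothetical loop mirroring insert_into_snowflake_activity's flush bookkeeping."""

    def flush(lrf, rows: int) -> None:
        lrf.flush()  # make sure everything buffered is on disk before uploading
        put_file(lrf.name)
        ROWS_EXPORTED.labels(destination="snowflake").inc(rows)
        # tell() is the number of bytes written to this file since it was created.
        BYTES_EXPORTED.labels(destination="snowflake").inc(lrf.tell())

    local_file = tempfile.NamedTemporaryFile(suffix=".jsonl")
    rows_in_file = 0
    try:
        for result in results:
            local_file.write(json.dumps(result).encode("utf-8"))
            local_file.write(b"\n")
            rows_in_file += 1

            if local_file.tell() > max_bytes:
                flush(local_file, rows_in_file)
                local_file.close()
                local_file = tempfile.NamedTemporaryFile(suffix=".jsonl")
                rows_in_file = 0

        # Final flush for whatever is left in the last file.
        flush(local_file, rows_in_file)
    finally:
        local_file.close()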