From dae05d8709874ff7a667aec4cadfcc0bd7942dde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Wed, 11 Oct 2023 10:43:14 +0200 Subject: [PATCH 01/23] feat(batch-exports): Add backfill model and service support --- .../0354_add_batch_export_backfill_model.py | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 posthog/migrations/0354_add_batch_export_backfill_model.py diff --git a/posthog/migrations/0354_add_batch_export_backfill_model.py b/posthog/migrations/0354_add_batch_export_backfill_model.py new file mode 100644 index 0000000000000..1428af2d258e1 --- /dev/null +++ b/posthog/migrations/0354_add_batch_export_backfill_model.py @@ -0,0 +1,83 @@ +# Generated by Django 3.2.19 on 2023-10-11 08:17 + +from django.db import migrations, models +import django.db.models.deletion +import posthog.models.utils + + +class Migration(migrations.Migration): + + dependencies = [ + ("posthog", "0353_add_5_minute_interval_to_batch_exports"), + ] + + operations = [ + migrations.CreateModel( + name="BatchExportBackfill", + fields=[ + ( + "id", + models.UUIDField( + default=posthog.models.utils.UUIDT, editable=False, primary_key=True, serialize=False + ), + ), + ("start_at", models.DateTimeField(help_text="The start of the data interval.")), + ("end_at", models.DateTimeField(help_text="The end of the data interval.")), + ( + "status", + models.CharField( + choices=[ + ("Cancelled", "Cancelled"), + ("Completed", "Completed"), + ("ContinuedAsNew", "Continuedasnew"), + ("Failed", "Failed"), + ("Terminated", "Terminated"), + ("TimedOut", "Timedout"), + ("Running", "Running"), + ("Starting", "Starting"), + ], + help_text="The status of this backfill.", + max_length=64, + ), + ), + ( + "created_at", + models.DateTimeField( + auto_now_add=True, help_text="The timestamp at which this BatchExportBackfill was created." + ), + ), + ( + "finished_at", + models.DateTimeField( + help_text="The timestamp at which this BatchExportBackfill finished, successfully or not.", + null=True, + ), + ), + ( + "last_updated_at", + models.DateTimeField( + auto_now=True, help_text="The timestamp at which this BatchExportBackfill was last updated." 
+ ), + ), + ( + "batch_export", + models.ForeignKey( + help_text="The BatchExport this backfill belongs to.", + on_delete=django.db.models.deletion.CASCADE, + to="posthog.batchexport", + ), + ), + ( + "team", + models.ForeignKey( + help_text="The team this belongs to.", + on_delete=django.db.models.deletion.CASCADE, + to="posthog.team", + ), + ), + ], + options={ + "abstract": False, + }, + ), + ] From b67e94fd639c23ad42df3e167c59e97ecb5bfbf3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Fri, 13 Oct 2023 11:59:59 +0200 Subject: [PATCH 02/23] test(batch-exports-backfills): Add Workflow test --- .../0354_add_batch_export_backfill_model.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/posthog/migrations/0354_add_batch_export_backfill_model.py b/posthog/migrations/0354_add_batch_export_backfill_model.py index 1428af2d258e1..6f7d2c3e99f3f 100644 --- a/posthog/migrations/0354_add_batch_export_backfill_model.py +++ b/posthog/migrations/0354_add_batch_export_backfill_model.py @@ -1,4 +1,4 @@ -# Generated by Django 3.2.19 on 2023-10-11 08:17 +# Generated by Django 3.2.19 on 2023-10-13 09:13 from django.db import migrations, models import django.db.models.deletion @@ -12,6 +12,21 @@ class Migration(migrations.Migration): ] operations = [ + migrations.AlterField( + model_name="batchexportdestination", + name="type", + field=models.CharField( + choices=[ + ("S3", "S3"), + ("Snowflake", "Snowflake"), + ("Postgres", "Postgres"), + ("BigQuery", "Bigquery"), + ("NoOp", "Noop"), + ], + help_text="A choice of supported BatchExportDestination types.", + max_length=64, + ), + ), migrations.CreateModel( name="BatchExportBackfill", fields=[ From e2340689015d2472e743db6634122cc4c5c8db9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Tue, 17 Oct 2023 11:29:35 +0200 Subject: [PATCH 03/23] chore(batch-exports-backfill): Bump migration --- .../0354_add_batch_export_backfill_model.py | 98 ------------------- 1 file changed, 98 deletions(-) delete mode 100644 posthog/migrations/0354_add_batch_export_backfill_model.py diff --git a/posthog/migrations/0354_add_batch_export_backfill_model.py b/posthog/migrations/0354_add_batch_export_backfill_model.py deleted file mode 100644 index 6f7d2c3e99f3f..0000000000000 --- a/posthog/migrations/0354_add_batch_export_backfill_model.py +++ /dev/null @@ -1,98 +0,0 @@ -# Generated by Django 3.2.19 on 2023-10-13 09:13 - -from django.db import migrations, models -import django.db.models.deletion -import posthog.models.utils - - -class Migration(migrations.Migration): - - dependencies = [ - ("posthog", "0353_add_5_minute_interval_to_batch_exports"), - ] - - operations = [ - migrations.AlterField( - model_name="batchexportdestination", - name="type", - field=models.CharField( - choices=[ - ("S3", "S3"), - ("Snowflake", "Snowflake"), - ("Postgres", "Postgres"), - ("BigQuery", "Bigquery"), - ("NoOp", "Noop"), - ], - help_text="A choice of supported BatchExportDestination types.", - max_length=64, - ), - ), - migrations.CreateModel( - name="BatchExportBackfill", - fields=[ - ( - "id", - models.UUIDField( - default=posthog.models.utils.UUIDT, editable=False, primary_key=True, serialize=False - ), - ), - ("start_at", models.DateTimeField(help_text="The start of the data interval.")), - ("end_at", models.DateTimeField(help_text="The end of the data interval.")), - ( - "status", - models.CharField( - choices=[ - ("Cancelled", "Cancelled"), - ("Completed", "Completed"), - 
("ContinuedAsNew", "Continuedasnew"), - ("Failed", "Failed"), - ("Terminated", "Terminated"), - ("TimedOut", "Timedout"), - ("Running", "Running"), - ("Starting", "Starting"), - ], - help_text="The status of this backfill.", - max_length=64, - ), - ), - ( - "created_at", - models.DateTimeField( - auto_now_add=True, help_text="The timestamp at which this BatchExportBackfill was created." - ), - ), - ( - "finished_at", - models.DateTimeField( - help_text="The timestamp at which this BatchExportBackfill finished, successfully or not.", - null=True, - ), - ), - ( - "last_updated_at", - models.DateTimeField( - auto_now=True, help_text="The timestamp at which this BatchExportBackfill was last updated." - ), - ), - ( - "batch_export", - models.ForeignKey( - help_text="The BatchExport this backfill belongs to.", - on_delete=django.db.models.deletion.CASCADE, - to="posthog.batchexport", - ), - ), - ( - "team", - models.ForeignKey( - help_text="The team this belongs to.", - on_delete=django.db.models.deletion.CASCADE, - to="posthog.team", - ), - ), - ], - options={ - "abstract": False, - }, - ), - ] From aaa40f1b61c601ed2679ab21de582301d6ac1ea5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Wed, 18 Oct 2023 13:35:13 +0200 Subject: [PATCH 04/23] feat(batch-exports): Add RedshiftBatchExportWorkflow --- posthog/temporal/workflows/postgres_batch_export.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/posthog/temporal/workflows/postgres_batch_export.py b/posthog/temporal/workflows/postgres_batch_export.py index 8b66cfb0abb2c..39fa473eea2b2 100644 --- a/posthog/temporal/workflows/postgres_batch_export.py +++ b/posthog/temporal/workflows/postgres_batch_export.py @@ -138,6 +138,19 @@ class PostgresInsertInputs: port: int = 5432 exclude_events: list[str] | None = None include_events: list[str] | None = None + fields: list[tuple[str, str]] = [ + ("uuid", "VARCHAR(200)"), + ("event", "VARCHAR(200)"), + ("properties", "JSONB"), + ("elements", "JSONB"), + ("set", "JSONB"), + ("set_once", "JSONB"), + ("distinct_id", "VARCHAR(200)"), + ("team_id", "INTEGER"), + ("ip", "VARCHAR(200)"), + ("site_url", "VARCHAR(200)"), + ("timestamp", "TIMESTAMP WITH TIME ZONE"), + ] @activity.defn From 33500ae3e296cca8bca908e4b5956394c1cdcc6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Wed, 18 Oct 2023 13:41:29 +0200 Subject: [PATCH 05/23] feat(batch-exports): Add Redshift to BatchExport destinations --- ...6_add_redshift_batch_export_destination.py | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 posthog/migrations/0356_add_redshift_batch_export_destination.py diff --git a/posthog/migrations/0356_add_redshift_batch_export_destination.py b/posthog/migrations/0356_add_redshift_batch_export_destination.py new file mode 100644 index 0000000000000..65b74ff8905db --- /dev/null +++ b/posthog/migrations/0356_add_redshift_batch_export_destination.py @@ -0,0 +1,29 @@ +# Generated by Django 3.2.19 on 2023-10-18 11:40 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("posthog", "0355_add_batch_export_backfill_model"), + ] + + operations = [ + migrations.AlterField( + model_name="batchexportdestination", + name="type", + field=models.CharField( + choices=[ + ("S3", "S3"), + ("Snowflake", "Snowflake"), + ("Postgres", "Postgres"), + ("Redshift", "Redshift"), + ("BigQuery", "Bigquery"), + ("NoOp", "Noop"), + ], + help_text="A choice of supported 
BatchExportDestination types.", + max_length=64, + ), + ), + ] From 574fffe2f0e241cb3dfec8dd22edfae14f334dce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Wed, 18 Oct 2023 14:22:50 +0200 Subject: [PATCH 06/23] feat(batch-exports): Support properties_data_type Redshift plugin parameter --- posthog/temporal/workflows/redshift_batch_export.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/posthog/temporal/workflows/redshift_batch_export.py b/posthog/temporal/workflows/redshift_batch_export.py index 74c1fb52662cc..2166c0cf034bb 100644 --- a/posthog/temporal/workflows/redshift_batch_export.py +++ b/posthog/temporal/workflows/redshift_batch_export.py @@ -241,6 +241,8 @@ async def run(self, inputs: RedshiftBatchExportInputs): update_inputs = UpdateBatchExportRunStatusInputs(id=run_id, status="Completed") + properties_type = "VARCHAR(65535)" if inputs.properties_data_type == "varchar" else "SUPER" + insert_inputs = RedshiftInsertInputs( team_id=inputs.team_id, user=inputs.user, From 6e4ed85a8a3b35f6e2b3037f3479d5c70747448b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Thu, 19 Oct 2023 12:34:26 +0200 Subject: [PATCH 07/23] refactor(batch-exports): Insert rows instead of using COPY --- posthog/temporal/workflows/postgres_batch_export.py | 13 ------------- posthog/temporal/workflows/redshift_batch_export.py | 2 -- 2 files changed, 15 deletions(-) diff --git a/posthog/temporal/workflows/postgres_batch_export.py b/posthog/temporal/workflows/postgres_batch_export.py index 39fa473eea2b2..8b66cfb0abb2c 100644 --- a/posthog/temporal/workflows/postgres_batch_export.py +++ b/posthog/temporal/workflows/postgres_batch_export.py @@ -138,19 +138,6 @@ class PostgresInsertInputs: port: int = 5432 exclude_events: list[str] | None = None include_events: list[str] | None = None - fields: list[tuple[str, str]] = [ - ("uuid", "VARCHAR(200)"), - ("event", "VARCHAR(200)"), - ("properties", "JSONB"), - ("elements", "JSONB"), - ("set", "JSONB"), - ("set_once", "JSONB"), - ("distinct_id", "VARCHAR(200)"), - ("team_id", "INTEGER"), - ("ip", "VARCHAR(200)"), - ("site_url", "VARCHAR(200)"), - ("timestamp", "TIMESTAMP WITH TIME ZONE"), - ] @activity.defn diff --git a/posthog/temporal/workflows/redshift_batch_export.py b/posthog/temporal/workflows/redshift_batch_export.py index 2166c0cf034bb..74c1fb52662cc 100644 --- a/posthog/temporal/workflows/redshift_batch_export.py +++ b/posthog/temporal/workflows/redshift_batch_export.py @@ -241,8 +241,6 @@ async def run(self, inputs: RedshiftBatchExportInputs): update_inputs = UpdateBatchExportRunStatusInputs(id=run_id, status="Completed") - properties_type = "VARCHAR(65535)" if inputs.properties_data_type == "varchar" else "SUPER" - insert_inputs = RedshiftInsertInputs( team_id=inputs.team_id, user=inputs.user, From ee51aebe5e8e43a3b752bd0a223848f3f0905e0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Wed, 1 Nov 2023 19:56:07 +0100 Subject: [PATCH 08/23] fix: Remove unused migration --- ...6_add_redshift_batch_export_destination.py | 29 ------------------- 1 file changed, 29 deletions(-) delete mode 100644 posthog/migrations/0356_add_redshift_batch_export_destination.py diff --git a/posthog/migrations/0356_add_redshift_batch_export_destination.py b/posthog/migrations/0356_add_redshift_batch_export_destination.py deleted file mode 100644 index 65b74ff8905db..0000000000000 --- a/posthog/migrations/0356_add_redshift_batch_export_destination.py +++ /dev/null @@ -1,29 +0,0 @@ 
-# Generated by Django 3.2.19 on 2023-10-18 11:40 - -from django.db import migrations, models - - -class Migration(migrations.Migration): - - dependencies = [ - ("posthog", "0355_add_batch_export_backfill_model"), - ] - - operations = [ - migrations.AlterField( - model_name="batchexportdestination", - name="type", - field=models.CharField( - choices=[ - ("S3", "S3"), - ("Snowflake", "Snowflake"), - ("Postgres", "Postgres"), - ("Redshift", "Redshift"), - ("BigQuery", "Bigquery"), - ("NoOp", "Noop"), - ], - help_text="A choice of supported BatchExportDestination types.", - max_length=64, - ), - ), - ] From 4627485fe2597862c437e6280b8420cd1056b19d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Thu, 2 Nov 2023 02:04:41 +0100 Subject: [PATCH 09/23] chore: Require aiokafka --- requirements.in | 1 + requirements.txt | 8 +++++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/requirements.in b/requirements.in index d5c9e9449a32b..c9ff24b4ec4f9 100644 --- a/requirements.in +++ b/requirements.in @@ -6,6 +6,7 @@ # aiohttp>=3.8.4 aioboto3==11.1 +aiokafka>=0.8 antlr4-python3-runtime==4.13.1 amqp==5.1.1 boto3==1.26.76 diff --git a/requirements.txt b/requirements.txt index 44eef0d14f9b6..955af5581ba93 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,6 +16,8 @@ aiohttp==3.8.5 # openai aioitertools==0.11.0 # via aiobotocore +aiokafka==0.8.1 + # via -r requirements.in aiosignal==1.2.0 # via aiohttp amqp==5.1.1 @@ -39,6 +41,7 @@ async-generator==1.10 async-timeout==4.0.2 # via # aiohttp + # aiokafka # redis attrs==21.4.0 # via @@ -280,7 +283,9 @@ jsonschema==4.4.0 kafka-helper==0.2 # via -r requirements.in kafka-python==2.0.2 - # via -r requirements.in + # via + # -r requirements.in + # aiokafka kombu==5.3.2 # via # -r requirements.in @@ -324,6 +329,7 @@ outcome==1.1.0 # via trio packaging==23.1 # via + # aiokafka # google-cloud-bigquery # prance # snowflake-connector-python From 5bd120106dbfcf9e3a707fafb3bac52f36131b43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Thu, 2 Nov 2023 02:05:13 +0100 Subject: [PATCH 10/23] feat: Implement new structlog batch exports logger --- posthog/temporal/workflows/logger.py | 224 +++++++++++++++++++++++++++ 1 file changed, 224 insertions(+) create mode 100644 posthog/temporal/workflows/logger.py diff --git a/posthog/temporal/workflows/logger.py b/posthog/temporal/workflows/logger.py new file mode 100644 index 0000000000000..58fd92fc68565 --- /dev/null +++ b/posthog/temporal/workflows/logger.py @@ -0,0 +1,224 @@ +import asyncio +import json +import logging + +import aiokafka +import structlog +import temporalio.activity +import temporalio.workflow +from django.conf import settings + +from posthog.kafka_client.topics import KAFKA_LOG_ENTRIES + + +async def bind_batch_exports_logger(team_id: int, export_destination: str) -> structlog.stdlib.AsyncBoundLogger: + """Return a logger for BatchExports.""" + if not structlog.is_configured(): + await configure_logger() + + logger = structlog.get_logger() + + return logger.bind(team=team_id, destination=export_destination) + + +async def configure_logger(): + queue = asyncio.Queue(maxsize=-1) + put_in_queue = PutInQueueProcessor(queue) + + structlog.configure( + processors=[ + structlog.stdlib.filter_by_level, + structlog.processors.add_log_level, + structlog.stdlib.add_logger_name, + structlog.stdlib.PositionalArgumentsFormatter(), + structlog.processors.format_exc_info, + structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S.%f", 
utc=True), + add_batch_export_context, + structlog.stdlib.PositionalArgumentsFormatter(), + put_in_queue, + structlog.processors.KeyValueRenderer(), + ], + wrapper_class=structlog.stdlib.AsyncBoundLogger, + cache_logger_on_first_use=True, + ) + task = asyncio.create_task(KafkaLogProducerFromQueue(queue=queue, topic=KAFKA_LOG_ENTRIES).listen_and_produce()) + + async def worker_shutdown_handler(): + """Gracefully handle a Temporal Worker shutting down. + + Graceful handling means: + * Waiting until the queue is fully processed to avoid missing log messages. + * Cancel task listening on queue. + """ + await temporalio.activity.wait_for_worker_shutdown() + + await queue.join() + task.cancel() + + await asyncio.wait([task]) + + asyncio.create_task(worker_shutdown_handler()) + + +class PutInQueueProcessor: + """A StructLog processor that puts event_dict into a queue. + + The idea is that any event_dicts can be processed later by any queue listeners. + """ + + def __init__(self, queue: asyncio.Queue): + self.queue = queue + + def __call__(self, logger: logging.Logger, method_name: str, event_dict: structlog.typing.EventDict): + self.queue.put_nowait(event_dict) + + return event_dict + + +def add_batch_export_context(logger: logging.Logger, method_name: str, event_dict: structlog.typing.EventDict): + """A StructLog processor to populate event dict with batch export context variables. + + More specifically, the batch export context variables are coming from Temporal: + * workflow_run_id: The ID of the Temporal Workflow Execution running the batch export. + * workflow_id: The ID of the Temporal Workflow running the batch export. + * attempt: The current attempt number of the Temporal Workflow. + * log_source_id: The batch export ID. + * log_source: Either "batch_exports" or "batch_exports_backfill". + + We attempt to fetch the context from the activity information, and then from the workflow + information. If both are undefined, nothing is populated. When running this processor in + an activity or a workflow, at least one will be defined. + """ + activity_info = attempt_to_fetch_activity_info() + workflow_info = attempt_to_fetch_workflow_info() + + info = activity_info or workflow_info + + if info is None: + return event_dict + + workflow_id, workflow_type, workflow_run_id, attempt = info + + if workflow_type == "backfill-batch-export": + # This works because the WorkflowID is made up like f"{batch_export_id}-Backfill-{data_interval_end}" + log_source_id = workflow_id.split("Backfill")[0] + log_source = "batch_exports_backfill" + else: + # This works because the WorkflowID is made up like f"{batch_export_id}-{data_interval_end}" + # Since 'data_interval_end' is an iso formatted datetime string, it has two '-' to separate the + # date. Plus one more leaves us at the end of right at the end of 'batch_export_id'. + log_source_id = workflow_id.rsplit("-", maxsplit=3)[0] + log_source = "batch_exports" + + event_dict["workflow_id"] = workflow_id + event_dict["workflow_type"] = workflow_type + event_dict["log_source_id"] = log_source_id + event_dict["log_source"] = log_source + event_dict["workflow_run_id"] = workflow_run_id + event_dict["attempt"] = attempt + + return event_dict + + +Info = tuple[str, str, str, int] + + +def attempt_to_fetch_activity_info() -> Info | None: + """Fetch Activity information from Temporal. + + Returns: + None if calling outside an Activity, else the relevant Info. 
+ """ + try: + activity_info = temporalio.activity.info() + except RuntimeError: + return None + else: + workflow_id = activity_info.workflow_id + workflow_type = activity_info.workflow_type + workflow_run_id = activity_info.workflow_run_id + attempt = activity_info.attempt + + return (workflow_id, workflow_type, workflow_run_id, attempt) + + +def attempt_to_fetch_workflow_info() -> Info | None: + """Fetch Workflow information from Temporal. + + Returns: + None if calling outside a Workflow, else the relevant Info. + """ + try: + workflow_info = temporalio.workflow.info() + except RuntimeError: + return None + else: + workflow_id = workflow_info.workflow_id + workflow_type = workflow_info.workflow_type + workflow_run_id = workflow_info.run_id + attempt = workflow_info.attempt + + return (workflow_id, workflow_type, workflow_run_id, attempt) + + +class KafkaLogProducerFromQueue: + """Produce log messages to Kafka by getting them from a queue. + + This KafkaLogProducerFromQueue was designed to ingest logs into the ClickHouse log_entries table. + For this reason, the messages we produce to Kafka are serialized as JSON in the schema expected by + the log_entries table. Eventually, we could de-couple this producer from the table schema, but + schema changes are rare in ClickHouse, and for now we are only using this for logs, so the tight + coupling is preferred over the extra complexity of de-coupling this producer. + + Attributes: + queue: The queue we are listening to get log event_dicts to serialize and produce. + topic: The topic to produce to. This should be left to the default KAFKA_LOG_ENTRIES. + key: The key for Kafka partitioning. Default to None for random partition. + producer: Optionally, bring your own aiokafka.AIOKafkaProducer. This is mostly here for testing. 
+ """ + + def __init__( + self, + queue: asyncio.Queue, + topic: str = KAFKA_LOG_ENTRIES, + key: str | None = None, + producer: aiokafka.AIOKafkaProducer | None = None, + ): + self.queue = queue + self.producer = ( + producer + if producer is not None + else aiokafka.AIOKafkaProducer( + bootstrap_servers=settings.KAFKA_HOSTS, + security_protocol=settings.KAFKA_SECURITY_PROTOCOL or "PLAINTEXT", + ) + ) + self.topic = topic + self.key = key + + async def listen_and_produce(self): + """Listen to messages in queue and produce them to Kafka as they come.""" + await self.producer.start() + + try: + while True: + event_dict = await self.queue.get() + + data = { + "instance_id": event_dict["workflow_run_id"], + "level": event_dict["level"], + "log_source": event_dict["log_source"], + "log_source_id": event_dict["log_source_id"], + "message": event_dict["event"], + "team_id": event_dict["team"], + "timestamp": event_dict["timestamp"], + } + + kafka_message = json.dumps(data).encode("utf-8") + + await self.producer.send_and_wait(self.topic, kafka_message, key=self.key) + + self.queue.task_done() + + finally: + await self.producer.stop() From c1b1e008d12d3b776d7f83ebb7c1d4d1130b135e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Thu, 2 Nov 2023 02:08:14 +0100 Subject: [PATCH 11/23] refactor: Use new structlog logger everywhere --- .../workflows/backfill_batch_export.py | 15 +- posthog/temporal/workflows/batch_exports.py | 157 +----------------- .../workflows/bigquery_batch_export.py | 20 +-- posthog/temporal/workflows/logger.py | 10 +- .../workflows/postgres_batch_export.py | 18 +- .../workflows/redshift_batch_export.py | 16 +- posthog/temporal/workflows/s3_batch_export.py | 44 ++--- .../workflows/snowflake_batch_export.py | 22 +-- 8 files changed, 80 insertions(+), 222 deletions(-) diff --git a/posthog/temporal/workflows/backfill_batch_export.py b/posthog/temporal/workflows/backfill_batch_export.py index 17f55ae1d8b54..1302f6e16b8df 100644 --- a/posthog/temporal/workflows/backfill_batch_export.py +++ b/posthog/temporal/workflows/backfill_batch_export.py @@ -20,9 +20,9 @@ CreateBatchExportBackfillInputs, UpdateBatchExportBackfillStatusInputs, create_batch_export_backfill_model, - get_batch_exports_logger, update_batch_export_backfill_model_status, ) +from posthog.temporal.workflows.logger import bind_batch_exports_logger class HeartbeatDetails(typing.NamedTuple): @@ -284,10 +284,9 @@ def parse_inputs(inputs: list[str]) -> BackfillBatchExportInputs: @temporalio.workflow.run async def run(self, inputs: BackfillBatchExportInputs) -> None: """Workflow implementation to backfill a BatchExport.""" - logger = get_batch_exports_logger(inputs=inputs) - logger.info( - "Starting Backfill for BatchExport %s: %s - %s", - inputs.batch_export_id, + logger = await bind_batch_exports_logger(team_id=inputs.team_id) + await logger.info( + "Starting Backfill for BatchExport: %s - %s", inputs.start_at, inputs.end_at, ) @@ -348,16 +347,16 @@ async def run(self, inputs: BackfillBatchExportInputs) -> None: except temporalio.exceptions.ActivityError as e: if isinstance(e.cause, temporalio.exceptions.CancelledError): - logger.error("Backfill was cancelled.") + await logger.error("Backfill was cancelled.") update_inputs.status = "Cancelled" else: - logger.exception("Backfill failed.", exc_info=e.cause) + await logger.exception("Backfill failed.", exc_info=e.cause) update_inputs.status = "Failed" raise except Exception as e: - logger.exception("Backfill failed with an unexpected error.", 
exc_info=e) + await logger.exception("Backfill failed with an unexpected error.", exc_info=e) update_inputs.status = "Failed" raise diff --git a/posthog/temporal/workflows/batch_exports.py b/posthog/temporal/workflows/batch_exports.py index 063069388801f..be9433625127f 100644 --- a/posthog/temporal/workflows/batch_exports.py +++ b/posthog/temporal/workflows/batch_exports.py @@ -4,9 +4,6 @@ import datetime as dt import gzip import json -import logging -import logging.handlers -import queue import tempfile import typing import uuid @@ -18,14 +15,12 @@ from temporalio.common import RetryPolicy from posthog.batch_exports.service import ( - BatchExportsInputsProtocol, create_batch_export_backfill, create_batch_export_run, update_batch_export_backfill_status, update_batch_export_run_status, ) -from posthog.kafka_client.client import KafkaProducer -from posthog.kafka_client.topics import KAFKA_LOG_ENTRIES +from posthog.temporal.workflows.logger import bind_batch_exports_logger SELECT_QUERY_TEMPLATE = Template( """ @@ -465,136 +460,6 @@ def reset(self): self.records_since_last_reset = 0 -class BatchExportLoggerAdapter(logging.LoggerAdapter): - """Adapter that adds batch export details to log records.""" - - def __init__( - self, - logger: logging.Logger, - extra=None, - ) -> None: - """Create the logger adapter.""" - super().__init__(logger, extra or {}) - - def process(self, msg: str, kwargs) -> tuple[typing.Any, collections.abc.MutableMapping[str, typing.Any]]: - """Override to add batch exports details.""" - workflow_id = None - workflow_run_id = None - attempt = None - - try: - activity_info = activity.info() - except RuntimeError: - pass - else: - workflow_run_id = activity_info.workflow_run_id - workflow_id = activity_info.workflow_id - attempt = activity_info.attempt - - try: - workflow_info = workflow.info() - except RuntimeError: - pass - else: - workflow_run_id = workflow_info.run_id - workflow_id = workflow_info.workflow_id - attempt = workflow_info.attempt - - if workflow_id is None or workflow_run_id is None or attempt is None: - return (None, {}) - - # This works because the WorkflowID is made up like f"{batch_export_id}-{data_interval_end}" - # Since {data_interval_date} is an iso formatted datetime string, it has two '-' to separate the - # date. Plus one more leaves us at the end of {batch_export_id}. - batch_export_id = workflow_id.rsplit("-", maxsplit=3)[0] - - extra = kwargs.get("extra", None) or {} - extra["workflow_id"] = workflow_id - extra["batch_export_id"] = batch_export_id - extra["workflow_run_id"] = workflow_run_id - extra["attempt"] = attempt - - if isinstance(self.extra, dict): - extra = extra | self.extra - kwargs["extra"] = extra - - return (msg, kwargs) - - @property - def base_logger(self) -> logging.Logger: - """Underlying logger usable for actions such as adding handlers/formatters.""" - return self.logger - - -class BatchExportsLogRecord(logging.LogRecord): - team_id: int - batch_export_id: str - workflow_run_id: str - attempt: int - - -class KafkaLoggingHandler(logging.Handler): - def __init__(self, topic, key=None): - super().__init__() - self.producer = KafkaProducer() - self.topic = topic - self.key = key - - def emit(self, record): - if record.name == "kafka": - return - - # This is a lie, but as long as this handler is used together - # with BatchExportLoggerAdapter we should be fine. - # This is definitely cheaper than a bunch if checks for attributes. 
- record = typing.cast(BatchExportsLogRecord, record) - - msg = self.format(record) - data = { - "instance_id": record.workflow_run_id, - "level": record.levelname, - "log_source": "batch_exports", - "log_source_id": record.batch_export_id, - "message": msg, - "team_id": record.team_id, - "timestamp": dt.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"), - } - - try: - future = self.producer.produce(topic=self.topic, data=data, key=self.key) - future.get(timeout=1) - except Exception as e: - logging.exception("Failed to produce log to Kafka topic %s", self.topic, exc_info=e) - - def close(self): - self.producer.close() - logging.Handler.close(self) - - -LOG_QUEUE: queue.Queue = queue.Queue(-1) -QUEUE_HANDLER = logging.handlers.QueueHandler(LOG_QUEUE) -QUEUE_HANDLER.setLevel(logging.DEBUG) - -KAFKA_HANDLER = KafkaLoggingHandler(topic=KAFKA_LOG_ENTRIES) -KAFKA_HANDLER.setLevel(logging.DEBUG) -QUEUE_LISTENER = logging.handlers.QueueListener(LOG_QUEUE, KAFKA_HANDLER) - -logger = logging.getLogger(__name__) -logger.addHandler(QUEUE_HANDLER) -logger.setLevel(logging.DEBUG) - - -def get_batch_exports_logger(inputs: BatchExportsInputsProtocol) -> BatchExportLoggerAdapter: - """Return a logger for BatchExports.""" - # Need a type comment as _thread is private. - if QUEUE_LISTENER._thread is None: # type: ignore - QUEUE_LISTENER.start() - - adapter = BatchExportLoggerAdapter(logger, {"team_id": inputs.team_id}) - - return adapter - - @dataclasses.dataclass class CreateBatchExportRunInputs: """Inputs to the create_export_run activity. @@ -620,9 +485,6 @@ async def create_export_run(inputs: CreateBatchExportRunInputs) -> str: Intended to be used in all export workflows, usually at the start, to create a model instance to represent them in our database. """ - logger = get_batch_exports_logger(inputs=inputs) - logger.info(f"Creating BatchExportRun model instance in team {inputs.team_id}.") - # 'sync_to_async' type hints are fixed in asgiref>=3.4.1 # But one of our dependencies is pinned to asgiref==3.3.2. # Remove these comments once we upgrade. @@ -633,8 +495,6 @@ async def create_export_run(inputs: CreateBatchExportRunInputs) -> str: status=inputs.status, ) - logger.info(f"Created BatchExportRun {run.id} in team {inputs.team_id}.") - return str(run.id) @@ -673,9 +533,6 @@ async def create_batch_export_backfill_model(inputs: CreateBatchExportBackfillIn Intended to be used in all export workflows, usually at the start, to create a model instance to represent them in our database. """ - logger = get_batch_exports_logger(inputs=inputs) - logger.info(f"Creating BatchExportBackfill model instance in team {inputs.team_id}.") - # 'sync_to_async' type hints are fixed in asgiref>=3.4.1 # But one of our dependencies is pinned to asgiref==3.3.2. # Remove these comments once we upgrade. @@ -687,8 +544,6 @@ async def create_batch_export_backfill_model(inputs: CreateBatchExportBackfillIn team_id=inputs.team_id, ) - logger.info(f"Created BatchExportBackfill {run.id} in team {inputs.team_id}.") - return str(run.id) @@ -734,7 +589,7 @@ async def execute_batch_export_insert_activity( initial_retry_interval_seconds: When retrying, seconds until the first retry. maximum_retry_interval_seconds: Maximum interval in seconds between retries. 
""" - logger = get_batch_exports_logger(inputs=inputs) + logger = await bind_batch_exports_logger(team_id=inputs.team_id) retry_policy = RetryPolicy( initial_interval=dt.timedelta(seconds=initial_retry_interval_seconds), @@ -752,23 +607,23 @@ async def execute_batch_export_insert_activity( ) except exceptions.ActivityError as e: if isinstance(e.cause, exceptions.CancelledError): - logger.error("BatchExport was cancelled.") + await logger.error("BatchExport was cancelled.") update_inputs.status = "Cancelled" else: - logger.exception("BatchExport failed.", exc_info=e.cause) + await logger.exception("BatchExport failed.", exc_info=e.cause) update_inputs.status = "Failed" update_inputs.latest_error = str(e.cause) raise except Exception as e: - logger.exception("BatchExport failed with an unexpected error.", exc_info=e) + await logger.exception("BatchExport failed with an unexpected error.", exc_info=e) update_inputs.status = "Failed" update_inputs.latest_error = "An unexpected error has ocurred" raise else: - logger.info( + await logger.info( "Successfully finished exporting batch %s - %s", inputs.data_interval_start, inputs.data_interval_end ) diff --git a/posthog/temporal/workflows/bigquery_batch_export.py b/posthog/temporal/workflows/bigquery_batch_export.py index d9557d31bb07b..6deaa2b3c4b02 100644 --- a/posthog/temporal/workflows/bigquery_batch_export.py +++ b/posthog/temporal/workflows/bigquery_batch_export.py @@ -17,12 +17,12 @@ UpdateBatchExportRunStatusInputs, create_export_run, execute_batch_export_insert_activity, - get_batch_exports_logger, get_data_interval, get_results_iterator, get_rows_count, ) from posthog.temporal.workflows.clickhouse import get_client +from posthog.temporal.workflows.logger import bind_batch_exports_logger def load_jsonl_file_to_bigquery_table(jsonl_file, table, table_schema, bigquery_client): @@ -98,9 +98,9 @@ def bigquery_client(inputs: BigQueryInsertInputs): @activity.defn async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs): """Activity streams data from ClickHouse to BigQuery.""" - logger = get_batch_exports_logger(inputs=inputs) - logger.info( - "Running BigQuery export batch %s - %s", + logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="BigQuery") + await logger.info( + "Exporting batch %s - %s", inputs.data_interval_start, inputs.data_interval_end, ) @@ -119,14 +119,14 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs): ) if count == 0: - logger.info( + await logger.info( "Nothing to export in batch %s - %s", inputs.data_interval_start, inputs.data_interval_end, ) return - logger.info("BatchExporting %s rows to BigQuery", count) + await logger.info("BatchExporting %s rows to BigQuery", count) results_iterator = get_results_iterator( client=client, @@ -173,7 +173,7 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs): jsonl_file.write_records_to_jsonl([row]) if jsonl_file.tell() > settings.BATCH_EXPORT_BIGQUERY_UPLOAD_CHUNK_SIZE_BYTES: - logger.info( + await logger.info( "Copying %s records of size %s bytes to BigQuery", jsonl_file.records_since_last_reset, jsonl_file.bytes_since_last_reset, @@ -187,7 +187,7 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs): jsonl_file.reset() if jsonl_file.tell() > 0: - logger.info( + await logger.info( "Copying %s records of size %s bytes to BigQuery", jsonl_file.records_since_last_reset, jsonl_file.bytes_since_last_reset, @@ -214,9 +214,9 @@ def parse_inputs(inputs: list[str]) -> BigQueryBatchExportInputs: 
@workflow.run async def run(self, inputs: BigQueryBatchExportInputs): """Workflow implementation to export data to BigQuery.""" - logger = get_batch_exports_logger(inputs=inputs) + logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="BigQuery") data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end) - logger.info( + await logger.info( "Starting BigQuery export batch %s - %s", data_interval_start, data_interval_end, diff --git a/posthog/temporal/workflows/logger.py b/posthog/temporal/workflows/logger.py index 58fd92fc68565..d1a031d32a50a 100644 --- a/posthog/temporal/workflows/logger.py +++ b/posthog/temporal/workflows/logger.py @@ -11,18 +11,18 @@ from posthog.kafka_client.topics import KAFKA_LOG_ENTRIES -async def bind_batch_exports_logger(team_id: int, export_destination: str) -> structlog.stdlib.AsyncBoundLogger: +async def bind_batch_exports_logger(team_id: int, destination: str | None = None) -> structlog.stdlib.AsyncBoundLogger: """Return a logger for BatchExports.""" if not structlog.is_configured(): await configure_logger() logger = structlog.get_logger() - return logger.bind(team=team_id, destination=export_destination) + return logger.bind(team=team_id, destination=destination) async def configure_logger(): - queue = asyncio.Queue(maxsize=-1) + queue: asyncio.Queue = asyncio.Queue(maxsize=-1) put_in_queue = PutInQueueProcessor(queue) structlog.configure( @@ -69,13 +69,13 @@ class PutInQueueProcessor: def __init__(self, queue: asyncio.Queue): self.queue = queue - def __call__(self, logger: logging.Logger, method_name: str, event_dict: structlog.typing.EventDict): + def __call__(self, logger: logging.Logger, method_name: str, event_dict: structlog.types.EventDict): self.queue.put_nowait(event_dict) return event_dict -def add_batch_export_context(logger: logging.Logger, method_name: str, event_dict: structlog.typing.EventDict): +def add_batch_export_context(logger: logging.Logger, method_name: str, event_dict: structlog.types.EventDict): """A StructLog processor to populate event dict with batch export context variables. 
More specifically, the batch export context variables are coming from Temporal: diff --git a/posthog/temporal/workflows/postgres_batch_export.py b/posthog/temporal/workflows/postgres_batch_export.py index 8b66cfb0abb2c..55639a3e6337e 100644 --- a/posthog/temporal/workflows/postgres_batch_export.py +++ b/posthog/temporal/workflows/postgres_batch_export.py @@ -19,12 +19,12 @@ UpdateBatchExportRunStatusInputs, create_export_run, execute_batch_export_insert_activity, - get_batch_exports_logger, get_data_interval, get_results_iterator, get_rows_count, ) from posthog.temporal.workflows.clickhouse import get_client +from posthog.temporal.workflows.logger import bind_batch_exports_logger @contextlib.contextmanager @@ -143,8 +143,8 @@ class PostgresInsertInputs: @activity.defn async def insert_into_postgres_activity(inputs: PostgresInsertInputs): """Activity streams data from ClickHouse to Postgres.""" - logger = get_batch_exports_logger(inputs=inputs) - logger.info( + logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="PostgreSQL") + await logger.info( "Running Postgres export batch %s - %s", inputs.data_interval_start, inputs.data_interval_end, @@ -164,14 +164,14 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs): ) if count == 0: - logger.info( + await logger.info( "Nothing to export in batch %s - %s", inputs.data_interval_start, inputs.data_interval_end, ) return - logger.info("BatchExporting %s rows to Postgres", count) + await logger.info("BatchExporting %s rows to Postgres", count) results_iterator = get_results_iterator( client=client, @@ -226,7 +226,7 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs): pg_file.write_records_to_tsv([row], fieldnames=schema_columns) if pg_file.tell() > settings.BATCH_EXPORT_POSTGRES_UPLOAD_CHUNK_SIZE_BYTES: - logger.info( + await logger.info( "Copying %s records of size %s bytes to Postgres", pg_file.records_since_last_reset, pg_file.bytes_since_last_reset, @@ -241,7 +241,7 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs): pg_file.reset() if pg_file.tell() > 0: - logger.info( + await logger.info( "Copying %s records of size %s bytes to Postgres", pg_file.records_since_last_reset, pg_file.bytes_since_last_reset, @@ -274,9 +274,9 @@ def parse_inputs(inputs: list[str]) -> PostgresBatchExportInputs: @workflow.run async def run(self, inputs: PostgresBatchExportInputs): """Workflow implementation to export data to Postgres.""" - logger = get_batch_exports_logger(inputs=inputs) + logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="PostgreSQL") data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end) - logger.info( + await logger.info( "Starting Postgres export batch %s - %s", data_interval_start, data_interval_end, diff --git a/posthog/temporal/workflows/redshift_batch_export.py b/posthog/temporal/workflows/redshift_batch_export.py index 74c1fb52662cc..14d289f640c03 100644 --- a/posthog/temporal/workflows/redshift_batch_export.py +++ b/posthog/temporal/workflows/redshift_batch_export.py @@ -18,12 +18,12 @@ UpdateBatchExportRunStatusInputs, create_export_run, execute_batch_export_insert_activity, - get_batch_exports_logger, get_data_interval, get_results_iterator, get_rows_count, ) from posthog.temporal.workflows.clickhouse import get_client +from posthog.temporal.workflows.logger import bind_batch_exports_logger from posthog.temporal.workflows.postgres_batch_export import ( PostgresInsertInputs, 
create_table_in_postgres, @@ -110,9 +110,9 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs): the Redshift-specific properties_data_type to indicate the type of JSON-like fields. """ - logger = get_batch_exports_logger(inputs=inputs) - logger.info( - "Running Postgres export batch %s - %s", + logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="Redshift") + await logger.info( + "Exporting batch %s - %s", inputs.data_interval_start, inputs.data_interval_end, ) @@ -131,14 +131,14 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs): ) if count == 0: - logger.info( + await logger.info( "Nothing to export in batch %s - %s", inputs.data_interval_start, inputs.data_interval_end, ) return - logger.info("BatchExporting %s rows to Postgres", count) + await logger.info("BatchExporting %s rows", count) results_iterator = get_results_iterator( client=client, @@ -217,9 +217,9 @@ def parse_inputs(inputs: list[str]) -> RedshiftBatchExportInputs: @workflow.run async def run(self, inputs: RedshiftBatchExportInputs): """Workflow implementation to export data to Redshift.""" - logger = get_batch_exports_logger(inputs=inputs) + logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="Redshift") data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end) - logger.info("Starting Redshift export batch %s - %s", data_interval_start, data_interval_end) + await logger.info("Starting Redshift export batch %s - %s", data_interval_start, data_interval_end) create_export_run_inputs = CreateBatchExportRunInputs( team_id=inputs.team_id, diff --git a/posthog/temporal/workflows/s3_batch_export.py b/posthog/temporal/workflows/s3_batch_export.py index 6a81aeeb93a77..dc1154d6eca48 100644 --- a/posthog/temporal/workflows/s3_batch_export.py +++ b/posthog/temporal/workflows/s3_batch_export.py @@ -20,12 +20,12 @@ UpdateBatchExportRunStatusInputs, create_export_run, execute_batch_export_insert_activity, - get_batch_exports_logger, get_data_interval, get_results_iterator, get_rows_count, ) from posthog.temporal.workflows.clickhouse import get_client +from posthog.temporal.workflows.logger import bind_batch_exports_logger def get_allowed_template_variables(inputs) -> dict[str, str]: @@ -303,7 +303,7 @@ class S3InsertInputs: async def initialize_and_resume_multipart_upload(inputs: S3InsertInputs) -> tuple[S3MultiPartUpload, str]: """Initialize a S3MultiPartUpload and resume it from a hearbeat state if available.""" - logger = get_batch_exports_logger(inputs=inputs) + logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="S3") key = get_s3_key(inputs) s3_upload = S3MultiPartUpload( @@ -323,19 +323,22 @@ async def initialize_and_resume_multipart_upload(inputs: S3InsertInputs) -> tupl except IndexError: # This is the error we expect when no details as the sequence will be empty. interval_start = inputs.data_interval_start - logger.info( - f"Did not receive details from previous activity Excecution. Export will start from the beginning: {interval_start}" + await logger.debug( + "Did not receive details from previous activity Excecution. Export will start from the beginning %s", + interval_start, ) except Exception: # We still start from the beginning, but we make a point to log unexpected errors. # Ideally, any new exceptions should be added to the previous block after the first time and we will never land here. 
interval_start = inputs.data_interval_start - logger.warning( - f"Did not receive details from previous activity Excecution due to an unexpected error. Export will start from the beginning: {interval_start}", + await logger.warning( + "Did not receive details from previous activity Excecution due to an unexpected error. Export will start from the beginning %s", + interval_start, ) else: - logger.info( - f"Received details from previous activity. Export will attempt to resume from: {interval_start}", + await logger.info( + "Received details from previous activity. Export will attempt to resume from %s", + interval_start, ) s3_upload.continue_from_state(upload_state) @@ -343,8 +346,9 @@ async def initialize_and_resume_multipart_upload(inputs: S3InsertInputs) -> tupl # Even if we receive details we cannot resume a brotli compressed upload as we have lost the compressor state. interval_start = inputs.data_interval_start - logger.info( - f"Export will start from the beginning as we are using brotli compression: {interval_start}", + await logger.info( + f"Export will start from the beginning as we are using brotli compression: %s", + interval_start, ) await s3_upload.abort() @@ -362,9 +366,9 @@ async def insert_into_s3_activity(inputs: S3InsertInputs): runs, timing out after say 30 seconds or something and upload multiple files. """ - logger = get_batch_exports_logger(inputs=inputs) - logger.info( - "Running S3 export batch %s - %s", + logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="S3") + await logger.info( + "Exporting batch %s - %s", inputs.data_interval_start, inputs.data_interval_end, ) @@ -383,14 +387,14 @@ async def insert_into_s3_activity(inputs: S3InsertInputs): ) if count == 0: - logger.info( + await logger.info( "Nothing to export in batch %s - %s", inputs.data_interval_start, inputs.data_interval_end, ) return - logger.info("BatchExporting %s rows to S3", count) + await logger.info("BatchExporting %s rows to S3", count) s3_upload, interval_start = await initialize_and_resume_multipart_upload(inputs) @@ -416,7 +420,7 @@ async def insert_into_s3_activity(inputs: S3InsertInputs): async def worker_shutdown_handler(): """Handle the Worker shutting down by heart-beating our latest status.""" await activity.wait_for_worker_shutdown() - logger.warn( + await logger.warn( f"Worker shutting down! 
Reporting back latest exported part {last_uploaded_part_timestamp}", ) activity.heartbeat(last_uploaded_part_timestamp, s3_upload.to_state()) @@ -442,7 +446,7 @@ async def worker_shutdown_handler(): local_results_file.write_records_to_jsonl([record]) if local_results_file.tell() > settings.BATCH_EXPORT_S3_UPLOAD_CHUNK_SIZE_BYTES: - logger.info( + await logger.info( "Uploading part %s containing %s records with size %s bytes to S3", s3_upload.part_number + 1, local_results_file.records_since_last_reset, @@ -457,7 +461,7 @@ async def worker_shutdown_handler(): local_results_file.reset() if local_results_file.tell() > 0 and result is not None: - logger.info( + await logger.info( "Uploading last part %s containing %s records with size %s bytes to S3", s3_upload.part_number + 1, local_results_file.records_since_last_reset, @@ -490,9 +494,9 @@ def parse_inputs(inputs: list[str]) -> S3BatchExportInputs: @workflow.run async def run(self, inputs: S3BatchExportInputs): """Workflow implementation to export data to S3 bucket.""" - logger = get_batch_exports_logger(inputs=inputs) + logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="S3") data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end) - logger.info("Starting S3 export batch %s - %s", data_interval_start, data_interval_end) + await logger.info("Starting S3 export batch %s - %s", data_interval_start, data_interval_end) create_export_run_inputs = CreateBatchExportRunInputs( team_id=inputs.team_id, diff --git a/posthog/temporal/workflows/snowflake_batch_export.py b/posthog/temporal/workflows/snowflake_batch_export.py index ec556e527192a..f32597cef9b56 100644 --- a/posthog/temporal/workflows/snowflake_batch_export.py +++ b/posthog/temporal/workflows/snowflake_batch_export.py @@ -16,12 +16,12 @@ UpdateBatchExportRunStatusInputs, create_export_run, execute_batch_export_insert_activity, - get_batch_exports_logger, get_data_interval, get_results_iterator, get_rows_count, ) from posthog.temporal.workflows.clickhouse import get_client +from posthog.temporal.workflows.logger import bind_batch_exports_logger class SnowflakeFileNotUploadedError(Exception): @@ -98,9 +98,9 @@ async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs): TODO: We're using JSON here, it's not the most efficient way to do this. """ - logger = get_batch_exports_logger(inputs=inputs) - logger.info( - "Running Snowflake export batch %s - %s", + logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="Snowflake") + await logger.info( + "Exporting batch %s - %s", inputs.data_interval_start, inputs.data_interval_end, ) @@ -119,14 +119,14 @@ async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs): ) if count == 0: - logger.info( + await logger.info( "Nothing to export in batch %s - %s", inputs.data_interval_start, inputs.data_interval_end, ) return - logger.info("BatchExporting %s rows to Snowflake", count) + await logger.info("BatchExporting %s rows", count) conn = snowflake.connector.connect( user=inputs.user, @@ -183,7 +183,7 @@ async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs): break except json.JSONDecodeError: - logger.info( + await logger.info( "Failed to decode a JSON value while iterating, potentially due to a ClickHouse error" ) # This is raised by aiochclient as we try to decode an error message from ClickHouse. 
@@ -219,7 +219,7 @@ async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs):
                 local_results_file.tell()
                 and local_results_file.tell() > settings.BATCH_EXPORT_SNOWFLAKE_UPLOAD_CHUNK_SIZE_BYTES
             ):
-                logger.info("Uploading to Snowflake")
+                await logger.info("Uploading to Snowflake")
 
                 # Flush the file to make sure everything is written
                 local_results_file.flush()
@@ -294,10 +294,10 @@ def parse_inputs(inputs: list[str]) -> SnowflakeBatchExportInputs:
     @workflow.run
     async def run(self, inputs: SnowflakeBatchExportInputs):
         """Workflow implementation to export data to Snowflake table."""
-        logger = get_batch_exports_logger(inputs=inputs)
+        logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="Snowflake")
         data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end)
-        logger.info(
-            "Starting Snowflake export batch %s - %s",
+        await logger.info(
+            "Starting export batch %s - %s",
             data_interval_start,
             data_interval_end,
         )
From 2042e24f92f9fdca626eaa3fa296d21fa031c578 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?=
Date: Thu, 2 Nov 2023 17:58:15 +0100
Subject: [PATCH 12/23] test: Add tests, fix things

---
 .../workflows/backfill_batch_export.py        |   8 +-
 .../workflows/bigquery_batch_export.py        |  18 +--
 posthog/temporal/workflows/logger.py          | 113 +++++++++++------
 .../workflows/postgres_batch_export.py        |  12 +-
 .../workflows/redshift_batch_export.py        |   8 +-
 posthog/temporal/workflows/s3_batch_export.py |  24 ++--
 .../workflows/snowflake_batch_export.py       |  14 +--
 7 files changed, 110 insertions(+), 87 deletions(-)

diff --git a/posthog/temporal/workflows/backfill_batch_export.py b/posthog/temporal/workflows/backfill_batch_export.py
index 1302f6e16b8df..d8872d6c31623 100644
--- a/posthog/temporal/workflows/backfill_batch_export.py
+++ b/posthog/temporal/workflows/backfill_batch_export.py
@@ -285,7 +285,7 @@ def parse_inputs(inputs: list[str]) -> BackfillBatchExportInputs:
     async def run(self, inputs: BackfillBatchExportInputs) -> None:
         """Workflow implementation to backfill a BatchExport."""
         logger = await bind_batch_exports_logger(team_id=inputs.team_id)
-        await logger.info(
+        await logger.ainfo(
            "Starting Backfill for BatchExport: %s - %s",
             inputs.start_at,
             inputs.end_at,
@@ -347,16 +347,16 @@ async def run(self, inputs: BackfillBatchExportInputs) -> None:
 
         except temporalio.exceptions.ActivityError as e:
             if isinstance(e.cause, temporalio.exceptions.CancelledError):
-                await logger.error("Backfill was cancelled.")
+                await logger.aerror("Backfill was cancelled.")
                 update_inputs.status = "Cancelled"
             else:
-                await logger.exception("Backfill failed.", exc_info=e.cause)
+                await logger.aexception("Backfill failed.", exc_info=e.cause)
                 update_inputs.status = "Failed"
 
             raise
 
         except Exception as e:
-            await logger.exception("Backfill failed with an unexpected error.", exc_info=e)
+            await logger.aexception("Backfill failed with an unexpected error.", exc_info=e)
             update_inputs.status = "Failed"
 
             raise
diff --git a/posthog/temporal/workflows/bigquery_batch_export.py b/posthog/temporal/workflows/bigquery_batch_export.py
index 6deaa2b3c4b02..aee1abfad6aa5 100644
--- a/posthog/temporal/workflows/bigquery_batch_export.py
+++ b/posthog/temporal/workflows/bigquery_batch_export.py
@@ -99,7 +99,7 @@ def bigquery_client(inputs: BigQueryInsertInputs):
 async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs):
     """Activity streams data from ClickHouse to BigQuery."""
     logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="BigQuery")
-    await logger.info(
+    await logger.ainfo(
         "Exporting batch %s - %s",
         inputs.data_interval_start,
         inputs.data_interval_end,
@@ -119,14 +119,14 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs):
     )
 
     if count == 0:
-        await logger.info(
+        await logger.ainfo(
             "Nothing to export in batch %s - %s",
             inputs.data_interval_start,
             inputs.data_interval_end,
         )
         return
 
-    await logger.info("BatchExporting %s rows to BigQuery", count)
+    await logger.ainfo("BatchExporting %s rows", count)
 
     results_iterator = get_results_iterator(
         client=client,
@@ -173,8 +173,8 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs):
             jsonl_file.write_records_to_jsonl([row])
 
             if jsonl_file.tell() > settings.BATCH_EXPORT_BIGQUERY_UPLOAD_CHUNK_SIZE_BYTES:
-                await logger.info(
-                    "Copying %s records of size %s bytes to BigQuery",
+                await logger.ainfo(
+                    "Copying %s records of size %s bytes",
                     jsonl_file.records_since_last_reset,
                     jsonl_file.bytes_since_last_reset,
                 )
@@ -187,8 +187,8 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs):
             jsonl_file.reset()
 
     if jsonl_file.tell() > 0:
-        await logger.info(
-            "Copying %s records of size %s bytes to BigQuery",
+        await logger.ainfo(
+            "Copying %s records of size %s bytes",
             jsonl_file.records_since_last_reset,
             jsonl_file.bytes_since_last_reset,
         )
@@ -216,8 +216,8 @@ async def run(self, inputs: BigQueryBatchExportInputs):
         """Workflow implementation to export data to BigQuery."""
         logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="BigQuery")
         data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end)
-        await logger.info(
-            "Starting BigQuery export batch %s - %s",
+        await logger.ainfo(
+            "Starting batch export %s - %s",
             data_interval_start,
             data_interval_end,
         )
diff --git a/posthog/temporal/workflows/logger.py b/posthog/temporal/workflows/logger.py
index d1a031d32a50a..29961783a4ed3 100644
--- a/posthog/temporal/workflows/logger.py
+++ b/posthog/temporal/workflows/logger.py
@@ -11,37 +11,52 @@
 from posthog.kafka_client.topics import KAFKA_LOG_ENTRIES
 
 
-async def bind_batch_exports_logger(team_id: int, destination: str | None = None) -> structlog.stdlib.AsyncBoundLogger:
+async def bind_batch_exports_logger(team_id: int, destination: str | None = None) -> structlog.stdlib.BoundLogger:
     """Return a logger for BatchExports."""
     if not structlog.is_configured():
         await configure_logger()
 
     logger = structlog.get_logger()
 
-    return logger.bind(team=team_id, destination=destination)
+    return logger.new(team_id=team_id, destination=destination)
 
 
-async def configure_logger():
-    queue: asyncio.Queue = asyncio.Queue(maxsize=-1)
-    put_in_queue = PutInQueueProcessor(queue)
+async def configure_logger(
+    logger_factory=structlog.PrintLoggerFactory,
+    extra_processors: list[structlog.types.Processor] | None = None,
+    queue: asyncio.Queue | None = None,
+    producer: aiokafka.AIOKafkaProducer | None = None,
+    cache_logger_on_first_use: bool = True,
+) -> tuple:
+    """Configure a StructLog logger for batch exports.
+
+    Args:
+        queue: Optionally, bring your own log queue.
+        producer: Optionally, bring your own Kafka producer.
+ """ + log_queue = queue if queue is not None else asyncio.Queue(maxsize=-1) + put_in_queue = PutInBatchExportsLogQueueProcessor(log_queue) + + base_processors: list[structlog.types.Processor] = [ + structlog.processors.add_log_level, + structlog.processors.format_exc_info, + structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S.%f", utc=True), + structlog.stdlib.PositionalArgumentsFormatter(), + add_batch_export_context, + put_in_queue, + structlog.processors.EventRenamer("msg"), + structlog.processors.JSONRenderer(), + ] + extra_processors_to_add = extra_processors if extra_processors is not None else [] structlog.configure( - processors=[ - structlog.stdlib.filter_by_level, - structlog.processors.add_log_level, - structlog.stdlib.add_logger_name, - structlog.stdlib.PositionalArgumentsFormatter(), - structlog.processors.format_exc_info, - structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S.%f", utc=True), - add_batch_export_context, - structlog.stdlib.PositionalArgumentsFormatter(), - put_in_queue, - structlog.processors.KeyValueRenderer(), - ], - wrapper_class=structlog.stdlib.AsyncBoundLogger, - cache_logger_on_first_use=True, + processors=base_processors + extra_processors_to_add, + logger_factory=logger_factory(), + cache_logger_on_first_use=cache_logger_on_first_use, + ) + listen_task = asyncio.create_task( + KafkaLogProducerFromQueue(queue=log_queue, topic=KAFKA_LOG_ENTRIES, producer=producer).listen_and_produce() ) - task = asyncio.create_task(KafkaLogProducerFromQueue(queue=queue, topic=KAFKA_LOG_ENTRIES).listen_and_produce()) async def worker_shutdown_handler(): """Gracefully handle a Temporal Worker shutting down. @@ -52,15 +67,17 @@ async def worker_shutdown_handler(): """ await temporalio.activity.wait_for_worker_shutdown() - await queue.join() - task.cancel() + await log_queue.join() + listen_task.cancel() - await asyncio.wait([task]) + await asyncio.wait([listen_task]) - asyncio.create_task(worker_shutdown_handler()) + worker_shutdown_handler_task = asyncio.create_task(worker_shutdown_handler()) + return (listen_task, worker_shutdown_handler_task) -class PutInQueueProcessor: + +class PutInBatchExportsLogQueueProcessor: """A StructLog processor that puts event_dict into a queue. The idea is that any event_dicts can be processed later by any queue listeners. @@ -69,8 +86,25 @@ class PutInQueueProcessor: def __init__(self, queue: asyncio.Queue): self.queue = queue - def __call__(self, logger: logging.Logger, method_name: str, event_dict: structlog.types.EventDict): - self.queue.put_nowait(event_dict) + def __call__( + self, logger: logging.Logger, method_name: str, event_dict: structlog.types.EventDict + ) -> structlog.types.EventDict: + try: + message_dict = { + "instance_id": event_dict["workflow_run_id"], + "level": event_dict["level"], + "log_source": event_dict["log_source"], + "log_source_id": event_dict["log_source_id"], + "message": event_dict["event"], + "team_id": event_dict["team_id"], + "timestamp": event_dict["timestamp"], + } + except KeyError: + # We don't have the required keys to ingest this log. + # This could be because we are running outside an Activity/Workflow context. 
+ return event_dict + + self.queue.put_nowait(json.dumps(message_dict).encode("utf-8")) return event_dict @@ -101,7 +135,7 @@ def add_batch_export_context(logger: logging.Logger, method_name: str, event_dic if workflow_type == "backfill-batch-export": # This works because the WorkflowID is made up like f"{batch_export_id}-Backfill-{data_interval_end}" - log_source_id = workflow_id.split("Backfill")[0] + log_source_id = workflow_id.split("-Backfill")[0] log_source = "batch_exports_backfill" else: # This works because the WorkflowID is made up like f"{batch_export_id}-{data_interval_end}" @@ -185,16 +219,17 @@ def __init__( producer: aiokafka.AIOKafkaProducer | None = None, ): self.queue = queue + self.topic = topic + self.key = key self.producer = ( producer if producer is not None else aiokafka.AIOKafkaProducer( bootstrap_servers=settings.KAFKA_HOSTS, - security_protocol=settings.KAFKA_SECURITY_PROTOCOL or "PLAINTEXT", + security_protocol=settings.KAFKA_SECURITY_PROTOCOL or "PLAINTE1XT", + acks="all", ) ) - self.topic = topic - self.key = key async def listen_and_produce(self): """Listen to messages in queue and produce them to Kafka as they come.""" @@ -202,21 +237,9 @@ async def listen_and_produce(self): try: while True: - event_dict = await self.queue.get() - - data = { - "instance_id": event_dict["workflow_run_id"], - "level": event_dict["level"], - "log_source": event_dict["log_source"], - "log_source_id": event_dict["log_source_id"], - "message": event_dict["event"], - "team_id": event_dict["team"], - "timestamp": event_dict["timestamp"], - } - - kafka_message = json.dumps(data).encode("utf-8") + msg = await self.queue.get() - await self.producer.send_and_wait(self.topic, kafka_message, key=self.key) + await self.producer.send_and_wait(self.topic, msg, key=self.key) self.queue.task_done() diff --git a/posthog/temporal/workflows/postgres_batch_export.py b/posthog/temporal/workflows/postgres_batch_export.py index 55639a3e6337e..30999d73f58a1 100644 --- a/posthog/temporal/workflows/postgres_batch_export.py +++ b/posthog/temporal/workflows/postgres_batch_export.py @@ -144,7 +144,7 @@ class PostgresInsertInputs: async def insert_into_postgres_activity(inputs: PostgresInsertInputs): """Activity streams data from ClickHouse to Postgres.""" logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="PostgreSQL") - await logger.info( + await logger.ainfo( "Running Postgres export batch %s - %s", inputs.data_interval_start, inputs.data_interval_end, @@ -164,14 +164,14 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs): ) if count == 0: - await logger.info( + await logger.ainfo( "Nothing to export in batch %s - %s", inputs.data_interval_start, inputs.data_interval_end, ) return - await logger.info("BatchExporting %s rows to Postgres", count) + await logger.ainfo("BatchExporting %s rows to Postgres", count) results_iterator = get_results_iterator( client=client, @@ -226,7 +226,7 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs): pg_file.write_records_to_tsv([row], fieldnames=schema_columns) if pg_file.tell() > settings.BATCH_EXPORT_POSTGRES_UPLOAD_CHUNK_SIZE_BYTES: - await logger.info( + await logger.ainfo( "Copying %s records of size %s bytes to Postgres", pg_file.records_since_last_reset, pg_file.bytes_since_last_reset, @@ -241,7 +241,7 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs): pg_file.reset() if pg_file.tell() > 0: - await logger.info( + await logger.ainfo( "Copying %s records of size %s bytes to 
Postgres", pg_file.records_since_last_reset, pg_file.bytes_since_last_reset, @@ -276,7 +276,7 @@ async def run(self, inputs: PostgresBatchExportInputs): """Workflow implementation to export data to Postgres.""" logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="PostgreSQL") data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end) - await logger.info( + await logger.ainfo( "Starting Postgres export batch %s - %s", data_interval_start, data_interval_end, diff --git a/posthog/temporal/workflows/redshift_batch_export.py b/posthog/temporal/workflows/redshift_batch_export.py index 14d289f640c03..fa649a2cfe552 100644 --- a/posthog/temporal/workflows/redshift_batch_export.py +++ b/posthog/temporal/workflows/redshift_batch_export.py @@ -111,7 +111,7 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs): fields. """ logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="Redshift") - await logger.info( + await logger.ainfo( "Exporting batch %s - %s", inputs.data_interval_start, inputs.data_interval_end, @@ -131,14 +131,14 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs): ) if count == 0: - await logger.info( + await logger.ainfo( "Nothing to export in batch %s - %s", inputs.data_interval_start, inputs.data_interval_end, ) return - await logger.info("BatchExporting %s rows", count) + await logger.ainfo("BatchExporting %s rows", count) results_iterator = get_results_iterator( client=client, @@ -219,7 +219,7 @@ async def run(self, inputs: RedshiftBatchExportInputs): """Workflow implementation to export data to Redshift.""" logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="Redshift") data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end) - await logger.info("Starting Redshift export batch %s - %s", data_interval_start, data_interval_end) + await logger.ainfo("Starting Redshift export batch %s - %s", data_interval_start, data_interval_end) create_export_run_inputs = CreateBatchExportRunInputs( team_id=inputs.team_id, diff --git a/posthog/temporal/workflows/s3_batch_export.py b/posthog/temporal/workflows/s3_batch_export.py index dc1154d6eca48..8288169f30416 100644 --- a/posthog/temporal/workflows/s3_batch_export.py +++ b/posthog/temporal/workflows/s3_batch_export.py @@ -323,7 +323,7 @@ async def initialize_and_resume_multipart_upload(inputs: S3InsertInputs) -> tupl except IndexError: # This is the error we expect when no details as the sequence will be empty. interval_start = inputs.data_interval_start - await logger.debug( + await logger.adebug( "Did not receive details from previous activity Excecution. Export will start from the beginning %s", interval_start, ) @@ -331,12 +331,12 @@ async def initialize_and_resume_multipart_upload(inputs: S3InsertInputs) -> tupl # We still start from the beginning, but we make a point to log unexpected errors. # Ideally, any new exceptions should be added to the previous block after the first time and we will never land here. interval_start = inputs.data_interval_start - await logger.warning( + await logger.awarning( "Did not receive details from previous activity Excecution due to an unexpected error. Export will start from the beginning %s", interval_start, ) else: - await logger.info( + await logger.ainfo( "Received details from previous activity. 
Export will attempt to resume from %s", interval_start, ) @@ -346,7 +346,7 @@ async def initialize_and_resume_multipart_upload(inputs: S3InsertInputs) -> tupl # Even if we receive details we cannot resume a brotli compressed upload as we have lost the compressor state. interval_start = inputs.data_interval_start - await logger.info( + await logger.ainfo( f"Export will start from the beginning as we are using brotli compression: %s", interval_start, ) @@ -367,7 +367,7 @@ async def insert_into_s3_activity(inputs: S3InsertInputs): files. """ logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="S3") - await logger.info( + await logger.ainfo( "Exporting batch %s - %s", inputs.data_interval_start, inputs.data_interval_end, @@ -387,14 +387,14 @@ async def insert_into_s3_activity(inputs: S3InsertInputs): ) if count == 0: - await logger.info( + await logger.ainfo( "Nothing to export in batch %s - %s", inputs.data_interval_start, inputs.data_interval_end, ) return - await logger.info("BatchExporting %s rows to S3", count) + await logger.ainfo("BatchExporting %s rows to S3", count) s3_upload, interval_start = await initialize_and_resume_multipart_upload(inputs) @@ -420,7 +420,7 @@ async def insert_into_s3_activity(inputs: S3InsertInputs): async def worker_shutdown_handler(): """Handle the Worker shutting down by heart-beating our latest status.""" await activity.wait_for_worker_shutdown() - await logger.warn( + await logger.awarn( f"Worker shutting down! Reporting back latest exported part {last_uploaded_part_timestamp}", ) activity.heartbeat(last_uploaded_part_timestamp, s3_upload.to_state()) @@ -447,7 +447,7 @@ async def worker_shutdown_handler(): if local_results_file.tell() > settings.BATCH_EXPORT_S3_UPLOAD_CHUNK_SIZE_BYTES: await logger.info( - "Uploading part %s containing %s records with size %s bytes to S3", + "Uploading part %s containing %s records with size %s bytes", s3_upload.part_number + 1, local_results_file.records_since_last_reset, local_results_file.bytes_since_last_reset, @@ -461,8 +461,8 @@ async def worker_shutdown_handler(): local_results_file.reset() if local_results_file.tell() > 0 and result is not None: - await logger.info( - "Uploading last part %s containing %s records with size %s bytes to S3", + await logger.ainfo( + "Uploading last part %s containing %s records with size %s bytes", s3_upload.part_number + 1, local_results_file.records_since_last_reset, local_results_file.bytes_since_last_reset, @@ -496,7 +496,7 @@ async def run(self, inputs: S3BatchExportInputs): """Workflow implementation to export data to S3 bucket.""" logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="S3") data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end) - await logger.info("Starting S3 export batch %s - %s", data_interval_start, data_interval_end) + await logger.ainfo("Starting batch export %s - %s", data_interval_start, data_interval_end) create_export_run_inputs = CreateBatchExportRunInputs( team_id=inputs.team_id, diff --git a/posthog/temporal/workflows/snowflake_batch_export.py b/posthog/temporal/workflows/snowflake_batch_export.py index f32597cef9b56..a4cfafc7f9adf 100644 --- a/posthog/temporal/workflows/snowflake_batch_export.py +++ b/posthog/temporal/workflows/snowflake_batch_export.py @@ -99,7 +99,7 @@ async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs): TODO: We're using JSON here, it's not the most efficient way to do this. 
""" logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="Snowflake") - await logger.info( + await logger.ainfo( "Exporting batch %s - %s", inputs.data_interval_start, inputs.data_interval_end, @@ -119,14 +119,14 @@ async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs): ) if count == 0: - await logger.info( + await logger.ainfo( "Nothing to export in batch %s - %s", inputs.data_interval_start, inputs.data_interval_end, ) return - await logger.info("BatchExporting %s rows", count) + await logger.ainfo("BatchExporting %s rows", count) conn = snowflake.connector.connect( user=inputs.user, @@ -183,7 +183,7 @@ async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs): break except json.JSONDecodeError: - await logger.info( + await logger.ainfo( "Failed to decode a JSON value while iterating, potentially due to a ClickHouse error" ) # This is raised by aiochclient as we try to decode an error message from ClickHouse. @@ -219,7 +219,7 @@ async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs): local_results_file.tell() and local_results_file.tell() > settings.BATCH_EXPORT_SNOWFLAKE_UPLOAD_CHUNK_SIZE_BYTES ): - await logger.info("Uploading to Snowflake") + await logger.ainfo("Uploading to Snowflake") # Flush the file to make sure everything is written local_results_file.flush() @@ -296,8 +296,8 @@ async def run(self, inputs: SnowflakeBatchExportInputs): """Workflow implementation to export data to Snowflake table.""" logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="Snowflake") data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end) - await logger.info( - "Starting export batch %s - %s", + await logger.ainfo( + "Starting batch export %s - %s", data_interval_start, data_interval_end, ) From 2006fc61379aaa812d5b4af20e749b9a206adc0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Thu, 2 Nov 2023 18:02:27 +0100 Subject: [PATCH 13/23] fix: Remove old tests --- .../tests/batch_exports/test_batch_exports.py | 108 ------------------ 1 file changed, 108 deletions(-) diff --git a/posthog/temporal/tests/batch_exports/test_batch_exports.py b/posthog/temporal/tests/batch_exports/test_batch_exports.py index 3b988307e2e91..4819b8aa2025d 100644 --- a/posthog/temporal/tests/batch_exports/test_batch_exports.py +++ b/posthog/temporal/tests/batch_exports/test_batch_exports.py @@ -1,13 +1,8 @@ import csv -import dataclasses import datetime as dt import io import json -import logging import operator -import random -import string -import uuid from random import randint from unittest.mock import patch @@ -24,8 +19,6 @@ from posthog.temporal.tests.utils.events import generate_test_events_in_clickhouse from posthog.temporal.workflows.batch_exports import ( BatchExportTemporaryFile, - KafkaLoggingHandler, - get_batch_exports_logger, get_data_interval, get_results_iterator, get_rows_count, @@ -540,104 +533,3 @@ def test_batch_export_temporary_file_write_records_to_tsv(records): assert be_file.bytes_since_last_reset == 0 assert be_file.records_total == len(records) assert be_file.records_since_last_reset == 0 - - -def test_kafka_logging_handler_produces_to_kafka(caplog): - """Test a mocked call to Kafka produce from the KafkaLoggingHandler.""" - logger_name = "test-logger" - logger = logging.getLogger(logger_name) - handler = KafkaLoggingHandler(topic=KAFKA_LOG_ENTRIES) - handler.setLevel(logging.DEBUG) - logger.addHandler(handler) - - team_id = 
random.randint(1, 10000) - batch_export_id = str(uuid.uuid4()) - run_id = str(uuid.uuid4()) - timestamp = "2023-09-21 00:01:01.000001" - - expected_tuples = [] - expected_kafka_produce_calls_kwargs = [] - - with patch("posthog.kafka_client.client._KafkaProducer.produce") as produce: - with caplog.at_level(logging.DEBUG): - with freeze_time(timestamp): - for level in (10, 20, 30, 40, 50): - random_message = "".join(random.choice(string.ascii_letters) for _ in range(30)) - - logger.log( - level, - random_message, - extra={ - "team_id": team_id, - "batch_export_id": batch_export_id, - "workflow_run_id": run_id, - }, - ) - - expected_tuples.append( - ( - logger_name, - level, - random_message, - ) - ) - data = { - "message": random_message, - "team_id": team_id, - "log_source": "batch_exports", - "log_source_id": batch_export_id, - "instance_id": run_id, - "timestamp": timestamp, - "level": logging.getLevelName(level), - } - expected_kafka_produce_calls_kwargs.append({"topic": KAFKA_LOG_ENTRIES, "data": data, "key": None}) - - assert caplog.record_tuples == expected_tuples - - kafka_produce_calls_kwargs = [call.kwargs for call in produce.call_args_list] - assert kafka_produce_calls_kwargs == expected_kafka_produce_calls_kwargs - - -@dataclasses.dataclass -class TestInputs: - team_id: int - data_interval_end: str | None = None - interval: str = "hour" - batch_export_id: str = "" - - -@dataclasses.dataclass -class TestInfo: - workflow_id: str - run_id: str - workflow_run_id: str - attempt: int - - -@pytest.mark.parametrize("context", [activity.__name__, workflow.__name__]) -def test_batch_export_logger_adapter(context, caplog): - """Test BatchExportLoggerAdapter sets the appropiate context variables.""" - team_id = random.randint(1, 10000) - inputs = TestInputs(team_id=team_id) - logger = get_batch_exports_logger(inputs=inputs) - - batch_export_id = str(uuid.uuid4()) - run_id = str(uuid.uuid4()) - attempt = random.randint(1, 10) - info = TestInfo( - workflow_id=f"{batch_export_id}-{dt.datetime.utcnow().isoformat()}", - run_id=run_id, - workflow_run_id=run_id, - attempt=attempt, - ) - - with patch("posthog.kafka_client.client._KafkaProducer.produce"): - with patch(context + ".info", return_value=info): - for level in (10, 20, 30, 40, 50): - logger.log(level, "test") - - records = caplog.get_records("call") - assert all(record.team_id == team_id for record in records) - assert all(record.batch_export_id == batch_export_id for record in records) - assert all(record.workflow_run_id == run_id for record in records) - assert all(record.attempt == attempt for record in records) From 566d4334c56459f7776722afeeea82b549337b28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Thu, 2 Nov 2023 18:23:59 +0100 Subject: [PATCH 14/23] chore: Change typing of return logger --- posthog/temporal/workflows/logger.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/posthog/temporal/workflows/logger.py b/posthog/temporal/workflows/logger.py index 29961783a4ed3..9358d12dbd0ae 100644 --- a/posthog/temporal/workflows/logger.py +++ b/posthog/temporal/workflows/logger.py @@ -11,7 +11,9 @@ from posthog.kafka_client.topics import KAFKA_LOG_ENTRIES -async def bind_batch_exports_logger(team_id: int, destination: str | None = None) -> structlog.stdlib.BoundLogger: +async def bind_batch_exports_logger( + team_id: int, destination: str | None = None +) -> structlog.types.FilteringBoundLogger: """Return a logger for BatchExports.""" if not structlog.is_configured(): await 
configure_logger() From 564e8da9ed83218191ca12ba5e9dd073480e215d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Thu, 2 Nov 2023 18:40:14 +0100 Subject: [PATCH 15/23] chore: Bump structlog --- requirements.in | 1 + requirements.txt | 6 ++++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/requirements.in b/requirements.in index c9ff24b4ec4f9..47dc2e15ea1c7 100644 --- a/requirements.in +++ b/requirements.in @@ -77,6 +77,7 @@ snowflake-connector-python==3.0.4 social-auth-app-django==5.0.0 social-auth-core==4.3.0 statshog==1.0.6 +structlog==23.2.0 sqlparse==0.4.4 temporalio==1.1.0 token-bucket==0.3.0 diff --git a/requirements.txt b/requirements.txt index 955af5581ba93..99725364b3364 100644 --- a/requirements.txt +++ b/requirements.txt @@ -499,8 +499,10 @@ sqlparse==0.4.4 # django statshog==1.0.6 # via -r requirements.in -structlog==21.2.0 - # via django-structlog +structlog==23.2.0 + # via + # -r requirements.in + # django-structlog temporalio==1.1.0 # via -r requirements.in tenacity==6.1.0 From d26048150a37a349a11f1440eab09584473d7cf4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Thu, 2 Nov 2023 18:42:58 +0100 Subject: [PATCH 16/23] chore: Extend docstrings --- posthog/temporal/workflows/logger.py | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/posthog/temporal/workflows/logger.py b/posthog/temporal/workflows/logger.py index 9358d12dbd0ae..d120f8cd49cb8 100644 --- a/posthog/temporal/workflows/logger.py +++ b/posthog/temporal/workflows/logger.py @@ -14,7 +14,7 @@ async def bind_batch_exports_logger( team_id: int, destination: str | None = None ) -> structlog.types.FilteringBoundLogger: - """Return a logger for BatchExports.""" + """Return a bound logger for BatchExports.""" if not structlog.is_configured(): await configure_logger() @@ -32,9 +32,18 @@ async def configure_logger( ) -> tuple: """Configure a StructLog logger for batch exports. + Configuring the logger involves: + * Setting up processors. + * Spawning a task to listen for Kafka logs. + * Spawning a task to shutdown gracefully on worker shutdown. + Args: + logger_factory: Optionally, override the logger_factory. + extra_processors: Optionally, add any processors at the end of the chain. queue: Optionally, bring your own log queue. producer: Optionally, bring your own Kafka producer. + cache_logger_on_first_use: Set whether to cache logger for performance. + Should always be True except in tests. """ log_queue = queue if queue is not None else asyncio.Queue(maxsize=-1) put_in_queue = PutInBatchExportsLogQueueProcessor(log_queue) @@ -82,7 +91,7 @@ async def worker_shutdown_handler(): class PutInBatchExportsLogQueueProcessor: """A StructLog processor that puts event_dict into a queue. - The idea is that any event_dicts can be processed later by any queue listeners. + We format event_dict as a message to be sent to Kafka by a queue listener. """ def __init__(self, queue: asyncio.Queue): @@ -91,6 +100,11 @@ def __init__(self, queue: asyncio.Queue): def __call__( self, logger: logging.Logger, method_name: str, event_dict: structlog.types.EventDict ) -> structlog.types.EventDict: + """Put a message into the queue, if we have all the necessary details. + + Always return event_dict so that processors that come later in the chain can do + their own thing. 
+ """ try: message_dict = { "instance_id": event_dict["workflow_run_id"], @@ -234,7 +248,11 @@ def __init__( ) async def listen_and_produce(self): - """Listen to messages in queue and produce them to Kafka as they come.""" + """Listen to messages in queue and produce them to Kafka as they come. + + This is designed to be ran as an asyncio.Task, as it will wait forever for the queue + to have messages. + """ await self.producer.start() try: From 27a5f8a65d392e5740b060656d06c6fb949a1006 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Thu, 2 Nov 2023 22:49:23 +0100 Subject: [PATCH 17/23] fix: Don't use async logging as it's unsupported by temporal runtime --- .../test_bigquery_batch_export_workflow.py | 3 +++ .../workflows/backfill_batch_export.py | 8 +++---- posthog/temporal/workflows/batch_exports.py | 8 +++---- .../workflows/bigquery_batch_export.py | 12 +++++----- posthog/temporal/workflows/logger.py | 1 - .../workflows/postgres_batch_export.py | 12 +++++----- .../workflows/redshift_batch_export.py | 8 +++---- posthog/temporal/workflows/s3_batch_export.py | 22 +++++++++---------- .../workflows/snowflake_batch_export.py | 12 +++++----- 9 files changed, 44 insertions(+), 42 deletions(-) diff --git a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py index 72646e2e993c4..32327e05479bb 100644 --- a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py @@ -34,6 +34,8 @@ TEST_TIME = dt.datetime.utcnow() +pytestmark = [pytest.mark.asyncio_event_loop, pytest.mark.asyncio] + def assert_events_in_bigquery( client, table_id, dataset_id, events, bq_ingested_timestamp, exclude_events: list[str] | None = None @@ -439,6 +441,7 @@ async def never_finish_activity(_: BigQueryInsertInputs) -> str: task_queue=settings.TEMPORAL_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), ) + await asyncio.sleep(5) await handle.cancel() diff --git a/posthog/temporal/workflows/backfill_batch_export.py b/posthog/temporal/workflows/backfill_batch_export.py index d8872d6c31623..724a745451d4f 100644 --- a/posthog/temporal/workflows/backfill_batch_export.py +++ b/posthog/temporal/workflows/backfill_batch_export.py @@ -285,7 +285,7 @@ def parse_inputs(inputs: list[str]) -> BackfillBatchExportInputs: async def run(self, inputs: BackfillBatchExportInputs) -> None: """Workflow implementation to backfill a BatchExport.""" logger = await bind_batch_exports_logger(team_id=inputs.team_id) - await logger.ainfo( + logger.info( "Starting Backfill for BatchExport: %s - %s", inputs.start_at, inputs.end_at, @@ -347,16 +347,16 @@ async def run(self, inputs: BackfillBatchExportInputs) -> None: except temporalio.exceptions.ActivityError as e: if isinstance(e.cause, temporalio.exceptions.CancelledError): - await logger.aerror("Backfill was cancelled.") + logger.error("Backfill was cancelled.") update_inputs.status = "Cancelled" else: - await logger.aexception("Backfill failed.", exc_info=e.cause) + logger.exception("Backfill failed.", exc_info=e.cause) update_inputs.status = "Failed" raise except Exception as e: - await logger.aexception("Backfill failed with an unexpected error.", exc_info=e) + logger.exception("Backfill failed with an unexpected error.", exc_info=e) update_inputs.status = "Failed" raise diff --git a/posthog/temporal/workflows/batch_exports.py 
b/posthog/temporal/workflows/batch_exports.py index be9433625127f..e35b91191bffa 100644 --- a/posthog/temporal/workflows/batch_exports.py +++ b/posthog/temporal/workflows/batch_exports.py @@ -607,23 +607,23 @@ async def execute_batch_export_insert_activity( ) except exceptions.ActivityError as e: if isinstance(e.cause, exceptions.CancelledError): - await logger.error("BatchExport was cancelled.") + logger.error("BatchExport was cancelled.") update_inputs.status = "Cancelled" else: - await logger.exception("BatchExport failed.", exc_info=e.cause) + logger.exception("BatchExport failed.", exc_info=e.cause) update_inputs.status = "Failed" update_inputs.latest_error = str(e.cause) raise except Exception as e: - await logger.exception("BatchExport failed with an unexpected error.", exc_info=e) + logger.exception("BatchExport failed with an unexpected error.", exc_info=e) update_inputs.status = "Failed" update_inputs.latest_error = "An unexpected error has ocurred" raise else: - await logger.info( + logger.info( "Successfully finished exporting batch %s - %s", inputs.data_interval_start, inputs.data_interval_end ) diff --git a/posthog/temporal/workflows/bigquery_batch_export.py b/posthog/temporal/workflows/bigquery_batch_export.py index aee1abfad6aa5..a743d665bb15b 100644 --- a/posthog/temporal/workflows/bigquery_batch_export.py +++ b/posthog/temporal/workflows/bigquery_batch_export.py @@ -99,7 +99,7 @@ def bigquery_client(inputs: BigQueryInsertInputs): async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs): """Activity streams data from ClickHouse to BigQuery.""" logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="BigQuery") - await logger.ainfo( + logger.info( "Exporting batch %s - %s", inputs.data_interval_start, inputs.data_interval_end, @@ -119,14 +119,14 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs): ) if count == 0: - await logger.ainfo( + logger.info( "Nothing to export in batch %s - %s", inputs.data_interval_start, inputs.data_interval_end, ) return - await logger.ainfo("BatchExporting %s rows", count) + logger.info("BatchExporting %s rows", count) results_iterator = get_results_iterator( client=client, @@ -173,7 +173,7 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs): jsonl_file.write_records_to_jsonl([row]) if jsonl_file.tell() > settings.BATCH_EXPORT_BIGQUERY_UPLOAD_CHUNK_SIZE_BYTES: - await logger.ainfo( + logger.info( "Copying %s records of size %s bytes", jsonl_file.records_since_last_reset, jsonl_file.bytes_since_last_reset, @@ -187,7 +187,7 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs): jsonl_file.reset() if jsonl_file.tell() > 0: - await logger.ainfo( + logger.info( "Copying %s records of size %s bytes", jsonl_file.records_since_last_reset, jsonl_file.bytes_since_last_reset, @@ -216,7 +216,7 @@ async def run(self, inputs: BigQueryBatchExportInputs): """Workflow implementation to export data to BigQuery.""" logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="BigQuery") data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end) - await logger.ainfo( + logger.info( "Starting batch export %s - %s", data_interval_start, data_interval_end, diff --git a/posthog/temporal/workflows/logger.py b/posthog/temporal/workflows/logger.py index d120f8cd49cb8..ffb14685f5ce4 100644 --- a/posthog/temporal/workflows/logger.py +++ b/posthog/temporal/workflows/logger.py @@ -78,7 +78,6 @@ async def 
worker_shutdown_handler(): """ await temporalio.activity.wait_for_worker_shutdown() - await log_queue.join() listen_task.cancel() await asyncio.wait([listen_task]) diff --git a/posthog/temporal/workflows/postgres_batch_export.py b/posthog/temporal/workflows/postgres_batch_export.py index 30999d73f58a1..70bae5e92e40f 100644 --- a/posthog/temporal/workflows/postgres_batch_export.py +++ b/posthog/temporal/workflows/postgres_batch_export.py @@ -144,7 +144,7 @@ class PostgresInsertInputs: async def insert_into_postgres_activity(inputs: PostgresInsertInputs): """Activity streams data from ClickHouse to Postgres.""" logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="PostgreSQL") - await logger.ainfo( + logger.info( "Running Postgres export batch %s - %s", inputs.data_interval_start, inputs.data_interval_end, @@ -164,14 +164,14 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs): ) if count == 0: - await logger.ainfo( + logger.info( "Nothing to export in batch %s - %s", inputs.data_interval_start, inputs.data_interval_end, ) return - await logger.ainfo("BatchExporting %s rows to Postgres", count) + logger.info("BatchExporting %s rows to Postgres", count) results_iterator = get_results_iterator( client=client, @@ -226,7 +226,7 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs): pg_file.write_records_to_tsv([row], fieldnames=schema_columns) if pg_file.tell() > settings.BATCH_EXPORT_POSTGRES_UPLOAD_CHUNK_SIZE_BYTES: - await logger.ainfo( + logger.info( "Copying %s records of size %s bytes to Postgres", pg_file.records_since_last_reset, pg_file.bytes_since_last_reset, @@ -241,7 +241,7 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs): pg_file.reset() if pg_file.tell() > 0: - await logger.ainfo( + logger.info( "Copying %s records of size %s bytes to Postgres", pg_file.records_since_last_reset, pg_file.bytes_since_last_reset, @@ -276,7 +276,7 @@ async def run(self, inputs: PostgresBatchExportInputs): """Workflow implementation to export data to Postgres.""" logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="PostgreSQL") data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end) - await logger.ainfo( + logger.info( "Starting Postgres export batch %s - %s", data_interval_start, data_interval_end, diff --git a/posthog/temporal/workflows/redshift_batch_export.py b/posthog/temporal/workflows/redshift_batch_export.py index fa649a2cfe552..3fb29661e5f3c 100644 --- a/posthog/temporal/workflows/redshift_batch_export.py +++ b/posthog/temporal/workflows/redshift_batch_export.py @@ -111,7 +111,7 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs): fields. 
""" logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="Redshift") - await logger.ainfo( + logger.info( "Exporting batch %s - %s", inputs.data_interval_start, inputs.data_interval_end, @@ -131,14 +131,14 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs): ) if count == 0: - await logger.ainfo( + logger.info( "Nothing to export in batch %s - %s", inputs.data_interval_start, inputs.data_interval_end, ) return - await logger.ainfo("BatchExporting %s rows", count) + logger.info("BatchExporting %s rows", count) results_iterator = get_results_iterator( client=client, @@ -219,7 +219,7 @@ async def run(self, inputs: RedshiftBatchExportInputs): """Workflow implementation to export data to Redshift.""" logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="Redshift") data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end) - await logger.ainfo("Starting Redshift export batch %s - %s", data_interval_start, data_interval_end) + logger.info("Starting Redshift export batch %s - %s", data_interval_start, data_interval_end) create_export_run_inputs = CreateBatchExportRunInputs( team_id=inputs.team_id, diff --git a/posthog/temporal/workflows/s3_batch_export.py b/posthog/temporal/workflows/s3_batch_export.py index 8288169f30416..8a5a851d28b1c 100644 --- a/posthog/temporal/workflows/s3_batch_export.py +++ b/posthog/temporal/workflows/s3_batch_export.py @@ -323,7 +323,7 @@ async def initialize_and_resume_multipart_upload(inputs: S3InsertInputs) -> tupl except IndexError: # This is the error we expect when no details as the sequence will be empty. interval_start = inputs.data_interval_start - await logger.adebug( + logger.debug( "Did not receive details from previous activity Excecution. Export will start from the beginning %s", interval_start, ) @@ -331,12 +331,12 @@ async def initialize_and_resume_multipart_upload(inputs: S3InsertInputs) -> tupl # We still start from the beginning, but we make a point to log unexpected errors. # Ideally, any new exceptions should be added to the previous block after the first time and we will never land here. interval_start = inputs.data_interval_start - await logger.awarning( + logger.warning( "Did not receive details from previous activity Excecution due to an unexpected error. Export will start from the beginning %s", interval_start, ) else: - await logger.ainfo( + logger.info( "Received details from previous activity. Export will attempt to resume from %s", interval_start, ) @@ -346,7 +346,7 @@ async def initialize_and_resume_multipart_upload(inputs: S3InsertInputs) -> tupl # Even if we receive details we cannot resume a brotli compressed upload as we have lost the compressor state. interval_start = inputs.data_interval_start - await logger.ainfo( + logger.info( f"Export will start from the beginning as we are using brotli compression: %s", interval_start, ) @@ -367,7 +367,7 @@ async def insert_into_s3_activity(inputs: S3InsertInputs): files. 
""" logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="S3") - await logger.ainfo( + logger.info( "Exporting batch %s - %s", inputs.data_interval_start, inputs.data_interval_end, @@ -387,14 +387,14 @@ async def insert_into_s3_activity(inputs: S3InsertInputs): ) if count == 0: - await logger.ainfo( + logger.info( "Nothing to export in batch %s - %s", inputs.data_interval_start, inputs.data_interval_end, ) return - await logger.ainfo("BatchExporting %s rows to S3", count) + logger.info("BatchExporting %s rows to S3", count) s3_upload, interval_start = await initialize_and_resume_multipart_upload(inputs) @@ -420,7 +420,7 @@ async def insert_into_s3_activity(inputs: S3InsertInputs): async def worker_shutdown_handler(): """Handle the Worker shutting down by heart-beating our latest status.""" await activity.wait_for_worker_shutdown() - await logger.awarn( + logger.warn( f"Worker shutting down! Reporting back latest exported part {last_uploaded_part_timestamp}", ) activity.heartbeat(last_uploaded_part_timestamp, s3_upload.to_state()) @@ -446,7 +446,7 @@ async def worker_shutdown_handler(): local_results_file.write_records_to_jsonl([record]) if local_results_file.tell() > settings.BATCH_EXPORT_S3_UPLOAD_CHUNK_SIZE_BYTES: - await logger.info( + logger.info( "Uploading part %s containing %s records with size %s bytes", s3_upload.part_number + 1, local_results_file.records_since_last_reset, @@ -461,7 +461,7 @@ async def worker_shutdown_handler(): local_results_file.reset() if local_results_file.tell() > 0 and result is not None: - await logger.ainfo( + logger.info( "Uploading last part %s containing %s records with size %s bytes", s3_upload.part_number + 1, local_results_file.records_since_last_reset, @@ -496,7 +496,7 @@ async def run(self, inputs: S3BatchExportInputs): """Workflow implementation to export data to S3 bucket.""" logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="S3") data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end) - await logger.ainfo("Starting batch export %s - %s", data_interval_start, data_interval_end) + logger.info("Starting batch export %s - %s", data_interval_start, data_interval_end) create_export_run_inputs = CreateBatchExportRunInputs( team_id=inputs.team_id, diff --git a/posthog/temporal/workflows/snowflake_batch_export.py b/posthog/temporal/workflows/snowflake_batch_export.py index a4cfafc7f9adf..026e9a512c016 100644 --- a/posthog/temporal/workflows/snowflake_batch_export.py +++ b/posthog/temporal/workflows/snowflake_batch_export.py @@ -99,7 +99,7 @@ async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs): TODO: We're using JSON here, it's not the most efficient way to do this. 
""" logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="Snowflake") - await logger.ainfo( + logger.info( "Exporting batch %s - %s", inputs.data_interval_start, inputs.data_interval_end, @@ -119,14 +119,14 @@ async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs): ) if count == 0: - await logger.ainfo( + logger.info( "Nothing to export in batch %s - %s", inputs.data_interval_start, inputs.data_interval_end, ) return - await logger.ainfo("BatchExporting %s rows", count) + logger.info("BatchExporting %s rows", count) conn = snowflake.connector.connect( user=inputs.user, @@ -183,7 +183,7 @@ async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs): break except json.JSONDecodeError: - await logger.ainfo( + logger.info( "Failed to decode a JSON value while iterating, potentially due to a ClickHouse error" ) # This is raised by aiochclient as we try to decode an error message from ClickHouse. @@ -219,7 +219,7 @@ async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs): local_results_file.tell() and local_results_file.tell() > settings.BATCH_EXPORT_SNOWFLAKE_UPLOAD_CHUNK_SIZE_BYTES ): - await logger.ainfo("Uploading to Snowflake") + logger.info("Uploading to Snowflake") # Flush the file to make sure everything is written local_results_file.flush() @@ -296,7 +296,7 @@ async def run(self, inputs: SnowflakeBatchExportInputs): """Workflow implementation to export data to Snowflake table.""" logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="Snowflake") data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end) - await logger.ainfo( + logger.info( "Starting batch export %s - %s", data_interval_start, data_interval_end, From 2182351b3c774272ba75898aef4c7e79d1b8d6a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Fri, 3 Nov 2023 12:39:25 +0100 Subject: [PATCH 18/23] test: Add logger tests --- .../tests/batch_exports/test_logger.py | 379 ++++++++++++++++++ 1 file changed, 379 insertions(+) create mode 100644 posthog/temporal/tests/batch_exports/test_logger.py diff --git a/posthog/temporal/tests/batch_exports/test_logger.py b/posthog/temporal/tests/batch_exports/test_logger.py new file mode 100644 index 0000000000000..7a26881acea2d --- /dev/null +++ b/posthog/temporal/tests/batch_exports/test_logger.py @@ -0,0 +1,379 @@ +import asyncio +import dataclasses +import datetime as dt +import json +import random +import time +import uuid + +import aiokafka +import freezegun +import pytest +import pytest_asyncio +import structlog +import temporalio.activity +import temporalio.testing +from django.conf import settings + +from posthog.clickhouse.client import sync_execute +from posthog.clickhouse.log_entries import ( + KAFKA_LOG_ENTRIES_TABLE_SQL, + LOG_ENTRIES_TABLE, + LOG_ENTRIES_TABLE_MV_SQL, + TRUNCATE_LOG_ENTRIES_TABLE_SQL, +) +from posthog.kafka_client.topics import KAFKA_LOG_ENTRIES +from posthog.temporal.workflows.logger import bind_batch_exports_logger, configure_logger + +pytestmark = [pytest.mark.asyncio_event_loop, pytest.mark.asyncio] + + +class LogCapture: + """A test StructLog processor to capture logs.""" + + def __init__(self): + self.entries = [] + + def __call__(self, logger, method_name, event_dict): + """Append event_dict to entries and drop the log.""" + self.entries.append(event_dict) + raise structlog.DropEvent() + + +@pytest.fixture() +def log_capture(): + """Return a LogCapture processor for inspection in tests.""" 
+ return LogCapture() + + +class QueueCapture(asyncio.Queue): + """A test asyncio.Queue that captures items that we put into it.""" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.entries = [] + + def put_nowait(self, item): + """Append item to entries and delegate to asyncio.Queue.""" + self.entries.append(item) + super().put_nowait(item) + + +@pytest_asyncio.fixture() +async def queue(): + """Return a QueueCapture queue for inspection in tests.""" + queue = QueueCapture(maxsize=-1) + + yield queue + + +class CaptureKafkaProducer(aiokafka.AIOKafkaProducer): + """A test aiokafka.AIOKafkaProducer that captures calls to send_and_wait.""" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.entries = [] + + async def send_and_wait(self, topic, value=None, key=None, partition=None, timestamp_ms=None, headers=None): + """Append an entry and delegate to aiokafka.AIOKafkaProducer.""" + + self.entries.append( + { + "topic": topic, + "value": value, + "key": key, + "partition": partition, + "timestamp_ms": timestamp_ms, + "headers": headers, + } + ) + return await super().send_and_wait(topic, value, key, partition, timestamp_ms, headers) + + +@pytest_asyncio.fixture(scope="function") +async def producer(event_loop): + """Yield a CaptureKafkaProducer to inspect entries captured. + + After usage, we ensure the producer was closed to avoid leaking/warnings. + """ + producer = CaptureKafkaProducer(bootstrap_servers=settings.KAFKA_HOSTS, loop=event_loop) + + yield producer + + if producer._closed is False: + await producer.stop() + + +@pytest_asyncio.fixture(autouse=True) +async def configure(log_capture, queue, producer): + """Configure StructLog logging for testing. + + The extra parameters configured for testing are: + * Add a LogCapture processor to capture logs. + * Set the queue and producer to capture messages sent. + * Do not cache logger to ensure each test starts clean. + """ + tasks = await configure_logger( + extra_processors=[log_capture], queue=queue, producer=producer, cache_logger_on_first_use=False + ) + yield tasks + + for task in tasks: + # Clean up logger tasks to avoid leaking/warnings. + task.cancel() + + await asyncio.wait(tasks) + + +@pytest.mark.asyncio +async def test_batch_exports_logger_binds_context(log_capture): + """Test whether we can bind context variables.""" + logger = await bind_batch_exports_logger(team_id=1, destination="Somewhere") + + logger.info("Hi! This is an info log") + logger.error("Hi! This is an erro log") + + assert len(log_capture.entries) == 2 + + info_entry, error_entry = log_capture.entries + info_dict, error_dict = json.loads(info_entry), json.loads(error_entry) + assert info_dict["team_id"] == 1 + assert info_dict["destination"] == "Somewhere" + + assert error_dict["team_id"] == 1 + assert error_dict["destination"] == "Somewhere" + + +@pytest.mark.asyncio +async def test_batch_exports_logger_formats_positional_args(log_capture): + """Test whether positional arguments are formatted in the message.""" + logger = await bind_batch_exports_logger(team_id=1, destination="Somewhere") + + logger.info("Hi! This is an %s log", "info") + logger.error("Hi! This is an %s log", "error") + + assert len(log_capture.entries) == 2 + + info_entry, error_entry = log_capture.entries + info_dict, error_dict = json.loads(info_entry), json.loads(error_entry) + assert info_dict["msg"] == "Hi! This is an info log" + assert error_dict["msg"] == "Hi! 
This is an error log" + + +@dataclasses.dataclass +class TestActivityInfo: + """Provide our own Activity Info for testing.""" + + workflow_id: str + workflow_type: str + workflow_run_id: str + attempt: int + + +@pytest.fixture +def activity_environment(request): + """Return a testing temporal ActivityEnvironment.""" + env = temporalio.testing.ActivityEnvironment() + env.info = request.param + return env + + +BATCH_EXPORT_ID = str(uuid.uuid4()) + + +@pytest.mark.parametrize( + "activity_environment", + [ + TestActivityInfo( + workflow_id=f"{BATCH_EXPORT_ID}-{dt.datetime.utcnow()}", + workflow_type="s3-export", + workflow_run_id=str(uuid.uuid4()), + attempt=random.randint(1, 10000), + ), + TestActivityInfo( + workflow_id=f"{BATCH_EXPORT_ID}-Backfill-{dt.datetime.utcnow()}", + workflow_type="backfill-batch-export", + workflow_run_id=str(uuid.uuid4()), + attempt=random.randint(1, 10000), + ), + ], + indirect=True, +) +async def test_batch_exports_logger_binds_activity_context( + log_capture, + activity_environment, +): + """Test whether our logger binds variables from a Temporal Activity.""" + + @temporalio.activity.defn + async def log_activity(): + """A simple temporal activity that just logs.""" + logger = await bind_batch_exports_logger(team_id=1, destination="Somewhere") + + logger.info("Hi! This is an %s log from an activity", "info") + + await activity_environment.run(log_activity) + + assert len(log_capture.entries) == 1 + + info_dict = json.loads(log_capture.entries[0]) + assert info_dict["team_id"] == 1 + assert info_dict["destination"] == "Somewhere" + assert info_dict["workflow_id"] == activity_environment.info.workflow_id + assert info_dict["workflow_type"] == activity_environment.info.workflow_type + assert info_dict["log_source_id"] == BATCH_EXPORT_ID + assert info_dict["workflow_run_id"] == activity_environment.info.workflow_run_id + assert info_dict["attempt"] == activity_environment.info.attempt + + if activity_environment.info.workflow_type == "backfill-batch-export": + assert info_dict["log_source"] == "batch_exports_backfill" + else: + assert info_dict["log_source"] == "batch_exports" + + +@freezegun.freeze_time("2023-11-02 10:00:00.123123") +@pytest.mark.parametrize( + "activity_environment", + [ + TestActivityInfo( + workflow_id=f"{BATCH_EXPORT_ID}-{dt.datetime.utcnow()}", + workflow_type="s3-export", + workflow_run_id=str(uuid.uuid4()), + attempt=random.randint(1, 10000), + ), + TestActivityInfo( + workflow_id=f"{BATCH_EXPORT_ID}-Backfill-{dt.datetime.utcnow()}", + workflow_type="backfill-batch-export", + workflow_run_id=str(uuid.uuid4()), + attempt=random.randint(1, 10000), + ), + ], + indirect=True, +) +async def test_batch_exports_logger_puts_in_queue(activity_environment, queue): + """Test whether our logger puts entries into a queue for async processing.""" + + @temporalio.activity.defn + async def log_activity(): + """A simple temporal activity that just logs.""" + logger = await bind_batch_exports_logger(team_id=2, destination="Somewhere") + + logger.info("Hi! 
This is an %s log from an activity", "info") + + await activity_environment.run(log_activity) + + assert len(queue.entries) == 1 + message_dict = json.loads(queue.entries[0].decode("utf-8")) + + assert message_dict["instance_id"] == activity_environment.info.workflow_run_id + assert message_dict["level"] == "info" + + if activity_environment.info.workflow_type == "backfill-batch-export": + assert message_dict["log_source"] == "batch_exports_backfill" + else: + assert message_dict["log_source"] == "batch_exports" + + assert message_dict["log_source_id"] == BATCH_EXPORT_ID + assert message_dict["message"] == "Hi! This is an info log from an activity" + assert message_dict["team_id"] == 2 + assert message_dict["timestamp"] == "2023-11-02 10:00:00.123123" + + +@pytest.fixture() +def log_entries_table(): + """Manage log_entries table for testing.""" + sync_execute(KAFKA_LOG_ENTRIES_TABLE_SQL()) + sync_execute(LOG_ENTRIES_TABLE_MV_SQL) + sync_execute(TRUNCATE_LOG_ENTRIES_TABLE_SQL) + + yield LOG_ENTRIES_TABLE + + sync_execute(f"DROP TABLE {LOG_ENTRIES_TABLE}_mv") + sync_execute(f"DROP TABLE kafka_{LOG_ENTRIES_TABLE}") + sync_execute(TRUNCATE_LOG_ENTRIES_TABLE_SQL) + + +@freezegun.freeze_time("2023-11-03 10:00:00.123123") +@pytest.mark.django_db +@pytest.mark.parametrize( + "activity_environment", + [ + TestActivityInfo( + workflow_id=f"{BATCH_EXPORT_ID}-{dt.datetime.utcnow()}", + workflow_type="s3-export", + workflow_run_id=str(uuid.uuid4()), + attempt=random.randint(1, 10000), + ), + TestActivityInfo( + workflow_id=f"{BATCH_EXPORT_ID}-Backfill-{dt.datetime.utcnow()}", + workflow_type="backfill-batch-export", + workflow_run_id=str(uuid.uuid4()), + attempt=random.randint(1, 10000), + ), + ], + indirect=True, +) +async def test_batch_exports_logger_produces_to_kafka(activity_environment, producer, queue, log_entries_table): + """Test whether our logger produces messages to Kafka. + + We also check if those messages are ingested into ClickHouse. + """ + + @temporalio.activity.defn + async def log_activity(): + """A simple temporal activity that just logs.""" + logger = await bind_batch_exports_logger(team_id=3, destination="Somewhere") + + logger.info("Hi! This is an %s log from an activity", "info") + + await activity_environment.run(log_activity) + assert len(queue.entries) == 1 + + await queue.join() + + if activity_environment.info.workflow_type == "backfill-batch-export": + expected_log_source = "batch_exports_backfill" + else: + expected_log_source = "batch_exports" + + expected_dict = { + "instance_id": activity_environment.info.workflow_run_id, + "level": "info", + "log_source": expected_log_source, + "log_source_id": BATCH_EXPORT_ID, + "message": "Hi! This is an info log from an activity", + "team_id": 3, + "timestamp": "2023-11-03 10:00:00.123123", + } + + assert len(producer.entries) == 1 + assert producer.entries[0] == { + "topic": KAFKA_LOG_ENTRIES, + "value": json.dumps(expected_dict).encode("utf-8"), + "key": None, + "partition": None, + "timestamp_ms": None, + "headers": None, + } + + results = sync_execute( + f"SELECT instance_id, level, log_source, log_source_id, message, team_id, timestamp FROM {log_entries_table}" + ) + + while not results: + # It may take a bit for CH to ingest. 
+ time.sleep(2) + results = sync_execute( + f"SELECT instance_id, level, log_source, log_source_id, message, team_id, timestamp FROM {log_entries_table}" + ) + + assert len(results) == 1 # type: ignore + + row = results[0] # type: ignore + assert row[0] == activity_environment.info.workflow_run_id + assert row[1] == "info" + assert row[2] == expected_log_source + assert row[3] == BATCH_EXPORT_ID + assert row[4] == "Hi! This is an info log from an activity" + assert row[5] == 3 + assert row[6].isoformat() == "2023-11-03T10:00:00.123123+00:00" From d81f0b1cf8b8090f1159df563768a034019019ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Tue, 7 Nov 2023 15:03:22 +0100 Subject: [PATCH 19/23] fix: Mix pytestmark lists --- .../batch_exports/test_bigquery_batch_export_workflow.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py index 32327e05479bb..d21249d43d8ef 100644 --- a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py @@ -30,12 +30,10 @@ insert_into_bigquery_activity, ) -pytestmark = [pytest.mark.asyncio, pytest.mark.django_db] +pytestmark = [pytest.mark.asyncio, pytest.mark.asyncio_event_loop, pytest.mark.django_db] TEST_TIME = dt.datetime.utcnow() -pytestmark = [pytest.mark.asyncio_event_loop, pytest.mark.asyncio] - def assert_events_in_bigquery( client, table_id, dataset_id, events, bq_ingested_timestamp, exclude_events: list[str] | None = None From 7d19cb14bd99985159f8e2e36db40f84773b71ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Tue, 7 Nov 2023 15:33:18 +0100 Subject: [PATCH 20/23] fix: Remove unused imports --- posthog/temporal/tests/batch_exports/test_batch_exports.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/posthog/temporal/tests/batch_exports/test_batch_exports.py b/posthog/temporal/tests/batch_exports/test_batch_exports.py index 4819b8aa2025d..15f726255cce4 100644 --- a/posthog/temporal/tests/batch_exports/test_batch_exports.py +++ b/posthog/temporal/tests/batch_exports/test_batch_exports.py @@ -4,15 +4,9 @@ import json import operator from random import randint -from unittest.mock import patch import pytest -from freezegun import freeze_time -from temporalio import activity, workflow -from posthog.clickhouse.log_entries import ( - KAFKA_LOG_ENTRIES, -) from posthog.temporal.tests.utils.datetimes import ( to_isoformat, ) From eb0b92764e63fe6bb1570ef6ec5a0f291702753f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Tue, 7 Nov 2023 16:34:13 +0100 Subject: [PATCH 21/23] fix: Cleanup pytest warnings --- .../temporal/tests/batch_exports/test_logger.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/posthog/temporal/tests/batch_exports/test_logger.py b/posthog/temporal/tests/batch_exports/test_logger.py index 7a26881acea2d..2817d481bb202 100644 --- a/posthog/temporal/tests/batch_exports/test_logger.py +++ b/posthog/temporal/tests/batch_exports/test_logger.py @@ -125,7 +125,6 @@ async def configure(log_capture, queue, producer): await asyncio.wait(tasks) -@pytest.mark.asyncio async def test_batch_exports_logger_binds_context(log_capture): """Test whether we can bind context variables.""" logger = await bind_batch_exports_logger(team_id=1, 
destination="Somewhere") @@ -144,7 +143,6 @@ async def test_batch_exports_logger_binds_context(log_capture): assert error_dict["destination"] == "Somewhere" -@pytest.mark.asyncio async def test_batch_exports_logger_formats_positional_args(log_capture): """Test whether positional arguments are formatted in the message.""" logger = await bind_batch_exports_logger(team_id=1, destination="Somewhere") @@ -161,7 +159,7 @@ async def test_batch_exports_logger_formats_positional_args(log_capture): @dataclasses.dataclass -class TestActivityInfo: +class ActivityInfo: """Provide our own Activity Info for testing.""" workflow_id: str @@ -184,13 +182,13 @@ def activity_environment(request): @pytest.mark.parametrize( "activity_environment", [ - TestActivityInfo( + ActivityInfo( workflow_id=f"{BATCH_EXPORT_ID}-{dt.datetime.utcnow()}", workflow_type="s3-export", workflow_run_id=str(uuid.uuid4()), attempt=random.randint(1, 10000), ), - TestActivityInfo( + ActivityInfo( workflow_id=f"{BATCH_EXPORT_ID}-Backfill-{dt.datetime.utcnow()}", workflow_type="backfill-batch-export", workflow_run_id=str(uuid.uuid4()), @@ -235,13 +233,13 @@ async def log_activity(): @pytest.mark.parametrize( "activity_environment", [ - TestActivityInfo( + ActivityInfo( workflow_id=f"{BATCH_EXPORT_ID}-{dt.datetime.utcnow()}", workflow_type="s3-export", workflow_run_id=str(uuid.uuid4()), attempt=random.randint(1, 10000), ), - TestActivityInfo( + ActivityInfo( workflow_id=f"{BATCH_EXPORT_ID}-Backfill-{dt.datetime.utcnow()}", workflow_type="backfill-batch-export", workflow_run_id=str(uuid.uuid4()), @@ -298,13 +296,13 @@ def log_entries_table(): @pytest.mark.parametrize( "activity_environment", [ - TestActivityInfo( + ActivityInfo( workflow_id=f"{BATCH_EXPORT_ID}-{dt.datetime.utcnow()}", workflow_type="s3-export", workflow_run_id=str(uuid.uuid4()), attempt=random.randint(1, 10000), ), - TestActivityInfo( + ActivityInfo( workflow_id=f"{BATCH_EXPORT_ID}-Backfill-{dt.datetime.utcnow()}", workflow_type="backfill-batch-export", workflow_run_id=str(uuid.uuid4()), From d495f7c0a4546170a878917769367216e188b161 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Tue, 7 Nov 2023 16:34:37 +0100 Subject: [PATCH 22/23] fix: Create and drop dataset for bigquery tests --- .../test_bigquery_batch_export_workflow.py | 41 +++++++++++++------ 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py index d21249d43d8ef..8ea50196d8b2c 100644 --- a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py @@ -108,9 +108,6 @@ def bigquery_config() -> dict[str, str]: "private_key_id": credentials["private_key_id"], "token_uri": credentials["token_uri"], "client_email": credentials["client_email"], - # Not part of the credentials. - # Hardcoded to test dataset. - "dataset_id": "BatchExports", } @@ -119,10 +116,25 @@ def bigquery_client() -> typing.Generator[bigquery.Client, None, None]: """Manage a bigquery.Client for testing.""" client = bigquery.Client() - try: - yield client - finally: - client.close() + yield client + + client.close() + + +@pytest.fixture +def bigquery_dataset(bigquery_config, bigquery_client) -> typing.Generator[bigquery.Dataset, None, None]: + """Manage a bigquery dataset for testing. + + We clean up the dataset after every test. 
Could be quite time expensive, but guarantees a clean slate. + """ + dataset_id = f"{bigquery_config['project_id']}.BatchExportsTest_{str(uuid4()).replace('-', '')}" + + dataset = bigquery.Dataset(dataset_id) + dataset = bigquery_client.create_dataset(dataset) + + yield dataset + + bigquery_client.delete_dataset(dataset_id, delete_contents=True, not_found_ok=True) @pytest.mark.skipif( @@ -131,7 +143,7 @@ def bigquery_client() -> typing.Generator[bigquery.Client, None, None]: ) @pytest.mark.parametrize("exclude_events", [None, ["test-exclude"]], indirect=True) async def test_insert_into_bigquery_activity_inserts_data_into_bigquery_table( - clickhouse_client, activity_environment, bigquery_client, bigquery_config, exclude_events + clickhouse_client, activity_environment, bigquery_client, bigquery_config, exclude_events, bigquery_dataset ): """Test that the insert_into_bigquery_activity function inserts data into a BigQuery table. @@ -194,6 +206,7 @@ async def test_insert_into_bigquery_activity_inserts_data_into_bigquery_table( insert_inputs = BigQueryInsertInputs( team_id=team_id, table_id=f"test_insert_activity_table_{team_id}", + dataset_id=bigquery_dataset.dataset_id, data_interval_start=data_interval_start.isoformat(), data_interval_end=data_interval_end.isoformat(), exclude_events=exclude_events, @@ -208,7 +221,7 @@ async def test_insert_into_bigquery_activity_inserts_data_into_bigquery_table( assert_events_in_bigquery( client=bigquery_client, table_id=f"test_insert_activity_table_{team_id}", - dataset_id=bigquery_config["dataset_id"], + dataset_id=bigquery_dataset.dataset_id, events=events + events_with_no_properties, bq_ingested_timestamp=ingested_timestamp, exclude_events=exclude_events, @@ -221,12 +234,15 @@ def table_id(ateam, interval): @pytest_asyncio.fixture -async def bigquery_batch_export(ateam, table_id, bigquery_config, interval, exclude_events, temporal_client): +async def bigquery_batch_export( + ateam, table_id, bigquery_config, interval, exclude_events, temporal_client, bigquery_dataset +): destination_data = { "type": "BigQuery", "config": { **bigquery_config, "table_id": table_id, + "dataset_id": bigquery_dataset.dataset_id, "exclude_events": exclude_events, }, } @@ -257,7 +273,6 @@ async def bigquery_batch_export(ateam, table_id, bigquery_config, interval, excl @pytest.mark.parametrize("exclude_events", [None, ["test-exclude"]], indirect=True) async def test_bigquery_export_workflow( clickhouse_client, - bigquery_config, bigquery_client, bigquery_batch_export, interval, @@ -303,7 +318,7 @@ async def test_bigquery_export_workflow( inputs = BigQueryBatchExportInputs( team_id=ateam.pk, batch_export_id=str(bigquery_batch_export.id), - data_interval_end="2023-04-25 14:30:00.000000", + data_interval_end=data_interval_end.isoformat(), interval=interval, **bigquery_batch_export.destination.config, ) @@ -340,7 +355,7 @@ async def test_bigquery_export_workflow( assert_events_in_bigquery( client=bigquery_client, table_id=table_id, - dataset_id=bigquery_config["dataset_id"], + dataset_id=bigquery_batch_export.destination.config["dataset_id"], events=events, bq_ingested_timestamp=ingested_timestamp, exclude_events=exclude_events, From 7f371cd1eda32dc4ad6d7f4936b99fbe33058f4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Tue, 7 Nov 2023 16:37:32 +0100 Subject: [PATCH 23/23] fix: Typing issue? 
--- posthog/temporal/workflows/logger.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/posthog/temporal/workflows/logger.py b/posthog/temporal/workflows/logger.py index ffb14685f5ce4..2261671dc56e7 100644 --- a/posthog/temporal/workflows/logger.py +++ b/posthog/temporal/workflows/logger.py @@ -7,13 +7,13 @@ import temporalio.activity import temporalio.workflow from django.conf import settings +from structlog.processors import EventRenamer +from structlog.typing import FilteringBoundLogger from posthog.kafka_client.topics import KAFKA_LOG_ENTRIES -async def bind_batch_exports_logger( - team_id: int, destination: str | None = None -) -> structlog.types.FilteringBoundLogger: +async def bind_batch_exports_logger(team_id: int, destination: str | None = None) -> FilteringBoundLogger: """Return a bound logger for BatchExports.""" if not structlog.is_configured(): await configure_logger() @@ -55,7 +55,7 @@ async def configure_logger( structlog.stdlib.PositionalArgumentsFormatter(), add_batch_export_context, put_in_queue, - structlog.processors.EventRenamer("msg"), + EventRenamer("msg"), structlog.processors.JSONRenderer(), ] extra_processors_to_add = extra_processors if extra_processors is not None else []
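
Note on PATCH 19/23 (fix: Mix pytestmark lists): pytest reads pytestmark as an ordinary module-level attribute, so a second "pytestmark = [...]" assignment later in the same module silently replaces the first one and drops its marks; that is why the two lists are merged into a single assignment in this patch. A minimal sketch of the intended layout, assuming standard pytest semantics (the test body is illustrative only):

    import pytest

    # All marks must live in one assignment; a later "pytestmark = [...]"
    # would overwrite this list rather than extend it.
    pytestmark = [pytest.mark.asyncio, pytest.mark.asyncio_event_loop, pytest.mark.django_db]


    async def test_example():
        # Collected with all three module-level marks applied.
        assert True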
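
Note on PATCH 21/23 (fix: Cleanup pytest warnings): pytest tries to collect any class whose name starts with Test, and because the TestActivityInfo dataclass defines an __init__, collection emits a PytestCollectionWarning and skips it; renaming the helper to ActivityInfo keeps it out of collection entirely. A minimal reproduction under default collection rules (the single field is illustrative only):

    import dataclasses


    @dataclasses.dataclass
    class TestActivityInfo:  # name starts with "Test": pytest attempts collection and warns
        workflow_id: str


    @dataclasses.dataclass
    class ActivityInfo:  # plain helper name: ignored by collection, no warning
        workflow_id: str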
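
Note on PATCH 22/23 (fix: Create and drop dataset for bigquery tests): the bigquery_dataset fixture buys full isolation by creating and deleting a uniquely named dataset around every test, which its docstring acknowledges can be slow. If that overhead ever matters, a session-scoped dataset shared by all tests is a common alternative; a hedged sketch only, not part of the patch, with the fixture name and scope chosen here for illustration:

    import uuid

    import pytest
    from google.cloud import bigquery


    @pytest.fixture(scope="session")
    def shared_bigquery_dataset():
        """One dataset per test session: faster overall, but tests share state."""
        client = bigquery.Client()
        dataset_id = f"{client.project}.BatchExportsTest_{uuid.uuid4().hex}"
        dataset = client.create_dataset(bigquery.Dataset(dataset_id))

        yield dataset

        client.delete_dataset(dataset_id, delete_contents=True, not_found_ok=True)
        client.close()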