From 34a6bc404f1616d3530416c80ac13a5c05e40854 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?=
Date: Tue, 14 Nov 2023 12:17:07 +0100
Subject: [PATCH] fix: Reset structlog on temporal worker (#18581)

* fix: Reset structlog on temporal worker

* Update query snapshots

* Update query snapshots

* fix: Move logging to activities exclusively

* fix: Remove Workflow info from logger

---------

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
---
 posthog/batch_exports/service.py              | 20 +++++--
 .../commands/start_temporal_worker.py         |  5 +-
 .../tests/batch_exports/test_run_updates.py   |  1 +
 .../workflows/backfill_batch_export.py        | 13 +---
 posthog/temporal/workflows/batch_exports.py   | 60 +++++++++++++++----
 .../workflows/bigquery_batch_export.py        | 10 +---
 posthog/temporal/workflows/logger.py          | 27 +--------
 .../workflows/postgres_batch_export.py        | 12 ++--
 .../workflows/redshift_batch_export.py        | 10 ++--
 posthog/temporal/workflows/s3_batch_export.py |  8 ++-
 .../workflows/snowflake_batch_export.py       | 18 +++---
 11 files changed, 95 insertions(+), 89 deletions(-)

diff --git a/posthog/batch_exports/service.py b/posthog/batch_exports/service.py
index 6abe24f075349..fc74d6f51f253 100644
--- a/posthog/batch_exports/service.py
+++ b/posthog/batch_exports/service.py
@@ -341,7 +341,7 @@ def create_batch_export_run(
     data_interval_start: str,
     data_interval_end: str,
     status: str = BatchExportRun.Status.STARTING,
-):
+) -> BatchExportRun:
     """Create a BatchExportRun after a Temporal Workflow execution.
 
     In a first approach, this method is intended to be called only by Temporal Workflows,
@@ -364,16 +364,20 @@ def create_batch_export_run(
     return run
 
 
-def update_batch_export_run_status(run_id: UUID, status: str, latest_error: str | None):
+def update_batch_export_run_status(run_id: UUID, status: str, latest_error: str | None) -> BatchExportRun:
     """Update the status of an BatchExportRun with given id.
 
     Arguments:
         id: The id of the BatchExportRun to update.
     """
-    updated = BatchExportRun.objects.filter(id=run_id).update(status=status, latest_error=latest_error)
+    model = BatchExportRun.objects.filter(id=run_id)
+    updated = model.update(status=status, latest_error=latest_error)
+
     if not updated:
         raise ValueError(f"BatchExportRun with id {run_id} not found.")
 
+    return model.get()
+
 
 def sync_batch_export(batch_export: BatchExport, created: bool):
     workflow, workflow_inputs = DESTINATION_WORKFLOWS[batch_export.destination.type]
@@ -447,7 +451,7 @@ def create_batch_export_backfill(
     start_at: str,
     end_at: str,
     status: str = BatchExportRun.Status.RUNNING,
-):
+) -> BatchExportBackfill:
     """Create a BatchExportBackfill.
 
 
@@ -470,13 +474,17 @@ def create_batch_export_backfill(
     return backfill
 
 
-def update_batch_export_backfill_status(backfill_id: UUID, status: str):
+def update_batch_export_backfill_status(backfill_id: UUID, status: str) -> BatchExportBackfill:
    """Update the status of an BatchExportBackfill with given id.
 
     Arguments:
         id: The id of the BatchExportBackfill to update.
         status: The new status to assign to the BatchExportBackfill.
""" - updated = BatchExportBackfill.objects.filter(id=backfill_id).update(status=status) + model = BatchExportBackfill.objects.filter(id=backfill_id) + updated = model.update(status=status) + if not updated: raise ValueError(f"BatchExportBackfill with id {backfill_id} not found.") + + return model.get() diff --git a/posthog/management/commands/start_temporal_worker.py b/posthog/management/commands/start_temporal_worker.py index 17299f64fe920..31ac647a65449 100644 --- a/posthog/management/commands/start_temporal_worker.py +++ b/posthog/management/commands/start_temporal_worker.py @@ -1,11 +1,12 @@ import asyncio import logging +import structlog from temporalio import workflow with workflow.unsafe.imports_passed_through(): - from django.core.management.base import BaseCommand from django.conf import settings + from django.core.management.base import BaseCommand from prometheus_client import start_http_server @@ -70,6 +71,8 @@ def handle(self, *args, **options): options["client_key"] = "--SECRET--" logging.info(f"Starting Temporal Worker with options: {options}") + structlog.reset_defaults() + metrics_port = int(options["metrics_port"]) start_http_server(port=metrics_port) diff --git a/posthog/temporal/tests/batch_exports/test_run_updates.py b/posthog/temporal/tests/batch_exports/test_run_updates.py index 5b0974e5e8500..76f7f9bfee14d 100644 --- a/posthog/temporal/tests/batch_exports/test_run_updates.py +++ b/posthog/temporal/tests/batch_exports/test_run_updates.py @@ -122,6 +122,7 @@ async def test_update_export_run_status(activity_environment, team, batch_export update_inputs = UpdateBatchExportRunStatusInputs( id=str(run_id), status="Completed", + team_id=inputs.team_id, ) await activity_environment.run(update_export_run_status, update_inputs) diff --git a/posthog/temporal/workflows/backfill_batch_export.py b/posthog/temporal/workflows/backfill_batch_export.py index 53201e3bf9bc0..2cd77af1a0a44 100644 --- a/posthog/temporal/workflows/backfill_batch_export.py +++ b/posthog/temporal/workflows/backfill_batch_export.py @@ -22,7 +22,6 @@ create_batch_export_backfill_model, update_batch_export_backfill_model_status, ) -from posthog.temporal.workflows.logger import bind_batch_exports_logger class HeartbeatDetails(typing.NamedTuple): @@ -284,13 +283,6 @@ def parse_inputs(inputs: list[str]) -> BackfillBatchExportInputs: @temporalio.workflow.run async def run(self, inputs: BackfillBatchExportInputs) -> None: """Workflow implementation to backfill a BatchExport.""" - logger = await bind_batch_exports_logger(team_id=inputs.team_id) - logger.info( - "Starting Backfill for BatchExport: %s - %s", - inputs.start_at, - inputs.end_at, - ) - create_batch_export_backfill_inputs = CreateBatchExportBackfillInputs( team_id=inputs.team_id, batch_export_id=inputs.batch_export_id, @@ -347,16 +339,13 @@ async def run(self, inputs: BackfillBatchExportInputs) -> None: except temporalio.exceptions.ActivityError as e: if isinstance(e.cause, temporalio.exceptions.CancelledError): - logger.error("Backfill was cancelled.") update_inputs.status = "Cancelled" else: - logger.exception("Backfill failed.", exc_info=e.cause) update_inputs.status = "Failed" raise - except Exception as e: - logger.exception("Backfill failed with an unexpected error.", exc_info=e) + except Exception: update_inputs.status = "Failed" raise diff --git a/posthog/temporal/workflows/batch_exports.py b/posthog/temporal/workflows/batch_exports.py index 758047f5b10b3..d6892100a78f2 100644 --- a/posthog/temporal/workflows/batch_exports.py +++ 
@@ -495,6 +495,12 @@ async def create_export_run(inputs: CreateBatchExportRunInputs) -> str:
     Intended to be used in all export workflows, usually at the start, to create a model instance to
     represent them in our database.
     """
+    logger = await bind_batch_exports_logger(team_id=inputs.team_id)
+    logger.info(
+        "Creating batch export for range %s - %s",
+        inputs.data_interval_start,
+        inputs.data_interval_end,
+    )
     # 'sync_to_async' type hints are fixed in asgiref>=3.4.1
     # But one of our dependencies is pinned to asgiref==3.3.2.
     # Remove these comments once we upgrade.
@@ -514,18 +520,34 @@ class UpdateBatchExportRunStatusInputs:
 
     id: str
     status: str
+    team_id: int
     latest_error: str | None = None
 
 
 @activity.defn
 async def update_export_run_status(inputs: UpdateBatchExportRunStatusInputs):
     """Activity that updates the status of an BatchExportRun."""
-    await sync_to_async(update_batch_export_run_status)(
+    logger = await bind_batch_exports_logger(team_id=inputs.team_id)
+
+    batch_export_run = await sync_to_async(update_batch_export_run_status)(
         run_id=uuid.UUID(inputs.id),
         status=inputs.status,
         latest_error=inputs.latest_error,
     )  # type: ignore
 
+    if batch_export_run.status == "Failed":
+        logger.error("BatchExport failed with error: %s", batch_export_run.latest_error)
+
+    elif batch_export_run.status == "Cancelled":
+        logger.warning("BatchExport was cancelled.")
+
+    else:
+        logger.info(
+            "Successfully finished exporting batch %s - %s",
+            batch_export_run.data_interval_start,
+            batch_export_run.data_interval_end,
+        )
+
 
 @dataclasses.dataclass
 class CreateBatchExportBackfillInputs:
@@ -543,6 +565,12 @@ async def create_batch_export_backfill_model(inputs: CreateBatchExportBackfillIn
     Intended to be used in all batch export backfill workflows, usually at the start, to create a model instance to
     represent them in our database.
     """
+    logger = await bind_batch_exports_logger(team_id=inputs.team_id)
+    logger.info(
+        "Creating historical export for batches in range %s - %s",
+        inputs.start_at,
+        inputs.end_at,
+    )
     # 'sync_to_async' type hints are fixed in asgiref>=3.4.1
     # But one of our dependencies is pinned to asgiref==3.3.2.
     # Remove these comments once we upgrade.
@@ -568,7 +596,23 @@ class UpdateBatchExportBackfillStatusInputs:
 @activity.defn
 async def update_batch_export_backfill_model_status(inputs: UpdateBatchExportBackfillStatusInputs):
     """Activity that updates the status of an BatchExportRun."""
-    await sync_to_async(update_batch_export_backfill_status)(backfill_id=uuid.UUID(inputs.id), status=inputs.status)  # type: ignore
+    backfill = await sync_to_async(update_batch_export_backfill_status)(
+        backfill_id=uuid.UUID(inputs.id), status=inputs.status
+    )  # type: ignore
+    logger = await bind_batch_exports_logger(team_id=backfill.team_id)
+
+    if backfill.status == "Failed":
+        logger.error("Historical export failed")
+
+    elif backfill.status == "Cancelled":
+        logger.warning("Historical export was cancelled.")
+
+    else:
+        logger.info(
+            "Successfully finished exporting historical batches in %s - %s",
+            backfill.start_at,
+            backfill.end_at,
+        )
 
 
 async def execute_batch_export_insert_activity(
@@ -600,7 +644,6 @@ async def execute_batch_export_insert_activity(
         maximum_retry_interval_seconds: Maximum interval in seconds between retries.
""" destination = workflow.info().workflow_type.lower() - logger = await bind_batch_exports_logger(team_id=inputs.team_id) retry_policy = RetryPolicy( initial_interval=dt.timedelta(seconds=initial_retry_interval_seconds), @@ -617,28 +660,21 @@ async def execute_batch_export_insert_activity( heartbeat_timeout=dt.timedelta(seconds=heartbeat_timeout_seconds) if heartbeat_timeout_seconds else None, retry_policy=retry_policy, ) + except exceptions.ActivityError as e: if isinstance(e.cause, exceptions.CancelledError): - logger.error("BatchExport was cancelled.") update_inputs.status = "Cancelled" else: - logger.exception("BatchExport failed.", exc_info=e.cause) update_inputs.status = "Failed" update_inputs.latest_error = str(e.cause) raise - except Exception as e: - logger.exception("BatchExport failed with an unexpected error.", exc_info=e) + except Exception: update_inputs.status = "Failed" update_inputs.latest_error = "An unexpected error has ocurred" raise - else: - logger.info( - "Successfully finished exporting batch %s - %s", inputs.data_interval_start, inputs.data_interval_end - ) - finally: EXPORT_FINISHED.labels(destination=destination, status=update_inputs.status.lower()).inc() await workflow.execute_activity( diff --git a/posthog/temporal/workflows/bigquery_batch_export.py b/posthog/temporal/workflows/bigquery_batch_export.py index 65aa7d64fab2e..f337bf8b1bca4 100644 --- a/posthog/temporal/workflows/bigquery_batch_export.py +++ b/posthog/temporal/workflows/bigquery_batch_export.py @@ -212,14 +212,6 @@ def parse_inputs(inputs: list[str]) -> BigQueryBatchExportInputs: @workflow.run async def run(self, inputs: BigQueryBatchExportInputs): """Workflow implementation to export data to BigQuery.""" - logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="BigQuery") - data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end) - logger.info( - "Starting batch export %s - %s", - data_interval_start, - data_interval_end, - ) - data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end) create_export_run_inputs = CreateBatchExportRunInputs( @@ -240,7 +232,7 @@ async def run(self, inputs: BigQueryBatchExportInputs): ), ) - update_inputs = UpdateBatchExportRunStatusInputs(id=run_id, status="Completed") + update_inputs = UpdateBatchExportRunStatusInputs(id=run_id, status="Completed", team_id=inputs.team_id) insert_inputs = BigQueryInsertInputs( team_id=inputs.team_id, diff --git a/posthog/temporal/workflows/logger.py b/posthog/temporal/workflows/logger.py index 829e08983bbeb..4c013c82eb86e 100644 --- a/posthog/temporal/workflows/logger.py +++ b/posthog/temporal/workflows/logger.py @@ -165,14 +165,12 @@ def get_temporal_context() -> dict[str, str | int]: * workflow_run_id: The ID of the Temporal Workflow Execution running the batch export. * workflow_type: The name of the Temporal Workflow. - We attempt to fetch the context from the activity information, and then from the workflow - information. If both are undefined, an empty dict is returned. When running this in - an activity or a workflow, at least one context will be defined. + We attempt to fetch the context from the activity information. If undefined, an empty dict + is returned. When running this in an activity the context will be defined. 
""" activity_info = attempt_to_fetch_activity_info() - workflow_info = attempt_to_fetch_workflow_info() - info = activity_info or workflow_info + info = activity_info if info is None: return {} @@ -222,25 +220,6 @@ def attempt_to_fetch_activity_info() -> Info | None: return (workflow_id, workflow_type, workflow_run_id, attempt) -def attempt_to_fetch_workflow_info() -> Info | None: - """Fetch Workflow information from Temporal. - - Returns: - None if calling outside a Workflow, else the relevant Info. - """ - try: - workflow_info = temporalio.workflow.info() - except RuntimeError: - return None - else: - workflow_id = workflow_info.workflow_id - workflow_type = workflow_info.workflow_type - workflow_run_id = workflow_info.run_id - attempt = workflow_info.attempt - - return (workflow_id, workflow_type, workflow_run_id, attempt) - - class KafkaLogProducerFromQueue: """Produce log messages to Kafka by getting them from a queue. diff --git a/posthog/temporal/workflows/postgres_batch_export.py b/posthog/temporal/workflows/postgres_batch_export.py index 2de91c2ee691a..e408256e180d3 100644 --- a/posthog/temporal/workflows/postgres_batch_export.py +++ b/posthog/temporal/workflows/postgres_batch_export.py @@ -273,13 +273,7 @@ def parse_inputs(inputs: list[str]) -> PostgresBatchExportInputs: @workflow.run async def run(self, inputs: PostgresBatchExportInputs): """Workflow implementation to export data to Postgres.""" - logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="PostgreSQL") data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end) - logger.info( - "Starting Postgres export batch %s - %s", - data_interval_start, - data_interval_end, - ) create_export_run_inputs = CreateBatchExportRunInputs( team_id=inputs.team_id, @@ -299,7 +293,11 @@ async def run(self, inputs: PostgresBatchExportInputs): ), ) - update_inputs = UpdateBatchExportRunStatusInputs(id=run_id, status="Completed") + update_inputs = UpdateBatchExportRunStatusInputs( + id=run_id, + status="Completed", + team_id=inputs.team_id, + ) insert_inputs = PostgresInsertInputs( team_id=inputs.team_id, diff --git a/posthog/temporal/workflows/redshift_batch_export.py b/posthog/temporal/workflows/redshift_batch_export.py index 7339564a5b9a8..81d4ddd7fbe28 100644 --- a/posthog/temporal/workflows/redshift_batch_export.py +++ b/posthog/temporal/workflows/redshift_batch_export.py @@ -14,6 +14,7 @@ from posthog.batch_exports.service import RedshiftBatchExportInputs from posthog.temporal.workflows.base import PostHogWorkflow from posthog.temporal.workflows.batch_exports import ( + ROWS_EXPORTED, CreateBatchExportRunInputs, UpdateBatchExportRunStatusInputs, create_export_run, @@ -21,7 +22,6 @@ get_data_interval, get_results_iterator, get_rows_count, - ROWS_EXPORTED, ) from posthog.temporal.workflows.clickhouse import get_client from posthog.temporal.workflows.logger import bind_batch_exports_logger @@ -225,9 +225,7 @@ def parse_inputs(inputs: list[str]) -> RedshiftBatchExportInputs: @workflow.run async def run(self, inputs: RedshiftBatchExportInputs): """Workflow implementation to export data to Redshift.""" - logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="Redshift") data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end) - logger.info("Starting Redshift export batch %s - %s", data_interval_start, data_interval_end) create_export_run_inputs = CreateBatchExportRunInputs( team_id=inputs.team_id, @@ -247,7 
             ),
         )
 
-        update_inputs = UpdateBatchExportRunStatusInputs(id=run_id, status="Completed")
+        update_inputs = UpdateBatchExportRunStatusInputs(
+            id=run_id,
+            status="Completed",
+            team_id=inputs.team_id,
+        )
 
         insert_inputs = RedshiftInsertInputs(
             team_id=inputs.team_id,
diff --git a/posthog/temporal/workflows/s3_batch_export.py b/posthog/temporal/workflows/s3_batch_export.py
index 383886529a047..fdd9eba836c55 100644
--- a/posthog/temporal/workflows/s3_batch_export.py
+++ b/posthog/temporal/workflows/s3_batch_export.py
@@ -493,9 +493,7 @@ def parse_inputs(inputs: list[str]) -> S3BatchExportInputs:
     @workflow.run
     async def run(self, inputs: S3BatchExportInputs):
         """Workflow implementation to export data to S3 bucket."""
-        logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="S3")
         data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end)
-        logger.info("Starting batch export %s - %s", data_interval_start, data_interval_end)
 
         create_export_run_inputs = CreateBatchExportRunInputs(
             team_id=inputs.team_id,
@@ -515,7 +513,11 @@ async def run(self, inputs: S3BatchExportInputs):
             ),
         )
 
-        update_inputs = UpdateBatchExportRunStatusInputs(id=run_id, status="Completed")
+        update_inputs = UpdateBatchExportRunStatusInputs(
+            id=run_id,
+            status="Completed",
+            team_id=inputs.team_id,
+        )
 
         insert_inputs = S3InsertInputs(
             bucket_name=inputs.bucket_name,
diff --git a/posthog/temporal/workflows/snowflake_batch_export.py b/posthog/temporal/workflows/snowflake_batch_export.py
index 3154297ed5fe2..3b9a72ab96471 100644
--- a/posthog/temporal/workflows/snowflake_batch_export.py
+++ b/posthog/temporal/workflows/snowflake_batch_export.py
@@ -12,6 +12,8 @@
 from posthog.batch_exports.service import SnowflakeBatchExportInputs
 from posthog.temporal.workflows.base import PostHogWorkflow
 from posthog.temporal.workflows.batch_exports import (
+    BYTES_EXPORTED,
+    ROWS_EXPORTED,
     CreateBatchExportRunInputs,
     UpdateBatchExportRunStatusInputs,
     create_export_run,
@@ -19,8 +21,6 @@
     get_data_interval,
     get_results_iterator,
     get_rows_count,
-    ROWS_EXPORTED,
-    BYTES_EXPORTED,
 )
 from posthog.temporal.workflows.clickhouse import get_client
 from posthog.temporal.workflows.logger import bind_batch_exports_logger
@@ -304,14 +304,6 @@ def parse_inputs(inputs: list[str]) -> SnowflakeBatchExportInputs:
     @workflow.run
     async def run(self, inputs: SnowflakeBatchExportInputs):
         """Workflow implementation to export data to Snowflake table."""
-        logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="Snowflake")
-        data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end)
-        logger.info(
-            "Starting batch export %s - %s",
-            data_interval_start,
-            data_interval_end,
-        )
-
         data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end)
 
         create_export_run_inputs = CreateBatchExportRunInputs(
@@ -332,7 +324,11 @@ async def run(self, inputs: SnowflakeBatchExportInputs):
             ),
         )
 
-        update_inputs = UpdateBatchExportRunStatusInputs(id=run_id, status="Completed")
+        update_inputs = UpdateBatchExportRunStatusInputs(
+            id=run_id,
+            status="Completed",
+            team_id=inputs.team_id,
+        )
 
         insert_inputs = SnowflakeInsertInputs(
             team_id=inputs.team_id,
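
For context, the pattern this patch converges on — binding a structlog logger and emitting log lines only from inside Temporal activities, where activity.info() is available, rather than from workflow code — can be sketched in isolation as below. This is a minimal sketch written for this note, not PostHog's actual module: report_run_status, RunStatusInputs and bind_activity_logger are hypothetical stand-ins for the real bind_batch_exports_logger, update_export_run_status and UpdateBatchExportRunStatusInputs shown in the diff above.

    import dataclasses

    import structlog
    from temporalio import activity


    @dataclasses.dataclass
    class RunStatusInputs:
        """Hypothetical inputs, modelled on UpdateBatchExportRunStatusInputs above."""

        id: str
        status: str
        team_id: int
        latest_error: str | None = None


    def bind_activity_logger(team_id: int):
        """Bind Temporal activity context onto a structlog logger.

        activity.info() only works inside a running activity, which is why the
        patch moves every logging call out of workflow code and into activities.
        """
        info = activity.info()
        return structlog.get_logger().bind(
            team_id=team_id,
            workflow_id=info.workflow_id,
            workflow_type=info.workflow_type,
            workflow_run_id=info.workflow_run_id,
            attempt=info.attempt,
        )


    @activity.defn
    async def report_run_status(inputs: RunStatusInputs) -> None:
        """Log the final status of a batch export run from within an activity."""
        logger = bind_activity_logger(team_id=inputs.team_id)

        if inputs.status == "Failed":
            logger.error("BatchExport failed", latest_error=inputs.latest_error)
        elif inputs.status == "Cancelled":
            logger.warning("BatchExport was cancelled")
        else:
            logger.info("BatchExport finished successfully")

Relatedly, the structlog.reset_defaults() call added to start_temporal_worker appears intended to drop any structlog configuration picked up during the Django imports, so the worker process starts from structlog's defaults before the activities bind their own loggers.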