fix: Reset structlog on temporal worker (#18581)
* fix: Reset structlog on temporal worker

* Update query snapshots

* Update query snapshots

* fix: Move logging to activities exclusively

* fix: Remove Workflow info from logger

---------

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
tomasfarias and github-actions[bot] authored Nov 14, 2023
1 parent 4a332b3 commit 34a6bc4
Showing 11 changed files with 95 additions and 89 deletions.
20 changes: 14 additions & 6 deletions posthog/batch_exports/service.py
@@ -341,7 +341,7 @@ def create_batch_export_run(
data_interval_start: str,
data_interval_end: str,
status: str = BatchExportRun.Status.STARTING,
):
) -> BatchExportRun:
"""Create a BatchExportRun after a Temporal Workflow execution.
In a first approach, this method is intended to be called only by Temporal Workflows,
@@ -364,16 +364,20 @@ def create_batch_export_run(
return run


def update_batch_export_run_status(run_id: UUID, status: str, latest_error: str | None):
def update_batch_export_run_status(run_id: UUID, status: str, latest_error: str | None) -> BatchExportRun:
"""Update the status of an BatchExportRun with given id.
Arguments:
id: The id of the BatchExportRun to update.
"""
updated = BatchExportRun.objects.filter(id=run_id).update(status=status, latest_error=latest_error)
model = BatchExportRun.objects.filter(id=run_id)
updated = model.update(status=status, latest_error=latest_error)

if not updated:
raise ValueError(f"BatchExportRun with id {run_id} not found.")

return model.get()


def sync_batch_export(batch_export: BatchExport, created: bool):
workflow, workflow_inputs = DESTINATION_WORKFLOWS[batch_export.destination.type]
@@ -447,7 +451,7 @@ def create_batch_export_backfill(
start_at: str,
end_at: str,
status: str = BatchExportRun.Status.RUNNING,
):
) -> BatchExportBackfill:
"""Create a BatchExportBackfill.
@@ -470,13 +474,17 @@ def create_batch_export_backfill(
return backfill


def update_batch_export_backfill_status(backfill_id: UUID, status: str):
def update_batch_export_backfill_status(backfill_id: UUID, status: str) -> BatchExportBackfill:
"""Update the status of an BatchExportBackfill with given id.
Arguments:
id: The id of the BatchExportBackfill to update.
status: The new status to assign to the BatchExportBackfill.
"""
updated = BatchExportBackfill.objects.filter(id=backfill_id).update(status=status)
model = BatchExportBackfill.objects.filter(id=backfill_id)
updated = model.update(status=status)

if not updated:
raise ValueError(f"BatchExportBackfill with id {backfill_id} not found.")

return model.get()
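
Both helpers above now return the refreshed model instead of discarding it: Django's QuerySet.update() only reports how many rows it changed, so the queryset is evaluated again with .get() to load the updated row. A hedged sketch of how a caller can use the new return value (it assumes a configured Django environment with the posthog package importable; the run id and error text are purely illustrative):

import uuid

from posthog.batch_exports.service import update_batch_export_run_status

# Hypothetical run id, for illustration only.
run_id = uuid.UUID("12345678-1234-5678-1234-567812345678")

# The helper now hands back the updated BatchExportRun, so callers can branch
# on the persisted state without issuing a second query themselves.
run = update_batch_export_run_status(run_id=run_id, status="Failed", latest_error="Connection reset by peer")
if run.status == "Failed":
    print(run.latest_error)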
5 changes: 4 additions & 1 deletion posthog/management/commands/start_temporal_worker.py
@@ -1,11 +1,12 @@
import asyncio
import logging

import structlog
from temporalio import workflow

with workflow.unsafe.imports_passed_through():
from django.core.management.base import BaseCommand
from django.conf import settings
from django.core.management.base import BaseCommand

from prometheus_client import start_http_server

@@ -70,6 +71,8 @@ def handle(self, *args, **options):
options["client_key"] = "--SECRET--"
logging.info(f"Starting Temporal Worker with options: {options}")

structlog.reset_defaults()

metrics_port = int(options["metrics_port"])
start_http_server(port=metrics_port)

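The worker command now calls structlog.reset_defaults() before starting up. structlog keeps its configuration in module-level state, so any configuration applied while Django settings were imported would otherwise carry over into the worker process; resetting it lets the batch export logger (which this PR configures from inside the activities, via posthog/temporal/workflows/logger.py) start from a clean slate. A minimal sketch of the idea, with an illustrative processor chain that is not PostHog's actual configuration:

import structlog

# Django may already have called structlog.configure() during settings import.
# reset_defaults() drops that cached configuration so this process starts from
# structlog's built-in defaults again.
structlog.reset_defaults()

# Illustrative worker-side configuration; in this PR the real setup is applied
# later, when the batch export activities bind their loggers.
structlog.configure(
    processors=[
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ]
)

structlog.get_logger().info("temporal worker starting", metrics_port=8001)
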
1 change: 1 addition & 0 deletions posthog/temporal/tests/batch_exports/test_run_updates.py
@@ -122,6 +122,7 @@ async def test_update_export_run_status(activity_environment, team, batch_export
update_inputs = UpdateBatchExportRunStatusInputs(
id=str(run_id),
status="Completed",
team_id=inputs.team_id,
)
await activity_environment.run(update_export_run_status, update_inputs)

13 changes: 1 addition & 12 deletions posthog/temporal/workflows/backfill_batch_export.py
@@ -22,7 +22,6 @@
create_batch_export_backfill_model,
update_batch_export_backfill_model_status,
)
from posthog.temporal.workflows.logger import bind_batch_exports_logger


class HeartbeatDetails(typing.NamedTuple):
@@ -284,13 +283,6 @@ def parse_inputs(inputs: list[str]) -> BackfillBatchExportInputs:
@temporalio.workflow.run
async def run(self, inputs: BackfillBatchExportInputs) -> None:
"""Workflow implementation to backfill a BatchExport."""
logger = await bind_batch_exports_logger(team_id=inputs.team_id)
logger.info(
"Starting Backfill for BatchExport: %s - %s",
inputs.start_at,
inputs.end_at,
)

create_batch_export_backfill_inputs = CreateBatchExportBackfillInputs(
team_id=inputs.team_id,
batch_export_id=inputs.batch_export_id,
@@ -347,16 +339,13 @@ async def run(self, inputs: BackfillBatchExportInputs) -> None:

except temporalio.exceptions.ActivityError as e:
if isinstance(e.cause, temporalio.exceptions.CancelledError):
logger.error("Backfill was cancelled.")
update_inputs.status = "Cancelled"
else:
logger.exception("Backfill failed.", exc_info=e.cause)
update_inputs.status = "Failed"

raise

except Exception as e:
logger.exception("Backfill failed with an unexpected error.", exc_info=e)
except Exception:
update_inputs.status = "Failed"
raise

60 changes: 48 additions & 12 deletions posthog/temporal/workflows/batch_exports.py
@@ -495,6 +495,12 @@ async def create_export_run(inputs: CreateBatchExportRunInputs) -> str:
Intended to be used in all export workflows, usually at the start, to create a model
instance to represent them in our database.
"""
logger = await bind_batch_exports_logger(team_id=inputs.team_id)
logger.info(
"Creating batch export for range %s - %s",
inputs.data_interval_start,
inputs.data_interval_end,
)
# 'sync_to_async' type hints are fixed in asgiref>=3.4.1
# But one of our dependencies is pinned to asgiref==3.3.2.
# Remove these comments once we upgrade.
@@ -514,18 +520,34 @@ class UpdateBatchExportRunStatusInputs:

id: str
status: str
team_id: int
latest_error: str | None = None


@activity.defn
async def update_export_run_status(inputs: UpdateBatchExportRunStatusInputs):
"""Activity that updates the status of an BatchExportRun."""
await sync_to_async(update_batch_export_run_status)(
logger = await bind_batch_exports_logger(team_id=inputs.team_id)

batch_export_run = await sync_to_async(update_batch_export_run_status)(
run_id=uuid.UUID(inputs.id),
status=inputs.status,
latest_error=inputs.latest_error,
) # type: ignore

if batch_export_run.status == "Failed":
logger.error("BatchExport failed with error: %s", batch_export_run.latest_error)

elif batch_export_run.status == "Cancelled":
logger.warning("BatchExport was cancelled.")

else:
logger.info(
"Successfully finished exporting batch %s - %s",
batch_export_run.data_interval_start,
batch_export_run.data_interval_end,
)


@dataclasses.dataclass
class CreateBatchExportBackfillInputs:
@@ -543,6 +565,12 @@ async def create_batch_export_backfill_model(inputs: CreateBatchExportBackfillIn
Intended to be used in all batch export backfill workflows, usually at the start, to create a
model instance to represent them in our database.
"""
logger = await bind_batch_exports_logger(team_id=inputs.team_id)
logger.info(
"Creating historical export for batches in range %s - %s",
inputs.start_at,
inputs.end_at,
)
# 'sync_to_async' type hints are fixed in asgiref>=3.4.1
# But one of our dependencies is pinned to asgiref==3.3.2.
# Remove these comments once we upgrade.
@@ -568,7 +596,23 @@ class UpdateBatchExportBackfillStatusInputs:
@activity.defn
async def update_batch_export_backfill_model_status(inputs: UpdateBatchExportBackfillStatusInputs):
"""Activity that updates the status of an BatchExportRun."""
await sync_to_async(update_batch_export_backfill_status)(backfill_id=uuid.UUID(inputs.id), status=inputs.status) # type: ignore
backfill = await sync_to_async(update_batch_export_backfill_status)(
backfill_id=uuid.UUID(inputs.id), status=inputs.status
) # type: ignore
logger = await bind_batch_exports_logger(team_id=backfill.team_id)

if backfill.status == "Failed":
logger.error("Historical export failed")

elif backfill.status == "Cancelled":
logger.warning("Historical export was cancelled.")

else:
logger.info(
"Successfully finished exporting historical batches in %s - %s",
backfill.start_at,
backfill.end_at,
)


async def execute_batch_export_insert_activity(
@@ -600,7 +644,6 @@ async def execute_batch_export_insert_activity(
maximum_retry_interval_seconds: Maximum interval in seconds between retries.
"""
destination = workflow.info().workflow_type.lower()
logger = await bind_batch_exports_logger(team_id=inputs.team_id)

retry_policy = RetryPolicy(
initial_interval=dt.timedelta(seconds=initial_retry_interval_seconds),
@@ -617,28 +660,21 @@ async def execute_batch_export_insert_activity(
heartbeat_timeout=dt.timedelta(seconds=heartbeat_timeout_seconds) if heartbeat_timeout_seconds else None,
retry_policy=retry_policy,
)

except exceptions.ActivityError as e:
if isinstance(e.cause, exceptions.CancelledError):
logger.error("BatchExport was cancelled.")
update_inputs.status = "Cancelled"
else:
logger.exception("BatchExport failed.", exc_info=e.cause)
update_inputs.status = "Failed"

update_inputs.latest_error = str(e.cause)
raise

except Exception as e:
logger.exception("BatchExport failed with an unexpected error.", exc_info=e)
except Exception:
update_inputs.status = "Failed"
update_inputs.latest_error = "An unexpected error has ocurred"
raise

else:
logger.info(
"Successfully finished exporting batch %s - %s", inputs.data_interval_start, inputs.data_interval_end
)

finally:
EXPORT_FINISHED.labels(destination=destination, status=update_inputs.status.lower()).inc()
await workflow.execute_activity(
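With this change all batch export logging happens inside activities: the workflow bodies no longer bind a logger, and UpdateBatchExportRunStatusInputs carries team_id so the status-update activity can bind one itself. A hedged sketch of the resulting shape (the names, logger setup, and timeout are illustrative, not PostHog's actual implementation). Activities are ordinary code executed once per attempt, whereas workflow code can be replayed, so a plain structlog logger is only safe to use on the activity side:

import dataclasses
import datetime as dt

from temporalio import activity, workflow

with workflow.unsafe.imports_passed_through():
    import structlog


@dataclasses.dataclass
class UpdateRunStatusInputs:
    id: str
    status: str
    team_id: int  # carried in the inputs so the activity can bind it to its logger


@activity.defn
async def update_run_status(inputs: UpdateRunStatusInputs) -> None:
    # Activities run outside the workflow sandbox; binding context and emitting
    # log events here happens exactly once per activity attempt.
    logger = structlog.get_logger().bind(team_id=inputs.team_id)
    logger.info("run finished", run_id=inputs.id, status=inputs.status)


@workflow.defn
class ExampleExportWorkflow:
    @workflow.run
    async def run(self, team_id: int) -> None:
        # No logger in the workflow body: workflow code may be replayed, so any
        # logging here could be emitted more than once. Delegate it instead.
        await workflow.execute_activity(
            update_run_status,
            UpdateRunStatusInputs(id="run-1", status="Completed", team_id=team_id),
            start_to_close_timeout=dt.timedelta(seconds=30),
        )
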
10 changes: 1 addition & 9 deletions posthog/temporal/workflows/bigquery_batch_export.py
@@ -212,14 +212,6 @@ def parse_inputs(inputs: list[str]) -> BigQueryBatchExportInputs:
@workflow.run
async def run(self, inputs: BigQueryBatchExportInputs):
"""Workflow implementation to export data to BigQuery."""
logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="BigQuery")
data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end)
logger.info(
"Starting batch export %s - %s",
data_interval_start,
data_interval_end,
)

data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end)

create_export_run_inputs = CreateBatchExportRunInputs(
@@ -240,7 +232,7 @@ async def run(self, inputs: BigQueryBatchExportInputs):
),
)

update_inputs = UpdateBatchExportRunStatusInputs(id=run_id, status="Completed")
update_inputs = UpdateBatchExportRunStatusInputs(id=run_id, status="Completed", team_id=inputs.team_id)

insert_inputs = BigQueryInsertInputs(
team_id=inputs.team_id,
27 changes: 3 additions & 24 deletions posthog/temporal/workflows/logger.py
@@ -165,14 +165,12 @@ def get_temporal_context() -> dict[str, str | int]:
* workflow_run_id: The ID of the Temporal Workflow Execution running the batch export.
* workflow_type: The name of the Temporal Workflow.
We attempt to fetch the context from the activity information, and then from the workflow
information. If both are undefined, an empty dict is returned. When running this in
an activity or a workflow, at least one context will be defined.
We attempt to fetch the context from the activity information. If undefined, an empty dict
is returned. When running this in an activity the context will be defined.
"""
activity_info = attempt_to_fetch_activity_info()
workflow_info = attempt_to_fetch_workflow_info()

info = activity_info or workflow_info
info = activity_info

if info is None:
return {}
@@ -222,25 +220,6 @@ def attempt_to_fetch_activity_info() -> Info | None:
return (workflow_id, workflow_type, workflow_run_id, attempt)


def attempt_to_fetch_workflow_info() -> Info | None:
"""Fetch Workflow information from Temporal.
Returns:
None if calling outside a Workflow, else the relevant Info.
"""
try:
workflow_info = temporalio.workflow.info()
except RuntimeError:
return None
else:
workflow_id = workflow_info.workflow_id
workflow_type = workflow_info.workflow_type
workflow_run_id = workflow_info.run_id
attempt = workflow_info.attempt

return (workflow_id, workflow_type, workflow_run_id, attempt)


class KafkaLogProducerFromQueue:
"""Produce log messages to Kafka by getting them from a queue.
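get_temporal_context() now only consults the running activity, which is why attempt_to_fetch_workflow_info is deleted outright. A small hedged sketch of the lookup pattern that remains (the helper name is hypothetical): temporalio.activity.info() raises RuntimeError when called outside an activity, and the try/except turns that into None:

import temporalio.activity


def current_activity_attempt() -> int | None:
    """Return the attempt number of the running activity, or None outside one."""
    try:
        info = temporalio.activity.info()
    except RuntimeError:
        # Not executing inside an activity.
        return None
    return info.attempt
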
12 changes: 5 additions & 7 deletions posthog/temporal/workflows/postgres_batch_export.py
@@ -273,13 +273,7 @@ def parse_inputs(inputs: list[str]) -> PostgresBatchExportInputs:
@workflow.run
async def run(self, inputs: PostgresBatchExportInputs):
"""Workflow implementation to export data to Postgres."""
logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="PostgreSQL")
data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end)
logger.info(
"Starting Postgres export batch %s - %s",
data_interval_start,
data_interval_end,
)

create_export_run_inputs = CreateBatchExportRunInputs(
team_id=inputs.team_id,
@@ -299,7 +293,11 @@ async def run(self, inputs: PostgresBatchExportInputs):
),
)

update_inputs = UpdateBatchExportRunStatusInputs(id=run_id, status="Completed")
update_inputs = UpdateBatchExportRunStatusInputs(
id=run_id,
status="Completed",
team_id=inputs.team_id,
)

insert_inputs = PostgresInsertInputs(
team_id=inputs.team_id,
10 changes: 6 additions & 4 deletions posthog/temporal/workflows/redshift_batch_export.py
@@ -14,14 +14,14 @@
from posthog.batch_exports.service import RedshiftBatchExportInputs
from posthog.temporal.workflows.base import PostHogWorkflow
from posthog.temporal.workflows.batch_exports import (
ROWS_EXPORTED,
CreateBatchExportRunInputs,
UpdateBatchExportRunStatusInputs,
create_export_run,
execute_batch_export_insert_activity,
get_data_interval,
get_results_iterator,
get_rows_count,
ROWS_EXPORTED,
)
from posthog.temporal.workflows.clickhouse import get_client
from posthog.temporal.workflows.logger import bind_batch_exports_logger
@@ -225,9 +225,7 @@ def parse_inputs(inputs: list[str]) -> RedshiftBatchExportInputs:
@workflow.run
async def run(self, inputs: RedshiftBatchExportInputs):
"""Workflow implementation to export data to Redshift."""
logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="Redshift")
data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end)
logger.info("Starting Redshift export batch %s - %s", data_interval_start, data_interval_end)

create_export_run_inputs = CreateBatchExportRunInputs(
team_id=inputs.team_id,
@@ -247,7 +245,11 @@ async def run(self, inputs: RedshiftBatchExportInputs):
),
)

update_inputs = UpdateBatchExportRunStatusInputs(id=run_id, status="Completed")
update_inputs = UpdateBatchExportRunStatusInputs(
id=run_id,
status="Completed",
team_id=inputs.team_id,
)

insert_inputs = RedshiftInsertInputs(
team_id=inputs.team_id,