chore(batch-exports): add some Prometheus metrics for batch exports
bretthoerner committed Nov 7, 2023
1 parent 06e9938 commit 6d10ea5
Showing 9 changed files with 111 additions and 79 deletions.
2 changes: 1 addition & 1 deletion posthog/batch_exports/service.py
@@ -199,7 +199,7 @@ def unpause_batch_export(
note: str | None = None,
backfill: bool = False,
) -> None:
"""Pause this BatchExport.
"""Unpause this BatchExport.
We pass the call to the underlying Temporal Schedule. Additionally, we can trigger a backfill
to backfill runs missed while paused.
6 changes: 6 additions & 0 deletions posthog/management/commands/start_temporal_worker.py
@@ -1,12 +1,15 @@
import asyncio
import logging
import os

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
from django.core.management.base import BaseCommand
from django.conf import settings

from prometheus_client import start_http_server

from posthog.temporal.worker import start_worker


@@ -63,6 +66,9 @@ def handle(self, *args, **options):
options["client_key"] = "--SECRET--"
logging.info(f"Starting Temporal Worker with options: {options}")

port = int(os.environ.get("PROMETHEUS_METRICS_EXPORT_PORT", 8001))
start_http_server(port=port)

asyncio.run(
start_worker(
temporal_host,
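For reference, a minimal sketch (not part of this diff) of the exporter wiring added above: prometheus_client's start_http_server serves every metric registered in the default registry on the given port, so anything the worker increments afterwards shows up under /metrics. The counter name below is hypothetical and only illustrates the flow; the environment variable and default port match the diff.

import os

from prometheus_client import Counter, start_http_server

# Hypothetical counter; anything registered in the default registry becomes
# visible once the HTTP server is running.
DEMO_COUNTER = Counter("demo_events", "Events seen by this worker.")

if __name__ == "__main__":
    port = int(os.environ.get("PROMETHEUS_METRICS_EXPORT_PORT", 8001))
    start_http_server(port=port)

    DEMO_COUNTER.inc()
    # `curl localhost:8001/metrics` now includes a `demo_events_total 1.0` sample.
    input("Serving metrics; press Enter to exit.")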
12 changes: 6 additions & 6 deletions posthog/temporal/workflows/backfill_batch_export.py
@@ -36,14 +36,14 @@ class HeartbeatDetails(typing.NamedTuple):
def make_activity_heartbeat_while_running(
self, function_to_run: collections.abc.Callable, heartbeat_every: dt.timedelta
) -> collections.abc.Callable[..., collections.abc.Coroutine]:
"""Return a callable that returns a coroutine that hearbeats with these HeartbeatDetails.
"""Return a callable that returns a coroutine that heartbeats with these HeartbeatDetails.
The returned callable wraps 'function_to_run' while heartbeatting 'factor' times for every
'heartbeat_timeout'.
The returned callable wraps 'function_to_run' while heartbeating every 'heartbeat_every'
seconds.
"""

async def heartbeat() -> None:
"""Heartbeat factor times every heartbeat_timeout."""
"""Heartbeat every 'heartbeat_every' seconds."""
while True:
await asyncio.sleep(heartbeat_every.total_seconds())
temporalio.activity.heartbeat(self)
@@ -99,8 +99,8 @@ class BackfillScheduleInputs:
async def backfill_schedule(inputs: BackfillScheduleInputs) -> None:
"""Temporal Activity to backfill a Temporal Schedule.
The backfill is broken up into batches of inputs.buffer_limit size. After a backfill batch is requested,
we wait for it to be done before continuing with the next.
The backfill is broken up into batches of inputs.buffer_limit size. After a backfill batch is
requested, we wait for it to be done before continuing with the next.
This activity heartbeats while waiting to allow cancelling an ongoing backfill.
"""
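For context, the heartbeat helper documented above follows a common Temporal pattern: run the real work and a periodic heartbeater concurrently, then cancel the heartbeater once the work finishes. A rough sketch of that shape, with hypothetical names and details simplified relative to the HeartbeatDetails class in this file:

import asyncio
import datetime as dt

from temporalio import activity


async def run_with_heartbeat(work, heartbeat_every: dt.timedelta, *details):
    """Run 'work' while heartbeating every 'heartbeat_every' (sketch; must run inside an activity)."""

    async def heartbeater() -> None:
        while True:
            await asyncio.sleep(heartbeat_every.total_seconds())
            activity.heartbeat(*details)

    heartbeat_task = asyncio.create_task(heartbeater())
    try:
        return await work()
    finally:
        # Stop heartbeating once the wrapped work is done (or has failed).
        heartbeat_task.cancel()
        await asyncio.wait([heartbeat_task])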
21 changes: 17 additions & 4 deletions posthog/temporal/workflows/batch_exports.py
@@ -14,6 +14,7 @@

import brotli
from asgiref.sync import sync_to_async
from prometheus_client import Counter
from temporalio import activity, exceptions, workflow
from temporalio.common import RetryPolicy

@@ -47,6 +48,15 @@
"""
)

ROWS_EXPORTED = Counter("batch_export_rows_exported", "Number of rows exported.", labelnames=("destination",))
BYTES_EXPORTED = Counter("batch_export_bytes_exported", "Number of bytes exported.", labelnames=("destination",))
EXPORT_STARTED = Counter("batch_export_started", "Number of batch exports started.", labelnames=("destination",))
EXPORT_FINISHED = Counter(
"batch_export_finished",
"Number of batch exports finished, for any reason (including failure).",
labelnames=("destination", "status"),
)


async def get_rows_count(
client,
@@ -246,7 +256,7 @@ def get_data_interval(interval: str, data_interval_end: str | None) -> tuple[dt.
msg = (
"Expected 'TemporalScheduledStartTime' of type 'list[str]' or 'list[datetime], found 'NoneType'."
"This should be set by the Temporal Schedule unless triggering workflow manually."
"In the latter case, ensure 'S3BatchExportInputs.data_interval_end' is set."
"In the latter case, ensure '{Type}BatchExportInputs.data_interval_end' is set."
)
raise TypeError(msg)

@@ -260,7 +270,7 @@ def get_data_interval(interval: str, data_interval_end: str | None) -> tuple[dt.

else:
msg = (
f"Expected search attribute to be of type 'str' or 'datetime' found '{data_interval_end_search_attr[0]}' "
f"Expected search attribute to be of type 'str' or 'datetime' but found '{data_interval_end_search_attr[0]}' "
f"of type '{type(data_interval_end_search_attr[0])}'."
)
raise TypeError(msg)
@@ -670,8 +680,8 @@ class CreateBatchExportBackfillInputs:
async def create_batch_export_backfill_model(inputs: CreateBatchExportBackfillInputs) -> str:
"""Activity that creates an BatchExportBackfill.
Intended to be used in all export workflows, usually at the start, to create a model
instance to represent them in our database.
Intended to be used in all batch export backfill workflows, usually at the start, to create a
model instance to represent them in our database.
"""
logger = get_batch_exports_logger(inputs=inputs)
logger.info(f"Creating BatchExportBackfill model instance in team {inputs.team_id}.")
@@ -707,6 +717,7 @@ async def update_batch_export_backfill_model_status(inputs: UpdateBatchExportBac


async def execute_batch_export_insert_activity(
destination: str,
activity,
inputs,
non_retryable_error_types: list[str],
@@ -743,6 +754,7 @@ async def execute_batch_export_insert_activity(
non_retryable_error_types=non_retryable_error_types,
)
try:
EXPORT_STARTED.labels(destination=destination).inc()
await workflow.execute_activity(
activity,
inputs,
@@ -773,6 +785,7 @@ async def execute_batch_export_insert_activity(
)

finally:
EXPORT_FINISHED.labels(destination=destination, status=update_inputs.status.lower()).inc()
await workflow.execute_activity(
update_export_run_status,
update_inputs,
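One convenient side effect of defining these counters at module level is that they register with prometheus_client's default registry on import, so their values can be read back directly, which is handy for tests or a quick sanity check. A small sketch (not part of the diff), assuming the module can be imported in your environment:

from prometheus_client import REGISTRY

from posthog.temporal.workflows.batch_exports import ROWS_EXPORTED

ROWS_EXPORTED.labels(destination="s3").inc(10)

# Counters are exported with a '_total' suffix; the label dict selects one time series.
value = REGISTRY.get_sample_value("batch_export_rows_exported_total", {"destination": "s3"})
assert value is not None and value >= 10.0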
33 changes: 16 additions & 17 deletions posthog/temporal/workflows/bigquery_batch_export.py
@@ -21,6 +21,8 @@
get_data_interval,
get_results_iterator,
get_rows_count,
ROWS_EXPORTED,
BYTES_EXPORTED,
)
from posthog.temporal.workflows.clickhouse import get_client

@@ -162,6 +164,17 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs):
)

with BatchExportTemporaryFile() as jsonl_file:

def flush_to_bigquery():
logger.info(
"Copying %s records of size %s bytes to BigQuery",
jsonl_file.records_since_last_reset,
jsonl_file.bytes_since_last_reset,
)
load_jsonl_file_to_bigquery_table(jsonl_file, bigquery_table, table_schema, bq_client)
ROWS_EXPORTED.labels(destination="bigquery").inc(jsonl_file.records_since_last_reset)
BYTES_EXPORTED.labels(destination="bigquery").inc(jsonl_file.bytes_since_last_reset)

for result in results_iterator:
row = {
field.name: json.dumps(result[field.name]) if field.name in json_columns else result[field.name]
@@ -173,26 +186,11 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs):
jsonl_file.write_records_to_jsonl([row])

if jsonl_file.tell() > settings.BATCH_EXPORT_BIGQUERY_UPLOAD_CHUNK_SIZE_BYTES:
logger.info(
"Copying %s records of size %s bytes to BigQuery",
jsonl_file.records_since_last_reset,
jsonl_file.bytes_since_last_reset,
)
load_jsonl_file_to_bigquery_table(
jsonl_file,
bigquery_table,
table_schema,
bq_client,
)
flush_to_bigquery()
jsonl_file.reset()

if jsonl_file.tell() > 0:
logger.info(
"Copying %s records of size %s bytes to BigQuery",
jsonl_file.records_since_last_reset,
jsonl_file.bytes_since_last_reset,
)
load_jsonl_file_to_bigquery_table(jsonl_file, bigquery_table, table_schema, bq_client)
flush_to_bigquery()


@workflow.defn(name="bigquery-export")
@@ -260,6 +258,7 @@ async def run(self, inputs: BigQueryBatchExportInputs):
)

await execute_batch_export_insert_activity(
"bigquery",
insert_into_bigquery_activity,
insert_inputs,
non_retryable_error_types=[
42 changes: 20 additions & 22 deletions posthog/temporal/workflows/postgres_batch_export.py
@@ -23,6 +23,8 @@
get_data_interval,
get_results_iterator,
get_rows_count,
ROWS_EXPORTED,
BYTES_EXPORTED,
)
from posthog.temporal.workflows.clickhouse import get_client

@@ -218,29 +220,8 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs):

with BatchExportTemporaryFile() as pg_file:
with postgres_connection(inputs) as connection:
for result in results_iterator:
row = {
key: json.dumps(result[key]) if key in json_columns and result[key] is not None else result[key]
for key in schema_columns
}
pg_file.write_records_to_tsv([row], fieldnames=schema_columns)

if pg_file.tell() > settings.BATCH_EXPORT_POSTGRES_UPLOAD_CHUNK_SIZE_BYTES:
logger.info(
"Copying %s records of size %s bytes to Postgres",
pg_file.records_since_last_reset,
pg_file.bytes_since_last_reset,
)
copy_tsv_to_postgres(
pg_file,
connection,
inputs.schema,
inputs.table_name,
schema_columns,
)
pg_file.reset()

if pg_file.tell() > 0:
def flush_to_postgres():
logger.info(
"Copying %s records of size %s bytes to Postgres",
pg_file.records_since_last_reset,
@@ -253,6 +234,22 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs):
inputs.table_name,
schema_columns,
)
ROWS_EXPORTED.labels(destination="postgres").inc(pg_file.records_since_last_reset)
BYTES_EXPORTED.labels(destination="postgres").inc(pg_file.bytes_since_last_reset)

for result in results_iterator:
row = {
key: json.dumps(result[key]) if key in json_columns and result[key] is not None else result[key]
for key in schema_columns
}
pg_file.write_records_to_tsv([row], fieldnames=schema_columns)

if pg_file.tell() > settings.BATCH_EXPORT_POSTGRES_UPLOAD_CHUNK_SIZE_BYTES:
flush_to_postgres()
pg_file.reset()

if pg_file.tell() > 0:
flush_to_postgres()


@workflow.defn(name="postgres-export")
@@ -319,6 +316,7 @@ async def run(self, inputs: PostgresBatchExportInputs):
)

await execute_batch_export_insert_activity(
"postgres",
insert_into_postgres_activity,
insert_inputs,
non_retryable_error_types=[
13 changes: 11 additions & 2 deletions posthog/temporal/workflows/redshift_batch_export.py
@@ -22,6 +22,7 @@
get_data_interval,
get_results_iterator,
get_rows_count,
ROWS_EXPORTED,
)
from posthog.temporal.workflows.clickhouse import get_client
from posthog.temporal.workflows.postgres_batch_export import (
@@ -69,17 +70,24 @@ def insert_records_to_redshift(
)
template = sql.SQL("({})").format(sql.SQL(", ").join(map(sql.Placeholder, columns)))

def flush_to_redshift():
psycopg2.extras.execute_values(cursor, query, batch, template)
ROWS_EXPORTED.labels(destination="redshift").inc(len(batch))
# It would be nice to record BYTES_EXPORTED for Redshift, but it's not worth estimating
# the byte size of each batch the way things are currently written. We can revisit this
# in the future if we decide it's useful enough.

for record in records:
batch.append(record)

if len(batch) < batch_size:
continue

psycopg2.extras.execute_values(cursor, query, batch, template)
flush_to_redshift()
batch = []

if len(batch) > 0:
psycopg2.extras.execute_values(cursor, query, batch, template)
flush_to_redshift()


@dataclass
@@ -259,6 +267,7 @@ async def run(self, inputs: RedshiftBatchExportInputs):
)

await execute_batch_export_insert_activity(
"redshift",
insert_into_redshift_activity,
insert_inputs,
non_retryable_error_types=[],
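If the BYTES_EXPORTED caveat noted in insert_records_to_redshift is ever revisited, one rough option would be to measure the serialized size of each batch just before flushing it. This is purely a hypothetical sketch, not something the commit does; the helper name is made up and JSON size is only an approximation of what is actually sent to Redshift:

import json


def estimate_batch_bytes(batch: list) -> int:
    """Approximate a batch's payload size by JSON-serializing each record (sketch)."""
    return sum(len(json.dumps(record, default=str).encode("utf-8")) for record in batch)


# Inside flush_to_redshift, next to ROWS_EXPORTED:
# BYTES_EXPORTED.labels(destination="redshift").inc(estimate_batch_bytes(batch))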
45 changes: 22 additions & 23 deletions posthog/temporal/workflows/s3_batch_export.py
@@ -24,6 +24,8 @@
get_data_interval,
get_results_iterator,
get_rows_count,
ROWS_EXPORTED,
BYTES_EXPORTED,
)
from posthog.temporal.workflows.clickhouse import get_client

@@ -425,6 +427,23 @@ async def worker_shutdown_handler():

async with s3_upload as s3_upload:
with BatchExportTemporaryFile(compression=inputs.compression) as local_results_file:

async def flush_to_s3(last=False):
logger.info(
"Uploading %spart %s containing %s records with size %s bytes to S3",
"last " if last else "",
s3_upload.part_number + 1,
local_results_file.records_since_last_reset,
local_results_file.bytes_since_last_reset,
)

await s3_upload.upload_part(local_results_file)
ROWS_EXPORTED.labels(destination="s3").inc(local_results_file.records_since_last_reset)
BYTES_EXPORTED.labels(destination="s3").inc(local_results_file.bytes_since_last_reset)

last_uploaded_part_timestamp = result["inserted_at"]
activity.heartbeat(last_uploaded_part_timestamp, s3_upload.to_state())

for result in results_iterator:
record = {
"created_at": result["created_at"],
@@ -442,32 +461,11 @@ async def worker_shutdown_handler():
local_results_file.write_records_to_jsonl([record])

if local_results_file.tell() > settings.BATCH_EXPORT_S3_UPLOAD_CHUNK_SIZE_BYTES:
logger.info(
"Uploading part %s containing %s records with size %s bytes to S3",
s3_upload.part_number + 1,
local_results_file.records_since_last_reset,
local_results_file.bytes_since_last_reset,
)

await s3_upload.upload_part(local_results_file)

last_uploaded_part_timestamp = result["inserted_at"]
activity.heartbeat(last_uploaded_part_timestamp, s3_upload.to_state())

await flush_to_s3()
local_results_file.reset()

if local_results_file.tell() > 0 and result is not None:
logger.info(
"Uploading last part %s containing %s records with size %s bytes to S3",
s3_upload.part_number + 1,
local_results_file.records_since_last_reset,
local_results_file.bytes_since_last_reset,
)

await s3_upload.upload_part(local_results_file)

last_uploaded_part_timestamp = result["inserted_at"]
activity.heartbeat(last_uploaded_part_timestamp, s3_upload.to_state())
await flush_to_s3(last=True)

await s3_upload.complete()

@@ -531,6 +529,7 @@ async def run(self, inputs: S3BatchExportInputs):
)

await execute_batch_export_insert_activity(
"s3",
insert_into_s3_activity,
insert_inputs,
non_retryable_error_types=[