
Commit

chore(batch-exports): add some Prometheus metrics for batch exports (#18467)

* chore(batch-exports): add some Prometheus metrics for batch exports

* make metrics port an option/setting

* get destination from workflow_type
bretthoerner authored Nov 9, 2023
1 parent fc9b268 commit bc55393
Showing 12 changed files with 119 additions and 80 deletions.
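The whole change boils down to one small pattern: a few prometheus_client Counters labeled by destination, plus an HTTP exposition server started alongside the Temporal worker. A minimal, self-contained sketch of that pattern follows; the counter name and default port are taken from the diff below, while the increment is purely illustrative:

# Minimal sketch of the pattern this commit introduces; the counter name and
# default port mirror the diff below, the increment is illustrative only.
from prometheus_client import Counter, start_http_server

ROWS_EXPORTED = Counter(
    "batch_export_rows_exported", "Number of rows exported.", labelnames=("destination",)
)

# Serve all registered metrics on http://localhost:8001/metrics.
# start_http_server runs in a daemon thread; in the worker it lives as long as the process.
start_http_server(port=8001)

# Each destination increments its own label. Recent prometheus_client versions
# expose Counters with a `_total` suffix, e.g. batch_export_rows_exported_total.
ROWS_EXPORTED.labels(destination="bigquery").inc(100)
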
2 changes: 1 addition & 1 deletion bin/temporal-django-worker
@@ -4,6 +4,6 @@ set -e

trap "trap - SIGTERM && kill -- -$$" SIGINT SIGTERM EXIT

python3 manage.py start_temporal_worker
python3 manage.py start_temporal_worker "$@"

wait
2 changes: 1 addition & 1 deletion posthog/batch_exports/service.py
@@ -199,7 +199,7 @@ def unpause_batch_export(
note: str | None = None,
backfill: bool = False,
) -> None:
"""Pause this BatchExport.
"""Unpause this BatchExport.
We pass the call to the underlying Temporal Schedule. Additionally, we can trigger a backfill
to backfill runs missed while paused.
14 changes: 12 additions & 2 deletions posthog/management/commands/start_temporal_worker.py
@@ -7,6 +7,8 @@
from django.core.management.base import BaseCommand
from django.conf import settings

from prometheus_client import start_http_server

from posthog.temporal.worker import start_worker


@@ -15,12 +17,12 @@ class Command(BaseCommand):

def add_arguments(self, parser):
parser.add_argument(
"--temporal_host",
"--temporal-host",
default=settings.TEMPORAL_HOST,
help="Hostname for Temporal Scheduler",
)
parser.add_argument(
"--temporal_port",
"--temporal-port",
default=settings.TEMPORAL_PORT,
help="Port for Temporal Scheduler",
)
@@ -49,6 +51,11 @@ def add_arguments(self, parser):
default=settings.TEMPORAL_CLIENT_KEY,
help="Optional client key",
)
parser.add_argument(
"--metrics-port",
default=settings.PROMETHEUS_METRICS_EXPORT_PORT,
help="Port to export Prometheus metrics on",
)

def handle(self, *args, **options):
temporal_host = options["temporal_host"]
@@ -63,6 +70,9 @@ def handle(self, *args, **options):
options["client_key"] = "--SECRET--"
logging.info(f"Starting Temporal Worker with options: {options}")

metrics_port = int(options["metrics_port"])
start_http_server(port=metrics_port)

asyncio.run(
start_worker(
temporal_host,
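Once the worker is up, the exporter can be sanity-checked with any HTTP client. A hedged example in Python, assuming a locally running worker and the default port 8001 from the new PROMETHEUS_METRICS_EXPORT_PORT setting:

# Assumes a worker running locally with the default metrics port (8001).
import urllib.request

with urllib.request.urlopen("http://localhost:8001/metrics") as response:
    body = response.read().decode()

# Print only the batch export samples; HELP/TYPE comment lines are skipped.
print([line for line in body.splitlines() if line.startswith("batch_export_")])
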
1 change: 1 addition & 0 deletions posthog/settings/__init__.py
@@ -30,6 +30,7 @@
from posthog.settings.ingestion import *
from posthog.settings.feature_flags import *
from posthog.settings.geoip import *
from posthog.settings.metrics import *
from posthog.settings.schedules import *
from posthog.settings.sentry import *
from posthog.settings.shell_plus import *
3 changes: 3 additions & 0 deletions posthog/settings/metrics.py
@@ -0,0 +1,3 @@
import os

PROMETHEUS_METRICS_EXPORT_PORT = os.getenv("PROMETHEUS_METRICS_EXPORT_PORT", "8001")
12 changes: 6 additions & 6 deletions posthog/temporal/workflows/backfill_batch_export.py
@@ -36,14 +36,14 @@ class HeartbeatDetails(typing.NamedTuple):
def make_activity_heartbeat_while_running(
self, function_to_run: collections.abc.Callable, heartbeat_every: dt.timedelta
) -> collections.abc.Callable[..., collections.abc.Coroutine]:
"""Return a callable that returns a coroutine that hearbeats with these HeartbeatDetails.
"""Return a callable that returns a coroutine that heartbeats with these HeartbeatDetails.
The returned callable wraps 'function_to_run' while heartbeatting 'factor' times for every
'heartbeat_timeout'.
The returned callable wraps 'function_to_run' while heartbeating every 'heartbeat_every'
seconds.
"""

async def heartbeat() -> None:
"""Heartbeat factor times every heartbeat_timeout."""
"""Heartbeat every 'heartbeat_every' seconds."""
while True:
await asyncio.sleep(heartbeat_every.total_seconds())
temporalio.activity.heartbeat(self)
@@ -99,8 +99,8 @@ class BackfillScheduleInputs:
async def backfill_schedule(inputs: BackfillScheduleInputs) -> None:
"""Temporal Activity to backfill a Temporal Schedule.
The backfill is broken up into batches of inputs.buffer_limit size. After a backfill batch is requested,
we wait for it to be done before continuing with the next.
The backfill is broken up into batches of inputs.buffer_limit size. After a backfill batch is
requested, we wait for it to be done before continuing with the next.
This activity heartbeats while waiting to allow cancelling an ongoing backfill.
"""
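The corrected docstrings describe a simple shape: a background task heartbeats on a fixed interval while the wrapped coroutine runs, and stops as soon as it finishes. A simplified, Temporal-free sketch of that shape, with a plain callback standing in for temporalio.activity.heartbeat:

# Simplified sketch of the heartbeat-while-running pattern described above;
# a plain callback stands in for temporalio.activity.heartbeat(self).
import asyncio
import datetime as dt


async def run_with_heartbeat(function_to_run, heartbeat, heartbeat_every: dt.timedelta):
    async def beat() -> None:
        while True:
            await asyncio.sleep(heartbeat_every.total_seconds())
            heartbeat()

    heartbeat_task = asyncio.create_task(beat())
    try:
        return await function_to_run()
    finally:
        # Stop heartbeating once the wrapped coroutine returns or raises.
        heartbeat_task.cancel()
        try:
            await heartbeat_task
        except asyncio.CancelledError:
            pass


asyncio.run(
    run_with_heartbeat(
        lambda: asyncio.sleep(3),
        lambda: print("heartbeat"),
        heartbeat_every=dt.timedelta(seconds=1),
    )
)
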
21 changes: 17 additions & 4 deletions posthog/temporal/workflows/batch_exports.py
@@ -14,6 +14,7 @@

import brotli
from asgiref.sync import sync_to_async
from prometheus_client import Counter
from temporalio import activity, exceptions, workflow
from temporalio.common import RetryPolicy

@@ -47,6 +48,15 @@
"""
)

ROWS_EXPORTED = Counter("batch_export_rows_exported", "Number of rows exported.", labelnames=("destination",))
BYTES_EXPORTED = Counter("batch_export_bytes_exported", "Number of bytes exported.", labelnames=("destination",))
EXPORT_STARTED = Counter("batch_export_started", "Number of batch exports started.", labelnames=("destination",))
EXPORT_FINISHED = Counter(
"batch_export_finished",
"Number of batch exports finished, for any reason (including failure).",
labelnames=("destination", "status"),
)


async def get_rows_count(
client,
@@ -246,7 +256,7 @@ def get_data_interval(interval: str, data_interval_end: str | None) -> tuple[dt.
msg = (
"Expected 'TemporalScheduledStartTime' of type 'list[str]' or 'list[datetime], found 'NoneType'."
"This should be set by the Temporal Schedule unless triggering workflow manually."
"In the latter case, ensure 'S3BatchExportInputs.data_interval_end' is set."
"In the latter case, ensure '{Type}BatchExportInputs.data_interval_end' is set."
)
raise TypeError(msg)

@@ -260,7 +270,7 @@ def get_data_interval(interval: str, data_interval_end: str | None) -> tuple[dt.

else:
msg = (
f"Expected search attribute to be of type 'str' or 'datetime' found '{data_interval_end_search_attr[0]}' "
f"Expected search attribute to be of type 'str' or 'datetime' but found '{data_interval_end_search_attr[0]}' "
f"of type '{type(data_interval_end_search_attr[0])}'."
)
raise TypeError(msg)
@@ -670,8 +680,8 @@ class CreateBatchExportBackfillInputs:
async def create_batch_export_backfill_model(inputs: CreateBatchExportBackfillInputs) -> str:
"""Activity that creates an BatchExportBackfill.
Intended to be used in all export workflows, usually at the start, to create a model
instance to represent them in our database.
Intended to be used in all batch export backfill workflows, usually at the start, to create a
model instance to represent them in our database.
"""
logger = get_batch_exports_logger(inputs=inputs)
logger.info(f"Creating BatchExportBackfill model instance in team {inputs.team_id}.")
@@ -735,6 +745,7 @@ async def execute_batch_export_insert_activity(
maximum_retry_interval_seconds: Maximum interval in seconds between retries.
"""
logger = get_batch_exports_logger(inputs=inputs)
destination = workflow.info().workflow_type.lower()

retry_policy = RetryPolicy(
initial_interval=dt.timedelta(seconds=initial_retry_interval_seconds),
@@ -743,6 +754,7 @@
non_retryable_error_types=non_retryable_error_types,
)
try:
EXPORT_STARTED.labels(destination=destination).inc()
await workflow.execute_activity(
activity,
inputs,
@@ -773,6 +785,7 @@ async def execute_batch_export_insert_activity(
)

finally:
EXPORT_FINISHED.labels(destination=destination, status=update_inputs.status.lower()).inc()
await workflow.execute_activity(
update_export_run_status,
update_inputs,
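The new EXPORT_STARTED/EXPORT_FINISHED counters wrap the insert activity in a plain try/finally, so every run is counted once on the way in and once on the way out regardless of how it ended. A stripped-down sketch of that shape; a plain callable stands in for workflow.execute_activity, and the destination is passed in directly where the real code derives it from workflow.info().workflow_type:

# Stripped-down sketch of the started/finished accounting added around the
# insert activity; a plain callable stands in for workflow.execute_activity.
from prometheus_client import Counter

EXPORT_STARTED = Counter(
    "batch_export_started", "Number of batch exports started.", labelnames=("destination",)
)
EXPORT_FINISHED = Counter(
    "batch_export_finished",
    "Number of batch exports finished, for any reason (including failure).",
    labelnames=("destination", "status"),
)


def run_export(insert_activity, destination: str) -> None:
    EXPORT_STARTED.labels(destination=destination).inc()
    status = "Completed"
    try:
        insert_activity()
    except Exception:
        status = "Failed"
        raise
    finally:
        # The finally block runs whether the activity succeeded or raised,
        # so failed runs are counted too, with their final status as a label.
        EXPORT_FINISHED.labels(destination=destination, status=status.lower()).inc()
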
32 changes: 15 additions & 17 deletions posthog/temporal/workflows/bigquery_batch_export.py
@@ -21,6 +21,8 @@
get_data_interval,
get_results_iterator,
get_rows_count,
ROWS_EXPORTED,
BYTES_EXPORTED,
)
from posthog.temporal.workflows.clickhouse import get_client

@@ -162,6 +164,17 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs):
)

with BatchExportTemporaryFile() as jsonl_file:

def flush_to_bigquery():
logger.info(
"Copying %s records of size %s bytes to BigQuery",
jsonl_file.records_since_last_reset,
jsonl_file.bytes_since_last_reset,
)
load_jsonl_file_to_bigquery_table(jsonl_file, bigquery_table, table_schema, bq_client)
ROWS_EXPORTED.labels(destination="bigquery").inc(jsonl_file.records_since_last_reset)
BYTES_EXPORTED.labels(destination="bigquery").inc(jsonl_file.bytes_since_last_reset)

for result in results_iterator:
row = {
field.name: json.dumps(result[field.name]) if field.name in json_columns else result[field.name]
@@ -173,26 +186,11 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs):
jsonl_file.write_records_to_jsonl([row])

if jsonl_file.tell() > settings.BATCH_EXPORT_BIGQUERY_UPLOAD_CHUNK_SIZE_BYTES:
logger.info(
"Copying %s records of size %s bytes to BigQuery",
jsonl_file.records_since_last_reset,
jsonl_file.bytes_since_last_reset,
)
load_jsonl_file_to_bigquery_table(
jsonl_file,
bigquery_table,
table_schema,
bq_client,
)
flush_to_bigquery()
jsonl_file.reset()

if jsonl_file.tell() > 0:
logger.info(
"Copying %s records of size %s bytes to BigQuery",
jsonl_file.records_since_last_reset,
jsonl_file.bytes_since_last_reset,
)
load_jsonl_file_to_bigquery_table(jsonl_file, bigquery_table, table_schema, bq_client)
flush_to_bigquery()


@workflow.defn(name="bigquery-export")
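The BigQuery and Postgres diffs are the same refactor: the duplicated log-then-load block becomes a small flush closure over the temporary file, and the row/byte counters are bumped inside it, just before the buffer is reset. A simplified sketch of that shape with an in-memory buffer and a stubbed load step standing in for load_jsonl_file_to_bigquery_table / copy_tsv_to_postgres (everything except the counter names is illustrative):

# Simplified sketch of the flush-closure refactor shared by the BigQuery and
# Postgres exports; load_chunk() stands in for the real load/copy call.
import io

from prometheus_client import Counter

ROWS_EXPORTED = Counter(
    "batch_export_rows_exported", "Number of rows exported.", labelnames=("destination",)
)
BYTES_EXPORTED = Counter(
    "batch_export_bytes_exported", "Number of bytes exported.", labelnames=("destination",)
)

CHUNK_SIZE_BYTES = 1024 * 1024  # stand-in for the per-destination upload chunk size setting


def export_rows(rows, destination: str, load_chunk) -> None:
    buffer = io.BytesIO()
    records_since_last_reset = 0

    def flush() -> None:
        nonlocal records_since_last_reset
        load_chunk(buffer)
        # Bump the counters only for data actually handed to the destination,
        # right before the buffer is reset.
        ROWS_EXPORTED.labels(destination=destination).inc(records_since_last_reset)
        BYTES_EXPORTED.labels(destination=destination).inc(buffer.tell())
        buffer.seek(0)
        buffer.truncate()
        records_since_last_reset = 0

    for row in rows:  # each row is an encoded line, e.g. a JSONL record
        buffer.write(row)
        records_since_last_reset += 1
        if buffer.tell() > CHUNK_SIZE_BYTES:
            flush()

    if buffer.tell() > 0:
        flush()

Folding the mid-stream and final flushes into one closure is what lets the counters be added in exactly one place per destination.
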
41 changes: 19 additions & 22 deletions posthog/temporal/workflows/postgres_batch_export.py
@@ -23,6 +23,8 @@
get_data_interval,
get_results_iterator,
get_rows_count,
ROWS_EXPORTED,
BYTES_EXPORTED,
)
from posthog.temporal.workflows.clickhouse import get_client

@@ -218,29 +220,8 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs):

with BatchExportTemporaryFile() as pg_file:
with postgres_connection(inputs) as connection:
for result in results_iterator:
row = {
key: json.dumps(result[key]) if key in json_columns and result[key] is not None else result[key]
for key in schema_columns
}
pg_file.write_records_to_tsv([row], fieldnames=schema_columns)

if pg_file.tell() > settings.BATCH_EXPORT_POSTGRES_UPLOAD_CHUNK_SIZE_BYTES:
logger.info(
"Copying %s records of size %s bytes to Postgres",
pg_file.records_since_last_reset,
pg_file.bytes_since_last_reset,
)
copy_tsv_to_postgres(
pg_file,
connection,
inputs.schema,
inputs.table_name,
schema_columns,
)
pg_file.reset()

if pg_file.tell() > 0:
def flush_to_postgres():
logger.info(
"Copying %s records of size %s bytes to Postgres",
pg_file.records_since_last_reset,
@@ -253,6 +234,22 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs):
inputs.table_name,
schema_columns,
)
ROWS_EXPORTED.labels(destination="postgres").inc(pg_file.records_since_last_reset)
BYTES_EXPORTED.labels(destination="postgres").inc(pg_file.bytes_since_last_reset)

for result in results_iterator:
row = {
key: json.dumps(result[key]) if key in json_columns and result[key] is not None else result[key]
for key in schema_columns
}
pg_file.write_records_to_tsv([row], fieldnames=schema_columns)

if pg_file.tell() > settings.BATCH_EXPORT_POSTGRES_UPLOAD_CHUNK_SIZE_BYTES:
flush_to_postgres()
pg_file.reset()

if pg_file.tell() > 0:
flush_to_postgres()


@workflow.defn(name="postgres-export")
12 changes: 10 additions & 2 deletions posthog/temporal/workflows/redshift_batch_export.py
@@ -22,6 +22,7 @@
get_data_interval,
get_results_iterator,
get_rows_count,
ROWS_EXPORTED,
)
from posthog.temporal.workflows.clickhouse import get_client
from posthog.temporal.workflows.postgres_batch_export import (
@@ -69,17 +70,24 @@ def insert_records_to_redshift(
)
template = sql.SQL("({})").format(sql.SQL(", ").join(map(sql.Placeholder, columns)))

def flush_to_redshift():
psycopg2.extras.execute_values(cursor, query, batch, template)
ROWS_EXPORTED.labels(destination="redshift").inc(len(batch))
# It would be nice to record BYTES_EXPORTED for Redshift, but it's not worth estimating
# the byte size of each batch the way things are currently written. We can revisit this
# in the future if we decide it's useful enough.

for record in records:
batch.append(record)

if len(batch) < batch_size:
continue

psycopg2.extras.execute_values(cursor, query, batch, template)
flush_to_redshift()
batch = []

if len(batch) > 0:
psycopg2.extras.execute_values(cursor, query, batch, template)
flush_to_redshift()


@dataclass
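Purely as an illustration of the estimation the comment above decides against (it is not part of this commit), a rough per-batch byte figure could be derived from the values themselves, assuming the records are dicts keyed by column as the placeholder template suggests:

# Illustrative only; not part of this commit. A rough byte estimate for a
# batch of dict records, which could feed BYTES_EXPORTED if ever wanted.
def estimate_batch_bytes(batch: list[dict]) -> int:
    return sum(
        len(str(value).encode("utf-8"))
        for record in batch
        for value in record.values()
    )

This ignores SQL quoting and placeholder overhead, so it is only an approximation.
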