feat(batch-exports): Add HTTP Batch Export destination
This destination may be reused more generally in the future, but for now it only
submits payloads in the PostHog /batch format.
bretthoerner committed Feb 13, 2024
1 parent 857da0e commit f520db8
Showing 9 changed files with 782 additions and 1 deletion.
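
For reference, the PostHog /batch payload the new destination submits looks roughly like this (a minimal sketch with made-up field values; the activity added below streams the same structure through a temporary file record by record instead of building it in memory):

import json

# Illustrative only: field values are invented for the example.
payload = {
    "api_key": "<batch export token>",
    "batch": [
        {
            "uuid": "00000000-0000-0000-0000-000000000000",
            "distinct_id": "user-123",
            "timestamp": "2024-02-13 00:00:00",
            "event": "$pageview",
            "properties": {"$browser": "Firefox"},
        },
    ],
}
body = json.dumps(payload)  # POSTed to the configured URL with Content-Type: application/json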
2 changes: 2 additions & 0 deletions posthog/batch_exports/models.py
@@ -28,6 +28,7 @@ class Destination(models.TextChoices):
POSTGRES = "Postgres"
REDSHIFT = "Redshift"
BIGQUERY = "BigQuery"
HTTP = "HTTP"
NOOP = "NoOp"

secret_fields = {
@@ -36,6 +37,7 @@ class Destination(models.TextChoices):
"Postgres": set("password"),
"Redshift": set("password"),
"BigQuery": {"private_key", "private_key_id", "client_email", "token_uri"},
"HTTP": set("token"),
"NoOp": set(),
}

16 changes: 16 additions & 0 deletions posthog/batch_exports/service.py
@@ -157,6 +157,21 @@ class BigQueryBatchExportInputs:
batch_export_schema: BatchExportSchema | None = None


@dataclass
class HttpBatchExportInputs:
"""Inputs for Http export workflow."""

batch_export_id: str
team_id: int
url: str
token: str
interval: str = "hour"
data_interval_end: str | None = None
exclude_events: list[str] | None = None
include_events: list[str] | None = None
batch_export_schema: BatchExportSchema | None = None


@dataclass
class NoOpInputs:
"""NoOp Workflow is used for testing, it takes a single argument to echo back."""
@@ -174,6 +189,7 @@ class NoOpInputs:
"Postgres": ("postgres-export", PostgresBatchExportInputs),
"Redshift": ("redshift-export", RedshiftBatchExportInputs),
"BigQuery": ("bigquery-export", BigQueryBatchExportInputs),
"HTTP": ("http-export", HttpBatchExportInputs),
"NoOp": ("no-op", NoOpInputs),
}
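
As a rough sketch of how this mapping is consumed, a destination type resolves to a Temporal workflow name plus an inputs dataclass. The local mapping and helper below are illustrative assumptions, not part of this commit (the dict's real module-level name is not visible in this hunk):

from posthog.batch_exports.service import HttpBatchExportInputs, NoOpInputs

# Hypothetical stand-in for the mapping shown above.
DESTINATION_WORKFLOWS = {
    "HTTP": ("http-export", HttpBatchExportInputs),
    "NoOp": ("no-op", NoOpInputs),
}

def resolve_workflow(destination_type: str, **inputs):
    workflow_name, inputs_cls = DESTINATION_WORKFLOWS[destination_type]
    return workflow_name, inputs_cls(**inputs)

# e.g. resolve_workflow("HTTP", batch_export_id="...", team_id=1,
#                       url="https://example.com/batch", token="...")
# returns ("http-export", HttpBatchExportInputs(...))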

3 changes: 2 additions & 1 deletion posthog/settings/temporal.py
@@ -15,6 +15,7 @@
BATCH_EXPORT_SNOWFLAKE_UPLOAD_CHUNK_SIZE_BYTES = 1024 * 1024 * 100 # 100MB
BATCH_EXPORT_POSTGRES_UPLOAD_CHUNK_SIZE_BYTES = 1024 * 1024 * 50 # 50MB
BATCH_EXPORT_BIGQUERY_UPLOAD_CHUNK_SIZE_BYTES = 1024 * 1024 * 100 # 100MB

BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES = 1024 * 1024 * 10 # 10MB
BATCH_EXPORT_HTTP_BATCH_SIZE = 1000

UNCONSTRAINED_TIMESTAMP_TEAM_IDS = get_list(os.getenv("UNCONSTRAINED_TIMESTAMP_TEAM_IDS", ""))
6 changes: 6 additions & 0 deletions posthog/temporal/batch_exports/__init__.py
@@ -32,6 +32,10 @@
SnowflakeBatchExportWorkflow,
insert_into_snowflake_activity,
)
from posthog.temporal.batch_exports.http_batch_export import (
HttpBatchExportWorkflow,
insert_into_http_activity,
)
from posthog.temporal.batch_exports.squash_person_overrides import *

WORKFLOWS = [
@@ -42,6 +46,7 @@
RedshiftBatchExportWorkflow,
S3BatchExportWorkflow,
SnowflakeBatchExportWorkflow,
HttpBatchExportWorkflow,
SquashPersonOverridesWorkflow,
]

@@ -58,6 +63,7 @@
insert_into_redshift_activity,
insert_into_s3_activity,
insert_into_snowflake_activity,
insert_into_http_activity,
noop_activity,
prepare_dictionary,
prepare_person_overrides,
8 changes: 8 additions & 0 deletions posthog/temporal/batch_exports/batch_exports.py
@@ -380,6 +380,14 @@ def write(self, content: bytes | str):

return result

def write_record_as_bytes(self, record: bytes):
result = self.write(record)

self.records_total += 1
self.records_since_last_reset += 1

return result

def write_records_to_jsonl(self, records):
"""Write records to a temporary file as JSONL."""
if len(records) == 1:
251 changes: 251 additions & 0 deletions posthog/temporal/batch_exports/http_batch_export.py
@@ -0,0 +1,251 @@
import aiohttp
import datetime as dt
import json
from dataclasses import dataclass

from django.conf import settings
from temporalio import activity, workflow
from temporalio.common import RetryPolicy

from posthog.batch_exports.service import BatchExportField, BatchExportSchema, HttpBatchExportInputs
from posthog.temporal.batch_exports.base import PostHogWorkflow
from posthog.temporal.batch_exports.batch_exports import (
BatchExportTemporaryFile,
CreateBatchExportRunInputs,
UpdateBatchExportRunStatusInputs,
create_export_run,
execute_batch_export_insert_activity,
get_data_interval,
get_rows_count,
iter_records,
json_dumps_bytes,
)
from posthog.temporal.batch_exports.clickhouse import get_client
from posthog.temporal.batch_exports.metrics import (
get_bytes_exported_metric,
get_rows_exported_metric,
)
from posthog.temporal.common.logger import bind_temporal_worker_logger


def http_default_fields() -> list[BatchExportField]:
return [
BatchExportField(expression="toString(uuid)", alias="uuid"),
BatchExportField(expression="timestamp", alias="timestamp"),
BatchExportField(expression="event", alias="event"),
BatchExportField(expression="nullIf(properties, '')", alias="properties"),
BatchExportField(expression="toString(distinct_id)", alias="distinct_id"),
BatchExportField(expression="toJSONString(elements_chain)", alias="elements_chain"),
]


@dataclass
class HttpInsertInputs:
"""Inputs for HTTP insert activity."""

team_id: int
url: str
token: str
data_interval_start: str
data_interval_end: str
exclude_events: list[str] | None = None
include_events: list[str] | None = None
batch_export_schema: BatchExportSchema | None = None


async def post_json_file_to_url(url, batch_file):
batch_file.seek(0)
async with aiohttp.ClientSession() as session:
headers = {"Content-Type": "application/json"}
async with session.post(url, data=batch_file, headers=headers, raise_for_status=True) as response:
return response


@activity.defn
async def insert_into_http_activity(inputs: HttpInsertInputs):
"""Activity streams data from ClickHouse to an HTTP Endpoint."""
logger = await bind_temporal_worker_logger(team_id=inputs.team_id, destination="HTTP")
logger.info(
"Exporting batch %s - %s",
inputs.data_interval_start,
inputs.data_interval_end,
)

async with get_client() as client:
if not await client.is_alive():
raise ConnectionError("Cannot establish connection to ClickHouse")

count = await get_rows_count(
client=client,
team_id=inputs.team_id,
interval_start=inputs.data_interval_start,
interval_end=inputs.data_interval_end,
exclude_events=inputs.exclude_events,
include_events=inputs.include_events,
)

if count == 0:
logger.info(
"Nothing to export in batch %s - %s",
inputs.data_interval_start,
inputs.data_interval_end,
)
return

logger.info("BatchExporting %s rows", count)

if inputs.batch_export_schema is not None:
raise NotImplementedError("Batch export schema is not supported for HTTP export")

fields = http_default_fields()
columns = [field["alias"] for field in fields]

record_iterator = iter_records(
client=client,
team_id=inputs.team_id,
interval_start=inputs.data_interval_start,
interval_end=inputs.data_interval_end,
exclude_events=inputs.exclude_events,
include_events=inputs.include_events,
fields=fields,
extra_query_parameters=None,
)

rows_exported = get_rows_exported_metric()
bytes_exported = get_bytes_exported_metric()

# The HTTP destination currently only supports the PostHog batch capture endpoint. In the
# future we may support other endpoints, but we'll need a way to template the request body,
# headers, etc.
#
# For now, we write the batch out in PostHog capture format, which means each Batch Export
# temporary file starts with a header and ends with a footer.
#
# For example:
#
# Header written when temp file is opened: {"api_key": "api-key-from-inputs","batch": [
# Each record is written out as an object: {"event": "foo", ...},
# Finally, a footer is written out: ]}
#
# Why write to a file at all? Because we need to serialize the data anyway, and it's the
# safest way to stay within batch endpoint payload limits and not waste process memory.
posthog_batch_header = """{"api_key": "%s","batch": [""" % inputs.token
posthog_batch_footer = "]}"

with BatchExportTemporaryFile() as batch_file:

def write_event_to_batch(event):
if batch_file.records_since_last_reset == 0:
batch_file.write(posthog_batch_header)
else:
batch_file.write(",")

batch_file.write_record_as_bytes(json_dumps_bytes(event))

async def flush_batch():
logger.debug(
"Sending %s records of size %s bytes",
batch_file.records_since_last_reset,
batch_file.bytes_since_last_reset,
)

batch_file.write(posthog_batch_footer)
await post_json_file_to_url(inputs.url, batch_file)

rows_exported.add(batch_file.records_since_last_reset)
bytes_exported.add(batch_file.bytes_since_last_reset)

for record_batch in record_iterator:
for row in record_batch.select(columns).to_pylist():
# Format result row as PostHog event, write JSON to the batch file.

properties = row["properties"]
properties = json.loads(properties) if properties else None
if row["event"] == "$autocapture" and row["elements_chain"] is not None:
properties["$elements_chain"] = row["elements_chain"]

capture_event = {
"uuid": row["uuid"],
"distinct_id": row["distinct_id"],
"timestamp": row["timestamp"],
"event": row["event"],
"properties": properties,
}

write_event_to_batch(capture_event)

if (
batch_file.tell() > settings.BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES
or batch_file.records_since_last_reset >= settings.BATCH_EXPORT_HTTP_BATCH_SIZE
):
await flush_batch()
batch_file.reset()

if batch_file.tell() > 0:
await flush_batch()


@workflow.defn(name="http-export")
class HttpBatchExportWorkflow(PostHogWorkflow):
"""A Temporal Workflow to export ClickHouse data to an HTTP endpoint.
This Workflow is intended to be executed both manually and by a Temporal
Schedule. When run by a schedule, `data_interval_end` should be set to
`None` so that we will fetch the end of the interval from the Temporal
search attribute `TemporalScheduledStartTime`.
"""

@staticmethod
def parse_inputs(inputs: list[str]) -> HttpBatchExportInputs:
"""Parse inputs from the management command CLI."""
loaded = json.loads(inputs[0])
return HttpBatchExportInputs(**loaded)

@workflow.run
async def run(self, inputs: HttpBatchExportInputs):
"""Workflow implementation to export data to an HTTP Endpoint."""
data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end)

create_export_run_inputs = CreateBatchExportRunInputs(
team_id=inputs.team_id,
batch_export_id=inputs.batch_export_id,
data_interval_start=data_interval_start.isoformat(),
data_interval_end=data_interval_end.isoformat(),
)
run_id = await workflow.execute_activity(
create_export_run,
create_export_run_inputs,
start_to_close_timeout=dt.timedelta(minutes=5),
retry_policy=RetryPolicy(
initial_interval=dt.timedelta(seconds=10),
maximum_interval=dt.timedelta(seconds=60),
maximum_attempts=0,
non_retryable_error_types=[],
),
)

update_inputs = UpdateBatchExportRunStatusInputs(
id=run_id,
status="Completed",
team_id=inputs.team_id,
)

insert_inputs = HttpInsertInputs(
team_id=inputs.team_id,
url=inputs.url,
token=inputs.token,
data_interval_start=data_interval_start.isoformat(),
data_interval_end=data_interval_end.isoformat(),
exclude_events=inputs.exclude_events,
include_events=inputs.include_events,
batch_export_schema=inputs.batch_export_schema,
)

await execute_batch_export_insert_activity(
insert_into_http_activity,
insert_inputs,
non_retryable_error_types=[],
update_inputs=update_inputs,
# Disable heartbeat timeout until we add heartbeat support.
heartbeat_timeout_seconds=None,
)
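
To summarize the batching behavior of the activity above in isolation, here is a minimal, self-contained sketch of the same header/record/footer assembly and flush thresholds, using an in-memory buffer in place of BatchExportTemporaryFile (the constants mirror the settings added in this commit, but the helper itself is illustrative, not the code under review):

import io
import json

MAX_BYTES = 10 * 1024 * 1024  # mirrors BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES
MAX_RECORDS = 1000            # mirrors BATCH_EXPORT_HTTP_BATCH_SIZE

def iter_batch_bodies(events, token):
    """Yield complete /batch request bodies, flushing on size or record count."""
    header = '{"api_key": "%s","batch": [' % token
    footer = "]}"
    buffer, records = io.StringIO(), 0
    for event in events:
        # The first record in a batch gets the header; later records are comma-separated.
        buffer.write(header if records == 0 else ",")
        buffer.write(json.dumps(event))
        records += 1
        if buffer.tell() > MAX_BYTES or records >= MAX_RECORDS:
            buffer.write(footer)
            yield buffer.getvalue()
            buffer, records = io.StringIO(), 0
    if records > 0:
        buffer.write(footer)
        yield buffer.getvalue()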