feat: Support multiple models in S3 batch export (#23105)
* refactor: Update metrics to fetch counts at request time

* fix: Move import to method

* fix: Add function

* feat: Custom schemas for batch exports

* feat: Frontend support for model field

* fix: Clean-up

* fix: Add missing migration

* fix: Make new field nullable

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (1)

* fix: Bump migration number

* fix: Bump migration number

* refactor: Update metrics to fetch counts at request time

* refactor: Switch to counting runs

* refactor: Support batch export models as views

* fix: Merge conflict

* fix: Quality check fixes

* refactor: Update metrics to fetch counts at request time

* fix: Move import to method

* fix: Add function

* fix: Typing fixes

* feat: Custom schemas for batch exports

* feat: Frontend support for model field

* fix: Clean-up

* fix: Add missing migration

* fix: Make new field nullable

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (1)

* fix: Bump migration number

* fix: Clean-up unused code

* fix: Clean-up unused function

* fix: Only run extra clickhouse queries in batch exports tests

* feat: Support multiple models in all batch export destinations

This commit introduces a new `batch_export_model` input for all batch
export destinations. This input is of type `BatchExportModel`, which
indicates which model a batch export should target. This opens the
door to creating "persons" model exports and, eventually, to
additional models.

The change is backwards compatible: `batch_export_schema` was not
removed, and existing export configurations should continue to work
(as evidenced by unit tests passing without changes).

However, moving forward, all batch exports will be created with
`batch_export_schema` set to `None` and `batch_export_model` defined
in its place.

After updating existing exports, `batch_export_schema` and any other
code associated with backwards compatibility can be deleted. A
before/after sketch of the serialized inputs follows this list of
commits.

* fix: Add additional type hints and update tests

* chore: Add test utilities for multi-model batch exports

* fix: Pass properties included as keyword arguments

* fix: Support custom key prefixes for multi-model

* feat: Unit testing for S3 batch exports with multi-model support

* fix: Add and correct type hints

* fix: Typo in parameter

* fix: API tests now work with model parameter

* fix: Re-add hosts to docker-compose

* fix: Use UTC timezone alias

* fix: Add missing test column

* revert: Python 3.11 alias
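
Below is a minimal before/after sketch of the serialized workflow inputs, based on the updated API tests in this commit. The field values are placeholders for illustration, not an excerpt from the diff.

```python
# Legacy payload: the schema travels on its own and the target model is implicitly "events".
legacy_args = {
    "batch_export_schema": {
        "fields": [{"alias": "uuid", "expression": "toString(events.uuid)"}],
        "values": {},
    },
}

# New payload: `batch_export_model` names the target model ("events", "persons", ...) and
# carries the optional custom schema; `batch_export_schema` stays `None` going forward.
new_args = {
    "batch_export_schema": None,
    "batch_export_model": {
        "name": "events",
        "schema": {
            "fields": [{"alias": "uuid", "expression": "toString(events.uuid)"}],
            "values": {},
        },
    },
}
```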

---------

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
tomasfarias and github-actions[bot] authored Jun 25, 2024
1 parent f237981 commit ae3f5a0
Showing 15 changed files with 680 additions and 351 deletions.
2 changes: 1 addition & 1 deletion posthog/api/test/batch_exports/test_create.py
@@ -294,7 +294,7 @@ def test_create_batch_export_with_custom_schema(client: HttpClient):
}

assert batch_export.schema == expected_schema
assert args["batch_export_schema"] == expected_schema
assert args["batch_export_model"] == {"name": "events", "schema": expected_schema}


@pytest.mark.parametrize(
37 changes: 20 additions & 17 deletions posthog/api/test/batch_exports/test_update.py
@@ -315,23 +315,26 @@ def test_can_patch_hogql_query(client: HttpClient):
args = json.loads(decoded_payload[0].data)
assert args["bucket_name"] == "my-production-s3-bucket"
assert args["interval"] == "hour"
assert args["batch_export_schema"] == {
"fields": [
{
"alias": "uuid",
"expression": "toString(events.uuid)",
},
{
"alias": "test",
"expression": "%(hogql_val_0)s",
},
{
"alias": "n",
"expression": "accurateCastOrNull(plus(1, 1), %(hogql_val_1)s)",
},
],
"values": {"hogql_val_0": "test", "hogql_val_1": "Int64"},
"hogql_query": "SELECT toString(uuid) AS uuid, 'test' AS test, toInt(plus(1, 1)) AS n FROM events",
assert args["batch_export_model"] == {
"name": "events",
"schema": {
"fields": [
{
"alias": "uuid",
"expression": "toString(events.uuid)",
},
{
"alias": "test",
"expression": "%(hogql_val_0)s",
},
{
"alias": "n",
"expression": "accurateCastOrNull(plus(1, 1), %(hogql_val_1)s)",
},
],
"values": {"hogql_val_0": "test", "hogql_val_1": "Int64"},
"hogql_query": "SELECT toString(uuid) AS uuid, 'test' AS test, toInt(plus(1, 1)) AS n FROM events",
},
}


2 changes: 1 addition & 1 deletion posthog/batch_exports/http.py
@@ -5,7 +5,7 @@
import structlog
from django.db import transaction
from django.utils.timezone import now
from rest_framework import request, response, serializers, viewsets, filters
from rest_framework import filters, request, response, serializers, viewsets
from rest_framework.decorators import action
from rest_framework.exceptions import (
NotAuthenticated,
37 changes: 29 additions & 8 deletions posthog/batch_exports/service.py
@@ -54,9 +54,15 @@ class BatchExportSchema(typing.TypedDict):
values: dict[str, str]


@dataclass
class BatchExportModel:
name: str
schema: BatchExportSchema | None


class BatchExportsInputsProtocol(typing.Protocol):
team_id: int
batch_export_schema: BatchExportSchema | None = None
batch_export_model: BatchExportModel | None = None
is_backfill: bool = False
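
A minimal usage sketch of the new dataclass (the schema literal is a hypothetical example; only the `name` and `schema` fields come from the definition above):

```python
from posthog.batch_exports.service import BatchExportModel

# A "persons" export needs no custom schema; destinations fall back to their default fields.
persons_model = BatchExportModel(name="persons", schema=None)

# An "events" export can still carry a custom, HogQL-derived schema.
events_model = BatchExportModel(
    name="events",
    schema={
        "fields": [{"alias": "uuid", "expression": "toString(events.uuid)"}],
        "values": {},
    },
)
```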


@@ -90,10 +96,11 @@ class S3BatchExportInputs:
include_events: list[str] | None = None
encryption: str | None = None
kms_key_id: str | None = None
batch_export_schema: BatchExportSchema | None = None
endpoint_url: str | None = None
file_format: str = "JSONLines"
is_backfill: bool = False
batch_export_model: BatchExportModel | None = None
batch_export_schema: BatchExportSchema | None = None


@dataclass
@@ -114,8 +121,9 @@ class SnowflakeBatchExportInputs:
role: str | None = None
exclude_events: list[str] | None = None
include_events: list[str] | None = None
batch_export_schema: BatchExportSchema | None = None
is_backfill: bool = False
batch_export_model: BatchExportModel | None = None
batch_export_schema: BatchExportSchema | None = None


@dataclass
@@ -136,8 +144,9 @@ class PostgresBatchExportInputs:
data_interval_end: str | None = None
exclude_events: list[str] | None = None
include_events: list[str] | None = None
batch_export_schema: BatchExportSchema | None = None
is_backfill: bool = False
batch_export_model: BatchExportModel | None = None
batch_export_schema: BatchExportSchema | None = None


@dataclass
@@ -165,8 +174,9 @@ class BigQueryBatchExportInputs:
exclude_events: list[str] | None = None
include_events: list[str] | None = None
use_json_type: bool = False
batch_export_schema: BatchExportSchema | None = None
is_backfill: bool = False
batch_export_model: BatchExportModel | None = None
batch_export_schema: BatchExportSchema | None = None


@dataclass
@@ -181,8 +191,9 @@ class HttpBatchExportInputs:
data_interval_end: str | None = None
exclude_events: list[str] | None = None
include_events: list[str] | None = None
batch_export_schema: BatchExportSchema | None = None
is_backfill: bool = False
batch_export_model: BatchExportModel | None = None
batch_export_schema: BatchExportSchema | None = None


@dataclass
@@ -193,8 +204,9 @@ class NoOpInputs:
team_id: int
interval: str = "hour"
arg: str = ""
batch_export_schema: BatchExportSchema | None = None
is_backfill: bool = False
batch_export_model: BatchExportModel | None = None
batch_export_schema: BatchExportSchema | None = None


DESTINATION_WORKFLOWS = {
@@ -609,7 +621,16 @@ def sync_batch_export(batch_export: BatchExport, created: bool):
team_id=batch_export.team.id,
batch_export_id=str(batch_export.id),
interval=str(batch_export.interval),
batch_export_schema=batch_export.schema,
batch_export_model=BatchExportModel(
name=batch_export.model or "events",
schema=batch_export.schema,
),
# TODO: This field is deprecated, but we still set it for backwards compatibility.
# New exports created will always have `batch_export_schema` set to `None`, but existing
# batch exports may still be using it.
# This assignment should be removed after updating all existing exports to use
# `batch_export_model` instead.
batch_export_schema=None,
**destination_config,
)
),
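
A short illustrative sketch of how the fallback above behaves for old and new rows (`resolve_model` is a helper written only for this note, not part of the diff):

```python
from posthog.batch_exports.service import BatchExportModel, BatchExportSchema

def resolve_model(model_column: str | None, schema: BatchExportSchema | None) -> BatchExportModel:
    # Existing rows predate the nullable `model` column, so they keep exporting events.
    return BatchExportModel(name=model_column or "events", schema=schema)

assert resolve_model(None, None).name == "events"        # pre-existing export
assert resolve_model("persons", None).name == "persons"  # newly created export
```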
59 changes: 49 additions & 10 deletions posthog/temporal/batch_exports/batch_exports.py
@@ -13,6 +13,8 @@
from posthog.batch_exports.models import BatchExportBackfill, BatchExportRun
from posthog.batch_exports.service import (
BatchExportField,
BatchExportModel,
BatchExportSchema,
acount_failed_batch_export_runs,
acreate_batch_export_backfill,
acreate_batch_export_run,
@@ -111,37 +113,74 @@ def default_fields() -> list[BatchExportField]:
]


DEFAULT_MODELS = {"events", "persons"}


async def iter_model_records(
client: ClickHouseClient, model: str, team_id: int, is_backfill: bool, **parameters
client: ClickHouseClient,
model: BatchExportModel | BatchExportSchema | None,
team_id: int,
is_backfill: bool,
destination_default_fields: list[BatchExportField] | None = None,
**parameters,
) -> AsyncRecordsGenerator:
if model in DEFAULT_MODELS:
if destination_default_fields is None:
batch_export_default_fields = default_fields()
else:
batch_export_default_fields = destination_default_fields

if isinstance(model, BatchExportModel):
async for record in iter_records_from_model_view(
client=client, model=model, team_id=team_id, is_backfill=is_backfill, **parameters
client=client,
model_name=model.name,
team_id=team_id,
is_backfill=is_backfill,
fields=model.schema["fields"] if model.schema is not None else batch_export_default_fields,
extra_query_parameters=model.schema["values"] if model.schema is not None else None,
**parameters,
):
yield record

else:
for record in iter_records(client, team_id=team_id, is_backfill=is_backfill, **parameters):
for record in iter_records(
client,
team_id=team_id,
is_backfill=is_backfill,
fields=model["fields"] if model is not None else batch_export_default_fields,
extra_query_parameters=model["values"] if model is not None else None,
**parameters,
):
yield record


async def iter_records_from_model_view(
client: ClickHouseClient, model: str, is_backfill: bool, team_id: int, **parameters
client: ClickHouseClient,
model_name: str,
is_backfill: bool,
team_id: int,
interval_start: str,
interval_end: str,
**parameters,
) -> AsyncRecordsGenerator:
if model == "persons":
if model_name == "persons":
view = SELECT_FROM_PERSONS_VIEW
else:
# TODO: Let this model be exported by `astream_query_as_arrow`.
# Just to reduce risk, I don't want to change the function that runs 100% of the exports
# without battle testing it first.
# There are already changes going out to the queries themselves that will impact events in a
# positive way. So, we can come back later and drop this block.
for record_batch in iter_records(client, team_id=team_id, is_backfill=is_backfill, **parameters):
for record_batch in iter_records(
client,
team_id=team_id,
is_backfill=is_backfill,
interval_start=interval_start,
interval_end=interval_end,
**parameters,
):
yield record_batch
return

parameters["team_id"] = team_id
parameters["interval_start"] = dt.datetime.fromisoformat(interval_start).strftime("%Y-%m-%d %H:%M:%S")
parameters["interval_end"] = dt.datetime.fromisoformat(interval_end).strftime("%Y-%m-%d %H:%M:%S")
async for record_batch in client.astream_query_as_arrow(view, query_parameters=parameters):
yield record_batch
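
A hedged usage sketch of the dispatch above; the ClickHouse client and writer are placeholders, and the keyword names mirror the destination activities below:

```python
from posthog.batch_exports.service import BatchExportModel
from posthog.temporal.batch_exports.batch_exports import iter_model_records

async def copy_persons_batches(client, writer) -> None:
    """Illustration only: stream one hourly batch of the "persons" model to some writer."""
    records = iter_model_records(
        client=client,                                        # an async ClickHouseClient
        model=BatchExportModel(name="persons", schema=None),  # or a legacy BatchExportSchema dict, or None
        team_id=1,                                            # placeholder team
        is_backfill=False,
        interval_start="2024-06-25T00:00:00+00:00",
        interval_end="2024-06-25T01:00:00+00:00",
    )
    async for record_batch in records:
        await writer.write(record_batch)
```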

26 changes: 15 additions & 11 deletions posthog/temporal/batch_exports/bigquery_batch_export.py
@@ -14,6 +14,7 @@
from posthog.batch_exports.models import BatchExportRun
from posthog.batch_exports.service import (
BatchExportField,
BatchExportModel,
BatchExportSchema,
BigQueryBatchExportInputs,
)
@@ -150,9 +151,11 @@ class BigQueryInsertInputs:
exclude_events: list[str] | None = None
include_events: list[str] | None = None
use_json_type: bool = False
batch_export_schema: BatchExportSchema | None = None
run_id: str | None = None
is_backfill: bool = False
batch_export_model: BatchExportModel | None = None
# TODO: Remove after updating existing batch exports
batch_export_schema: BatchExportSchema | None = None


@contextlib.contextmanager
@@ -230,24 +233,23 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> Records
if not await client.is_alive():
raise ConnectionError("Cannot establish connection to ClickHouse")

if inputs.batch_export_schema is None:
fields = bigquery_default_fields()
query_parameters = None

model: BatchExportModel | BatchExportSchema | None = None
if inputs.batch_export_schema is None and "batch_export_model" in {
field.name for field in dataclasses.fields(inputs)
}:
model = inputs.batch_export_model
else:
fields = inputs.batch_export_schema["fields"]
query_parameters = inputs.batch_export_schema["values"]
model = inputs.batch_export_schema

records_iterator = iter_model_records(
client=client,
model="events",
model=model,
team_id=inputs.team_id,
interval_start=data_interval_start,
interval_end=inputs.data_interval_end,
exclude_events=inputs.exclude_events,
include_events=inputs.include_events,
fields=fields,
extra_query_parameters=query_parameters,
destination_default_fields=bigquery_default_fields(),
is_backfill=inputs.is_backfill,
)
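
The resolution above is presumably mirrored in the other destination activities; here is a standalone sketch of the pattern, where the inputs class is a stand-in rather than the real `BigQueryInsertInputs`:

```python
import dataclasses

from posthog.batch_exports.service import BatchExportModel, BatchExportSchema

@dataclasses.dataclass
class FakeInsertInputs:
    """Stand-in inputs used only for this example."""

    batch_export_model: BatchExportModel | None = None
    batch_export_schema: BatchExportSchema | None = None  # deprecated, kept for old payloads

def resolve_model(inputs) -> BatchExportModel | BatchExportSchema | None:
    """Prefer the new model field; fall back to the deprecated schema field when it is set."""
    if inputs.batch_export_schema is None and "batch_export_model" in {
        field.name for field in dataclasses.fields(inputs)
    }:
        return inputs.batch_export_model
    return inputs.batch_export_schema

assert resolve_model(FakeInsertInputs()) is None
assert resolve_model(FakeInsertInputs(batch_export_model=BatchExportModel("persons", None))).name == "persons"
```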

@@ -408,9 +410,11 @@ async def run(self, inputs: BigQueryBatchExportInputs):
exclude_events=inputs.exclude_events,
include_events=inputs.include_events,
use_json_type=inputs.use_json_type,
batch_export_schema=inputs.batch_export_schema,
run_id=run_id,
is_backfill=inputs.is_backfill,
batch_export_model=inputs.batch_export_model,
# TODO: Remove after updating existing batch exports.
batch_export_schema=inputs.batch_export_schema,
)

await execute_batch_export_insert_activity(
9 changes: 6 additions & 3 deletions posthog/temporal/batch_exports/http_batch_export.py
@@ -1,8 +1,8 @@
import asyncio
import dataclasses
import datetime as dt
import io
import json
from dataclasses import dataclass

import aiohttp
from django.conf import settings
@@ -11,6 +11,7 @@

from posthog.batch_exports.service import (
BatchExportField,
BatchExportModel,
BatchExportSchema,
HttpBatchExportInputs,
)
@@ -92,7 +93,7 @@ def from_activity_details(cls, details) -> "HeartbeatDetails":
return HeartbeatDetails(last_uploaded_timestamp)


@dataclass
@dataclasses.dataclass
class HttpInsertInputs:
"""Inputs for HTTP insert activity."""

@@ -104,8 +105,9 @@ class HttpInsertInputs:
exclude_events: list[str] | None = None
include_events: list[str] | None = None
run_id: str | None = None
batch_export_schema: BatchExportSchema | None = None
is_backfill: bool = False
batch_export_model: BatchExportModel | None = None
batch_export_schema: BatchExportSchema | None = None


async def maybe_resume_from_heartbeat(inputs: HttpInsertInputs) -> str:
@@ -357,6 +359,7 @@ async def run(self, inputs: HttpBatchExportInputs):
batch_export_schema=inputs.batch_export_schema,
run_id=run_id,
is_backfill=inputs.is_backfill,
batch_export_model=inputs.batch_export_model,
)

await execute_batch_export_insert_activity(