feat: Support multiple models in S3 batch export #23105

Merged · 53 commits · Jun 25, 2024

Commits
22f2454
refactor: Update metrics to fetch counts at request time
tomasfarias Jun 7, 2024
93b5994
fix: Move import to method
tomasfarias Jun 7, 2024
ab52058
fix: Add function
tomasfarias Jun 10, 2024
3ee1342
feat: Custom schemas for batch exports
tomasfarias Jun 12, 2024
9097707
feat: Frontend support for model field
tomasfarias Jun 12, 2024
cd3947a
fix: Clean-up
tomasfarias Jun 12, 2024
5387638
fix: Add missing migration
tomasfarias Jun 12, 2024
806b83b
fix: Make new field nullable
tomasfarias Jun 12, 2024
c514f7b
Update UI snapshots for `chromium` (1)
github-actions[bot] Jun 12, 2024
53090b6
Update UI snapshots for `chromium` (1)
github-actions[bot] Jun 12, 2024
1c61eb0
Update UI snapshots for `chromium` (1)
github-actions[bot] Jun 12, 2024
44f9030
Update UI snapshots for `chromium` (1)
github-actions[bot] Jun 12, 2024
8ecf26b
Update UI snapshots for `chromium` (1)
github-actions[bot] Jun 12, 2024
a444135
Update UI snapshots for `chromium` (1)
github-actions[bot] Jun 12, 2024
2bd9601
fix: Bump migration number
tomasfarias Jun 18, 2024
5ae88c1
fix: Bump migration number
tomasfarias Jun 18, 2024
6210eb9
refactor: Update metrics to fetch counts at request time
tomasfarias Jun 7, 2024
70bf7b5
refactor: Switch to counting runs
tomasfarias Jun 12, 2024
47cc4e0
refactor: Support batch export models as views
tomasfarias Jun 18, 2024
5e85d52
fix: Merge conflict
tomasfarias Jun 18, 2024
56e8f73
fix: Quality check fixes
tomasfarias Jun 18, 2024
7e29650
refactor: Update metrics to fetch counts at request time
tomasfarias Jun 7, 2024
6c5f5e0
fix: Move import to method
tomasfarias Jun 7, 2024
848e257
fix: Add function
tomasfarias Jun 10, 2024
412c333
fix: Typing fixes
tomasfarias Jun 10, 2024
95d8355
feat: Custom schemas for batch exports
tomasfarias Jun 12, 2024
5c180f0
feat: Frontend support for model field
tomasfarias Jun 12, 2024
2fd98ac
fix: Clean-up
tomasfarias Jun 12, 2024
e92a600
fix: Add missing migration
tomasfarias Jun 12, 2024
b2feebc
fix: Make new field nullable
tomasfarias Jun 12, 2024
082917a
Update UI snapshots for `chromium` (1)
github-actions[bot] Jun 12, 2024
aca3436
Update UI snapshots for `chromium` (1)
github-actions[bot] Jun 12, 2024
65a1c8a
Update UI snapshots for `chromium` (1)
github-actions[bot] Jun 12, 2024
14f5c06
Update UI snapshots for `chromium` (1)
github-actions[bot] Jun 12, 2024
2104236
Update UI snapshots for `chromium` (1)
github-actions[bot] Jun 12, 2024
db9f9e4
Update UI snapshots for `chromium` (1)
github-actions[bot] Jun 12, 2024
947b14e
fix: Bump migration number
tomasfarias Jun 18, 2024
212f776
fix: Clean-up unused code
tomasfarias Jun 18, 2024
7c97f50
fix: Clean-up unused function
tomasfarias Jun 18, 2024
d32e42f
fix: Only run extra clickhouse queries in batch exports tests
tomasfarias Jun 19, 2024
baa3e2a
feat: Support multiple models in all batch export destinations
tomasfarias Jun 20, 2024
0c76d93
fix: Add additional type hints and update tests
tomasfarias Jun 20, 2024
99efaec
chore: Add test utilities for multi-model batch exports
tomasfarias Jun 20, 2024
354a936
fix: Pass properties included as keyword arguments
tomasfarias Jun 20, 2024
d87a15e
fix: Support custom key prefixes for multi-model
tomasfarias Jun 20, 2024
38d196f
feat: Unit testing for S3 batch exports with multi-model support
tomasfarias Jun 20, 2024
3f87f03
fix: Add and correct type hints
tomasfarias Jun 20, 2024
850ef73
fix: Typo in parameter
tomasfarias Jun 20, 2024
feaaf9f
fix: API tests now work with model parameter
tomasfarias Jun 20, 2024
591c3eb
fix: Re-add hosts to docker-compose
tomasfarias Jun 24, 2024
674e95d
fix: Use UTC timezone alias
tomasfarias Jun 24, 2024
fc3ef02
fix: Add missing test column
tomasfarias Jun 25, 2024
e32b4a5
revert: Python 3.11 alias
tomasfarias Jun 25, 2024
2 changes: 1 addition & 1 deletion posthog/api/test/batch_exports/test_create.py
@@ -294,7 +294,7 @@ def test_create_batch_export_with_custom_schema(client: HttpClient):
}

assert batch_export.schema == expected_schema
assert args["batch_export_schema"] == expected_schema
assert args["batch_export_model"] == {"name": "events", "schema": expected_schema}


@pytest.mark.parametrize(
37 changes: 20 additions & 17 deletions posthog/api/test/batch_exports/test_update.py
@@ -315,23 +315,26 @@ def test_can_patch_hogql_query(client: HttpClient):
args = json.loads(decoded_payload[0].data)
assert args["bucket_name"] == "my-production-s3-bucket"
assert args["interval"] == "hour"
assert args["batch_export_schema"] == {
"fields": [
{
"alias": "uuid",
"expression": "toString(events.uuid)",
},
{
"alias": "test",
"expression": "%(hogql_val_0)s",
},
{
"alias": "n",
"expression": "accurateCastOrNull(plus(1, 1), %(hogql_val_1)s)",
},
],
"values": {"hogql_val_0": "test", "hogql_val_1": "Int64"},
"hogql_query": "SELECT toString(uuid) AS uuid, 'test' AS test, toInt(plus(1, 1)) AS n FROM events",
assert args["batch_export_model"] == {
"name": "events",
"schema": {
"fields": [
{
"alias": "uuid",
"expression": "toString(events.uuid)",
},
{
"alias": "test",
"expression": "%(hogql_val_0)s",
},
{
"alias": "n",
"expression": "accurateCastOrNull(plus(1, 1), %(hogql_val_1)s)",
},
],
"values": {"hogql_val_0": "test", "hogql_val_1": "Int64"},
"hogql_query": "SELECT toString(uuid) AS uuid, 'test' AS test, toInt(plus(1, 1)) AS n FROM events",
},
}


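The two test diffs above pin down the wire-format change: the schema dict that previously traveled as a top-level `batch_export_schema` argument in the Temporal payload is now nested under a named `batch_export_model`. A minimal before/after sketch (values borrowed from the tests; this is an illustration, not code from the PR):

schema = {
    "fields": [
        {"alias": "uuid", "expression": "toString(events.uuid)"},
    ],
    "values": {},
    "hogql_query": "SELECT toString(uuid) AS uuid FROM events",
}

# Payload shape before this PR: schema at the top level.
legacy_args = {"batch_export_schema": schema}

# Payload shape after this PR: schema nested under a named model.
new_args = {"batch_export_model": {"name": "events", "schema": schema}}

assert new_args["batch_export_model"]["schema"] == legacy_args["batch_export_schema"]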
2 changes: 1 addition & 1 deletion posthog/batch_exports/http.py
@@ -5,7 +5,7 @@
import structlog
from django.db import transaction
from django.utils.timezone import now
from rest_framework import request, response, serializers, viewsets, filters
from rest_framework import filters, request, response, serializers, viewsets
from rest_framework.decorators import action
from rest_framework.exceptions import (
NotAuthenticated,
37 changes: 29 additions & 8 deletions posthog/batch_exports/service.py
@@ -54,9 +54,15 @@ class BatchExportSchema(typing.TypedDict):
values: dict[str, str]


@dataclass
class BatchExportModel:
name: str
schema: BatchExportSchema | None


class BatchExportsInputsProtocol(typing.Protocol):
team_id: int
batch_export_schema: BatchExportSchema | None = None
batch_export_model: BatchExportModel | None = None
is_backfill: bool = False


@@ -90,10 +96,11 @@ class S3BatchExportInputs:
include_events: list[str] | None = None
encryption: str | None = None
kms_key_id: str | None = None
batch_export_schema: BatchExportSchema | None = None
endpoint_url: str | None = None
file_format: str = "JSONLines"
is_backfill: bool = False
batch_export_model: BatchExportModel | None = None
batch_export_schema: BatchExportSchema | None = None


@dataclass
@@ -114,8 +121,9 @@ class SnowflakeBatchExportInputs:
role: str | None = None
exclude_events: list[str] | None = None
include_events: list[str] | None = None
batch_export_schema: BatchExportSchema | None = None
is_backfill: bool = False
batch_export_model: BatchExportModel | None = None
batch_export_schema: BatchExportSchema | None = None


@dataclass
@@ -136,8 +144,9 @@ class PostgresBatchExportInputs:
data_interval_end: str | None = None
exclude_events: list[str] | None = None
include_events: list[str] | None = None
batch_export_schema: BatchExportSchema | None = None
is_backfill: bool = False
batch_export_model: BatchExportModel | None = None
batch_export_schema: BatchExportSchema | None = None


@dataclass
@@ -165,8 +174,9 @@ class BigQueryBatchExportInputs:
exclude_events: list[str] | None = None
include_events: list[str] | None = None
use_json_type: bool = False
batch_export_schema: BatchExportSchema | None = None
is_backfill: bool = False
batch_export_model: BatchExportModel | None = None
batch_export_schema: BatchExportSchema | None = None


@dataclass
@@ -181,8 +191,9 @@ class HttpBatchExportInputs:
data_interval_end: str | None = None
exclude_events: list[str] | None = None
include_events: list[str] | None = None
batch_export_schema: BatchExportSchema | None = None
is_backfill: bool = False
batch_export_model: BatchExportModel | None = None
batch_export_schema: BatchExportSchema | None = None


@dataclass
@@ -193,8 +204,9 @@ class NoOpInputs:
team_id: int
interval: str = "hour"
arg: str = ""
batch_export_schema: BatchExportSchema | None = None
is_backfill: bool = False
batch_export_model: BatchExportModel | None = None
batch_export_schema: BatchExportSchema | None = None


DESTINATION_WORKFLOWS = {
@@ -609,7 +621,16 @@ def sync_batch_export(batch_export: BatchExport, created: bool):
team_id=batch_export.team.id,
batch_export_id=str(batch_export.id),
interval=str(batch_export.interval),
batch_export_schema=batch_export.schema,
batch_export_model=BatchExportModel(
name=batch_export.model or "events",
schema=batch_export.schema,
),
# TODO: This field is deprecated, but we still set it for backwards compatibility.
# New exports created will always have `batch_export_schema` set to `None`, but existing
# batch exports may still be using it.
# This assignment should be removed after updating all existing exports to use
# `batch_export_model` instead.
batch_export_schema=None,
**destination_config,
)
),
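The net effect of the service.py changes: `BatchExportModel` pairs a model name with an optional custom schema, every destination's inputs dataclass gains a `batch_export_model` field, and `sync_batch_export` always populates it while pinning the deprecated `batch_export_schema` to `None`. A minimal sketch of the fallback used when building the model (the helper function is hypothetical; `BatchExportModel` and `BatchExportSchema` are the real classes from the diff):

from posthog.batch_exports.service import BatchExportModel, BatchExportSchema


def build_model(model_name: str | None, schema: BatchExportSchema | None) -> BatchExportModel:
    # Mirrors the sync_batch_export assignment: batch exports created before
    # the `model` column existed fall back to the "events" model, and any
    # custom schema travels inside the model rather than alongside it.
    return BatchExportModel(name=model_name or "events", schema=schema)


build_model(None, None)       # BatchExportModel(name='events', schema=None)
build_model("persons", None)  # BatchExportModel(name='persons', schema=None)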
59 changes: 49 additions & 10 deletions posthog/temporal/batch_exports/batch_exports.py
@@ -13,6 +13,8 @@
from posthog.batch_exports.models import BatchExportBackfill, BatchExportRun
from posthog.batch_exports.service import (
BatchExportField,
BatchExportModel,
BatchExportSchema,
acount_failed_batch_export_runs,
acreate_batch_export_backfill,
acreate_batch_export_run,
@@ -111,37 +113,74 @@ def default_fields() -> list[BatchExportField]:
]


DEFAULT_MODELS = {"events", "persons"}


async def iter_model_records(
client: ClickHouseClient, model: str, team_id: int, is_backfill: bool, **parameters
client: ClickHouseClient,
model: BatchExportModel | BatchExportSchema | None,
team_id: int,
is_backfill: bool,
destination_default_fields: list[BatchExportField] | None = None,
**parameters,
) -> AsyncRecordsGenerator:
if model in DEFAULT_MODELS:
if destination_default_fields is None:
batch_export_default_fields = default_fields()
else:
batch_export_default_fields = destination_default_fields

if isinstance(model, BatchExportModel):
async for record in iter_records_from_model_view(
client=client, model=model, team_id=team_id, is_backfill=is_backfill, **parameters
client=client,
model_name=model.name,
team_id=team_id,
is_backfill=is_backfill,
fields=model.schema["fields"] if model.schema is not None else batch_export_default_fields,
extra_query_parameters=model.schema["values"] if model.schema is not None else None,
**parameters,
):
yield record

else:
for record in iter_records(client, team_id=team_id, is_backfill=is_backfill, **parameters):
for record in iter_records(
client,
team_id=team_id,
is_backfill=is_backfill,
fields=model["fields"] if model is not None else batch_export_default_fields,
extra_query_parameters=model["values"] if model is not None else None,
**parameters,
):
yield record


async def iter_records_from_model_view(
client: ClickHouseClient, model: str, is_backfill: bool, team_id: int, **parameters
client: ClickHouseClient,
model_name: str,
is_backfill: bool,
team_id: int,
interval_start: str,
interval_end: str,
**parameters,
) -> AsyncRecordsGenerator:
if model == "persons":
if model_name == "persons":
view = SELECT_FROM_PERSONS_VIEW
else:
# TODO: Let this model be exported by `astream_query_as_arrow`.
# Just to reduce risk, I don't want to change the function that runs 100% of the exports
# without battle testing it first.
# There are already changes going out to the queries themselves that will impact events in a
# positive way. So, we can come back later and drop this block.
for record_batch in iter_records(client, team_id=team_id, is_backfill=is_backfill, **parameters):
for record_batch in iter_records(
client,
team_id=team_id,
is_backfill=is_backfill,
interval_start=interval_start,
interval_end=interval_end,
**parameters,
):
yield record_batch
return

parameters["team_id"] = team_id
parameters["interval_start"] = dt.datetime.fromisoformat(interval_start).strftime("%Y-%m-%d %H:%M:%S")
parameters["interval_end"] = dt.datetime.fromisoformat(interval_end).strftime("%Y-%m-%d %H:%M:%S")
async for record_batch in client.astream_query_as_arrow(view, query_parameters=parameters):
yield record_batch

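This dispatch is the core of the multi-model support: `iter_model_records` accepts the new `BatchExportModel` dataclass, a legacy `BatchExportSchema` TypedDict, or `None`, reduces either shape to a field list plus query parameters, and only routes the named "persons" model through the view-backed `astream_query_as_arrow` path. A condensed paraphrase of that routing (the record iterators themselves are elided; the helper names are hypothetical):

from posthog.batch_exports.service import BatchExportField, BatchExportModel, BatchExportSchema


def resolve_fields(
    model: BatchExportModel | BatchExportSchema | None,
    default_fields: list[BatchExportField],
) -> tuple[list[BatchExportField], dict | None]:
    # Both input shapes reduce to (fields, extra_query_parameters).
    schema = model.schema if isinstance(model, BatchExportModel) else model
    if schema is None:
        return default_fields, None
    return schema["fields"], schema["values"]


def uses_persons_view(model: BatchExportModel | BatchExportSchema | None) -> bool:
    # Only BatchExportModel(name="persons") is served from SELECT_FROM_PERSONS_VIEW;
    # every other input stays on the existing iter_records query.
    return isinstance(model, BatchExportModel) and model.name == "persons"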
26 changes: 15 additions & 11 deletions posthog/temporal/batch_exports/bigquery_batch_export.py
@@ -14,6 +14,7 @@
from posthog.batch_exports.models import BatchExportRun
from posthog.batch_exports.service import (
BatchExportField,
BatchExportModel,
BatchExportSchema,
BigQueryBatchExportInputs,
)
@@ -150,9 +151,11 @@ class BigQueryInsertInputs:
exclude_events: list[str] | None = None
include_events: list[str] | None = None
use_json_type: bool = False
batch_export_schema: BatchExportSchema | None = None
run_id: str | None = None
is_backfill: bool = False
batch_export_model: BatchExportModel | None = None
# TODO: Remove after updating existing batch exports
batch_export_schema: BatchExportSchema | None = None


@contextlib.contextmanager
@@ -230,24 +233,23 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> Records
if not await client.is_alive():
raise ConnectionError("Cannot establish connection to ClickHouse")

if inputs.batch_export_schema is None:
fields = bigquery_default_fields()
query_parameters = None

model: BatchExportModel | BatchExportSchema | None = None
if inputs.batch_export_schema is None and "batch_export_model" in {
field.name for field in dataclasses.fields(inputs)
}:
model = inputs.batch_export_model
else:
fields = inputs.batch_export_schema["fields"]
query_parameters = inputs.batch_export_schema["values"]
model = inputs.batch_export_schema

records_iterator = iter_model_records(
client=client,
model="events",
model=model,
team_id=inputs.team_id,
interval_start=data_interval_start,
interval_end=inputs.data_interval_end,
exclude_events=inputs.exclude_events,
include_events=inputs.include_events,
fields=fields,
extra_query_parameters=query_parameters,
destination_default_fields=bigquery_default_fields(),
is_backfill=inputs.is_backfill,
)

@@ -408,9 +410,11 @@ async def run(self, inputs: BigQueryBatchExportInputs):
exclude_events=inputs.exclude_events,
include_events=inputs.include_events,
use_json_type=inputs.use_json_type,
batch_export_schema=inputs.batch_export_schema,
run_id=run_id,
is_backfill=inputs.is_backfill,
batch_export_model=inputs.batch_export_model,
# TODO: Remove after updating existing batch exports.
batch_export_schema=inputs.batch_export_schema,
)

await execute_batch_export_insert_activity(
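The guard at the top of `insert_into_bigquery_activity` deserves a note: it prefers the new field only when no legacy schema was provided, and the `dataclasses.fields` membership check presumably keeps the activity tolerant of input payloads serialized before `batch_export_model` existed. A paraphrase of the selection (sketch only, under the hypothetical name `select_model`):

import dataclasses


def select_model(inputs):
    # Returns a BatchExportModel, a legacy BatchExportSchema dict, or None.
    field_names = {field.name for field in dataclasses.fields(inputs)}
    if inputs.batch_export_schema is None and "batch_export_model" in field_names:
        return inputs.batch_export_model
    return inputs.batch_export_schema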
9 changes: 6 additions & 3 deletions posthog/temporal/batch_exports/http_batch_export.py
@@ -1,8 +1,8 @@
import asyncio
import dataclasses
import datetime as dt
import io
import json
from dataclasses import dataclass

import aiohttp
from django.conf import settings
@@ -11,6 +11,7 @@

from posthog.batch_exports.service import (
BatchExportField,
BatchExportModel,
BatchExportSchema,
HttpBatchExportInputs,
)
@@ -92,7 +93,7 @@ def from_activity_details(cls, details) -> "HeartbeatDetails":
return HeartbeatDetails(last_uploaded_timestamp)


@dataclass
@dataclasses.dataclass
class HttpInsertInputs:
"""Inputs for HTTP insert activity."""

@@ -104,8 +105,9 @@ class HttpInsertInputs:
exclude_events: list[str] | None = None
include_events: list[str] | None = None
run_id: str | None = None
batch_export_schema: BatchExportSchema | None = None
is_backfill: bool = False
batch_export_model: BatchExportModel | None = None
batch_export_schema: BatchExportSchema | None = None


async def maybe_resume_from_heartbeat(inputs: HttpInsertInputs) -> str:
@@ -357,6 +359,7 @@ async def run(self, inputs: HttpBatchExportInputs):
batch_export_schema=inputs.batch_export_schema,
run_id=run_id,
is_backfill=inputs.is_backfill,
batch_export_model=inputs.batch_export_model,
)

await execute_batch_export_insert_activity(