refactor: Support batch export models as views (#23052)
* refactor: Update metrics to fetch counts at request time

* fix: Move import to method

* fix: Add function

* feat: Custom schemas for batch exports

* feat: Frontend support for model field

* fix: Clean-up

* fix: Add missing migration

* fix: Make new field nullable

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (1)

* fix: Bump migration number

* fix: Bump migration number

* refactor: Update metrics to fetch counts at request time

* fix: Actually use include and exclude events

* refactor: Switch to counting runs

* refactor: Support batch export models as views

* fix: Merge conflict

* fix: Quality check fixes

* refactor: Update metrics to fetch counts at request time

* fix: Move import to method

* fix: Add function

* fix: Typing fixes

* feat: Custom schemas for batch exports

* feat: Frontend support for model field

* fix: Clean-up

* fix: Add missing migration

* fix: Make new field nullable

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (1)

* fix: Bump migration number

* fix: Clean-up unused code

* chore: Clean-up unused function and tests

* fix: Clean-up unused function

* fix: HTTP Batch export default fields

* fix: Remove test case on new column not present in base table

* chore: Clean-up unused functions and queries

* fix: Only run extra clickhouse queries in batch exports tests

* refactor: Remove coalesce and use only inserted_at in queries

At this point, inserted_at should always be set for all batch
exports. Only historical exports require _timestamp, but backfills
have already been switched over to query based on timestamp, so they
also do not need to check for inserted_at/_timestamp.

Removing the coalesce and using only inserted_at halves the amount
of data ClickHouse has to fetch.
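
For illustration, the predicate change looks roughly like this (a sketch only;
the parameter syntax follows the view definitions added later in this diff):

    -- Before: fall back to _timestamp when inserted_at is not set
    PREWHERE COALESCE(events.inserted_at, events._timestamp) >= {interval_start:DateTime64}
        AND COALESCE(events.inserted_at, events._timestamp) < {interval_end:DateTime64}

    -- After: inserted_at is assumed to always be set, so the fallback is dropped
    PREWHERE events.inserted_at >= {interval_start:DateTime64}
        AND events.inserted_at < {interval_end:DateTime64}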

* fix: Remove deprecated test

* fix: Add person_id to person model and enforce ordering

* refactor: Also add version column

---------

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
2 people authored and timgl committed Jun 27, 2024
1 parent 7af5a57 commit 731955f
Showing 22 changed files with 593 additions and 479 deletions.
23 changes: 0 additions & 23 deletions posthog/api/test/test_app_metrics.py
@@ -1,6 +1,5 @@
import datetime as dt
import json
import uuid
from unittest import mock

from freezegun.api import freeze_time
@@ -9,7 +8,6 @@
from posthog.api.test.batch_exports.conftest import start_test_worker
from posthog.api.test.batch_exports.operations import create_batch_export_ok
from posthog.batch_exports.models import BatchExportRun
from posthog.client import sync_execute
from posthog.models.activity_logging.activity_log import Detail, Trigger, log_activity
from posthog.models.plugin import Plugin, PluginConfig
from posthog.models.utils import UUIDT
@@ -20,20 +18,6 @@
SAMPLE_PAYLOAD = {"dateRange": ["2021-06-10", "2022-06-12"], "parallelism": 1}


def insert_event(team_id: int, timestamp: dt.datetime, event: str = "test-event"):
sync_execute(
"INSERT INTO `sharded_events` (uuid, team_id, event, timestamp) VALUES",
[
{
"uuid": uuid.uuid4(),
"team_id": team_id,
"event": event,
"timestamp": timestamp,
}
],
)


@freeze_time("2021-12-05T13:23:00Z")
class TestAppMetricsAPI(ClickhouseTestMixin, APIBaseTest):
maxDiff = None
@@ -149,9 +133,6 @@ def test_retrieve_batch_export_runs_app_metrics(self):
data_interval_start=last_updated_at - dt.timedelta(hours=1),
status=BatchExportRun.Status.COMPLETED,
)
for _ in range(3):
insert_event(team_id=self.team.pk, timestamp=last_updated_at - dt.timedelta(minutes=1))

BatchExportRun.objects.create(
batch_export_id=batch_export_id,
data_interval_end=last_updated_at - dt.timedelta(hours=2),
@@ -164,9 +145,6 @@
data_interval_start=last_updated_at - dt.timedelta(hours=3),
status=BatchExportRun.Status.FAILED_RETRYABLE,
)
for _ in range(5):
timestamp = last_updated_at - dt.timedelta(hours=2, minutes=1)
insert_event(team_id=self.team.pk, timestamp=timestamp)

response = self.client.get(f"/api/projects/@current/app_metrics/{batch_export_id}?date_from=-7d")
self.assertEqual(response.status_code, status.HTTP_200_OK)
@@ -235,7 +213,6 @@ def test_retrieve_batch_export_runs_app_metrics_defaults_to_zero(self):
data_interval_start=last_updated_at - dt.timedelta(hours=1),
status=BatchExportRun.Status.COMPLETED,
)
insert_event(team_id=self.team.pk, timestamp=last_updated_at - dt.timedelta(minutes=1))

response = self.client.get(f"/api/projects/@current/app_metrics/{batch_export_id}?date_from=-7d")
self.assertEqual(response.status_code, status.HTTP_200_OK)
135 changes: 135 additions & 0 deletions posthog/batch_exports/sql.py
@@ -0,0 +1,135 @@
CREATE_PERSONS_BATCH_EXPORT_VIEW = """
CREATE OR REPLACE VIEW persons_batch_export AS (
SELECT
pd.team_id AS team_id,
pd.distinct_id AS distinct_id,
toString(p.id) AS person_id,
p.properties AS properties,
pd.version AS version,
pd._timestamp AS _inserted_at
FROM (
SELECT
team_id,
distinct_id,
max(version) AS version,
argMax(person_id, person_distinct_id2.version) AS person_id,
max(_timestamp) AS _timestamp
FROM
person_distinct_id2
WHERE
team_id = {team_id:Int64}
GROUP BY
team_id,
distinct_id
) AS pd
INNER JOIN
person p ON p.id = pd.person_id AND p.team_id = pd.team_id
WHERE
pd.team_id = {team_id:Int64}
AND p.team_id = {team_id:Int64}
AND pd._timestamp >= {interval_start:DateTime64}
AND pd._timestamp < {interval_end:DateTime64}
ORDER BY
_inserted_at
)
"""

CREATE_EVENTS_BATCH_EXPORT_VIEW = """
CREATE OR REPLACE VIEW events_batch_export AS (
SELECT
team_id AS team_id,
min(timestamp) AS timestamp,
event AS event,
any(distinct_id) AS distinct_id,
any(toString(uuid)) AS uuid,
min(COALESCE(inserted_at, _timestamp)) AS _inserted_at,
any(created_at) AS created_at,
any(elements_chain) AS elements_chain,
any(toString(person_id)) AS person_id,
any(nullIf(properties, '')) AS properties,
any(nullIf(person_properties, '')) AS person_properties,
nullIf(JSONExtractString(properties, '$set'), '') AS set,
nullIf(JSONExtractString(properties, '$set_once'), '') AS set_once
FROM
events
PREWHERE
events.inserted_at >= {interval_start:DateTime64}
AND events.inserted_at < {interval_end:DateTime64}
WHERE
team_id = {team_id:Int64}
AND events.timestamp >= {interval_start:DateTime64} - INTERVAL {lookback_days:Int32} DAY
AND events.timestamp < {interval_end:DateTime64} + INTERVAL 1 DAY
AND (length({include_events:Array(String)}) = 0 OR event IN {include_events:Array(String)})
AND (length({exclude_events:Array(String)}) = 0 OR event NOT IN {exclude_events:Array(String)})
GROUP BY
team_id, toDate(events.timestamp), event, cityHash64(events.distinct_id), cityHash64(events.uuid)
ORDER BY
_inserted_at, event
SETTINGS optimize_aggregation_in_order=1
)
"""

CREATE_EVENTS_BATCH_EXPORT_VIEW_UNBOUNDED = """
CREATE OR REPLACE VIEW events_batch_export_unbounded AS (
SELECT
team_id AS team_id,
min(timestamp) AS timestamp,
event AS event,
any(distinct_id) AS distinct_id,
any(toString(uuid)) AS uuid,
min(COALESCE(inserted_at, _timestamp)) AS _inserted_at,
any(created_at) AS created_at,
any(elements_chain) AS elements_chain,
any(toString(person_id)) AS person_id,
any(nullIf(properties, '')) AS properties,
any(nullIf(person_properties, '')) AS person_properties,
nullIf(JSONExtractString(properties, '$set'), '') AS set,
nullIf(JSONExtractString(properties, '$set_once'), '') AS set_once
FROM
events
PREWHERE
events.inserted_at >= {interval_start:DateTime64}
AND events.inserted_at < {interval_end:DateTime64}
WHERE
team_id = {team_id:Int64}
AND (length({include_events:Array(String)}) = 0 OR event IN {include_events:Array(String)})
AND (length({exclude_events:Array(String)}) = 0 OR event NOT IN {exclude_events:Array(String)})
GROUP BY
team_id, toDate(events.timestamp), event, cityHash64(events.distinct_id), cityHash64(events.uuid)
ORDER BY
_inserted_at, event
SETTINGS optimize_aggregation_in_order=1
)
"""

CREATE_EVENTS_BATCH_EXPORT_VIEW_BACKFILL = """
CREATE OR REPLACE VIEW events_batch_export_backfill AS (
SELECT
team_id AS team_id,
min(timestamp) AS timestamp,
event AS event,
any(distinct_id) AS distinct_id,
any(toString(uuid)) AS uuid,
min(COALESCE(inserted_at, _timestamp)) AS _inserted_at,
any(created_at) AS created_at,
any(elements_chain) AS elements_chain,
any(toString(person_id)) AS person_id,
any(nullIf(properties, '')) AS properties,
any(nullIf(person_properties, '')) AS person_properties,
nullIf(JSONExtractString(properties, '$set'), '') AS set,
nullIf(JSONExtractString(properties, '$set_once'), '') AS set_once
FROM
events
WHERE
team_id = {team_id:Int64}
AND events.timestamp >= {interval_start:DateTime64}
AND events.timestamp < {interval_end:DateTime64}
AND (length({include_events:Array(String)}) = 0 OR event IN {include_events:Array(String)})
AND (length({exclude_events:Array(String)}) = 0 OR event NOT IN {exclude_events:Array(String)})
GROUP BY
team_id, toDate(events.timestamp), event, cityHash64(events.distinct_id), cityHash64(events.uuid)
ORDER BY
_inserted_at, event
SETTINGS optimize_aggregation_in_order=1
)
"""
17 changes: 17 additions & 0 deletions (new ClickHouse migration; file path not shown)
@@ -0,0 +1,17 @@
from posthog.batch_exports.sql import (
CREATE_EVENTS_BATCH_EXPORT_VIEW,
CREATE_EVENTS_BATCH_EXPORT_VIEW_BACKFILL,
CREATE_EVENTS_BATCH_EXPORT_VIEW_UNBOUNDED,
CREATE_PERSONS_BATCH_EXPORT_VIEW,
)
from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions

operations = map(
run_sql_with_exceptions,
[
CREATE_PERSONS_BATCH_EXPORT_VIEW,
CREATE_EVENTS_BATCH_EXPORT_VIEW,
CREATE_EVENTS_BATCH_EXPORT_VIEW_UNBOUNDED,
CREATE_EVENTS_BATCH_EXPORT_VIEW_BACKFILL,
],
)
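
The migration above simply maps run_sql_with_exceptions over the four
CREATE OR REPLACE VIEW statements. One way to confirm the views exist after it
runs is to query the standard ClickHouse system tables; this check is a sketch
and not part of this commit.

    -- Views created by this migration should appear with engine 'View'
    SELECT name, engine
    FROM system.tables
    WHERE database = currentDatabase()
      AND name LIKE '%batch_export%'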
12 changes: 6 additions & 6 deletions posthog/conftest.py
@@ -12,11 +12,11 @@ def create_clickhouse_tables(num_tables: int):
# Create clickhouse tables to default before running test
# Mostly so that test runs locally work correctly
from posthog.clickhouse.schema import (
CREATE_DATA_QUERIES,
CREATE_DICTIONARY_QUERIES,
CREATE_DISTRIBUTED_TABLE_QUERIES,
CREATE_MERGETREE_TABLE_QUERIES,
CREATE_MV_TABLE_QUERIES,
CREATE_DATA_QUERIES,
CREATE_DICTIONARY_QUERIES,
CREATE_VIEW_QUERIES,
build_query,
)
@@ -53,24 +53,24 @@ def reset_clickhouse_tables():
from posthog.clickhouse.plugin_log_entries import (
TRUNCATE_PLUGIN_LOG_ENTRIES_TABLE_SQL,
)
from posthog.heatmaps.sql import TRUNCATE_HEATMAPS_TABLE_SQL
from posthog.models.app_metrics.sql import TRUNCATE_APP_METRICS_TABLE_SQL
from posthog.models.channel_type.sql import TRUNCATE_CHANNEL_DEFINITION_TABLE_SQL
from posthog.models.cohort.sql import TRUNCATE_COHORTPEOPLE_TABLE_SQL
from posthog.models.event.sql import TRUNCATE_EVENTS_TABLE_SQL
from posthog.models.group.sql import TRUNCATE_GROUPS_TABLE_SQL
from posthog.models.performance.sql import TRUNCATE_PERFORMANCE_EVENTS_TABLE_SQL
from posthog.models.person.sql import (
TRUNCATE_PERSON_DISTINCT_ID2_TABLE_SQL,
TRUNCATE_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL,
TRUNCATE_PERSON_DISTINCT_ID_TABLE_SQL,
TRUNCATE_PERSON_STATIC_COHORT_TABLE_SQL,
TRUNCATE_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL,
TRUNCATE_PERSON_TABLE_SQL,
)
from posthog.models.sessions.sql import TRUNCATE_SESSIONS_TABLE_SQL
from posthog.session_recordings.sql.session_recording_event_sql import (
TRUNCATE_SESSION_RECORDING_EVENTS_TABLE_SQL,
)
from posthog.models.channel_type.sql import TRUNCATE_CHANNEL_DEFINITION_TABLE_SQL
from posthog.models.sessions.sql import TRUNCATE_SESSIONS_TABLE_SQL
from posthog.heatmaps.sql import TRUNCATE_HEATMAPS_TABLE_SQL

# REMEMBER TO ADD ANY NEW CLICKHOUSE TABLES TO THIS ARRAY!
TABLES_TO_CREATE_DROP = [
