feat: count replay events in ClickHouse as we ingest them (#16994)
* feat: count replay events in ClickHouse as we ingest them

* Add to hogql db schema

* Update query snapshots

* Update query snapshots

* Update query snapshots

* don't need it on kafka table

* Update query snapshots

* update desired columns

* switch to counting events and messages

* Update query snapshots

* first pass addition of _timestamp

* maybe like this

* like this?

* Update query snapshots

* explicit message count

* Update query snapshots

* Update query snapshots

* Update query snapshots

* Update query snapshots

* Update UI snapshots for `chromium` (2)

* Update query snapshots

* hogql db schema too

* Update query snapshots

* Update UI snapshots for `chromium` (2)

* fix

---------

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
pauldambra and github-actions[bot] authored Sep 14, 2023
1 parent 5906ea8 commit 68a4e18
Showing 11 changed files with 159 additions and 9 deletions.
(3 of the 11 changed files could not be displayed.)
4 changes: 4 additions & 0 deletions plugin-server/src/worker/ingestion/process-event.ts
@@ -281,6 +281,8 @@ export interface SummarizedSessionRecordingEvent {
console_warn_count: number
console_error_count: number
size: number
event_count: number
message_count: number
}

export const createSessionReplayEvent = (
@@ -357,6 +359,8 @@ export const createSessionReplayEvent = (
console_warn_count: Math.trunc(consoleWarnCount),
console_error_count: Math.trunc(consoleErrorCount),
size: Math.trunc(Buffer.byteLength(JSON.stringify(events), 'utf8')),
event_count: Math.trunc(events.length),
message_count: 1,
}

return data
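Each summarized row written here covers exactly one Kafka message: message_count is always 1 at write time and event_count is the size of that message's batch of snapshot events, so the totals only become meaningful once ClickHouse sums them per session. A read-back sketch (a hypothetical debugging query, not part of this diff; table and column names are those defined in the schema changes below):

    -- the target columns are SimpleAggregateFunction(sum, ...), so they
    -- must be re-aggregated with sum() at query time
    SELECT
        session_id,
        sum(event_count) AS events,
        sum(message_count) AS messages,
        max(_timestamp) AS last_ingested_at
    FROM session_replay_events
    WHERE team_id = 2 -- hypothetical team id
    GROUP BY session_id
    ORDER BY last_ingested_at DESC
    LIMIT 10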
12 changes: 12 additions & 0 deletions plugin-server/tests/main/process-event.test.ts
@@ -1264,6 +1264,8 @@ const sessionReplayEventTestCases: {
| 'console_warn_count'
| 'console_error_count'
| 'size'
| 'event_count'
| 'message_count'
>
}[] = [
{
@@ -1280,6 +1282,8 @@
console_warn_count: 0,
console_error_count: 0,
size: 73,
event_count: 1,
message_count: 1,
},
},
{
@@ -1296,6 +1300,8 @@
console_warn_count: 0,
console_error_count: 0,
size: 73,
event_count: 1,
message_count: 1,
},
},
{
@@ -1352,6 +1358,8 @@
console_warn_count: 3,
console_error_count: 1,
size: 762,
event_count: 7,
message_count: 1,
},
},
{
@@ -1390,6 +1398,8 @@
console_warn_count: 0,
console_error_count: 0,
size: 213,
event_count: 2,
message_count: 1,
},
},
{
@@ -1417,6 +1427,8 @@
console_warn_count: 0,
console_error_count: 0,
size: 433,
event_count: 6,
message_count: 1,
},
},
]
26 changes: 26 additions & 0 deletions posthog/clickhouse/migrations/0048_session_replay_events_count.py
@@ -0,0 +1,26 @@
from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
from posthog.models.session_replay_event.migrations_sql import (
DROP_SESSION_REPLAY_EVENTS_TABLE_MV_SQL,
DROP_KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL,
ADD_EVENT_COUNT_WRITABLE_SESSION_REPLAY_EVENTS_TABLE_SQL,
ADD_EVENT_COUNT_DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL,
ADD_EVENT_COUNT_SESSION_REPLAY_EVENTS_TABLE_SQL,
)
from posthog.models.session_replay_event.sql import (
SESSION_REPLAY_EVENTS_TABLE_MV_SQL,
KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL,
)

operations = [
# we have to drop the materialized view first so that we're no longer pulling from kafka
# then we drop the kafka table
run_sql_with_exceptions(DROP_SESSION_REPLAY_EVENTS_TABLE_MV_SQL()),
run_sql_with_exceptions(DROP_KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL()),
# now we can alter the target tables
run_sql_with_exceptions(ADD_EVENT_COUNT_WRITABLE_SESSION_REPLAY_EVENTS_TABLE_SQL()),
run_sql_with_exceptions(ADD_EVENT_COUNT_DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL()),
run_sql_with_exceptions(ADD_EVENT_COUNT_SESSION_REPLAY_EVENTS_TABLE_SQL()),
# and then recreate the materialized views and kafka tables
run_sql_with_exceptions(KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL()),
run_sql_with_exceptions(SESSION_REPLAY_EVENTS_TABLE_MV_SQL()),
]
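For context, the statement sequence this migration effectively issues looks like the sketch below. The exact DDL comes from the imported constants; the materialized-view name session_replay_events_mv and the cluster name 'posthog' are assumptions here, while the other names appear elsewhere in this diff:

    -- 1. stop consuming: drop the materialized view first, then the Kafka engine table
    DROP TABLE IF EXISTS session_replay_events_mv ON CLUSTER 'posthog';  -- assumed MV name
    DROP TABLE IF EXISTS kafka_session_replay_events ON CLUSTER 'posthog';
    -- 2. alter the storage tables while nothing is writing to them
    ALTER TABLE sharded_session_replay_events ON CLUSTER 'posthog'
        ADD COLUMN IF NOT EXISTS message_count SimpleAggregateFunction(sum, Int64),
        ADD COLUMN IF NOT EXISTS event_count SimpleAggregateFunction(sum, Int64),
        ADD COLUMN IF NOT EXISTS _timestamp SimpleAggregateFunction(max, DateTime);
    -- 3. recreate the Kafka table and the materialized view so that ingestion
    --    resumes with the new columns included in the MV's SELECT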
44 changes: 38 additions & 6 deletions posthog/clickhouse/test/__snapshots__/test_schema.ambr
@@ -336,7 +336,9 @@
console_log_count Int64,
console_warn_count Int64,
console_error_count Int64,
size Int64
size Int64,
event_count Int64,
message_count Int64
) ENGINE = Kafka('test.kafka.broker:9092', 'clickhouse_session_replay_events_test', 'group1', 'JSONEachRow')

'
@@ -922,7 +924,9 @@
console_log_count Int64,
console_warn_count Int64,
console_error_count Int64,
size Int64
size Int64,
event_count Int64,
message_count Int64
) ENGINE = Kafka('kafka:9092', 'clickhouse_session_replay_events_test', 'group1', 'JSONEachRow')

'
@@ -1344,7 +1348,15 @@
console_warn_count SimpleAggregateFunction(sum, Int64),
console_error_count SimpleAggregateFunction(sum, Int64),
-- this column allows us to estimate the amount of data that is being ingested
size SimpleAggregateFunction(sum, Int64)
size SimpleAggregateFunction(sum, Int64),
-- this allows us to count the number of messages received in a session
-- often very useful in incidents or debugging
message_count SimpleAggregateFunction(sum, Int64),
-- this allows us to count the number of snapshot events received in a session
-- often very useful in incidents or debugging
-- because we batch events we expect message_count to be lower than event_count
event_count SimpleAggregateFunction(sum, Int64),
_timestamp SimpleAggregateFunction(max, DateTime)
) ENGINE = Distributed('posthog', 'posthog_test', 'sharded_session_replay_events', sipHash64(distinct_id))

'
@@ -1377,7 +1389,11 @@
sum(console_log_count) as console_log_count,
sum(console_warn_count) as console_warn_count,
sum(console_error_count) as console_error_count,
sum(size) as size
sum(size) as size,
-- we can count the number of kafka messages instead of sending it explicitly
sum(message_count) as message_count,
sum(event_count) as event_count,
max(_timestamp) as _timestamp
FROM posthog_test.kafka_session_replay_events
group by session_id, team_id

@@ -1608,7 +1624,15 @@
console_warn_count SimpleAggregateFunction(sum, Int64),
console_error_count SimpleAggregateFunction(sum, Int64),
-- this column allows us to estimate the amount of data that is being ingested
size SimpleAggregateFunction(sum, Int64)
size SimpleAggregateFunction(sum, Int64),
-- this allows us to count the number of messages received in a session
-- often very useful in incidents or debugging
message_count SimpleAggregateFunction(sum, Int64),
-- this allows us to count the number of snapshot events received in a session
-- often very useful in incidents or debugging
-- because we batch events we expect message_count to be lower than event_count
event_count SimpleAggregateFunction(sum, Int64),
_timestamp SimpleAggregateFunction(max, DateTime)
) ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_{shard}/posthog.session_replay_events', '{replica}')

PARTITION BY toYYYYMM(min_first_timestamp)
@@ -2226,7 +2250,15 @@
console_warn_count SimpleAggregateFunction(sum, Int64),
console_error_count SimpleAggregateFunction(sum, Int64),
-- this column allows us to estimate the amount of data that is being ingested
size SimpleAggregateFunction(sum, Int64)
size SimpleAggregateFunction(sum, Int64),
-- this allows us to count the number of messages received in a session
-- often very useful in incidents or debugging
message_count SimpleAggregateFunction(sum, Int64),
-- this allows us to count the number of snapshot events received in a session
-- often very useful in incidents or debugging
-- because we batch events we expect message_count to be lower than event_count
event_count SimpleAggregateFunction(sum, Int64),
_timestamp SimpleAggregateFunction(max, DateTime)
) ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_{shard}/posthog.session_replay_events', '{replica}')

PARTITION BY toYYYYMM(min_first_timestamp)
4 changes: 4 additions & 0 deletions posthog/hogql/database/schema/session_replay_events.py
@@ -31,6 +31,8 @@
"console_warn_count": IntegerDatabaseField(name="console_warn_count"),
"console_error_count": IntegerDatabaseField(name="console_error_count"),
"size": IntegerDatabaseField(name="size"),
"event_count": IntegerDatabaseField(name="event_count"),
"message_count": IntegerDatabaseField(name="message_count"),
"pdi": LazyJoin(
from_field="distinct_id",
join_table=PersonDistinctIdsTable(),
@@ -77,6 +79,8 @@ def select_from_session_replay_events_table(requested_fields: Dict[str, List[str
"console_error_count": ast.Call(name="sum", args=[ast.Field(chain=[table_name, "console_error_count"])]),
"distinct_id": ast.Call(name="any", args=[ast.Field(chain=[table_name, "distinct_id"])]),
"size": ast.Call(name="sum", args=[ast.Field(chain=[table_name, "size"])]),
"event_count": ast.Call(name="sum", args=[ast.Field(chain=[table_name, "event_count"])]),
"message_count": ast.Call(name="sum", args=[ast.Field(chain=[table_name, "message_count"])]),
}

select_fields: List[ast.Expr] = []
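Since select_from_session_replay_events_table already wraps each counter in sum(), HogQL queries should be able to read the per-session totals directly. A sketch (assuming the table is exposed in HogQL as session_replay_events, as this schema module suggests):

    SELECT session_id, event_count, message_count
    FROM session_replay_events
    WHERE event_count > 1000
    ORDER BY event_count DESC
    LIMIT 20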
32 changes: 32 additions & 0 deletions posthog/hogql/database/test/__snapshots__/test_database.ambr
@@ -276,6 +276,14 @@
"key": "size",
"type": "integer"
},
{
"key": "event_count",
"type": "integer"
},
{
"key": "message_count",
"type": "integer"
},
{
"key": "pdi",
"type": "lazy_table",
@@ -405,6 +413,14 @@
"key": "size",
"type": "integer"
},
{
"key": "event_count",
"type": "integer"
},
{
"key": "message_count",
"type": "integer"
},
{
"key": "pdi",
"type": "lazy_table",
@@ -849,6 +865,14 @@
"key": "size",
"type": "integer"
},
{
"key": "event_count",
"type": "integer"
},
{
"key": "message_count",
"type": "integer"
},
{
"key": "pdi",
"type": "lazy_table",
@@ -978,6 +1002,14 @@
"key": "size",
"type": "integer"
},
{
"key": "event_count",
"type": "integer"
},
{
"key": "message_count",
"type": "integer"
},
{
"key": "pdi",
"type": "lazy_table",
26 changes: 26 additions & 0 deletions posthog/models/session_replay_event/migrations_sql.py
@@ -65,3 +65,29 @@
table_name=SESSION_REPLAY_EVENTS_DATA_TABLE(),
cluster=settings.CLICKHOUSE_CLUSTER,
)

# migration to add the message_count, event_count and _timestamp columns to the session replay tables
ALTER_SESSION_REPLAY_ADD_EVENT_COUNT_COLUMN = """
ALTER TABLE {table_name} on CLUSTER '{cluster}'
ADD COLUMN IF NOT EXISTS message_count SimpleAggregateFunction(sum, Int64),
ADD COLUMN IF NOT EXISTS event_count SimpleAggregateFunction(sum, Int64),
-- fly-by addition so that we can track lag in the data the same way as for other tables
ADD COLUMN IF NOT EXISTS _timestamp SimpleAggregateFunction(max, DateTime)
"""

ADD_EVENT_COUNT_DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL = (
lambda: ALTER_SESSION_REPLAY_ADD_EVENT_COUNT_COLUMN.format(
table_name="session_replay_events",
cluster=settings.CLICKHOUSE_CLUSTER,
)
)

ADD_EVENT_COUNT_WRITABLE_SESSION_REPLAY_EVENTS_TABLE_SQL = lambda: ALTER_SESSION_REPLAY_ADD_EVENT_COUNT_COLUMN.format(
table_name="writable_session_replay_events",
cluster=settings.CLICKHOUSE_CLUSTER,
)

ADD_EVENT_COUNT_SESSION_REPLAY_EVENTS_TABLE_SQL = lambda: ALTER_SESSION_REPLAY_ADD_EVENT_COUNT_COLUMN.format(
table_name=SESSION_REPLAY_EVENTS_DATA_TABLE(),
cluster=settings.CLICKHOUSE_CLUSTER,
)
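Because every clause uses ADD COLUMN IF NOT EXISTS and runs ON CLUSTER, the ALTER is idempotent and applied on every node. One way to confirm the columns landed (a debugging sketch, not part of this migration):

    SELECT name, type
    FROM system.columns
    WHERE table = 'sharded_session_replay_events'
      AND name IN ('event_count', 'message_count', '_timestamp')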
20 changes: 17 additions & 3 deletions posthog/models/session_replay_event/sql.py
@@ -27,7 +27,9 @@
console_log_count Int64,
console_warn_count Int64,
console_error_count Int64,
size Int64
size Int64,
event_count Int64,
message_count Int64
) ENGINE = {engine}
"""

@@ -53,7 +55,15 @@
console_warn_count SimpleAggregateFunction(sum, Int64),
console_error_count SimpleAggregateFunction(sum, Int64),
-- this column allows us to estimate the amount of data that is being ingested
size SimpleAggregateFunction(sum, Int64)
size SimpleAggregateFunction(sum, Int64),
-- this allows us to count the number of messages received in a session
-- often very useful in incidents or debugging
message_count SimpleAggregateFunction(sum, Int64),
-- this allows us to count the number of snapshot events received in a session
-- often very useful in incidents or debugging
-- because we batch events we expect message_count to be lower than event_count
event_count SimpleAggregateFunction(sum, Int64),
_timestamp SimpleAggregateFunction(max, DateTime)
) ENGINE = {engine}
"""

@@ -117,7 +127,11 @@
sum(console_log_count) as console_log_count,
sum(console_warn_count) as console_warn_count,
sum(console_error_count) as console_error_count,
sum(size) as size
sum(size) as size,
-- we can count the number of kafka messages instead of sending it explicitly
sum(message_count) as message_count,
sum(event_count) as event_count,
max(_timestamp) as _timestamp
FROM {database}.kafka_session_replay_events
group by session_id, team_id
""".format(
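Since clients batch many snapshot events into each Kafka message, sum(message_count) should stay well below sum(event_count) for any healthy session. A sketch for sanity-checking that assumption over recent data:

    SELECT
        sum(event_count) AS events,
        sum(message_count) AS messages,
        events / messages AS avg_events_per_message  -- expect comfortably > 1
    FROM session_replay_events
    WHERE min_first_timestamp >= now() - INTERVAL 1 DAY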
