feat: count replay events in ClickHouse as we ingest them #16994

Merged Sep 14, 2023 (30 commits)

Changes from 10 commits

Commits (30)
ff8b2e9
feat: count replay events in ClickHouse as we ingest them
pauldambra Aug 10, 2023
3e8af12
Add to hogql db schema
pauldambra Aug 10, 2023
bee57bb
Update query snapshots
github-actions[bot] Aug 10, 2023
fe2a25a
Update query snapshots
github-actions[bot] Aug 10, 2023
2064f1b
Update query snapshots
github-actions[bot] Aug 10, 2023
b8af913
don't need it on kafka table
pauldambra Aug 10, 2023
906bbce
Update query snapshots
github-actions[bot] Aug 10, 2023
564ad68
Merge branch 'master' into feat/count-replay-events
pauldambra Sep 11, 2023
7c776b8
update desired columns
pauldambra Sep 11, 2023
66d5b9d
switch to counting events and messages
pauldambra Sep 11, 2023
6d89f3c
Update query snapshots
github-actions[bot] Sep 11, 2023
c6f0844
Merge branch 'master' into feat/count-replay-events
pauldambra Sep 12, 2023
e56a1ee
first pass addition of _timestamp
pauldambra Sep 12, 2023
dee3732
maybe like this
pauldambra Sep 12, 2023
4d7e4c2
like this?
pauldambra Sep 12, 2023
4a2223c
Update query snapshots
github-actions[bot] Sep 12, 2023
c558ba2
Merge branch 'master' into feat/count-replay-events
pauldambra Sep 13, 2023
50c5097
explicit message count
pauldambra Sep 13, 2023
72e5b7b
Update query snapshots
github-actions[bot] Sep 13, 2023
e9deb66
Update query snapshots
github-actions[bot] Sep 13, 2023
4ea881f
Update query snapshots
github-actions[bot] Sep 13, 2023
b8bfffe
Update query snapshots
github-actions[bot] Sep 13, 2023
d946483
Update UI snapshots for `chromium` (2)
github-actions[bot] Sep 13, 2023
3304e71
Update query snapshots
github-actions[bot] Sep 13, 2023
9990557
hogql db schema too
pauldambra Sep 13, 2023
0dc7fdf
Update query snapshots
github-actions[bot] Sep 13, 2023
ec31d81
Update UI snapshots for `chromium` (2)
github-actions[bot] Sep 13, 2023
7faefca
Merge branch 'master' into feat/count-replay-events
pauldambra Sep 13, 2023
129ae94
fix
pauldambra Sep 13, 2023
98ba0af
Merge branch 'master' into feat/count-replay-events
pauldambra Sep 14, 2023
2 changes: 2 additions & 0 deletions plugin-server/src/worker/ingestion/process-event.ts
@@ -281,6 +281,7 @@ export interface SummarizedSessionRecordingEvent {
console_warn_count: number
console_error_count: number
size: number
event_count: number
}

export const createSessionReplayEvent = (
@@ -355,6 +356,7 @@ export const createSessionReplayEvent = (
console_warn_count: consoleWarnCount,
console_error_count: consoleErrorCount,
size: Buffer.byteLength(JSON.stringify(events), 'utf8'),
event_count: events.length,
}

return data
6 changes: 6 additions & 0 deletions plugin-server/tests/main/process-event.test.ts
@@ -1236,6 +1236,7 @@ const sessionReplayEventTestCases: {
| 'console_warn_count'
| 'console_error_count'
| 'size'
| 'event_count'
>
}[] = [
{
@@ -1252,6 +1253,7 @@
console_warn_count: 0,
console_error_count: 0,
size: 73,
event_count: 1,
},
},
{
@@ -1268,6 +1270,7 @@
console_warn_count: 0,
console_error_count: 0,
size: 73,
event_count: 1,
},
},
{
@@ -1324,6 +1327,7 @@
console_warn_count: 3,
console_error_count: 1,
size: 762,
event_count: 7,
},
},
{
@@ -1362,6 +1366,7 @@
console_warn_count: 0,
console_error_count: 0,
size: 213,
event_count: 2,
},
},
{
@@ -1389,6 +1394,7 @@
console_warn_count: 0,
console_error_count: 0,
size: 433,
event_count: 6,
},
},
]
26 changes: 26 additions & 0 deletions (new ClickHouse migration)
@@ -0,0 +1,26 @@
from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
from posthog.models.session_replay_event.migrations_sql import (
DROP_SESSION_REPLAY_EVENTS_TABLE_MV_SQL,
DROP_KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL,
ADD_EVENT_COUNT_WRITABLE_SESSION_REPLAY_EVENTS_TABLE_SQL,
ADD_EVENT_COUNT_DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL,
ADD_EVENT_COUNT_SESSION_REPLAY_EVENTS_TABLE_SQL,
)
from posthog.models.session_replay_event.sql import (
SESSION_REPLAY_EVENTS_TABLE_MV_SQL,
KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL,
)

operations = [
# we have to drop the materialized view first so that we're no longer pulling from kafka
# then we drop the kafka table
run_sql_with_exceptions(DROP_SESSION_REPLAY_EVENTS_TABLE_MV_SQL()),
run_sql_with_exceptions(DROP_KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL()),
# now we can alter the target tables
run_sql_with_exceptions(ADD_EVENT_COUNT_WRITABLE_SESSION_REPLAY_EVENTS_TABLE_SQL()),
run_sql_with_exceptions(ADD_EVENT_COUNT_DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL()),
run_sql_with_exceptions(ADD_EVENT_COUNT_SESSION_REPLAY_EVENTS_TABLE_SQL()),
# and then recreate the materialized views and kafka tables
run_sql_with_exceptions(KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL()),
run_sql_with_exceptions(SESSION_REPLAY_EVENTS_TABLE_MV_SQL()),
]
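A minimal sketch of a by-hand check that the migration landed, assuming the sharded data table keeps the name sharded_session_replay_events used in the snapshots below; the query itself is not part of this PR:

```sql
-- Hypothetical post-migration check, not part of this PR: confirm the two
-- new counter columns now exist on the sharded data table.
SELECT name, type
FROM system.columns
WHERE database = currentDatabase()
  AND table = 'sharded_session_replay_events'
  AND name IN ('message_count', 'event_count')
ORDER BY name
```

Both rows should come back typed as SimpleAggregateFunction(sum, Int64) once the ALTERs have run on every shard.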
38 changes: 32 additions & 6 deletions posthog/clickhouse/test/__snapshots__/test_schema.ambr
@@ -336,7 +336,8 @@
console_log_count Int64,
console_warn_count Int64,
console_error_count Int64,
size Int64
size Int64,
event_count Int64
) ENGINE = Kafka('test.kafka.broker:9092', 'clickhouse_session_replay_events_test', 'group1', 'JSONEachRow')

'
@@ -922,7 +923,8 @@
console_log_count Int64,
console_warn_count Int64,
console_error_count Int64,
size Int64
size Int64,
event_count Int64
) ENGINE = Kafka('kafka:9092', 'clickhouse_session_replay_events_test', 'group1', 'JSONEachRow')

'
@@ -1344,7 +1346,14 @@
console_warn_count SimpleAggregateFunction(sum, Int64),
console_error_count SimpleAggregateFunction(sum, Int64),
-- this column allows us to estimate the amount of data that is being ingested
size SimpleAggregateFunction(sum, Int64)
size SimpleAggregateFunction(sum, Int64),
-- this allows us to count the number of messages received in a session
-- often very useful in incidents or debugging
message_count SimpleAggregateFunction(sum, Int64),
-- this allows us to count the number of snapshot events received in a session
-- often very useful in incidents or debugging
-- because we batch events we expect message_count to be lower than event_count
event_count SimpleAggregateFunction(sum, Int64)
) ENGINE = Distributed('posthog', 'posthog_test', 'sharded_session_replay_events', sipHash64(distinct_id))

'
@@ -1377,7 +1386,10 @@
sum(console_log_count) as console_log_count,
sum(console_warn_count) as console_warn_count,
sum(console_error_count) as console_error_count,
sum(size) as size
sum(size) as size,
-- we can count the number of kafka messages instead of sending it explicitly
count(*) as message_count,
Review comment from pauldambra (Member, Author):
this is clever... that's not always a good thing 🤣
Is it better to explicitly add message_count: 1 in the plugin server to aid the future traveller?

Reply from a Member:
I think it might be a bit more clear with message_count: 1

sum(event_count) as event_count
FROM posthog_test.kafka_session_replay_events
group by session_id, team_id

@@ -1608,7 +1620,14 @@
console_warn_count SimpleAggregateFunction(sum, Int64),
console_error_count SimpleAggregateFunction(sum, Int64),
-- this column allows us to estimate the amount of data that is being ingested
size SimpleAggregateFunction(sum, Int64)
size SimpleAggregateFunction(sum, Int64),
-- this allows us to count the number of messages received in a session
-- often very useful in incidents or debugging
message_count SimpleAggregateFunction(sum, Int64),
-- this allows us to count the number of snapshot events received in a session
-- often very useful in incidents or debugging
-- because we batch events we expect message_count to be lower than event_count
event_count SimpleAggregateFunction(sum, Int64)
) ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_{shard}/posthog.session_replay_events', '{replica}')

PARTITION BY toYYYYMM(min_first_timestamp)
@@ -2226,7 +2245,14 @@
console_warn_count SimpleAggregateFunction(sum, Int64),
console_error_count SimpleAggregateFunction(sum, Int64),
-- this column allows us to estimate the amount of data that is being ingested
size SimpleAggregateFunction(sum, Int64)
size SimpleAggregateFunction(sum, Int64),
-- this allows us to count the number of messages received in a session
-- often very useful in incidents or debugging
message_count SimpleAggregateFunction(sum, Int64),
-- this allows us to count the number of snapshot events received in a session
-- often very useful in incidents or debugging
-- because we batch events we expect message_count to be lower than event_count
event_count SimpleAggregateFunction(sum, Int64)
) ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_{shard}/posthog.session_replay_events', '{replica}')

PARTITION BY toYYYYMM(min_first_timestamp)
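The comments in the schema above spell out the intent: message_count tracks how many Kafka messages a session produced, event_count tracks how many snapshot events it contained, and because events are batched into messages the former should stay below the latter. A minimal sketch of the kind of debugging query this enables, assuming the distributed session_replay_events table from the snapshot; the session id is a placeholder and the query is not part of this PR:

```sql
-- Hypothetical incident/debugging query, not part of this PR: totals for one
-- session plus the average number of snapshot events per Kafka message.
SELECT
    session_id,
    sum(message_count) AS messages,
    sum(event_count)   AS events,
    sum(event_count) / nullIf(sum(message_count), 0) AS events_per_message
FROM session_replay_events
WHERE session_id = 'some-session-id' -- placeholder
GROUP BY session_id
```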
2 changes: 2 additions & 0 deletions posthog/hogql/database/schema/session_replay_events.py
@@ -31,6 +31,7 @@
"console_warn_count": IntegerDatabaseField(name="console_warn_count"),
"console_error_count": IntegerDatabaseField(name="console_error_count"),
"size": IntegerDatabaseField(name="size"),
"event_count": IntegerDatabaseField(name="event_count"),
"pdi": LazyJoin(
from_field="distinct_id",
join_table=PersonDistinctIdsTable(),
@@ -77,6 +78,7 @@ def select_from_session_replay_events_table(requested_fields: Dict[str, List[str
"console_error_count": ast.Call(name="sum", args=[ast.Field(chain=[table_name, "console_error_count"])]),
"distinct_id": ast.Call(name="any", args=[ast.Field(chain=[table_name, "distinct_id"])]),
"size": ast.Call(name="sum", args=[ast.Field(chain=[table_name, "size"])]),
"event_count": ast.Call(name="sum", args=[ast.Field(chain=[table_name, "event_count"])]),
}

select_fields: List[ast.Expr] = []
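For the HogQL layer, a sketch of how the new field might surface in a query, assuming session_replay_events is exposed as a HogQL table as the schema above suggests; the exact query surface is an assumption, not something stated in this PR:

```sql
-- Hypothetical HogQL query (assumed syntax): the schema above maps
-- event_count onto sum(event_count) across the aggregated rows, so a plain
-- field reference should return the per-session total.
SELECT session_id, event_count
FROM session_replay_events
ORDER BY event_count DESC
LIMIT 10
```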
16 changes: 16 additions & 0 deletions posthog/hogql/database/test/__snapshots__/test_database.ambr
@@ -276,6 +276,10 @@
"key": "size",
"type": "integer"
},
{
"key": "event_count",
"type": "integer"
},
{
"key": "pdi",
"type": "lazy_table",
@@ -405,6 +409,10 @@
"key": "size",
"type": "integer"
},
{
"key": "event_count",
"type": "integer"
},
{
"key": "pdi",
"type": "lazy_table",
@@ -849,6 +857,10 @@
"key": "size",
"type": "integer"
},
{
"key": "event_count",
"type": "integer"
},
{
"key": "pdi",
"type": "lazy_table",
@@ -978,6 +990,10 @@
"key": "size",
"type": "integer"
},
{
"key": "event_count",
"type": "integer"
},
{
"key": "pdi",
"type": "lazy_table",
24 changes: 24 additions & 0 deletions posthog/models/session_replay_event/migrations_sql.py
@@ -65,3 +65,27 @@
table_name=SESSION_REPLAY_EVENTS_DATA_TABLE(),
cluster=settings.CLICKHOUSE_CLUSTER,
)

# migration to add the message_count and event_count columns to the session replay tables
ALTER_SESSION_REPLAY_ADD_EVENT_COUNT_COLUMN = """
ALTER TABLE {table_name} on CLUSTER '{cluster}'
ADD COLUMN IF NOT EXISTS message_count SimpleAggregateFunction(sum, Int64),
ADD COLUMN IF NOT EXISTS event_count SimpleAggregateFunction(sum, Int64)
"""

ADD_EVENT_COUNT_DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL = (
lambda: ALTER_SESSION_REPLAY_ADD_EVENT_COUNT_COLUMN.format(
table_name="session_replay_events",
cluster=settings.CLICKHOUSE_CLUSTER,
)
)

ADD_EVENT_COUNT_WRITABLE_SESSION_REPLAY_EVENTS_TABLE_SQL = lambda: ALTER_SESSION_REPLAY_ADD_EVENT_COUNT_COLUMN.format(
table_name="writable_session_replay_events",
cluster=settings.CLICKHOUSE_CLUSTER,
)

ADD_EVENT_COUNT_SESSION_REPLAY_EVENTS_TABLE_SQL = lambda: ALTER_SESSION_REPLAY_ADD_EVENT_COUNT_COLUMN.format(
table_name=SESSION_REPLAY_EVENTS_DATA_TABLE(),
cluster=settings.CLICKHOUSE_CLUSTER,
)
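For reference, a sketch of what one of these lambdas renders to, assuming CLICKHOUSE_CLUSTER resolves to 'posthog' (the cluster name used in the snapshots) and the default sharded table name; in practice both values come from settings:

```sql
-- Hypothetical rendering of ADD_EVENT_COUNT_SESSION_REPLAY_EVENTS_TABLE_SQL(),
-- assuming cluster 'posthog' and table 'sharded_session_replay_events'.
ALTER TABLE sharded_session_replay_events on CLUSTER 'posthog'
    ADD COLUMN IF NOT EXISTS message_count SimpleAggregateFunction(sum, Int64),
    ADD COLUMN IF NOT EXISTS event_count SimpleAggregateFunction(sum, Int64)
```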
17 changes: 14 additions & 3 deletions posthog/models/session_replay_event/sql.py
@@ -27,7 +27,8 @@
console_log_count Int64,
console_warn_count Int64,
console_error_count Int64,
size Int64
size Int64,
event_count Int64
) ENGINE = {engine}
"""

@@ -53,7 +54,14 @@
console_warn_count SimpleAggregateFunction(sum, Int64),
console_error_count SimpleAggregateFunction(sum, Int64),
-- this column allows us to estimate the amount of data that is being ingested
size SimpleAggregateFunction(sum, Int64)
size SimpleAggregateFunction(sum, Int64),
-- this allows us to count the number of messages received in a session
-- often very useful in incidents or debugging
message_count SimpleAggregateFunction(sum, Int64),
-- this allows us to count the number of snapshot events received in a session
-- often very useful in incidents or debugging
-- because we batch events we expect message_count to be lower than event_count
event_count SimpleAggregateFunction(sum, Int64)
) ENGINE = {engine}
"""

@@ -117,7 +125,10 @@
sum(console_log_count) as console_log_count,
sum(console_warn_count) as console_warn_count,
sum(console_error_count) as console_error_count,
sum(size) as size
sum(size) as size,
-- we can count the number of kafka messages instead of sending it explicitly
count(*) as message_count,
sum(event_count) as event_count
FROM {database}.kafka_session_replay_events
group by session_id, team_id
""".format(