diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts index 2471afc28b6c7..8e059d5814193 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts @@ -1,5 +1,5 @@ import * as Sentry from '@sentry/node' -import { captureException } from '@sentry/node' +import { captureException, captureMessage } from '@sentry/node' import { mkdirSync, rmSync } from 'node:fs' import { CODES, features, librdkafkaVersion, Message, TopicPartition } from 'node-rdkafka' import { Counter, Gauge, Histogram } from 'prom-client' @@ -9,7 +9,7 @@ import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/ka import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer' import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config' import { runInstrumentedFunction } from '../../../main/utils' -import { PipelineEvent, PluginsServerConfig, RawEventMessage, RedisPool, TeamId } from '../../../types' +import { PipelineEvent, PluginsServerConfig, RawEventMessage, RedisPool, RRWebEvent, TeamId } from '../../../types' import { BackgroundRefresher } from '../../../utils/background-refresher' import { PostgresRouter } from '../../../utils/db/postgres' import { status } from '../../../utils/status' @@ -276,7 +276,10 @@ export class SessionRecordingIngesterV2 { return statusWarn('invalid_json', { error }) } - if (event.event !== '$snapshot_items' || !event.properties?.$snapshot_items?.length) { + const { $snapshot_items, $session_id, $window_id } = event.properties || {} + + // NOTE: This is simple validation - ideally we should do proper schema based validation + if (event.event !== '$snapshot_items' || !$snapshot_items || !$session_id) { status.warn('🙈', 'Received non-snapshot message, ignoring') return } @@ -307,6 +310,34 @@ export class SessionRecordingIngesterV2 { }) } + const invalidEvents: RRWebEvent[] = [] + const events: RRWebEvent[] = $snapshot_items.filter((event: any) => { + if (!event.timestamp) { + invalidEvents.push(event) + return false + } + return true + }) + + if (invalidEvents.length) { + captureMessage('[session-manager]: invalid rrweb events filtered out from message', { + extra: { events: invalidEvents }, + tags: { + team_id: teamId, + session_id: $session_id, + }, + }) + } + + if (!events.length) { + status.warn('🙈', 'Event contained no valid rrweb events, ignoring') + + return statusWarn('invalid_rrweb_events', { + token: messagePayload.token, + teamId: messagePayload.team_id, + }) + } + const recordingMessage: IncomingRecordingMessage = { metadata: { partition: message.partition, @@ -317,9 +348,9 @@ export class SessionRecordingIngesterV2 { team_id: teamId, distinct_id: messagePayload.distinct_id, - session_id: event.properties?.$session_id, - window_id: event.properties?.$window_id, - events: event.properties.$snapshot_items, + session_id: $session_id, + window_id: $window_id, + events: events, } return recordingMessage diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v2.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v2.test.ts index ac943e896a95c..33364eaed9245 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v2.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v2.test.ts @@ -240,6 +240,76 @@ describe('ingester', () => { window_id: '018a47c2-2f4a-70a8-b480-5e52f5480448', }) }) + + it('filters out invalid rrweb events', async () => { + const numeric_id = 12345 + + const createMessage = ($snapshot_items) => { + return { + value: Buffer.from( + JSON.stringify({ + uuid: '018a47df-a0f6-7761-8635-439a0aa873bb', + distinct_id: String(numeric_id), + ip: '127.0.0.1', + site_url: 'http://127.0.0.1:8000', + data: JSON.stringify({ + uuid: '018a47df-a0f6-7761-8635-439a0aa873bb', + event: '$snapshot_items', + properties: { + distinct_id: numeric_id, + $session_id: '018a47c2-2f4a-70a8-b480-5e51d8b8d070', + $window_id: '018a47c2-2f4a-70a8-b480-5e52f5480448', + $snapshot_items: $snapshot_items, + }, + }), + token: 'the_token', + }) + ), + timestamp: 1, + size: 1, + topic: 'the_topic', + offset: 1, + partition: 1, + } satisfies Message + } + + const parsedMessage = await ingester.parseKafkaMessage( + createMessage([ + { + type: 6, + data: {}, + timestamp: null, + }, + ]), + () => Promise.resolve(1) + ) + expect(parsedMessage).toEqual(undefined) + + const parsedMessage2 = await ingester.parseKafkaMessage( + createMessage([ + { + type: 6, + data: {}, + timestamp: null, + }, + { + type: 6, + data: {}, + timestamp: 123, + }, + ]), + () => Promise.resolve(1) + ) + expect(parsedMessage2).toMatchObject({ + events: [ + { + data: {}, + timestamp: 123, + type: 6, + }, + ], + }) + }) }) describe('offset committing', () => {