Skip to content

Commit

Permalink
fix: Additional parsing of incoming messages (#17937)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite authored Oct 12, 2023
1 parent 5da50e6 commit 00c18b6
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as Sentry from '@sentry/node'
import { captureException } from '@sentry/node'
import { captureException, captureMessage } from '@sentry/node'
import { mkdirSync, rmSync } from 'node:fs'
import { CODES, features, librdkafkaVersion, Message, TopicPartition } from 'node-rdkafka'
import { Counter, Gauge, Histogram } from 'prom-client'
Expand All @@ -9,7 +9,7 @@ import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/ka
import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer'
import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config'
import { runInstrumentedFunction } from '../../../main/utils'
import { PipelineEvent, PluginsServerConfig, RawEventMessage, RedisPool, TeamId } from '../../../types'
import { PipelineEvent, PluginsServerConfig, RawEventMessage, RedisPool, RRWebEvent, TeamId } from '../../../types'
import { BackgroundRefresher } from '../../../utils/background-refresher'
import { PostgresRouter } from '../../../utils/db/postgres'
import { status } from '../../../utils/status'
Expand Down Expand Up @@ -276,7 +276,10 @@ export class SessionRecordingIngesterV2 {
return statusWarn('invalid_json', { error })
}

if (event.event !== '$snapshot_items' || !event.properties?.$snapshot_items?.length) {
const { $snapshot_items, $session_id, $window_id } = event.properties || {}

// NOTE: This is simple validation - ideally we should do proper schema based validation
if (event.event !== '$snapshot_items' || !$snapshot_items || !$session_id) {
status.warn('🙈', 'Received non-snapshot message, ignoring')
return
}
Expand Down Expand Up @@ -307,6 +310,34 @@ export class SessionRecordingIngesterV2 {
})
}

const invalidEvents: RRWebEvent[] = []
const events: RRWebEvent[] = $snapshot_items.filter((event: any) => {
if (!event.timestamp) {
invalidEvents.push(event)
return false
}
return true
})

if (invalidEvents.length) {
captureMessage('[session-manager]: invalid rrweb events filtered out from message', {
extra: { events: invalidEvents },
tags: {
team_id: teamId,
session_id: $session_id,
},
})
}

if (!events.length) {
status.warn('🙈', 'Event contained no valid rrweb events, ignoring')

return statusWarn('invalid_rrweb_events', {
token: messagePayload.token,
teamId: messagePayload.team_id,
})
}

const recordingMessage: IncomingRecordingMessage = {
metadata: {
partition: message.partition,
Expand All @@ -317,9 +348,9 @@ export class SessionRecordingIngesterV2 {

team_id: teamId,
distinct_id: messagePayload.distinct_id,
session_id: event.properties?.$session_id,
window_id: event.properties?.$window_id,
events: event.properties.$snapshot_items,
session_id: $session_id,
window_id: $window_id,
events: events,
}

return recordingMessage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,76 @@ describe('ingester', () => {
window_id: '018a47c2-2f4a-70a8-b480-5e52f5480448',
})
})

it('filters out invalid rrweb events', async () => {
const numeric_id = 12345

const createMessage = ($snapshot_items) => {
return {
value: Buffer.from(
JSON.stringify({
uuid: '018a47df-a0f6-7761-8635-439a0aa873bb',
distinct_id: String(numeric_id),
ip: '127.0.0.1',
site_url: 'http://127.0.0.1:8000',
data: JSON.stringify({
uuid: '018a47df-a0f6-7761-8635-439a0aa873bb',
event: '$snapshot_items',
properties: {
distinct_id: numeric_id,
$session_id: '018a47c2-2f4a-70a8-b480-5e51d8b8d070',
$window_id: '018a47c2-2f4a-70a8-b480-5e52f5480448',
$snapshot_items: $snapshot_items,
},
}),
token: 'the_token',
})
),
timestamp: 1,
size: 1,
topic: 'the_topic',
offset: 1,
partition: 1,
} satisfies Message
}

const parsedMessage = await ingester.parseKafkaMessage(
createMessage([
{
type: 6,
data: {},
timestamp: null,
},
]),
() => Promise.resolve(1)
)
expect(parsedMessage).toEqual(undefined)

const parsedMessage2 = await ingester.parseKafkaMessage(
createMessage([
{
type: 6,
data: {},
timestamp: null,
},
{
type: 6,
data: {},
timestamp: 123,
},
]),
() => Promise.resolve(1)
)
expect(parsedMessage2).toMatchObject({
events: [
{
data: {},
timestamp: 123,
type: 6,
},
],
})
})
})

describe('offset committing', () => {
Expand Down

0 comments on commit 00c18b6

Please sign in to comment.