diff --git a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts index 94b930625b41d..470905f58ac2d 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts @@ -324,21 +324,29 @@ export const parseKafkaBatch = async ( continue } - const session_key = `${parsedMessage.team_id}:${parsedMessage.session_id}` - const existingMessage = parsedSessions.get(session_key) + const sessionKey = `${parsedMessage.team_id}:${parsedMessage.session_id}` + const existingMessage = parsedSessions.get(sessionKey) + if (existingMessage === undefined) { // First message for this session key, store it and continue looping for more - parsedSessions.set(session_key, parsedMessage) + parsedSessions.set(sessionKey, parsedMessage) continue } + // Shallow clone the eventsByWindowId object to make sure we don't mutate the original + const newEventsByWindowId: IncomingRecordingMessage['eventsByWindowId'] = Object.entries( + existingMessage.eventsByWindowId + ).reduce((acc, [windowId, events]) => { + acc[windowId] = [...events] + return acc + }, {} as IncomingRecordingMessage['eventsByWindowId']) + for (const [windowId, events] of Object.entries(parsedMessage.eventsByWindowId)) { - if (existingMessage.eventsByWindowId[windowId]) { - existingMessage.eventsByWindowId[windowId].push(...events) - } else { - existingMessage.eventsByWindowId[windowId] = events - } + newEventsByWindowId[windowId] = newEventsByWindowId[windowId] || [] + newEventsByWindowId[windowId].push(...events) } + + existingMessage.eventsByWindowId = newEventsByWindowId existingMessage.metadata.rawSize += parsedMessage.metadata.rawSize // Update the events ranges