Skip to content

Commit

Permalink
fix: Calls stack loop for parsing session replay message (#24801)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite authored Sep 5, 2024
1 parent 71df845 commit 5b47368
Showing 1 changed file with 16 additions and 8 deletions.
24 changes: 16 additions & 8 deletions plugin-server/src/main/ingestion-queues/session-recording/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -324,21 +324,29 @@ export const parseKafkaBatch = async (
continue
}

const session_key = `${parsedMessage.team_id}:${parsedMessage.session_id}`
const existingMessage = parsedSessions.get(session_key)
const sessionKey = `${parsedMessage.team_id}:${parsedMessage.session_id}`
const existingMessage = parsedSessions.get(sessionKey)

if (existingMessage === undefined) {
// First message for this session key, store it and continue looping for more
parsedSessions.set(session_key, parsedMessage)
parsedSessions.set(sessionKey, parsedMessage)
continue
}

// Shallow clone the eventsByWindowId object to make sure we don't mutate the original
const newEventsByWindowId: IncomingRecordingMessage['eventsByWindowId'] = Object.entries(
existingMessage.eventsByWindowId
).reduce((acc, [windowId, events]) => {
acc[windowId] = [...events]
return acc
}, {} as IncomingRecordingMessage['eventsByWindowId'])

for (const [windowId, events] of Object.entries(parsedMessage.eventsByWindowId)) {
if (existingMessage.eventsByWindowId[windowId]) {
existingMessage.eventsByWindowId[windowId].push(...events)
} else {
existingMessage.eventsByWindowId[windowId] = events
}
newEventsByWindowId[windowId] = newEventsByWindowId[windowId] || []
newEventsByWindowId[windowId].push(...events)
}

existingMessage.eventsByWindowId = newEventsByWindowId
existingMessage.metadata.rawSize += parsedMessage.metadata.rawSize

// Update the events ranges
Expand Down

0 comments on commit 5b47368

Please sign in to comment.