diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/partition-locker.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/partition-locker.ts
index b97bcc1e90c67..cdc56f5efabdf 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/services/partition-locker.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/services/partition-locker.ts
@@ -131,6 +131,7 @@ export class PartitionLocker {
                     keys,
                 },
             })
+            throw error
         }
     }
 }
diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/realtime-manager.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/realtime-manager.ts
index 7571ed0835f53..e2c4d50d79bf9 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/services/realtime-manager.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/services/realtime-manager.ts
@@ -66,6 +66,7 @@ export class RealtimeManager extends EventEmitter {
         )
 
         this.pubsubRedis?.disconnect()
+        this.pubsubRedis = undefined
     }
 
     private async run<T>(description: string, fn: (client: Redis) => Promise<T>): Promise<T | undefined> {
diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts
index fd8115a30988f..df9437de004b5 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts
@@ -13,6 +13,7 @@ import { PipelineEvent, PluginsServerConfig, RawEventMessage, RedisPool, TeamId
 import { BackgroundRefresher } from '../../../utils/background-refresher'
 import { PostgresRouter } from '../../../utils/db/postgres'
 import { status } from '../../../utils/status'
+import { createRedisPool } from '../../../utils/utils'
 import { fetchTeamTokensWithRecordings } from '../../../worker/ingestion/team-manager'
 import { ObjectStorage } from '../../services/object_storage'
 import { addSentryBreadcrumbsEventListeners } from '../kafka-metrics'
@@ -94,6 +95,7 @@ type PartitionMetrics = {
 }
 
 export class SessionRecordingIngesterV2 {
+    redisPool: RedisPool
     sessions: Record<string, SessionManager> = {}
     offsetHighWaterMarker: OffsetHighWaterMarker
     realtimeManager: RealtimeManager
@@ -112,10 +114,11 @@ export class SessionRecordingIngesterV2 {
     constructor(
         private serverConfig: PluginsServerConfig,
         private postgres: PostgresRouter,
-        private objectStorage: ObjectStorage,
-        private redisPool: RedisPool
+        private objectStorage: ObjectStorage
     ) {
         this.recordingConsumerConfig = sessionRecordingConsumerConfig(this.serverConfig)
+        this.redisPool = createRedisPool(this.serverConfig)
+
         this.realtimeManager = new RealtimeManager(this.redisPool, this.recordingConsumerConfig)
         this.partitionLocker = new PartitionLocker(
             this.redisPool,
@@ -509,24 +512,30 @@ export class SessionRecordingIngesterV2 {
         })
     }
 
-    public async stop(): Promise<void> {
+    public async stop(): Promise<PromiseSettledResult<any>[]> {
         status.info('🔁', 'blob_ingester_consumer - stopping')
 
         if (this.partitionLockInterval) {
             clearInterval(this.partitionLockInterval)
         }
-
         // Mark as stopping so that we don't actually process any more incoming messages, but still keep the process alive
         await this.batchConsumer?.stop()
 
         // Simulate a revoke command to try and flush all sessions
         // There is a race between the revoke callback and this function - Either way one of them gets there and covers the revocations
         void this.scheduleWork(this.onRevokePartitions(this.assignedTopicPartitions))
+        void this.scheduleWork(this.realtimeManager.unsubscribe())
+        void this.scheduleWork(this.replayEventsIngester.stop())
+
+        const promiseResults = await Promise.allSettled(this.promises)
+
+        // Finally we clear up redis once we are sure everything else has been handled
+        await this.redisPool.drain()
+        await this.redisPool.clear()
-        await this.realtimeManager.unsubscribe()
-        await this.replayEventsIngester.stop()
-        await Promise.allSettled(this.promises)
 
         status.info('👍', 'blob_ingester_consumer - stopped!')
+
+        return promiseResults
     }
 
     public isHealthy() {
diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts
index eef7fdaa8b6de..30ef80768f985 100644
--- a/plugin-server/src/main/pluginsServer.ts
+++ b/plugin-server/src/main/pluginsServer.ts
@@ -17,7 +17,7 @@ import { captureEventLoopMetrics } from '../utils/metrics'
 import { cancelAllScheduledJobs } from '../utils/node-schedule'
 import { PubSub } from '../utils/pubsub'
 import { status } from '../utils/status'
-import { createRedisPool, delay } from '../utils/utils'
+import { delay } from '../utils/utils'
 import { OrganizationManager } from '../worker/ingestion/organization-manager'
 import { TeamManager } from '../worker/ingestion/team-manager'
 import Piscina, { makePiscina as defaultMakePiscina } from '../worker/piscina'
@@ -420,27 +420,18 @@ export async function startPluginsServer(
         const statsd = hub?.statsd ?? createStatsdClient(serverConfig, null)
         const postgres = hub?.postgres ?? new PostgresRouter(serverConfig, statsd)
         const s3 = hub?.objectStorage ?? getObjectStorage(recordingConsumerConfig)
-        const redisPool = hub?.db.redisPool ?? createRedisPool(recordingConsumerConfig)
 
         if (!s3) {
            throw new Error("Can't start session recording blob ingestion without object storage")
         }
 
         // NOTE: We intentionally pass in the original serverConfig as the ingester uses both kafkas
-        const ingester = new SessionRecordingIngesterV2(serverConfig, postgres, s3, redisPool)
+        const ingester = new SessionRecordingIngesterV2(serverConfig, postgres, s3)
         await ingester.start()
 
         const batchConsumer = ingester.batchConsumer
         if (batchConsumer) {
-            stopSessionRecordingBlobConsumer = async () => {
-                // Tricky - in some cases the hub is responsible, in which case it will drain and clear. Otherwise we are responsible.
-                if (!hub?.db.redisPool) {
-                    await redisPool.drain()
-                    await redisPool.clear()
-                }
-
-                await ingester.stop()
-            }
+            stopSessionRecordingBlobConsumer = () => ingester.stop()
             joinSessionRecordingBlobConsumer = () => batchConsumer.join()
             healthChecks['session-recordings-blob'] = () => ingester.isHealthy() ?? false
         }
diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v2.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v2.test.ts
index e983f90d3eb23..53cc8f019d861 100644
--- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v2.test.ts
+++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v2.test.ts
@@ -72,6 +72,9 @@ describe('ingester', () => {
         const team = await getFirstTeam(hub)
         teamToken = team.api_token
         await deleteKeysWithPrefix(hub)
+
+        ingester = new SessionRecordingIngesterV2(config, hub.postgres, hub.objectStorage)
+        await ingester.start()
     })
 
     afterEach(async () => {
@@ -86,12 +89,6 @@ describe('ingester', () => {
         jest.useRealTimers()
     })
 
-    // these tests assume that a flush won't run while they run
-    beforeEach(async () => {
-        ingester = new SessionRecordingIngesterV2(config, hub.postgres, hub.objectStorage, hub.redisPool)
-        await ingester.start()
-    })
-
     it('creates a new session manager if needed', async () => {
         const event = createIncomingRecordingMessage()
         await ingester.consume(event)
@@ -339,7 +336,7 @@ describe('ingester', () => {
         jest.setTimeout(5000) // Increased to cover lock delay
 
         beforeEach(async () => {
-            otherIngester = new SessionRecordingIngesterV2(config, hub.postgres, hub.objectStorage, hub.redisPool)
+            otherIngester = new SessionRecordingIngesterV2(config, hub.postgres, hub.objectStorage)
             await otherIngester.start()
         })
 
@@ -443,4 +440,62 @@ describe('ingester', () => {
             ).toEqual(['2:session_id_4:1'])
         })
     })
+
+    describe('stop()', () => {
+        const setup = async (): Promise<void> => {
+            const partitionMsgs1 = [
+                createKafkaMessage(
+                    teamToken,
+                    {
+                        partition: 1,
+                        offset: 1,
+                    },
+                    {
+                        $session_id: 'session_id_1',
+                    }
+                ),
+
+                createKafkaMessage(
+                    teamToken,
+                    {
+                        partition: 1,
+                        offset: 2,
+                    },
+                    {
+                        $session_id: 'session_id_2',
+                    }
+                ),
+            ]
+
+            await ingester.onAssignPartitions([createTP(1)])
+            await ingester.handleEachBatch(partitionMsgs1)
+        }
+
+        // NOTE: This test is a sanity check for the follow up test. It demonstrates what happens if we shutdown in the wrong order
+        // It doesn't reliably work though as the onRevoke is called via the kafka lib ending up with dangling promises so rather it is here as a reminder
+        // demonstation for when we need it
+        it.skip('shuts down with error if redis forcefully shutdown', async () => {
+            await setup()
+
+            await ingester.redisPool.drain()
+            await ingester.redisPool.clear()
+
+            // revoke, realtime unsub, replay stop
+            await expect(ingester.stop()).resolves.toMatchObject([
+                { status: 'rejected' },
+                { status: 'fulfilled' },
+                { status: 'fulfilled' },
+            ])
+        })
+        it('shuts down without error', async () => {
+            await setup()
+
+            // revoke, realtime unsub, replay stop
+            await expect(ingester.stop()).resolves.toMatchObject([
+                { status: 'fulfilled' },
+                { status: 'fulfilled' },
+                { status: 'fulfilled' },
+            ])
+        })
+    })
 })
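
Illustrative sketch, not part of the patch above: the core of the change is that SessionRecordingIngesterV2 now owns its redis pool end to end. It calls createRedisPool() in its constructor, and stop() schedules the redis-dependent teardown, awaits Promise.allSettled over the tracked promises, and only then drains and clears the pool, returning the settled results so callers (and the new tests) can assert on them. The standalone TypeScript below mirrors that ordering under hypothetical names (TeardownOwner, flushSomething, redisUrl); the pool calls (acquire/release/drain/clear) follow the generic-pool API implied by the drain()/clear() calls visible in the diff, and the client is ioredis.

// Hypothetical sketch of the shutdown ordering introduced above; not PostHog code.
import { createPool, Pool } from 'generic-pool'
import Redis from 'ioredis'

class TeardownOwner {
    private redisPool: Pool<Redis>
    private promises: Set<Promise<any>> = new Set()

    constructor(redisUrl: string) {
        // The owner builds its own pool, mirroring the ingester now calling
        // createRedisPool(serverConfig) instead of receiving a shared pool.
        this.redisPool = createPool<Redis>(
            {
                create: async () => new Redis(redisUrl),
                destroy: async (client) => {
                    await client.quit()
                },
            },
            { min: 0, max: 2 }
        )
    }

    // Track background work so stop() can wait on it, like the ingester's scheduleWork().
    private scheduleWork<T>(promise: Promise<T>): Promise<T> {
        this.promises.add(promise)
        // Drop the entry when done; errors are not swallowed here, stop() inspects them via allSettled.
        void promise.finally(() => this.promises.delete(promise)).catch(() => {})
        return promise
    }

    // Example of work that still needs a live redis connection during shutdown.
    private async flushSomething(): Promise<void> {
        const client = await this.redisPool.acquire()
        try {
            await client.set('example:flush-marker', '1')
        } finally {
            await this.redisPool.release(client)
        }
    }

    public async stop(): Promise<PromiseSettledResult<any>[]> {
        // 1. Kick off all redis-dependent teardown without awaiting each call.
        void this.scheduleWork(this.flushSomething())

        // 2. Wait for everything, collecting per-task results instead of throwing.
        const results = await Promise.allSettled(this.promises)

        // 3. Only once nothing can touch redis any more, drain and clear the pool.
        await this.redisPool.drain()
        await this.redisPool.clear()

        return results
    }
}

Draining only after allSettled is what removes the old pluginsServer workaround (the deleted "Tricky" branch): with a single owner of the pool, shutdown no longer depends on whether the hub or the recording consumer created it, and the skipped test documents what goes wrong when the pool is torn down first.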