fix: Wait for ingester shutdown before redis shutdown (#17594)
benjackwhite authored Sep 25, 2023
1 parent d9388ad commit cbe3a95
Showing 5 changed files with 83 additions and 26 deletions.
@@ -131,6 +131,7 @@ export class PartitionLocker {
keys,
},
})
throw error
}
}
}
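The only functional change here is the added `throw error`: the lock failure was already being logged with the affected keys, but it appears it was then swallowed, so callers would continue as if the partitions were locked. A minimal sketch of the log-then-rethrow pattern, with illustrative names rather than the actual PartitionLocker internals:

```typescript
// Illustrative sketch (hypothetical names) of the pattern introduced above: log the
// failure with enough context to debug it, then re-throw so the caller can react
// instead of continuing as if the partitions were locked.
async function withPartitionLockErrorHandling<T>(keys: string[], fn: () => Promise<T>): Promise<T> {
    try {
        return await fn()
    } catch (error) {
        console.error('partition_locker - operation failed', { error, keys })
        throw error // new behaviour: surface the failure to the caller
    }
}
```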
@@ -66,6 +66,7 @@ export class RealtimeManager extends EventEmitter {
)

this.pubsubRedis?.disconnect()
this.pubsubRedis = undefined
}

private async run<T>(description: string, fn: (client: Redis) => Promise<T>): Promise<T | null> {
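Besides disconnecting the pub/sub client, the reference is now cleared. An illustrative sketch follows (assuming ioredis, which the surrounding code already uses; the class and method names are hypothetical, not the real RealtimeManager): once the reference is undefined, later callers either skip pub/sub work or lazily create a fresh client instead of touching one that has already been disconnected.

```typescript
// Illustrative sketch, not the real RealtimeManager: clearing the reference after
// disconnect() means any later subscribe() creates a fresh client rather than reusing
// a connection that has already been torn down.
import Redis from 'ioredis'

class PubSubHolder {
    private pubsubRedis?: Redis

    public subscribe(channel: string, onMessage: (msg: string) => void): void {
        // Lazily (re)create the client if it was never created or was cleared by unsubscribe()
        this.pubsubRedis = this.pubsubRedis ?? new Redis()
        void this.pubsubRedis.subscribe(channel)
        this.pubsubRedis.on('message', (_channel, message) => onMessage(message))
    }

    public unsubscribe(): void {
        this.pubsubRedis?.disconnect()
        this.pubsubRedis = undefined // mirrors the change above
    }
}
```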
@@ -13,6 +13,7 @@ import { PipelineEvent, PluginsServerConfig, RawEventMessage, RedisPool, TeamId
import { BackgroundRefresher } from '../../../utils/background-refresher'
import { PostgresRouter } from '../../../utils/db/postgres'
import { status } from '../../../utils/status'
import { createRedisPool } from '../../../utils/utils'
import { fetchTeamTokensWithRecordings } from '../../../worker/ingestion/team-manager'
import { ObjectStorage } from '../../services/object_storage'
import { addSentryBreadcrumbsEventListeners } from '../kafka-metrics'
@@ -94,6 +95,7 @@ type PartitionMetrics = {
}

export class SessionRecordingIngesterV2 {
redisPool: RedisPool
sessions: Record<string, SessionManager> = {}
offsetHighWaterMarker: OffsetHighWaterMarker
realtimeManager: RealtimeManager
@@ -112,10 +114,11 @@ export class SessionRecordingIngesterV2 {
constructor(
private serverConfig: PluginsServerConfig,
private postgres: PostgresRouter,
private objectStorage: ObjectStorage,
private redisPool: RedisPool
private objectStorage: ObjectStorage
) {
this.recordingConsumerConfig = sessionRecordingConsumerConfig(this.serverConfig)
this.redisPool = createRedisPool(this.serverConfig)

this.realtimeManager = new RealtimeManager(this.redisPool, this.recordingConsumerConfig)
this.partitionLocker = new PartitionLocker(
this.redisPool,
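The pool is no longer injected through the constructor: the ingester now builds its own Redis pool from the server config via `createRedisPool`, and with ownership comes responsibility for tearing it down (see the `stop()` changes below). A rough sketch of what such a helper can look like - assuming `generic-pool` over `ioredis` for illustration, not the actual implementation in `utils/utils.ts`:

```typescript
// Assumed shape of a createRedisPool-style helper (illustrative only): a generic-pool
// of ioredis clients that can later be shut down with drain() followed by clear().
import { createPool, Pool } from 'generic-pool'
import Redis from 'ioredis'

function createRedisPoolSketch(redisUrl: string): Pool<Redis> {
    return createPool<Redis>(
        {
            create: async () => new Redis(redisUrl),
            destroy: async (client) => {
                await client.quit()
            },
        },
        { min: 1, max: 3 }
    )
}
```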
@@ -509,24 +512,30 @@ export class SessionRecordingIngesterV2 {
})
}

public async stop(): Promise<void> {
public async stop(): Promise<PromiseSettledResult<any>[]> {
status.info('🔁', 'blob_ingester_consumer - stopping')

if (this.partitionLockInterval) {
clearInterval(this.partitionLockInterval)
}

// Mark as stopping so that we don't actually process any more incoming messages, but still keep the process alive
await this.batchConsumer?.stop()

// Simulate a revoke command to try and flush all sessions
// There is a race between the revoke callback and this function - either way, one of them runs first and handles the revocations
void this.scheduleWork(this.onRevokePartitions(this.assignedTopicPartitions))
void this.scheduleWork(this.realtimeManager.unsubscribe())
void this.scheduleWork(this.replayEventsIngester.stop())

const promiseResults = await Promise.allSettled(this.promises)

// Finally, we clean up Redis once we are sure everything else has been handled
await this.redisPool.drain()
await this.redisPool.clear()

await this.realtimeManager.unsubscribe()
await this.replayEventsIngester.stop()
await Promise.allSettled(this.promises)
status.info('👍', 'blob_ingester_consumer - stopped!')

return promiseResults
}

public isHealthy() {
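This is the heart of the fix. `stop()` now awaits all scheduled work - the simulated partition revoke, the realtime unsubscribe and the replay ingester stop, each registered via `scheduleWork`, which presumably tracks the promise in `this.promises` - before draining and clearing the Redis pool, and it returns the settled results so callers and tests can inspect failures. A simplified sketch of that ordering, with the Kafka consumer details omitted:

```typescript
// Simplified sketch of the new shutdown ordering (Kafka consumer handling omitted).
// The key point: settle all in-flight work first, and only then tear down the Redis
// pool that the work may still depend on.
interface DrainablePool {
    drain(): Promise<void>
    clear(): Promise<void>
}

async function stopSketch(
    scheduledWork: Promise<unknown>[],
    redisPool: DrainablePool
): Promise<PromiseSettledResult<unknown>[]> {
    // 1. Wait for every scheduled promise to settle, even if some of them reject.
    const results = await Promise.allSettled(scheduledWork)

    // 2. Redis is torn down last, once nothing else can need a client from the pool.
    await redisPool.drain()
    await redisPool.clear()

    // 3. Returning the settled results lets tests assert on which pieces of work failed.
    return results
}
```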
15 changes: 3 additions & 12 deletions plugin-server/src/main/pluginsServer.ts
@@ -17,7 +17,7 @@ import { captureEventLoopMetrics } from '../utils/metrics'
import { cancelAllScheduledJobs } from '../utils/node-schedule'
import { PubSub } from '../utils/pubsub'
import { status } from '../utils/status'
import { createRedisPool, delay } from '../utils/utils'
import { delay } from '../utils/utils'
import { OrganizationManager } from '../worker/ingestion/organization-manager'
import { TeamManager } from '../worker/ingestion/team-manager'
import Piscina, { makePiscina as defaultMakePiscina } from '../worker/piscina'
@@ -420,27 +420,18 @@ export async function startPluginsServer(
const statsd = hub?.statsd ?? createStatsdClient(serverConfig, null)
const postgres = hub?.postgres ?? new PostgresRouter(serverConfig, statsd)
const s3 = hub?.objectStorage ?? getObjectStorage(recordingConsumerConfig)
const redisPool = hub?.db.redisPool ?? createRedisPool(recordingConsumerConfig)

if (!s3) {
throw new Error("Can't start session recording blob ingestion without object storage")
}
// NOTE: We intentionally pass in the original serverConfig as the ingester uses both kafkas
const ingester = new SessionRecordingIngesterV2(serverConfig, postgres, s3, redisPool)
const ingester = new SessionRecordingIngesterV2(serverConfig, postgres, s3)
await ingester.start()

const batchConsumer = ingester.batchConsumer

if (batchConsumer) {
stopSessionRecordingBlobConsumer = async () => {
// Tricky - in some cases the hub is responsible, in which case it will drain and clear. Otherwise we are responsible.
if (!hub?.db.redisPool) {
await redisPool.drain()
await redisPool.clear()
}

await ingester.stop()
}
stopSessionRecordingBlobConsumer = () => ingester.stop()
joinSessionRecordingBlobConsumer = () => batchConsumer.join()
healthChecks['session-recordings-blob'] = () => ingester.isHealthy() ?? false
}
@@ -72,6 +72,9 @@ describe('ingester', () => {
const team = await getFirstTeam(hub)
teamToken = team.api_token
await deleteKeysWithPrefix(hub)

ingester = new SessionRecordingIngesterV2(config, hub.postgres, hub.objectStorage)
await ingester.start()
})

afterEach(async () => {
@@ -86,12 +89,6 @@
jest.useRealTimers()
})

// these tests assume that a flush won't run while they run
beforeEach(async () => {
ingester = new SessionRecordingIngesterV2(config, hub.postgres, hub.objectStorage, hub.redisPool)
await ingester.start()
})

it('creates a new session manager if needed', async () => {
const event = createIncomingRecordingMessage()
await ingester.consume(event)
@@ -339,7 +336,7 @@ describe('ingester', () => {
jest.setTimeout(5000) // Increased to cover lock delay

beforeEach(async () => {
otherIngester = new SessionRecordingIngesterV2(config, hub.postgres, hub.objectStorage, hub.redisPool)
otherIngester = new SessionRecordingIngesterV2(config, hub.postgres, hub.objectStorage)
await otherIngester.start()
})

@@ -443,4 +440,62 @@
).toEqual(['2:session_id_4:1'])
})
})

describe('stop()', () => {
const setup = async (): Promise<void> => {
const partitionMsgs1 = [
createKafkaMessage(
teamToken,
{
partition: 1,
offset: 1,
},
{
$session_id: 'session_id_1',
}
),

createKafkaMessage(
teamToken,
{
partition: 1,
offset: 2,
},
{
$session_id: 'session_id_2',
}
),
]

await ingester.onAssignPartitions([createTP(1)])
await ingester.handleEachBatch(partitionMsgs1)
}

// NOTE: This test is a sanity check for the follow-up test. It demonstrates what happens if we shut down in the wrong order.
// It doesn't work reliably though, as onRevoke is called via the Kafka lib, which leaves dangling promises, so it is kept here
// as a reminder / demonstration for when we need it
it.skip('shuts down with error if redis forcefully shutdown', async () => {
await setup()

await ingester.redisPool.drain()
await ingester.redisPool.clear()

// revoke, realtime unsub, replay stop
await expect(ingester.stop()).resolves.toMatchObject([
{ status: 'rejected' },
{ status: 'fulfilled' },
{ status: 'fulfilled' },
])
})
it('shuts down without error', async () => {
await setup()

// revoke, realtime unsub, replay stop
await expect(ingester.stop()).resolves.toMatchObject([
{ status: 'fulfilled' },
{ status: 'fulfilled' },
{ status: 'fulfilled' },
])
})
})
})
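These assertions lean on `Promise.allSettled` preserving the order of its input array, so the three results are expected to line up with the three pieces of work scheduled in `stop()` - revoke, realtime unsubscribe, replay stop - as the inline comments note. A small standalone example of that behaviour:

```typescript
// Promise.allSettled resolves with results in the same order as the input promises,
// regardless of which settles first or whether any of them reject.
async function demo(): Promise<void> {
    const results = await Promise.allSettled([
        Promise.reject(new Error('redis already shut down')),
        Promise.resolve('unsubscribed'),
        Promise.resolve('stopped'),
    ])
    console.log(results.map((r) => r.status)) // [ 'rejected', 'fulfilled', 'fulfilled' ]
}

void demo()
```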
