Added scheduling code to ensure we await all promises on shutdown
benjackwhite committed Sep 21, 2023
1 parent c7a797d commit 9058102
Showing 1 changed file with 64 additions and 46 deletions.
@@ -107,6 +107,8 @@ export class SessionRecordingIngesterV2 {
recordingConsumerConfig: PluginsServerConfig
topic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS

private promises: Promise<any>[] = []

constructor(
private serverConfig: PluginsServerConfig,
private postgres: PostgresRouter,
@@ -140,21 +142,21 @@ export class SessionRecordingIngesterV2 {

this.offsetsRefresher = new BackgroundRefresher(async () => {
const results = await Promise.all(
Object.keys(this.partitionAssignments).map(async (partition) => {
this.assignedTopicPartitions.map(async ({ partition }) => {
return new Promise<[number, number]>((resolve, reject) => {
if (!this.batchConsumer) {
return reject('Not connected')
}
this.batchConsumer.consumer.queryWatermarkOffsets(
KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
parseInt(partition),
partition,
(err, offsets) => {
if (err) {
status.error('🔥', 'Failed to query kafka watermark offsets', err)
return reject()
}

resolve([parseInt(partition), offsets.highOffset])
resolve([partition, offsets.highOffset])
}
)
})
@@ -168,6 +170,24 @@ export class SessionRecordingIngesterV2 {
}, 5000)
}

private get assignedTopicPartitions(): TopicPartition[] {
return Object.keys(this.partitionAssignments).map((partition) => ({
partition: parseInt(partition),
topic: this.topic,
}))
}

private scheduleWork<T>(promise: Promise<T>): Promise<T> {
/**
* Helper to handle graceful shutdowns. Every time we do some work we add a promise to this array and remove it when finished.
* That way when shutting down we can wait for all promises to finish before exiting.
*/
this.promises.push(promise)
promise.finally(() => (this.promises = this.promises.filter((p) => p !== promise)))

return promise
}

public async consume(event: IncomingRecordingMessage, sentrySpan?: Sentry.Span): Promise<void> {
// we have to reset this counter once we're consuming messages since then we know we're not re-balancing
// otherwise the consumer continues to report however many sessions were revoked at the last re-balance forever
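[Editor's note] The scheduleWork helper added above is the core of this change: every piece of fire-and-forget work is pushed onto this.promises and removed once it settles, so shutdown can wait for whatever is still in flight. A minimal standalone sketch of the same pattern follows; WorkTracker, schedule, and drain are illustrative names, not part of this commit.

    // Sketch of the in-flight promise tracking pattern used by scheduleWork above.
    // WorkTracker, schedule, and drain are illustrative names, not from the commit.
    class WorkTracker {
        private promises: Promise<unknown>[] = []

        // Register async work so that shutdown can wait for it to finish.
        schedule<T>(promise: Promise<T>): Promise<T> {
            this.promises.push(promise)
            // Drop the promise from the list once it settles, success or failure.
            void promise.finally(() => {
                this.promises = this.promises.filter((p) => p !== promise)
            })
            return promise
        }

        // Wait for everything still in flight. allSettled never rejects, so one
        // failed task cannot abort the shutdown sequence.
        async drain(): Promise<void> {
            await Promise.allSettled(this.promises)
        }
    }

    async function example(): Promise<void> {
        const tracker = new WorkTracker()
        void tracker.schedule(new Promise((resolve) => setTimeout(resolve, 100)))
        await tracker.drain() // resolves once the scheduled work has settled
    }

    void example()

Promise.allSettled is used rather than Promise.all so a single failed task cannot abort the drain; note that only work scheduled before the drain call is awaited.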
@@ -426,12 +446,7 @@ export class SessionRecordingIngesterV2 {

if (this.serverConfig.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) {
this.partitionLockInterval = setInterval(async () => {
await this.partitionLocker.claim(
Object.keys(this.partitionAssignments).map((partition) => ({
partition: parseInt(partition),
topic: this.topic,
}))
)
await this.partitionLocker.claim(this.assignedTopicPartitions)
}, PARTITION_LOCK_INTERVAL_MS)
}

@@ -478,7 +493,7 @@ export class SessionRecordingIngesterV2 {
}

if (err.code === CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
return this.onRevokePartitions(topicPartitions)
return this.scheduleWork(this.onRevokePartitions(topicPartitions))
}

// We had a "real" error
@@ -510,23 +525,12 @@ export class SessionRecordingIngesterV2 {
await this.batchConsumer?.stop()

// Simulate a revoke command to try and flush all sessions
// The rebalance event should have done this but we do it again as an extra precaution and to await the flushes
await this.onRevokePartitions(
Object.keys(this.partitionAssignments).map((partition) => ({
partition: parseInt(partition),
topic: this.topic,
})) as TopicPartition[]
)
// There is a race between the revoke callback and this function - Either way one of them gets there and covers the revocations
void this.scheduleWork(this.onRevokePartitions(this.assignedTopicPartitions))

await this.realtimeManager.unsubscribe()
await this.replayEventsIngester.stop()

// This is inefficient but currently necessary due to new instances restarting from the committed offset point
await this.destroySessions(Object.entries(this.sessions))

this.sessions = {}

gaugeRealtimeSessions.reset()
await Promise.allSettled(this.promises)
}

public isHealthy() {
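[Editor's note] In the reworked stop() above, the simulated revoke is kicked off with void this.scheduleWork(...) rather than awaited inline; the trailing await Promise.allSettled(this.promises) is what guarantees it still completes before shutdown returns. A small illustration of that ordering; doRevoke, shutdown, and the promises array are hypothetical stand-ins, not the committed code.

    // Illustration: the revoke kicked off in stop() is not awaited inline, but the
    // final allSettled still waits for it before shutdown returns.
    // doRevoke and the promises array are hypothetical stand-ins, not the committed code.
    const promises: Promise<unknown>[] = []

    async function doRevoke(): Promise<void> {
        // Stand-in for flushing the revoked partitions' sessions.
        await new Promise((resolve) => setTimeout(resolve, 50))
        console.log('revoke finished')
    }

    async function shutdown(): Promise<void> {
        promises.push(doRevoke()) // scheduled, not awaited, mirroring void this.scheduleWork(...)
        console.log('stopping other dependencies...')
        await Promise.allSettled(promises) // blocks until the scheduled revoke settles
        console.log('shutdown complete')
    }

    void shutdown()

Because the rebalance callback may have already scheduled the same revoke, both paths are tracked, and the final drain covers whichever one ran, which is the race the comment in stop() refers to.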
@@ -549,54 +553,68 @@ export class SessionRecordingIngesterV2 {
* As a result, we need to drop all sessions currently managed for the revoked partitions
*/

/**
* IDEA
*
* 1. Pull out all sessions we are revoking.
* 2. Reset all the relevant metrics - that way we don't have to worry about races
* 3. Lock all the partitions we are revoking - gives us N seconds to deal with them
* 4. Flush all the sessions (probably with a timeout)
* 5. We can safely destroy them as they aren't part of the sessions map
*/

const revokedPartitions = topicPartitions.map((x) => x.partition)
if (!revokedPartitions.length) {
return
}

const sessionsToDrop = Object.entries(this.sessions).filter(([_, sessionManager]) =>
revokedPartitions.includes(sessionManager.partition)
)
const sessionsToDrop: SessionManager[] = []

// First we pull out all sessions that are being dropped. This way if we get reassigned and start consuming, we don't accidentally destroy them
Object.entries(this.sessions).forEach(([key, sessionManager]) => {
if (revokedPartitions.includes(sessionManager.partition)) {
sessionsToDrop.push(sessionManager)
delete this.sessions[key]
}
})

// Reset all metrics for the revoked partitions
topicPartitions.forEach((topicPartition: TopicPartition) => {
const partition = topicPartition.partition

delete this.partitionAssignments[partition]
gaugeLag.remove({ partition })
gaugeLagMilliseconds.remove({ partition })
gaugeOffsetCommitted.remove({ partition })
gaugeOffsetCommitFailed.remove({ partition })
this.offsetHighWaterMarker.revoke(topicPartition)
})

gaugeSessionsRevoked.set(sessionsToDrop.length)
gaugeSessionsHandled.remove()

// Attempt to flush all sessions
// TODO: Improve this to
// - work from oldest to newest
// - have some sort of timeout so we don't get stuck here forever
if (this.serverConfig.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) {
// Extend our claim on these partitions to give us time to flush
await this.partitionLocker.claim(topicPartitions)
status.info('🔁', `blob_ingester_consumer - flushing ${sessionsToDrop.length} sessions on revoke...`)

// Flush all the sessions we are supposed to drop
await runInstrumentedFunction({
statsKey: `recordingingester.onRevokePartitions.flushSessions`,
logExecutionTime: true,
func: async () => {
await Promise.allSettled(
sessionsToDrop
.map(([_, x]) => x)
.sort((x) => x.buffer.oldestKafkaTimestamp ?? Infinity)
.map((x) => x.flush('partition_shutdown'))
)
},
})
}

topicPartitions.forEach((topicPartition: TopicPartition) => {
const partition = topicPartition.partition

delete this.partitionAssignments[partition]
gaugeLag.remove({ partition })
gaugeLagMilliseconds.remove({ partition })
gaugeOffsetCommitted.remove({ partition })
gaugeOffsetCommitFailed.remove({ partition })
this.offsetHighWaterMarker.revoke(topicPartition)
})

if (this.serverConfig.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) {
await this.partitionLocker.release(topicPartitions)
}
await this.destroySessions(sessionsToDrop)

await Promise.allSettled(sessionsToDrop.map((x) => x.destroy()))
await this.offsetsRefresher.refresh()
}
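
[Editor's note] The TODO above notes that the revoke-time flush should work from the oldest buffered data first and be bounded by a timeout. A hedged sketch of one way that could look; Session, withTimeout, and flushOldestFirst are illustrative names, not the committed implementation.

    // Sketch of the TODO above: flush oldest sessions first, bounded by a deadline.
    // Session, withTimeout, and flushOldestFirst are illustrative names only.
    interface Session {
        oldestKafkaTimestamp: number | null
        flush(reason: string): Promise<void>
    }

    // Race a flush against a timer so a stuck session cannot block the rebalance forever.
    function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T | 'timed_out'> {
        return Promise.race([
            promise,
            new Promise<'timed_out'>((resolve) => setTimeout(() => resolve('timed_out'), ms)),
        ])
    }

    async function flushOldestFirst(sessions: Session[], deadlineMs: number): Promise<void> {
        // Oldest buffered data first, so the most at-risk sessions flush before the deadline.
        const ordered = [...sessions].sort(
            (a, b) =>
                (a.oldestKafkaTimestamp ?? Number.MAX_SAFE_INTEGER) -
                (b.oldestKafkaTimestamp ?? Number.MAX_SAFE_INTEGER)
        )
        const started = Date.now()
        for (const session of ordered) {
            const remaining = deadlineMs - (Date.now() - started)
            if (remaining <= 0) {
                break // deadline reached: stop flushing rather than blocking the rebalance
            }
            await withTimeout(session.flush('partition_shutdown'), remaining)
        }
    }

Flushing sequentially keeps the deadline accounting simple; a parallel variant would instead race Promise.allSettled against a single overall timer. The sketch also uses a two-argument comparator, which is the form Array.prototype.sort expects.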

