fix(plugin-server): consumer loop throwing triggers a shutdown (#18073)
* fix(plugin-server): join analytics ingestion server at top level

* Update plugin-server/src/main/pluginsServer.ts

Co-authored-by: Xavier Vello <[email protected]>

---------

Co-authored-by: Xavier Vello <[email protected]>
bretthoerner and xvello authored Oct 19, 2023
1 parent bae6be9 commit dec7f41
Showing 1 changed file with 21 additions and 32 deletions.
53 changes: 21 additions & 32 deletions plugin-server/src/main/pluginsServer.ts
@@ -1,6 +1,7 @@
 import * as Sentry from '@sentry/node'
 import fs from 'fs'
 import { Server } from 'http'
+import { BatchConsumer } from 'kafka/batch-consumer'
 import { CompressionCodecs, CompressionTypes, Consumer, KafkaJSProtocolError } from 'kafkajs'
 // @ts-expect-error no type definitions
 import SnappyCodec from 'kafkajs-snappy'
@@ -99,10 +100,7 @@ export async function startPluginsServer(
     // (default 60 seconds) to allow for the person to be created in the
     // meantime.
     let bufferConsumer: Consumer | undefined
-    let stopSessionRecordingEventsConsumer: (() => void) | undefined
     let stopSessionRecordingBlobConsumer: (() => void) | undefined
-    let joinSessionRecordingEventsConsumer: ((timeout?: number) => Promise<void>) | undefined
-    let joinSessionRecordingBlobConsumer: ((timeout?: number) => Promise<void>) | undefined
     let jobsConsumer: Consumer | undefined
     let schedulerTasksConsumer: Consumer | undefined

@@ -142,7 +140,6 @@ export async function startPluginsServer(
             stopWebhooksHandlerConsumer?.(),
             bufferConsumer?.disconnect(),
             jobsConsumer?.disconnect(),
-            stopSessionRecordingEventsConsumer?.(),
             stopSessionRecordingBlobConsumer?.(),
             schedulerTasksConsumer?.disconnect(),
         ])
@@ -152,8 +149,19 @@ export async function startPluginsServer(
         }

         await closeHub?.()
-
-        status.info('👋', 'Over and out!')
     }

+    // If join rejects or throws, then the consumer is unhealthy and we should shut down the process.
+    // Ideally we would also join all the other background tasks as well to ensure we stop the
+    // server if we hit any errors and don't end up with zombie instances, but I'll leave that
+    // refactoring for another time. Note that we have the liveness health checks already, so in K8s
+    // cases zombies should be reaped anyway, albeit not in the most efficient way.
+    function shutdownOnConsumerExit(consumer: BatchConsumer) {
+        consumer.join().catch(async (error) => {
+            status.error('💥', 'Unexpected task joined!', { error: error.stack ?? error })
+            await closeJobs()
+            process.exit(1)
+        })
+    }
+
     for (const signal of ['SIGINT', 'SIGTERM', 'SIGHUP']) {
@@ -164,6 +172,7 @@ export async function startPluginsServer(
         // This makes async exit possible with the process waiting until jobs are closed
         status.info('👋', 'process handling beforeExit event. Closing jobs...')
         await closeJobs()
+        status.info('👋', 'Over and out!')
         process.exit(0)
     })

@@ -285,6 +294,7 @@ export async function startPluginsServer(
             )

             analyticsEventsIngestionConsumer = queue
+            shutdownOnConsumerExit(analyticsEventsIngestionConsumer.consumer!)
             healthChecks['analytics-ingestion'] = isAnalyticsEventsIngestionHealthy
         }

@@ -299,6 +309,7 @@ export async function startPluginsServer(
             })

             analyticsEventsIngestionHistoricalConsumer = queue
+            shutdownOnConsumerExit(analyticsEventsIngestionHistoricalConsumer.consumer!)
             healthChecks['analytics-ingestion-historical'] = isAnalyticsEventsIngestionHistoricalHealthy
         }

@@ -307,9 +318,12 @@ export async function startPluginsServer(
             serverInstance = serverInstance ? serverInstance : { hub }

             piscina = piscina ?? (await makePiscina(serverConfig, hub))
-            analyticsEventsIngestionOverflowConsumer = await startAnalyticsEventsIngestionOverflowConsumer({
+            const queue = await startAnalyticsEventsIngestionOverflowConsumer({
                 hub: hub,
             })
+
+            analyticsEventsIngestionOverflowConsumer = queue
+            shutdownOnConsumerExit(analyticsEventsIngestionOverflowConsumer.consumer!)
         }

         if (capabilities.processAsyncOnEventHandlers) {
@@ -414,7 +428,7 @@ export async function startPluginsServer(

             if (batchConsumer) {
                 stopSessionRecordingBlobConsumer = () => ingester.stop()
-                joinSessionRecordingBlobConsumer = () => batchConsumer.join()
+                shutdownOnConsumerExit(batchConsumer)
                 healthChecks['session-recordings-blob'] = () => ingester.isHealthy() ?? false
             }
         }
@@ -423,31 +437,6 @@ export async function startPluginsServer(
             httpServer = createHttpServer(serverConfig.HTTP_SERVER_PORT, healthChecks, analyticsEventsIngestionConsumer)
         }

-        // If session recordings consumer is defined, then join it. If join
-        // resolves, then the consumer has stopped and we should shut down
-        // everything else. Ideally we would also join all the other background
-        // tasks as well to ensure we stop the server if we hit any errors and
-        // don't end up with zombie instances, but I'll leave that refactoring
-        // for another time. Note that we have the liveness health checks
-        // already, so in K8s cases zombies should be reaped anyway, albeit not
-        // in the most efficient way.
-        //
-        // When extending to other consumers, we would want to do something like
-        //
-        // ```
-        // try {
-        //     await Promise.race([sessionConsumer.join(), analyticsConsumer.join(), ...])
-        // } finally {
-        //     await closeJobs()
-        // }
-        // ```
-        if (joinSessionRecordingEventsConsumer) {
-            joinSessionRecordingEventsConsumer().catch(closeJobs)
-        }
-        if (joinSessionRecordingBlobConsumer) {
-            joinSessionRecordingBlobConsumer().catch(closeJobs)
-        }
-
         return serverInstance ?? { stop: closeJobs }
     } catch (error) {
         Sentry.captureException(error)

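For reference, here is a minimal, self-contained sketch of the shutdown-on-consumer-exit pattern this commit introduces: instead of awaiting one hand-picked consumer's `join()` at the end of startup, a helper attaches a catch handler to each consumer's run loop so that any loop rejecting kills the process. `BatchConsumerLike`, `closeJobs`, and `fakeConsumer` below are illustrative stand-ins, not the plugin server's actual types.

```typescript
// The contract the pattern relies on: join() resolves when the consume
// loop exits and rejects if it throws.
interface BatchConsumerLike {
    join: () => Promise<void>
}

// Stand-in for the real teardown (disconnect consumers, close the hub, ...).
async function closeJobs(): Promise<void> {
    console.info('closing jobs...')
}

// Fire-and-forget: attach the handler without awaiting, so startup keeps
// going while the consumer loop runs in the background.
function shutdownOnConsumerExit(consumer: BatchConsumerLike): void {
    consumer.join().catch(async (error) => {
        console.error('consumer loop died unexpectedly:', error)
        await closeJobs()
        process.exit(1)
    })
}

// Usage: a fake consumer whose loop throws after one second, taking the
// whole process down with exit code 1.
const fakeConsumer: BatchConsumerLike = {
    join: () =>
        new Promise((_, reject) => setTimeout(() => reject(new Error('broker unreachable')), 1000)),
}
shutdownOnConsumerExit(fakeConsumer)
```

In the commit, this helper is wired to every consumer the server starts (analytics, historical, overflow, and session-recording blob), so a crashed loop takes the whole instance down and lets the orchestrator restart it rather than leaving a zombie behind.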