diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index 86b5e336c8d379..80862b7d8d7196 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -89,8 +89,11 @@ export async function startPluginsServer( // 5. publishes the resulting event to a Kafka topic on which ClickHouse is // listening. let analyticsEventsIngestionConsumer: IngestionConsumer | undefined + let joinAnalyticsEventsIngestionConsumer: ((timeout?: number) => Promise<void>) | undefined let analyticsEventsIngestionOverflowConsumer: IngestionConsumer | undefined + let joinAnalyticsEventsIngestionOverflowConsumer: ((timeout?: number) => Promise<void>) | undefined let analyticsEventsIngestionHistoricalConsumer: IngestionConsumer | undefined + let joinAnalyticsEventsIngestionHistoricalConsumer: ((timeout?: number) => Promise<void>) | undefined let onEventHandlerConsumer: KafkaJSIngestionConsumer | undefined let stopWebhooksHandlerConsumer: () => Promise<void> | undefined @@ -99,9 +102,7 @@ export async function startPluginsServer( // (default 60 seconds) to allow for the person to be created in the // meantime. let bufferConsumer: Consumer | undefined - let stopSessionRecordingEventsConsumer: (() => void) | undefined let stopSessionRecordingBlobConsumer: (() => void) | undefined - let joinSessionRecordingEventsConsumer: ((timeout?: number) => Promise<void>) | undefined let joinSessionRecordingBlobConsumer: ((timeout?: number) => Promise<void>) | undefined let jobsConsumer: Consumer | undefined let schedulerTasksConsumer: Consumer | undefined @@ -142,7 +143,6 @@ export async function startPluginsServer( stopWebhooksHandlerConsumer?.(), bufferConsumer?.disconnect(), jobsConsumer?.disconnect(), - stopSessionRecordingEventsConsumer?.(), stopSessionRecordingBlobConsumer?.(), schedulerTasksConsumer?.disconnect(), ]) @@ -285,6 +285,12 @@ export async function startPluginsServer( ) analyticsEventsIngestionConsumer = queue + joinAnalyticsEventsIngestionConsumer = () => + queue.consumer!.join().catch((error) => { + status.error('💥', 'joinAnalyticsEventsIngestionConsumer error', error) + throw error + }) + healthChecks['analytics-ingestion'] = isAnalyticsEventsIngestionHealthy } @@ -299,6 +305,12 @@ export async function startPluginsServer( }) analyticsEventsIngestionHistoricalConsumer = queue + joinAnalyticsEventsIngestionHistoricalConsumer = () => + queue.consumer!.join().catch((error) => { + status.error('💥', 'joinAnalyticsEventsIngestionHistoricalConsumer error', error) + throw error + }) + healthChecks['analytics-ingestion-historical'] = isAnalyticsEventsIngestionHistoricalHealthy } @@ -307,9 +319,16 @@ export async function startPluginsServer( serverInstance = serverInstance ? serverInstance : { hub } piscina = piscina ?? (await makePiscina(serverConfig, hub)) - analyticsEventsIngestionOverflowConsumer = await startAnalyticsEventsIngestionOverflowConsumer({ + const queue = await startAnalyticsEventsIngestionOverflowConsumer({ hub: hub, }) + + analyticsEventsIngestionOverflowConsumer = queue + joinAnalyticsEventsIngestionOverflowConsumer = () => + queue.consumer!.join().catch((error) => { + status.error('💥', 'joinAnalyticsEventsIngestionOverflowConsumer error', error) + throw error + }) } if (capabilities.processAsyncOnEventHandlers) { @@ -414,7 +433,12 @@ export async function startPluginsServer( if (batchConsumer) { stopSessionRecordingBlobConsumer = () => ingester.stop() - joinSessionRecordingBlobConsumer = () => batchConsumer.join() + joinSessionRecordingBlobConsumer = () => + batchConsumer.join().catch((error) => { + status.error('💥', 'joinSessionRecordingBlobConsumer error', error) + throw error + }) + healthChecks['session-recordings-blob'] = () => ingester.isHealthy() ?? false } } @@ -423,29 +447,21 @@ export async function startPluginsServer( httpServer = createHttpServer(serverConfig.HTTP_SERVER_PORT, healthChecks, analyticsEventsIngestionConsumer) } - // If session recordings consumer is defined, then join it. If join - // resolves, then the consumer has stopped and we should shut down - // everything else. Ideally we would also join all the other background - // tasks as well to ensure we stop the server if we hit any errors and - // don't end up with zombie instances, but I'll leave that refactoring - // for another time. Note that we have the liveness health checks - // already, so in K8s cases zombies should be reaped anyway, albeit not - // in the most efficient way. - // - // When extending to other consumers, we would want to do something like - // - // ``` - // try { - // await Promise.race([sessionConsumer.join(), analyticsConsumer.join(), ...]) - // } finally { - // await closeJobs() - // } - // ``` - if (joinSessionRecordingEventsConsumer) { - joinSessionRecordingEventsConsumer().catch(closeJobs) - } - if (joinSessionRecordingBlobConsumer) { - joinSessionRecordingBlobConsumer().catch(closeJobs) + // If the session recordings or analytics consumer is defined, then join them. If join + // resolves, then the consumer has stopped and we should shut down everything else. Ideally + // we would also join all the other background tasks as well to ensure we stop the server if + // we hit any errors and don't end up with zombie instances, but I'll leave that refactoring + // for another time. Note that we have the liveness health checks already, so in K8s cases + // zombies should be reaped anyway, albeit not in the most efficient way. + const joinPromiseFns = [ + joinSessionRecordingBlobConsumer, + joinAnalyticsEventsIngestionConsumer, + joinAnalyticsEventsIngestionHistoricalConsumer, + joinAnalyticsEventsIngestionOverflowConsumer, + ].filter(Boolean) // Skip any that are undefined. + + if (joinPromiseFns.length > 0) { + Promise.race(joinPromiseFns.map((p) => p?.())).catch(closeJobs) } return serverInstance ?? { stop: closeJobs }