Skip to content

Commit

Permalink
fix(plugin-server): join analytics ingestion server at top level
Browse files Browse the repository at this point in the history
  • Loading branch information
bretthoerner committed Oct 18, 2023
1 parent 71ca57f commit 0347046
Showing 1 changed file with 44 additions and 28 deletions.
72 changes: 44 additions & 28 deletions plugin-server/src/main/pluginsServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,11 @@ export async function startPluginsServer(
// 5. publishes the resulting event to a Kafka topic on which ClickHouse is
// listening.
let analyticsEventsIngestionConsumer: IngestionConsumer | undefined
let joinAnalyticsEventsIngestionConsumer: ((timeout?: number) => Promise<void>) | undefined
let analyticsEventsIngestionOverflowConsumer: IngestionConsumer | undefined
let joinAnalyticsEventsIngestionOverflowConsumer: ((timeout?: number) => Promise<void>) | undefined
let analyticsEventsIngestionHistoricalConsumer: IngestionConsumer | undefined
let joinAnalyticsEventsIngestionHistoricalConsumer: ((timeout?: number) => Promise<void>) | undefined
let onEventHandlerConsumer: KafkaJSIngestionConsumer | undefined
let stopWebhooksHandlerConsumer: () => Promise<void> | undefined

Expand All @@ -99,9 +102,7 @@ export async function startPluginsServer(
// (default 60 seconds) to allow for the person to be created in the
// meantime.
let bufferConsumer: Consumer | undefined
let stopSessionRecordingEventsConsumer: (() => void) | undefined
let stopSessionRecordingBlobConsumer: (() => void) | undefined
let joinSessionRecordingEventsConsumer: ((timeout?: number) => Promise<void>) | undefined
let joinSessionRecordingBlobConsumer: ((timeout?: number) => Promise<void>) | undefined
let jobsConsumer: Consumer | undefined
let schedulerTasksConsumer: Consumer | undefined
Expand Down Expand Up @@ -142,7 +143,6 @@ export async function startPluginsServer(
stopWebhooksHandlerConsumer?.(),
bufferConsumer?.disconnect(),
jobsConsumer?.disconnect(),
stopSessionRecordingEventsConsumer?.(),
stopSessionRecordingBlobConsumer?.(),
schedulerTasksConsumer?.disconnect(),
])
Expand Down Expand Up @@ -285,6 +285,12 @@ export async function startPluginsServer(
)

analyticsEventsIngestionConsumer = queue
joinAnalyticsEventsIngestionConsumer = () =>
queue.consumer!.join().catch((error) => {
status.error('💥', 'joinAnalyticsEventsIngestionConsumer error', error)
throw error
})

healthChecks['analytics-ingestion'] = isAnalyticsEventsIngestionHealthy
}

Expand All @@ -299,6 +305,12 @@ export async function startPluginsServer(
})

analyticsEventsIngestionHistoricalConsumer = queue
joinAnalyticsEventsIngestionHistoricalConsumer = () =>
queue.consumer!.join().catch((error) => {
status.error('💥', 'joinAnalyticsEventsIngestionHistoricalConsumer error', error)
throw error
})

healthChecks['analytics-ingestion-historical'] = isAnalyticsEventsIngestionHistoricalHealthy
}

Expand All @@ -307,9 +319,16 @@ export async function startPluginsServer(
serverInstance = serverInstance ? serverInstance : { hub }

piscina = piscina ?? (await makePiscina(serverConfig, hub))
analyticsEventsIngestionOverflowConsumer = await startAnalyticsEventsIngestionOverflowConsumer({
const queue = await startAnalyticsEventsIngestionOverflowConsumer({
hub: hub,
})

analyticsEventsIngestionOverflowConsumer = queue
joinAnalyticsEventsIngestionOverflowConsumer = () =>
queue.consumer!.join().catch((error) => {
status.error('💥', 'joinAnalyticsEventsIngestionOverflowConsumer error', error)
throw error
})
}

if (capabilities.processAsyncOnEventHandlers) {
Expand Down Expand Up @@ -414,7 +433,12 @@ export async function startPluginsServer(

if (batchConsumer) {
stopSessionRecordingBlobConsumer = () => ingester.stop()
joinSessionRecordingBlobConsumer = () => batchConsumer.join()
joinSessionRecordingBlobConsumer = () =>
batchConsumer.join().catch((error) => {
status.error('💥', 'joinSessionRecordingBlobConsumer error', error)
throw error
})

healthChecks['session-recordings-blob'] = () => ingester.isHealthy() ?? false
}
}
Expand All @@ -423,29 +447,21 @@ export async function startPluginsServer(
httpServer = createHttpServer(serverConfig.HTTP_SERVER_PORT, healthChecks, analyticsEventsIngestionConsumer)
}

// If session recordings consumer is defined, then join it. If join
// resolves, then the consumer has stopped and we should shut down
// everything else. Ideally we would also join all the other background
// tasks as well to ensure we stop the server if we hit any errors and
// don't end up with zombie instances, but I'll leave that refactoring
// for another time. Note that we have the liveness health checks
// already, so in K8s cases zombies should be reaped anyway, albeit not
// in the most efficient way.
//
// When extending to other consumers, we would want to do something like
//
// ```
// try {
// await Promise.race([sessionConsumer.join(), analyticsConsumer.join(), ...])
// } finally {
// await closeJobs()
// }
// ```
if (joinSessionRecordingEventsConsumer) {
joinSessionRecordingEventsConsumer().catch(closeJobs)
}
if (joinSessionRecordingBlobConsumer) {
joinSessionRecordingBlobConsumer().catch(closeJobs)
// If the session recordings or analytics consumers are defined, then join them. If a join
// resolves, that consumer has stopped and we should shut down everything else. Ideally
// we would also join all the other background tasks as well, to ensure we stop the server if
// we hit any errors and don't end up with zombie instances, but I'll leave that refactoring
// for another time. Note that we already have the liveness health checks, so in K8s cases
// zombies should be reaped anyway, albeit not in the most efficient way.
const joinPromiseFns = [
joinSessionRecordingBlobConsumer,
joinAnalyticsEventsIngestionConsumer,
joinAnalyticsEventsIngestionHistoricalConsumer,
joinAnalyticsEventsIngestionOverflowConsumer,
].filter(Boolean) // Skip any that are undefined.

if (joinPromiseFns.length > 0) {
Promise.race(joinPromiseFns.map((p) => p?.())).catch(closeJobs)
}

return serverInstance ?? { stop: closeJobs }
Expand Down

0 comments on commit 0347046

Please sign in to comment.