From 1a9a7e0df300b05e11b0a5979ce40245df83dc2c Mon Sep 17 00:00:00 2001 From: Ben White Date: Wed, 19 Jun 2024 12:42:51 +0200 Subject: [PATCH] Fixes --- plugin-server/src/main/pluginsServer.ts | 36 ++++++++++++------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index aa21113ef942e..8f35861286612 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -198,13 +198,11 @@ export async function startPluginsServer( } try { - const services: Promise[] = [] - const startService = ( + const capabilitiesPromises: Promise[] = [] + const startCapabilities = ( capability: keyof PluginServerCapabilities | (keyof PluginServerCapabilities)[], startup: () => Promise | void ) => { - const start = Date.now() - status.info('⚡️', `Starting service with capabilities ${capability}`) if (Array.isArray(capability)) { if (!capability.some((c) => capabilities[c])) { return @@ -213,10 +211,12 @@ export async function startPluginsServer( return } + const start = Date.now() + status.info('⚡️', `Starting service with capabilities ${capability}`) const promise = (startup() ?? Promise.resolve()).then(() => - status.info('🚀', `Service with capabilities ${capability} started in ${Date.now() - start}ms`) + status.info('🚀', `Capabilities ${capability} started in ${Date.now() - start}ms`) ) - services.push(promise) + capabilitiesPromises.push(promise) } status.info('🚀', 'Launching plugin server...') @@ -231,7 +231,7 @@ export async function startPluginsServer( // 3. clickhouse_events_json and plugin_events_ingestion // 4. conversion_events_buffer // - startService(['processPluginJobs', 'pluginScheduledTasks'], async () => { + startCapabilities(['processPluginJobs', 'pluginScheduledTasks'], async () => { const graphileWorker = new GraphileWorker(hub) shutdownCallbacks.push(async () => graphileWorker.stop()) // `connectProducer` just runs the PostgreSQL migrations. Ideally it @@ -321,7 +321,7 @@ export async function startPluginsServer( pluginServerStartupTimeMs.inc(Date.now() - timer.valueOf()) }) - startService('ingestion', async () => { + startCapabilities('ingestion', async () => { const consumer = await startAnalyticsEventsIngestionConsumer({ hub: hub, }) @@ -332,7 +332,7 @@ export async function startPluginsServer( readyChecks['analytics-ingestion'] = () => consumer.queue.consumerReady }) - startService('ingestionHistorical', async () => { + startCapabilities('ingestionHistorical', async () => { const consumer = await startAnalyticsEventsIngestionHistoricalConsumer({ hub: hub, }) @@ -342,7 +342,7 @@ export async function startPluginsServer( healthChecks['analytics-ingestion-historical'] = consumer.isHealthy }) - startService('ingestionOverflow', async () => { + startCapabilities('ingestionOverflow', async () => { const queue = await startAnalyticsEventsIngestionOverflowConsumer({ hub: hub, }) @@ -351,7 +351,7 @@ export async function startPluginsServer( shutdownOnConsumerExit(queue.consumer!) }) - startService('processAsyncOnEventHandlers', async () => { + startCapabilities('processAsyncOnEventHandlers', async () => { const consumer = await startAsyncOnEventHandlerConsumer({ hub: hub, }) @@ -360,7 +360,7 @@ export async function startPluginsServer( healthChecks['on-event-ingestion'] = consumer.isHealthy }) - startService('processAsyncWebhooksHandlers', async () => { + startCapabilities('processAsyncWebhooksHandlers', async () => { // TODO: Move to hub const groupTypeManager = new GroupTypeManager(hub.postgres, hub.teamManager, serverConfig.SITE_URL) @@ -381,7 +381,7 @@ export async function startPluginsServer( healthChecks['webhooks-ingestion'] = consumer.isHealthy }) - startService('sessionRecordingBlobIngestion', async () => { + startCapabilities('sessionRecordingBlobIngestion', async () => { if (!hub.objectStorage) { throw new Error("Can't start session recording blob ingestion without object storage") } @@ -404,7 +404,7 @@ export async function startPluginsServer( } }) - startService('sessionRecordingBlobOverflowIngestion', async () => { + startCapabilities('sessionRecordingBlobOverflowIngestion', async () => { if (!hub.objectStorage) { throw new Error("Can't start session recording blob ingestion without object storage") } @@ -428,7 +428,7 @@ export async function startPluginsServer( } }) - startService('cdpProcessedEvents', async () => { + startCapabilities('cdpProcessedEvents', async () => { const consumer = new CdpProcessedEventsConsumer(serverConfig, hub) await consumer.start() @@ -437,7 +437,7 @@ export async function startPluginsServer( healthChecks['cdp-processed-events'] = () => consumer.isHealthy() ?? false }) - startService('cdpFunctionCallbacks', async () => { + startCapabilities('cdpFunctionCallbacks', async () => { const consumer = new CdpFunctionCallbackConsumer(serverConfig, hub) await consumer.start() @@ -451,7 +451,7 @@ export async function startPluginsServer( } }) - startService('personOverrides', () => { + startCapabilities('personOverrides', () => { const personOverridesPeriodicTask = new DeferredPersonOverrideWorker( hub.postgres, hub.kafkaProducer, @@ -466,7 +466,7 @@ export async function startPluginsServer( shutdownCallbacks.push(async () => personOverridesPeriodicTask.stop()) }) - await Promise.all(services) + await Promise.all(capabilitiesPromises) // HTTP we setup last as it is somewhat dependent on the other services if (capabilities.http) {