Commit

Fixes
benjackwhite committed Jun 19, 2024
1 parent e3a07a9 commit 1a9a7e0
Showing 1 changed file with 18 additions and 18 deletions.
36 changes: 18 additions & 18 deletions plugin-server/src/main/pluginsServer.ts
@@ -198,13 +198,11 @@ export async function startPluginsServer(
     }

     try {
-        const services: Promise<any>[] = []
-        const startService = (
+        const capabilitiesPromises: Promise<any>[] = []
+        const startCapabilities = (
             capability: keyof PluginServerCapabilities | (keyof PluginServerCapabilities)[],
             startup: () => Promise<any> | void
         ) => {
-            const start = Date.now()
-            status.info('⚡️', `Starting service with capabilities ${capability}`)
             if (Array.isArray(capability)) {
                 if (!capability.some((c) => capabilities[c])) {
                     return
@@ -213,10 +211,12 @@ export async function startPluginsServer(
                 return
             }

+            const start = Date.now()
+            status.info('⚡️', `Starting service with capabilities ${capability}`)
             const promise = (startup() ?? Promise.resolve()).then(() =>
-                status.info('🚀', `Service with capabilities ${capability} started in ${Date.now() - start}ms`)
+                status.info('🚀', `Capabilities ${capability} started in ${Date.now() - start}ms`)
             )
-            services.push(promise)
+            capabilitiesPromises.push(promise)
         }

         status.info('🚀', 'Launching plugin server...')
@@ -231,7 +231,7 @@ export async function startPluginsServer(
         // 3. clickhouse_events_json and plugin_events_ingestion
         // 4. conversion_events_buffer
         //
-        startService(['processPluginJobs', 'pluginScheduledTasks'], async () => {
+        startCapabilities(['processPluginJobs', 'pluginScheduledTasks'], async () => {
             const graphileWorker = new GraphileWorker(hub)
             shutdownCallbacks.push(async () => graphileWorker.stop())
             // `connectProducer` just runs the PostgreSQL migrations. Ideally it
@@ -321,7 +321,7 @@ export async function startPluginsServer(
             pluginServerStartupTimeMs.inc(Date.now() - timer.valueOf())
         })

-        startService('ingestion', async () => {
+        startCapabilities('ingestion', async () => {
             const consumer = await startAnalyticsEventsIngestionConsumer({
                 hub: hub,
             })
@@ -332,7 +332,7 @@ export async function startPluginsServer(
             readyChecks['analytics-ingestion'] = () => consumer.queue.consumerReady
         })

-        startService('ingestionHistorical', async () => {
+        startCapabilities('ingestionHistorical', async () => {
             const consumer = await startAnalyticsEventsIngestionHistoricalConsumer({
                 hub: hub,
             })
@@ -342,7 +342,7 @@ export async function startPluginsServer(
             healthChecks['analytics-ingestion-historical'] = consumer.isHealthy
         })

-        startService('ingestionOverflow', async () => {
+        startCapabilities('ingestionOverflow', async () => {
             const queue = await startAnalyticsEventsIngestionOverflowConsumer({
                 hub: hub,
             })
@@ -351,7 +351,7 @@ export async function startPluginsServer(
             shutdownOnConsumerExit(queue.consumer!)
         })

-        startService('processAsyncOnEventHandlers', async () => {
+        startCapabilities('processAsyncOnEventHandlers', async () => {
             const consumer = await startAsyncOnEventHandlerConsumer({
                 hub: hub,
             })
@@ -360,7 +360,7 @@ export async function startPluginsServer(
             healthChecks['on-event-ingestion'] = consumer.isHealthy
         })

-        startService('processAsyncWebhooksHandlers', async () => {
+        startCapabilities('processAsyncWebhooksHandlers', async () => {
             // TODO: Move to hub
             const groupTypeManager = new GroupTypeManager(hub.postgres, hub.teamManager, serverConfig.SITE_URL)

@@ -381,7 +381,7 @@ export async function startPluginsServer(
             healthChecks['webhooks-ingestion'] = consumer.isHealthy
         })

-        startService('sessionRecordingBlobIngestion', async () => {
+        startCapabilities('sessionRecordingBlobIngestion', async () => {
             if (!hub.objectStorage) {
                 throw new Error("Can't start session recording blob ingestion without object storage")
             }
@@ -404,7 +404,7 @@ export async function startPluginsServer(
             }
         })

-        startService('sessionRecordingBlobOverflowIngestion', async () => {
+        startCapabilities('sessionRecordingBlobOverflowIngestion', async () => {
             if (!hub.objectStorage) {
                 throw new Error("Can't start session recording blob ingestion without object storage")
             }
@@ -428,7 +428,7 @@ export async function startPluginsServer(
             }
         })

-        startService('cdpProcessedEvents', async () => {
+        startCapabilities('cdpProcessedEvents', async () => {
             const consumer = new CdpProcessedEventsConsumer(serverConfig, hub)
             await consumer.start()

@@ -437,7 +437,7 @@ export async function startPluginsServer(
             healthChecks['cdp-processed-events'] = () => consumer.isHealthy() ?? false
         })

-        startService('cdpFunctionCallbacks', async () => {
+        startCapabilities('cdpFunctionCallbacks', async () => {
             const consumer = new CdpFunctionCallbackConsumer(serverConfig, hub)
             await consumer.start()

@@ -451,7 +451,7 @@ export async function startPluginsServer(
             }
         })

-        startService('personOverrides', () => {
+        startCapabilities('personOverrides', () => {
             const personOverridesPeriodicTask = new DeferredPersonOverrideWorker(
                 hub.postgres,
                 hub.kafkaProducer,
@@ -466,7 +466,7 @@ export async function startPluginsServer(
             shutdownCallbacks.push(async () => personOverridesPeriodicTask.stop())
         })

-        await Promise.all(services)
+        await Promise.all(capabilitiesPromises)

         // HTTP we setup last as it is somewhat dependent on the other services
         if (capabilities.http) {
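For context, below is a minimal standalone sketch of the capability-gated startup pattern this commit settles on, reconstructed from the hunks above. It is not the real module: PluginServerCapabilities, status, and capabilities are simplified stand-ins for the plugin-server objects, the startup bodies are placeholders, and the single-capability else-if branch is assumed because the diff elides it.

// Sketch only: simplified types and stand-in objects, not the real plugin-server code.
type PluginServerCapabilities = Record<string, boolean | undefined>

const capabilities: PluginServerCapabilities = { ingestion: true, personOverrides: false }
const status = { info: (icon: string, msg: string) => console.log(icon, msg) }

async function main(): Promise<void> {
    const capabilitiesPromises: Promise<any>[] = []

    const startCapabilities = (
        capability: string | string[],
        startup: () => Promise<any> | void
    ) => {
        // Bail out before logging anything if none of the requested capabilities
        // are enabled, so the timing log only covers services that actually start.
        if (Array.isArray(capability)) {
            if (!capability.some((c) => capabilities[c])) {
                return
            }
        } else if (!capabilities[capability]) {
            // Assumed single-capability branch (not visible in the diff).
            return
        }

        const start = Date.now()
        status.info('⚡️', `Starting service with capabilities ${capability}`)
        const promise = (startup() ?? Promise.resolve()).then(() =>
            status.info('🚀', `Capabilities ${capability} started in ${Date.now() - start}ms`)
        )
        capabilitiesPromises.push(promise)
    }

    // Each service registers itself; only enabled capabilities run their startup.
    startCapabilities('ingestion', async () => {
        /* e.g. start the analytics events ingestion consumer here */
    })
    startCapabilities('personOverrides', () => {
        /* skipped entirely: the capability is disabled above */
    })

    // Wait for every started service before continuing (HTTP is set up after this).
    await Promise.all(capabilitiesPromises)
}

void main()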
