From 16870d63fc0a8b3e56cf2efd6cd231a6ba37662b Mon Sep 17 00:00:00 2001 From: Ben White Date: Tue, 3 Sep 2024 11:00:50 +0200 Subject: [PATCH] feat: Plugin server services refactor - take 2 (#24737) --- plugin-server/bin/ci_functional_tests.sh | 2 - plugin-server/src/cdp/cdp-consumers.ts | 11 +- plugin-server/src/config/config.ts | 4 - .../analytics-events-ingestion-consumer.ts | 11 +- ...cs-events-ingestion-historical-consumer.ts | 11 +- ...tics-events-ingestion-overflow-consumer.ts | 7 +- .../batch-processing/each-batch-ingestion.ts | 6 +- .../events-ingestion-consumer.ts | 18 +- .../main/ingestion-queues/jobs-consumer.ts | 10 +- .../src/main/ingestion-queues/kafka-queue.ts | 10 +- .../on-event-handler-consumer.ts | 45 +-- .../scheduled-tasks-consumer.ts | 14 +- .../session-recordings-consumer.ts | 11 +- plugin-server/src/main/pluginsServer.ts | 304 ++++++++---------- .../src/main/services/http-server.ts | 40 +-- plugin-server/src/types.ts | 24 +- plugin-server/src/utils/db/hub.ts | 44 ++- plugin-server/src/utils/utils.ts | 26 -- .../event-pipeline/createEventStep.ts | 2 +- .../event-pipeline/prepareEventStep.ts | 2 +- .../worker/ingestion/event-pipeline/runner.ts | 5 +- .../cdp-function-callbacks-consumer.test.ts | 7 +- .../cdp/cdp-processed-events-consumer.test.ts | 7 +- .../tests/cdp/hog-function-manager.test.ts | 7 +- plugin-server/tests/cdp/hog-masker.test.ts | 8 +- plugin-server/tests/cdp/hog-watcher.test.ts | 7 +- plugin-server/tests/http-server.test.ts | 1 - plugin-server/tests/main/capabilities.test.ts | 7 +- plugin-server/tests/main/db.test.ts | 7 +- ...nalytics-events-ingestion-consumer.test.ts | 7 +- .../each-batch-webhooks.test.ts | 7 +- .../main/ingestion-queues/each-batch.test.ts | 1 - .../main/ingestion-queues/kafka-queue.test.ts | 2 - .../run-async-handlers-event-pipeline.test.ts | 12 +- .../run-ingestion-pipeline.test.ts | 14 +- .../services/offset-high-water-mark.test.ts | 7 +- .../services/overflow-manager.test.ts | 7 +- .../session-recordings-consumer.test.ts | 7 +- .../tests/main/process-event.test.ts | 31 +- plugin-server/tests/main/teardown.test.ts | 3 +- plugin-server/tests/sql.test.ts | 7 +- .../tests/utils/db/activity-log.test.ts | 7 +- .../tests/worker/capabilities.test.ts | 7 +- plugin-server/tests/worker/console.test.ts | 7 +- .../tests/worker/dead-letter-queue.test.ts | 12 +- .../worker/ingestion/action-manager.test.ts | 7 +- .../worker/ingestion/action-matcher.test.ts | 7 +- .../worker/ingestion/app-metrics.test.ts | 7 +- .../enrichExceptionEventStep.test.ts | 1 - .../event-pipeline-integration.test.ts | 14 +- .../extractHeatmapDataStep.test.ts | 1 - .../event-pipeline/normalizeEventStep.test.ts | 6 +- .../pluginsProcessEventStep.test.ts | 12 +- .../populateTeamDataStep.test.ts | 1 - .../event-pipeline/prepareEventStep.test.ts | 23 +- .../event-pipeline/processPersonsStep.test.ts | 13 +- .../runAsyncHandlersStep.test.ts | 1 - .../ingestion/event-pipeline/runner.test.ts | 9 +- .../ingestion/group-type-manager.test.ts | 7 +- .../ingestion/organization-manager.test.ts | 7 +- .../worker/ingestion/person-state.test.ts | 12 +- .../worker/ingestion/process-event.test.ts | 7 +- .../ingestion/properties-updater.test.ts | 7 +- .../property-definitions-cache.test.ts | 7 +- .../property-definitions-manager.test.ts | 7 +- .../tests/worker/ingestion/utils.test.ts | 7 +- .../worker/plugins-api-key-manager.test.ts | 7 +- plugin-server/tests/worker/plugins.test.ts | 7 +- .../tests/worker/plugins/inline.test.ts | 7 +- .../plugins/inline_plugins/user-agent.test.ts | 7 +- .../tests/worker/plugins/mmdb.test.ts | 7 +- plugin-server/tests/worker/transforms.test.ts | 7 +- .../tests/worker/vm.extra-lazy.test.ts | 7 +- plugin-server/tests/worker/vm.test.ts | 7 +- plugin-server/tests/worker/vm.timeout.test.ts | 7 +- 75 files changed, 446 insertions(+), 576 deletions(-) diff --git a/plugin-server/bin/ci_functional_tests.sh b/plugin-server/bin/ci_functional_tests.sh index 2d89a623624bd..4ec68bf229c91 100755 --- a/plugin-server/bin/ci_functional_tests.sh +++ b/plugin-server/bin/ci_functional_tests.sh @@ -12,8 +12,6 @@ set -e -o pipefail export WORKER_CONCURRENCY=1 -export CONVERSION_BUFFER_ENABLED=true -export BUFFER_CONVERSION_SECONDS=2 # Make sure we don't have to wait for the default 60 seconds export KAFKA_MAX_MESSAGE_BATCH_SIZE=0 export APP_METRICS_FLUSH_FREQUENCY_MS=0 # Reduce the potential for spurious errors in tests that wait for metrics export APP_METRICS_GATHERED_FOR_ALL=true diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts index fef401d472927..1baec072be7bd 100644 --- a/plugin-server/src/cdp/cdp-consumers.ts +++ b/plugin-server/src/cdp/cdp-consumers.ts @@ -16,7 +16,7 @@ import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars import { createKafkaProducer } from '../kafka/producer' import { addSentryBreadcrumbsEventListeners } from '../main/ingestion-queues/kafka-metrics' import { runInstrumentedFunction } from '../main/utils' -import { AppMetric2Type, Hub, RawClickHouseEvent, TeamId, TimestampFormat } from '../types' +import { AppMetric2Type, Hub, PluginServerService, RawClickHouseEvent, TeamId, TimestampFormat } from '../types' import { KafkaProducerWrapper } from '../utils/db/kafka-producer-wrapper' import { captureTeamEvent } from '../utils/posthog' import { status } from '../utils/status' @@ -113,6 +113,15 @@ abstract class CdpConsumerBase { this.groupsManager = new GroupsManager(this.hub) } + public get service(): PluginServerService { + return { + id: this.consumerGroupId, + onShutdown: async () => await this.stop(), + healthcheck: () => this.isHealthy() ?? false, + batchConsumer: this.batchConsumer, + } + } + private async captureInternalPostHogEvent( hogFunctionId: HogFunctionType['id'], event: string, diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index 7de2856530e14..936cde4e65656 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -108,10 +108,6 @@ export function getDefaultConfig(): PluginsServerConfig { CLICKHOUSE_DISABLE_EXTERNAL_SCHEMAS_TEAMS: '', CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC: KAFKA_EVENTS_JSON, CLICKHOUSE_HEATMAPS_KAFKA_TOPIC: KAFKA_CLICKHOUSE_HEATMAP_EVENTS, - CONVERSION_BUFFER_ENABLED: false, - CONVERSION_BUFFER_ENABLED_TEAMS: '', - CONVERSION_BUFFER_TOPIC_ENABLED_TEAMS: '', - BUFFER_CONVERSION_SECONDS: isDevEnv() ? 2 : 60, // KEEP IN SYNC WITH posthog/settings/ingestion.py PERSON_INFO_CACHE_TTL: 5 * 60, // 5 min KAFKA_HEALTHCHECK_SECONDS: 20, OBJECT_STORAGE_ENABLED: true, diff --git a/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-consumer.ts b/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-consumer.ts index e452140931c7c..259d231e9d01a 100644 --- a/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-consumer.ts @@ -3,7 +3,7 @@ import { Counter } from 'prom-client' import { buildStringMatcher } from '../../config/config' import { KAFKA_EVENTS_PLUGIN_INGESTION, prefix as KAFKA_PREFIX } from '../../config/kafka-topics' -import { Hub } from '../../types' +import { Hub, PluginServerService } from '../../types' import { status } from '../../utils/status' import { eachBatchParallelIngestion, IngestionOverflowMode } from './batch-processing/each-batch-ingestion' import { IngestionConsumer } from './kafka-queue' @@ -18,7 +18,7 @@ export const startAnalyticsEventsIngestionConsumer = async ({ hub, // TODO: remove needing to pass in the whole hub and be more selective on dependency injection. }: { hub: Hub -}) => { +}): Promise => { /* Consumes analytics events from the Kafka topic `events_plugin_ingestion` and processes them for ingestion into ClickHouse. @@ -66,5 +66,10 @@ export const startAnalyticsEventsIngestionConsumer = async ({ const { isHealthy } = await queue.start() - return { queue, isHealthy } + return { + id: 'analytics-ingestion', + batchConsumer: queue.consumer, + healthcheck: isHealthy, + onShutdown: () => queue.stop(), + } } diff --git a/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-historical-consumer.ts b/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-historical-consumer.ts index 92139751e9760..2b989eaeaab29 100644 --- a/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-historical-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-historical-consumer.ts @@ -2,7 +2,7 @@ import { Message } from 'node-rdkafka' import { buildStringMatcher } from '../../config/config' import { KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL, prefix as KAFKA_PREFIX } from '../../config/kafka-topics' -import { Hub } from '../../types' +import { Hub, PluginServerService } from '../../types' import { status } from '../../utils/status' import { eachBatchParallelIngestion, IngestionOverflowMode } from './batch-processing/each-batch-ingestion' import { IngestionConsumer } from './kafka-queue' @@ -11,7 +11,7 @@ export const startAnalyticsEventsIngestionHistoricalConsumer = async ({ hub, // TODO: remove needing to pass in the whole hub and be more selective on dependency injection. }: { hub: Hub -}) => { +}): Promise => { /* Consumes analytics events from the Kafka topic `events_plugin_ingestion_historical` and processes them for ingestion into ClickHouse. @@ -39,5 +39,10 @@ export const startAnalyticsEventsIngestionHistoricalConsumer = async ({ const { isHealthy } = await queue.start() - return { queue, isHealthy } + return { + id: 'analytics-ingestion-historical', + onShutdown: async () => await queue.stop(), + healthcheck: isHealthy, + batchConsumer: queue.consumer, + } } diff --git a/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.ts b/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.ts index 25f107c90466e..5d6667c283b72 100644 --- a/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.ts @@ -50,5 +50,10 @@ export const startAnalyticsEventsIngestionOverflowConsumer = async ({ const { isHealthy } = await queue.start() - return { queue, isHealthy } + return { + id: 'analytics-ingestion-overflow', + onShutdown: async () => await queue.stop(), + healthcheck: isHealthy, + batchConsumer: queue.consumer, + } } diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts index 245c2edc77baf..2757853e02ebc 100644 --- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts +++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts @@ -170,7 +170,11 @@ export async function eachBatchParallelIngestion( for (const { message, pluginEvent } of currentBatch) { try { const result = (await retryIfRetriable(async () => { - const runner = new EventPipelineRunner(queue.pluginsServer, pluginEvent) + const runner = new EventPipelineRunner( + queue.pluginsServer, + pluginEvent, + queue.eventsProcessor + ) return await runner.runEventPipeline(pluginEvent) })) as IngestResult diff --git a/plugin-server/src/main/ingestion-queues/events-ingestion-consumer.ts b/plugin-server/src/main/ingestion-queues/events-ingestion-consumer.ts index 695185b8074f1..56ea4847db10e 100644 --- a/plugin-server/src/main/ingestion-queues/events-ingestion-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/events-ingestion-consumer.ts @@ -2,7 +2,7 @@ import { Message } from 'node-rdkafka' import { buildStringMatcher } from '../../config/config' import { prefix as KAFKA_PREFIX, suffix as KAFKA_SUFFIX } from '../../config/kafka-topics' -import { Hub } from '../../types' +import { Hub, PluginServerService } from '../../types' import { status } from '../../utils/status' import { eachBatchParallelIngestion, IngestionOverflowMode } from './batch-processing/each-batch-ingestion' import { IngestionConsumer } from './kafka-queue' @@ -27,16 +27,19 @@ export const PIPELINES: { [key: string]: PipelineType } = { }, } +export type PipelineKeyType = keyof typeof PIPELINES + export const startEventsIngestionPipelineConsumer = async ({ hub, // TODO: remove needing to pass in the whole hub and be more selective on dependency injection. - pipeline, + pipelineKey, }: { hub: Hub - pipeline: PipelineType -}) => { + pipelineKey: PipelineKeyType +}): Promise => { /* Consumes events from the topic and consumer passed in. */ + const pipeline = PIPELINES[pipelineKey] const kafka_topic = `${KAFKA_PREFIX}${pipeline.topic}${KAFKA_SUFFIX}` const kafka_consumer = `${KAFKA_PREFIX}${pipeline.consumer_group}` status.info( @@ -59,5 +62,10 @@ export const startEventsIngestionPipelineConsumer = async ({ const { isHealthy } = await queue.start() - return { queue, isHealthy } + return { + id: `events-ingestion-pipeline-${pipelineKey}`, + onShutdown: async () => await queue.stop(), + healthcheck: isHealthy, + batchConsumer: queue.consumer, + } } diff --git a/plugin-server/src/main/ingestion-queues/jobs-consumer.ts b/plugin-server/src/main/ingestion-queues/jobs-consumer.ts index 605a812068c51..1ed19c2fc6fce 100644 --- a/plugin-server/src/main/ingestion-queues/jobs-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/jobs-consumer.ts @@ -8,6 +8,7 @@ import { status } from '../../utils/status' import { GraphileWorker } from '../graphile-worker/graphile-worker' import { instrumentEachBatchKafkaJS, setupEventHandlers } from './kafka-queue' import { latestOffsetTimestampGauge } from './metrics' +import { makeHealthCheck } from './on-event-handler-consumer' const jobsConsumerSuccessCounter = new Counter({ name: 'jobs_consumer_enqueue_success_total', @@ -125,10 +126,11 @@ export const startJobsConsumer = async ({ }, }) + const healthcheck = makeHealthCheck(consumer, serverConfig.KAFKA_CONSUMPTION_SESSION_TIMEOUT_MS) + return { - ...consumer, - stop: async () => { - await consumer.stop() - }, + id: 'jobs-consumer', + healthcheck: async () => await healthcheck(), + onShutdown: async () => await consumer.stop(), } } diff --git a/plugin-server/src/main/ingestion-queues/kafka-queue.ts b/plugin-server/src/main/ingestion-queues/kafka-queue.ts index 5af4147b84e5b..1e7a58a0e076b 100644 --- a/plugin-server/src/main/ingestion-queues/kafka-queue.ts +++ b/plugin-server/src/main/ingestion-queues/kafka-queue.ts @@ -10,6 +10,7 @@ import { KafkaConfig } from '../../utils/db/hub' import { timeoutGuard } from '../../utils/db/utils' import { status } from '../../utils/status' import { killGracefully } from '../../utils/utils' +import { EventsProcessor } from '../../worker/ingestion/process-event' import { addMetricsEventListeners } from './kafka-metrics' type ConsumerManagementPayload = { @@ -166,20 +167,18 @@ type EachBatchFunction = (messages: Message[], queue: IngestionConsumer) => Prom export class IngestionConsumer { public pluginsServer: Hub - public consumerReady: boolean public topic: string public consumerGroupId: string public eachBatch: EachBatchFunction public consumer?: BatchConsumer + public eventsProcessor: EventsProcessor constructor(pluginsServer: Hub, topic: string, consumerGroupId: string, batchHandler: EachBatchFunction) { this.pluginsServer = pluginsServer this.topic = topic this.consumerGroupId = consumerGroupId - - this.consumerReady = false - this.eachBatch = batchHandler + this.eventsProcessor = new EventsProcessor(pluginsServer) } async start(): Promise { @@ -200,7 +199,6 @@ export class IngestionConsumer { topicMetadataRefreshInterval: this.pluginsServer.KAFKA_TOPIC_METADATA_REFRESH_INTERVAL_MS, eachBatch: (payload) => this.eachBatchConsumer(payload), }) - this.consumerReady = true return this.consumer } @@ -216,8 +214,6 @@ export class IngestionConsumer { } catch (error) { status.error('⚠ī¸', 'An error occurred while stopping Kafka queue:\n', error) } - - this.consumerReady = false } } diff --git a/plugin-server/src/main/ingestion-queues/on-event-handler-consumer.ts b/plugin-server/src/main/ingestion-queues/on-event-handler-consumer.ts index 4ebc588000c6e..6d24d6d722cf4 100644 --- a/plugin-server/src/main/ingestion-queues/on-event-handler-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/on-event-handler-consumer.ts @@ -1,7 +1,7 @@ import { Consumer, Kafka } from 'kafkajs' import { KAFKA_EVENTS_JSON, prefix as KAFKA_PREFIX } from '../../config/kafka-topics' -import { Hub, PluginsServerConfig } from '../../types' +import { Hub, PluginServerService, PluginsServerConfig } from '../../types' import { PostgresRouter } from '../../utils/db/postgres' import { status } from '../../utils/status' import { ActionManager } from '../../worker/ingestion/action-manager' @@ -20,7 +20,7 @@ export const startAsyncOnEventHandlerConsumer = async ({ hub, // TODO: remove needing to pass in the whole hub and be more selective on dependency injection. }: { hub: Hub -}) => { +}): Promise => { /* Consumes analytics events from the Kafka topic `clickhouse_events_json` and processes any onEvent plugin handlers configured for the team. @@ -35,9 +35,11 @@ export const startAsyncOnEventHandlerConsumer = async ({ await hub.actionManager.start() await queue.start() - const isHealthy = makeHealthCheck(queue.consumer, queue.sessionTimeout) - - return { queue, isHealthy: () => isHealthy() } + return { + id: 'on-event-ingestion', + healthcheck: makeHealthCheck(queue.consumer, queue.sessionTimeout), + onShutdown: async () => await queue.stop(), + } } export const startAsyncWebhooksHandlerConsumer = async ({ @@ -62,7 +64,7 @@ export const startAsyncWebhooksHandlerConsumer = async ({ groupTypeManager: GroupTypeManager actionMatcher: ActionMatcher actionManager: ActionManager -}) => { +}): Promise => { /* Consumes analytics events from the Kafka topic `clickhouse_events_json` and processes any onEvent plugin handlers configured for the team. @@ -106,23 +108,24 @@ export const startAsyncWebhooksHandlerConsumer = async ({ ), }) - const isHealthy = makeHealthCheck(consumer, serverConfig.KAFKA_CONSUMPTION_SESSION_TIMEOUT_MS) + const onShutdown = async () => { + await actionManager.stop() + try { + await consumer.stop() + } catch (e) { + status.error('🚨', 'Error stopping consumer', e) + } + try { + await consumer.disconnect() + } catch (e) { + status.error('🚨', 'Error disconnecting consumer', e) + } + } return { - stop: async () => { - await actionManager.stop() - try { - await consumer.stop() - } catch (e) { - status.error('🚨', 'Error stopping consumer', e) - } - try { - await consumer.disconnect() - } catch (e) { - status.error('🚨', 'Error disconnecting consumer', e) - } - }, - isHealthy, + id: 'webhooks-ingestion', + healthcheck: makeHealthCheck(consumer, serverConfig.KAFKA_CONSUMPTION_SESSION_TIMEOUT_MS), + onShutdown, } } diff --git a/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts b/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts index 83ea62fdfdd6f..8a225f02766ff 100644 --- a/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts @@ -2,12 +2,13 @@ import { Batch, EachBatchHandler, Kafka } from 'kafkajs' import { KafkaProducerWrapper } from 'utils/db/kafka-producer-wrapper' import { KAFKA_SCHEDULED_TASKS, KAFKA_SCHEDULED_TASKS_DLQ } from '../../config/kafka-topics' -import { PluginsServerConfig } from '../../types' +import { PluginServerService, PluginsServerConfig } from '../../types' import { DependencyUnavailableError } from '../../utils/db/error' import { status } from '../../utils/status' import Piscina from '../../worker/piscina' import { instrumentEachBatchKafkaJS, setupEventHandlers } from './kafka-queue' import { latestOffsetTimestampGauge, scheduledTaskCounter } from './metrics' +import { makeHealthCheck } from './on-event-handler-consumer' // The valid task types that can be scheduled. // TODO: not sure if there is another place that defines these but it would be good to unify. @@ -25,7 +26,7 @@ export const startScheduledTasksConsumer = async ({ piscina: Piscina serverConfig: PluginsServerConfig partitionConcurrency: number -}) => { +}): Promise => { /* Consumes from the scheduled tasks topic, and executes them within a @@ -138,11 +139,12 @@ export const startScheduledTasksConsumer = async ({ }, }) + const healthcheck = makeHealthCheck(consumer, serverConfig.KAFKA_CONSUMPTION_SESSION_TIMEOUT_MS) + return { - ...consumer, - stop: async () => { - await consumer.stop() - }, + id: 'scheduled-tasks-consumer', + healthcheck: async () => await healthcheck(), + onShutdown: async () => await consumer.stop(), } } diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts index feed88b570d5f..df72cd6c2dcf4 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts @@ -13,7 +13,7 @@ import { import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer' import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../../../kafka/config' import { createKafkaProducer } from '../../../kafka/producer' -import { PluginsServerConfig, RedisPool, TeamId, ValueMatcher } from '../../../types' +import { PluginServerService, PluginsServerConfig, RedisPool, TeamId, ValueMatcher } from '../../../types' import { BackgroundRefresher } from '../../../utils/background-refresher' import { KafkaProducerWrapper } from '../../../utils/db/kafka-producer-wrapper' import { PostgresRouter } from '../../../utils/db/postgres' @@ -235,6 +235,15 @@ export class SessionRecordingIngester { }, 10000) } + public get service(): PluginServerService { + return { + id: 'session-recordings-blob-overflow', + onShutdown: async () => await this.stop(), + healthcheck: () => this.isHealthy() ?? false, + batchConsumer: this.batchConsumer, + } + } + private get connectedBatchConsumer(): KafkaConsumer | undefined { // Helper to only use the batch consumer if we are actually connected to it - otherwise it will throw errors const consumer = this.batchConsumer?.consumer diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index 6f26f480451db..b2dacc0b9767c 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -1,7 +1,6 @@ import * as Sentry from '@sentry/node' import fs from 'fs' import { Server } from 'http' -import { BatchConsumer } from 'kafka/batch-consumer' import { CompressionCodecs, CompressionTypes, KafkaJSProtocolError } from 'kafkajs' // @ts-expect-error no type definitions import SnappyCodec from 'kafkajs-snappy' @@ -18,8 +17,8 @@ import { CdpProcessedEventsConsumer, } from '../cdp/cdp-consumers' import { defaultConfig, sessionRecordingConsumerConfig } from '../config/config' -import { Hub, PluginServerCapabilities, PluginsServerConfig } from '../types' -import { createHub, createKafkaClient, createKafkaProducerWrapper } from '../utils/db/hub' +import { Hub, PluginServerCapabilities, PluginServerService, PluginsServerConfig } from '../types' +import { closeHub, createHub, createKafkaClient, createKafkaProducerWrapper } from '../utils/db/hub' import { PostgresRouter } from '../utils/db/postgres' import { cancelAllScheduledJobs } from '../utils/node-schedule' import { posthog } from '../utils/posthog' @@ -41,13 +40,8 @@ import { startGraphileWorker } from './graphile-worker/worker-setup' import { startAnalyticsEventsIngestionConsumer } from './ingestion-queues/analytics-events-ingestion-consumer' import { startAnalyticsEventsIngestionHistoricalConsumer } from './ingestion-queues/analytics-events-ingestion-historical-consumer' import { startAnalyticsEventsIngestionOverflowConsumer } from './ingestion-queues/analytics-events-ingestion-overflow-consumer' -import { - PIPELINES, - PipelineType, - startEventsIngestionPipelineConsumer, -} from './ingestion-queues/events-ingestion-consumer' +import { PIPELINES, startEventsIngestionPipelineConsumer } from './ingestion-queues/events-ingestion-consumer' import { startJobsConsumer } from './ingestion-queues/jobs-consumer' -import { IngestionConsumer, KafkaJSIngestionConsumer } from './ingestion-queues/kafka-queue' import { startAsyncOnEventHandlerConsumer, startAsyncWebhooksHandlerConsumer, @@ -61,11 +55,8 @@ CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec const { version } = require('../../package.json') -// TODO: refactor this into a class, removing the need for many different Servers export type ServerInstance = { - hub: Hub - piscina: Piscina - queue: KafkaJSIngestionConsumer | IngestionConsumer | null + hub?: Hub stop: () => Promise } @@ -78,7 +69,7 @@ export async function startPluginsServer( config: Partial, makePiscina: (serverConfig: PluginsServerConfig, hub: Hub) => Promise = defaultMakePiscina, capabilities?: PluginServerCapabilities -): Promise> { +): Promise { const timer = new Date() const serverConfig: PluginsServerConfig = { @@ -90,27 +81,20 @@ export async function startPluginsServer( status.info('ℹī¸', `${serverConfig.WORKER_CONCURRENCY} workers, ${serverConfig.TASKS_PER_WORKER} tasks per worker`) runStartupProfiles(serverConfig) - // Structure containing initialized clients for Postgres, Kafka, Redis, etc. - let hub: Hub | undefined - // Used to trigger reloads of plugin code/config let pubSub: PubSub | undefined // A Node Worker Thread pool let piscina: Piscina | undefined - const shutdownCallbacks: (() => Promise)[] = [] + const services: PluginServerService[] = [] // Kafka consumer. Handles events that we couldn't find an existing person // to associate. The buffer handles delaying the ingestion of these events // (default 60 seconds) to allow for the person to be created in the // meantime. let httpServer: Server | undefined // server - let graphileWorker: GraphileWorker | undefined - - let closeHub: (() => Promise) | undefined - let lastActivityCheck: NodeJS.Timeout | undefined let stopEventLoopMetrics: (() => void) | undefined @@ -134,7 +118,7 @@ export async function startPluginsServer( await Promise.allSettled([ pubSub?.stop(), graphileWorker?.stop(), - ...shutdownCallbacks.map((cb) => cb()), + ...services.map((service) => service.onShutdown()), posthog.shutdownAsync(), ]) @@ -142,20 +126,9 @@ export async function startPluginsServer( await stopPiscina(piscina) } - await closeHub?.() - } - - // If join rejects or throws, then the consumer is unhealthy and we should shut down the process. - // Ideally we would also join all the other background tasks as well to ensure we stop the - // server if we hit any errors and don't end up with zombie instances, but I'll leave that - // refactoring for another time. Note that we have the liveness health checks already, so in K8s - // cases zombies should be reaped anyway, albeit not in the most efficient way. - function shutdownOnConsumerExit(consumer: BatchConsumer) { - consumer.join().catch(async (error) => { - status.error('đŸ’Ĩ', 'Unexpected task joined!', { error: error.stack ?? error }) - await closeJobs() - process.exit(1) - }) + if (serverInstance.hub) { + await closeHub(serverInstance.hub) + } } for (const signal of ['SIGINT', 'SIGTERM', 'SIGHUP']) { @@ -218,13 +191,17 @@ export async function startPluginsServer( }) capabilities = capabilities ?? getPluginServerCapabilities(serverConfig) - let serverInstance: (Partial & Pick) | undefined + const serverInstance: ServerInstance = { + stop: closeJobs, + } - // A collection of healthchecks that should be used to validate the - // health of the plugin-server. These are used by the /_health endpoint - // to determine if we should trigger a restart of the pod. These should - // be super lightweight and ideally not do any IO. - const healthChecks: { [service: string]: () => Promise | boolean } = {} + const setupHub = async (): Promise => { + if (!serverInstance.hub) { + serverInstance.hub = await createHub(serverConfig, capabilities) + } + + return serverInstance.hub + } // Creating a dedicated single-connection redis client to this Redis, as it's not relevant for hobby // and cloud deploys don't have concurrent uses. We should abstract multi-Redis into a router util. @@ -245,8 +222,7 @@ export async function startPluginsServer( // 4. conversion_events_buffer // if (capabilities.processPluginJobs || capabilities.pluginScheduledTasks) { - ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities) - serverInstance = serverInstance ? serverInstance : { hub } + const hub = await setupHub() graphileWorker = new GraphileWorker(hub) // `connectProducer` just runs the PostgreSQL migrations. Ideally it @@ -263,113 +239,93 @@ export async function startPluginsServer( status.info('👷', 'Graphile worker is ready!') if (capabilities.pluginScheduledTasks) { - const schedulerTasksConsumer = await startScheduledTasksConsumer({ - piscina: piscina, - producer: hub.kafkaProducer, - kafka: hub.kafka, - serverConfig, - partitionConcurrency: serverConfig.KAFKA_PARTITIONS_CONSUMED_CONCURRENTLY, - }) - shutdownCallbacks.push(async () => await schedulerTasksConsumer.disconnect()) + services.push( + await startScheduledTasksConsumer({ + piscina: piscina, + producer: hub.kafkaProducer, + kafka: hub.kafka, + serverConfig, + partitionConcurrency: serverConfig.KAFKA_PARTITIONS_CONSUMED_CONCURRENTLY, + }) + ) } if (capabilities.processPluginJobs) { - const jobsConsumer = await startJobsConsumer({ - kafka: hub.kafka, - producer: hub.kafkaProducer, - graphileWorker: graphileWorker, - serverConfig, - }) - shutdownCallbacks.push(async () => await jobsConsumer.disconnect()) + services.push( + await startJobsConsumer({ + kafka: hub.kafka, + producer: hub.kafkaProducer, + graphileWorker: graphileWorker, + serverConfig, + }) + ) } } if (capabilities.ingestion) { - ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities) - serverInstance = serverInstance ? serverInstance : { hub } - + const hub = await setupHub() piscina = piscina ?? (await makePiscina(serverConfig, hub)) - const { queue, isHealthy } = await startAnalyticsEventsIngestionConsumer({ - hub: hub, - }) - - serverInstance.queue = queue // only used by tests - shutdownOnConsumerExit(queue.consumer!) - healthChecks['analytics-ingestion'] = isHealthy - shutdownCallbacks.push(async () => await queue.stop()) + services.push( + await startAnalyticsEventsIngestionConsumer({ + hub: hub, + }) + ) } if (capabilities.ingestionHistorical) { - ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities) - serverInstance = serverInstance ? serverInstance : { hub } - + const hub = await setupHub() piscina = piscina ?? (await makePiscina(serverConfig, hub)) - const { queue, isHealthy } = await startAnalyticsEventsIngestionHistoricalConsumer({ - hub: hub, - }) - - shutdownOnConsumerExit(queue.consumer!) - healthChecks['analytics-ingestion-historical'] = isHealthy - shutdownCallbacks.push(async () => await queue.stop()) + services.push( + await startAnalyticsEventsIngestionHistoricalConsumer({ + hub: hub, + }) + ) } if (capabilities.eventsIngestionPipelines) { - async function start(pipelineKey: string, pipeline: PipelineType) { - ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities) - serverInstance = serverInstance ? serverInstance : { hub } - piscina = piscina ?? (await makePiscina(serverConfig, hub)) - const { queue, isHealthy: isHealthy } = await startEventsIngestionPipelineConsumer({ - hub: hub, - pipeline: pipeline, - }) + const pipelinesToRun = + serverConfig.PLUGIN_SERVER_EVENTS_INGESTION_PIPELINE === null + ? Object.keys(PIPELINES) + : [serverConfig.PLUGIN_SERVER_EVENTS_INGESTION_PIPELINE] - shutdownCallbacks.push(async () => await queue.stop()) - shutdownOnConsumerExit(queue!.consumer!) - healthChecks[`events-ingestion-pipeline-${pipelineKey}`] = isHealthy - } - if (serverConfig.PLUGIN_SERVER_EVENTS_INGESTION_PIPELINE === null) { - for (const pipelineKey in PIPELINES) { - await start(pipelineKey, PIPELINES[pipelineKey]) - } - } else { - // Validate we have a valid pipeline - const pipelineKey = serverConfig.PLUGIN_SERVER_EVENTS_INGESTION_PIPELINE + for (const pipelineKey of pipelinesToRun) { if (pipelineKey === null || !PIPELINES[pipelineKey]) { throw new Error(`Invalid events ingestion pipeline: ${pipelineKey}`) } - const pipeline: PipelineType = PIPELINES[pipelineKey] - await start(pipelineKey, pipeline) + + const hub = await setupHub() + piscina = piscina ?? (await makePiscina(serverConfig, hub)) + services.push( + await startEventsIngestionPipelineConsumer({ + hub: hub, + pipelineKey: pipelineKey, + }) + ) } } if (capabilities.ingestionOverflow) { - ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities) - serverInstance = serverInstance ? serverInstance : { hub } - + const hub = await setupHub() piscina = piscina ?? (await makePiscina(serverConfig, hub)) - const { queue, isHealthy } = await startAnalyticsEventsIngestionOverflowConsumer({ - hub: hub, - }) - - shutdownCallbacks.push(async () => await queue.stop()) - shutdownOnConsumerExit(queue.consumer!) - healthChecks['analytics-ingestion-overflow'] = isHealthy + services.push( + await startAnalyticsEventsIngestionOverflowConsumer({ + hub: hub, + }) + ) } if (capabilities.processAsyncOnEventHandlers) { - ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities) - serverInstance = serverInstance ? serverInstance : { hub } - + const hub = await setupHub() piscina = piscina ?? (await makePiscina(serverConfig, hub)) - const { queue, isHealthy } = await startAsyncOnEventHandlerConsumer({ - hub: hub, - }) - - shutdownCallbacks.push(async () => await queue.stop()) - healthChecks['on-event-ingestion'] = isHealthy + services.push( + await startAsyncOnEventHandlerConsumer({ + hub: hub, + }) + ) } if (capabilities.processAsyncWebhooksHandlers) { + const hub = serverInstance.hub // If we have a hub, then reuse some of it's attributes, otherwise // we need to create them. We only initialize the ones we need. const postgres = hub?.postgres ?? new PostgresRouter(serverConfig) @@ -390,31 +346,30 @@ export async function startPluginsServer( const actionMatcher = hub?.actionMatcher ?? new ActionMatcher(postgres, actionManager, teamManager) const groupTypeManager = new GroupTypeManager(postgres, teamManager, serverConfig.SITE_URL) - const { stop, isHealthy } = await startAsyncWebhooksHandlerConsumer({ - postgres, - kafka, - teamManager, - organizationManager, - serverConfig, - rustyHook, - appMetrics, - actionMatcher, - actionManager, - groupTypeManager, - }) - - shutdownCallbacks.push(async () => await stop()) - healthChecks['webhooks-ingestion'] = isHealthy + services.push( + await startAsyncWebhooksHandlerConsumer({ + postgres, + kafka, + teamManager, + organizationManager, + serverConfig, + rustyHook, + appMetrics, + actionMatcher, + actionManager, + groupTypeManager, + }) + ) } if (capabilities.syncInlinePlugins) { - ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities) - serverInstance = serverInstance ? serverInstance : { hub } + const hub = await setupHub() await syncInlinePlugins(hub) } - if (hub && serverInstance) { + if (serverInstance.hub) { + const hub = serverInstance.hub pubSub = new PubSub(hub, { [hub.PLUGINS_RELOAD_PUBSUB_CHANNEL]: async () => { status.info('⚡', 'Reloading plugins!') @@ -445,17 +400,14 @@ export async function startPluginsServer( startPreflightSchedules(hub) } - serverInstance.piscina = piscina serverInstance.stop = closeJobs pluginServerStartupTimeMs.inc(Date.now() - timer.valueOf()) status.info('🚀', 'All systems go') - - hub.lastActivity = new Date().valueOf() - hub.lastActivityType = 'serverStart' } if (capabilities.sessionRecordingBlobIngestion) { + const hub = serverInstance.hub const recordingConsumerConfig = sessionRecordingConsumerConfig(serverConfig) const postgres = hub?.postgres ?? new PostgresRouter(serverConfig) const s3 = hub?.objectStorage ?? getObjectStorage(recordingConsumerConfig) @@ -467,16 +419,16 @@ export async function startPluginsServer( const ingester = new SessionRecordingIngester(serverConfig, postgres, s3, false, captureRedis) await ingester.start() - const batchConsumer = ingester.batchConsumer - - if (batchConsumer) { - shutdownCallbacks.push(async () => await ingester.stop()) - shutdownOnConsumerExit(batchConsumer) - healthChecks['session-recordings-blob'] = () => ingester.isHealthy() ?? false - } + services.push({ + id: 'session-recordings-blob', + onShutdown: async () => await ingester.stop(), + healthcheck: () => ingester.isHealthy() ?? false, + batchConsumer: ingester.batchConsumer, + }) } if (capabilities.sessionRecordingBlobOverflowIngestion) { + const hub = serverInstance.hub const recordingConsumerConfig = sessionRecordingConsumerConfig(serverConfig) const postgres = hub?.postgres ?? new PostgresRouter(serverConfig) const s3 = hub?.objectStorage ?? getObjectStorage(recordingConsumerConfig) @@ -488,35 +440,21 @@ export async function startPluginsServer( // NOTE: We don't pass captureRedis to disable overflow computation on the overflow topic const ingester = new SessionRecordingIngester(serverConfig, postgres, s3, true, undefined) await ingester.start() - - const batchConsumer = ingester.batchConsumer - - if (batchConsumer) { - shutdownCallbacks.push(async () => await ingester.stop()) - shutdownOnConsumerExit(batchConsumer) - healthChecks['session-recordings-blob-overflow'] = () => ingester.isHealthy() ?? false - } + services.push(ingester.service) } if (capabilities.cdpProcessedEvents) { - ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities) + const hub = await setupHub() const consumer = new CdpProcessedEventsConsumer(hub) await consumer.start() - - shutdownOnConsumerExit(consumer.batchConsumer!) - shutdownCallbacks.push(async () => await consumer.stop()) - healthChecks['cdp-processed-events'] = () => consumer.isHealthy() ?? false + services.push(consumer.service) } if (capabilities.cdpFunctionCallbacks) { - ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities) + const hub = await setupHub() const consumer = new CdpFunctionCallbackConsumer(hub) await consumer.start() - - shutdownOnConsumerExit(consumer.batchConsumer!) - - shutdownCallbacks.push(async () => await consumer.stop()) - healthChecks['cdp-function-callbacks'] = () => consumer.isHealthy() ?? false + services.push(consumer.service) // NOTE: The function callback service is more idle so can handle http requests as well if (capabilities.http) { @@ -526,20 +464,18 @@ export async function startPluginsServer( } if (capabilities.cdpFunctionOverflow) { - ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities) + const hub = await setupHub() const consumer = new CdpOverflowConsumer(hub) await consumer.start() - - shutdownOnConsumerExit(consumer.batchConsumer!) - shutdownCallbacks.push(async () => await consumer.stop()) - healthChecks['cdp-overflow'] = () => consumer.isHealthy() ?? false + services.push(consumer.service) } if (capabilities.cdpCyclotronWorker) { - ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities) + const hub = await setupHub() if (hub.CYCLOTRON_DATABASE_URL) { const worker = new CdpCyclotronWorker(hub) await worker.start() + services.push(worker.service) } else { // This is a temporary solution until we *require* Cyclotron to be configured. status.warn('đŸ’Ĩ', 'CYCLOTRON_DATABASE_URL is not set, not running Cyclotron worker') @@ -547,14 +483,28 @@ export async function startPluginsServer( } if (capabilities.http) { - const app = setupCommonRoutes(healthChecks, serverInstance?.queue ?? undefined) + const app = setupCommonRoutes(services) httpServer = app.listen(serverConfig.HTTP_SERVER_PORT, () => { status.info('đŸŠē', `Status server listening on port ${serverConfig.HTTP_SERVER_PORT}`) }) } - return serverInstance ?? { stop: closeJobs } + // If join rejects or throws, then the consumer is unhealthy and we should shut down the process. + // Ideally we would also join all the other background tasks as well to ensure we stop the + // server if we hit any errors and don't end up with zombie instances, but I'll leave that + // refactoring for another time. Note that we have the liveness health checks already, so in K8s + // cases zombies should be reaped anyway, albeit not in the most efficient way. + + services.forEach((service) => { + service.batchConsumer?.join().catch(async (error) => { + status.error('đŸ’Ĩ', 'Unexpected task joined!', { error: error.stack ?? error }) + await closeJobs() + process.exit(1) + }) + }) + + return serverInstance } catch (error) { Sentry.captureException(error) status.error('đŸ’Ĩ', 'Launchpad failure!', { error: error.stack ?? error }) diff --git a/plugin-server/src/main/services/http-server.ts b/plugin-server/src/main/services/http-server.ts index 8889f96f22032..d4bb5ee5151a8 100644 --- a/plugin-server/src/main/services/http-server.ts +++ b/plugin-server/src/main/services/http-server.ts @@ -1,7 +1,7 @@ import express, { Request, Response } from 'express' import { DateTime } from 'luxon' -import { IngestionConsumer, KafkaJSIngestionConsumer } from 'main/ingestion-queues/kafka-queue' import * as prometheus from 'prom-client' +import { PluginServerService } from 'types' import { status } from '../../utils/status' import { delay } from '../../utils/utils' @@ -14,12 +14,9 @@ export const expressApp: express.Application = express() expressApp.use(express.json()) -export function setupCommonRoutes( - healthChecks: { [service: string]: () => Promise | boolean }, - analyticsEventsIngestionConsumer?: KafkaJSIngestionConsumer | IngestionConsumer -): express.Application { - expressApp.get('/_health', buildGetHealth(healthChecks)) - expressApp.get('/_ready', buildGetReady(analyticsEventsIngestionConsumer)) +export function setupCommonRoutes(services: Pick[]): express.Application { + expressApp.get('/_health', buildGetHealth(services)) + expressApp.get('/_ready', buildGetHealth(services)) expressApp.get('/_metrics', getMetrics) expressApp.get('/metrics', getMetrics) expressApp.get('/_profile/:type', getProfileByType) @@ -28,7 +25,7 @@ export function setupCommonRoutes( } const buildGetHealth = - (healthChecks: { [service: string]: () => Promise | boolean }) => async (req: Request, res: Response) => { + (services: Pick[]) => async (req: Request, res: Response) => { // Check that all health checks pass. Note that a failure of these // _may_ result in the process being terminated by e.g. Kubernetes // so the stakes are high. @@ -58,11 +55,11 @@ const buildGetHealth = // assume that all promises have resolved. If there was a // rejected promise, the http server should catch it and return // a 500 status code. - Object.entries(healthChecks).map(async ([service, check]) => { + services.map(async (service) => { try { - return { service, status: (await check()) ? 'ok' : 'error' } + return { service: service.id, status: (await service.healthcheck()) ? 'ok' : 'error' } } catch (error) { - return { service, status: 'error', error: error.message } + return { service: service.id, status: 'error', error: error.message } } }) ) @@ -80,27 +77,6 @@ const buildGetHealth = return res.status(statusCode).json({ status: statusCode === 200 ? 'ok' : 'error', checks: checkResultsMapping }) } -const buildGetReady = - (analyticsEventsIngestionConsumer?: KafkaJSIngestionConsumer | IngestionConsumer) => - (req: Request, res: Response) => { - // Check that, if the server should have a kafka queue, - // the Kafka consumer is ready to consume messages - if (!analyticsEventsIngestionConsumer || analyticsEventsIngestionConsumer.consumerReady) { - status.info('💚', 'Server readiness check succeeded') - const responseBody = { - status: 'ok', - } - res.statusCode = 200 - return res.status(200).json(responseBody) - } - - status.info('💔', 'Server readiness check failed') - const responseBody = { - status: 'error', - } - return res.status(503).json(responseBody) - } - const getMetrics = async (req: Request, res: Response) => { try { const metrics = await prometheus.register.metrics() diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 2ca7a3e593f3b..4cf4f66deeb6b 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -13,9 +13,9 @@ import { } from '@posthog/plugin-scaffold' import { Pool as GenericPool } from 'generic-pool' import { Redis } from 'ioredis' +import { BatchConsumer } from 'kafka/batch-consumer' import { Kafka } from 'kafkajs' import { DateTime } from 'luxon' -import { Job } from 'node-schedule' import { VM } from 'vm2' import { ObjectStorage } from './main/services/object_storage' @@ -28,7 +28,6 @@ import { ActionMatcher } from './worker/ingestion/action-matcher' import { AppMetrics } from './worker/ingestion/app-metrics' import { GroupTypeManager } from './worker/ingestion/group-type-manager' import { OrganizationManager } from './worker/ingestion/organization-manager' -import { EventsProcessor } from './worker/ingestion/process-event' import { TeamManager } from './worker/ingestion/team-manager' import { RustyHook } from './worker/rusty-hook' import { PluginsApiKeyManager } from './worker/vm/extensions/helpers/api-key-manager' @@ -96,6 +95,13 @@ export const stringToPluginServerMode = Object.fromEntries( ]) ) as Record +export type PluginServerService = { + id: string + onShutdown: () => Promise + healthcheck: () => boolean | Promise + batchConsumer?: BatchConsumer +} + export type CdpConfig = { CDP_WATCHER_COST_ERROR: number // The max cost of an erroring function CDP_WATCHER_COST_TIMING: number // The max cost of a slow function @@ -202,10 +208,6 @@ export interface PluginsServerConfig extends CdpConfig { HEALTHCHECK_MAX_STALE_SECONDS: number // maximum number of seconds the plugin server can go without ingesting events before the healthcheck fails SITE_URL: string | null KAFKA_PARTITIONS_CONSUMED_CONCURRENTLY: number // (advanced) how many kafka partitions the plugin server should consume from concurrently - CONVERSION_BUFFER_ENABLED: boolean - CONVERSION_BUFFER_ENABLED_TEAMS: string - CONVERSION_BUFFER_TOPIC_ENABLED_TEAMS: string - BUFFER_CONVERSION_SECONDS: number PERSON_INFO_CACHE_TTL: number KAFKA_HEALTHCHECK_SECONDS: number OBJECT_STORAGE_ENABLED: boolean // Disables or enables the use of object storage. It will become mandatory to use object storage @@ -296,9 +298,7 @@ export interface Hub extends PluginsServerConfig { clickhouse: ClickHouse kafka: Kafka kafkaProducer: KafkaProducerWrapper - objectStorage: ObjectStorage - // metrics - pluginMetricsJob: Job | undefined + objectStorage?: ObjectStorage // currently enabled plugin status plugins: Map pluginConfigs: Map @@ -312,7 +312,6 @@ export interface Hub extends PluginsServerConfig { organizationManager: OrganizationManager pluginsApiKeyManager: PluginsApiKeyManager rootAccessManager: RootAccessManager - eventsProcessor: EventsProcessor actionManager: ActionManager actionMatcher: ActionMatcher appMetrics: AppMetrics @@ -320,11 +319,6 @@ export interface Hub extends PluginsServerConfig { groupTypeManager: GroupTypeManager // geoip database, setup in workers mmdb?: ReaderModel - // diagnostics - lastActivity: number - lastActivityType: string - statelessVms: StatelessInstanceMap - conversionBufferEnabledTeams: Set // functions enqueuePluginJob: (job: EnqueuedPluginJob) => Promise // ValueMatchers used for various opt-in/out features diff --git a/plugin-server/src/utils/db/hub.ts b/plugin-server/src/utils/db/hub.ts index 096e1720dbbb3..d8f4815c0a822 100644 --- a/plugin-server/src/utils/db/hub.ts +++ b/plugin-server/src/utils/db/hub.ts @@ -27,7 +27,6 @@ import { ActionMatcher } from '../../worker/ingestion/action-matcher' import { AppMetrics } from '../../worker/ingestion/app-metrics' import { GroupTypeManager } from '../../worker/ingestion/group-type-manager' import { OrganizationManager } from '../../worker/ingestion/organization-manager' -import { EventsProcessor } from '../../worker/ingestion/process-event' import { TeamManager } from '../../worker/ingestion/team-manager' import { RustyHook } from '../../worker/rusty-hook' import { isTestEnv } from '../env-utils' @@ -75,7 +74,7 @@ export function createEventsToDropByToken(eventsToDropByTokenStr?: string): Map< export async function createHub( config: Partial = {}, capabilities: PluginServerCapabilities | null = null -): Promise<[Hub, () => Promise]> { +): Promise { status.info('ℹī¸', `Connecting to all services:`) const serverConfig: PluginsServerConfig = { @@ -88,10 +87,6 @@ export async function createHub( status.updatePrompt(serverConfig.PLUGIN_SERVER_MODE) const instanceId = new UUIDT() - const conversionBufferEnabledTeams = new Set( - serverConfig.CONVERSION_BUFFER_ENABLED_TEAMS.split(',').filter(String).map(Number) - ) - status.info('🤔', `Connecting to ClickHouse...`) const clickhouse = new ClickHouse({ // We prefer to run queries on the offline cluster. @@ -174,7 +169,7 @@ export async function createHub( }) } - const hub: Partial = { + const hub: Hub = { ...serverConfig, instanceId, capabilities, @@ -203,34 +198,31 @@ export async function createHub( rustyHook, actionMatcher, actionManager, - conversionBufferEnabledTeams, pluginConfigsToSkipElementsParsing: buildIntegerMatcher(process.env.SKIP_ELEMENTS_PARSING_PLUGINS, true), eventsToDropByToken: createEventsToDropByToken(process.env.DROP_EVENTS_BY_TOKEN_DISTINCT_ID), + appMetrics: new AppMetrics( + kafkaProducer, + serverConfig.APP_METRICS_FLUSH_FREQUENCY_MS, + serverConfig.APP_METRICS_FLUSH_MAX_QUEUE_SIZE + ), } - // :TODO: This is only used on worker threads, not main - hub.eventsProcessor = new EventsProcessor(hub as Hub) - - hub.appMetrics = new AppMetrics( - kafkaProducer, - serverConfig.APP_METRICS_FLUSH_FREQUENCY_MS, - serverConfig.APP_METRICS_FLUSH_MAX_QUEUE_SIZE - ) + return hub as Hub +} - const closeHub = async () => { - if (!isTestEnv()) { - await hub.appMetrics?.flush() - } - await Promise.allSettled([kafkaProducer.disconnect(), redisPool.drain(), hub.postgres?.end()]) - await redisPool.clear() +export const closeHub = async (hub: Hub): Promise => { + if (!isTestEnv()) { + await hub.appMetrics?.flush() + } + await Promise.allSettled([hub.kafkaProducer.disconnect(), hub.redisPool.drain(), hub.postgres?.end()]) + await hub.redisPool.clear() + if (isTestEnv()) { // Break circular references to allow the hub to be GCed when running unit tests // TODO: change these structs to not directly reference the hub - hub.eventsProcessor = undefined - hub.appMetrics = undefined + ;(hub as any).eventsProcessor = undefined + ;(hub as any).appMetrics = undefined } - - return [hub as Hub, closeHub] } export type KafkaConfig = { diff --git a/plugin-server/src/utils/utils.ts b/plugin-server/src/utils/utils.ts index a49a5161b4b3a..6bb263ec307c7 100644 --- a/plugin-server/src/utils/utils.ts +++ b/plugin-server/src/utils/utils.ts @@ -17,7 +17,6 @@ import { RedisPool, TimestampFormat, } from '../types' -import { Hub } from './../types' import { status } from './status' /** Time until autoexit (due to error) gives up on graceful exit and kills the process right away. */ @@ -577,31 +576,6 @@ export class RaceConditionError extends Error { name = 'RaceConditionError' } -export interface StalenessCheckResult { - isServerStale: boolean - timeSinceLastActivity: number | null - lastActivity?: string | null - lastActivityType?: string - instanceId?: string -} - -export function stalenessCheck(hub: Hub | undefined, stalenessSeconds: number): StalenessCheckResult { - let isServerStale = false - - const timeSinceLastActivity = hub?.lastActivity ? new Date().valueOf() - hub.lastActivity : null - if (timeSinceLastActivity && timeSinceLastActivity > stalenessSeconds * 1000) { - isServerStale = true - } - - return { - isServerStale, - timeSinceLastActivity, - instanceId: hub?.instanceId.toString(), - lastActivity: hub?.lastActivity ? new Date(hub.lastActivity).toISOString() : null, - lastActivityType: hub?.lastActivityType, - } -} - /** Get a value from a properties object by its path. This allows accessing nested properties. */ export function getPropertyValueByPath(properties: Properties, [firstKey, ...nestedKeys]: string[]): any { if (firstKey === undefined) { diff --git a/plugin-server/src/worker/ingestion/event-pipeline/createEventStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/createEventStep.ts index ce1a8438c152c..ea717cfa565dd 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/createEventStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/createEventStep.ts @@ -7,5 +7,5 @@ export function createEventStep( person: Person, processPerson: boolean ): [RawClickHouseEvent, Promise] { - return runner.hub.eventsProcessor.createEvent(event, person, processPerson) + return runner.eventsProcessor.createEvent(event, person, processPerson) } diff --git a/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts index b097643e0ca5f..6725bbf345661 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts @@ -19,7 +19,7 @@ export async function prepareEventStep( tsParsingIngestionWarnings.push(captureIngestionWarning(runner.hub.db.kafkaProducer, team_id, type, details)) } - const preIngestionEvent = await runner.hub.eventsProcessor.processEvent( + const preIngestionEvent = await runner.eventsProcessor.processEvent( String(event.distinct_id), event, team_id, diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts index 0518902516410..5d61a4a11a4fc 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts @@ -8,6 +8,7 @@ import { DependencyUnavailableError } from '../../../utils/db/error' import { timeoutGuard } from '../../../utils/db/utils' import { normalizeProcessPerson } from '../../../utils/event' import { status } from '../../../utils/status' +import { EventsProcessor } from '../process-event' import { captureIngestionWarning, generateEventDeadLetterQueueMessage } from '../utils' import { createEventStep } from './createEventStep' import { enrichExceptionEventStep } from './enrichExceptionEventStep' @@ -50,10 +51,12 @@ class StepErrorNoRetry extends Error { export class EventPipelineRunner { hub: Hub originalEvent: PipelineEvent + eventsProcessor: EventsProcessor - constructor(hub: Hub, event: PipelineEvent) { + constructor(hub: Hub, event: PipelineEvent, eventProcessor: EventsProcessor) { this.hub = hub this.originalEvent = event + this.eventsProcessor = eventProcessor } isEventDisallowed(event: PipelineEvent): boolean { diff --git a/plugin-server/tests/cdp/cdp-function-callbacks-consumer.test.ts b/plugin-server/tests/cdp/cdp-function-callbacks-consumer.test.ts index ec1d9338f7751..278132739e951 100644 --- a/plugin-server/tests/cdp/cdp-function-callbacks-consumer.test.ts +++ b/plugin-server/tests/cdp/cdp-function-callbacks-consumer.test.ts @@ -5,7 +5,7 @@ import { CdpApi } from '../../src/cdp/cdp-api' import { CdpFunctionCallbackConsumer } from '../../src/cdp/cdp-consumers' import { HogFunctionType } from '../../src/cdp/types' import { Hub, Team } from '../../src/types' -import { createHub } from '../../src/utils/db/hub' +import { closeHub, createHub } from '../../src/utils/db/hub' import { getFirstTeam, resetTestDatabase } from '../helpers/sql' import { HOG_EXAMPLES, HOG_FILTERS_EXAMPLES, HOG_INPUTS_EXAMPLES } from './examples' import { insertHogFunction as _insertHogFunction } from './fixtures' @@ -67,7 +67,6 @@ jest.setTimeout(1000) describe('CDP Processed Events Consuner', () => { let processor: CdpFunctionCallbackConsumer let hub: Hub - let closeHub: () => Promise let team: Team const insertHogFunction = async (hogFunction: Partial) => { @@ -79,7 +78,7 @@ describe('CDP Processed Events Consuner', () => { beforeEach(async () => { await resetTestDatabase() - ;[hub, closeHub] = await createHub() + hub = await createHub() team = await getFirstTeam(hub) processor = new CdpFunctionCallbackConsumer(hub) @@ -92,7 +91,7 @@ describe('CDP Processed Events Consuner', () => { afterEach(async () => { jest.setTimeout(10000) await processor.stop() - await closeHub() + await closeHub(hub) }) afterAll(() => { diff --git a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts b/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts index 8a4c87efa4450..a5534036f848b 100644 --- a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts +++ b/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts @@ -1,7 +1,7 @@ import { CdpProcessedEventsConsumer } from '../../src/cdp/cdp-consumers' import { HogFunctionType } from '../../src/cdp/types' import { Hub, Team } from '../../src/types' -import { createHub } from '../../src/utils/db/hub' +import { closeHub, createHub } from '../../src/utils/db/hub' import { getFirstTeam, resetTestDatabase } from '../helpers/sql' import { HOG_EXAMPLES, HOG_FILTERS_EXAMPLES, HOG_INPUTS_EXAMPLES } from './examples' import { createIncomingEvent, createMessage, insertHogFunction as _insertHogFunction } from './fixtures' @@ -74,7 +74,6 @@ const decodeKafkaMessage = (message: any): any => { describe('CDP Processed Events Consuner', () => { let processor: CdpProcessedEventsConsumer let hub: Hub - let closeHub: () => Promise let team: Team const insertHogFunction = async (hogFunction: Partial) => { @@ -86,7 +85,7 @@ describe('CDP Processed Events Consuner', () => { beforeEach(async () => { await resetTestDatabase() - ;[hub, closeHub] = await createHub() + hub = await createHub() team = await getFirstTeam(hub) processor = new CdpProcessedEventsConsumer(hub) @@ -98,7 +97,7 @@ describe('CDP Processed Events Consuner', () => { afterEach(async () => { jest.setTimeout(10000) await processor.stop() - await closeHub() + await closeHub(hub) }) afterAll(() => { diff --git a/plugin-server/tests/cdp/hog-function-manager.test.ts b/plugin-server/tests/cdp/hog-function-manager.test.ts index ee4b9ded89f4e..2ed100711a74d 100644 --- a/plugin-server/tests/cdp/hog-function-manager.test.ts +++ b/plugin-server/tests/cdp/hog-function-manager.test.ts @@ -1,14 +1,13 @@ import { HogFunctionManager } from '../../src/cdp/hog-function-manager' import { HogFunctionType, IntegrationType } from '../../src/cdp/types' import { Hub } from '../../src/types' -import { createHub } from '../../src/utils/db/hub' +import { closeHub, createHub } from '../../src/utils/db/hub' import { PostgresUse } from '../../src/utils/db/postgres' import { createTeam, resetTestDatabase } from '../helpers/sql' import { insertHogFunction, insertIntegration } from './fixtures' describe('HogFunctionManager', () => { let hub: Hub - let closeServer: () => Promise let manager: HogFunctionManager let hogFunctions: HogFunctionType[] @@ -18,7 +17,7 @@ describe('HogFunctionManager', () => { let teamId2: number beforeEach(async () => { - ;[hub, closeServer] = await createHub() + hub = await createHub() await resetTestDatabase() manager = new HogFunctionManager(hub.postgres, hub) @@ -82,7 +81,7 @@ describe('HogFunctionManager', () => { }) afterEach(async () => { - await closeServer() + await closeHub(hub) }) it('returns the hog functions', async () => { diff --git a/plugin-server/tests/cdp/hog-masker.test.ts b/plugin-server/tests/cdp/hog-masker.test.ts index 9a342572ea403..7e667e64b9141 100644 --- a/plugin-server/tests/cdp/hog-masker.test.ts +++ b/plugin-server/tests/cdp/hog-masker.test.ts @@ -7,7 +7,7 @@ import { BASE_REDIS_KEY, HogMasker } from '../../src/cdp/hog-masker' import { CdpRedis, createCdpRedisPool } from '../../src/cdp/redis' import { HogFunctionType } from '../../src/cdp/types' import { Hub } from '../../src/types' -import { createHub } from '../../src/utils/db/hub' +import { closeHub, createHub } from '../../src/utils/db/hub' import { delay } from '../../src/utils/utils' import { HOG_MASK_EXAMPLES } from './examples' import { createHogExecutionGlobals, createHogFunction } from './fixtures' @@ -19,13 +19,11 @@ describe('HogMasker', () => { describe('integration', () => { let now: number let hub: Hub - let closeHub: () => Promise let masker: HogMasker let redis: CdpRedis beforeEach(async () => { - ;[hub, closeHub] = await createHub() - + hub = await createHub() now = 1720000000000 mockNow.mockReturnValue(now) @@ -46,7 +44,7 @@ describe('HogMasker', () => { } afterEach(async () => { - await closeHub() + await closeHub(hub) jest.clearAllMocks() }) diff --git a/plugin-server/tests/cdp/hog-watcher.test.ts b/plugin-server/tests/cdp/hog-watcher.test.ts index ed5fdd1646717..dfcf4128c8187 100644 --- a/plugin-server/tests/cdp/hog-watcher.test.ts +++ b/plugin-server/tests/cdp/hog-watcher.test.ts @@ -7,7 +7,7 @@ import { BASE_REDIS_KEY, HogWatcher, HogWatcherState } from '../../src/cdp/hog-w import { CdpRedis, createCdpRedisPool } from '../../src/cdp/redis' import { HogFunctionInvocationResult } from '../../src/cdp/types' import { Hub } from '../../src/types' -import { createHub } from '../../src/utils/db/hub' +import { closeHub, createHub } from '../../src/utils/db/hub' import { delay } from '../../src/utils/utils' import { deleteKeysWithPrefix } from './helpers/redis' @@ -42,13 +42,12 @@ describe('HogWatcher', () => { describe('integration', () => { let now: number let hub: Hub - let closeHub: () => Promise let watcher: HogWatcher let mockStateChangeCallback: jest.Mock let redis: CdpRedis beforeEach(async () => { - ;[hub, closeHub] = await createHub() + hub = await createHub() now = 1720000000000 mockNow.mockReturnValue(now) @@ -72,7 +71,7 @@ describe('HogWatcher', () => { afterEach(async () => { jest.useRealTimers() - await closeHub() + await closeHub(hub) jest.clearAllMocks() }) diff --git a/plugin-server/tests/http-server.test.ts b/plugin-server/tests/http-server.test.ts index 3900168cd2039..de28222ea116c 100644 --- a/plugin-server/tests/http-server.test.ts +++ b/plugin-server/tests/http-server.test.ts @@ -75,7 +75,6 @@ describe('http server', () => { }) ) - expect(pluginsServer.queue?.consumerReady).toBeTruthy() await pluginsServer.stop() }) }) diff --git a/plugin-server/tests/main/capabilities.test.ts b/plugin-server/tests/main/capabilities.test.ts index 37dca3160e01f..1d249f5b3387b 100644 --- a/plugin-server/tests/main/capabilities.test.ts +++ b/plugin-server/tests/main/capabilities.test.ts @@ -2,7 +2,7 @@ import { GraphileWorker } from '../../src/main/graphile-worker/graphile-worker' import { startGraphileWorker } from '../../src/main/graphile-worker/worker-setup' import { Hub, LogLevel } from '../../src/types' import { PluginServerMode, stringToPluginServerMode } from '../../src/types' -import { createHub } from '../../src/utils/db/hub' +import { closeHub, createHub } from '../../src/utils/db/hub' import Piscina from '../../src/worker/piscina' jest.mock('../../src/main/ingestion-queues/kafka-queue') @@ -21,17 +21,16 @@ describe('stringToPluginServerMode', () => { describe('capabilities', () => { let hub: Hub let piscina: Piscina - let closeHub: () => Promise beforeEach(async () => { - ;[hub, closeHub] = await createHub({ + hub = await createHub({ LOG_LEVEL: LogLevel.Warn, }) piscina = { run: jest.fn(), on: jest.fn() } as any }) afterEach(async () => { - await closeHub() + await closeHub(hub) }) describe('startGraphileWorker()', () => { diff --git a/plugin-server/tests/main/db.test.ts b/plugin-server/tests/main/db.test.ts index 364f014654ede..ff6aef1c5d855 100644 --- a/plugin-server/tests/main/db.test.ts +++ b/plugin-server/tests/main/db.test.ts @@ -5,7 +5,7 @@ import { defaultConfig } from '../../src/config/config' import { Hub, Person, PropertyOperator, PropertyUpdateOperation, RawAction, Team } from '../../src/types' import { DB } from '../../src/utils/db/db' import { DependencyUnavailableError } from '../../src/utils/db/error' -import { createHub } from '../../src/utils/db/hub' +import { closeHub, createHub } from '../../src/utils/db/hub' import { PostgresRouter, PostgresUse } from '../../src/utils/db/postgres' import { generateKafkaPersonUpdateMessage } from '../../src/utils/db/utils' import { RaceConditionError, UUIDT } from '../../src/utils/utils' @@ -17,11 +17,10 @@ jest.mock('../../src/utils/status') describe('DB', () => { let hub: Hub - let closeServer: () => Promise let db: DB beforeEach(async () => { - ;[hub, closeServer] = await createHub() + hub = await createHub() await resetTestDatabase(undefined, {}, {}, { withExtendedTestData: false }) db = hub.db @@ -31,7 +30,7 @@ describe('DB', () => { }) afterEach(async () => { - await closeServer() + await closeHub(hub) jest.clearAllMocks() }) diff --git a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts index 0aac89c33b289..ef43415544733 100644 --- a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts @@ -8,7 +8,7 @@ import { IngestionOverflowMode, } from '../../../src/main/ingestion-queues/batch-processing/each-batch-ingestion' import { Hub } from '../../../src/types' -import { createHub } from '../../../src/utils/db/hub' +import { closeHub, createHub } from '../../../src/utils/db/hub' import { ConfiguredLimiter } from '../../../src/utils/token-bucket' import { captureIngestionWarning } from './../../../src/worker/ingestion/utils' @@ -53,7 +53,6 @@ const captureEndpointEvent2 = { describe('eachBatchParallelIngestion with overflow reroute', () => { let hub: Hub - let closeServer: () => Promise let queue: any function createBatchWithMultipleEvents(events: any[], timestamp?: any, withKey = true): Message[] { @@ -69,7 +68,7 @@ describe('eachBatchParallelIngestion with overflow reroute', () => { } beforeEach(async () => { - ;[hub, closeServer] = await createHub() + hub = await createHub() queue = { bufferSleep: jest.fn(), pluginsServer: hub, @@ -78,7 +77,7 @@ describe('eachBatchParallelIngestion with overflow reroute', () => { }) afterEach(async () => { - await closeServer() + await closeHub(hub) jest.clearAllMocks() }) diff --git a/plugin-server/tests/main/ingestion-queues/each-batch-webhooks.test.ts b/plugin-server/tests/main/ingestion-queues/each-batch-webhooks.test.ts index 63092e94ede3f..ef1dbca1e887c 100644 --- a/plugin-server/tests/main/ingestion-queues/each-batch-webhooks.test.ts +++ b/plugin-server/tests/main/ingestion-queues/each-batch-webhooks.test.ts @@ -1,6 +1,6 @@ import { eachMessageWebhooksHandlers } from '../../../src/main/ingestion-queues/batch-processing/each-batch-webhooks' import { ClickHouseTimestamp, ClickHouseTimestampSecondPrecision, Hub, RawClickHouseEvent } from '../../../src/types' -import { createHub } from '../../../src/utils/db/hub' +import { closeHub, createHub } from '../../../src/utils/db/hub' import { PostgresUse } from '../../../src/utils/db/postgres' import { ActionManager } from '../../../src/worker/ingestion/action-manager' import { ActionMatcher } from '../../../src/worker/ingestion/action-matcher' @@ -33,10 +33,9 @@ const clickhouseEvent: RawClickHouseEvent = { describe('eachMessageWebhooksHandlers', () => { let hub: Hub - let closeHub: () => Promise beforeEach(async () => { - ;[hub, closeHub] = await createHub() + hub = await createHub() console.warn = jest.fn() as any await resetTestDatabase() @@ -73,7 +72,7 @@ describe('eachMessageWebhooksHandlers', () => { }) afterEach(async () => { - await closeHub() + await closeHub(hub) }) it('calls runWebhooksHandlersEventPipeline', async () => { diff --git a/plugin-server/tests/main/ingestion-queues/each-batch.test.ts b/plugin-server/tests/main/ingestion-queues/each-batch.test.ts index e693267599197..cb4b33373545c 100644 --- a/plugin-server/tests/main/ingestion-queues/each-batch.test.ts +++ b/plugin-server/tests/main/ingestion-queues/each-batch.test.ts @@ -135,7 +135,6 @@ describe('eachBatchX', () => { WORKER_CONCURRENCY: 1, TASKS_PER_WORKER: 10, INGESTION_CONCURRENCY: 4, - BUFFER_CONVERSION_SECONDS: 60, kafkaProducer: { queueMessage: jest.fn(), }, diff --git a/plugin-server/tests/main/ingestion-queues/kafka-queue.test.ts b/plugin-server/tests/main/ingestion-queues/kafka-queue.test.ts index 4f5f50cb7dd12..a9a1f145e8f4f 100644 --- a/plugin-server/tests/main/ingestion-queues/kafka-queue.test.ts +++ b/plugin-server/tests/main/ingestion-queues/kafka-queue.test.ts @@ -23,8 +23,6 @@ const extraServerConfig: Partial = { WORKER_CONCURRENCY: 1, KAFKA_CONSUMPTION_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION, LOG_LEVEL: LogLevel.Log, - CONVERSION_BUFFER_ENABLED: true, - BUFFER_CONVERSION_SECONDS: 1, } // TODO: merge these tests with postgres/e2e.test.ts diff --git a/plugin-server/tests/main/ingestion-queues/run-async-handlers-event-pipeline.test.ts b/plugin-server/tests/main/ingestion-queues/run-async-handlers-event-pipeline.test.ts index 064d889ad62dc..f84d05f649f6b 100644 --- a/plugin-server/tests/main/ingestion-queues/run-async-handlers-event-pipeline.test.ts +++ b/plugin-server/tests/main/ingestion-queues/run-async-handlers-event-pipeline.test.ts @@ -23,7 +23,7 @@ import { KAFKA_EVENTS_JSON } from '../../../src/config/kafka-topics' import { buildOnEventIngestionConsumer } from '../../../src/main/ingestion-queues/on-event-handler-consumer' import { Hub, ISOTimestamp } from '../../../src/types' import { DependencyUnavailableError } from '../../../src/utils/db/error' -import { createHub } from '../../../src/utils/db/hub' +import { closeHub, createHub } from '../../../src/utils/db/hub' import { PostgresUse } from '../../../src/utils/db/postgres' import { UUIDT } from '../../../src/utils/utils' import { processOnEventStep } from '../../../src/worker/ingestion/event-pipeline/runAsyncHandlersStep' @@ -51,12 +51,11 @@ describe('runAppsOnEventPipeline()', () => { let hub: Hub let redis: Redis.Redis - let closeHub: () => Promise beforeEach(async () => { // Use fake timers to ensure that we don't need to wait on e.g. retry logic. jest.useFakeTimers({ advanceTimers: true }) - ;[hub, closeHub] = await createHub() + hub = await createHub() redis = await hub.redisPool.acquire() await hub.postgres.query(PostgresUse.COMMON_WRITE, POSTGRES_DELETE_TABLES_QUERY, null, 'deleteTables') // Need to clear the DB to avoid unique constraint violations on ids }) @@ -64,7 +63,7 @@ describe('runAppsOnEventPipeline()', () => { afterEach(async () => { await hub.redisPool.release(redis) await teardownPlugins(hub) - await closeHub() + await closeHub(hub) jest.clearAllTimers() jest.useRealTimers() jest.restoreAllMocks() @@ -169,16 +168,15 @@ describe('eachBatchAsyncHandlers', () => { // to https://github.com/piscinajs/piscina#method-runtask-options should be // the case. let hub: Hub - let closeHub: () => Promise let piscina: Piscina beforeEach(async () => { jest.useFakeTimers({ advanceTimers: true }) - ;[hub, closeHub] = await createHub() + hub = await createHub() }) afterEach(async () => { - await closeHub?.() + await closeHub(hub) jest.useRealTimers() }) diff --git a/plugin-server/tests/main/ingestion-queues/run-ingestion-pipeline.test.ts b/plugin-server/tests/main/ingestion-queues/run-ingestion-pipeline.test.ts index fa5c000fd9ae1..28d18b2cdd1c3 100644 --- a/plugin-server/tests/main/ingestion-queues/run-ingestion-pipeline.test.ts +++ b/plugin-server/tests/main/ingestion-queues/run-ingestion-pipeline.test.ts @@ -3,20 +3,20 @@ import { Pool } from 'pg' import { Hub } from '../../../src/types' import { DependencyUnavailableError } from '../../../src/utils/db/error' -import { createHub } from '../../../src/utils/db/hub' +import { closeHub, createHub } from '../../../src/utils/db/hub' import { PostgresUse } from '../../../src/utils/db/postgres' import { UUIDT } from '../../../src/utils/utils' import { EventPipelineRunner } from '../../../src/worker/ingestion/event-pipeline/runner' +import { EventsProcessor } from '../../../src/worker/ingestion/process-event' import { createOrganization, createTeam, POSTGRES_DELETE_TABLES_QUERY } from '../../helpers/sql' describe('workerTasks.runEventPipeline()', () => { let hub: Hub let redis: Redis.Redis - let closeHub: () => Promise const OLD_ENV = process.env beforeAll(async () => { - ;[hub, closeHub] = await createHub() + hub = await createHub() redis = await hub.redisPool.acquire() await hub.postgres.query(PostgresUse.COMMON_WRITE, POSTGRES_DELETE_TABLES_QUERY, undefined, '') // Need to clear the DB to avoid unique constraint violations on ids process.env = { ...OLD_ENV } // Make a copy @@ -24,7 +24,7 @@ describe('workerTasks.runEventPipeline()', () => { afterAll(async () => { await hub.redisPool.release(redis) - await closeHub() + await closeHub(hub) process.env = OLD_ENV // Restore old environment }) @@ -59,9 +59,9 @@ describe('workerTasks.runEventPipeline()', () => { now: new Date().toISOString(), uuid: new UUIDT().toString(), } - await expect(new EventPipelineRunner(hub, event).runEventPipeline(event)).rejects.toEqual( - new DependencyUnavailableError(errorMessage, 'Postgres', new Error(errorMessage)) - ) + await expect( + new EventPipelineRunner(hub, event, new EventsProcessor(hub)).runEventPipeline(event) + ).rejects.toEqual(new DependencyUnavailableError(errorMessage, 'Postgres', new Error(errorMessage))) pgQueryMock.mockRestore() }) }) diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/services/offset-high-water-mark.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/services/offset-high-water-mark.test.ts index 6f5493de3ac11..33ac09de4f5e1 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/services/offset-high-water-mark.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/services/offset-high-water-mark.test.ts @@ -6,12 +6,11 @@ import { OffsetHighWaterMarks, } from '../../../../../src/main/ingestion-queues/session-recording/services/offset-high-water-marker' import { Hub } from '../../../../../src/types' -import { createHub } from '../../../../../src/utils/db/hub' +import { closeHub, createHub } from '../../../../../src/utils/db/hub' describe('session offset high-water mark', () => { jest.setTimeout(1000) let hub: Hub - let closeHub: () => Promise const keyPrefix = 'test-high-water-mark' let offsetHighWaterMarker: OffsetHighWaterMarker @@ -41,13 +40,13 @@ describe('session offset high-water mark', () => { } beforeEach(async () => { - ;[hub, closeHub] = await createHub() + hub = await createHub() offsetHighWaterMarker = new OffsetHighWaterMarker(hub.redisPool, keyPrefix) }) afterEach(async () => { await deletePrefixedKeys() - await closeHub() + await closeHub(hub) }) const expectMemoryAndRedisToEqual = async (tp: TopicPartition, toEqual: any) => { diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/services/overflow-manager.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/services/overflow-manager.test.ts index 11ca8f48b86c1..63b507a2dff03 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/services/overflow-manager.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/services/overflow-manager.test.ts @@ -2,7 +2,7 @@ import { Redis } from 'ioredis' import { OverflowManager } from '../../../../../src/main/ingestion-queues/session-recording/services/overflow-manager' import { Hub } from '../../../../../src/types' -import { createHub } from '../../../../../src/utils/db/hub' +import { closeHub, createHub } from '../../../../../src/utils/db/hub' jest.mock('../../../../../src/utils/status') jest.mock('../../../../../src/kafka/producer') @@ -11,12 +11,11 @@ const CAPTURE_OVERFLOW_REDIS_KEY = '@posthog/capture-overflow/replay' describe('overflow manager', () => { let hub: Hub - let closeHub: () => Promise let redis: Redis let overflowManager: OverflowManager beforeAll(async () => { - ;[hub, closeHub] = await createHub() + hub = await createHub() redis = await hub.redisPool.acquire() }) beforeEach(async () => { @@ -27,7 +26,7 @@ describe('overflow manager', () => { afterAll(async () => { await redis.flushdb() await hub.redisPool.release(redis) - await closeHub?.() + await closeHub(hub) }) test('it does not trigger if several keys are under threshold', async () => { diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts index 8035b11e2aa95..fcb2c5bdf9c56 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts @@ -8,7 +8,7 @@ import { waitForExpect } from '../../../../functional_tests/expectations' import { defaultConfig } from '../../../../src/config/config' import { SessionRecordingIngester } from '../../../../src/main/ingestion-queues/session-recording/session-recordings-consumer' import { Hub, PluginsServerConfig, Team } from '../../../../src/types' -import { createHub } from '../../../../src/utils/db/hub' +import { closeHub, createHub } from '../../../../src/utils/db/hub' import { deleteKeysWithPrefix } from '../../../helpers/redis' import { getFirstTeam, resetTestDatabase } from '../../../helpers/sql' import { createIncomingRecordingMessage, createKafkaMessage, createTP } from './fixtures' @@ -61,7 +61,6 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve let ingester: SessionRecordingIngester let hub: Hub - let closeHub: () => Promise let team: Team let teamToken = '' let mockOffsets: Record = {} @@ -92,7 +91,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve topics: [{ name: options.topic, partitions: [{ id: 0 }, { id: 1 }, { id: 2 }] }], }) }) - ;[hub, closeHub] = await createHub() + hub = await createHub() team = await getFirstTeam(hub) teamToken = team.api_token redisConn = await hub.redisPool.acquire(0) @@ -110,7 +109,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve await redisConn.del(CAPTURE_OVERFLOW_REDIS_KEY) await hub.redisPool.release(redisConn) await deleteKeys(hub) - await closeHub() + await closeHub(hub) }) afterAll(() => { diff --git a/plugin-server/tests/main/process-event.test.ts b/plugin-server/tests/main/process-event.test.ts index e1c582becda7c..9c052ee5acebe 100644 --- a/plugin-server/tests/main/process-event.test.ts +++ b/plugin-server/tests/main/process-event.test.ts @@ -21,7 +21,7 @@ import { PropertyDefinitionTypeEnum, Team, } from '../../src/types' -import { createHub } from '../../src/utils/db/hub' +import { closeHub, createHub } from '../../src/utils/db/hub' import { PostgresUse } from '../../src/utils/db/postgres' import { personInitialAndUTMProperties } from '../../src/utils/db/utils' import { posthog } from '../../src/utils/posthog' @@ -54,8 +54,6 @@ export async function createPerson( ) } -export type ReturnWithHub = { hub?: Hub; closeHub?: () => Promise } - type EventsByPerson = [string[], string[]] export const getEventsByPerson = async (hub: Hub): Promise => { @@ -90,22 +88,10 @@ let processEventCounter = 0 let mockClientEventCounter = 0 let team: Team let hub: Hub -let closeHub: () => Promise let redis: IORedis.Redis let eventsProcessor: EventsProcessor let now = DateTime.utc() -async function createTestHub(additionalProps?: Record): Promise<[Hub, () => Promise]> { - const [hub, closeHub] = await createHub({ - ...TEST_CONFIG, - ...(additionalProps ?? {}), - }) - - redis = await hub.redisPool.acquire() - - return [hub, closeHub] -} - async function processEvent( distinctId: string, ip: string | null, @@ -126,7 +112,7 @@ async function processEvent( ...data, } as any as PluginEvent - const runner = new EventPipelineRunner(hub, pluginEvent) + const runner = new EventPipelineRunner(hub, pluginEvent, new EventsProcessor(hub)) await runner.runEventPipeline(pluginEvent) await delayUntilEventIngested(() => hub.db.fetchEvents(), ++processEventCounter) @@ -151,7 +137,10 @@ beforeEach(async () => { ` await resetTestDatabase(testCode, TEST_CONFIG) await resetTestDatabaseClickhouse(TEST_CONFIG) - ;[hub, closeHub] = await createTestHub() + + hub = await createHub({ ...TEST_CONFIG }) + redis = await hub.redisPool.acquire() + eventsProcessor = new EventsProcessor(hub) processEventCounter = 0 mockClientEventCounter = 0 @@ -168,7 +157,7 @@ beforeEach(async () => { afterEach(async () => { await hub.redisPool.release(redis) - await closeHub?.() + await closeHub(hub) }) const capture = async (hub: Hub, eventName: string, properties: any = {}) => { @@ -183,7 +172,7 @@ const capture = async (hub: Hub, eventName: string, properties: any = {}) => { team_id: team.id, uuid: new UUIDT().toString(), } - const runner = new EventPipelineRunner(hub, event) + const runner = new EventPipelineRunner(hub, event, new EventsProcessor(hub)) await runner.runEventPipeline(event) await delayUntilEventIngested(() => hub.db.fetchEvents(), ++mockClientEventCounter) } @@ -2074,7 +2063,7 @@ describe('validates eventUuid', () => { properties: { price: 299.99, name: 'AirPods Pro' }, } - const runner = new EventPipelineRunner(hub, pluginEvent) + const runner = new EventPipelineRunner(hub, pluginEvent, new EventsProcessor(hub)) const result = await runner.runEventPipeline(pluginEvent) expect(result.error).toBeDefined() @@ -2093,7 +2082,7 @@ describe('validates eventUuid', () => { properties: { price: 299.99, name: 'AirPods Pro' }, } - const runner = new EventPipelineRunner(hub, pluginEvent) + const runner = new EventPipelineRunner(hub, pluginEvent, new EventsProcessor(hub)) const result = await runner.runEventPipeline(pluginEvent) expect(result.error).toBeDefined() diff --git a/plugin-server/tests/main/teardown.test.ts b/plugin-server/tests/main/teardown.test.ts index 58137f82bf24f..0440102b83efe 100644 --- a/plugin-server/tests/main/teardown.test.ts +++ b/plugin-server/tests/main/teardown.test.ts @@ -5,6 +5,7 @@ import { waitForExpect } from '../../functional_tests/expectations' import { startPluginsServer } from '../../src/main/pluginsServer' import { Hub, LogLevel, PluginLogEntry, PluginLogEntrySource, PluginLogEntryType } from '../../src/types' import { EventPipelineRunner } from '../../src/worker/ingestion/event-pipeline/runner' +import { EventsProcessor } from '../../src/worker/ingestion/process-event' import { makePiscina } from '../../src/worker/piscina' import { pluginConfig39 } from '../helpers/plugins' import { resetTestDatabase } from '../helpers/sql' @@ -36,7 +37,7 @@ async function getLogEntriesForPluginConfig(hub: Hub, pluginConfigId: number) { describe('teardown', () => { const processEvent = async (hub: Hub, event: PluginEvent) => { - const result = await new EventPipelineRunner(hub, event).runEventPipeline(event) + const result = await new EventPipelineRunner(hub, event, new EventsProcessor(hub)).runEventPipeline(event) const resultEvent = result.args[0] return resultEvent } diff --git a/plugin-server/tests/sql.test.ts b/plugin-server/tests/sql.test.ts index d23b133b4c5bf..2003a61df3630 100644 --- a/plugin-server/tests/sql.test.ts +++ b/plugin-server/tests/sql.test.ts @@ -1,5 +1,5 @@ import { Hub } from '../src/types' -import { createHub } from '../src/utils/db/hub' +import { closeHub, createHub } from '../src/utils/db/hub' import { PostgresUse } from '../src/utils/db/postgres' import { disablePlugin, getActivePluginRows, getPluginAttachmentRows, getPluginConfigRows } from '../src/utils/db/sql' import { commonOrganizationId } from './helpers/plugins' @@ -10,15 +10,14 @@ jest.mock('../src/utils/status') describe('sql', () => { let hub: Hub - let closeHub: () => Promise beforeEach(async () => { - ;[hub, closeHub] = await createHub() + hub = await createHub() await resetTestDatabase(`const processEvent = event => event`) }) afterEach(async () => { - await closeHub() + await closeHub(hub) }) test('getPluginAttachmentRows', async () => { diff --git a/plugin-server/tests/utils/db/activity-log.test.ts b/plugin-server/tests/utils/db/activity-log.test.ts index 896b4d40634cf..d8b9c7ec1fc32 100644 --- a/plugin-server/tests/utils/db/activity-log.test.ts +++ b/plugin-server/tests/utils/db/activity-log.test.ts @@ -1,6 +1,6 @@ import { Hub } from '../../../src/types' import { createPluginActivityLog } from '../../../src/utils/db/activity-log' -import { createHub } from '../../../src/utils/db/hub' +import { closeHub, createHub } from '../../../src/utils/db/hub' import { PostgresUse } from '../../../src/utils/db/postgres' import { pluginConfig39 } from '../../helpers/plugins' import { resetTestDatabase } from '../../helpers/sql' @@ -22,15 +22,14 @@ interface ActivityLog { describe('createPluginActivityLog()', () => { let hub: Hub - let closeHub: () => Promise beforeEach(async () => { await resetTestDatabase() - ;[hub, closeHub] = await createHub({}) + hub = await createHub({}) }) afterEach(async () => { - await closeHub() + await closeHub(hub) }) async function fetchPluginActivityLogs(hub: Hub): Promise> { diff --git a/plugin-server/tests/worker/capabilities.test.ts b/plugin-server/tests/worker/capabilities.test.ts index 8376f49de584b..28ce683719246 100644 --- a/plugin-server/tests/worker/capabilities.test.ts +++ b/plugin-server/tests/worker/capabilities.test.ts @@ -1,5 +1,5 @@ import { Hub, LogLevel, PluginCapabilities } from '../../src/types' -import { createHub } from '../../src/utils/db/hub' +import { closeHub, createHub } from '../../src/utils/db/hub' import { loadSchedule } from '../../src/worker/plugins/loadSchedule' import { setupPlugins } from '../../src/worker/plugins/setup' import { getVMPluginCapabilities, shouldSetupPluginInServer } from '../../src/worker/vm/capabilities' @@ -13,16 +13,15 @@ jest.mock('../../src/worker/plugins/loadPluginsFromDB', () => ({ describe('capabilities', () => { let hub: Hub - let closeHub: () => Promise beforeAll(async () => { console.info = jest.fn() as any console.warn = jest.fn() as any - ;[hub, closeHub] = await createHub({ LOG_LEVEL: LogLevel.Warn }) + hub = await createHub({ LOG_LEVEL: LogLevel.Warn }) }) afterAll(async () => { - await closeHub() + await closeHub(hub) }) describe('getVMPluginCapabilities()', () => { diff --git a/plugin-server/tests/worker/console.test.ts b/plugin-server/tests/worker/console.test.ts index 4535f10f6e327..0237eaf682b2a 100644 --- a/plugin-server/tests/worker/console.test.ts +++ b/plugin-server/tests/worker/console.test.ts @@ -2,7 +2,7 @@ import { ConsoleExtension } from '@posthog/plugin-scaffold' import { KAFKA_PLUGIN_LOG_ENTRIES } from '../../src/config/kafka-topics' import { Hub, PluginLogEntrySource, PluginLogEntryType } from '../../src/types' -import { createHub } from '../../src/utils/db/hub' +import { closeHub, createHub } from '../../src/utils/db/hub' import { createConsole } from '../../src/worker/vm/extensions/console' import { pluginConfig39 } from '../../tests/helpers/plugins' @@ -12,14 +12,13 @@ jest.mock('../../src/utils/db/kafka-producer-wrapper') describe('console extension', () => { let hub: Hub - let closeHub: () => Promise beforeAll(async () => { - ;[hub, closeHub] = await createHub() + hub = await createHub() }) afterAll(async () => { - await closeHub() + await closeHub(hub) }) Object.values(PluginLogEntryType).map((type) => { diff --git a/plugin-server/tests/worker/dead-letter-queue.test.ts b/plugin-server/tests/worker/dead-letter-queue.test.ts index a9ef21a6fdd2c..b9c79471fb42a 100644 --- a/plugin-server/tests/worker/dead-letter-queue.test.ts +++ b/plugin-server/tests/worker/dead-letter-queue.test.ts @@ -1,9 +1,10 @@ import { PluginEvent } from '@posthog/plugin-scaffold/src/types' import { Hub, LogLevel } from '../../src/types' -import { createHub } from '../../src/utils/db/hub' +import { closeHub, createHub } from '../../src/utils/db/hub' import { UUIDT } from '../../src/utils/utils' import { EventPipelineRunner } from '../../src/worker/ingestion/event-pipeline/runner' +import { EventsProcessor } from '../../src/worker/ingestion/process-event' import { generateEventDeadLetterQueueMessage } from '../../src/worker/ingestion/utils' import { delayUntilEventIngested, resetTestDatabaseClickhouse } from '../helpers/clickhouse' import { resetTestDatabase } from '../helpers/sql' @@ -45,22 +46,23 @@ function createEvent(): PluginEvent { describe('events dead letter queue', () => { let hub: Hub - let closeHub: () => Promise beforeEach(async () => { - ;[hub, closeHub] = await createHub({ LOG_LEVEL: LogLevel.Log }) + hub = await createHub({ LOG_LEVEL: LogLevel.Log }) console.warn = jest.fn() as any await resetTestDatabase() await resetTestDatabaseClickhouse() }) afterEach(async () => { - await closeHub() + await closeHub(hub) }) test('events get sent to dead letter queue on error', async () => { const event = createEvent() - const ingestResponse1 = await new EventPipelineRunner(hub, event).runEventPipeline(event) + const ingestResponse1 = await new EventPipelineRunner(hub, event, new EventsProcessor(hub)).runEventPipeline( + event + ) expect(ingestResponse1).toEqual({ lastStep: 'prepareEventStep', error: 'database unavailable', diff --git a/plugin-server/tests/worker/ingestion/action-manager.test.ts b/plugin-server/tests/worker/ingestion/action-manager.test.ts index d9ad3893be6f1..e3c1e65f392ec 100644 --- a/plugin-server/tests/worker/ingestion/action-manager.test.ts +++ b/plugin-server/tests/worker/ingestion/action-manager.test.ts @@ -1,23 +1,22 @@ import { Hub, PropertyOperator } from '../../../src/types' -import { createHub } from '../../../src/utils/db/hub' +import { closeHub, createHub } from '../../../src/utils/db/hub' import { PostgresUse } from '../../../src/utils/db/postgres' import { ActionManager } from '../../../src/worker/ingestion/action-manager' import { resetTestDatabase } from '../../helpers/sql' describe('ActionManager', () => { let hub: Hub - let closeServer: () => Promise let actionManager: ActionManager beforeEach(async () => { - ;[hub, closeServer] = await createHub() + hub = await createHub() await resetTestDatabase() actionManager = new ActionManager(hub.postgres, hub) await actionManager.start() }) afterEach(async () => { - await closeServer() + await closeHub(hub) }) const TEAM_ID = 2 diff --git a/plugin-server/tests/worker/ingestion/action-matcher.test.ts b/plugin-server/tests/worker/ingestion/action-matcher.test.ts index 6b84b80991b10..718ca3bde3983 100644 --- a/plugin-server/tests/worker/ingestion/action-matcher.test.ts +++ b/plugin-server/tests/worker/ingestion/action-matcher.test.ts @@ -14,7 +14,7 @@ import { StringMatching, Team, } from '../../../src/types' -import { createHub } from '../../../src/utils/db/hub' +import { closeHub, createHub } from '../../../src/utils/db/hub' import { UUIDT } from '../../../src/utils/utils' import { ActionManager } from '../../../src/worker/ingestion/action-manager' import { ActionMatcher, castingCompare } from '../../../src/worker/ingestion/action-matcher' @@ -44,14 +44,13 @@ function createTestEvent(overrides: Partial = {}): PostInges describe('ActionMatcher', () => { let hub: Hub - let closeServer: () => Promise let actionManager: ActionManager let actionMatcher: ActionMatcher let actionCounter: number beforeEach(async () => { await resetTestDatabase(undefined, undefined, undefined, { withExtendedTestData: false }) - ;[hub, closeServer] = await createHub() + hub = await createHub() actionManager = new ActionManager(hub.db.postgres, hub) await actionManager.start() actionMatcher = new ActionMatcher(hub.db.postgres, actionManager, hub.teamManager) @@ -59,7 +58,7 @@ describe('ActionMatcher', () => { }) afterEach(async () => { - await closeServer() + await closeHub(hub) }) /** Return a test action created on a common base using provided steps. */ diff --git a/plugin-server/tests/worker/ingestion/app-metrics.test.ts b/plugin-server/tests/worker/ingestion/app-metrics.test.ts index dda7d6dc30e97..bf2642d24b1aa 100644 --- a/plugin-server/tests/worker/ingestion/app-metrics.test.ts +++ b/plugin-server/tests/worker/ingestion/app-metrics.test.ts @@ -1,5 +1,5 @@ import { Hub } from '../../../src/types' -import { createHub } from '../../../src/utils/db/hub' +import { closeHub, createHub } from '../../../src/utils/db/hub' import { KafkaProducerWrapper } from '../../../src/utils/db/kafka-producer-wrapper' import { UUIDT } from '../../../src/utils/utils' import { AppMetricIdentifier, AppMetrics } from '../../../src/worker/ingestion/app-metrics' @@ -275,10 +275,9 @@ describe('AppMetrics()', () => { describe('reading writes from clickhouse', () => { let hub: Hub - let closeHub: () => Promise beforeEach(async () => { - ;[hub, closeHub] = await createHub({ + hub = await createHub({ APP_METRICS_FLUSH_FREQUENCY_MS: 100, APP_METRICS_FLUSH_MAX_QUEUE_SIZE: 5, }) @@ -286,7 +285,7 @@ describe('AppMetrics()', () => { jest.spyOn(hub.kafkaProducer, 'queueMessage').mockReturnValue(Promise.resolve()) }) afterEach(async () => { - await closeHub() + await closeHub(hub) }) async function fetchRowsFromClickhouse() { return (await hub.db.clickhouseQuery(`SELECT * FROM app_metrics FINAL`)).data diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/enrichExceptionEventStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/enrichExceptionEventStep.test.ts index e59f42fb71001..c9e4ea17127f7 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/enrichExceptionEventStep.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/enrichExceptionEventStep.test.ts @@ -37,7 +37,6 @@ describe('enrichExceptionEvent()', () => { produce: jest.fn((e) => Promise.resolve(e)), }, }, - nextStep: (...args: any[]) => args, } }) diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/event-pipeline-integration.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/event-pipeline-integration.test.ts index bc4bf1254d2e0..5c1ebcbb49394 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/event-pipeline-integration.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/event-pipeline-integration.test.ts @@ -3,7 +3,7 @@ import { DateTime } from 'luxon' import fetch from 'node-fetch' import { Hook, Hub } from '../../../../src/types' -import { createHub } from '../../../../src/utils/db/hub' +import { closeHub, createHub } from '../../../../src/utils/db/hub' import { PostgresUse } from '../../../../src/utils/db/postgres' import { convertToPostIngestionEvent } from '../../../../src/utils/event' import { UUIDT } from '../../../../src/utils/utils' @@ -15,6 +15,7 @@ import { } from '../../../../src/worker/ingestion/event-pipeline/runAsyncHandlersStep' import { EventPipelineRunner } from '../../../../src/worker/ingestion/event-pipeline/runner' import { HookCommander } from '../../../../src/worker/ingestion/hooks' +import { EventsProcessor } from '../../../../src/worker/ingestion/process-event' import { setupPlugins } from '../../../../src/worker/plugins/setup' import { delayUntilEventIngested, resetTestDatabaseClickhouse } from '../../../helpers/clickhouse' import { commonUserId } from '../../../helpers/plugins' @@ -27,10 +28,9 @@ describe('Event Pipeline integration test', () => { let actionManager: ActionManager let actionMatcher: ActionMatcher let hookCannon: HookCommander - let closeServer: () => Promise const ingestEvent = async (event: PluginEvent) => { - const runner = new EventPipelineRunner(hub, event) + const runner = new EventPipelineRunner(hub, event, new EventsProcessor(hub)) const result = await runner.runEventPipeline(event) const postIngestionEvent = convertToPostIngestionEvent(result.args[0]) return Promise.all([ @@ -43,7 +43,7 @@ describe('Event Pipeline integration test', () => { await resetTestDatabase() await resetTestDatabaseClickhouse() process.env.SITE_URL = 'https://example.com' - ;[hub, closeServer] = await createHub() + hub = await createHub() actionManager = new ActionManager(hub.db.postgres, hub) await actionManager.start() @@ -62,7 +62,7 @@ describe('Event Pipeline integration test', () => { }) afterEach(async () => { - await closeServer() + await closeHub(hub) }) it('handles plugins setting properties', async () => { @@ -255,13 +255,13 @@ describe('Event Pipeline integration test', () => { uuid: new UUIDT().toString(), } - await new EventPipelineRunner(hub, event).runEventPipeline(event) + await new EventPipelineRunner(hub, event, new EventsProcessor(hub)).runEventPipeline(event) expect(hub.db.fetchPerson).toHaveBeenCalledTimes(1) // we query before creating expect(hub.db.createPerson).toHaveBeenCalledTimes(1) // second time single fetch - await new EventPipelineRunner(hub, event).runEventPipeline(event) + await new EventPipelineRunner(hub, event, new EventsProcessor(hub)).runEventPipeline(event) expect(hub.db.fetchPerson).toHaveBeenCalledTimes(2) }) }) diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/extractHeatmapDataStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/extractHeatmapDataStep.test.ts index c98c085298417..2fe66ad4dfa37 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/extractHeatmapDataStep.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/extractHeatmapDataStep.test.ts @@ -138,7 +138,6 @@ describe('extractHeatmapDataStep()', () => { fetchTeam: jest.fn(() => Promise.resolve({ heatmaps_opt_in: true })), }, }, - nextStep: (...args: any[]) => args, } }) diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/normalizeEventStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/normalizeEventStep.test.ts index 3d14f463cbfa5..0ddc692fe004a 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/normalizeEventStep.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/normalizeEventStep.test.ts @@ -13,7 +13,7 @@ function copy(a: any) { describe('normalizeEventStep()', () => { it('normalizes the event with properties set by plugins', async () => { await resetTestDatabase() - const [hub, _] = await createHub() + const hub = await createHub() const organizationId = await createOrganization(hub.db.postgres) const teamId = await createTeam(hub.db.postgres, organizationId) const uuid = new UUIDT().toString() @@ -61,7 +61,7 @@ describe('normalizeEventStep()', () => { it('replaces null byte with unicode replacement character in distinct_id', async () => { await resetTestDatabase() - const [hub, _] = await createHub() + const hub = await createHub() const organizationId = await createOrganization(hub.db.postgres) const teamId = await createTeam(hub.db.postgres, organizationId) const uuid = new UUIDT().toString() @@ -90,7 +90,7 @@ describe('normalizeEventStep()', () => { it('normalizes $process_person_profile=false events by dropping $set and related', async () => { await resetTestDatabase() - const [hub, _] = await createHub() + const hub = await createHub() const organizationId = await createOrganization(hub.db.postgres) const teamId = await createTeam(hub.db.postgres, organizationId) const uuid = new UUIDT().toString() diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/pluginsProcessEventStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/pluginsProcessEventStep.test.ts index eb65cd7aa1e03..457c2529b80b9 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/pluginsProcessEventStep.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/pluginsProcessEventStep.test.ts @@ -19,19 +19,11 @@ const pluginEvent: PluginEvent = { } describe('pluginsProcessEventStep()', () => { - let runner: any - - beforeEach(() => { - runner = { - nextStep: (...args: any[]) => args, - } - }) - it('forwards processed plugin event to `processPersonsStep`', async () => { const processedEvent = { ...pluginEvent, event: 'processed' } jest.mocked(runProcessEvent).mockResolvedValue(processedEvent) - const response = await pluginsProcessEventStep(runner, pluginEvent) + const response = await pluginsProcessEventStep({} as any, pluginEvent) expect(response).toEqual(processedEvent) }) @@ -40,7 +32,7 @@ describe('pluginsProcessEventStep()', () => { jest.mocked(runProcessEvent).mockResolvedValue(null) const droppedEventCounterSpy = jest.spyOn(droppedEventCounter, 'inc') - const response = await pluginsProcessEventStep(runner, pluginEvent) + const response = await pluginsProcessEventStep({} as any, pluginEvent) expect(response).toEqual(null) expect(droppedEventCounterSpy).toHaveBeenCalledTimes(1) diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/populateTeamDataStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/populateTeamDataStep.test.ts index cc43c8fb55763..cb900e56899f6 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/populateTeamDataStep.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/populateTeamDataStep.test.ts @@ -33,7 +33,6 @@ let runner: any beforeEach(() => { resetMetrics() runner = { - nextStep: (...args: any[]) => args, hub: { teamManager: { getTeamByToken: jest.fn((token) => { diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/prepareEventStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/prepareEventStep.test.ts index d09a149d44c8b..8c203468348aa 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/prepareEventStep.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/prepareEventStep.test.ts @@ -2,9 +2,11 @@ import { PluginEvent } from '@posthog/plugin-scaffold' import { DateTime } from 'luxon' import { Hub, Person, Team } from '../../../../src/types' -import { createHub } from '../../../../src/utils/db/hub' +import { closeHub, createHub } from '../../../../src/utils/db/hub' import { UUIDT } from '../../../../src/utils/utils' import { prepareEventStep } from '../../../../src/worker/ingestion/event-pipeline/prepareEventStep' +import { EventPipelineRunner } from '../../../../src/worker/ingestion/event-pipeline/runner' +import { EventsProcessor } from '../../../../src/worker/ingestion/process-event' import { resetTestDatabase } from '../../../helpers/sql' jest.mock('../../../../src/utils/status') @@ -49,13 +51,12 @@ const teamTwo: Team = { } describe('prepareEventStep()', () => { - let runner: any + let runner: Pick let hub: Hub - let closeHub: () => Promise beforeEach(async () => { await resetTestDatabase() - ;[hub, closeHub] = await createHub() + hub = await createHub() // :KLUDGE: We test below whether kafka messages are produced, so make sure the person exists beforehand. await hub.db.createPerson(person.created_at, {}, {}, {}, pluginEvent.team_id, null, false, person.uuid, [ @@ -64,18 +65,18 @@ describe('prepareEventStep()', () => { hub.db.kafkaProducer!.queueMessage = jest.fn() // eslint-disable-next-line @typescript-eslint/require-await - hub.eventsProcessor.teamManager.fetchTeam = jest.fn(async (teamId) => { + hub.teamManager.fetchTeam = jest.fn(async (teamId) => { return teamId === 2 ? teamTwo : null }) runner = { - nextStep: (...args: any[]) => args, hub, + eventsProcessor: new EventsProcessor(hub), } }) afterEach(async () => { - await closeHub() + await closeHub(hub) }) it('goes to `createEventStep` for normal events', async () => { @@ -95,7 +96,7 @@ describe('prepareEventStep()', () => { }) it('scrubs IPs when team.anonymize_ips=true', async () => { - jest.mocked(runner.hub.eventsProcessor.teamManager.fetchTeam).mockReturnValue({ + jest.mocked(runner.hub.teamManager.fetchTeam).mockReturnValue({ ...teamTwo, anonymize_ips: true, }) @@ -116,7 +117,7 @@ describe('prepareEventStep()', () => { it('extracts elements_chain from properties', async () => { const event: PluginEvent = { ...pluginEvent, ip: null, properties: { $elements_chain: 'random string', a: 1 } } const preppedEvent = await prepareEventStep(runner, event) - const [chEvent, _] = await runner.hub.eventsProcessor.createEvent(preppedEvent, person) + const [chEvent, _] = runner.eventsProcessor.createEvent(preppedEvent, person) expect(chEvent.elements_chain).toEqual('random string') expect(chEvent.properties).toEqual('{"a":1}') @@ -133,7 +134,7 @@ describe('prepareEventStep()', () => { }, } const preppedEvent = await prepareEventStep(runner, event) - const [chEvent, _] = await runner.hub.eventsProcessor.createEvent(preppedEvent, person) + const [chEvent, _] = runner.eventsProcessor.createEvent(preppedEvent, person) expect(chEvent.elements_chain).toEqual('random string') expect(chEvent.properties).toEqual('{"a":1}') @@ -147,7 +148,7 @@ describe('prepareEventStep()', () => { properties: { a: 1, $elements: [{ tag_name: 'div', nth_child: 1, nth_of_type: 2, $el_text: 'text' }] }, } const preppedEvent = await prepareEventStep(runner, event) - const [chEvent, _] = await runner.hub.eventsProcessor.createEvent(preppedEvent, person) + const [chEvent, _] = runner.eventsProcessor.createEvent(preppedEvent, person) expect(chEvent.elements_chain).toEqual('div:nth-child="1"nth-of-type="2"text="text"') expect(chEvent.properties).toEqual('{"a":1}') diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/processPersonsStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/processPersonsStep.test.ts index ea3a592f79ed4..61b0b5f4b8310 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/processPersonsStep.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/processPersonsStep.test.ts @@ -2,16 +2,17 @@ import { PluginEvent } from '@posthog/plugin-scaffold' import { DateTime } from 'luxon' import { Hub } from '../../../../src/types' -import { createHub } from '../../../../src/utils/db/hub' +import { closeHub, createHub } from '../../../../src/utils/db/hub' import { UUIDT } from '../../../../src/utils/utils' import { normalizeEventStep } from '../../../../src/worker/ingestion/event-pipeline/normalizeEventStep' import { processPersonsStep } from '../../../../src/worker/ingestion/event-pipeline/processPersonsStep' +import { EventPipelineRunner } from '../../../../src/worker/ingestion/event-pipeline/runner' +import { EventsProcessor } from '../../../../src/worker/ingestion/process-event' import { createOrganization, createTeam, fetchPostgresPersons, resetTestDatabase } from '../../../helpers/sql' describe('processPersonsStep()', () => { - let runner: any + let runner: Pick let hub: Hub - let closeHub: () => Promise let uuid: string let teamId: number @@ -20,10 +21,10 @@ describe('processPersonsStep()', () => { beforeEach(async () => { await resetTestDatabase() - ;[hub, closeHub] = await createHub() + hub = await createHub() runner = { - nextStep: (...args: any[]) => args, hub: hub, + eventsProcessor: new EventsProcessor(hub), } const organizationId = await createOrganization(runner.hub.db.postgres) teamId = await createTeam(runner.hub.db.postgres, organizationId) @@ -47,7 +48,7 @@ describe('processPersonsStep()', () => { timestamp = DateTime.fromISO(pluginEvent.timestamp!) }) afterEach(async () => { - await closeHub?.() + await closeHub(hub) }) it('creates person', async () => { diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/runAsyncHandlersStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/runAsyncHandlersStep.test.ts index 248f4d6fd4d22..759bb5140e9d5 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/runAsyncHandlersStep.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/runAsyncHandlersStep.test.ts @@ -26,7 +26,6 @@ describe('runAsyncHandlersStep()', () => { beforeEach(() => { runner = { - nextStep: (...args: any[]) => args, hub: { capabilities: { processAsyncOnEventHandlers: true, diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts index e70d721828b96..ef9864ccb6b1a 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts @@ -11,6 +11,7 @@ import { prepareEventStep } from '../../../../src/worker/ingestion/event-pipelin import { processPersonsStep } from '../../../../src/worker/ingestion/event-pipeline/processPersonsStep' import { processOnEventStep } from '../../../../src/worker/ingestion/event-pipeline/runAsyncHandlersStep' import { EventPipelineRunner } from '../../../../src/worker/ingestion/event-pipeline/runner' +import { EventsProcessor } from '../../../../src/worker/ingestion/process-event' jest.mock('../../../../src/worker/ingestion/event-pipeline/populateTeamDataStep') jest.mock('../../../../src/worker/ingestion/event-pipeline/pluginsProcessEventStep') @@ -101,7 +102,7 @@ describe('EventPipelineRunner', () => { }, eventsToDropByToken: createEventsToDropByToken('drop_token:drop_id,drop_token_all:*'), } - runner = new TestEventPipelineRunner(hub, pluginEvent) + runner = new TestEventPipelineRunner(hub, pluginEvent, new EventsProcessor(hub)) jest.mocked(populateTeamDataStep).mockResolvedValue(pluginEvent) jest.mocked(pluginsProcessEventStep).mockResolvedValue(pluginEvent) @@ -271,7 +272,7 @@ describe('EventPipelineRunner', () => { kafkaProducer: { queueMessage: jest.fn() }, }, } - const runner = new TestEventPipelineRunner(hub, event) + const runner = new TestEventPipelineRunner(hub, event, new EventsProcessor(hub)) jest.mocked(populateTeamDataStep).mockResolvedValue(event) await runner.runEventPipeline(event) @@ -309,7 +310,7 @@ describe('EventPipelineRunner', () => { // setup just enough mocks that the right pipeline runs - runner = new TestEventPipelineRunner(hub, heatmapEvent) + runner = new TestEventPipelineRunner(hub, heatmapEvent, new EventsProcessor(hub)) jest.mocked(populateTeamDataStep).mockResolvedValue(heatmapEvent as any) @@ -352,7 +353,7 @@ describe('EventPipelineRunner $process_person_profile=false', () => { kafkaProducer: { queueMessage: jest.fn() }, }, } - const runner = new TestEventPipelineRunner(hub, event) + const runner = new TestEventPipelineRunner(hub, event, new EventsProcessor(hub)) jest.mocked(populateTeamDataStep).mockResolvedValue(event) await runner.runEventPipeline(event) diff --git a/plugin-server/tests/worker/ingestion/group-type-manager.test.ts b/plugin-server/tests/worker/ingestion/group-type-manager.test.ts index 3999d5a78cb93..5ee484a996db3 100644 --- a/plugin-server/tests/worker/ingestion/group-type-manager.test.ts +++ b/plugin-server/tests/worker/ingestion/group-type-manager.test.ts @@ -1,5 +1,5 @@ import { Hub } from '../../../src/types' -import { createHub } from '../../../src/utils/db/hub' +import { closeHub, createHub } from '../../../src/utils/db/hub' import { captureTeamEvent } from '../../../src/utils/posthog' import { GroupTypeManager } from '../../../src/worker/ingestion/group-type-manager' import { resetTestDatabase } from '../../helpers/sql' @@ -11,11 +11,10 @@ jest.mock('../../../src/utils/posthog', () => ({ describe('GroupTypeManager()', () => { let hub: Hub - let closeHub: () => Promise let groupTypeManager: GroupTypeManager beforeEach(async () => { - ;[hub, closeHub] = await createHub() + hub = await createHub() await resetTestDatabase() groupTypeManager = new GroupTypeManager(hub.postgres, hub.teamManager) @@ -23,7 +22,7 @@ describe('GroupTypeManager()', () => { jest.spyOn(groupTypeManager, 'insertGroupType') }) afterEach(async () => { - await closeHub() + await closeHub(hub) }) describe('fetchGroupTypes()', () => { diff --git a/plugin-server/tests/worker/ingestion/organization-manager.test.ts b/plugin-server/tests/worker/ingestion/organization-manager.test.ts index 0c734a90b4dc8..214ff486b49f1 100644 --- a/plugin-server/tests/worker/ingestion/organization-manager.test.ts +++ b/plugin-server/tests/worker/ingestion/organization-manager.test.ts @@ -1,5 +1,5 @@ import { Hub } from '../../../src/types' -import { createHub } from '../../../src/utils/db/hub' +import { closeHub, createHub } from '../../../src/utils/db/hub' import { PostgresUse } from '../../../src/utils/db/postgres' import { UUIDT } from '../../../src/utils/utils' import { OrganizationManager } from '../../../src/worker/ingestion/organization-manager' @@ -10,16 +10,15 @@ jest.mock('../../../src/utils/status') describe('OrganizationManager()', () => { let hub: Hub - let closeHub: () => Promise let organizationManager: OrganizationManager beforeEach(async () => { - ;[hub, closeHub] = await createHub() + hub = await createHub() await resetTestDatabase() organizationManager = new OrganizationManager(hub.postgres, hub.teamManager) }) afterEach(async () => { - await closeHub() + await closeHub(hub) }) describe('fetchOrganization()', () => { diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index 7405f6799dab4..fcf548ec0b199 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -3,7 +3,7 @@ import { DateTime } from 'luxon' import { Database, Hub, InternalPerson } from '../../../src/types' import { DependencyUnavailableError } from '../../../src/utils/db/error' -import { createHub } from '../../../src/utils/db/hub' +import { closeHub, createHub } from '../../../src/utils/db/hub' import { PostgresUse } from '../../../src/utils/db/postgres' import { defaultRetryConfig } from '../../../src/utils/retries' import { UUIDT } from '../../../src/utils/utils' @@ -20,7 +20,6 @@ const timestampch = '2020-01-01 12:00:05.000' describe('PersonState.update()', () => { let hub: Hub - let closeHub: () => Promise let teamId: number let organizationId: string @@ -36,7 +35,7 @@ describe('PersonState.update()', () => { let secondUserUuid: string beforeAll(async () => { - ;[hub, closeHub] = await createHub({}) + hub = await createHub({}) await hub.db.clickhouseQuery('SYSTEM STOP MERGES') organizationId = await createOrganization(hub.db.postgres) @@ -62,7 +61,7 @@ describe('PersonState.update()', () => { }) afterAll(async () => { - await closeHub() + await closeHub(hub) await hub.db.clickhouseQuery('SYSTEM START MERGES') }) @@ -1861,17 +1860,16 @@ describe('PersonState.update()', () => { // For some reason these tests failed if I ran them with a hub shared // with other tests, so I'm creating a new hub for each test. let hub: Hub - let closeHub: () => Promise beforeEach(async () => { - ;[hub, closeHub] = await createHub({}) + hub = await createHub({}) jest.spyOn(hub.db, 'fetchPerson') jest.spyOn(hub.db, 'updatePersonDeprecated') }) afterEach(async () => { - await closeHub() + await closeHub(hub) }) it(`no-op if persons already merged`, async () => { diff --git a/plugin-server/tests/worker/ingestion/process-event.test.ts b/plugin-server/tests/worker/ingestion/process-event.test.ts index f54b733f149b2..1187cf5336d29 100644 --- a/plugin-server/tests/worker/ingestion/process-event.test.ts +++ b/plugin-server/tests/worker/ingestion/process-event.test.ts @@ -2,7 +2,7 @@ import * as IORedis from 'ioredis' import { DateTime } from 'luxon' import { Hub, ISOTimestamp, Person, PreIngestionEvent } from '../../../src/types' -import { createHub } from '../../../src/utils/db/hub' +import { closeHub, createHub } from '../../../src/utils/db/hub' import { UUIDT } from '../../../src/utils/utils' import { EventsProcessor } from '../../../src/worker/ingestion/process-event' import { delayUntilEventIngested, resetTestDatabaseClickhouse } from '../../helpers/clickhouse' @@ -13,7 +13,6 @@ jest.mock('../../../src/utils/status') jest.setTimeout(600000) // 600 sec timeout. let hub: Hub -let closeHub: () => Promise let redis: IORedis.Redis let eventsProcessor: EventsProcessor @@ -24,7 +23,7 @@ beforeAll(async () => { beforeEach(async () => { await resetTestDatabase() await resetTestDatabaseClickhouse() - ;[hub, closeHub] = await createHub() + hub = await createHub() redis = await hub.redisPool.acquire() await redis.flushdb() @@ -33,7 +32,7 @@ beforeEach(async () => { afterEach(async () => { await hub.redisPool.release(redis) - await closeHub?.() + await closeHub(hub) }) describe('EventsProcessor#createEvent()', () => { diff --git a/plugin-server/tests/worker/ingestion/properties-updater.test.ts b/plugin-server/tests/worker/ingestion/properties-updater.test.ts index 16cde1c7e84ab..2bbf7aa31a571 100644 --- a/plugin-server/tests/worker/ingestion/properties-updater.test.ts +++ b/plugin-server/tests/worker/ingestion/properties-updater.test.ts @@ -3,7 +3,7 @@ import { DateTime } from 'luxon' import { Group, Hub, Team } from '../../../src/types' import { DB } from '../../../src/utils/db/db' -import { createHub } from '../../../src/utils/db/hub' +import { closeHub, createHub } from '../../../src/utils/db/hub' import { UUIDT } from '../../../src/utils/utils' import { upsertGroup } from '../../../src/worker/ingestion/properties-updater' import { createPromise } from '../../helpers/promises' @@ -13,7 +13,6 @@ jest.mock('../../../src/utils/status') describe('properties-updater', () => { let hub: Hub - let closeServer: () => Promise let db: DB let team: Team @@ -24,7 +23,7 @@ describe('properties-updater', () => { const PAST_TIMESTAMP = DateTime.fromISO('2000-10-14T11:42:06.502Z') beforeEach(async () => { - ;[hub, closeServer] = await createHub() + hub = await createHub() await resetTestDatabase() db = hub.db @@ -36,7 +35,7 @@ describe('properties-updater', () => { }) afterEach(async () => { - await closeServer() + await closeHub(hub) }) describe('upsertGroup()', () => { diff --git a/plugin-server/tests/worker/ingestion/property-definitions-cache.test.ts b/plugin-server/tests/worker/ingestion/property-definitions-cache.test.ts index 92216c9a2ec62..e36c8f0a755bd 100644 --- a/plugin-server/tests/worker/ingestion/property-definitions-cache.test.ts +++ b/plugin-server/tests/worker/ingestion/property-definitions-cache.test.ts @@ -1,5 +1,5 @@ import { Hub, PropertyDefinitionTypeEnum } from '../../../src/types' -import { createHub } from '../../../src/utils/db/hub' +import { closeHub, createHub } from '../../../src/utils/db/hub' import { PostgresUse } from '../../../src/utils/db/postgres' import { UUIDT } from '../../../src/utils/utils' import { PropertyDefinitionsCache } from '../../../src/worker/ingestion/property-definitions-cache' @@ -15,18 +15,17 @@ jest.mock('../../../src/utils/posthog', () => ({ describe('PropertyDefinitionsManager()', () => { let hub: Hub - let closeHub: () => Promise let cache: PropertyDefinitionsCache beforeEach(async () => { - ;[hub, closeHub] = await createHub() + hub = await createHub() await resetTestDatabase() cache = new PropertyDefinitionsCache(hub) }) afterEach(async () => { - await closeHub() + await closeHub(hub) }) describe('with pre-existing data', () => { diff --git a/plugin-server/tests/worker/ingestion/property-definitions-manager.test.ts b/plugin-server/tests/worker/ingestion/property-definitions-manager.test.ts index 192ad46342e15..8ec4eaad75859 100644 --- a/plugin-server/tests/worker/ingestion/property-definitions-manager.test.ts +++ b/plugin-server/tests/worker/ingestion/property-definitions-manager.test.ts @@ -1,7 +1,7 @@ import { DateTime, Settings } from 'luxon' import { DateTimePropertyTypeFormat, Hub, PropertyDefinitionTypeEnum, PropertyType } from '../../../src/types' -import { createHub } from '../../../src/utils/db/hub' +import { closeHub, createHub } from '../../../src/utils/db/hub' import { PostgresUse } from '../../../src/utils/db/postgres' import { posthog } from '../../../src/utils/posthog' import { UUIDT } from '../../../src/utils/utils' @@ -21,14 +21,13 @@ jest.mock('../../../src/utils/posthog', () => ({ describe('PropertyDefinitionsManager()', () => { let hub: Hub - let closeHub: () => Promise let manager: PropertyDefinitionsManager let teamId: number let organizationId: string let groupTypeManager: GroupTypeManager beforeEach(async () => { - ;[hub, closeHub] = await createHub() + hub = await createHub() organizationId = await createOrganization(hub.db.postgres) teamId = await createTeam(hub.db.postgres, organizationId) groupTypeManager = new GroupTypeManager(hub.postgres, hub.teamManager, hub.SITE_URL) @@ -38,7 +37,7 @@ describe('PropertyDefinitionsManager()', () => { }) afterEach(async () => { - await closeHub() + await closeHub(hub) }) describe('updateEventNamesAndProperties()', () => { diff --git a/plugin-server/tests/worker/ingestion/utils.test.ts b/plugin-server/tests/worker/ingestion/utils.test.ts index f398b01d29be0..8eed11d028ac3 100644 --- a/plugin-server/tests/worker/ingestion/utils.test.ts +++ b/plugin-server/tests/worker/ingestion/utils.test.ts @@ -1,5 +1,5 @@ import { Hub, LogLevel } from '../../../src/types' -import { createHub } from '../../../src/utils/db/hub' +import { closeHub, createHub } from '../../../src/utils/db/hub' import { captureIngestionWarning } from '../../../src/worker/ingestion/utils' import { delayUntilEventIngested, resetTestDatabaseClickhouse } from '../../helpers/clickhouse' @@ -7,15 +7,14 @@ jest.setTimeout(60000) // 60 sec timeout describe('captureIngestionWarning()', () => { let hub: Hub - let closeHub: () => Promise beforeEach(async () => { - ;[hub, closeHub] = await createHub({ LOG_LEVEL: LogLevel.Log }) + hub = await createHub({ LOG_LEVEL: LogLevel.Log }) await resetTestDatabaseClickhouse() }) afterEach(async () => { - await closeHub() + await closeHub(hub) }) async function fetchWarnings() { diff --git a/plugin-server/tests/worker/plugins-api-key-manager.test.ts b/plugin-server/tests/worker/plugins-api-key-manager.test.ts index f4a1b77220587..606bfc86a89ff 100644 --- a/plugin-server/tests/worker/plugins-api-key-manager.test.ts +++ b/plugin-server/tests/worker/plugins-api-key-manager.test.ts @@ -1,5 +1,5 @@ import { Hub } from '../../src/types' -import { createHub } from '../../src/utils/db/hub' +import { closeHub, createHub } from '../../src/utils/db/hub' import { PostgresUse } from '../../src/utils/db/postgres' import { PluginsApiKeyManager } from '../../src/worker/vm/extensions/helpers/api-key-manager' import { createUserTeamAndOrganization, POSTGRES_DELETE_TABLES_QUERY } from '../helpers/sql' @@ -9,10 +9,9 @@ const ORG_ID_2 = '4dc8564d-bd82-1065-2f40-97f7c50f67cf' describe('PluginsApiKeyManager', () => { let hub: Hub - let closeHub: () => Promise beforeEach(async () => { - ;[hub, closeHub] = await createHub({ + hub = await createHub({ TASK_TIMEOUT: 1, }) await hub.db.postgres.query(PostgresUse.COMMON_WRITE, POSTGRES_DELETE_TABLES_QUERY, [], 'truncateTablesTest') @@ -21,7 +20,7 @@ describe('PluginsApiKeyManager', () => { }) afterEach(async () => { - await closeHub() + await closeHub(hub) }) test('fetchOrCreatePersonalApiKey', async () => { diff --git a/plugin-server/tests/worker/plugins.test.ts b/plugin-server/tests/worker/plugins.test.ts index 286f289e46cd4..eceb14e4d142c 100644 --- a/plugin-server/tests/worker/plugins.test.ts +++ b/plugin-server/tests/worker/plugins.test.ts @@ -2,7 +2,7 @@ import { PluginEvent } from '@posthog/plugin-scaffold/src/types' import { Hub, LogLevel } from '../../src/types' import { processError } from '../../src/utils/db/error' -import { createHub } from '../../src/utils/db/hub' +import { closeHub, createHub } from '../../src/utils/db/hub' import { delay, IllegalOperationError } from '../../src/utils/utils' import { loadPlugin } from '../../src/worker/plugins/loadPlugin' import { loadSchedule } from '../../src/worker/plugins/loadSchedule' @@ -32,16 +32,15 @@ jest.setTimeout(20_000) describe('plugins', () => { let hub: Hub - let closeHub: () => Promise beforeEach(async () => { - ;[hub, closeHub] = await createHub({ LOG_LEVEL: LogLevel.Log }) + hub = await createHub({ LOG_LEVEL: LogLevel.Log }) console.warn = jest.fn() as any await resetTestDatabase() }) afterEach(async () => { - await closeHub() + await closeHub(hub) }) test('setupPlugins and runProcessEvent', async () => { diff --git a/plugin-server/tests/worker/plugins/inline.test.ts b/plugin-server/tests/worker/plugins/inline.test.ts index d03d66b357552..8201f5e07dafb 100644 --- a/plugin-server/tests/worker/plugins/inline.test.ts +++ b/plugin-server/tests/worker/plugins/inline.test.ts @@ -1,7 +1,7 @@ import { PluginEvent } from '@posthog/plugin-scaffold' import { Hub, LogLevel, Plugin, PluginConfig } from '../../../src/types' -import { createHub } from '../../../src/utils/db/hub' +import { closeHub, createHub } from '../../../src/utils/db/hub' import { PostgresUse } from '../../../src/utils/db/postgres' import { constructInlinePluginInstance, @@ -15,17 +15,16 @@ import { resetTestDatabase } from '../../helpers/sql' describe('Inline plugin', () => { let hub: Hub - let closeHub: () => Promise beforeAll(async () => { console.info = jest.fn() as any console.warn = jest.fn() as any - ;[hub, closeHub] = await createHub({ LOG_LEVEL: LogLevel.Log }) + hub = await createHub({ LOG_LEVEL: LogLevel.Log }) await resetTestDatabase() }) afterAll(async () => { - await closeHub() + await closeHub(hub) }) // Sync all the inline plugins, then assert that for each plugin URL, a diff --git a/plugin-server/tests/worker/plugins/inline_plugins/user-agent.test.ts b/plugin-server/tests/worker/plugins/inline_plugins/user-agent.test.ts index b196353751b73..4b7a677b77476 100644 --- a/plugin-server/tests/worker/plugins/inline_plugins/user-agent.test.ts +++ b/plugin-server/tests/worker/plugins/inline_plugins/user-agent.test.ts @@ -1,23 +1,22 @@ import { PluginEvent } from '@posthog/plugin-scaffold' import { LogLevel, PluginConfig } from '../../../../src/types' -import { createHub } from '../../../../src/utils/db/hub' +import { closeHub, createHub } from '../../../../src/utils/db/hub' import { constructInlinePluginInstance } from '../../../../src/worker/vm/inline/inline' import { resetTestDatabase } from '../../../helpers/sql' describe('user-agent tests', () => { let hub: any - let closeHub: () => Promise beforeAll(async () => { console.info = jest.fn() as any console.warn = jest.fn() as any - ;[hub, closeHub] = await createHub({ LOG_LEVEL: LogLevel.Log }) + hub = await createHub({ LOG_LEVEL: LogLevel.Log }) await resetTestDatabase() }) afterAll(async () => { - await closeHub() + await closeHub(hub) }) test('should not process event when $userAgent is missing', async () => { diff --git a/plugin-server/tests/worker/plugins/mmdb.test.ts b/plugin-server/tests/worker/plugins/mmdb.test.ts index 8179191a27640..e95e3fec24f32 100644 --- a/plugin-server/tests/worker/plugins/mmdb.test.ts +++ b/plugin-server/tests/worker/plugins/mmdb.test.ts @@ -5,7 +5,7 @@ import fetch from 'node-fetch' import { join } from 'path' import { Hub, LogLevel } from '../../../src/types' -import { createHub } from '../../../src/utils/db/hub' +import { closeHub, createHub } from '../../../src/utils/db/hub' import { setupMmdb } from '../../../src/worker/plugins/mmdb' import { resetTestDatabase } from '../../helpers/sql' @@ -38,17 +38,16 @@ async function getCityName(hub: Hub, ipAddress: string) { describe('mmdb', () => { let hub: Hub - let closeHub: () => Promise jest.setTimeout(100_000) beforeAll(async () => { - ;[hub, closeHub] = await createHub({ LOG_LEVEL: LogLevel.Warn }) + hub = await createHub({ LOG_LEVEL: LogLevel.Warn }) hub.capabilities.mmdb = true }) afterAll(async () => { - await closeHub() + await closeHub(hub) jest.clearAllMocks() }) diff --git a/plugin-server/tests/worker/transforms.test.ts b/plugin-server/tests/worker/transforms.test.ts index 9b88c329f4563..d3072eaa0acd9 100644 --- a/plugin-server/tests/worker/transforms.test.ts +++ b/plugin-server/tests/worker/transforms.test.ts @@ -1,5 +1,5 @@ import { Hub } from '../../src/types' -import { createHub } from '../../src/utils/db/hub' +import { closeHub, createHub } from '../../src/utils/db/hub' import { code } from '../../src/utils/utils' import { transformCode } from '../../src/worker/vm/transforms' import { resetTestDatabase } from '../helpers/sql' @@ -10,15 +10,14 @@ const EMPTY_IMPORTS = {} describe('transforms', () => { let hub: Hub - let closeHub: () => Promise beforeEach(async () => { - ;[hub, closeHub] = await createHub() + hub = await createHub() await resetTestDatabase(`const processEvent = event => event`) }) afterEach(async () => { - await closeHub() + await closeHub(hub) }) describe('transformCode', () => { diff --git a/plugin-server/tests/worker/vm.extra-lazy.test.ts b/plugin-server/tests/worker/vm.extra-lazy.test.ts index 78bcc0da60f6c..480fdc423bd30 100644 --- a/plugin-server/tests/worker/vm.extra-lazy.test.ts +++ b/plugin-server/tests/worker/vm.extra-lazy.test.ts @@ -1,7 +1,7 @@ import fetch from 'node-fetch' import { Hub, PluginTaskType } from '../../src/types' -import { createHub } from '../../src/utils/db/hub' +import { closeHub, createHub } from '../../src/utils/db/hub' import { pluginDigest } from '../../src/utils/utils' import { LazyPluginVM } from '../../src/worker/vm/lazy' import { plugin60, pluginConfig39 } from '../helpers/plugins' @@ -9,14 +9,13 @@ import { resetTestDatabase } from '../helpers/sql' describe('VMs are extra lazy 💤', () => { let hub: Hub - let closeHub: () => Promise beforeEach(async () => { - ;[hub, closeHub] = await createHub() + hub = await createHub() }) afterEach(async () => { - await closeHub() + await closeHub(hub) jest.clearAllMocks() }) test('VM with scheduled tasks gets setup immediately', async () => { diff --git a/plugin-server/tests/worker/vm.test.ts b/plugin-server/tests/worker/vm.test.ts index 5f1f727d4dbeb..6f4f35460da19 100644 --- a/plugin-server/tests/worker/vm.test.ts +++ b/plugin-server/tests/worker/vm.test.ts @@ -4,7 +4,7 @@ import fetch from 'node-fetch' import { KAFKA_EVENTS_PLUGIN_INGESTION, KAFKA_PLUGIN_LOG_ENTRIES } from '../../src/config/kafka-topics' import { Hub, PluginLogEntrySource, PluginLogEntryType } from '../../src/types' import { PluginConfig, PluginConfigVMResponse } from '../../src/types' -import { createHub } from '../../src/utils/db/hub' +import { closeHub, createHub } from '../../src/utils/db/hub' import { delay, UUIDT } from '../../src/utils/utils' import { createPluginConfigVM } from '../../src/worker/vm/vm' import { pluginConfig39 } from '../helpers/plugins' @@ -42,14 +42,13 @@ export const createReadyPluginConfigVm = async ( } describe('vm tests', () => { let hub: Hub - let closeHub: () => Promise beforeEach(async () => { - ;[hub, closeHub] = await createHub() + hub = await createHub() }) afterEach(async () => { - await closeHub() + await closeHub(hub) }) test('empty plugins', async () => { diff --git a/plugin-server/tests/worker/vm.timeout.test.ts b/plugin-server/tests/worker/vm.timeout.test.ts index 739c35095a7c1..aebab19669622 100644 --- a/plugin-server/tests/worker/vm.timeout.test.ts +++ b/plugin-server/tests/worker/vm.timeout.test.ts @@ -1,5 +1,5 @@ import { Hub, PluginConfig, PluginConfigVMResponse } from '../../src/types' -import { createHub } from '../../src/utils/db/hub' +import { closeHub, createHub } from '../../src/utils/db/hub' import { createPluginConfigVM, TimeoutError } from '../../src/worker/vm/vm' import { pluginConfig39 } from '../helpers/plugins' import { resetTestDatabase } from '../helpers/sql' @@ -30,16 +30,15 @@ export const createReadyPluginConfigVm = async ( describe('vm timeout tests', () => { let hub: Hub - let closeHub: () => Promise beforeEach(async () => { - ;[hub, closeHub] = await createHub({ + hub = await createHub({ TASK_TIMEOUT: 1, }) }) afterEach(async () => { - await closeHub() + await closeHub(hub) }) test('while loop', async () => {