Skip to content

Commit

Permalink
chore: Reverts "Plugin server services refactor (#24638)" (#24733)
Browse files: browse the repository at this point in the history
  • Loading branch information
benjackwhite authored Sep 2, 2024
1 parent 11b8c09 commit 3f56346
Show file tree
Hide file tree
Showing 72 changed files with 557 additions and 414 deletions.
2 changes: 2 additions & 0 deletions plugin-server/bin/ci_functional_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
set -e -o pipefail

export WORKER_CONCURRENCY=1
export CONVERSION_BUFFER_ENABLED=true
export BUFFER_CONVERSION_SECONDS=2 # Make sure we don't have to wait for the default 60 seconds
export KAFKA_MAX_MESSAGE_BATCH_SIZE=0
export APP_METRICS_FLUSH_FREQUENCY_MS=0 # Reduce the potential for spurious errors in tests that wait for metrics
export APP_METRICS_GATHERED_FOR_ALL=true
Expand Down
11 changes: 1 addition & 10 deletions plugin-server/src/cdp/cdp-consumers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars
import { createKafkaProducer } from '../kafka/producer'
import { addSentryBreadcrumbsEventListeners } from '../main/ingestion-queues/kafka-metrics'
import { runInstrumentedFunction } from '../main/utils'
import { AppMetric2Type, Hub, PluginServerService, RawClickHouseEvent, TeamId, TimestampFormat } from '../types'
import { AppMetric2Type, Hub, RawClickHouseEvent, TeamId, TimestampFormat } from '../types'
import { KafkaProducerWrapper } from '../utils/db/kafka-producer-wrapper'
import { captureTeamEvent } from '../utils/posthog'
import { status } from '../utils/status'
Expand Down Expand Up @@ -113,15 +113,6 @@ abstract class CdpConsumerBase {
this.groupsManager = new GroupsManager(this.hub)
}

public get service(): PluginServerService {
return {
id: this.consumerGroupId,
onShutdown: async () => await this.stop(),
healthcheck: () => this.isHealthy() ?? false,
batchConsumer: this.batchConsumer,
}
}

private async captureInternalPostHogEvent(
hogFunctionId: HogFunctionType['id'],
event: string,
Expand Down
4 changes: 4 additions & 0 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ export function getDefaultConfig(): PluginsServerConfig {
CLICKHOUSE_DISABLE_EXTERNAL_SCHEMAS_TEAMS: '',
CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC: KAFKA_EVENTS_JSON,
CLICKHOUSE_HEATMAPS_KAFKA_TOPIC: KAFKA_CLICKHOUSE_HEATMAP_EVENTS,
CONVERSION_BUFFER_ENABLED: false,
CONVERSION_BUFFER_ENABLED_TEAMS: '',
CONVERSION_BUFFER_TOPIC_ENABLED_TEAMS: '',
BUFFER_CONVERSION_SECONDS: isDevEnv() ? 2 : 60, // KEEP IN SYNC WITH posthog/settings/ingestion.py
PERSON_INFO_CACHE_TTL: 5 * 60, // 5 min
KAFKA_HEALTHCHECK_SECONDS: 20,
OBJECT_STORAGE_ENABLED: true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Counter } from 'prom-client'

import { buildStringMatcher } from '../../config/config'
import { KAFKA_EVENTS_PLUGIN_INGESTION, prefix as KAFKA_PREFIX } from '../../config/kafka-topics'
import { Hub, PluginServerService } from '../../types'
import { Hub } from '../../types'
import { status } from '../../utils/status'
import { eachBatchParallelIngestion, IngestionOverflowMode } from './batch-processing/each-batch-ingestion'
import { IngestionConsumer } from './kafka-queue'
Expand All @@ -18,7 +18,7 @@ export const startAnalyticsEventsIngestionConsumer = async ({
hub, // TODO: remove needing to pass in the whole hub and be more selective on dependency injection.
}: {
hub: Hub
}): Promise<PluginServerService> => {
}) => {
/*
Consumes analytics events from the Kafka topic `events_plugin_ingestion`
and processes them for ingestion into ClickHouse.
Expand Down Expand Up @@ -66,10 +66,5 @@ export const startAnalyticsEventsIngestionConsumer = async ({

const { isHealthy } = await queue.start()

return {
id: 'analytics-ingestion',
batchConsumer: queue.consumer,
healthcheck: isHealthy,
onShutdown: () => queue.stop(),
}
return { queue, isHealthy }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Message } from 'node-rdkafka'

import { buildStringMatcher } from '../../config/config'
import { KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL, prefix as KAFKA_PREFIX } from '../../config/kafka-topics'
import { Hub, PluginServerService } from '../../types'
import { Hub } from '../../types'
import { status } from '../../utils/status'
import { eachBatchParallelIngestion, IngestionOverflowMode } from './batch-processing/each-batch-ingestion'
import { IngestionConsumer } from './kafka-queue'
Expand All @@ -11,7 +11,7 @@ export const startAnalyticsEventsIngestionHistoricalConsumer = async ({
hub, // TODO: remove needing to pass in the whole hub and be more selective on dependency injection.
}: {
hub: Hub
}): Promise<PluginServerService> => {
}) => {
/*
Consumes analytics events from the Kafka topic `events_plugin_ingestion_historical`
and processes them for ingestion into ClickHouse.
Expand Down Expand Up @@ -39,10 +39,5 @@ export const startAnalyticsEventsIngestionHistoricalConsumer = async ({

const { isHealthy } = await queue.start()

return {
id: 'analytics-ingestion-historical',
onShutdown: async () => await queue.stop(),
healthcheck: isHealthy,
batchConsumer: queue.consumer,
}
return { queue, isHealthy }
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,5 @@ export const startAnalyticsEventsIngestionOverflowConsumer = async ({

const { isHealthy } = await queue.start()

return {
id: 'analytics-ingestion-overflow',
onShutdown: async () => await queue.stop(),
healthcheck: isHealthy,
batchConsumer: queue.consumer,
}
return { queue, isHealthy }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Message } from 'node-rdkafka'

import { buildStringMatcher } from '../../config/config'
import { prefix as KAFKA_PREFIX, suffix as KAFKA_SUFFIX } from '../../config/kafka-topics'
import { Hub, PluginServerService } from '../../types'
import { Hub } from '../../types'
import { status } from '../../utils/status'
import { eachBatchParallelIngestion, IngestionOverflowMode } from './batch-processing/each-batch-ingestion'
import { IngestionConsumer } from './kafka-queue'
Expand All @@ -27,19 +27,16 @@ export const PIPELINES: { [key: string]: PipelineType } = {
},
}

export type PipelineKeyType = keyof typeof PIPELINES

export const startEventsIngestionPipelineConsumer = async ({
hub, // TODO: remove needing to pass in the whole hub and be more selective on dependency injection.
pipelineKey,
pipeline,
}: {
hub: Hub
pipelineKey: PipelineKeyType
}): Promise<PluginServerService> => {
pipeline: PipelineType
}) => {
/*
Consumes events from the topic and consumer passed in.
*/
const pipeline = PIPELINES[pipelineKey]
const kafka_topic = `${KAFKA_PREFIX}${pipeline.topic}${KAFKA_SUFFIX}`
const kafka_consumer = `${KAFKA_PREFIX}${pipeline.consumer_group}`
status.info(
Expand All @@ -62,10 +59,5 @@ export const startEventsIngestionPipelineConsumer = async ({

const { isHealthy } = await queue.start()

return {
id: `events-ingestion-pipeline-${pipelineKey}`,
onShutdown: async () => await queue.stop(),
healthcheck: isHealthy,
batchConsumer: queue.consumer,
}
return { queue, isHealthy }
}
10 changes: 4 additions & 6 deletions plugin-server/src/main/ingestion-queues/jobs-consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import { status } from '../../utils/status'
import { GraphileWorker } from '../graphile-worker/graphile-worker'
import { instrumentEachBatchKafkaJS, setupEventHandlers } from './kafka-queue'
import { latestOffsetTimestampGauge } from './metrics'
import { makeHealthCheck } from './on-event-handler-consumer'

const jobsConsumerSuccessCounter = new Counter({
name: 'jobs_consumer_enqueue_success_total',
Expand Down Expand Up @@ -126,11 +125,10 @@ export const startJobsConsumer = async ({
},
})

const healthcheck = makeHealthCheck(consumer, serverConfig.KAFKA_CONSUMPTION_SESSION_TIMEOUT_MS)

return {
id: 'jobs-consumer',
healthcheck: async () => await healthcheck(),
onShutdown: async () => await consumer.stop(),
...consumer,
stop: async () => {
await consumer.stop()
},
}
}
6 changes: 6 additions & 0 deletions plugin-server/src/main/ingestion-queues/kafka-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ type EachBatchFunction = (messages: Message[], queue: IngestionConsumer) => Prom

export class IngestionConsumer {
public pluginsServer: Hub
public consumerReady: boolean
public topic: string
public consumerGroupId: string
public eachBatch: EachBatchFunction
Expand All @@ -176,6 +177,8 @@ export class IngestionConsumer {
this.topic = topic
this.consumerGroupId = consumerGroupId

this.consumerReady = false

this.eachBatch = batchHandler
}

Expand All @@ -197,6 +200,7 @@ export class IngestionConsumer {
topicMetadataRefreshInterval: this.pluginsServer.KAFKA_TOPIC_METADATA_REFRESH_INTERVAL_MS,
eachBatch: (payload) => this.eachBatchConsumer(payload),
})
this.consumerReady = true
return this.consumer
}

Expand All @@ -212,6 +216,8 @@ export class IngestionConsumer {
} catch (error) {
status.error('⚠️', 'An error occurred while stopping Kafka queue:\n', error)
}

this.consumerReady = false
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Consumer, Kafka } from 'kafkajs'

import { KAFKA_EVENTS_JSON, prefix as KAFKA_PREFIX } from '../../config/kafka-topics'
import { Hub, PluginServerService, PluginsServerConfig } from '../../types'
import { Hub, PluginsServerConfig } from '../../types'
import { PostgresRouter } from '../../utils/db/postgres'
import { status } from '../../utils/status'
import { ActionManager } from '../../worker/ingestion/action-manager'
Expand All @@ -20,7 +20,7 @@ export const startAsyncOnEventHandlerConsumer = async ({
hub, // TODO: remove needing to pass in the whole hub and be more selective on dependency injection.
}: {
hub: Hub
}): Promise<PluginServerService> => {
}) => {
/*
Consumes analytics events from the Kafka topic `clickhouse_events_json`
and processes any onEvent plugin handlers configured for the team.
Expand All @@ -35,11 +35,9 @@ export const startAsyncOnEventHandlerConsumer = async ({
await hub.actionManager.start()
await queue.start()

return {
id: 'on-event-ingestion',
healthcheck: makeHealthCheck(queue.consumer, queue.sessionTimeout),
onShutdown: async () => await queue.stop(),
}
const isHealthy = makeHealthCheck(queue.consumer, queue.sessionTimeout)

return { queue, isHealthy: () => isHealthy() }
}

export const startAsyncWebhooksHandlerConsumer = async ({
Expand All @@ -64,7 +62,7 @@ export const startAsyncWebhooksHandlerConsumer = async ({
groupTypeManager: GroupTypeManager
actionMatcher: ActionMatcher
actionManager: ActionManager
}): Promise<PluginServerService> => {
}) => {
/*
Consumes analytics events from the Kafka topic `clickhouse_events_json`
and processes any onEvent plugin handlers configured for the team.
Expand Down Expand Up @@ -108,24 +106,23 @@ export const startAsyncWebhooksHandlerConsumer = async ({
),
})

const onShutdown = async () => {
await actionManager.stop()
try {
await consumer.stop()
} catch (e) {
status.error('🚨', 'Error stopping consumer', e)
}
try {
await consumer.disconnect()
} catch (e) {
status.error('🚨', 'Error disconnecting consumer', e)
}
}
const isHealthy = makeHealthCheck(consumer, serverConfig.KAFKA_CONSUMPTION_SESSION_TIMEOUT_MS)

return {
id: 'webhooks-ingestion',
healthcheck: makeHealthCheck(consumer, serverConfig.KAFKA_CONSUMPTION_SESSION_TIMEOUT_MS),
onShutdown,
stop: async () => {
await actionManager.stop()
try {
await consumer.stop()
} catch (e) {
status.error('🚨', 'Error stopping consumer', e)
}
try {
await consumer.disconnect()
} catch (e) {
status.error('🚨', 'Error disconnecting consumer', e)
}
},
isHealthy,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ import { Batch, EachBatchHandler, Kafka } from 'kafkajs'
import { KafkaProducerWrapper } from 'utils/db/kafka-producer-wrapper'

import { KAFKA_SCHEDULED_TASKS, KAFKA_SCHEDULED_TASKS_DLQ } from '../../config/kafka-topics'
import { PluginServerService, PluginsServerConfig } from '../../types'
import { PluginsServerConfig } from '../../types'
import { DependencyUnavailableError } from '../../utils/db/error'
import { status } from '../../utils/status'
import Piscina from '../../worker/piscina'
import { instrumentEachBatchKafkaJS, setupEventHandlers } from './kafka-queue'
import { latestOffsetTimestampGauge, scheduledTaskCounter } from './metrics'
import { makeHealthCheck } from './on-event-handler-consumer'

// The valid task types that can be scheduled.
// TODO: not sure if there is another place that defines these but it would be good to unify.
Expand All @@ -26,7 +25,7 @@ export const startScheduledTasksConsumer = async ({
piscina: Piscina
serverConfig: PluginsServerConfig
partitionConcurrency: number
}): Promise<PluginServerService> => {
}) => {
/*
Consumes from the scheduled tasks topic, and executes them within a
Expand Down Expand Up @@ -139,12 +138,11 @@ export const startScheduledTasksConsumer = async ({
},
})

const healthcheck = makeHealthCheck(consumer, serverConfig.KAFKA_CONSUMPTION_SESSION_TIMEOUT_MS)

return {
id: 'scheduled-tasks-consumer',
healthcheck: async () => await healthcheck(),
onShutdown: async () => await consumer.stop(),
...consumer,
stop: async () => {
await consumer.stop()
},
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer'
import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../../../kafka/config'
import { createKafkaProducer } from '../../../kafka/producer'
import { PluginServerService, PluginsServerConfig, RedisPool, TeamId, ValueMatcher } from '../../../types'
import { PluginsServerConfig, RedisPool, TeamId, ValueMatcher } from '../../../types'
import { BackgroundRefresher } from '../../../utils/background-refresher'
import { KafkaProducerWrapper } from '../../../utils/db/kafka-producer-wrapper'
import { PostgresRouter } from '../../../utils/db/postgres'
Expand Down Expand Up @@ -235,15 +235,6 @@ export class SessionRecordingIngester {
}, 10000)
}

public get service(): PluginServerService {
return {
id: 'session-recordings-blob-overflow',
onShutdown: async () => await this.stop(),
healthcheck: () => this.isHealthy() ?? false,
batchConsumer: this.batchConsumer,
}
}

private get connectedBatchConsumer(): KafkaConsumer | undefined {
// Helper to only use the batch consumer if we are actually connected to it - otherwise it will throw errors
const consumer = this.batchConsumer?.consumer
Expand Down
Loading

0 comments on commit 3f56346

Please sign in to comment.