Skip to content

Commit

Permalink
Changes
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite committed Apr 26, 2024
1 parent beb5f35 commit ed7b016
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import { Consumer, Kafka } from 'kafkajs'
import { AppMetrics } from 'worker/ingestion/app-metrics'
import { RustyHook } from 'worker/rusty-hook'

import { KAFKA_EVENTS_JSON, prefix as KAFKA_PREFIX } from '../../config/kafka-topics'
import { Hub, PluginsServerConfig } from '../../types'
import { PostgresRouter } from '../../utils/db/postgres'
import { status } from '../../utils/status'
import { ActionManager } from '../../worker/ingestion/action-manager'
import { ActionMatcher } from '../../worker/ingestion/action-matcher'
import { AppMetrics } from '../../worker/ingestion/app-metrics'
import { HookCommander } from '../../worker/ingestion/hooks'
import { OrganizationManager } from '../../worker/ingestion/organization-manager'
import { TeamManager } from '../../worker/ingestion/team-manager'
import { RustyHook } from '../../worker/rusty-hook'
import { eachBatchAppsOnEventHandlers } from './batch-processing/each-batch-onevent'
import { eachBatchWebhooksHandlers } from './batch-processing/each-batch-webhooks'
import { KafkaJSIngestionConsumer, setupEventHandlers } from './kafka-queue'
Expand All @@ -30,6 +31,7 @@ export const startAsyncOnEventHandlerConsumer = async ({

const queue = buildOnEventIngestionConsumer({ hub })

await hub.actionManager.start()
await queue.start()

const isHealthy = makeHealthCheck(queue.consumer, queue.sessionTimeout)
Expand All @@ -43,6 +45,7 @@ export const startAsyncWebhooksHandlerConsumer = async ({
teamManager,
organizationManager,
actionMatcher,
actionManager,
serverConfig,
rustyHook,
appMetrics,
Expand All @@ -55,6 +58,7 @@ export const startAsyncWebhooksHandlerConsumer = async ({
rustyHook: RustyHook
appMetrics: AppMetrics
actionMatcher: ActionMatcher
actionManager: ActionManager
}) => {
/*
Consumes analytics events from the Kafka topic `clickhouse_events_json`
Expand Down Expand Up @@ -84,6 +88,7 @@ export const startAsyncWebhooksHandlerConsumer = async ({
)
const concurrency = serverConfig.TASKS_PER_WORKER || 20

await actionManager.start()
await consumer.subscribe({ topic: KAFKA_EVENTS_JSON, fromBeginning: false })
await consumer.run({
eachBatch: (payload) => eachBatchWebhooksHandlers(payload, actionMatcher, hookCannon, concurrency),
Expand Down
52 changes: 41 additions & 11 deletions plugin-server/src/main/pluginsServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,23 @@ import { Counter } from 'prom-client'
import v8Profiler from 'v8-profiler-next'

import { getPluginServerCapabilities } from '../capabilities'
import { defaultConfig, sessionRecordingConsumerConfig } from '../config/config'
import { buildIntegerMatcher, defaultConfig, sessionRecordingConsumerConfig } from '../config/config'
import { Hub, PluginServerCapabilities, PluginsServerConfig } from '../types'
import { createHub, createKafkaProducerWrapper } from '../utils/db/hub'
import { createHub, createKafkaClient, createKafkaProducerWrapper } from '../utils/db/hub'
import { PostgresRouter } from '../utils/db/postgres'
import { cancelAllScheduledJobs } from '../utils/node-schedule'
import { PeriodicTask } from '../utils/periodic-task'
import { PubSub } from '../utils/pubsub'
import { status } from '../utils/status'
import { createRedisClient, delay } from '../utils/utils'
import { ActionManager } from '../worker/ingestion/action-manager'
import { ActionMatcher } from '../worker/ingestion/action-matcher'
import { AppMetrics } from '../worker/ingestion/app-metrics'
import { OrganizationManager } from '../worker/ingestion/organization-manager'
import { DeferredPersonOverrideWorker, FlatPersonOverrideWriter } from '../worker/ingestion/person-state'
import { TeamManager } from '../worker/ingestion/team-manager'
import Piscina, { makePiscina as defaultMakePiscina } from '../worker/piscina'
import { RustyHook } from '../worker/rusty-hook'
import { GraphileWorker } from './graphile-worker/graphile-worker'
import { loadPluginSchedule } from './graphile-worker/schedule'
import { startGraphileWorker } from './graphile-worker/worker-setup'
Expand Down Expand Up @@ -357,18 +363,42 @@ export async function startPluginsServer(
}

if (capabilities.processAsyncWebhooksHandlers) {
;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities)
// we need to create them. We only initialize the ones we need.
const postgres = hub?.postgres ?? new PostgresRouter(serverConfig)
const kafka = hub?.kafka ?? createKafkaClient(serverConfig)
const teamManager = hub?.teamManager ?? new TeamManager(postgres, serverConfig)
const organizationManager = hub?.organizationManager ?? new OrganizationManager(postgres, teamManager)
const KafkaProducerWrapper = hub?.kafkaProducer ?? (await createKafkaProducerWrapper(serverConfig))
const rustyHook =
hub?.rustyHook ??
new RustyHook(
buildIntegerMatcher(serverConfig.RUSTY_HOOK_FOR_TEAMS, true),
serverConfig.RUSTY_HOOK_ROLLOUT_PERCENTAGE,
serverConfig.RUSTY_HOOK_URL,
serverConfig.EXTERNAL_REQUEST_TIMEOUT_MS
)
const appMetrics =
hub?.appMetrics ??
new AppMetrics(
KafkaProducerWrapper,
serverConfig.APP_METRICS_FLUSH_FREQUENCY_MS,
serverConfig.APP_METRICS_FLUSH_MAX_QUEUE_SIZE
)

const actionManager = hub?.actionManager ?? new ActionManager(postgres, serverConfig)
const actionMatcher = hub?.actionMatcher ?? new ActionMatcher(postgres, actionManager)

const { stop: webhooksStopConsumer, isHealthy: isWebhooksIngestionHealthy } =
await startAsyncWebhooksHandlerConsumer({
postgres: hub.postgres,
kafka: hub.kafka,
teamManager: hub.teamManager,
organizationManager: hub.organizationManager,
serverConfig: serverConfig,
rustyHook: hub.rustyHook,
appMetrics: hub.appMetrics,
actionMatcher: hub.actionMatcher,
postgres,
kafka,
teamManager,
organizationManager,
serverConfig,
rustyHook,
appMetrics,
actionMatcher,
actionManager,
})

stopWebhooksHandlerConsumer = webhooksStopConsumer
Expand Down
1 change: 0 additions & 1 deletion plugin-server/src/utils/db/hub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ export async function createHub(
)

const actionManager = new ActionManager(postgres, serverConfig)
await actionManager.start() // TODO: Do we want this to happen here?
const actionMatcher = new ActionMatcher(postgres, actionManager)

const enqueuePluginJob = async (job: EnqueuedPluginJob) => {
Expand Down

0 comments on commit ed7b016

Please sign in to comment.