Commit c0fb8c8: Fixes
benjackwhite committed Jun 19, 2024
1 parent b06ac52
Showing 5 changed files with 65 additions and 85 deletions.
62 changes: 25 additions & 37 deletions plugin-server/src/cdp/cdp-consumers.ts
@@ -9,16 +9,14 @@ import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars
import { createKafkaProducer } from '../kafka/producer'
import { addSentryBreadcrumbsEventListeners } from '../main/ingestion-queues/kafka-metrics'
import { runInstrumentedFunction } from '../main/utils'
-import { GroupTypeToColumnIndex, Hub, PluginsServerConfig, RawClickHouseEvent, TeamId, TimestampFormat } from '../types'
+import { GroupTypeToColumnIndex, Hub, RawClickHouseEvent, TeamId, TimestampFormat } from '../types'
import { KafkaProducerWrapper } from '../utils/db/kafka-producer-wrapper'
-import { PostgresRouter } from '../utils/db/postgres'
import { status } from '../utils/status'
import { castTimestampOrNow } from '../utils/utils'
import { AppMetrics } from '../worker/ingestion/app-metrics'
import { GroupTypeManager } from '../worker/ingestion/group-type-manager'
import { OrganizationManager } from '../worker/ingestion/organization-manager'
import { TeamManager } from '../worker/ingestion/team-manager'
-import { RustyHook } from '../worker/rusty-hook'
import { AsyncFunctionExecutor } from './async-function-executor'
import { addLog, HogExecutor } from './hog-executor'
import { HogFunctionManager } from './hog-function-manager'
@@ -63,22 +61,21 @@ abstract class CdpConsumerBase {
hogFunctionManager: HogFunctionManager
asyncFunctionExecutor?: AsyncFunctionExecutor
hogExecutor: HogExecutor
-appMetrics?: AppMetrics
+appMetrics: AppMetrics
isStopping = false

protected kafkaProducer?: KafkaProducerWrapper
protected abstract name: string
protected abstract topic: string
protected abstract consumerGroupId: string

-constructor(protected config: PluginsServerConfig, protected hub?: Hub) {
-const postgres = hub?.postgres ?? new PostgresRouter(config)
-
-this.teamManager = new TeamManager(postgres, config)
-this.organizationManager = new OrganizationManager(postgres, this.teamManager)
-this.groupTypeManager = new GroupTypeManager(postgres, this.teamManager)
-this.hogFunctionManager = new HogFunctionManager(postgres, config)
-this.hogExecutor = new HogExecutor(this.config, this.hogFunctionManager)
+constructor(protected hub: Hub) {
+this.teamManager = hub.teamManager
+this.organizationManager = hub.organizationManager
+this.groupTypeManager = hub.groupTypeManager
+this.hogFunctionManager = new HogFunctionManager(hub.postgres, hub)
+this.hogExecutor = new HogExecutor(hub, this.hogFunctionManager)
+this.appMetrics = hub.appMetrics
}

public abstract handleEachBatch(messages: Message[], heartbeat: () => void): Promise<void>
@@ -146,46 +143,37 @@ abstract class CdpConsumerBase {
})

// NOTE: This is the only place where we need to use the shared server config
-const globalConnectionConfig = createRdConnectionConfigFromEnvVars(this.config)
-const globalProducerConfig = createRdProducerConfigFromEnvVars(this.config)
+const globalConnectionConfig = createRdConnectionConfigFromEnvVars(this.hub)
+const globalProducerConfig = createRdProducerConfigFromEnvVars(this.hub)

await this.hogFunctionManager.start()

this.kafkaProducer = new KafkaProducerWrapper(
await createKafkaProducer(globalConnectionConfig, globalProducerConfig)
)

-const rustyHook = this.hub?.rustyHook ?? new RustyHook(this.config)
-this.asyncFunctionExecutor = new AsyncFunctionExecutor(this.config, rustyHook)
-
-this.appMetrics =
-this.hub?.appMetrics ??
-new AppMetrics(
-this.kafkaProducer,
-this.config.APP_METRICS_FLUSH_FREQUENCY_MS,
-this.config.APP_METRICS_FLUSH_MAX_QUEUE_SIZE
-)
+this.asyncFunctionExecutor = new AsyncFunctionExecutor(this.hub, this.hub.rustyHook)
this.kafkaProducer.producer.connect()

this.batchConsumer = await startBatchConsumer({
-connectionConfig: createRdConnectionConfigFromEnvVars(this.config),
+connectionConfig: createRdConnectionConfigFromEnvVars(this.hub),
groupId: this.consumerGroupId,
topic: this.topic,
autoCommit: true,
-sessionTimeout: this.config.KAFKA_CONSUMPTION_SESSION_TIMEOUT_MS,
-maxPollIntervalMs: this.config.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS,
+sessionTimeout: this.hub.KAFKA_CONSUMPTION_SESSION_TIMEOUT_MS,
+maxPollIntervalMs: this.hub.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS,
// the largest size of a message that can be fetched by the consumer.
// the largest size our MSK cluster allows is 20MB
// we only use 9 or 10MB but there's no reason to limit this 🤷️
-consumerMaxBytes: this.config.KAFKA_CONSUMPTION_MAX_BYTES,
-consumerMaxBytesPerPartition: this.config.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION,
+consumerMaxBytes: this.hub.KAFKA_CONSUMPTION_MAX_BYTES,
+consumerMaxBytesPerPartition: this.hub.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION,
// our messages are very big, so we don't want to buffer too many
-// queuedMinMessages: this.config.KAFKA_QUEUE_SIZE,
-consumerMaxWaitMs: this.config.KAFKA_CONSUMPTION_MAX_WAIT_MS,
-consumerErrorBackoffMs: this.config.KAFKA_CONSUMPTION_ERROR_BACKOFF_MS,
-fetchBatchSize: this.config.INGESTION_BATCH_SIZE,
-batchingTimeoutMs: this.config.KAFKA_CONSUMPTION_BATCHING_TIMEOUT_MS,
-topicCreationTimeoutMs: this.config.KAFKA_TOPIC_CREATION_TIMEOUT_MS,
+// queuedMinMessages: this.hub.KAFKA_QUEUE_SIZE,
+consumerMaxWaitMs: this.hub.KAFKA_CONSUMPTION_MAX_WAIT_MS,
+consumerErrorBackoffMs: this.hub.KAFKA_CONSUMPTION_ERROR_BACKOFF_MS,
+fetchBatchSize: this.hub.INGESTION_BATCH_SIZE,
+batchingTimeoutMs: this.hub.KAFKA_CONSUMPTION_BATCHING_TIMEOUT_MS,
+topicCreationTimeoutMs: this.hub.KAFKA_TOPIC_CREATION_TIMEOUT_MS,
eachBatch: async (messages, { heartbeat }) => {
status.info('🔁', `${this.name} - handling batch`, {
size: messages.length,
@@ -302,7 +290,7 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase {
convertToHogFunctionInvocationGlobals(
convertToParsedClickhouseEvent(clickHouseEvent),
team,
-this.config.SITE_URL ?? 'http://localhost:8000',
+this.hub.SITE_URL ?? 'http://localhost:8000',
groupTypes
)
)
@@ -408,7 +396,7 @@ export class CdpFunctionCallbackConsumer extends CdpConsumerBase {
const globals = convertToHogFunctionInvocationGlobals(
event,
team,
-this.config.SITE_URL ?? 'http://localhost:8000',
+this.hub.SITE_URL ?? 'http://localhost:8000',
groupTypes
)

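Taken together, the cdp-consumers.ts hunks replace the old "config plus optional Hub" constructor with a required, fully built Hub whose shared services the consumer simply borrows. A minimal sketch of the resulting pattern, using invented stand-in types rather than the repo's real ones:

// Stand-ins for the real plugin-server types; illustrative only.
interface TeamManager {
    fetchTeam(teamId: number): Promise<unknown>
}
interface Hub {
    teamManager: TeamManager
    // Hub extends PluginsServerConfig, so config values live here too.
    KAFKA_CONSUMPTION_MAX_BYTES: number
}

class ExampleConsumer {
    private teamManager: TeamManager

    constructor(private hub: Hub) {
        // Borrow the shared instance; no more fallback construction like
        // `hub?.postgres ?? new PostgresRouter(config)`.
        this.teamManager = hub.teamManager
    }

    maxBytes(): number {
        return this.hub.KAFKA_CONSUMPTION_MAX_BYTES
    }
}

This is also why every `this.config.X` read in the hunks above becomes `this.hub.X`: the Hub type extends the server config, so the same keys are available on it.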
9 changes: 3 additions & 6 deletions plugin-server/src/main/pluginsServer.ts
@@ -56,7 +56,7 @@ export async function startPluginsServer(
makePiscina: (serverConfig: PluginsServerConfig, hub: Hub) => Promise<Piscina> = defaultMakePiscina,
capabilitiesOverride?: PluginServerCapabilities
): Promise<ServerInstance> {
-const timer = new Date()
+const startTime = new Date()

const serverConfig: PluginsServerConfig = {
...defaultConfig,
@@ -75,14 +75,11 @@

let httpServer: Server | undefined // server

-let lastActivityCheck: NodeJS.Timeout | undefined
-
let shuttingDown = false

async function closeJobs(): Promise<void> {
shuttingDown = true
status.info('💤', ' Shutting down gracefully...')
-lastActivityCheck && clearInterval(lastActivityCheck)

// HACKY: Stop all consumers and the graphile worker, as well as the
// http server. Note that we close the http server before the others to
@@ -318,7 +315,7 @@
}

// TODO: Should this only be running for this kind of capability?
-pluginServerStartupTimeMs.inc(Date.now() - timer.valueOf())
+pluginServerStartupTimeMs.inc(Date.now() - startTime.valueOf())
})

startCapabilities('ingestion', async () => {
Expand Down Expand Up @@ -477,7 +474,7 @@ export async function startPluginsServer(
})
}

-status.info('🚀', `Finished Launching plugin server in ${Date.now() - timer.valueOf()}ms `)
+status.info('🚀', `Finished Launching plugin server in ${Date.now() - startTime.valueOf()}ms `)

return serverInstance
} catch (error) {
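The pluginsServer.ts changes are mechanical: `timer` becomes `startTime` to say what the value holds, and the never-used `lastActivityCheck` interval is deleted. The measurement itself is unchanged, roughly:

// Sketch of the startup-time measurement this file performs.
const startTime = new Date()
// ... start capabilities, consumers, and the HTTP server ...
const elapsedMs = Date.now() - startTime.valueOf()
console.log(`Finished launching plugin server in ${elapsedMs}ms`)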
10 changes: 4 additions & 6 deletions plugin-server/src/types.ts
@@ -15,9 +15,7 @@ import { Pool as GenericPool } from 'generic-pool'
import { Redis } from 'ioredis'
import { Kafka } from 'kafkajs'
import { DateTime } from 'luxon'
-import { Job } from 'node-schedule'
import { VM } from 'vm2'
-import { RustyHook } from 'worker/rusty-hook'

import { ObjectStorage } from './main/services/object_storage'
import { DB } from './utils/db/db'
@@ -27,9 +25,11 @@ import { UUID } from './utils/utils'
import { ActionManager } from './worker/ingestion/action-manager'
import { ActionMatcher } from './worker/ingestion/action-matcher'
import { AppMetrics } from './worker/ingestion/app-metrics'
+import { GroupTypeManager } from './worker/ingestion/group-type-manager'
import { OrganizationManager } from './worker/ingestion/organization-manager'
import { EventsProcessor } from './worker/ingestion/process-event'
import { TeamManager } from './worker/ingestion/team-manager'
+import { RustyHook } from './worker/rusty-hook'
import { PluginsApiKeyManager } from './worker/vm/extensions/helpers/api-key-manager'
import { RootAccessManager } from './worker/vm/extensions/helpers/root-acess-manager'
import { LazyPluginVM } from './worker/vm/lazy'
@@ -265,9 +265,8 @@ export interface Hub extends PluginsServerConfig {
clickhouse: ClickHouse
kafka: Kafka
kafkaProducer: KafkaProducerWrapper
-objectStorage: ObjectStorage
-// metrics
-pluginMetricsJob: Job | undefined
+objectStorage?: ObjectStorage
+groupTypeManager: GroupTypeManager
// currently enabled plugin status
plugins: Map<PluginId, Plugin>
pluginConfigs: Map<PluginConfigId, PluginConfig>
@@ -289,7 +288,6 @@ export interface Hub extends PluginsServerConfig {
// geoip database, setup in workers
mmdb?: ReaderModel
// diagnostics
-statelessVms: StatelessVmMap
conversionBufferEnabledTeams: Set<number>
// functions
enqueuePluginJob: (job: EnqueuedPluginJob) => Promise<void>
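With `objectStorage` now optional on `Hub`, call sites must narrow it before use. A hedged sketch of what a caller looks like (the `healthcheck` method name is an assumption for illustration, not taken from this commit):

interface ObjectStorage {
    healthcheck(): Promise<boolean> // assumed method, for illustration
}
interface HubLike {
    objectStorage?: ObjectStorage
}

async function storageHealthy(hub: HubLike): Promise<boolean> {
    // Optional chaining plus a default keeps callers well-defined when
    // object storage is not configured.
    return (await hub.objectStorage?.healthcheck()) ?? false
}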
36 changes: 20 additions & 16 deletions plugin-server/src/utils/db/hub.ts
@@ -25,6 +25,7 @@ import {
import { ActionManager } from '../../worker/ingestion/action-manager'
import { ActionMatcher } from '../../worker/ingestion/action-matcher'
import { AppMetrics } from '../../worker/ingestion/app-metrics'
+import { GroupTypeManager } from '../../worker/ingestion/group-type-manager'
import { OrganizationManager } from '../../worker/ingestion/organization-manager'
import { EventsProcessor } from '../../worker/ingestion/process-event'
import { TeamManager } from '../../worker/ingestion/team-manager'
@@ -150,6 +151,7 @@

const actionManager = new ActionManager(postgres, serverConfig)
const actionMatcher = new ActionMatcher(postgres, actionManager, teamManager)
+const groupTypeManager = new GroupTypeManager(postgres, teamManager, serverConfig.SITE_URL)

const enqueuePluginJob = async (job: EnqueuedPluginJob) => {
// NOTE: we use the producer directly here rather than using the wrapper
@@ -172,7 +174,15 @@
})
}

-const hub: Partial<Hub> = {
+const appMetrics = new AppMetrics(
+kafkaProducer,
+serverConfig.APP_METRICS_FLUSH_FREQUENCY_MS,
+serverConfig.APP_METRICS_FLUSH_MAX_QUEUE_SIZE
+)
+
+const eventsProcessor = new EventsProcessor(serverConfig, db, kafkaProducer, teamManager, groupTypeManager)
+
+const hub: Hub = {
...serverConfig,
instanceId,
capabilities,
@@ -183,7 +193,10 @@
kafka,
kafkaProducer,
enqueuePluginJob,
-objectStorage: objectStorage,
+objectStorage,
+appMetrics,
+groupTypeManager,
+eventsProcessor,

plugins: new Map(),
pluginConfigs: new Map(),
@@ -207,29 +220,20 @@
eventsToDropByToken: createEventsToDropByToken(process.env.DROP_EVENTS_BY_TOKEN_DISTINCT_ID),
}

-// :TODO: This is only used on worker threads, not main
-hub.eventsProcessor = new EventsProcessor(hub as Hub)
-
-hub.appMetrics = new AppMetrics(
-kafkaProducer,
-serverConfig.APP_METRICS_FLUSH_FREQUENCY_MS,
-serverConfig.APP_METRICS_FLUSH_MAX_QUEUE_SIZE
-)
-
const closeHub = async () => {
if (!isTestEnv()) {
await hub.appMetrics?.flush()
}
await Promise.allSettled([kafkaProducer.disconnect(), redisPool.drain(), hub.postgres?.end()])
await redisPool.clear()

-// Break circular references to allow the hub to be GCed when running unit tests
-// TODO: change these structs to not directly reference the hub
-hub.eventsProcessor = undefined
-hub.appMetrics = undefined
+// // Break circular references to allow the hub to be GCed when running unit tests
+// // TODO: change these structs to not directly reference the hub
+// hub.eventsProcessor = undefined
+// hub.appMetrics = undefined
}

-return [hub as Hub, closeHub]
+return [hub, closeHub]
}

export type KafkaConfig = {
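The hub.ts hunks swap the `Partial<Hub>` build-then-mutate construction for creating `appMetrics`, `eventsProcessor`, and `groupTypeManager` up front and assembling the complete `Hub` in one literal, so the `as Hub` cast disappears. A simplified sketch of why that is safer:

interface Services {
    appMetrics: string
    eventsProcessor: string
}

// Before (shape of the old code): fields bolted on after construction,
// then force-cast; a forgotten assignment only fails at runtime.
//   const services: Partial<Services> = {}
//   services.appMetrics = 'metrics'
//   return services as Services

// After: build dependencies first, assemble the full object once.
// A missing field is now a compile-time error.
function createServices(): Services {
    const appMetrics = 'metrics'
    const eventsProcessor = 'processor'
    return { appMetrics, eventsProcessor }
}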
33 changes: 13 additions & 20 deletions plugin-server/src/worker/ingestion/process-event.ts
@@ -1,4 +1,3 @@
-import ClickHouse from '@posthog/clickhouse'
import { PluginEvent, Properties } from '@posthog/plugin-scaffold'
import * as Sentry from '@sentry/node'
import { DateTime } from 'luxon'
@@ -8,10 +7,10 @@ import {
ClickHouseTimestamp,
Element,
GroupTypeIndex,
-Hub,
ISOTimestamp,
Person,
PersonMode,
+PluginsServerConfig,
PreIngestionEvent,
RawClickHouseEvent,
Team,
@@ -44,26 +43,20 @@ const elementsOrElementsChainCounter = new Counter({
})

export class EventsProcessor {
-pluginsServer: Hub
-db: DB
-clickhouse: ClickHouse
-kafkaProducer: KafkaProducerWrapper
-teamManager: TeamManager
-groupTypeManager: GroupTypeManager
-propertyDefinitionsManager: PropertyDefinitionsManager
+private propertyDefinitionsManager: PropertyDefinitionsManager

-constructor(pluginsServer: Hub) {
-this.pluginsServer = pluginsServer
-this.db = pluginsServer.db
-this.clickhouse = pluginsServer.clickhouse
-this.kafkaProducer = pluginsServer.kafkaProducer
-this.teamManager = pluginsServer.teamManager
-this.groupTypeManager = new GroupTypeManager(pluginsServer.postgres, this.teamManager, pluginsServer.SITE_URL)
+constructor(
+private config: PluginsServerConfig,
+private db: DB,
+private kafkaProducer: KafkaProducerWrapper,
+private teamManager: TeamManager,
+private groupTypeManager: GroupTypeManager
+) {
this.propertyDefinitionsManager = new PropertyDefinitionsManager(
this.teamManager,
this.groupTypeManager,
-pluginsServer.db,
-pluginsServer
+this.db,
+config
)
}

@@ -155,7 +148,7 @@
delete properties['$ip']
}

-if (this.pluginsServer.SKIP_UPDATE_EVENT_AND_PROPERTIES_STEP === false) {
+if (this.config.SKIP_UPDATE_EVENT_AND_PROPERTIES_STEP === false) {
try {
await this.propertyDefinitionsManager.updateEventNamesAndProperties(team.id, event, properties)
} catch (err) {
@@ -265,7 +258,7 @@

const ack = this.kafkaProducer
.produce({
-topic: this.pluginsServer.CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC,
+topic: this.config.CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC,
key: uuid,
value: Buffer.from(JSON.stringify(rawEvent)),
waitForAck: true,
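EventsProcessor now receives its collaborators explicitly (the `new EventsProcessor(serverConfig, db, kafkaProducer, teamManager, groupTypeManager)` call in hub.ts above) instead of plucking them off a `Hub`. One practical payoff is testability; a toy sketch with invented stand-in types:

interface DB {
    query(sql: string): Promise<unknown[]>
}
interface Producer {
    produce(msg: { topic: string; value: Buffer }): Promise<void>
}

class MiniProcessor {
    constructor(private db: DB, private producer: Producer) {}

    async emit(topic: string, payload: object): Promise<void> {
        // Every collaborator arrives through the constructor, so a test
        // can substitute an in-memory fake for real infrastructure.
        await this.producer.produce({ topic, value: Buffer.from(JSON.stringify(payload)) })
    }
}

// In a test: no Hub, no Kafka, just fakes (topic name illustrative).
const processor = new MiniProcessor(
    { query: async () => [] },
    { produce: async () => undefined }
)
void processor.emit('clickhouse_events_json', { uuid: '0001' })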