Skip to content

Commit

Permalink
chore(plugin-server): remove INGESTION_DELAY_WRITE_ACKS and workerMet…
Browse files Browse the repository at this point in the history
…hods (#17932)

* chore: stop using piscina worker methods for runEventPipeline

* chore(plugin-server): remove INGESTION_DELAY_WRITE_ACKS

---------

Co-authored-by: Tiina Turban <[email protected]>
  • Loading branch information
bretthoerner and tiina303 authored Oct 16, 2023
1 parent 0cf731a commit 286b689
Show file tree
Hide file tree
Showing 14 changed files with 140 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ afterAll(async () => {
await dlqConsumer.disconnect()
})

test.concurrent('consumer handles messages just less than 1MB gracefully', async () => {
test.concurrent('consumer handles messages just over 1MB gracefully', async () => {
// For this we basically want the plugin-server to try and produce a new
// message larger than 1MB. We do this by creating a person with a lot of
// properties. We will end up denormalizing the person properties onto the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ import * as Sentry from '@sentry/node'
import { Message, MessageHeader } from 'node-rdkafka'

import { KAFKA_EVENTS_PLUGIN_INGESTION_DLQ, KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW } from '../../../config/kafka-topics'
import { Hub, PipelineEvent, WorkerMethods } from '../../../types'
import { Hub, PipelineEvent } from '../../../types'
import { formPipelineEvent } from '../../../utils/event'
import { status } from '../../../utils/status'
import { ConfiguredLimiter, LoggingLimiter, WarningLimiter } from '../../../utils/token-bucket'
import { EventPipelineResult } from '../../../worker/ingestion/event-pipeline/runner'
import { EventPipelineResult, runEventPipeline } from '../../../worker/ingestion/event-pipeline/runner'
import { captureIngestionWarning } from '../../../worker/ingestion/utils'
import { ingestionPartitionKeyOverflowed } from '../analytics-events-ingestion-consumer'
import { IngestionConsumer } from '../kafka-queue'
Expand Down Expand Up @@ -40,13 +40,66 @@ type IngestResult = {
promises?: Array<Promise<void>>
}

async function handleProcessingError(
error: any,
message: Message,
pluginEvent: PipelineEvent,
queue: IngestionConsumer
) {
status.error('🔥', `Error processing message`, {
stack: error.stack,
error: error,
})

// If the error is a non-retriable error, push to the dlq and commit the offset. Else raise the
// error.
//
// NOTE: there is behavior to push to a DLQ at the moment within EventPipelineRunner. This
// doesn't work so well with e.g. messages that when sent to the DLQ is it's self too large.
// Here we explicitly do _not_ add any additional metadata to the message. We might want to add
// some metadata to the message e.g. in the header or reference e.g. the sentry event id.
//
// TODO: property abstract out this `isRetriable` error logic. This is currently relying on the
// fact that node-rdkafka adheres to the `isRetriable` interface.
if (error?.isRetriable === false) {
const sentryEventId = Sentry.captureException(error)
const headers: MessageHeader[] = message.headers ?? []
headers.push({ ['sentry-event-id']: sentryEventId })
headers.push({ ['event-id']: pluginEvent.uuid })
try {
await queue.pluginsServer.kafkaProducer.produce({
topic: KAFKA_EVENTS_PLUGIN_INGESTION_DLQ,
value: message.value,
key: message.key,
headers: headers,
waitForAck: true,
})
} catch (error) {
// If we can't send to the DLQ and it's not retriable, just continue. We'll commit the
// offset and move on.
if (error?.isRetriable === false) {
status.error('🔥', `Error pushing to DLQ`, {
stack: error.stack,
error: error,
})
return
}

// If we can't send to the DLQ and it is retriable, raise the error.
throw error
}
} else {
throw error
}
}

export async function eachBatchParallelIngestion(
messages: Message[],
queue: IngestionConsumer,
overflowMode: IngestionOverflowMode
): Promise<void> {
async function eachMessage(event: PipelineEvent, queue: IngestionConsumer): Promise<IngestResult> {
return ingestEvent(queue.pluginsServer, queue.workerMethods, event)
return ingestEvent(queue.pluginsServer, event)
}

const batchStartTimer = new Date()
Expand Down Expand Up @@ -102,62 +155,16 @@ export async function eachBatchParallelIngestion(
for (const { message, pluginEvent } of currentBatch) {
try {
const result = await eachMessage(pluginEvent, queue)
if (result.promises) {
processingPromises.push(...result.promises)
}
} catch (error) {
status.error('🔥', `Error processing message`, {
stack: error.stack,
error: error,
})

// If there error is a non-retriable error, push
// to the dlq and commit the offset. Else raise the
// error.
//
// NOTE: there is behavior to push to a DLQ at the
// moment within EventPipelineRunner. This doesn't work
// so well with e.g. messages that when sent to the DLQ
// is it's self too large. Here we explicitly do _not_
// add any additional metadata to the message. We might
// want to add some metadata to the message e.g. in the
// header or reference e.g. the sentry event id.
//
// TODO: property abstract out this `isRetriable` error
// logic. This is currently relying on the fact that
// node-rdkafka adheres to the `isRetriable` interface.
if (error?.isRetriable === false) {
const sentryEventId = Sentry.captureException(error)
const headers: MessageHeader[] = message.headers ?? []
headers.push({ ['sentry-event-id']: sentryEventId })
headers.push({ ['event-id']: pluginEvent.uuid })
try {
await queue.pluginsServer.kafkaProducer.produce({
topic: KAFKA_EVENTS_PLUGIN_INGESTION_DLQ,
value: message.value,
key: message.key,
headers: headers,
waitForAck: true,

for (const promise of result.promises ?? []) {
processingPromises.push(
promise.catch(async (error) => {
await handleProcessingError(error, message, pluginEvent, queue)
})
} catch (error) {
// If we can't send to the DLQ and it's not
// retriable, just continue. We'll commit the
// offset and move on.
if (error?.isRetriable === false) {
status.error('🔥', `Error pushing to DLQ`, {
stack: error.stack,
error: error,
})
continue
}

// If we can't send to the DLQ and it is
// retriable, raise the error.
throw error
}
} else {
throw error
)
}
} catch (error) {
await handleProcessingError(error, message, pluginEvent, queue)
}
}

Expand Down Expand Up @@ -236,7 +243,6 @@ export async function eachBatchParallelIngestion(

async function ingestEvent(
server: Hub,
workerMethods: WorkerMethods,
event: PipelineEvent,
checkAndPause?: () => void // pause incoming messages if we are slow in getting them out again
): Promise<EventPipelineResult> {
Expand All @@ -247,7 +253,8 @@ async function ingestEvent(
server.statsd?.increment('kafka_queue_ingest_event_hit', {
pipeline: 'runEventPipeline',
})
const result = await workerMethods.runEventPipeline(event)

const result = await runEventPipeline(server, event)

server.statsd?.timing('kafka_queue.each_event', eachEventStartTimer)
countAndLogEvents()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
import * as Sentry from '@sentry/node'
import { StatsD } from 'hot-shots'
import { EachBatchPayload, KafkaMessage } from 'kafkajs'
import { Counter } from 'prom-client'
import { ActionMatcher } from 'worker/ingestion/action-matcher'

import { PostIngestionEvent, RawClickHouseEvent } from '../../../types'
import { DependencyUnavailableError } from '../../../utils/db/error'
import { convertToIngestionEvent, convertToProcessedPluginEvent } from '../../../utils/event'
import { status } from '../../../utils/status'
import { processWebhooksStep } from '../../../worker/ingestion/event-pipeline/runAsyncHandlersStep'
import { silentFailuresAsyncHandlers } from '../../../worker/ingestion/event-pipeline/runner'
import { HookCommander } from '../../../worker/ingestion/hooks'
import { runInstrumentedFunction } from '../../utils'
import { eventDroppedCounter, latestOffsetTimestampGauge } from '../metrics'

// Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals
require('@sentry/tracing')

export const silentFailuresAsyncHandlers = new Counter({
name: 'async_handlers_silent_failure',
help: 'Number silent failures from async handlers.',
})
// exporting only for testing
export function groupIntoBatchesByUsage(
array: KafkaMessage[],
Expand Down
26 changes: 1 addition & 25 deletions plugin-server/src/main/ingestion-queues/kafka-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { Counter } from 'prom-client'

import { BatchConsumer, startBatchConsumer } from '../../kafka/batch-consumer'
import { createRdConnectionConfigFromEnvVars } from '../../kafka/config'
import { Hub, PipelineEvent, WorkerMethods } from '../../types'
import { Hub } from '../../types'
import { KafkaConfig } from '../../utils/db/hub'
import { timeoutGuard } from '../../utils/db/utils'
import { status } from '../../utils/status'
Expand All @@ -23,7 +23,6 @@ type KafkaJSBatchFunction = (payload: EachBatchPayload, queue: KafkaJSIngestionC

export class KafkaJSIngestionConsumer {
public pluginsServer: Hub
public workerMethods: WorkerMethods
public consumerReady: boolean
public topic: string
public consumerGroupId: string
Expand Down Expand Up @@ -54,17 +53,6 @@ export class KafkaJSIngestionConsumer {
)
this.wasConsumerRan = false

// TODO: remove `this.workerMethods` and just rely on
// `this.batchHandler`. At the time of writing however, there are some
// references to queue.workerMethods buried deep in the codebase
// #onestepatatime
this.workerMethods = {
runEventPipeline: (event: PipelineEvent) => {
this.pluginsServer.lastActivity = new Date().valueOf()
this.pluginsServer.lastActivityType = 'runEventPipeline'
return piscina.run({ task: 'runEventPipeline', args: { event } })
},
}
this.consumerGroupMemberId = null
this.consumerReady = false

Expand Down Expand Up @@ -198,7 +186,6 @@ type EachBatchFunction = (messages: Message[], queue: IngestionConsumer) => Prom

export class IngestionConsumer {
public pluginsServer: Hub
public workerMethods: WorkerMethods
public consumerReady: boolean
public topic: string
public consumerGroupId: string
Expand All @@ -216,17 +203,6 @@ export class IngestionConsumer {
this.topic = topic
this.consumerGroupId = consumerGroupId

// TODO: remove `this.workerMethods` and just rely on
// `this.batchHandler`. At the time of writing however, there are some
// references to queue.workerMethods buried deep in the codebase
// #onestepatatime
this.workerMethods = {
runEventPipeline: (event: PipelineEvent) => {
this.pluginsServer.lastActivity = new Date().valueOf()
this.pluginsServer.lastActivityType = 'runEventPipeline'
return piscina.run({ task: 'runEventPipeline', args: { event } })
},
}
this.consumerReady = false

this.eachBatch = batchHandler
Expand Down
5 changes: 0 additions & 5 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import { KafkaProducerWrapper } from './utils/db/kafka-producer-wrapper'
import { PostgresRouter } from './utils/db/postgres'
import { UUID } from './utils/utils'
import { AppMetrics } from './worker/ingestion/app-metrics'
import { EventPipelineResult } from './worker/ingestion/event-pipeline/runner'
import { OrganizationManager } from './worker/ingestion/organization-manager'
import { EventsProcessor } from './worker/ingestion/process-event'
import { TeamManager } from './worker/ingestion/team-manager'
Expand Down Expand Up @@ -477,10 +476,6 @@ export interface PluginTask {
__ignoreForAppMetrics?: boolean
}

export type WorkerMethods = {
runEventPipeline: (event: PipelineEvent) => Promise<EventPipelineResult>
}

export type VMMethods = {
setupPlugin?: () => Promise<void>
teardownPlugin?: () => Promise<void>
Expand Down
31 changes: 12 additions & 19 deletions plugin-server/src/worker/ingestion/event-pipeline/runner.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { PluginEvent, ProcessedPluginEvent } from '@posthog/plugin-scaffold'
import { PluginEvent } from '@posthog/plugin-scaffold'
import * as Sentry from '@sentry/node'
import { Counter } from 'prom-client'

Expand All @@ -7,7 +7,6 @@ import { runInSpan } from '../../../sentry'
import { Hub, PipelineEvent } from '../../../types'
import { DependencyUnavailableError } from '../../../utils/db/error'
import { timeoutGuard } from '../../../utils/db/utils'
import { stringToBoolean } from '../../../utils/env-utils'
import { status } from '../../../utils/status'
import { generateEventDeadLetterQueueMessage } from '../utils'
import { createEventStep } from './createEventStep'
Expand All @@ -16,10 +15,6 @@ import { populateTeamDataStep } from './populateTeamDataStep'
import { prepareEventStep } from './prepareEventStep'
import { processPersonsStep } from './processPersonsStep'

export const silentFailuresAsyncHandlers = new Counter({
name: 'async_handlers_silent_failure',
help: 'Number silent failures from async handlers.',
})
const pipelineStepCompletionCounter = new Counter({
name: 'events_pipeline_step_executed_total',
help: 'Number of events that have completed the step',
Expand Down Expand Up @@ -58,21 +53,22 @@ class StepErrorNoRetry extends Error {
}
}

export async function runEventPipeline(hub: Hub, event: PipelineEvent): Promise<EventPipelineResult> {
const runner = new EventPipelineRunner(hub, event)
return runner.runEventPipeline(event)
}

export class EventPipelineRunner {
hub: Hub
originalEvent: PipelineEvent | ProcessedPluginEvent
originalEvent: PipelineEvent

// See https://docs.google.com/document/d/12Q1KcJ41TicIwySCfNJV5ZPKXWVtxT7pzpB3r9ivz_0
poEEmbraceJoin: boolean
private delayAcks: boolean

constructor(hub: Hub, originalEvent: PipelineEvent | ProcessedPluginEvent, poEEmbraceJoin = false) {
constructor(hub: Hub, event: PipelineEvent, poEEmbraceJoin = false) {
this.hub = hub
this.originalEvent = originalEvent
this.poEEmbraceJoin = poEEmbraceJoin

// TODO: remove after successful rollout
this.delayAcks = stringToBoolean(process.env.INGESTION_DELAY_WRITE_ACKS)
this.originalEvent = event
}

isEventBlacklisted(event: PipelineEvent): boolean {
Expand All @@ -87,6 +83,7 @@ export class EventPipelineRunner {
}

async runEventPipeline(event: PipelineEvent): Promise<EventPipelineResult> {
this.originalEvent = event
this.hub.statsd?.increment('kafka_queue.event_pipeline.start', { pipeline: 'event' })

try {
Expand Down Expand Up @@ -147,12 +144,8 @@ export class EventPipelineRunner {
[this, preparedEvent, person],
event.team_id
)
if (this.delayAcks) {
return this.registerLastStep('createEventStep', event.team_id, [rawClickhouseEvent, person], [eventAck])
} else {
await eventAck
return this.registerLastStep('createEventStep', event.team_id, [rawClickhouseEvent, person])
}

return this.registerLastStep('createEventStep', event.team_id, [rawClickhouseEvent, person], [eventAck])
}

registerLastStep(
Expand Down
7 changes: 1 addition & 6 deletions plugin-server/src/worker/tasks.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { PluginEvent } from '@posthog/plugin-scaffold/src/types'

import { EnqueuedPluginJob, Hub, PipelineEvent, PluginTaskType } from '../types'
import { EventPipelineRunner } from './ingestion/event-pipeline/runner'
import { EnqueuedPluginJob, Hub, PluginTaskType } from '../types'
import { loadSchedule } from './plugins/loadSchedule'
import { runPluginTask, runProcessEvent } from './plugins/run'
import { setupPlugins } from './plugins/setup'
Expand All @@ -28,10 +27,6 @@ export const workerTasks: Record<string, TaskRunner> = {
pluginScheduleReady: (hub) => {
return hub.pluginSchedule !== null
},
runEventPipeline: async (hub, args: { event: PipelineEvent }) => {
const runner = new EventPipelineRunner(hub, args.event)
return await runner.runEventPipeline(args.event)
},
reloadPlugins: async (hub) => {
await setupPlugins(hub)
},
Expand Down
8 changes: 2 additions & 6 deletions plugin-server/src/worker/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,8 @@ export const createTaskRunner =
}
return response
},
(transactionDuration: number) => {
if (task === 'runEventPipeline') {
return transactionDuration > 0.5 ? 1 : 0.01
} else {
return 1
}
(_) => {
return 1
}
)

Expand Down
Loading

0 comments on commit 286b689

Please sign in to comment.