Skip to content

Commit

Permalink
refactor(plugin-server): Remove a few layers of passthrough functions…
Browse files Browse the repository at this point in the history
… in event ingestion (#20540)
  • Loading branch information
tkaemming authored Feb 24, 2024
1 parent 2897711 commit 66933ca
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ import * as Sentry from '@sentry/node'
import { Message, MessageHeader } from 'node-rdkafka'

import { KAFKA_EVENTS_PLUGIN_INGESTION_DLQ, KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW } from '../../../config/kafka-topics'
import { Hub, PipelineEvent, ValueMatcher } from '../../../types'
import { PipelineEvent, ValueMatcher } from '../../../types'
import { formPipelineEvent } from '../../../utils/event'
import { retryIfRetriable } from '../../../utils/retries'
import { status } from '../../../utils/status'
import { ConfiguredLimiter, LoggingLimiter, OverflowWarningLimiter } from '../../../utils/token-bucket'
import { EventPipelineResult, runEventPipeline } from '../../../worker/ingestion/event-pipeline/runner'
import { EventPipelineRunner } from '../../../worker/ingestion/event-pipeline/runner'
import { captureIngestionWarning } from '../../../worker/ingestion/utils'
import { ingestionPartitionKeyOverflowed } from '../analytics-events-ingestion-consumer'
import { IngestionConsumer } from '../kafka-queue'
Expand Down Expand Up @@ -156,7 +156,8 @@ export async function eachBatchParallelIngestion(
for (const { message, pluginEvent } of currentBatch) {
try {
const result = (await retryIfRetriable(async () => {
return await ingestEvent(queue.pluginsServer, pluginEvent)
const runner = new EventPipelineRunner(queue.pluginsServer, pluginEvent)
return await runner.runEventPipeline(pluginEvent)
})) as IngestResult

result.promises?.forEach((promise) =>
Expand Down Expand Up @@ -243,16 +244,6 @@ export async function eachBatchParallelIngestion(
}
}

async function ingestEvent(
server: Hub,
event: PipelineEvent,
checkAndPause?: () => void // pause incoming messages if we are slow in getting them out again
): Promise<EventPipelineResult> {
checkAndPause?.()
const result = await runEventPipeline(server, event)
return result
}

function computeKey(pluginEvent: PipelineEvent): string {
return `${pluginEvent.team_id ?? pluginEvent.token}:${pluginEvent.distinct_id}`
}
Expand Down
6 changes: 0 additions & 6 deletions plugin-server/src/worker/ingestion/event-pipeline/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,6 @@ class StepErrorNoRetry extends Error {
this.args = args
}
}

export async function runEventPipeline(hub: Hub, event: PipelineEvent): Promise<EventPipelineResult> {
const runner = new EventPipelineRunner(hub, event)
return runner.runEventPipeline(event)
}

export class EventPipelineRunner {
hub: Hub
originalEvent: PipelineEvent
Expand Down
2 changes: 1 addition & 1 deletion plugin-server/src/worker/ingestion/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export function generateEventDeadLetterQueueMessage(
teamId: number,
errorLocation = 'plugin_server_ingest_event'
): ProducerRecord {
let errorMessage = 'ingestEvent failed. '
let errorMessage = 'Event ingestion failed. '
if (error instanceof Error) {
errorMessage += `Error: ${error.message}`
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ import {
IngestionOverflowMode,
} from '../../../src/main/ingestion-queues/batch-processing/each-batch-ingestion'
import { ConfiguredLimiter } from '../../../src/utils/token-bucket'
import { runEventPipeline } from './../../../src/worker/ingestion/event-pipeline/runner'
import { captureIngestionWarning } from './../../../src/worker/ingestion/utils'

jest.mock('../../../src/utils/status')
jest.mock('./../../../src/worker/ingestion/utils')

const runEventPipeline = jest.fn().mockResolvedValue('default value')

jest.mock('./../../../src/worker/ingestion/event-pipeline/runner', () => ({
runEventPipeline: jest.fn().mockResolvedValue('default value'),
EventPipelineRunner: jest.fn().mockImplementation(() => ({
runEventPipeline: runEventPipeline,
})),
}))

const captureEndpointEvent1 = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@ import {
IngestionOverflowMode,
} from '../../../src/main/ingestion-queues/batch-processing/each-batch-ingestion'
import { OverflowWarningLimiter } from '../../../src/utils/token-bucket'
import { runEventPipeline } from './../../../src/worker/ingestion/event-pipeline/runner'
import { captureIngestionWarning } from './../../../src/worker/ingestion/utils'

jest.mock('../../../src/utils/status')
jest.mock('./../../../src/worker/ingestion/utils')

const runEventPipeline = jest.fn().mockResolvedValue('default value')

jest.mock('./../../../src/worker/ingestion/event-pipeline/runner', () => ({
runEventPipeline: jest.fn().mockResolvedValue('default value'),
EventPipelineRunner: jest.fn().mockImplementation(() => ({
runEventPipeline: runEventPipeline,
})),
}))

const captureEndpointEvent1 = {
Expand Down
26 changes: 11 additions & 15 deletions plugin-server/tests/main/ingestion-queues/each-batch.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import { ActionMatcher } from '../../../src/worker/ingestion/action-matcher'
import { HookCommander } from '../../../src/worker/ingestion/hooks'
import { runOnEvent } from '../../../src/worker/plugins/run'
import { pluginConfig39 } from '../../helpers/plugins'
import { runEventPipeline } from './../../../src/worker/ingestion/event-pipeline/runner'

jest.mock('../../../src/worker/plugins/run')

Expand All @@ -37,9 +36,13 @@ jest.mock('../../../src/worker/ingestion/event-pipeline/runAsyncHandlersStep', (
})
jest.mock('../../../src/utils/status')
jest.mock('./../../../src/worker/ingestion/utils')

const runEventPipeline = jest.fn().mockResolvedValue('default value')

jest.mock('./../../../src/worker/ingestion/event-pipeline/runner', () => ({
runEventPipeline: jest.fn().mockResolvedValue('default value'),
// runEventPipeline: jest.fn().mockRejectedValue('default value'),
EventPipelineRunner: jest.fn().mockImplementation(() => ({
runEventPipeline: runEventPipeline,
})),
}))

const event: PostIngestionEvent = {
Expand Down Expand Up @@ -375,19 +378,12 @@ describe('eachBatchX', () => {
})

describe('eachBatchParallelIngestion', () => {
let runEventPipelineSpy
beforeEach(() => {
runEventPipelineSpy = jest.spyOn(
require('./../../../src/worker/ingestion/event-pipeline/runner'),
'runEventPipeline'
)
})
it('calls runEventPipeline', async () => {
const batch = createBatch(captureEndpointEvent)
const tokenBlockList = buildStringMatcher('another_token,more_token', false)
await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Disabled)

expect(runEventPipeline).toHaveBeenCalledWith(expect.anything(), {
expect(runEventPipeline).toHaveBeenCalledWith({
distinct_id: 'id',
event: 'event',
properties: {},
Expand All @@ -402,15 +398,15 @@ describe('eachBatchX', () => {

it("doesn't fail the batch if runEventPipeline rejects once then succeeds on retry", async () => {
const batch = createBatch(captureEndpointEvent)
runEventPipelineSpy.mockImplementationOnce(() => Promise.reject('runEventPipeline nopes out'))
runEventPipeline.mockImplementationOnce(() => Promise.reject('runEventPipeline nopes out'))
const tokenBlockList = buildStringMatcher('another_token,more_token', false)
await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Disabled)
expect(runEventPipeline).toHaveBeenCalledTimes(2)
})

it('fails the batch if one deferred promise rejects', async () => {
const batch = createBatch(captureEndpointEvent)
runEventPipelineSpy.mockImplementationOnce(() =>
runEventPipeline.mockImplementationOnce(() =>
Promise.resolve({
promises: [Promise.resolve(), Promise.reject('deferred nopes out')],
})
Expand Down Expand Up @@ -518,15 +514,15 @@ describe('eachBatchX', () => {
it('fails the batch if runEventPipeline rejects repeatedly', async () => {
const tokenBlockList = buildStringMatcher('another_token,more_token', false)
const batch = createBatch(captureEndpointEvent)
runEventPipelineSpy
runEventPipeline
.mockImplementationOnce(() => Promise.reject('runEventPipeline nopes out'))
.mockImplementationOnce(() => Promise.reject('runEventPipeline nopes out'))
.mockImplementationOnce(() => Promise.reject('runEventPipeline nopes out'))
await expect(
eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Disabled)
).rejects.toBe('runEventPipeline nopes out')
expect(runEventPipeline).toHaveBeenCalledTimes(3)
runEventPipelineSpy.mockRestore()
runEventPipeline.mockRestore()
})
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { DependencyUnavailableError } from '../../../src/utils/db/error'
import { createHub } from '../../../src/utils/db/hub'
import { PostgresUse } from '../../../src/utils/db/postgres'
import { UUIDT } from '../../../src/utils/utils'
import { runEventPipeline } from '../../../src/worker/ingestion/event-pipeline/runner'
import { EventPipelineRunner } from '../../../src/worker/ingestion/event-pipeline/runner'
import { createOrganization, createTeam, POSTGRES_DELETE_TABLES_QUERY } from '../../helpers/sql'

describe('workerTasks.runEventPipeline()', () => {
Expand Down Expand Up @@ -49,18 +49,19 @@ describe('workerTasks.runEventPipeline()', () => {
return Promise.reject(new Error(errorMessage))
})

await expect(
runEventPipeline(hub, {
distinct_id: 'asdf',
ip: '',
team_id: teamId,
event: 'some event',
properties: {},
site_url: 'https://example.com',
now: new Date().toISOString(),
uuid: new UUIDT().toString(),
})
).rejects.toEqual(new DependencyUnavailableError(errorMessage, 'Postgres', new Error(errorMessage)))
const event = {
distinct_id: 'asdf',
ip: '',
team_id: teamId,
event: 'some event',
properties: {},
site_url: 'https://example.com',
now: new Date().toISOString(),
uuid: new UUIDT().toString(),
}
await expect(new EventPipelineRunner(hub, event).runEventPipeline(event)).rejects.toEqual(
new DependencyUnavailableError(errorMessage, 'Postgres', new Error(errorMessage))
)
pgQueryMock.mockRestore()
})
})
4 changes: 2 additions & 2 deletions plugin-server/tests/main/teardown.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { PluginEvent } from '@posthog/plugin-scaffold'
import { waitForExpect } from '../../functional_tests/expectations'
import { startPluginsServer } from '../../src/main/pluginsServer'
import { Hub, LogLevel, PluginLogEntry, PluginLogEntrySource, PluginLogEntryType } from '../../src/types'
import { runEventPipeline } from '../../src/worker/ingestion/event-pipeline/runner'
import { EventPipelineRunner } from '../../src/worker/ingestion/event-pipeline/runner'
import { makePiscina } from '../../src/worker/piscina'
import { pluginConfig39 } from '../helpers/plugins'
import { resetTestDatabase } from '../helpers/sql'
Expand Down Expand Up @@ -36,7 +36,7 @@ async function getLogEntriesForPluginConfig(hub: Hub, pluginConfigId: number) {

describe('teardown', () => {
const processEvent = async (hub: Hub, event: PluginEvent) => {
const result = await runEventPipeline(hub, event)
const result = await new EventPipelineRunner(hub, event).runEventPipeline(event)
const resultEvent = result.args[0]
return resultEvent
}
Expand Down
7 changes: 4 additions & 3 deletions plugin-server/tests/worker/dead-letter-queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { PluginEvent } from '@posthog/plugin-scaffold/src/types'
import { Hub, LogLevel } from '../../src/types'
import { createHub } from '../../src/utils/db/hub'
import { UUIDT } from '../../src/utils/utils'
import { runEventPipeline } from '../../src/worker/ingestion/event-pipeline/runner'
import { EventPipelineRunner } from '../../src/worker/ingestion/event-pipeline/runner'
import { generateEventDeadLetterQueueMessage } from '../../src/worker/ingestion/utils'
import { delayUntilEventIngested, resetTestDatabaseClickhouse } from '../helpers/clickhouse'
import { resetTestDatabase } from '../helpers/sql'
Expand Down Expand Up @@ -59,7 +59,8 @@ describe('events dead letter queue', () => {
})

test('events get sent to dead letter queue on error', async () => {
const ingestResponse1 = await runEventPipeline(hub, createEvent())
const event = createEvent()
const ingestResponse1 = await new EventPipelineRunner(hub, event).runEventPipeline(event)
expect(ingestResponse1).toEqual({
lastStep: 'prepareEventStep',
error: 'database unavailable',
Expand All @@ -78,7 +79,7 @@ describe('events dead letter queue', () => {
expect(dlqEvent.team_id).toEqual(2)
expect(dlqEvent.team_id).toEqual(2)
expect(dlqEvent.error_location).toEqual('plugin_server_ingest_event:prepareEventStep')
expect(dlqEvent.error).toEqual('ingestEvent failed. Error: database unavailable')
expect(dlqEvent.error).toEqual('Event ingestion failed. Error: database unavailable')
expect(dlqEvent.properties).toEqual(JSON.stringify({ key: 'value', $ip: '127.0.0.1' }))
expect(dlqEvent.event_uuid).toEqual(EVENT_UUID)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ describe('EventPipelineRunner', () => {
expect(JSON.parse(hub.db.kafkaProducer.queueMessage.mock.calls[0][0].messages[0].value)).toMatchObject({
team_id: 2,
distinct_id: 'my_id',
error: 'ingestEvent failed. Error: testError',
error: 'Event ingestion failed. Error: testError',
error_location: 'plugin_server_ingest_event:prepareEventStep',
})
expect(pipelineStepDLQCounterSpy).toHaveBeenCalledWith('prepareEventStep')
Expand Down

0 comments on commit 66933ca

Please sign in to comment.