tests
xvello committed Nov 22, 2023
1 parent f4d8114 commit 9f626c3
Showing 1 changed file with 20 additions and 14 deletions.
34 changes: 20 additions & 14 deletions plugin-server/tests/main/ingestion-queues/each-batch.test.ts
@@ -1,4 +1,4 @@
-import { buildIntegerMatcher } from '../../../src/config/config'
+import { buildIntegerMatcher, buildStringMatcher } from '../../../src/config/config'
 import { KAFKA_EVENTS_PLUGIN_INGESTION } from '../../../src/config/kafka-topics'
 import {
 eachBatchParallelIngestion,
@@ -23,6 +23,7 @@ import { ActionMatcher } from '../../../src/worker/ingestion/action-matcher'
 import { HookCommander } from '../../../src/worker/ingestion/hooks'
 import { runOnEvent } from '../../../src/worker/plugins/run'
 import { pluginConfig39 } from '../../helpers/plugins'
+import { runEventPipeline } from './../../../src/worker/ingestion/event-pipeline/runner'

 jest.mock('../../../src/worker/plugins/run')

@@ -39,7 +40,6 @@ jest.mock('./../../../src/worker/ingestion/event-pipeline/runner', () => ({
 runEventPipeline: jest.fn().mockResolvedValue('default value'),
 // runEventPipeline: jest.fn().mockRejectedValue('default value'),
 }))
-import { runEventPipeline } from './../../../src/worker/ingestion/event-pipeline/runner'

 const event: PostIngestionEvent = {
 eventUuid: 'uuid1',
@@ -408,7 +408,8 @@ describe('eachBatchX', () => {
 })
 it('calls runEventPipeline', async () => {
 const batch = createBatch(captureEndpointEvent)
-await eachBatchParallelIngestion(batch, queue, IngestionOverflowMode.Disabled)
+const tokenBlockList = buildStringMatcher('another_token,more_token', false)
+await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Disabled)

 expect(runEventPipeline).toHaveBeenCalledWith(expect.anything(), {
 distinct_id: 'id',
@@ -430,7 +431,8 @@
 it("doesn't fail the batch if runEventPipeline rejects once then succeeds on retry", async () => {
 const batch = createBatch(captureEndpointEvent)
 runEventPipelineSpy.mockImplementationOnce(() => Promise.reject('runEventPipeline nopes out'))
-await eachBatchParallelIngestion(batch, queue, IngestionOverflowMode.Disabled)
+const tokenBlockList = buildStringMatcher('another_token,more_token', false)
+await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Disabled)
 expect(runEventPipeline).toHaveBeenCalledTimes(2)
 })

@@ -441,9 +443,10 @@
 promises: [Promise.resolve(), Promise.reject('deferred nopes out')],
 })
 )
-await expect(eachBatchParallelIngestion(batch, queue, IngestionOverflowMode.Disabled)).rejects.toBe(
-'deferred nopes out'
-)
+const tokenBlockList = buildStringMatcher('another_token,more_token', false)
+await expect(
+eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Disabled)
+).rejects.toBe('deferred nopes out')
 expect(runEventPipeline).toHaveBeenCalledTimes(1)
 })

@@ -463,7 +466,8 @@
 { ...captureEndpointEvent, team_id: 3, distinct_id: 'a' },
 ])
 const stats = new Map()
-for (const group of splitIngestionBatch(batch, IngestionOverflowMode.Disabled).toProcess) {
+const tokenBlockList = buildStringMatcher('another_token,more_token', false)
+for (const group of splitIngestionBatch(tokenBlockList, batch, IngestionOverflowMode.Disabled).toProcess) {
 const key = `${group[0].pluginEvent.team_id}:${group[0].pluginEvent.token}:${group[0].pluginEvent.distinct_id}`
 for (const { pluginEvent: event } of group) {
 expect(`${event.team_id}:${event.token}:${event.distinct_id}`).toEqual(key)
@@ -492,7 +496,8 @@ describe('eachBatchX', () => {
 { ...captureEndpointEvent, team_id: 4, distinct_id: 'a' },
 { ...captureEndpointEvent, team_id: 4, distinct_id: 'a' },
 ])
-const batches = splitIngestionBatch(input, IngestionOverflowMode.Consume).toProcess
+const tokenBlockList = buildStringMatcher('another_token,more_token', false)
+const batches = splitIngestionBatch(tokenBlockList, input, IngestionOverflowMode.Consume).toProcess
 expect(batches.length).toEqual(input.length)
 for (const group of batches) {
 expect(group.length).toEqual(1)
@@ -516,8 +521,8 @@
 { ...captureEndpointEvent, offset: 13, team_id: 3 }, // repeat
 { ...captureEndpointEvent, offset: 14, team_id: 5 }, // repeat
 ])
-
-await eachBatchParallelIngestion(batch, queue, IngestionOverflowMode.Disabled)
+const tokenBlockList = buildStringMatcher('another_token,more_token', false)
+await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Disabled)
 expect(runEventPipeline).toHaveBeenCalledTimes(14)
 expect(queue.pluginsServer.statsd.histogram).toHaveBeenCalledWith(
 'ingest_event_batching.input_length',
@@ -532,14 +537,15 @@
 })

 it('fails the batch if runEventPipeline rejects repeatedly', async () => {
+const tokenBlockList = buildStringMatcher('another_token,more_token', false)
 const batch = createBatch(captureEndpointEvent)
 runEventPipelineSpy
 .mockImplementationOnce(() => Promise.reject('runEventPipeline nopes out'))
 .mockImplementationOnce(() => Promise.reject('runEventPipeline nopes out'))
 .mockImplementationOnce(() => Promise.reject('runEventPipeline nopes out'))
-await expect(eachBatchParallelIngestion(batch, queue, IngestionOverflowMode.Disabled)).rejects.toBe(
-'runEventPipeline nopes out'
-)
+await expect(
+eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Disabled)
+).rejects.toBe('runEventPipeline nopes out')
 expect(runEventPipeline).toHaveBeenCalledTimes(3)
 runEventPipelineSpy.mockRestore()
 })
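For orientation, here is a minimal sketch of the calling convention these tests now exercise, written as if it sat inside this test file. buildStringMatcher comes from the config import added above; batch, queue, createBatch, captureEndpointEvent, eachBatchParallelIngestion, splitIngestionBatch and IngestionOverflowMode are taken from the test file's existing imports and setup, which this diff does not restate, and the meaning of the boolean passed to buildStringMatcher is an assumption rather than something shown here.

// Build the token block list the same way the updated tests do
// (second argument assumed to be a matcher configuration flag).
const tokenBlockList = buildStringMatcher('another_token,more_token', false)

// Both entry points now take the block list as a new first argument.
const batch = createBatch(captureEndpointEvent)
await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Disabled)
const groups = splitIngestionBatch(tokenBlockList, batch, IngestionOverflowMode.Consume).toProcess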
