Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pauldambra committed Mar 18, 2024
1 parent ed3df5c commit 310a98c
Showing 1 changed file with 33 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import { Settings } from 'luxon'

import { buildStringMatcher } from '../../../src/config/config'
import {
eachBatchParallelIngestion,
IngestionOverflowMode,
} from '../../../src/main/ingestion-queues/batch-processing/each-batch-ingestion'
import { TimestampFormat } from '../../../src/types'
import { IngestionWarningLimiter } from '../../../src/utils/token-bucket'
import { captureIngestionWarning } from './../../../src/worker/ingestion/utils'
import { castTimestampOrNow } from '../../../src/utils/utils'

jest.mock('../../../src/utils/status')
jest.mock('./../../../src/worker/ingestion/utils')

const runEventPipeline = jest.fn().mockResolvedValue('default value')

Expand Down Expand Up @@ -47,6 +49,7 @@ const captureEndpointEvent2 = {

describe('eachBatchParallelIngestion with overflow consume', () => {
let queue: any
let mockQueueMessage: jest.Mock

function createBatchWithMultipleEventsWithKeys(events: any[], timestamp?: any): any {
return events.map((event) => ({
Expand All @@ -58,17 +61,25 @@ describe('eachBatchParallelIngestion with overflow consume', () => {
}

beforeEach(() => {
// luxon datetime lets you specify a fixed "now"
Settings.now = () => new Date(2018, 4, 25).valueOf()

mockQueueMessage = jest.fn()
queue = {
bufferSleep: jest.fn(),
pluginsServer: {
INGESTION_CONCURRENCY: 4,
kafkaProducer: {
queueMessage: jest.fn(),
queueMessage: mockQueueMessage,
},
teamManager: {
getTeamForEvent: jest.fn(),
},
db: 'database',
db: {
kafkaProducer: {
queueMessage: mockQueueMessage,
},
},
},
}
})
Expand All @@ -84,15 +95,23 @@ describe('eachBatchParallelIngestion with overflow consume', () => {
await eachBatchParallelIngestion(tokenBlockList, batch, queue, mode)

expect(queue.pluginsServer.teamManager.getTeamForEvent).toHaveBeenCalledTimes(1)
expect(consume).toHaveBeenCalledWith('1:id', 1)
expect(captureIngestionWarning).toHaveBeenCalledWith(
queue.pluginsServer.db.kafkaProducer,
1,
'ingestion_capacity_overflow',
{
overflowDistinctId: captureEndpointEvent1['distinct_id'],
}
)
expect(consume).toHaveBeenCalledWith('1:ingestion_capacity_overflow:id', 1)
expect(mockQueueMessage).toHaveBeenCalledWith({
topic: 'clickhouse_ingestion_warnings_test',
messages: [
{
value: JSON.stringify({
team_id: 1,
type: 'ingestion_capacity_overflow',
source: 'plugin-server',
details: JSON.stringify({
overflowDistinctId: 'id',
}),
timestamp: castTimestampOrNow(null, TimestampFormat.ClickHouse),
}),
},
],
})

// Event is processed
expect(runEventPipeline).toHaveBeenCalled()
Expand All @@ -109,8 +128,7 @@ describe('eachBatchParallelIngestion with overflow consume', () => {
const tokenBlockList = buildStringMatcher('another_token,more_token', false)
await eachBatchParallelIngestion(tokenBlockList, batch, queue, mode)

expect(consume).toHaveBeenCalledWith('1:id', 1)
expect(captureIngestionWarning).not.toHaveBeenCalled()
expect(consume).toHaveBeenCalledWith('1:ingestion_capacity_overflow:id', 1)
expect(queue.pluginsServer.kafkaProducer.queueMessage).not.toHaveBeenCalled()

// Event is processed
Expand All @@ -132,7 +150,6 @@ describe('eachBatchParallelIngestion with overflow consume', () => {
const tokenBlockList = buildStringMatcher('mytoken,more_token', false)
await eachBatchParallelIngestion(tokenBlockList, batch, queue, mode)

expect(captureIngestionWarning).not.toHaveBeenCalled()
expect(queue.pluginsServer.kafkaProducer.queueMessage).not.toHaveBeenCalled()

// captureEndpointEvent2 is processed, captureEndpointEvent1 are dropped
Expand Down

0 comments on commit 310a98c

Please sign in to comment.