From 310a98cadc2583e59f2a0176140ef9dd63fcc7f6 Mon Sep 17 00:00:00 2001
From: Paul D'Ambra
Date: Mon, 18 Mar 2024 22:47:37 +0000
Subject: [PATCH] fix tests

---
 ...events-ingestion-overflow-consumer.test.ts | 49 +++++++++++++------
 1 file changed, 33 insertions(+), 16 deletions(-)

diff --git a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts
index c802e2bad4cb3..851bb23e2ac14 100644
--- a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts
+++ b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts
@@ -1,13 +1,15 @@
+import { Settings } from 'luxon'
+
 import { buildStringMatcher } from '../../../src/config/config'
 import {
     eachBatchParallelIngestion,
     IngestionOverflowMode,
 } from '../../../src/main/ingestion-queues/batch-processing/each-batch-ingestion'
+import { TimestampFormat } from '../../../src/types'
 import { IngestionWarningLimiter } from '../../../src/utils/token-bucket'
-import { captureIngestionWarning } from './../../../src/worker/ingestion/utils'
+import { castTimestampOrNow } from '../../../src/utils/utils'
 
 jest.mock('../../../src/utils/status')
-jest.mock('./../../../src/worker/ingestion/utils')
 
 const runEventPipeline = jest.fn().mockResolvedValue('default value')
 
@@ -47,6 +49,7 @@ const captureEndpointEvent2 = {
 
 describe('eachBatchParallelIngestion with overflow consume', () => {
     let queue: any
+    let mockQueueMessage: jest.Mock
 
     function createBatchWithMultipleEventsWithKeys(events: any[], timestamp?: any): any {
         return events.map((event) => ({
@@ -58,17 +61,25 @@ describe('eachBatchParallelIngestion with overflow consume', () => {
     }
 
     beforeEach(() => {
+        // luxon datetime lets you specify a fixed "now"
+        Settings.now = () => new Date(2018, 4, 25).valueOf()
+
+        mockQueueMessage = jest.fn()
         queue = {
             bufferSleep: jest.fn(),
             pluginsServer: {
                 INGESTION_CONCURRENCY: 4,
                 kafkaProducer: {
-                    queueMessage: jest.fn(),
+                    queueMessage: mockQueueMessage,
                 },
                 teamManager: {
                     getTeamForEvent: jest.fn(),
                 },
-                db: 'database',
+                db: {
+                    kafkaProducer: {
+                        queueMessage: mockQueueMessage,
+                    },
+                },
             },
         }
     })
@@ -84,15 +95,23 @@ describe('eachBatchParallelIngestion with overflow consume', () => {
         await eachBatchParallelIngestion(tokenBlockList, batch, queue, mode)
 
         expect(queue.pluginsServer.teamManager.getTeamForEvent).toHaveBeenCalledTimes(1)
-        expect(consume).toHaveBeenCalledWith('1:id', 1)
-        expect(captureIngestionWarning).toHaveBeenCalledWith(
-            queue.pluginsServer.db.kafkaProducer,
-            1,
-            'ingestion_capacity_overflow',
-            {
-                overflowDistinctId: captureEndpointEvent1['distinct_id'],
-            }
-        )
+        expect(consume).toHaveBeenCalledWith('1:ingestion_capacity_overflow:id', 1)
+        expect(mockQueueMessage).toHaveBeenCalledWith({
+            topic: 'clickhouse_ingestion_warnings_test',
+            messages: [
+                {
+                    value: JSON.stringify({
+                        team_id: 1,
+                        type: 'ingestion_capacity_overflow',
+                        source: 'plugin-server',
+                        details: JSON.stringify({
+                            overflowDistinctId: 'id',
+                        }),
+                        timestamp: castTimestampOrNow(null, TimestampFormat.ClickHouse),
+                    }),
+                },
+            ],
+        })
 
         // Event is processed
         expect(runEventPipeline).toHaveBeenCalled()
@@ -109,8 +128,7 @@ describe('eachBatchParallelIngestion with overflow consume', () => {
         const tokenBlockList = buildStringMatcher('another_token,more_token', false)
         await eachBatchParallelIngestion(tokenBlockList, batch, queue, mode)
 
-        expect(consume).toHaveBeenCalledWith('1:id', 1)
-        expect(captureIngestionWarning).not.toHaveBeenCalled()
+        expect(consume).toHaveBeenCalledWith('1:ingestion_capacity_overflow:id', 1)
         expect(queue.pluginsServer.kafkaProducer.queueMessage).not.toHaveBeenCalled()
 
         // Event is processed
@@ -132,7 +150,6 @@ describe('eachBatchParallelIngestion with overflow consume', () => {
         const tokenBlockList = buildStringMatcher('mytoken,more_token', false)
         await eachBatchParallelIngestion(tokenBlockList, batch, queue, mode)
 
-        expect(captureIngestionWarning).not.toHaveBeenCalled()
         expect(queue.pluginsServer.kafkaProducer.queueMessage).not.toHaveBeenCalled()
 
         // captureEndpointEvent2 is processed, captureEndpointEvent1 are dropped