diff --git a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts index 777d7dc82fd29..2444b4cd624d4 100644 --- a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts @@ -15,7 +15,7 @@ jest.mock('./../../../src/worker/ingestion/event-pipeline/runner', () => ({ runEventPipeline: jest.fn().mockResolvedValue('default value'), })) -const captureEndpointEvent = { +const captureEndpointEvent1 = { uuid: 'uuid1', distinct_id: 'id', ip: null, @@ -29,6 +29,20 @@ const captureEndpointEvent = { sent_at: null, } +const captureEndpointEvent2 = { + uuid: 'uuid2', + distinct_id: 'id', + ip: null, + site_url: '', + data: JSON.stringify({ + event: 'event', + properties: {}, + }), + token: 'othertoken', + now: null, + sent_at: null, +} + describe('eachBatchParallelIngestion with overflow reroute', () => { let queue: any @@ -68,9 +82,9 @@ describe('eachBatchParallelIngestion with overflow reroute', () => { { partition: 0, topic: KAFKA_EVENTS_PLUGIN_INGESTION, - value: JSON.stringify(captureEndpointEvent), - timestamp: captureEndpointEvent['timestamp'], - offset: captureEndpointEvent['offset'], + value: JSON.stringify(captureEndpointEvent1), + timestamp: captureEndpointEvent1['timestamp'], + offset: captureEndpointEvent1['offset'], key: null, token: 'ok', }, @@ -85,9 +99,9 @@ describe('eachBatchParallelIngestion with overflow reroute', () => { expect(captureIngestionWarning).not.toHaveBeenCalled() expect(queue.pluginsServer.kafkaProducer.produce).toHaveBeenCalledWith({ topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW, - value: JSON.stringify(captureEndpointEvent), - timestamp: captureEndpointEvent['timestamp'], - offset: captureEndpointEvent['offset'], + value: JSON.stringify(captureEndpointEvent1), + timestamp: captureEndpointEvent1['timestamp'], 
+ offset: captureEndpointEvent1['offset'], key: null, waitForAck: true, }) @@ -98,23 +112,23 @@ describe('eachBatchParallelIngestion with overflow reroute', () => { it('reroutes excess events to OVERFLOW topic', async () => { const now = Date.now() - const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent], now) + const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent1], now) const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false) const tokenBlockList = buildStringMatcher('another_token,more_token', false) await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Reroute) expect(consume).toHaveBeenCalledWith( - captureEndpointEvent['token'] + ':' + captureEndpointEvent['distinct_id'], + captureEndpointEvent1['token'] + ':' + captureEndpointEvent1['distinct_id'], 1, now ) expect(captureIngestionWarning).not.toHaveBeenCalled() expect(queue.pluginsServer.kafkaProducer.produce).toHaveBeenCalledWith({ topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW, - value: JSON.stringify(captureEndpointEvent), - timestamp: captureEndpointEvent['timestamp'], - offset: captureEndpointEvent['offset'], + value: JSON.stringify(captureEndpointEvent1), + timestamp: captureEndpointEvent1['timestamp'], + offset: captureEndpointEvent1['offset'], key: null, waitForAck: true, }) @@ -125,35 +139,47 @@ describe('eachBatchParallelIngestion with overflow reroute', () => { it('does not reroute if not over capacity limit', async () => { const now = Date.now() - const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent], now) + const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent1, captureEndpointEvent2], now) const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => true) const tokenBlockList = buildStringMatcher('another_token,more_token', false) await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Reroute) 
expect(consume).toHaveBeenCalledWith( - captureEndpointEvent['token'] + ':' + captureEndpointEvent['distinct_id'], + captureEndpointEvent1['token'] + ':' + captureEndpointEvent1['distinct_id'], + 1, + now + ) + expect(consume).toHaveBeenCalledWith( + captureEndpointEvent2['token'] + ':' + captureEndpointEvent2['distinct_id'], 1, now ) expect(captureIngestionWarning).not.toHaveBeenCalled() expect(queue.pluginsServer.kafkaProducer.produce).not.toHaveBeenCalled() // Event is processed - expect(runEventPipeline).toHaveBeenCalled() + expect(runEventPipeline).toHaveBeenCalledTimes(2) }) it('does drop events from blocked tokens', async () => { const now = Date.now() - const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent], now) + const batch = createBatchWithMultipleEventsWithKeys( + [captureEndpointEvent1, captureEndpointEvent2, captureEndpointEvent1], + now + ) const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => true) const tokenBlockList = buildStringMatcher('mytoken,another_token', false) await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Reroute) - // Event is dropped, none of that happens - expect(consume).not.toHaveBeenCalled() + // Event captureEndpointEvent1 is dropped, captureEndpointEvent2 goes through + expect(consume).toHaveBeenCalledWith( + captureEndpointEvent2['token'] + ':' + captureEndpointEvent2['distinct_id'], + 1, + now + ) expect(captureIngestionWarning).not.toHaveBeenCalled() expect(queue.pluginsServer.kafkaProducer.produce).not.toHaveBeenCalled() - expect(runEventPipeline).not.toHaveBeenCalled() + expect(runEventPipeline).toHaveBeenCalledTimes(1) }) }) diff --git a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts index e1bd6e9f26494..8cb50d0e99c36 100644 --- 
a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts @@ -3,7 +3,7 @@ import { eachBatchParallelIngestion, IngestionOverflowMode, } from '../../../src/main/ingestion-queues/batch-processing/each-batch-ingestion' -import { ConfiguredLimiter, OverflowWarningLimiter } from '../../../src/utils/token-bucket' +import { OverflowWarningLimiter } from '../../../src/utils/token-bucket' import { runEventPipeline } from './../../../src/worker/ingestion/event-pipeline/runner' import { captureIngestionWarning } from './../../../src/worker/ingestion/utils' @@ -13,7 +13,7 @@ jest.mock('./../../../src/worker/ingestion/event-pipeline/runner', () => ({ runEventPipeline: jest.fn().mockResolvedValue('default value'), })) -const captureEndpointEvent = { +const captureEndpointEvent1 = { uuid: 'uuid1', distinct_id: 'id', ip: null, @@ -22,7 +22,21 @@ const captureEndpointEvent = { event: 'event', properties: {}, }), - team_id: 1, + token: 'mytoken', + now: null, + sent_at: null, +} + +const captureEndpointEvent2 = { + uuid: 'uuid2', + distinct_id: 'id', + ip: null, + site_url: '', + data: JSON.stringify({ + event: 'event', + properties: {}, + }), + token: 'othertoken', now: null, sent_at: null, } @@ -62,7 +76,7 @@ describe('eachBatchParallelIngestion with overflow consume', () => { }) it('raises ingestion warning when consuming from overflow', async () => { - const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent]) + const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent1]) const consume = jest.spyOn(OverflowWarningLimiter, 'consume').mockImplementation(() => true) queue.pluginsServer.teamManager.getTeamForEvent.mockResolvedValueOnce({ id: 1 }) @@ -70,35 +84,24 @@ describe('eachBatchParallelIngestion with overflow consume', () => { await eachBatchParallelIngestion(tokenBlockList, batch, queue, 
IngestionOverflowMode.Consume) expect(queue.pluginsServer.teamManager.getTeamForEvent).toHaveBeenCalledTimes(1) - expect(consume).toHaveBeenCalledWith( - captureEndpointEvent['team_id'] + ':' + captureEndpointEvent['distinct_id'], - 1 - ) - expect(captureIngestionWarning).toHaveBeenCalledWith( - queue.pluginsServer.db, - captureEndpointEvent['team_id'], - 'ingestion_capacity_overflow', - { - overflowDistinctId: captureEndpointEvent['distinct_id'], - } - ) + expect(consume).toHaveBeenCalledWith('1:id', 1) + expect(captureIngestionWarning).toHaveBeenCalledWith(queue.pluginsServer.db, 1, 'ingestion_capacity_overflow', { + overflowDistinctId: captureEndpointEvent1['distinct_id'], + }) // Event is processed expect(runEventPipeline).toHaveBeenCalled() }) it('does not raise ingestion warning when under threshold', async () => { - const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent]) + const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent1]) const consume = jest.spyOn(OverflowWarningLimiter, 'consume').mockImplementation(() => false) queue.pluginsServer.teamManager.getTeamForEvent.mockResolvedValueOnce({ id: 1 }) const tokenBlockList = buildStringMatcher('another_token,more_token', false) await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Consume) - expect(consume).toHaveBeenCalledWith( - captureEndpointEvent['team_id'] + ':' + captureEndpointEvent['distinct_id'], - 1 - ) + expect(consume).toHaveBeenCalledWith('1:id', 1) expect(captureIngestionWarning).not.toHaveBeenCalled() expect(queue.pluginsServer.kafkaProducer.queueMessage).not.toHaveBeenCalled() @@ -107,17 +110,22 @@ describe('eachBatchParallelIngestion with overflow consume', () => { }) it('does drop events from blocked tokens', async () => { - const now = Date.now() - const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent], now) - const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => true) + 
const batch = createBatchWithMultipleEventsWithKeys([ + captureEndpointEvent1, + captureEndpointEvent2, + captureEndpointEvent1, + ]) + const consume = jest.spyOn(OverflowWarningLimiter, 'consume').mockImplementation(() => false) - const tokenBlockList = buildStringMatcher('mytoken,another_token', false) + queue.pluginsServer.teamManager.getTeamForEvent.mockResolvedValueOnce({ id: 1 }) + const tokenBlockList = buildStringMatcher('mytoken,more_token', false) await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Consume) - // Event is dropped, none of that happens - expect(consume).not.toHaveBeenCalled() expect(captureIngestionWarning).not.toHaveBeenCalled() expect(queue.pluginsServer.kafkaProducer.queueMessage).not.toHaveBeenCalled() - expect(runEventPipeline).not.toHaveBeenCalled() + + // captureEndpointEvent2 is processed, the captureEndpointEvent1 events are dropped + expect(runEventPipeline).toHaveBeenCalledTimes(1) + expect(consume).toHaveBeenCalledTimes(1) }) })