Commit f4d8114: tests
xvello committed Nov 22, 2023 (1 parent: 8571a8b)
Showing 2 changed files with 81 additions and 47 deletions.
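
The commit renames the single captureEndpointEvent fixture to captureEndpointEvent1 and adds a second fixture, captureEndpointEvent2, carrying a different token ('othertoken'), so the token block-list tests can assert that only events with a blocked token are dropped while the rest of the batch is still ingested. For orientation, the sketch below models a comma-separated token block list as a simple matcher; the name buildSimpleStringMatcher and its exact semantics are assumptions for illustration only, not PostHog's real buildStringMatcher.

// Hypothetical sketch of a comma-separated token matcher; an assumption for
// illustration, not PostHog's actual buildStringMatcher implementation.
type StringMatcher = (value: string) => boolean

function buildSimpleStringMatcher(csv: string, matchAll: boolean): StringMatcher {
    // 'mytoken,another_token' becomes a set of exact-match entries.
    const entries = new Set(
        csv
            .split(',')
            .map((token) => token.trim())
            .filter((token) => token.length > 0)
    )
    return (value: string) => matchAll || entries.has(value)
}

// Mirrors how the tests use the block list: 'mytoken' is blocked, 'othertoken' is not.
const isBlocked = buildSimpleStringMatcher('mytoken,another_token', false)
console.log(isBlocked('mytoken')) // true: captureEndpointEvent1 would be dropped
console.log(isBlocked('othertoken')) // false: captureEndpointEvent2 keeps flowing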
Changed file 1 of 2:
@@ -15,7 +15,7 @@ jest.mock('./../../../src/worker/ingestion/event-pipeline/runner', () => ({
     runEventPipeline: jest.fn().mockResolvedValue('default value'),
 }))

-const captureEndpointEvent = {
+const captureEndpointEvent1 = {
     uuid: 'uuid1',
     distinct_id: 'id',
     ip: null,
@@ -29,6 +29,20 @@ const captureEndpointEvent = {
     sent_at: null,
 }

+const captureEndpointEvent2 = {
+    uuid: 'uuid2',
+    distinct_id: 'id',
+    ip: null,
+    site_url: '',
+    data: JSON.stringify({
+        event: 'event',
+        properties: {},
+    }),
+    token: 'othertoken',
+    now: null,
+    sent_at: null,
+}
+
 describe('eachBatchParallelIngestion with overflow reroute', () => {
     let queue: any

@@ -68,9 +82,9 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {
             {
                 partition: 0,
                 topic: KAFKA_EVENTS_PLUGIN_INGESTION,
-                value: JSON.stringify(captureEndpointEvent),
-                timestamp: captureEndpointEvent['timestamp'],
-                offset: captureEndpointEvent['offset'],
+                value: JSON.stringify(captureEndpointEvent1),
+                timestamp: captureEndpointEvent1['timestamp'],
+                offset: captureEndpointEvent1['offset'],
                 key: null,
                 token: 'ok',
             },
@@ -85,9 +99,9 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {
         expect(captureIngestionWarning).not.toHaveBeenCalled()
         expect(queue.pluginsServer.kafkaProducer.produce).toHaveBeenCalledWith({
             topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
-            value: JSON.stringify(captureEndpointEvent),
-            timestamp: captureEndpointEvent['timestamp'],
-            offset: captureEndpointEvent['offset'],
+            value: JSON.stringify(captureEndpointEvent1),
+            timestamp: captureEndpointEvent1['timestamp'],
+            offset: captureEndpointEvent1['offset'],
             key: null,
             waitForAck: true,
         })
@@ -98,23 +112,23 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {

     it('reroutes excess events to OVERFLOW topic', async () => {
         const now = Date.now()
-        const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent], now)
+        const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent1], now)
         const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false)

         const tokenBlockList = buildStringMatcher('another_token,more_token', false)
         await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Reroute)

         expect(consume).toHaveBeenCalledWith(
-            captureEndpointEvent['token'] + ':' + captureEndpointEvent['distinct_id'],
+            captureEndpointEvent1['token'] + ':' + captureEndpointEvent1['distinct_id'],
             1,
             now
         )
         expect(captureIngestionWarning).not.toHaveBeenCalled()
         expect(queue.pluginsServer.kafkaProducer.produce).toHaveBeenCalledWith({
             topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
-            value: JSON.stringify(captureEndpointEvent),
-            timestamp: captureEndpointEvent['timestamp'],
-            offset: captureEndpointEvent['offset'],
+            value: JSON.stringify(captureEndpointEvent1),
+            timestamp: captureEndpointEvent1['timestamp'],
+            offset: captureEndpointEvent1['offset'],
             key: null,
             waitForAck: true,
         })
@@ -125,35 +139,47 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {

     it('does not reroute if not over capacity limit', async () => {
         const now = Date.now()
-        const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent], now)
+        const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent1, captureEndpointEvent2], now)
         const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => true)

         const tokenBlockList = buildStringMatcher('another_token,more_token', false)
         await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Reroute)

         expect(consume).toHaveBeenCalledWith(
-            captureEndpointEvent['token'] + ':' + captureEndpointEvent['distinct_id'],
+            captureEndpointEvent1['token'] + ':' + captureEndpointEvent1['distinct_id'],
             1,
             now
         )
+        expect(consume).toHaveBeenCalledWith(
+            captureEndpointEvent2['token'] + ':' + captureEndpointEvent2['distinct_id'],
+            1,
+            now
+        )
         expect(captureIngestionWarning).not.toHaveBeenCalled()
         expect(queue.pluginsServer.kafkaProducer.produce).not.toHaveBeenCalled()
         // Event is processed
-        expect(runEventPipeline).toHaveBeenCalled()
+        expect(runEventPipeline).toHaveBeenCalledTimes(2)
     })

     it('does drop events from blocked tokens', async () => {
         const now = Date.now()
-        const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent], now)
+        const batch = createBatchWithMultipleEventsWithKeys(
+            [captureEndpointEvent1, captureEndpointEvent2, captureEndpointEvent1],
+            now
+        )
         const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => true)

         const tokenBlockList = buildStringMatcher('mytoken,another_token', false)
         await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Reroute)

-        // Event is dropped, none of that happens
-        expect(consume).not.toHaveBeenCalled()
+        // The captureEndpointEvent1 events are dropped, captureEndpointEvent2 goes through
+        expect(consume).toHaveBeenCalledWith(
+            captureEndpointEvent2['token'] + ':' + captureEndpointEvent2['distinct_id'],
+            1,
+            now
+        )
         expect(captureIngestionWarning).not.toHaveBeenCalled()
         expect(queue.pluginsServer.kafkaProducer.produce).not.toHaveBeenCalled()
-        expect(runEventPipeline).not.toHaveBeenCalled()
+        expect(runEventPipeline).toHaveBeenCalledTimes(1)
     })
 })
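
Read together, the assertions in this file pin down a per-event decision for Reroute mode: events with a blocked token are dropped without touching the limiter or the pipeline, events whose ConfiguredLimiter bucket for token:distinct_id is exhausted are produced to the overflow topic with waitForAck instead of being processed, and everything else goes through runEventPipeline. The sketch below captures the core of that shape with invented names and injected dependencies; it is a partial illustration under those assumptions, not the actual each-batch-ingestion implementation.

// Hedged sketch of the Reroute-mode decision the tests above describe. All names here
// (IncomingEvent, RerouteDeps, handleWithReroute) are illustrative assumptions.
interface IncomingEvent {
    token: string
    distinct_id: string
    value: string
}

interface RerouteDeps {
    isTokenBlocked: (token: string) => boolean // e.g. the tokenBlockList matcher
    consume: (key: string, cost: number, now: number) => boolean // ConfiguredLimiter-style bucket
    produceToOverflow: (event: IncomingEvent) => Promise<void> // KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW
    runPipeline: (event: IncomingEvent) => Promise<void>
}

async function handleWithReroute(event: IncomingEvent, now: number, deps: RerouteDeps): Promise<void> {
    if (deps.isTokenBlocked(event.token)) {
        return // blocked token: dropped silently, no limiter call, no pipeline run
    }
    const key = `${event.token}:${event.distinct_id}`
    if (!deps.consume(key, 1, now)) {
        // Over capacity for this token:distinct_id pair: reroute instead of processing locally.
        await deps.produceToOverflow(event)
        return
    }
    await deps.runPipeline(event)
}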
Changed file 2 of 2:
@@ -3,7 +3,7 @@ import {
     eachBatchParallelIngestion,
     IngestionOverflowMode,
 } from '../../../src/main/ingestion-queues/batch-processing/each-batch-ingestion'
-import { ConfiguredLimiter, OverflowWarningLimiter } from '../../../src/utils/token-bucket'
+import { OverflowWarningLimiter } from '../../../src/utils/token-bucket'
 import { runEventPipeline } from './../../../src/worker/ingestion/event-pipeline/runner'
 import { captureIngestionWarning } from './../../../src/worker/ingestion/utils'

@@ -13,7 +13,7 @@ jest.mock('./../../../src/worker/ingestion/event-pipeline/runner', () => ({
     runEventPipeline: jest.fn().mockResolvedValue('default value'),
 }))

-const captureEndpointEvent = {
+const captureEndpointEvent1 = {
     uuid: 'uuid1',
     distinct_id: 'id',
     ip: null,
@@ -22,7 +22,21 @@ const captureEndpointEvent = {
         event: 'event',
         properties: {},
     }),
-    team_id: 1,
     token: 'mytoken',
     now: null,
     sent_at: null,
+}
+
+const captureEndpointEvent2 = {
+    uuid: 'uuid2',
+    distinct_id: 'id',
+    ip: null,
+    site_url: '',
+    data: JSON.stringify({
+        event: 'event',
+        properties: {},
+    }),
+    token: 'othertoken',
+    now: null,
+    sent_at: null,
+}
@@ -62,43 +76,32 @@ describe('eachBatchParallelIngestion with overflow consume', () => {
     })

     it('raises ingestion warning when consuming from overflow', async () => {
-        const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent])
+        const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent1])
         const consume = jest.spyOn(OverflowWarningLimiter, 'consume').mockImplementation(() => true)

         queue.pluginsServer.teamManager.getTeamForEvent.mockResolvedValueOnce({ id: 1 })
         const tokenBlockList = buildStringMatcher('another_token,more_token', false)
         await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Consume)

         expect(queue.pluginsServer.teamManager.getTeamForEvent).toHaveBeenCalledTimes(1)
-        expect(consume).toHaveBeenCalledWith(
-            captureEndpointEvent['team_id'] + ':' + captureEndpointEvent['distinct_id'],
-            1
-        )
-        expect(captureIngestionWarning).toHaveBeenCalledWith(
-            queue.pluginsServer.db,
-            captureEndpointEvent['team_id'],
-            'ingestion_capacity_overflow',
-            {
-                overflowDistinctId: captureEndpointEvent['distinct_id'],
-            }
-        )
+        expect(consume).toHaveBeenCalledWith('1:id', 1)
+        expect(captureIngestionWarning).toHaveBeenCalledWith(queue.pluginsServer.db, 1, 'ingestion_capacity_overflow', {
+            overflowDistinctId: captureEndpointEvent1['distinct_id'],
+        })

         // Event is processed
         expect(runEventPipeline).toHaveBeenCalled()
     })

     it('does not raise ingestion warning when under threshold', async () => {
-        const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent])
+        const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent1])
         const consume = jest.spyOn(OverflowWarningLimiter, 'consume').mockImplementation(() => false)

         queue.pluginsServer.teamManager.getTeamForEvent.mockResolvedValueOnce({ id: 1 })
         const tokenBlockList = buildStringMatcher('another_token,more_token', false)
         await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Consume)

-        expect(consume).toHaveBeenCalledWith(
-            captureEndpointEvent['team_id'] + ':' + captureEndpointEvent['distinct_id'],
-            1
-        )
+        expect(consume).toHaveBeenCalledWith('1:id', 1)
         expect(captureIngestionWarning).not.toHaveBeenCalled()
         expect(queue.pluginsServer.kafkaProducer.queueMessage).not.toHaveBeenCalled()

Expand All @@ -107,17 +110,22 @@ describe('eachBatchParallelIngestion with overflow consume', () => {
})

it('does drop events from blocked tokens', async () => {
const now = Date.now()
const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent], now)
const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => true)
const batch = createBatchWithMultipleEventsWithKeys([
captureEndpointEvent1,
captureEndpointEvent2,
captureEndpointEvent1,
])
const consume = jest.spyOn(OverflowWarningLimiter, 'consume').mockImplementation(() => false)

const tokenBlockList = buildStringMatcher('mytoken,another_token', false)
queue.pluginsServer.teamManager.getTeamForEvent.mockResolvedValueOnce({ id: 1 })
const tokenBlockList = buildStringMatcher('mytoken,more_token', false)
await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Consume)

// Event is dropped, none of that happens
expect(consume).not.toHaveBeenCalled()
expect(captureIngestionWarning).not.toHaveBeenCalled()
expect(queue.pluginsServer.kafkaProducer.queueMessage).not.toHaveBeenCalled()
expect(runEventPipeline).not.toHaveBeenCalled()

// captureEndpointEvent2 is processed, captureEndpointEvent1 are dropped
expect(runEventPipeline).toHaveBeenCalledTimes(1)
expect(consume).toHaveBeenCalledTimes(1)
})
})

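For the Consume-mode file, the assertions describe the mirror-image behaviour: blocked tokens are still dropped up front, and for everything else the consumer resolves the team, takes one token from OverflowWarningLimiter keyed by team_id:distinct_id, emits an ingestion_capacity_overflow warning only when that consume succeeds, and runs the event pipeline either way. A minimal sketch under those assumptions, with illustrative names rather than the real code:

// Hedged sketch of the Consume-mode path exercised by the tests above; names are assumptions.
interface OverflowEvent {
    token: string
    distinct_id: string
}

interface ConsumeDeps {
    isTokenBlocked: (token: string) => boolean
    getTeamId: (token: string) => Promise<number | null> // teamManager.getTeamForEvent-style lookup
    warningLimiterConsume: (key: string, cost: number) => boolean // OverflowWarningLimiter-style bucket
    captureWarning: (teamId: number, type: string, details: Record<string, string>) => Promise<void>
    runPipeline: (event: OverflowEvent) => Promise<void>
}

async function handleFromOverflow(event: OverflowEvent, deps: ConsumeDeps): Promise<void> {
    if (deps.isTokenBlocked(event.token)) {
        return // blocked token: dropped before any team lookup, warning or pipeline work
    }
    const teamId = await deps.getTeamId(event.token)
    if (teamId !== null && deps.warningLimiterConsume(`${teamId}:${event.distinct_id}`, 1)) {
        // The warning itself is rate limited so a hot distinct_id does not flood the warnings sink.
        await deps.captureWarning(teamId, 'ingestion_capacity_overflow', {
            overflowDistinctId: event.distinct_id,
        })
    }
    await deps.runPipeline(event) // overflow consumers still process the event
}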