feat(ingestion): use kafka message ts when rate-limiting to overflow (#…
xvello authored Oct 26, 2023
1 parent 7ff535e commit 9179e65
Showing 4 changed files with 49 additions and 19 deletions.
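In short, the overflow rate limiter now advances its token buckets using the Kafka message timestamp instead of wall-clock time, so rate-limiting decisions track when events were produced rather than when a (possibly lagging) consumer processes them. Below is a minimal sketch of the mechanism, condensed into a single hypothetical TokenBucket class; the real code splits this across Storage and a Limiter (ConfiguredLimiter) in plugin-server/src/utils/token-bucket.ts, and the consume signature shown here is inferred from the call sites in this diff.

// Hypothetical condensation of the Storage diff below. Bucket is assumed to
// be [tokens, lastReplenishedTimestamp], with timestamps in milliseconds.
type Bucket = [number, number]

class TokenBucket {
    public buckets: Map<string, Bucket> = new Map()

    constructor(private bucketCapacity: number, private replenishRate: number) {}

    replenish(key: string, now?: number): void {
        const replenishTimestamp: number = now ?? Date.now()
        const bucket = this.buckets.get(key)
        if (bucket === undefined) {
            this.buckets.set(key, [this.bucketCapacity, replenishTimestamp])
            return
        }
        // Only move forward in time: an out-of-order (older) timestamp is a
        // no-op, so late Kafka messages cannot mint extra tokens.
        const secondsToReplenish = (replenishTimestamp - bucket[1]) / 1000
        if (secondsToReplenish > 0) {
            bucket[0] = Math.min(bucket[0] + this.replenishRate * secondsToReplenish, this.bucketCapacity)
            bucket[1] = replenishTimestamp
        }
    }

    // Returns false once a key runs out of tokens; callers reroute such
    // events to the overflow topic.
    consume(key: string, tokens: number, now?: number): boolean {
        this.replenish(key, now)
        const bucket = this.buckets.get(key)!
        if (bucket[0] < tokens) {
            return false
        }
        bucket[0] -= tokens
        return true
    }
}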
plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts
@@ -313,7 +313,10 @@ export function splitIngestionBatch(
         }
         const pluginEvent = formPipelineEvent(message)
         const eventKey = computeKey(pluginEvent)
-        if (overflowMode === IngestionOverflowMode.Reroute && !ConfiguredLimiter.consume(eventKey, 1)) {
+        if (
+            overflowMode === IngestionOverflowMode.Reroute &&
+            !ConfiguredLimiter.consume(eventKey, 1, message.timestamp)
+        ) {
             // Local overflow detection triggering, reroute to overflow topic too
             message.key = null
             ingestionPartitionKeyOverflowed.labels(`${pluginEvent.team_id ?? pluginEvent.token}`).inc()
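The call site above passes message.timestamp, the Kafka message's own timestamp in epoch milliseconds; when it is undefined, the limiter's optional parameter falls back to Date.now(). Keys are team_id:distinct_id strings, as the test expectations further down show. A sketch of the decision using the hypothetical TokenBucket above as a stand-in for ConfiguredLimiter, with an assumed capacity and rate:

// Illustrative only: shouldReroute is not a function in this diff.
const limiter = new TokenBucket(1000, 100) // assumed capacity and tokens/sec

function shouldReroute(eventKey: string, message: { timestamp?: number }): boolean {
    // One token per event; the bucket is first replenished up to the
    // message's own timestamp, not the consumer's wall clock.
    return !limiter.consume(eventKey, 1, message.timestamp)
}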
24 changes: 11 additions & 13 deletions plugin-server/src/utils/token-bucket.ts
@@ -21,22 +21,20 @@ export class Storage {
     }
 
     replenish(key: string, now?: number): void {
-        if (typeof now === 'undefined') {
-            now = Date.now()
-        }
-
-        if (this.buckets.has(key) === false) {
-            this.buckets.set(key, [this.bucketCapacity, now])
+        const replenish_timestamp: number = now ?? Date.now()
+        const bucket = this.buckets.get(key)
+        if (bucket === undefined) {
+            this.buckets.set(key, [this.bucketCapacity, replenish_timestamp])
             return
         }
 
-        // We have checked the key exists already, so this cannot be undefined
-        const bucket: Bucket = this.buckets.get(key)!
-
-        // replenishRate is per second, but timestamps are in milliseconds
-        const replenishedTokens = this.replenishRate * ((now - bucket[1]) / 1000) + bucket[0]
-        bucket[0] = Math.min(replenishedTokens, this.bucketCapacity)
-        bucket[1] = now
+        // Replenish the bucket if replenish_timestamp is higher than lastReplenishedTimestamp
+        const secondsToReplenish = (replenish_timestamp - bucket[1]) / 1000
+        if (secondsToReplenish > 0) {
+            bucket[0] += this.replenishRate * secondsToReplenish
+            bucket[0] = Math.min(bucket[0], this.bucketCapacity)
+            bucket[1] = replenish_timestamp
+        }
     }
 
     consume(key: string, tokens: number): boolean {
@@ -4,6 +4,7 @@ import {
     IngestionOverflowMode,
 } from '../../../src/main/ingestion-queues/batch-processing/each-batch-ingestion'
 import { ConfiguredLimiter } from '../../../src/utils/token-bucket'
+import { runEventPipeline } from './../../../src/worker/ingestion/event-pipeline/runner'
 import { captureIngestionWarning } from './../../../src/worker/ingestion/utils'
 
 jest.mock('../../../src/utils/status')
@@ -12,7 +13,6 @@ jest.mock('./../../../src/worker/ingestion/utils')
 jest.mock('./../../../src/worker/ingestion/event-pipeline/runner', () => ({
     runEventPipeline: jest.fn().mockResolvedValue('default value'),
 }))
-import { runEventPipeline } from './../../../src/worker/ingestion/event-pipeline/runner'
 
 const captureEndpointEvent = {
     uuid: 'uuid1',
@@ -94,14 +94,16 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {
     })
 
     it('reroutes excess events to OVERFLOW topic', async () => {
-        const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent])
+        const now = Date.now()
+        const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent], now)
         const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false)
 
         await eachBatchParallelIngestion(batch, queue, IngestionOverflowMode.Reroute)
 
         expect(consume).toHaveBeenCalledWith(
             captureEndpointEvent['team_id'] + ':' + captureEndpointEvent['distinct_id'],
-            1
+            1,
+            now
         )
         expect(captureIngestionWarning).not.toHaveBeenCalled()
         expect(queue.pluginsServer.kafkaProducer.produce).toHaveBeenCalledWith({
@@ -118,14 +120,16 @@ })
     })
 
     it('does not reroute if not over capacity limit', async () => {
-        const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent])
+        const now = Date.now()
+        const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent], now)
        const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => true)
 
         await eachBatchParallelIngestion(batch, queue, IngestionOverflowMode.Reroute)
 
         expect(consume).toHaveBeenCalledWith(
             captureEndpointEvent['team_id'] + ':' + captureEndpointEvent['distinct_id'],
-            1
+            1,
+            now
         )
         expect(captureIngestionWarning).not.toHaveBeenCalled()
         expect(queue.pluginsServer.kafkaProducer.produce).not.toHaveBeenCalled()
25 changes: 25 additions & 0 deletions plugin-server/tests/utils/token-bucket.test.ts
@@ -59,13 +59,38 @@ describe('Storage', () => {
             expect(storage.buckets.get(key)![0]).toEqual(10)
             expect(storage.buckets.get(key)![1]).toEqual(now.valueOf())
 
+            // get two tokens to be replenished
+            storage.consume(key, 2)
+            expect(storage.buckets.get(key)![0]).toEqual(8)
+
+            // 20 seconds would exceed capacity of 10 tokens at 1 token/sec.
             storage.replenish(key, now.valueOf() + 20000)
 
             expect(storage.buckets.has(key)).toEqual(true)
             expect(storage.buckets.get(key)![0]).toEqual(10)
             expect(storage.buckets.get(key)![1]).toEqual(now.valueOf() + 20000)
         })
 
+        it('does not add if now is in the past', () => {
+            const key = 'test'
+            const storage = new Storage(10, 1)
+            const now = new Date('2023-02-08T08:00:00')
+
+            storage.replenish(key, now.valueOf())
+            expect(storage.buckets.get(key)![0]).toEqual(10)
+            expect(storage.buckets.get(key)![1]).toEqual(now.valueOf())
+
+            // get two tokens to be replenished
+            storage.consume(key, 2)
+            expect(storage.buckets.get(key)![0]).toEqual(8)
+
+            // Will be a no-op due to a lower now value
+            storage.replenish(key, now.valueOf() - 20000)
+
+            expect(storage.buckets.has(key)).toEqual(true)
+            expect(storage.buckets.get(key)![0]).toEqual(8)
+            expect(storage.buckets.get(key)![1]).toEqual(now.valueOf())
+        })
     })
 
     describe('consume()', () => {
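The new tests pin down the arithmetic: with capacity 10 and a replenish rate of 1 token/s, consuming 2 tokens leaves 8; replenishing 20 s later restores min(8 + 20 * 1, 10) = 10; and replenishing with an older timestamp changes nothing, which is what makes the bucket safe against out-of-order message timestamps. The same scenario against the hypothetical TokenBucket sketch at the top of this page:

const storage = new TokenBucket(10, 1) // capacity 10, 1 token/sec
const t0 = new Date('2023-02-08T08:00:00').valueOf()

storage.consume('test', 2, t0)        // new bucket starts full: [10, t0] -> [8, t0]
storage.replenish('test', t0 + 20000) // 8 + 20 capped at capacity -> [10, t0 + 20000]
storage.replenish('test', t0 - 20000) // timestamp is in the past -> no-op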
