diff --git a/frontend/__snapshots__/scenes-app-data-management--ingestion-warnings--dark.png b/frontend/__snapshots__/scenes-app-data-management--ingestion-warnings--dark.png
index be85fb4e666ef..313e792f2bf40 100644
Binary files a/frontend/__snapshots__/scenes-app-data-management--ingestion-warnings--dark.png and b/frontend/__snapshots__/scenes-app-data-management--ingestion-warnings--dark.png differ
diff --git a/frontend/__snapshots__/scenes-app-data-management--ingestion-warnings--light.png b/frontend/__snapshots__/scenes-app-data-management--ingestion-warnings--light.png
index a2b71fa7be769..eb8bcd99b5b3f 100644
Binary files a/frontend/__snapshots__/scenes-app-data-management--ingestion-warnings--light.png and b/frontend/__snapshots__/scenes-app-data-management--ingestion-warnings--light.png differ
diff --git a/frontend/src/scenes/data-management/ingestion-warnings/IngestionWarningsView.tsx b/frontend/src/scenes/data-management/ingestion-warnings/IngestionWarningsView.tsx
index f9dd1830383b8..e6cc569d737f7 100644
--- a/frontend/src/scenes/data-management/ingestion-warnings/IngestionWarningsView.tsx
+++ b/frontend/src/scenes/data-management/ingestion-warnings/IngestionWarningsView.tsx
@@ -2,6 +2,8 @@ import { useValues } from 'kea'
import { ReadingHog } from 'lib/components/hedgehogs'
import { ProductIntroduction } from 'lib/components/ProductIntroduction/ProductIntroduction'
import { TZLabel } from 'lib/components/TZLabel'
+import { IconPlayCircle } from 'lib/lemon-ui/icons'
+import { LemonButton } from 'lib/lemon-ui/LemonButton'
import { LemonTable } from 'lib/lemon-ui/LemonTable'
import { Link } from 'lib/lemon-ui/Link'
import { Sparkline } from 'lib/lemon-ui/Sparkline'
@@ -19,6 +21,8 @@ const WARNING_TYPE_TO_DESCRIPTION = {
event_timestamp_in_future: 'An event was sent more than 23 hours in the future',
ingestion_capacity_overflow: 'Event ingestion has overflowed capacity',
message_size_too_large: 'Discarded event exceeding 1MB limit',
+ replay_timestamp_invalid: 'Replay event timestamp is invalid',
+ replay_timestamp_too_far: 'Replay event timestamp was too far in the future',
}
const WARNING_TYPE_RENDERER = {
@@ -134,6 +138,68 @@ const WARNING_TYPE_RENDERER = {
>
)
},
+ replay_timestamp_invalid: function Render(warning: IngestionWarning): JSX.Element {
+ const details: {
+ timestamp: string
+ session_id: string
+ } = {
+ timestamp: warning.details.timestamp,
+ session_id: warning.details.replayRecord.session_id,
+ }
+        return (
+            <>
+                Session replay data dropped due to invalid timestamp:
+                <ul>
+                    <li>invalid timestamp: {details.timestamp}</li>
+                    <li>session_id: {details.session_id}</li>
+                </ul>
+                <LemonButton
+                    type="secondary"
+                    sideIcon={<IconPlayCircle />}
+                    to={urls.replaySingle(details.session_id)}
+                    data-attr="skewed-timestamp-view-recording"
+                >
+                    View recording
+                </LemonButton>
+            </>
+        )
+ },
+ replay_timestamp_too_far: function Render(warning: IngestionWarning): JSX.Element {
+ const details: {
+ timestamp: string
+ session_id: string
+ daysFromNow: string
+ } = {
+ timestamp: warning.details.timestamp,
+ session_id: warning.details.replayRecord.session_id,
+ daysFromNow: warning.details.daysFromNow,
+ }
+        return (
+            <>
+                The session replay data timestamp was too different from the capture time, so the data was dropped.
+                Event values:
+                <ul>
+                    <li>invalid timestamp: {details.timestamp}</li>
+                    <li>session_id: {details.session_id}</li>
+                    <li>skew: {details.daysFromNow} days</li>
+                </ul>
+                <LemonButton
+                    type="secondary"
+                    sideIcon={<IconPlayCircle />}
+                    to={urls.replaySingle(details.session_id)}
+                    data-attr="skewed-timestamp-view-recording"
+                >
+                    View recording
+                </LemonButton>
+            </>
+        )
+ },
}
export function IngestionWarningsView(): JSX.Element {
diff --git a/frontend/src/scenes/data-management/ingestion-warnings/__mocks__/ingestion-warnings-response.ts b/frontend/src/scenes/data-management/ingestion-warnings/__mocks__/ingestion-warnings-response.ts
index deb6752883ca2..11765e97b4f37 100644
--- a/frontend/src/scenes/data-management/ingestion-warnings/__mocks__/ingestion-warnings-response.ts
+++ b/frontend/src/scenes/data-management/ingestion-warnings/__mocks__/ingestion-warnings-response.ts
@@ -2,6 +2,37 @@ import { dayjs } from 'lib/dayjs'
export const ingestionWarningsResponse = (baseTime: dayjs.Dayjs): { results: Record<string, any>[] } => ({
results: [
+ {
+ type: 'replay_timestamp_invalid',
+ lastSeen: baseTime.subtract(1, 'day'),
+ warnings: [
+ {
+ type: 'replay_timestamp_invalid',
+ timestamp: baseTime.subtract(1, 'day'),
+ details: {
+ timestamp: 'not a date',
+ replayRecord: { session_id: 'some uuid' },
+ },
+ },
+ ],
+ count: 1,
+ },
+ {
+ type: 'replay_timestamp_too_far',
+ lastSeen: baseTime.subtract(1, 'day'),
+ warnings: [
+ {
+ type: 'replay_timestamp_too_far',
+ timestamp: baseTime.subtract(1, 'day'),
+ details: {
+ timestamp: baseTime.add(4, 'day').toISOString(),
+ replayRecord: { session_id: 'some uuid' },
+ daysFromNow: 4,
+ },
+ },
+ ],
+ count: 1,
+ },
{
type: 'event_timestamp_in_future',
lastSeen: '2023-06-07T15:11:42.149000Z',
diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts
index 9466838cdf9bc..749e41c18c335 100644
--- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts
+++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts
@@ -6,7 +6,7 @@ import { PipelineEvent, ValueMatcher } from '../../../types'
import { formPipelineEvent } from '../../../utils/event'
import { retryIfRetriable } from '../../../utils/retries'
import { status } from '../../../utils/status'
-import { ConfiguredLimiter, LoggingLimiter, OverflowWarningLimiter } from '../../../utils/token-bucket'
+import { ConfiguredLimiter, LoggingLimiter } from '../../../utils/token-bucket'
import { EventPipelineRunner } from '../../../worker/ingestion/event-pipeline/runner'
import { captureIngestionWarning } from '../../../worker/ingestion/utils'
import { ingestionPartitionKeyOverflowed } from '../analytics-events-ingestion-consumer'
@@ -143,11 +143,17 @@ export async function eachBatchParallelIngestion(
) {
const team = await queue.pluginsServer.teamManager.getTeamForEvent(currentBatch[0].pluginEvent)
const distinct_id = currentBatch[0].pluginEvent.distinct_id
- if (team && OverflowWarningLimiter.consume(`${team.id}:${distinct_id}`, 1)) {
+ if (team) {
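+            // rate limiting now happens inside captureIngestionWarning, debounced per distinct_id via the key option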
processingPromises.push(
- captureIngestionWarning(queue.pluginsServer.db, team.id, 'ingestion_capacity_overflow', {
- overflowDistinctId: distinct_id,
- })
+ captureIngestionWarning(
+ queue.pluginsServer.db.kafkaProducer,
+ team.id,
+ 'ingestion_capacity_overflow',
+ {
+ overflowDistinctId: distinct_id,
+ },
+ { key: distinct_id }
+ )
)
}
}
diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts
index fe4e402addf04..632f695a158f5 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/services/replay-events-ingester.ts
@@ -10,7 +10,9 @@ import { findOffsetsToCommit } from '../../../../kafka/consumer'
import { retryOnDependencyUnavailableError } from '../../../../kafka/error-handling'
import { createKafkaProducer, disconnectProducer, flushProducer, produce } from '../../../../kafka/producer'
import { PluginsServerConfig } from '../../../../types'
+import { KafkaProducerWrapper } from '../../../../utils/db/kafka-producer-wrapper'
import { status } from '../../../../utils/status'
+import { captureIngestionWarning } from '../../../../worker/ingestion/utils'
import { eventDroppedCounter } from '../../metrics'
import { createSessionReplayEvent } from '../process-event'
import { IncomingRecordingMessage } from '../types'
@@ -130,7 +132,20 @@ export class ReplayEventsIngester {
// the replay record timestamp has to be valid and be within a reasonable diff from now
if (replayRecord !== null) {
const asDate = DateTime.fromSQL(replayRecord.first_timestamp)
- if (!asDate.isValid || Math.abs(asDate.diffNow('months').months) >= 0.99) {
+ if (!asDate.isValid || Math.abs(asDate.diffNow('day').days) >= 7) {
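+                // record a debounced ingestion warning so users can see why the replay data was dropped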
+ await captureIngestionWarning(
+ new KafkaProducerWrapper(this.producer),
+ event.team_id,
+ !asDate.isValid ? 'replay_timestamp_invalid' : 'replay_timestamp_too_far',
+ {
+ replayRecord,
+ timestamp: replayRecord.first_timestamp,
+ isValid: asDate.isValid,
+ daysFromNow: Math.round(Math.abs(asDate.diffNow('day').days)),
+ processingTimestamp: DateTime.now().toISO(),
+ },
+ { key: event.session_id }
+ )
return drop('invalid_timestamp')
}
}
diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
index 5de7c00f0cfc2..30aaab4a023d5 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
@@ -160,8 +160,8 @@ export class SessionRecordingIngester {
this.realtimeManager = new RealtimeManager(this.redisPool, this.config)
- // We create a hash of the cluster to use as a unique identifier for the high water marks
- // This enables us to swap clusters without having to worry about resetting the high water marks
+ // We create a hash of the cluster to use as a unique identifier for the high-water marks
+ // This enables us to swap clusters without having to worry about resetting the high-water marks
const kafkaClusterIdentifier = crypto.createHash('md5').update(this.config.KAFKA_HOSTS).digest('hex')
this.sessionHighWaterMarker = new OffsetHighWaterMarker(
diff --git a/plugin-server/src/utils/token-bucket.ts b/plugin-server/src/utils/token-bucket.ts
index 310c151310575..81409a70e0c85 100644
--- a/plugin-server/src/utils/token-bucket.ts
+++ b/plugin-server/src/utils/token-bucket.ts
@@ -72,8 +72,6 @@ export const ConfiguredLimiter: Limiter = new Limiter(
defaultConfig.EVENT_OVERFLOW_BUCKET_REPLENISH_RATE
)
-export const OverflowWarningLimiter: Limiter = new Limiter(1, 1.0 / 3600)
-
-export const MessageSizeTooLargeWarningLimiter: Limiter = new Limiter(1, 1.0 / 300)
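+// allows one ingestion warning per key per hour (bucket of 1 token, replenished at 1/3600 tokens per second)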
+export const IngestionWarningLimiter: Limiter = new Limiter(1, 1.0 / 3600)
export const LoggingLimiter: Limiter = new Limiter(1, 1.0 / 60)
diff --git a/plugin-server/src/worker/ingestion/event-pipeline/populateTeamDataStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/populateTeamDataStep.ts
index 5ea66c651c32d..06d14d8f4fced 100644
--- a/plugin-server/src/worker/ingestion/event-pipeline/populateTeamDataStep.ts
+++ b/plugin-server/src/worker/ingestion/event-pipeline/populateTeamDataStep.ts
@@ -33,7 +33,7 @@ export async function populateTeamDataStep(
if (event.team_id) {
// Check for an invalid UUID, which should be blocked by capture, when team_id is present
if (!UUID.validateString(event.uuid, false)) {
- await captureIngestionWarning(db, event.team_id, 'skipping_event_invalid_uuid', {
+ await captureIngestionWarning(db.kafkaProducer, event.team_id, 'skipping_event_invalid_uuid', {
eventUuid: JSON.stringify(event.uuid),
})
throw new Error(`Not a valid UUID: "${event.uuid}"`)
@@ -70,7 +70,7 @@ export async function populateTeamDataStep(
// Check for an invalid UUID, which should be blocked by capture, when team_id is present
if (!UUID.validateString(event.uuid, false)) {
- await captureIngestionWarning(db, team.id, 'skipping_event_invalid_uuid', {
+ await captureIngestionWarning(db.kafkaProducer, team.id, 'skipping_event_invalid_uuid', {
eventUuid: JSON.stringify(event.uuid),
})
throw new Error(`Not a valid UUID: "${event.uuid}"`)
diff --git a/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts
index b026423156662..879941e4d1838 100644
--- a/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts
+++ b/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts
@@ -12,7 +12,7 @@ export async function prepareEventStep(runner: EventPipelineRunner, event: Plugi
const invalidTimestampCallback = function (type: string, details: Record<string, any>) {
invalidTimestampCounter.labels(type).inc()
- tsParsingIngestionWarnings.push(captureIngestionWarning(runner.hub.db, team_id, type, details))
+ tsParsingIngestionWarnings.push(captureIngestionWarning(runner.hub.db.kafkaProducer, team_id, type, details))
}
const preIngestionEvent = await runner.hub.eventsProcessor.processEvent(
diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts
index 4dca2b465a0e9..b2356f3652662 100644
--- a/plugin-server/src/worker/ingestion/person-state.ts
+++ b/plugin-server/src/worker/ingestion/person-state.ts
@@ -323,19 +323,31 @@ export class PersonState {
return undefined
}
if (isDistinctIdIllegal(mergeIntoDistinctId)) {
- await captureIngestionWarning(this.db, teamId, 'cannot_merge_with_illegal_distinct_id', {
- illegalDistinctId: mergeIntoDistinctId,
- otherDistinctId: otherPersonDistinctId,
- eventUuid: this.event.uuid,
- })
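+            // illegal distinct_ids are rare and indicate real data problems, so always send this warning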
+ await captureIngestionWarning(
+ this.db.kafkaProducer,
+ teamId,
+ 'cannot_merge_with_illegal_distinct_id',
+ {
+ illegalDistinctId: mergeIntoDistinctId,
+ otherDistinctId: otherPersonDistinctId,
+ eventUuid: this.event.uuid,
+ },
+ { alwaysSend: true }
+ )
return undefined
}
if (isDistinctIdIllegal(otherPersonDistinctId)) {
- await captureIngestionWarning(this.db, teamId, 'cannot_merge_with_illegal_distinct_id', {
- illegalDistinctId: otherPersonDistinctId,
- otherDistinctId: mergeIntoDistinctId,
- eventUuid: this.event.uuid,
- })
+ await captureIngestionWarning(
+ this.db.kafkaProducer,
+ teamId,
+ 'cannot_merge_with_illegal_distinct_id',
+ {
+ illegalDistinctId: otherPersonDistinctId,
+ otherDistinctId: mergeIntoDistinctId,
+ eventUuid: this.event.uuid,
+ },
+ { alwaysSend: true }
+ )
return undefined
}
return promiseRetry(
@@ -403,12 +415,17 @@ export class PersonState {
// If merge isn't allowed, we will ignore it, log an ingestion warning and exit
if (!mergeAllowed) {
- // TODO: add event UUID to the ingestion warning
- await captureIngestionWarning(this.db, this.teamId, 'cannot_merge_already_identified', {
- sourcePersonDistinctId: otherPersonDistinctId,
- targetPersonDistinctId: mergeIntoDistinctId,
- eventUuid: this.event.uuid,
- })
+ await captureIngestionWarning(
+ this.db.kafkaProducer,
+ this.teamId,
+ 'cannot_merge_already_identified',
+ {
+ sourcePersonDistinctId: otherPersonDistinctId,
+ targetPersonDistinctId: mergeIntoDistinctId,
+ eventUuid: this.event.uuid,
+ },
+ { alwaysSend: true }
+ )
status.warn('🤔', 'refused to merge an already identified user via an $identify or $create_alias call')
return mergeInto // We're returning the original person tied to distinct_id used for the event
}
diff --git a/plugin-server/src/worker/ingestion/process-event.ts b/plugin-server/src/worker/ingestion/process-event.ts
index 13154133304c4..5d6a9a4191334 100644
--- a/plugin-server/src/worker/ingestion/process-event.ts
+++ b/plugin-server/src/worker/ingestion/process-event.ts
@@ -21,7 +21,6 @@ import { MessageSizeTooLarge } from '../../utils/db/error'
import { KafkaProducerWrapper } from '../../utils/db/kafka-producer-wrapper'
import { safeClickhouseString, sanitizeEventName, timeoutGuard } from '../../utils/db/utils'
import { status } from '../../utils/status'
-import { MessageSizeTooLargeWarningLimiter } from '../../utils/token-bucket'
import { castTimestampOrNow } from '../../utils/utils'
import { GroupTypeManager } from './group-type-manager'
import { addGroupProperties } from './groups'
@@ -240,12 +239,10 @@ export class EventsProcessor {
// Some messages end up significantly larger than the original
// after plugin processing, person & group enrichment, etc.
if (error instanceof MessageSizeTooLarge) {
- if (MessageSizeTooLargeWarningLimiter.consume(`${teamId}`, 1)) {
- await captureIngestionWarning(this.db, teamId, 'message_size_too_large', {
- eventUuid: uuid,
- distinctId: distinctId,
- })
- }
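+            // debouncing now happens inside captureIngestionWarning (at most one warning per team per hour)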
+ await captureIngestionWarning(this.db.kafkaProducer, teamId, 'message_size_too_large', {
+ eventUuid: uuid,
+ distinctId: distinctId,
+ })
} else {
throw error
}
diff --git a/plugin-server/src/worker/ingestion/utils.ts b/plugin-server/src/worker/ingestion/utils.ts
index 7861e00185b0b..c52ef4ebba78e 100644
--- a/plugin-server/src/worker/ingestion/utils.ts
+++ b/plugin-server/src/worker/ingestion/utils.ts
@@ -3,8 +3,9 @@ import { ProducerRecord } from 'kafkajs'
import { DateTime } from 'luxon'
import { PipelineEvent, TeamId, TimestampFormat } from '../../types'
-import { DB } from '../../utils/db/db'
+import { KafkaProducerWrapper } from '../../utils/db/kafka-producer-wrapper'
import { safeClickhouseString } from '../../utils/db/utils'
+import { IngestionWarningLimiter } from '../../utils/token-bucket'
import { castTimestampOrNow, castTimestampToClickhouseFormat, UUIDT } from '../../utils/utils'
import { KAFKA_EVENTS_DEAD_LETTER_QUEUE, KAFKA_INGESTION_WARNINGS } from './../../config/kafka-topics'
@@ -61,19 +62,38 @@ export function generateEventDeadLetterQueueMessage(
// These get displayed under Data Management > Ingestion Warnings
// These warnings get displayed to end users. Make sure these errors are actionable and useful for them and
// also update IngestionWarningsView.tsx to display useful context.
-export async function captureIngestionWarning(db: DB, teamId: TeamId, type: string, details: Record<string, any>) {
- await db.kafkaProducer.queueMessage({
- topic: KAFKA_INGESTION_WARNINGS,
- messages: [
- {
- value: JSON.stringify({
- team_id: teamId,
- type: type,
- source: 'plugin-server',
- details: JSON.stringify(details),
- timestamp: castTimestampOrNow(null, TimestampFormat.ClickHouse),
- }),
- },
- ],
- })
+export async function captureIngestionWarning(
+ kafkaProducer: KafkaProducerWrapper,
+ teamId: TeamId,
+ type: string,
+    details: Record<string, any>,
+ /**
+     * captureIngestionWarning debounces calls using team id and warning type as the key.
+     * You can provide debounce.key to extend that key,
+     * e.g. to debounce by a specific user id, pass debounce: { key: user_id }.
+     *
+     * If alwaysSend is true, the message is sent regardless of the debounce key.
+     * Use this when a message is rare enough or important enough that it should always be sent.
+ */
+ debounce?: { key?: string; alwaysSend?: boolean }
+) {
+ const limiter_key = `${teamId}:${type}:${debounce?.key || ''}`
+ if (!!debounce?.alwaysSend || IngestionWarningLimiter.consume(limiter_key, 1)) {
+ await kafkaProducer.queueMessage({
+ topic: KAFKA_INGESTION_WARNINGS,
+ messages: [
+ {
+ value: JSON.stringify({
+ team_id: teamId,
+ type: type,
+ source: 'plugin-server',
+ details: JSON.stringify(details),
+ timestamp: castTimestampOrNow(null, TimestampFormat.ClickHouse),
+ }),
+ },
+ ],
+ })
+ } else {
+ return Promise.resolve()
+ }
}
diff --git a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts
index ce2569909fe4a..851bb23e2ac14 100644
--- a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts
+++ b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts
@@ -1,13 +1,15 @@
+import { Settings } from 'luxon'
+
import { buildStringMatcher } from '../../../src/config/config'
import {
eachBatchParallelIngestion,
IngestionOverflowMode,
} from '../../../src/main/ingestion-queues/batch-processing/each-batch-ingestion'
-import { OverflowWarningLimiter } from '../../../src/utils/token-bucket'
-import { captureIngestionWarning } from './../../../src/worker/ingestion/utils'
+import { TimestampFormat } from '../../../src/types'
+import { IngestionWarningLimiter } from '../../../src/utils/token-bucket'
+import { castTimestampOrNow } from '../../../src/utils/utils'
jest.mock('../../../src/utils/status')
-jest.mock('./../../../src/worker/ingestion/utils')
const runEventPipeline = jest.fn().mockResolvedValue('default value')
@@ -47,6 +49,7 @@ const captureEndpointEvent2 = {
describe('eachBatchParallelIngestion with overflow consume', () => {
let queue: any
+ let mockQueueMessage: jest.Mock
function createBatchWithMultipleEventsWithKeys(events: any[], timestamp?: any): any {
return events.map((event) => ({
@@ -58,17 +61,25 @@ describe('eachBatchParallelIngestion with overflow consume', () => {
}
beforeEach(() => {
+        // Luxon's Settings.now lets you pin a fixed "now" for deterministic tests
+ Settings.now = () => new Date(2018, 4, 25).valueOf()
+
+ mockQueueMessage = jest.fn()
queue = {
bufferSleep: jest.fn(),
pluginsServer: {
INGESTION_CONCURRENCY: 4,
kafkaProducer: {
- queueMessage: jest.fn(),
+ queueMessage: mockQueueMessage,
},
teamManager: {
getTeamForEvent: jest.fn(),
},
- db: 'database',
+ db: {
+ kafkaProducer: {
+ queueMessage: mockQueueMessage,
+ },
+ },
},
}
})
@@ -77,22 +88,30 @@ describe('eachBatchParallelIngestion with overflow consume', () => {
'raises ingestion warning when consuming from overflow %s',
async (mode) => {
const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent1])
- const consume = jest.spyOn(OverflowWarningLimiter, 'consume').mockImplementation(() => true)
+ const consume = jest.spyOn(IngestionWarningLimiter, 'consume').mockImplementation(() => true)
queue.pluginsServer.teamManager.getTeamForEvent.mockResolvedValueOnce({ id: 1 })
const tokenBlockList = buildStringMatcher('another_token,more_token', false)
await eachBatchParallelIngestion(tokenBlockList, batch, queue, mode)
expect(queue.pluginsServer.teamManager.getTeamForEvent).toHaveBeenCalledTimes(1)
- expect(consume).toHaveBeenCalledWith('1:id', 1)
- expect(captureIngestionWarning).toHaveBeenCalledWith(
- queue.pluginsServer.db,
- 1,
- 'ingestion_capacity_overflow',
- {
- overflowDistinctId: captureEndpointEvent1['distinct_id'],
- }
- )
+ expect(consume).toHaveBeenCalledWith('1:ingestion_capacity_overflow:id', 1)
+ expect(mockQueueMessage).toHaveBeenCalledWith({
+ topic: 'clickhouse_ingestion_warnings_test',
+ messages: [
+ {
+ value: JSON.stringify({
+ team_id: 1,
+ type: 'ingestion_capacity_overflow',
+ source: 'plugin-server',
+ details: JSON.stringify({
+ overflowDistinctId: 'id',
+ }),
+ timestamp: castTimestampOrNow(null, TimestampFormat.ClickHouse),
+ }),
+ },
+ ],
+ })
// Event is processed
expect(runEventPipeline).toHaveBeenCalled()
@@ -103,14 +122,13 @@ describe('eachBatchParallelIngestion with overflow consume', () => {
'does not raise ingestion warning when under threshold %s',
async (mode) => {
const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent1])
- const consume = jest.spyOn(OverflowWarningLimiter, 'consume').mockImplementation(() => false)
+ const consume = jest.spyOn(IngestionWarningLimiter, 'consume').mockImplementation(() => false)
queue.pluginsServer.teamManager.getTeamForEvent.mockResolvedValueOnce({ id: 1 })
const tokenBlockList = buildStringMatcher('another_token,more_token', false)
await eachBatchParallelIngestion(tokenBlockList, batch, queue, mode)
- expect(consume).toHaveBeenCalledWith('1:id', 1)
- expect(captureIngestionWarning).not.toHaveBeenCalled()
+ expect(consume).toHaveBeenCalledWith('1:ingestion_capacity_overflow:id', 1)
expect(queue.pluginsServer.kafkaProducer.queueMessage).not.toHaveBeenCalled()
// Event is processed
@@ -126,13 +144,12 @@ describe('eachBatchParallelIngestion with overflow consume', () => {
captureEndpointEvent2,
captureEndpointEvent1,
])
- const consume = jest.spyOn(OverflowWarningLimiter, 'consume').mockImplementation(() => false)
+ const consume = jest.spyOn(IngestionWarningLimiter, 'consume').mockImplementation(() => false)
queue.pluginsServer.teamManager.getTeamForEvent.mockResolvedValueOnce({ id: 1 })
const tokenBlockList = buildStringMatcher('mytoken,more_token', false)
await eachBatchParallelIngestion(tokenBlockList, batch, queue, mode)
- expect(captureIngestionWarning).not.toHaveBeenCalled()
expect(queue.pluginsServer.kafkaProducer.queueMessage).not.toHaveBeenCalled()
// captureEndpointEvent2 is processed, captureEndpointEvent1 are dropped
diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/services/replay-events-ingester.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/services/replay-events-ingester.test.ts
index 6545941f30cc8..af798504406e6 100644
--- a/plugin-server/tests/main/ingestion-queues/session-recording/services/replay-events-ingester.test.ts
+++ b/plugin-server/tests/main/ingestion-queues/session-recording/services/replay-events-ingester.test.ts
@@ -1,3 +1,4 @@
+import { DateTime } from 'luxon'
import { HighLevelProducer } from 'node-rdkafka'
import { defaultConfig } from '../../../../../src/config/config'
@@ -5,8 +6,9 @@ import { createKafkaProducer, produce } from '../../../../../src/kafka/producer'
import { OffsetHighWaterMarker } from '../../../../../src/main/ingestion-queues/session-recording/services/offset-high-water-marker'
import { ReplayEventsIngester } from '../../../../../src/main/ingestion-queues/session-recording/services/replay-events-ingester'
import { IncomingRecordingMessage } from '../../../../../src/main/ingestion-queues/session-recording/types'
-import { PluginsServerConfig } from '../../../../../src/types'
+import { PluginsServerConfig, TimestampFormat } from '../../../../../src/types'
import { status } from '../../../../../src/utils/status'
+import { castTimestampOrNow } from '../../../../../src/utils/utils'
jest.mock('../../../../../src/utils/status')
jest.mock('../../../../../src/kafka/producer')
@@ -47,6 +49,34 @@ describe('replay events ingester', () => {
await ingester.start()
})
+    test('does not ingest messages from two months in the future', async () => {
+ const twoMonthsFromNow = DateTime.utc().plus({ months: 2 })
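+        // two months is comfortably beyond the seven-day skew threshold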
+
+ await ingester.consume(makeIncomingMessage("mickey's fun house", twoMonthsFromNow.toMillis()))
+
+ expect(jest.mocked(status.debug).mock.calls).toEqual([])
+ expect(jest.mocked(produce).mock.calls).toHaveLength(1)
+ expect(jest.mocked(produce).mock.calls[0]).toHaveLength(1)
+ const call = jest.mocked(produce).mock.calls[0][0]
+
+ expect(call.topic).toEqual('clickhouse_ingestion_warnings_test')
+        // call.value is a Buffer; convert it to a string before parsing
+ const value = call.value ? JSON.parse(call.value.toString()) : null
+ const expectedTimestamp = castTimestampOrNow(twoMonthsFromNow, TimestampFormat.ClickHouse)
+
+ expect(value.source).toEqual('plugin-server')
+ expect(value.team_id).toEqual(0)
+ expect(value.type).toEqual('replay_timestamp_too_far')
+ const details = JSON.parse(value.details)
+ expect(details).toEqual(
+ expect.objectContaining({
+ isValid: true,
+ daysFromNow: 61,
+ timestamp: expectedTimestamp,
+ })
+ )
+ })
+
test('it passes snapshot source along', async () => {
const ts = new Date().getTime()
await ingester.consume(makeIncomingMessage("mickey's fun house", ts))
@@ -79,6 +109,7 @@ describe('replay events ingester', () => {
uuid: expect.any(String),
})
})
+
test('it defaults snapshot source to web when absent', async () => {
const ts = new Date().getTime()
await ingester.consume(makeIncomingMessage(null, ts))
diff --git a/plugin-server/tests/worker/ingestion/utils.test.ts b/plugin-server/tests/worker/ingestion/utils.test.ts
index 0d7922a2aa72f..f398b01d29be0 100644
--- a/plugin-server/tests/worker/ingestion/utils.test.ts
+++ b/plugin-server/tests/worker/ingestion/utils.test.ts
@@ -24,7 +24,7 @@ describe('captureIngestionWarning()', () => {
}
it('can read own writes', async () => {
- await captureIngestionWarning(hub.db, 2, 'some_type', { foo: 'bar' })
+ await captureIngestionWarning(hub.db.kafkaProducer, 2, 'some_type', { foo: 'bar' })
const warnings = await delayUntilEventIngested(fetchWarnings)
expect(warnings).toEqual([