feat: blobby ingestion warnings #20963

Merged · 19 commits · Mar 19, 2024

IngestionWarningsView.tsx
@@ -2,6 +2,8 @@ import { useValues } from 'kea'
import { ReadingHog } from 'lib/components/hedgehogs'
import { ProductIntroduction } from 'lib/components/ProductIntroduction/ProductIntroduction'
import { TZLabel } from 'lib/components/TZLabel'
import { IconPlayCircle } from 'lib/lemon-ui/icons'
import { LemonButton } from 'lib/lemon-ui/LemonButton'
import { LemonTable } from 'lib/lemon-ui/LemonTable'
import { Link } from 'lib/lemon-ui/Link'
import { Sparkline } from 'lib/lemon-ui/Sparkline'
@@ -19,6 +21,8 @@ const WARNING_TYPE_TO_DESCRIPTION = {
event_timestamp_in_future: 'An event was sent more than 23 hours in the future',
ingestion_capacity_overflow: 'Event ingestion has overflowed capacity',
message_size_too_large: 'Discarded event exceeding 1MB limit',
replay_timestamp_invalid: 'Replay event timestamp is invalid',
replay_timestamp_too_far: 'Replay event timestamp was too far in the future',
}

const WARNING_TYPE_RENDERER = {
@@ -134,6 +138,68 @@
</>
)
},
replay_timestamp_invalid: function Render(warning: IngestionWarning): JSX.Element {
const details: {
timestamp: string
session_id: string
} = {
timestamp: warning.details.timestamp,
session_id: warning.details.replayRecord.session_id,
}
return (
<>
Session replay data dropped due to invalid timestamp:
<ul>
<li>invalid timestamp: {details.timestamp}</li>
<li>session_id: {details.session_id}</li>
</ul>
<div className="max-w-30 mt-2">
<LemonButton
type="primary"
size="xsmall"
to={urls.replaySingle(details.session_id)}
sideIcon={<IconPlayCircle />}
data-attr="skewed-timestamp-view-recording"
>
View recording
</LemonButton>
</div>
</>
)
},
replay_timestamp_too_far: function Render(warning: IngestionWarning): JSX.Element {
const details: {
timestamp: string
session_id: string
daysFromNow: string
} = {
timestamp: warning.details.timestamp,
session_id: warning.details.replayRecord.session_id,
daysFromNow: warning.details.daysFromNow,
}
return (
<>
The session replay data timestamp was too different from the capture time, so the data was dropped.
Event values:
<ul>
<li>invalid timestamp: {details.timestamp}</li>
<li>session_id: {details.session_id}</li>
<li>skew: {details.daysFromNow} days</li>
</ul>
<div className="max-w-30 mt-2">
<LemonButton
type="primary"
size="xsmall"
to={urls.replaySingle(details.session_id)}
sideIcon={<IconPlayCircle />}
data-attr="skewed-timestamp-view-recording"
>
View recording
</LemonButton>
</div>
</>
)
},
}

export function IngestionWarningsView(): JSX.Element {
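For context on how these renderers are used: a hedged sketch of the dispatch a details cell could perform, falling back to raw JSON for unknown warning types. The helper name renderWarningDetails and the fallback branch are assumptions, not part of this diff; WARNING_TYPE_RENDERER and IngestionWarning come from this file.

function renderWarningDetails(warning: IngestionWarning): JSX.Element {
    const renderer = WARNING_TYPE_RENDERER[warning.type as keyof typeof WARNING_TYPE_RENDERER]
    // Unknown types fall back to dumping the raw details payload
    return renderer ? renderer(warning) : <>{JSON.stringify(warning.details)}</>
}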
---
@@ -2,6 +2,37 @@ import { dayjs } from 'lib/dayjs'

export const ingestionWarningsResponse = (baseTime: dayjs.Dayjs): { results: Record<string, any> } => ({
results: [
{
type: 'replay_timestamp_invalid',
lastSeen: baseTime.subtract(1, 'day'),
warnings: [
{
type: 'replay_timestamp_invalid',
timestamp: baseTime.subtract(1, 'day'),
details: {
timestamp: 'not a date',
replayRecord: { session_id: 'some uuid' },
},
},
],
count: 1,
},
{
type: 'replay_timestamp_too_far',
lastSeen: baseTime.subtract(1, 'day'),
warnings: [
{
type: 'replay_timestamp_too_far',
timestamp: baseTime.subtract(1, 'day'),
details: {
timestamp: baseTime.add(4, 'day').toISOString(),
replayRecord: { session_id: 'some uuid' },
daysFromNow: 4,
},
},
],
count: 1,
},
{
type: 'event_timestamp_in_future',
lastSeen: '2023-06-07T15:11:42.149000Z',
---
@@ -6,7 +6,7 @@ import { PipelineEvent, ValueMatcher } from '../../../types'
import { formPipelineEvent } from '../../../utils/event'
import { retryIfRetriable } from '../../../utils/retries'
import { status } from '../../../utils/status'
import { ConfiguredLimiter, LoggingLimiter, OverflowWarningLimiter } from '../../../utils/token-bucket'
import { ConfiguredLimiter, LoggingLimiter } from '../../../utils/token-bucket'
import { EventPipelineRunner } from '../../../worker/ingestion/event-pipeline/runner'
import { captureIngestionWarning } from '../../../worker/ingestion/utils'
import { ingestionPartitionKeyOverflowed } from '../analytics-events-ingestion-consumer'
@@ -143,11 +143,17 @@
) {
const team = await queue.pluginsServer.teamManager.getTeamForEvent(currentBatch[0].pluginEvent)
const distinct_id = currentBatch[0].pluginEvent.distinct_id
if (team && OverflowWarningLimiter.consume(`${team.id}:${distinct_id}`, 1)) {
if (team) {
processingPromises.push(
captureIngestionWarning(queue.pluginsServer.db, team.id, 'ingestion_capacity_overflow', {
overflowDistinctId: distinct_id,
})
captureIngestionWarning(
queue.pluginsServer.db.kafkaProducer,
team.id,
'ingestion_capacity_overflow',
{
overflowDistinctId: distinct_id,
},
{ key: distinct_id }
)
)
}
}
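With the call-site OverflowWarningLimiter removed, debouncing now happens inside captureIngestionWarning. Given the key scheme added in utils.ts below, the effective limiter key for this call site works out as follows (sketch, names taken from this diff):

// Assembled inside captureIngestionWarning as `${teamId}:${type}:${debounce?.key || ''}`
const limiterKey = `${team.id}:ingestion_capacity_overflow:${distinct_id}`
// IngestionWarningLimiter = new Limiter(1, 1.0 / 3600) then admits one warning per key per hour.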
---
@@ -10,7 +10,9 @@ import { findOffsetsToCommit } from '../../../../kafka/consumer'
import { retryOnDependencyUnavailableError } from '../../../../kafka/error-handling'
import { createKafkaProducer, disconnectProducer, flushProducer, produce } from '../../../../kafka/producer'
import { PluginsServerConfig } from '../../../../types'
import { KafkaProducerWrapper } from '../../../../utils/db/kafka-producer-wrapper'
import { status } from '../../../../utils/status'
import { captureIngestionWarning } from '../../../../worker/ingestion/utils'
import { eventDroppedCounter } from '../../metrics'
import { createSessionReplayEvent } from '../process-event'
import { IncomingRecordingMessage } from '../types'
@@ -130,7 +132,20 @@ export class ReplayEventsIngester {
// the replay record timestamp has to be valid and be within a reasonable diff from now
if (replayRecord !== null) {
const asDate = DateTime.fromSQL(replayRecord.first_timestamp)
if (!asDate.isValid || Math.abs(asDate.diffNow('months').months) >= 0.99) {
if (!asDate.isValid || Math.abs(asDate.diffNow('day').days) >= 7) {
await captureIngestionWarning(
new KafkaProducerWrapper(this.producer),
event.team_id,
!asDate.isValid ? 'replay_timestamp_invalid' : 'replay_timestamp_too_far',
{
replayRecord,
timestamp: replayRecord.first_timestamp,
isValid: asDate.isValid,
daysFromNow: Math.round(Math.abs(asDate.diffNow('day').days)),
processingTimestamp: DateTime.now().toISO(),
},
{ key: event.session_id }
)
return drop('invalid_timestamp')
}
}
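In plain terms, the guard swaps the previous roughly-one-month window (0.99 months) for a seven-day window and now emits an ingestion warning before dropping. A self-contained sketch of the classification, using luxon as the diff does; the helper name is assumed:

import { DateTime } from 'luxon'

// Returns the warning type the ingester would emit, or null when the timestamp is acceptable.
function classifyReplayTimestamp(firstTimestamp: string): string | null {
    const asDate = DateTime.fromSQL(firstTimestamp)
    if (!asDate.isValid) {
        return 'replay_timestamp_invalid' // unparseable, e.g. 'not a date' as in the mock above
    }
    // diffNow is negative for past timestamps, so Math.abs catches skew in both directions
    if (Math.abs(asDate.diffNow('day').days) >= 7) {
        return 'replay_timestamp_too_far'
    }
    return null
}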
---
@@ -160,8 +160,8 @@ export class SessionRecordingIngester {

this.realtimeManager = new RealtimeManager(this.redisPool, this.config)

// We create a hash of the cluster to use as a unique identifier for the high water marks
// This enables us to swap clusters without having to worry about resetting the high water marks
// We create a hash of the cluster to use as a unique identifier for the high-water marks
// This enables us to swap clusters without having to worry about resetting the high-water marks
const kafkaClusterIdentifier = crypto.createHash('md5').update(this.config.KAFKA_HOSTS).digest('hex')

this.sessionHighWaterMarker = new OffsetHighWaterMarker(
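A minimal sketch of the cluster fingerprint described in the comment above; the broker list value is hypothetical:

import { createHash } from 'crypto'

const kafkaHosts = 'broker-1:9092,broker-2:9092' // hypothetical KAFKA_HOSTS value
const kafkaClusterIdentifier = createHash('md5').update(kafkaHosts).digest('hex')
// The same broker list always yields the same hash, so marks keyed by it survive
// restarts, while pointing the consumer at a new cluster starts from fresh marks.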
4 changes: 1 addition & 3 deletions plugin-server/src/utils/token-bucket.ts
@@ -72,8 +72,6 @@ export const ConfiguredLimiter: Limiter = new Limiter(
defaultConfig.EVENT_OVERFLOW_BUCKET_REPLENISH_RATE
)

export const OverflowWarningLimiter: Limiter = new Limiter(1, 1.0 / 3600)

export const MessageSizeTooLargeWarningLimiter: Limiter = new Limiter(1, 1.0 / 300)
export const IngestionWarningLimiter: Limiter = new Limiter(1, 1.0 / 3600)

export const LoggingLimiter: Limiter = new Limiter(1, 1.0 / 60)
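For readers unfamiliar with these declarations: the Limiter arguments are bucket capacity and replenish rate per second (consistent with the EVENT_OVERFLOW_BUCKET settings above), so Limiter(1, 1.0 / 3600) admits one consume per key per hour. A hedged sketch of those semantics; the real implementation lives earlier in this file and may differ in detail:

class TokenBucketSketch {
    private buckets = new Map<string, { tokens: number; lastSeconds: number }>()

    constructor(private capacity: number, private replenishRate: number) {}

    consume(key: string, tokens: number): boolean {
        const nowSeconds = Date.now() / 1000
        const bucket = this.buckets.get(key) ?? { tokens: this.capacity, lastSeconds: nowSeconds }
        // Refill in proportion to elapsed time, never beyond capacity
        bucket.tokens = Math.min(
            this.capacity,
            bucket.tokens + (nowSeconds - bucket.lastSeconds) * this.replenishRate
        )
        bucket.lastSeconds = nowSeconds
        this.buckets.set(key, bucket)
        if (bucket.tokens < tokens) {
            return false // rate limited
        }
        bucket.tokens -= tokens
        return true
    }
}

// With capacity 1 and rate 1/3600, a second consume for the same key
// only succeeds after roughly an hour has passed.
const sketchLimiter = new TokenBucketSketch(1, 1.0 / 3600)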
---
@@ -33,7 +33,7 @@ export async function populateTeamDataStep(
if (event.team_id) {
// Check for an invalid UUID, which should be blocked by capture, when team_id is present
if (!UUID.validateString(event.uuid, false)) {
await captureIngestionWarning(db, event.team_id, 'skipping_event_invalid_uuid', {
await captureIngestionWarning(db.kafkaProducer, event.team_id, 'skipping_event_invalid_uuid', {
eventUuid: JSON.stringify(event.uuid),
})
throw new Error(`Not a valid UUID: "${event.uuid}"`)
@@ -70,7 +70,7 @@

// Check for an invalid UUID, which should be blocked by capture, when team_id is present
if (!UUID.validateString(event.uuid, false)) {
await captureIngestionWarning(db, team.id, 'skipping_event_invalid_uuid', {
await captureIngestionWarning(db.kafkaProducer, team.id, 'skipping_event_invalid_uuid', {
eventUuid: JSON.stringify(event.uuid),
})
throw new Error(`Not a valid UUID: "${event.uuid}"`)
---
@@ -12,7 +12,7 @@ export async function prepareEventStep(runner: EventPipelineRunner, event: Plugi
const invalidTimestampCallback = function (type: string, details: Record<string, any>) {
invalidTimestampCounter.labels(type).inc()

tsParsingIngestionWarnings.push(captureIngestionWarning(runner.hub.db, team_id, type, details))
tsParsingIngestionWarnings.push(captureIngestionWarning(runner.hub.db.kafkaProducer, team_id, type, details))
}

const preIngestionEvent = await runner.hub.eventsProcessor.processEvent(
49 changes: 33 additions & 16 deletions plugin-server/src/worker/ingestion/person-state.ts
@@ -323,19 +323,31 @@
return undefined
}
if (isDistinctIdIllegal(mergeIntoDistinctId)) {
await captureIngestionWarning(this.db, teamId, 'cannot_merge_with_illegal_distinct_id', {
illegalDistinctId: mergeIntoDistinctId,
otherDistinctId: otherPersonDistinctId,
eventUuid: this.event.uuid,
})
await captureIngestionWarning(
this.db.kafkaProducer,
teamId,
'cannot_merge_with_illegal_distinct_id',
{
illegalDistinctId: mergeIntoDistinctId,
otherDistinctId: otherPersonDistinctId,
eventUuid: this.event.uuid,
},
{ alwaysSend: true }
)
return undefined
}
if (isDistinctIdIllegal(otherPersonDistinctId)) {
await captureIngestionWarning(this.db, teamId, 'cannot_merge_with_illegal_distinct_id', {
illegalDistinctId: otherPersonDistinctId,
otherDistinctId: mergeIntoDistinctId,
eventUuid: this.event.uuid,
})
await captureIngestionWarning(
this.db.kafkaProducer,
teamId,
'cannot_merge_with_illegal_distinct_id',
{
illegalDistinctId: otherPersonDistinctId,
otherDistinctId: mergeIntoDistinctId,
eventUuid: this.event.uuid,
},
{ alwaysSend: true }
)
return undefined
}
return promiseRetry(
@@ -403,12 +415,17 @@

// If merge isn't allowed, we will ignore it, log an ingestion warning and exit
if (!mergeAllowed) {
// TODO: add event UUID to the ingestion warning
await captureIngestionWarning(this.db, this.teamId, 'cannot_merge_already_identified', {
sourcePersonDistinctId: otherPersonDistinctId,
targetPersonDistinctId: mergeIntoDistinctId,
eventUuid: this.event.uuid,
})
await captureIngestionWarning(
this.db.kafkaProducer,
this.teamId,
'cannot_merge_already_identified',
{
sourcePersonDistinctId: otherPersonDistinctId,
targetPersonDistinctId: mergeIntoDistinctId,
eventUuid: this.event.uuid,
},
{ alwaysSend: true }
)
status.warn('🤔', 'refused to merge an already identified user via an $identify or $create_alias call')
return mergeInto // We're returning the original person tied to distinct_id used for the event
}
11 changes: 4 additions & 7 deletions plugin-server/src/worker/ingestion/process-event.ts
@@ -21,7 +21,6 @@ import { MessageSizeTooLarge } from '../../utils/db/error'
import { KafkaProducerWrapper } from '../../utils/db/kafka-producer-wrapper'
import { safeClickhouseString, sanitizeEventName, timeoutGuard } from '../../utils/db/utils'
import { status } from '../../utils/status'
import { MessageSizeTooLargeWarningLimiter } from '../../utils/token-bucket'
import { castTimestampOrNow } from '../../utils/utils'
import { GroupTypeManager } from './group-type-manager'
import { addGroupProperties } from './groups'
@@ -240,12 +239,10 @@
// Some messages end up significantly larger than the original
// after plugin processing, person & group enrichment, etc.
if (error instanceof MessageSizeTooLarge) {
if (MessageSizeTooLargeWarningLimiter.consume(`${teamId}`, 1)) {
await captureIngestionWarning(this.db, teamId, 'message_size_too_large', {
eventUuid: uuid,
distinctId: distinctId,
})
}
await captureIngestionWarning(this.db.kafkaProducer, teamId, 'message_size_too_large', {
eventUuid: uuid,
distinctId: distinctId,
})
} else {
throw error
}
52 changes: 36 additions & 16 deletions plugin-server/src/worker/ingestion/utils.ts
@@ -3,8 +3,9 @@ import { ProducerRecord } from 'kafkajs'
import { DateTime } from 'luxon'

import { PipelineEvent, TeamId, TimestampFormat } from '../../types'
import { DB } from '../../utils/db/db'
import { KafkaProducerWrapper } from '../../utils/db/kafka-producer-wrapper'
import { safeClickhouseString } from '../../utils/db/utils'
import { IngestionWarningLimiter } from '../../utils/token-bucket'
import { castTimestampOrNow, castTimestampToClickhouseFormat, UUIDT } from '../../utils/utils'
import { KAFKA_EVENTS_DEAD_LETTER_QUEUE, KAFKA_INGESTION_WARNINGS } from './../../config/kafka-topics'

@@ -61,19 +62,38 @@ export function generateEventDeadLetterQueueMessage(
// These get displayed under Data Management > Ingestion Warnings
// These warnings get displayed to end users. Make sure these errors are actionable and useful for them and
// also update IngestionWarningsView.tsx to display useful context.
export async function captureIngestionWarning(db: DB, teamId: TeamId, type: string, details: Record<string, any>) {
await db.kafkaProducer.queueMessage({
topic: KAFKA_INGESTION_WARNINGS,
messages: [
{
value: JSON.stringify({
team_id: teamId,
type: type,
source: 'plugin-server',
details: JSON.stringify(details),
timestamp: castTimestampOrNow(null, TimestampFormat.ClickHouse),
}),
},
],
})
export async function captureIngestionWarning(
kafkaProducer: KafkaProducerWrapper,
teamId: TeamId,
type: string,
details: Record<string, any>,
/**
 * captureIngestionWarning debounces calls using team id and type as the key.
 * You can provide debounce.key to add to that key, e.g. debounce: { key: user_id }
 * to debounce per user.
 *
 * If alwaysSend is true, the message is sent regardless of the debounce key.
 * Use this for messages rare or important enough that they should always be sent.
*/
debounce?: { key?: string; alwaysSend?: boolean }
) {
const limiter_key = `${teamId}:${type}:${debounce?.key || ''}`
if (!!debounce?.alwaysSend || IngestionWarningLimiter.consume(limiter_key, 1)) {
await kafkaProducer.queueMessage({
topic: KAFKA_INGESTION_WARNINGS,
messages: [
{
value: JSON.stringify({
team_id: teamId,
type: type,
source: 'plugin-server',
details: JSON.stringify(details),
timestamp: castTimestampOrNow(null, TimestampFormat.ClickHouse),
}),
},
],
})
} else {
return Promise.resolve()
}
}
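Putting the new signature together, two hedged usage sketches; hub, teamId, sessionId, and the details fields are assumed to be in scope at a real call site:

// Debounced per team + type + session: at most one warning per key per hour.
await captureIngestionWarning(
    hub.db.kafkaProducer,
    teamId,
    'replay_timestamp_invalid',
    { timestamp, replayRecord },
    { key: sessionId }
)

// Rare but important: alwaysSend bypasses the limiter entirely.
await captureIngestionWarning(
    hub.db.kafkaProducer,
    teamId,
    'cannot_merge_with_illegal_distinct_id',
    { illegalDistinctId, eventUuid },
    { alwaysSend: true }
)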