feat: blobby ingestion warnings #20963

Merged · 19 commits · Mar 19, 2024

Changes from 9 commits

@@ -2,6 +2,8 @@ import { useValues } from 'kea'
import { ReadingHog } from 'lib/components/hedgehogs'
import { ProductIntroduction } from 'lib/components/ProductIntroduction/ProductIntroduction'
import { TZLabel } from 'lib/components/TZLabel'
import { IconPlayCircle } from 'lib/lemon-ui/icons'
import { LemonButton } from 'lib/lemon-ui/LemonButton'
import { LemonTable } from 'lib/lemon-ui/LemonTable'
import { Link } from 'lib/lemon-ui/Link'
import { Sparkline } from 'lib/lemon-ui/Sparkline'
@@ -19,6 +21,8 @@ const WARNING_TYPE_TO_DESCRIPTION = {
event_timestamp_in_future: 'An event was sent more than 23 hours in the future',
ingestion_capacity_overflow: 'Event ingestion has overflowed capacity',
message_size_too_large: 'Discarded event exceeding 1MB limit',
replay_timestamp_invalid: 'Replay event timestamp is invalid',
replay_timestamp_too_far: 'Replay event timestamp was too far in the future',
}

const WARNING_TYPE_RENDERER = {
@@ -134,6 +138,68 @@ const WARNING_TYPE_RENDERER = {
</>
)
},
replay_timestamp_invalid: function Render(warning: IngestionWarning): JSX.Element {
const details: {
timestamp: string
session_id: string
} = {
timestamp: warning.details.timestamp,
session_id: warning.details.replayRecord.session_id,
}
return (
<>
Session replay data dropped due to invalid timestamp:
<ul>
<li>invalid timestamp: {details.timestamp}</li>
<li>session_id: {details.session_id}</li>
</ul>
<div className="max-w-30 mt-2">
<LemonButton
type="primary"
size="xsmall"
to={urls.replaySingle(details.session_id)}
sideIcon={<IconPlayCircle />}
data-attr="skewed-timestamp-view-recording"
>
View recording
</LemonButton>
</div>
</>
)
},
replay_timestamp_too_far: function Render(warning: IngestionWarning): JSX.Element {
const details: {
timestamp: string
session_id: string
daysFromNow: string
} = {
timestamp: warning.details.timestamp,
session_id: warning.details.replayRecord.session_id,
daysFromNow: warning.details.daysFromNow,
}
return (
<>
The session replay data timestamp was too different from the capture time, so the data was dropped.
Event values:
<ul>
<li>invalid timestamp: {details.timestamp}</li>
<li>session_id: {details.session_id}</li>
<li>skew: {details.daysFromNow} days</li>
</ul>
<div className="max-w-30 mt-2">
<LemonButton
type="primary"
size="xsmall"
to={urls.replaySingle(details.session_id)}
sideIcon={<IconPlayCircle />}
data-attr="skewed-timestamp-view-recording"
>
View recording
</LemonButton>
</div>
</>
)
},
}

export function IngestionWarningsView(): JSX.Element {
@@ -2,6 +2,37 @@ import { dayjs } from 'lib/dayjs'

export const ingestionWarningsResponse = (baseTime: dayjs.Dayjs): { results: Record<string, any> } => ({
results: [
{
type: 'replay_timestamp_invalid',
lastSeen: baseTime.subtract(1, 'day'),
warnings: [
{
type: 'replay_timestamp_invalid',
timestamp: baseTime.subtract(1, 'day'),
details: {
timestamp: 'not a date',
replayRecord: { session_id: 'some uuid' },
},
},
],
count: 1,
},
{
type: 'replay_timestamp_too_far',
lastSeen: baseTime.subtract(1, 'day'),
warnings: [
{
type: 'replay_timestamp_too_far',
timestamp: baseTime.subtract(1, 'day'),
details: {
timestamp: baseTime.add(4, 'day').toISOString(),
replayRecord: { session_id: 'some uuid' },
daysFromNow: 4,
},
},
],
count: 1,
},
{
type: 'event_timestamp_in_future',
lastSeen: '2023-06-07T15:11:42.149000Z',
@@ -145,9 +145,14 @@ export async function eachBatchParallelIngestion(
const distinct_id = currentBatch[0].pluginEvent.distinct_id
if (team && OverflowWarningLimiter.consume(`${team.id}:${distinct_id}`, 1)) {
processingPromises.push(
captureIngestionWarning(queue.pluginsServer.db, team.id, 'ingestion_capacity_overflow', {
overflowDistinctId: distinct_id,
})
captureIngestionWarning(
queue.pluginsServer.db.kafkaProducer,
team.id,
'ingestion_capacity_overflow',
{
overflowDistinctId: distinct_id,
}
)
)
}
}
@@ -10,7 +10,9 @@ import { findOffsetsToCommit } from '../../../../kafka/consumer'
import { retryOnDependencyUnavailableError } from '../../../../kafka/error-handling'
import { createKafkaProducer, disconnectProducer, flushProducer, produce } from '../../../../kafka/producer'
import { PluginsServerConfig } from '../../../../types'
import { KafkaProducerWrapper } from '../../../../utils/db/kafka-producer-wrapper'
import { status } from '../../../../utils/status'
import { captureIngestionWarning } from '../../../../worker/ingestion/utils'
import { eventDroppedCounter } from '../../metrics'
import { createSessionReplayEvent } from '../process-event'
import { IncomingRecordingMessage } from '../types'
@@ -130,7 +132,18 @@ export class ReplayEventsIngester {
// the replay record timestamp has to be valid and be within a reasonable diff from now
if (replayRecord !== null) {
const asDate = DateTime.fromSQL(replayRecord.first_timestamp)
if (!asDate.isValid || Math.abs(asDate.diffNow('months').months) >= 0.99) {
if (!asDate.isValid || Math.abs(asDate.diffNow('day').days) >= 7) {
await captureIngestionWarning(
new KafkaProducerWrapper(this.producer),
event.team_id,
!asDate.isValid ? 'replay_timestamp_invalid' : 'replay_timestamp_too_far',
{
replayRecord,
timestamp: replayRecord.first_timestamp,
isValid: asDate.isValid,
daysFromNow: Math.round(Math.abs(asDate.diffNow('day').days)),
}
)
return drop('invalid_timestamp')
}
}
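
For reference, a self-contained sketch of the rule described in the comment above; the helper name classifyReplayTimestamp is illustrative and not part of this PR:

```ts
// Illustrative restatement of the check above, not the merged code.
import { DateTime } from 'luxon'

type ReplayTimestampVerdict = 'ok' | 'replay_timestamp_invalid' | 'replay_timestamp_too_far'

function classifyReplayTimestamp(firstTimestamp: string): ReplayTimestampVerdict {
    const asDate = DateTime.fromSQL(firstTimestamp)
    if (!asDate.isValid) {
        return 'replay_timestamp_invalid'
    }
    // diffNow(...) is negative for future timestamps, so take the absolute value
    if (Math.abs(asDate.diffNow('day').days) >= 7) {
        return 'replay_timestamp_too_far'
    }
    return 'ok'
}
```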
@@ -160,8 +160,8 @@ export class SessionRecordingIngester {

this.realtimeManager = new RealtimeManager(this.redisPool, this.config)

// We create a hash of the cluster to use as a unique identifier for the high water marks
// This enables us to swap clusters without having to worry about resetting the high water marks
// We create a hash of the cluster to use as a unique identifier for the high-water marks
// This enables us to swap clusters without having to worry about resetting the high-water marks
const kafkaClusterIdentifier = crypto.createHash('md5').update(this.config.KAFKA_HOSTS).digest('hex')

this.sessionHighWaterMarker = new OffsetHighWaterMarker(
@@ -33,7 +33,7 @@ export async function populateTeamDataStep(
if (event.team_id) {
// Check for an invalid UUID, which should be blocked by capture, when team_id is present
if (!UUID.validateString(event.uuid, false)) {
await captureIngestionWarning(db, event.team_id, 'skipping_event_invalid_uuid', {
await captureIngestionWarning(db.kafkaProducer, event.team_id, 'skipping_event_invalid_uuid', {
eventUuid: JSON.stringify(event.uuid),
})
throw new Error(`Not a valid UUID: "${event.uuid}"`)
Expand Down Expand Up @@ -70,7 +70,7 @@ export async function populateTeamDataStep(

// Check for an invalid UUID, which should be blocked by capture, when team_id is present
if (!UUID.validateString(event.uuid, false)) {
await captureIngestionWarning(db, team.id, 'skipping_event_invalid_uuid', {
await captureIngestionWarning(db.kafkaProducer, team.id, 'skipping_event_invalid_uuid', {
eventUuid: JSON.stringify(event.uuid),
})
throw new Error(`Not a valid UUID: "${event.uuid}"`)
@@ -12,7 +12,7 @@ export async function prepareEventStep(runner: EventPipelineRunner, event: Plugi
const invalidTimestampCallback = function (type: string, details: Record<string, any>) {
invalidTimestampCounter.labels(type).inc()

tsParsingIngestionWarnings.push(captureIngestionWarning(runner.hub.db, team_id, type, details))
tsParsingIngestionWarnings.push(captureIngestionWarning(runner.hub.db.kafkaProducer, team_id, type, details))
}

const preIngestionEvent = await runner.hub.eventsProcessor.processEvent(
Expand Down
6 changes: 3 additions & 3 deletions plugin-server/src/worker/ingestion/person-state.ts
@@ -323,15 +323,15 @@ export class PersonState {
return undefined
}
if (isDistinctIdIllegal(mergeIntoDistinctId)) {
await captureIngestionWarning(this.db, teamId, 'cannot_merge_with_illegal_distinct_id', {
await captureIngestionWarning(this.db.kafkaProducer, teamId, 'cannot_merge_with_illegal_distinct_id', {
illegalDistinctId: mergeIntoDistinctId,
otherDistinctId: otherPersonDistinctId,
eventUuid: this.event.uuid,
})
Contributor:

I think it's best not to rate-limit these, as they contain useful info and merges are supposed to be rare.
I support not making the rate-limiting optional to avoid deploying a bomb, but let's add mergeIntoDistinctId as the debounce key for the three warnings in this file.

Member Author:

Pushed a solution to let us override the debounce key construction so that the three uses in the person state are debounced together... is that what you meant?

(happy to change it if I'm being silly :))

Contributor:

Sorry for being unclear: I do not wish to debounce these warnings; we should send all of them to ClickHouse. They should be extremely infrequent unless there's a subtle instrumentation bug, and in that case it's better to have as much info as possible.

Member Author:

So, keeping it safe by default... I've made it opt-in to always send rather than implicit on an absent debounce key (roughly as sketched below):

  1. always debounce on team id and type
  2. unless you provide a debounce key, in which case it is team id, type, and key
  3. unless you say alwaysSend, in which case it always sends
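
A minimal TypeScript sketch of the three rules above; the names shouldCaptureWarning and IngestionWarningOptions are illustrative and this is not the code that landed in the later commits:

```ts
// Illustrative only: the debounce rules from the comment above, not the merged implementation.
type IngestionWarningOptions = { debounceKey?: string; alwaysSend?: boolean }

interface WarningLimiter {
    consume(key: string, tokens: number): boolean
}

function shouldCaptureWarning(
    limiter: WarningLimiter,
    teamId: number,
    type: string,
    options: IngestionWarningOptions = {}
): boolean {
    // 3. alwaysSend bypasses the limiter entirely (e.g. the person-merge warnings)
    if (options.alwaysSend) {
        return true
    }
    // 1. by default, debounce on team id and type
    // 2. if a debounce key is provided, debounce on team id, type, and key
    const key = options.debounceKey ? `${teamId}:${type}:${options.debounceKey}` : `${teamId}:${type}`
    return limiter.consume(key, 1)
}
```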

return undefined
}
if (isDistinctIdIllegal(otherPersonDistinctId)) {
await captureIngestionWarning(this.db, teamId, 'cannot_merge_with_illegal_distinct_id', {
await captureIngestionWarning(this.db.kafkaProducer, teamId, 'cannot_merge_with_illegal_distinct_id', {
illegalDistinctId: otherPersonDistinctId,
otherDistinctId: mergeIntoDistinctId,
eventUuid: this.event.uuid,
@@ -404,7 +404,7 @@ export class PersonState {
// If merge isn't allowed, we will ignore it, log an ingestion warning and exit
if (!mergeAllowed) {
// TODO: add event UUID to the ingestion warning
await captureIngestionWarning(this.db, this.teamId, 'cannot_merge_already_identified', {
await captureIngestionWarning(this.db.kafkaProducer, this.teamId, 'cannot_merge_already_identified', {
sourcePersonDistinctId: otherPersonDistinctId,
targetPersonDistinctId: mergeIntoDistinctId,
eventUuid: this.event.uuid,
2 changes: 1 addition & 1 deletion plugin-server/src/worker/ingestion/process-event.ts
@@ -241,7 +241,7 @@
// after plugin processing, person & group enrichment, etc.
if (error instanceof MessageSizeTooLarge) {
if (MessageSizeTooLargeWarningLimiter.consume(`${teamId}`, 1)) {
await captureIngestionWarning(this.db, teamId, 'message_size_too_large', {
await captureIngestionWarning(this.db.kafkaProducer, teamId, 'message_size_too_large', {
eventUuid: uuid,
distinctId: distinctId,
})
11 changes: 8 additions & 3 deletions plugin-server/src/worker/ingestion/utils.ts
@@ -3,7 +3,7 @@ import { ProducerRecord } from 'kafkajs'
import { DateTime } from 'luxon'

import { PipelineEvent, TeamId, TimestampFormat } from '../../types'
import { DB } from '../../utils/db/db'
import { KafkaProducerWrapper } from '../../utils/db/kafka-producer-wrapper'
import { safeClickhouseString } from '../../utils/db/utils'
import { castTimestampOrNow, castTimestampToClickhouseFormat, UUIDT } from '../../utils/utils'
import { KAFKA_EVENTS_DEAD_LETTER_QUEUE, KAFKA_INGESTION_WARNINGS } from './../../config/kafka-topics'
@@ -61,8 +61,13 @@ export function generateEventDeadLetterQueueMessage(
// These get displayed under Data Management > Ingestion Warnings
// These warnings get displayed to end users. Make sure these errors are actionable and useful for them and
// also update IngestionWarningsView.tsx to display useful context.
export async function captureIngestionWarning(db: DB, teamId: TeamId, type: string, details: Record<string, any>) {
await db.kafkaProducer.queueMessage({
export async function captureIngestionWarning(
kafkaProducer: KafkaProducerWrapper,
teamId: TeamId,
type: string,
details: Record<string, any>
) {
await kafkaProducer.queueMessage({
topic: KAFKA_INGESTION_WARNINGS,
messages: [
{
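
A minimal usage sketch of the refactored helper: callers now pass the KafkaProducerWrapper directly instead of the whole DB object. The import paths and the wrapping function below are assumptions for illustration, not lines from this PR:

```ts
// Illustrative call site only; paths and the surrounding function are assumed.
import { KafkaProducerWrapper } from '../../utils/db/kafka-producer-wrapper'
import { captureIngestionWarning } from '../../worker/ingestion/utils'

async function warnAboutInvalidReplayTimestamp(
    kafkaProducer: KafkaProducerWrapper,
    teamId: number,
    firstTimestamp: string
): Promise<void> {
    // Mirrors the replay ingester change in this PR: a warning type plus actionable context.
    await captureIngestionWarning(kafkaProducer, teamId, 'replay_timestamp_invalid', {
        timestamp: firstTimestamp,
    })
}
```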
@@ -86,7 +86,7 @@ describe('eachBatchParallelIngestion with overflow consume', () => {
expect(queue.pluginsServer.teamManager.getTeamForEvent).toHaveBeenCalledTimes(1)
expect(consume).toHaveBeenCalledWith('1:id', 1)
expect(captureIngestionWarning).toHaveBeenCalledWith(
queue.pluginsServer.db,
queue.pluginsServer.db.kafkaProducer,
1,
'ingestion_capacity_overflow',
{
@@ -1,12 +1,14 @@
import { DateTime } from 'luxon'
import { HighLevelProducer } from 'node-rdkafka'

import { defaultConfig } from '../../../../../src/config/config'
import { createKafkaProducer, produce } from '../../../../../src/kafka/producer'
import { OffsetHighWaterMarker } from '../../../../../src/main/ingestion-queues/session-recording/services/offset-high-water-marker'
import { ReplayEventsIngester } from '../../../../../src/main/ingestion-queues/session-recording/services/replay-events-ingester'
import { IncomingRecordingMessage } from '../../../../../src/main/ingestion-queues/session-recording/types'
import { PluginsServerConfig } from '../../../../../src/types'
import { PluginsServerConfig, TimestampFormat } from '../../../../../src/types'
import { status } from '../../../../../src/utils/status'
import { castTimestampOrNow } from '../../../../../src/utils/utils'

jest.mock('../../../../../src/utils/status')
jest.mock('../../../../../src/kafka/producer')
@@ -47,6 +49,34 @@ describe('replay events ingester', () => {
await ingester.start()
})

test('does not ingest messages from a month in the future', async () => {
const twoMonthsFromNow = DateTime.utc().plus({ months: 2 })

await ingester.consume(makeIncomingMessage("mickey's fun house", twoMonthsFromNow.toMillis()))

expect(jest.mocked(status.debug).mock.calls).toEqual([])
expect(jest.mocked(produce).mock.calls).toHaveLength(1)
expect(jest.mocked(produce).mock.calls[0]).toHaveLength(1)
const call = jest.mocked(produce).mock.calls[0][0]

expect(call.topic).toEqual('clickhouse_ingestion_warnings_test')
// call.value is a Buffer; convert it to a string
const value = call.value ? JSON.parse(call.value.toString()) : null
const expectedTimestamp = castTimestampOrNow(twoMonthsFromNow, TimestampFormat.ClickHouse)

expect(value.source).toEqual('plugin-server')
expect(value.team_id).toEqual(0)
expect(value.type).toEqual('replay_timestamp_too_far')
const details = JSON.parse(value.details)
expect(details).toEqual(
expect.objectContaining({
isValid: true,
daysFromNow: 61,
timestamp: expectedTimestamp,
})
)
})

test('it passes snapshot source along', async () => {
const ts = new Date().getTime()
await ingester.consume(makeIncomingMessage("mickey's fun house", ts))
@@ -79,6 +109,7 @@ describe('replay events ingester', () => {
uuid: expect.any(String),
})
})

test('it defaults snapshot source to web when absent', async () => {
const ts = new Date().getTime()
await ingester.consume(makeIncomingMessage(null, ts))
2 changes: 1 addition & 1 deletion plugin-server/tests/worker/ingestion/utils.test.ts
@@ -24,7 +24,7 @@ describe('captureIngestionWarning()', () => {
}

it('can read own writes', async () => {
await captureIngestionWarning(hub.db, 2, 'some_type', { foo: 'bar' })
await captureIngestionWarning(hub.db.kafkaProducer, 2, 'some_type', { foo: 'bar' })

const warnings = await delayUntilEventIngested(fetchWarnings)
expect(warnings).toEqual([