Commit 88a2706

Merge branch 'master' into fix/no-varying-in-snapshot-test

pauldambra committed Mar 19, 2024
2 parents 21f5812 + 36baca3
Showing 17 changed files with 275 additions and 77 deletions.
(Two of the changed files are invalid and cannot be displayed.)
@@ -2,6 +2,8 @@ import { useValues } from 'kea'
import { ReadingHog } from 'lib/components/hedgehogs'
import { ProductIntroduction } from 'lib/components/ProductIntroduction/ProductIntroduction'
import { TZLabel } from 'lib/components/TZLabel'
import { IconPlayCircle } from 'lib/lemon-ui/icons'
import { LemonButton } from 'lib/lemon-ui/LemonButton'
import { LemonTable } from 'lib/lemon-ui/LemonTable'
import { Link } from 'lib/lemon-ui/Link'
import { Sparkline } from 'lib/lemon-ui/Sparkline'
@@ -19,6 +21,8 @@ const WARNING_TYPE_TO_DESCRIPTION = {
event_timestamp_in_future: 'An event was sent more than 23 hours in the future',
ingestion_capacity_overflow: 'Event ingestion has overflowed capacity',
message_size_too_large: 'Discarded event exceeding 1MB limit',
replay_timestamp_invalid: 'Replay event timestamp is invalid',
replay_timestamp_too_far: 'Replay event timestamp was too far in the future',
}

const WARNING_TYPE_RENDERER = {
@@ -134,6 +138,68 @@
</>
)
},
replay_timestamp_invalid: function Render(warning: IngestionWarning): JSX.Element {
const details: {
timestamp: string
session_id: string
} = {
timestamp: warning.details.timestamp,
session_id: warning.details.replayRecord.session_id,
}
return (
<>
Session replay data dropped due to invalid timestamp:
<ul>
<li>invalid timestamp: {details.timestamp}</li>
<li>session_id: {details.session_id}</li>
</ul>
<div className="max-w-30 mt-2">
<LemonButton
type="primary"
size="xsmall"
to={urls.replaySingle(details.session_id)}
sideIcon={<IconPlayCircle />}
data-attr="skewed-timestamp-view-recording"
>
View recording
</LemonButton>
</div>
</>
)
},
replay_timestamp_too_far: function Render(warning: IngestionWarning): JSX.Element {
const details: {
timestamp: string
session_id: string
daysFromNow: string
} = {
timestamp: warning.details.timestamp,
session_id: warning.details.replayRecord.session_id,
daysFromNow: warning.details.daysFromNow,
}
return (
<>
The session replay data timestamp was too different from the capture time, so the data was dropped.
Event values:
<ul>
<li>invalid timestamp: {details.timestamp}</li>
<li>session_id: {details.session_id}</li>
<li>skew: {details.daysFromNow} days</li>
</ul>
<div className="max-w-30 mt-2">
<LemonButton
type="primary"
size="xsmall"
to={urls.replaySingle(details.session_id)}
sideIcon={<IconPlayCircle />}
data-attr="skewed-timestamp-view-recording"
>
View recording
</LemonButton>
</div>
</>
)
},
}

export function IngestionWarningsView(): JSX.Element {
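For orientation, the two maps extended in this file follow a simple registration pattern: `WARNING_TYPE_TO_DESCRIPTION` supplies the one-line summary shown in the warnings table, and `WARNING_TYPE_RENDERER` supplies the expanded detail row. A minimal sketch of adding another type (the `my_new_warning` name and its detail fields are hypothetical, not part of this commit):

```tsx
// Hypothetical registration of a new warning type; both maps are keyed by the
// warning's `type` field, and the renderer receives the full warning object.
const WARNING_TYPE_TO_DESCRIPTION = {
    my_new_warning: 'One-line summary shown in the warnings table',
}

const WARNING_TYPE_RENDERER = {
    my_new_warning: function Render(warning: IngestionWarning): JSX.Element {
        return <>Details: {JSON.stringify(warning.details)}</>
    },
}
```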
@@ -2,6 +2,37 @@ import { dayjs } from 'lib/dayjs'

export const ingestionWarningsResponse = (baseTime: dayjs.Dayjs): { results: Record<string, any> } => ({
results: [
{
type: 'replay_timestamp_invalid',
lastSeen: baseTime.subtract(1, 'day'),
warnings: [
{
type: 'replay_timestamp_invalid',
timestamp: baseTime.subtract(1, 'day'),
details: {
timestamp: 'not a date',
replayRecord: { session_id: 'some uuid' },
},
},
],
count: 1,
},
{
type: 'replay_timestamp_too_far',
lastSeen: baseTime.subtract(1, 'day'),
warnings: [
{
type: 'replay_timestamp_too_far',
timestamp: baseTime.subtract(1, 'day'),
details: {
timestamp: baseTime.add(4, 'day').toISOString(),
replayRecord: { session_id: 'some uuid' },
daysFromNow: 4,
},
},
],
count: 1,
},
{
type: 'event_timestamp_in_future',
lastSeen: '2023-06-07T15:11:42.149000Z',
3 changes: 2 additions & 1 deletion frontend/src/scenes/onboarding/onboardingLogic.tsx
@@ -1,6 +1,7 @@
import { actions, connect, kea, listeners, path, props, reducers, selectors } from 'kea'
import { actionToUrl, router, urlToAction } from 'kea-router'
import { featureFlagLogic } from 'lib/logic/featureFlagLogic'
import { FEATURE_FLAGS } from 'lib/constants'
import { featureFlagLogic, FeatureFlagsSet } from 'lib/logic/featureFlagLogic'
import { eventUsageLogic } from 'lib/utils/eventUsageLogic'
import { billingLogic } from 'scenes/billing/billingLogic'
import { preflightLogic } from 'scenes/PreflightCheck/preflightLogic'
@@ -6,7 +6,7 @@ import { PipelineEvent, ValueMatcher } from '../../../types'
import { formPipelineEvent } from '../../../utils/event'
import { retryIfRetriable } from '../../../utils/retries'
import { status } from '../../../utils/status'
import { ConfiguredLimiter, LoggingLimiter, OverflowWarningLimiter } from '../../../utils/token-bucket'
import { ConfiguredLimiter, LoggingLimiter } from '../../../utils/token-bucket'
import { EventPipelineRunner } from '../../../worker/ingestion/event-pipeline/runner'
import { captureIngestionWarning } from '../../../worker/ingestion/utils'
import { ingestionPartitionKeyOverflowed } from '../analytics-events-ingestion-consumer'
@@ -143,11 +143,17 @@ export async function eachBatchParallelIngestion(
) {
const team = await queue.pluginsServer.teamManager.getTeamForEvent(currentBatch[0].pluginEvent)
const distinct_id = currentBatch[0].pluginEvent.distinct_id
if (team && OverflowWarningLimiter.consume(`${team.id}:${distinct_id}`, 1)) {
if (team) {
processingPromises.push(
captureIngestionWarning(queue.pluginsServer.db, team.id, 'ingestion_capacity_overflow', {
overflowDistinctId: distinct_id,
})
captureIngestionWarning(
queue.pluginsServer.db.kafkaProducer,
team.id,
'ingestion_capacity_overflow',
{
overflowDistinctId: distinct_id,
},
{ key: distinct_id }
)
)
}
}
@@ -10,7 +10,9 @@ import { findOffsetsToCommit } from '../../../../kafka/consumer'
import { retryOnDependencyUnavailableError } from '../../../../kafka/error-handling'
import { createKafkaProducer, disconnectProducer, flushProducer, produce } from '../../../../kafka/producer'
import { PluginsServerConfig } from '../../../../types'
import { KafkaProducerWrapper } from '../../../../utils/db/kafka-producer-wrapper'
import { status } from '../../../../utils/status'
import { captureIngestionWarning } from '../../../../worker/ingestion/utils'
import { eventDroppedCounter } from '../../metrics'
import { createSessionReplayEvent } from '../process-event'
import { IncomingRecordingMessage } from '../types'
@@ -130,7 +132,20 @@ export class ReplayEventsIngester {
// the replay record timestamp has to be valid and be within a reasonable diff from now
if (replayRecord !== null) {
const asDate = DateTime.fromSQL(replayRecord.first_timestamp)
if (!asDate.isValid || Math.abs(asDate.diffNow('months').months) >= 0.99) {
if (!asDate.isValid || Math.abs(asDate.diffNow('day').days) >= 7) {
await captureIngestionWarning(
new KafkaProducerWrapper(this.producer),
event.team_id,
!asDate.isValid ? 'replay_timestamp_invalid' : 'replay_timestamp_too_far',
{
replayRecord,
timestamp: replayRecord.first_timestamp,
isValid: asDate.isValid,
daysFromNow: Math.round(Math.abs(asDate.diffNow('day').days)),
processingTimestamp: DateTime.now().toISO(),
},
{ key: event.session_id }
)
return drop('invalid_timestamp')
}
}
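As a standalone sketch of the rule this hunk implements, assuming Luxon's `DateTime` (which the ingester already imports): an unparsable `first_timestamp` maps to `replay_timestamp_invalid`, while a parsable one seven or more days from now maps to `replay_timestamp_too_far`.

```ts
import { DateTime } from 'luxon'

// Mirrors the check above: fromSQL parses ClickHouse-style timestamps, and
// diffNow is negative for past dates, hence the absolute value.
function classifyReplayTimestamp(firstTimestamp: string): string | null {
    const asDate = DateTime.fromSQL(firstTimestamp)
    if (!asDate.isValid) {
        return 'replay_timestamp_invalid'
    }
    if (Math.abs(asDate.diffNow('day').days) >= 7) {
        return 'replay_timestamp_too_far'
    }
    return null // acceptable timestamp, no warning
}
```

This matches the mocks earlier in the diff: a timestamp four days in the future produces `replay_timestamp_too_far` with `daysFromNow: 4`.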
@@ -160,8 +160,8 @@ export class SessionRecordingIngester {

this.realtimeManager = new RealtimeManager(this.redisPool, this.config)

// We create a hash of the cluster to use as a unique identifier for the high water marks
// This enables us to swap clusters without having to worry about resetting the high water marks
// We create a hash of the cluster to use as a unique identifier for the high-water marks
// This enables us to swap clusters without having to worry about resetting the high-water marks
const kafkaClusterIdentifier = crypto.createHash('md5').update(this.config.KAFKA_HOSTS).digest('hex')

this.sessionHighWaterMarker = new OffsetHighWaterMarker(
@@ -227,7 +227,6 @@
*/
this.promises.add(promise)

// eslint-disable-next-line @typescript-eslint/no-floating-promises
promise.finally(() => this.promises.delete(promise))

Check failure on line 230 in plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts (GitHub Actions / Code quality): Promises must be awaited, end with a call to .catch, end with a call to .then with a rejection handler or be explicitly marked as ignored with the `void` operator
return promise
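The code-quality failure above stems from deleting the `eslint-disable` comment while the promise remains unawaited. One common way to satisfy `@typescript-eslint/no-floating-promises` is the `void` operator; a sketch only, since this diff does not show how the failure was resolved:

```ts
// Explicitly mark the promise as intentionally not awaited:
void promise.finally(() => this.promises.delete(promise))
```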
4 changes: 1 addition & 3 deletions plugin-server/src/utils/token-bucket.ts
@@ -72,8 +72,6 @@ export const ConfiguredLimiter: Limiter = new Limiter(
defaultConfig.EVENT_OVERFLOW_BUCKET_REPLENISH_RATE
)

export const OverflowWarningLimiter: Limiter = new Limiter(1, 1.0 / 3600)

export const MessageSizeTooLargeWarningLimiter: Limiter = new Limiter(1, 1.0 / 300)
export const IngestionWarningLimiter: Limiter = new Limiter(1, 1.0 / 3600)

export const LoggingLimiter: Limiter = new Limiter(1, 1.0 / 60)
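The constants above read as `new Limiter(capacity, replenishRatePerSecond)`, so `Limiter(1, 1.0 / 3600)` holds a single token that refills once per hour: `consume(key, 1)` succeeds at most roughly once per hour per key. A usage sketch based on the call sites visible elsewhere in this diff:

```ts
// The first consume for a key drains the single token; later calls return
// false until the 1/3600-tokens-per-second refill restores it.
if (IngestionWarningLimiter.consume(`${teamId}:some_warning_type`, 1)) {
    // emit the rate-limited ingestion warning
}
```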
@@ -33,7 +33,7 @@ export async function populateTeamDataStep(
if (event.team_id) {
// Check for an invalid UUID, which should be blocked by capture, when team_id is present
if (!UUID.validateString(event.uuid, false)) {
await captureIngestionWarning(db, event.team_id, 'skipping_event_invalid_uuid', {
await captureIngestionWarning(db.kafkaProducer, event.team_id, 'skipping_event_invalid_uuid', {
eventUuid: JSON.stringify(event.uuid),
})
throw new Error(`Not a valid UUID: "${event.uuid}"`)
@@ -70,7 +70,7 @@

// Check for an invalid UUID, which should be blocked by capture, when team_id is present
if (!UUID.validateString(event.uuid, false)) {
await captureIngestionWarning(db, team.id, 'skipping_event_invalid_uuid', {
await captureIngestionWarning(db.kafkaProducer, team.id, 'skipping_event_invalid_uuid', {
eventUuid: JSON.stringify(event.uuid),
})
throw new Error(`Not a valid UUID: "${event.uuid}"`)
@@ -12,7 +12,7 @@ export async function prepareEventStep(runner: EventPipelineRunner, event: Plugi
const invalidTimestampCallback = function (type: string, details: Record<string, any>) {
invalidTimestampCounter.labels(type).inc()

tsParsingIngestionWarnings.push(captureIngestionWarning(runner.hub.db, team_id, type, details))
tsParsingIngestionWarnings.push(captureIngestionWarning(runner.hub.db.kafkaProducer, team_id, type, details))
}

const preIngestionEvent = await runner.hub.eventsProcessor.processEvent(
49 changes: 33 additions & 16 deletions plugin-server/src/worker/ingestion/person-state.ts
@@ -323,19 +323,31 @@ export class PersonState {
return undefined
}
if (isDistinctIdIllegal(mergeIntoDistinctId)) {
await captureIngestionWarning(this.db, teamId, 'cannot_merge_with_illegal_distinct_id', {
illegalDistinctId: mergeIntoDistinctId,
otherDistinctId: otherPersonDistinctId,
eventUuid: this.event.uuid,
})
await captureIngestionWarning(
this.db.kafkaProducer,
teamId,
'cannot_merge_with_illegal_distinct_id',
{
illegalDistinctId: mergeIntoDistinctId,
otherDistinctId: otherPersonDistinctId,
eventUuid: this.event.uuid,
},
{ alwaysSend: true }
)
return undefined
}
if (isDistinctIdIllegal(otherPersonDistinctId)) {
await captureIngestionWarning(this.db, teamId, 'cannot_merge_with_illegal_distinct_id', {
illegalDistinctId: otherPersonDistinctId,
otherDistinctId: mergeIntoDistinctId,
eventUuid: this.event.uuid,
})
await captureIngestionWarning(
this.db.kafkaProducer,
teamId,
'cannot_merge_with_illegal_distinct_id',
{
illegalDistinctId: otherPersonDistinctId,
otherDistinctId: mergeIntoDistinctId,
eventUuid: this.event.uuid,
},
{ alwaysSend: true }
)
return undefined
}
return promiseRetry(
@@ -403,12 +415,17 @@

// If merge isn't allowed, we will ignore it, log an ingestion warning and exit
if (!mergeAllowed) {
// TODO: add event UUID to the ingestion warning
await captureIngestionWarning(this.db, this.teamId, 'cannot_merge_already_identified', {
sourcePersonDistinctId: otherPersonDistinctId,
targetPersonDistinctId: mergeIntoDistinctId,
eventUuid: this.event.uuid,
})
await captureIngestionWarning(
this.db.kafkaProducer,
this.teamId,
'cannot_merge_already_identified',
{
sourcePersonDistinctId: otherPersonDistinctId,
targetPersonDistinctId: mergeIntoDistinctId,
eventUuid: this.event.uuid,
},
{ alwaysSend: true }
)
status.warn('🤔', 'refused to merge an already identified user via an $identify or $create_alias call')
return mergeInto // We're returning the original person tied to distinct_id used for the event
}
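Taken together, the call-site changes in this commit suggest that `captureIngestionWarning` now accepts a Kafka producer instead of the whole `db` handle, plus an options argument carrying a partition `key` and an `alwaysSend` flag that bypasses the shared `IngestionWarningLimiter`. A hypothetical reconstruction inferred only from these call sites (the real implementation, presumably in `plugin-server/src/worker/ingestion/utils.ts`, may differ):

```ts
// Inferred sketch, not the actual implementation: the per-call-site limiters
// were removed in this commit, so rate limiting is assumed to live here.
async function captureIngestionWarning(
    kafkaProducer: KafkaProducerWrapper,
    teamId: number,
    type: string,
    details: Record<string, any>,
    debounce?: { key?: string; alwaysSend?: boolean }
): Promise<void> {
    const limiterKey = `${teamId}:${type}:${debounce?.key ?? ''}`
    if (debounce?.alwaysSend || IngestionWarningLimiter.consume(limiterKey, 1)) {
        // produce the warning message to the ingestion-warnings Kafka topic
    }
}
```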
11 changes: 4 additions & 7 deletions plugin-server/src/worker/ingestion/process-event.ts
@@ -21,7 +21,6 @@ import { MessageSizeTooLarge } from '../../utils/db/error'
import { KafkaProducerWrapper } from '../../utils/db/kafka-producer-wrapper'
import { safeClickhouseString, sanitizeEventName, timeoutGuard } from '../../utils/db/utils'
import { status } from '../../utils/status'
import { MessageSizeTooLargeWarningLimiter } from '../../utils/token-bucket'
import { castTimestampOrNow } from '../../utils/utils'
import { GroupTypeManager } from './group-type-manager'
import { addGroupProperties } from './groups'
@@ -240,12 +239,10 @@ export class EventsProcessor {
// Some messages end up significantly larger than the original
// after plugin processing, person & group enrichment, etc.
if (error instanceof MessageSizeTooLarge) {
if (MessageSizeTooLargeWarningLimiter.consume(`${teamId}`, 1)) {
await captureIngestionWarning(this.db, teamId, 'message_size_too_large', {
eventUuid: uuid,
distinctId: distinctId,
})
}
await captureIngestionWarning(this.db.kafkaProducer, teamId, 'message_size_too_large', {
eventUuid: uuid,
distinctId: distinctId,
})
} else {
throw error
}