Skip to content

Commit

Permalink
chore: Remove promise manager usage from ingestion warnings & db (#17650
Browse files Browse the repository at this point in the history
)
  • Loading branch information
tiina303 authored Oct 9, 2023
1 parent 04df7c0 commit a9eb17a
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,11 @@ export async function eachBatchLegacyIngestion(
const team = await queue.pluginsServer.teamManager.getTeamForEvent(currentBatch[0].pluginEvent)
const distinct_id = currentBatch[0].pluginEvent.distinct_id
if (team && WarningLimiter.consume(`${team.id}:${distinct_id}`, 1)) {
captureIngestionWarning(queue.pluginsServer.db, team.id, 'ingestion_capacity_overflow', {
overflowDistinctId: distinct_id,
})
processingPromises.push(
captureIngestionWarning(queue.pluginsServer.db, team.id, 'ingestion_capacity_overflow', {
overflowDistinctId: distinct_id,
})
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,11 @@ export async function eachBatchParallelIngestion(
const team = await queue.pluginsServer.teamManager.getTeamForEvent(currentBatch[0].pluginEvent)
const distinct_id = currentBatch[0].pluginEvent.distinct_id
if (team && WarningLimiter.consume(`${team.id}:${distinct_id}`, 1)) {
captureIngestionWarning(queue.pluginsServer.db, team.id, 'ingestion_capacity_overflow', {
overflowDistinctId: distinct_id,
})
processingPromises.push(
captureIngestionWarning(queue.pluginsServer.db, team.id, 'ingestion_capacity_overflow', {
overflowDistinctId: distinct_id,
})
)
}
}

Expand Down
6 changes: 0 additions & 6 deletions plugin-server/src/utils/db/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ import {
UUIDT,
} from '../utils'
import { OrganizationPluginsAccessLevel } from './../../types'
import { PromiseManager } from './../../worker/vm/promise-manager'
import { KafkaProducerWrapper } from './kafka-producer-wrapper'
import { PostgresRouter, PostgresUse, TransactionClient } from './postgres'
import {
Expand Down Expand Up @@ -165,16 +164,12 @@ export class DB {
/** How many seconds to keep person info in Redis cache */
PERSONS_AND_GROUPS_CACHE_TTL: number

/** PromiseManager instance to keep track of voided promises */
promiseManager: PromiseManager

constructor(
postgres: PostgresRouter,
redisPool: GenericPool<Redis.Redis>,
kafkaProducer: KafkaProducerWrapper,
clickhouse: ClickHouse,
statsd: StatsD | undefined,
promiseManager: PromiseManager,
personAndGroupsCacheTtl = 1
) {
this.postgres = postgres
Expand All @@ -183,7 +178,6 @@ export class DB {
this.clickhouse = clickhouse
this.statsd = statsd
this.PERSONS_AND_GROUPS_CACHE_TTL = personAndGroupsCacheTtl
this.promiseManager = promiseManager
}

// ClickHouse
Expand Down
10 changes: 1 addition & 9 deletions plugin-server/src/utils/db/hub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,7 @@ export async function createHub(

const promiseManager = new PromiseManager(serverConfig, statsd)

const db = new DB(
postgres,
redisPool,
kafkaProducer,
clickhouse,
statsd,
promiseManager,
serverConfig.PERSON_INFO_CACHE_TTL
)
const db = new DB(postgres, redisPool, kafkaProducer, clickhouse, statsd, serverConfig.PERSON_INFO_CACHE_TTL)
const teamManager = new TeamManager(postgres, serverConfig, statsd)
const organizationManager = new OrganizationManager(postgres, teamManager)
const pluginsApiKeyManager = new PluginsApiKeyManager(db)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,22 @@ import { EventPipelineRunner } from './runner'

export async function prepareEventStep(runner: EventPipelineRunner, event: PluginEvent): Promise<PreIngestionEvent> {
const { team_id, uuid } = event
const tsParsingIngestionWarnings: Promise<void>[] = []
const invalidTimestampCallback = function (type: string, details: Record<string, any>) {
// TODO: make that metric name more generic when transitionning to prometheus
runner.hub.statsd?.increment('process_event_invalid_timestamp', { teamId: String(team_id), type: type })

captureIngestionWarning(runner.hub.db, team_id, type, details)
tsParsingIngestionWarnings.push(captureIngestionWarning(runner.hub.db, team_id, type, details))
}

const preIngestionEvent = await runner.hub.eventsProcessor.processEvent(
String(event.distinct_id),
event,
team_id,
parseEventTimestamp(event, invalidTimestampCallback),
uuid! // it will throw if it's undefined,
)
await Promise.all(tsParsingIngestionWarnings)

return preIngestionEvent
}
6 changes: 3 additions & 3 deletions plugin-server/src/worker/ingestion/person-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ export class PersonState {
}
if (isDistinctIdIllegal(mergeIntoDistinctId)) {
this.statsd?.increment('illegal_distinct_ids.total', { distinctId: mergeIntoDistinctId })
captureIngestionWarning(this.db, teamId, 'cannot_merge_with_illegal_distinct_id', {
await captureIngestionWarning(this.db, teamId, 'cannot_merge_with_illegal_distinct_id', {
illegalDistinctId: mergeIntoDistinctId,
otherDistinctId: otherPersonDistinctId,
eventUuid: this.event.uuid,
Expand All @@ -335,7 +335,7 @@ export class PersonState {
}
if (isDistinctIdIllegal(otherPersonDistinctId)) {
this.statsd?.increment('illegal_distinct_ids.total', { distinctId: otherPersonDistinctId })
captureIngestionWarning(this.db, teamId, 'cannot_merge_with_illegal_distinct_id', {
await captureIngestionWarning(this.db, teamId, 'cannot_merge_with_illegal_distinct_id', {
illegalDistinctId: otherPersonDistinctId,
otherDistinctId: mergeIntoDistinctId,
eventUuid: this.event.uuid,
Expand Down Expand Up @@ -421,7 +421,7 @@ export class PersonState {
// If merge isn't allowed, we will ignore it, log an ingestion warning and exit
if (!mergeAllowed) {
// TODO: add event UUID to the ingestion warning
captureIngestionWarning(this.db, this.teamId, 'cannot_merge_already_identified', {
await captureIngestionWarning(this.db, this.teamId, 'cannot_merge_already_identified', {
sourcePersonDistinctId: otherPersonDistinctId,
targetPersonDistinctId: mergeIntoDistinctId,
eventUuid: this.event.uuid,
Expand Down
2 changes: 1 addition & 1 deletion plugin-server/src/worker/ingestion/process-event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export class EventsProcessor {
eventUuid: string
): Promise<PreIngestionEvent> {
if (!UUID.validateString(eventUuid, false)) {
captureIngestionWarning(this.db, teamId, 'skipping_event_invalid_uuid', {
await captureIngestionWarning(this.db, teamId, 'skipping_event_invalid_uuid', {
eventUuid: JSON.stringify(eventUuid),
})
throw new Error(`Not a valid UUID: "${eventUuid}"`)
Expand Down
33 changes: 15 additions & 18 deletions plugin-server/src/worker/ingestion/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,22 +61,19 @@ export function generateEventDeadLetterQueueMessage(
// These get displayed under Data Management > Ingestion Warnings
// These warnings get displayed to end users. Make sure these errors are actionable and useful for them and
// also update IngestionWarningsView.tsx to display useful context.
export function captureIngestionWarning(db: DB, teamId: TeamId, type: string, details: Record<string, any>) {
db.promiseManager.trackPromise(
db.kafkaProducer.queueMessage({
topic: KAFKA_INGESTION_WARNINGS,
messages: [
{
value: JSON.stringify({
team_id: teamId,
type: type,
source: 'plugin-server',
details: JSON.stringify(details),
timestamp: castTimestampOrNow(null, TimestampFormat.ClickHouse),
}),
},
],
}),
'ingestion_warning'
)
export async function captureIngestionWarning(db: DB, teamId: TeamId, type: string, details: Record<string, any>) {
await db.kafkaProducer.queueMessage({
topic: KAFKA_INGESTION_WARNINGS,
messages: [
{
value: JSON.stringify({
team_id: teamId,
type: type,
source: 'plugin-server',
details: JSON.stringify(details),
timestamp: castTimestampOrNow(null, TimestampFormat.ClickHouse),
}),
},
],
})
}
2 changes: 1 addition & 1 deletion plugin-server/tests/worker/ingestion/utils.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ describe('captureIngestionWarning()', () => {
}

it('can read own writes', async () => {
captureIngestionWarning(hub.db, 2, 'some_type', { foo: 'bar' })
await captureIngestionWarning(hub.db, 2, 'some_type', { foo: 'bar' })
await hub.promiseManager.awaitPromisesIfNeeded()

const warnings = await delayUntilEventIngested(fetchWarnings)
Expand Down

0 comments on commit a9eb17a

Please sign in to comment.