diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion-kafkajs.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion-kafkajs.ts index c8d6da502f73a..6be2d9e988346 100644 --- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion-kafkajs.ts +++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion-kafkajs.ts @@ -91,9 +91,11 @@ export async function eachBatchLegacyIngestion( const team = await queue.pluginsServer.teamManager.getTeamForEvent(currentBatch[0].pluginEvent) const distinct_id = currentBatch[0].pluginEvent.distinct_id if (team && WarningLimiter.consume(`${team.id}:${distinct_id}`, 1)) { - captureIngestionWarning(queue.pluginsServer.db, team.id, 'ingestion_capacity_overflow', { - overflowDistinctId: distinct_id, - }) + processingPromises.push( + captureIngestionWarning(queue.pluginsServer.db, team.id, 'ingestion_capacity_overflow', { + overflowDistinctId: distinct_id, + }) + ) } } diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts index 199108bdf9d28..97f4c36420eb4 100644 --- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts +++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts @@ -90,9 +90,11 @@ export async function eachBatchParallelIngestion( const team = await queue.pluginsServer.teamManager.getTeamForEvent(currentBatch[0].pluginEvent) const distinct_id = currentBatch[0].pluginEvent.distinct_id if (team && WarningLimiter.consume(`${team.id}:${distinct_id}`, 1)) { - captureIngestionWarning(queue.pluginsServer.db, team.id, 'ingestion_capacity_overflow', { - overflowDistinctId: distinct_id, - }) + processingPromises.push( + captureIngestionWarning(queue.pluginsServer.db, team.id, 'ingestion_capacity_overflow', { + overflowDistinctId: distinct_id, + }) + ) } } diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 83cf936a3b713..8b1bb5024a346 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -662,7 +662,6 @@ interface BaseIngestionEvent { export interface PreIngestionEvent { eventUuid: string event: string - ip: string | null teamId: TeamId distinctId: string properties: Properties diff --git a/plugin-server/src/utils/db/db.ts b/plugin-server/src/utils/db/db.ts index a5873a7a8b8ec..33c384e28fb03 100644 --- a/plugin-server/src/utils/db/db.ts +++ b/plugin-server/src/utils/db/db.ts @@ -66,7 +66,6 @@ import { UUIDT, } from '../utils' import { OrganizationPluginsAccessLevel } from './../../types' -import { PromiseManager } from './../../worker/vm/promise-manager' import { KafkaProducerWrapper } from './kafka-producer-wrapper' import { PostgresRouter, PostgresUse, TransactionClient } from './postgres' import { @@ -165,16 +164,12 @@ export class DB { /** How many seconds to keep person info in Redis cache */ PERSONS_AND_GROUPS_CACHE_TTL: number - /** PromiseManager instance to keep track of voided promises */ - promiseManager: PromiseManager - constructor( postgres: PostgresRouter, redisPool: GenericPool, kafkaProducer: KafkaProducerWrapper, clickhouse: ClickHouse, statsd: StatsD | undefined, - promiseManager: PromiseManager, personAndGroupsCacheTtl = 1 ) { this.postgres = postgres @@ -183,7 +178,6 @@ export class DB { this.clickhouse = clickhouse this.statsd = statsd this.PERSONS_AND_GROUPS_CACHE_TTL = personAndGroupsCacheTtl - this.promiseManager = promiseManager } // ClickHouse diff --git a/plugin-server/src/utils/db/hub.ts b/plugin-server/src/utils/db/hub.ts index 6673cda368c11..dfcda8b8328bc 100644 --- a/plugin-server/src/utils/db/hub.ts +++ b/plugin-server/src/utils/db/hub.ts @@ -129,15 +129,7 @@ export async function createHub( const promiseManager = new PromiseManager(serverConfig, statsd) - const db = new DB( - postgres, - redisPool, - kafkaProducer, - clickhouse, - statsd, - promiseManager, - serverConfig.PERSON_INFO_CACHE_TTL - ) + const db = new DB(postgres, redisPool, kafkaProducer, clickhouse, statsd, serverConfig.PERSON_INFO_CACHE_TTL) const teamManager = new TeamManager(postgres, serverConfig, statsd) const organizationManager = new OrganizationManager(postgres, teamManager) const pluginsApiKeyManager = new PluginsApiKeyManager(db) diff --git a/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts index d44bb440b3c4a..e51fa5df4a477 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts @@ -7,12 +7,14 @@ import { EventPipelineRunner } from './runner' export async function prepareEventStep(runner: EventPipelineRunner, event: PluginEvent): Promise { const { team_id, uuid } = event + const tsParsingIngestionWarnings: Promise[] = [] const invalidTimestampCallback = function (type: string, details: Record) { // TODO: make that metric name more generic when transitionning to prometheus runner.hub.statsd?.increment('process_event_invalid_timestamp', { teamId: String(team_id), type: type }) - captureIngestionWarning(runner.hub.db, team_id, type, details) + tsParsingIngestionWarnings.push(captureIngestionWarning(runner.hub.db, team_id, type, details)) } + const preIngestionEvent = await runner.hub.eventsProcessor.processEvent( String(event.distinct_id), event, @@ -20,6 +22,7 @@ export async function prepareEventStep(runner: EventPipelineRunner, event: Plugi parseEventTimestamp(event, invalidTimestampCallback), uuid! // it will throw if it's undefined, ) + await Promise.all(tsParsingIngestionWarnings) return preIngestionEvent } diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index 2bbd3bb4dd2dd..25d83b5e21083 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -321,7 +321,7 @@ export class PersonState { } if (isDistinctIdIllegal(mergeIntoDistinctId)) { this.statsd?.increment('illegal_distinct_ids.total', { distinctId: mergeIntoDistinctId }) - captureIngestionWarning(this.db, teamId, 'cannot_merge_with_illegal_distinct_id', { + await captureIngestionWarning(this.db, teamId, 'cannot_merge_with_illegal_distinct_id', { illegalDistinctId: mergeIntoDistinctId, otherDistinctId: otherPersonDistinctId, eventUuid: this.event.uuid, @@ -330,7 +330,7 @@ export class PersonState { } if (isDistinctIdIllegal(otherPersonDistinctId)) { this.statsd?.increment('illegal_distinct_ids.total', { distinctId: otherPersonDistinctId }) - captureIngestionWarning(this.db, teamId, 'cannot_merge_with_illegal_distinct_id', { + await captureIngestionWarning(this.db, teamId, 'cannot_merge_with_illegal_distinct_id', { illegalDistinctId: otherPersonDistinctId, otherDistinctId: mergeIntoDistinctId, eventUuid: this.event.uuid, @@ -416,7 +416,7 @@ export class PersonState { // If merge isn't allowed, we will ignore it, log an ingestion warning and exit if (!mergeAllowed) { // TODO: add event UUID to the ingestion warning - captureIngestionWarning(this.db, this.teamId, 'cannot_merge_already_identified', { + await captureIngestionWarning(this.db, this.teamId, 'cannot_merge_already_identified', { sourcePersonDistinctId: otherPersonDistinctId, targetPersonDistinctId: mergeIntoDistinctId, eventUuid: this.event.uuid, diff --git a/plugin-server/src/worker/ingestion/process-event.ts b/plugin-server/src/worker/ingestion/process-event.ts index 4b0397c66d792..f378c0e6c1770 100644 --- a/plugin-server/src/worker/ingestion/process-event.ts +++ b/plugin-server/src/worker/ingestion/process-event.ts @@ -67,7 +67,7 @@ export class EventsProcessor { eventUuid: string ): Promise { if (!UUID.validateString(eventUuid, false)) { - captureIngestionWarning(this.db, teamId, 'skipping_event_invalid_uuid', { + await captureIngestionWarning(this.db, teamId, 'skipping_event_invalid_uuid', { eventUuid: JSON.stringify(eventUuid), }) throw new Error(`Not a valid UUID: "${eventUuid}"`) diff --git a/plugin-server/src/worker/ingestion/utils.ts b/plugin-server/src/worker/ingestion/utils.ts index 9a80d578eb1e8..affc100370afa 100644 --- a/plugin-server/src/worker/ingestion/utils.ts +++ b/plugin-server/src/worker/ingestion/utils.ts @@ -61,22 +61,19 @@ export function generateEventDeadLetterQueueMessage( // These get displayed under Data Management > Ingestion Warnings // These warnings get displayed to end users. Make sure these errors are actionable and useful for them and // also update IngestionWarningsView.tsx to display useful context. -export function captureIngestionWarning(db: DB, teamId: TeamId, type: string, details: Record) { - db.promiseManager.trackPromise( - db.kafkaProducer.queueMessage({ - topic: KAFKA_INGESTION_WARNINGS, - messages: [ - { - value: JSON.stringify({ - team_id: teamId, - type: type, - source: 'plugin-server', - details: JSON.stringify(details), - timestamp: castTimestampOrNow(null, TimestampFormat.ClickHouse), - }), - }, - ], - }), - 'ingestion_warning' - ) +export async function captureIngestionWarning(db: DB, teamId: TeamId, type: string, details: Record) { + await db.kafkaProducer.queueMessage({ + topic: KAFKA_INGESTION_WARNINGS, + messages: [ + { + value: JSON.stringify({ + team_id: teamId, + type: type, + source: 'plugin-server', + details: JSON.stringify(details), + timestamp: castTimestampOrNow(null, TimestampFormat.ClickHouse), + }), + }, + ], + }) } diff --git a/plugin-server/tests/worker/ingestion/utils.test.ts b/plugin-server/tests/worker/ingestion/utils.test.ts index e289449c46e42..3e65b0964595d 100644 --- a/plugin-server/tests/worker/ingestion/utils.test.ts +++ b/plugin-server/tests/worker/ingestion/utils.test.ts @@ -24,7 +24,7 @@ describe('captureIngestionWarning()', () => { } it('can read own writes', async () => { - captureIngestionWarning(hub.db, 2, 'some_type', { foo: 'bar' }) + await captureIngestionWarning(hub.db, 2, 'some_type', { foo: 'bar' }) await hub.promiseManager.awaitPromisesIfNeeded() const warnings = await delayUntilEventIngested(fetchWarnings)