diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index 4caeabb38dde0..910a2c85689f7 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -130,6 +130,8 @@ export function getDefaultConfig(): PluginsServerConfig { USE_KAFKA_FOR_SCHEDULED_TASKS: true, CLOUD_DEPLOYMENT: null, EXTERNAL_REQUEST_TIMEOUT_MS: 10 * 1000, // 10 seconds + DROP_EVENTS_BY_TOKEN_DISTINCT_ID: '', + POE_EMBRACE_JOIN_FOR_TEAMS: '', STARTUP_PROFILE_DURATION_SECONDS: 300, // 5 minutes STARTUP_PROFILE_CPU: false, diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-onevent.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-onevent.ts index 8bebd169d9bca..0277757ff53b9 100644 --- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-onevent.ts +++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-onevent.ts @@ -21,10 +21,8 @@ export async function eachMessageAppsOnEventHandlers( if (pluginConfigs) { // Elements parsing can be extremely slow, so we skip it for some plugins // # SKIP_ELEMENTS_PARSING_PLUGINS - const skipElementsChain = pluginConfigs.every( - (pluginConfig) => - queue.pluginsServer.pluginConfigsToSkipElementsParsing && - queue.pluginsServer.pluginConfigsToSkipElementsParsing(pluginConfig.plugin_id) + const skipElementsChain = pluginConfigs.every((pluginConfig) => + queue.pluginsServer.pluginConfigsToSkipElementsParsing?.(pluginConfig.plugin_id) ) const event = convertToIngestionEvent(clickHouseEvent, skipElementsChain) diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index e9700a4786276..7c99819524b25 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -202,6 +202,8 @@ export interface PluginsServerConfig { /** Label of the PostHog Cloud environment. Null if not running PostHog Cloud. @example 'US' */ CLOUD_DEPLOYMENT: string | null EXTERNAL_REQUEST_TIMEOUT_MS: number + DROP_EVENTS_BY_TOKEN_DISTINCT_ID: string + POE_EMBRACE_JOIN_FOR_TEAMS: string // dump profiles to disk, covering the first N seconds of runtime STARTUP_PROFILE_DURATION_SECONDS: number @@ -275,6 +277,9 @@ export interface Hub extends PluginsServerConfig { enqueuePluginJob: (job: EnqueuedPluginJob) => Promise // ValueMatchers used for various opt-in/out features pluginConfigsToSkipElementsParsing: ValueMatcher + poeEmbraceJoinForTeams: ValueMatcher + // lookups + eventsToDropByToken: Map } export interface PluginServerCapabilities { diff --git a/plugin-server/src/utils/db/hub.ts b/plugin-server/src/utils/db/hub.ts index 5b035f1dca06f..79d6c56dadd36 100644 --- a/plugin-server/src/utils/db/hub.ts +++ b/plugin-server/src/utils/db/hub.ts @@ -58,6 +58,17 @@ export async function createKafkaProducerWrapper(serverConfig: PluginsServerConf return new KafkaProducerWrapper(producer) } +export function createEventsToDropByToken(eventsToDropByTokenStr?: string): Map { + const eventsToDropByToken: Map = new Map() + if (eventsToDropByTokenStr) { + eventsToDropByTokenStr.split(',').forEach((pair) => { + const [token, distinctID] = pair.split(':') + eventsToDropByToken.set(token, [...(eventsToDropByToken.get(token) || []), distinctID]) + }) + } + return eventsToDropByToken +} + export async function createHub( config: Partial = {}, threadId: number | null = null, @@ -184,6 +195,8 @@ export async function createHub( conversionBufferEnabledTeams, fetchHostnameGuardTeams, pluginConfigsToSkipElementsParsing: buildIntegerMatcher(process.env.SKIP_ELEMENTS_PARSING_PLUGINS, true), + poeEmbraceJoinForTeams: buildIntegerMatcher(process.env.POE_EMBRACE_JOIN_FOR_TEAMS, true), + eventsToDropByToken: createEventsToDropByToken(process.env.DROP_EVENTS_BY_TOKEN_DISTINCT_ID), } // :TODO: This is only used on worker threads, not main diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts index a72ce7579ddec..fde36de792369 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts @@ -65,7 +65,6 @@ export class EventPipelineRunner { // See https://docs.google.com/document/d/12Q1KcJ41TicIwySCfNJV5ZPKXWVtxT7pzpB3r9ivz_0 poEEmbraceJoin: boolean private delayAcks: boolean - private eventsToDropByToken: Map constructor(hub: Hub, originalEvent: PipelineEvent | ProcessedPluginEvent, poEEmbraceJoin = false) { this.hub = hub @@ -74,12 +73,6 @@ export class EventPipelineRunner { // TODO: remove after successful rollout this.delayAcks = stringToBoolean(process.env.INGESTION_DELAY_WRITE_ACKS) - - this.eventsToDropByToken = new Map() - process.env.DROP_EVENTS_BY_TOKEN_DISTINCT_ID?.split(',').forEach((pair) => { - const [token, distinctID] = pair.split(':') - this.eventsToDropByToken.set(token, [...(this.eventsToDropByToken.get(token) || []), distinctID]) - }) } isEventBlacklisted(event: PipelineEvent): boolean { @@ -89,7 +82,7 @@ export class EventPipelineRunner { if (!key) { return false // for safety don't drop events here, they are later dropped in teamDataPopulation } - const dropIds = this.eventsToDropByToken.get(key) + const dropIds = this.hub.eventsToDropByToken?.get(key) return dropIds?.includes(event.distinct_id) || dropIds?.includes('*') || false } @@ -131,10 +124,7 @@ export class EventPipelineRunner { } async runEventPipelineSteps(event: PluginEvent): Promise { - if ( - process.env.POE_EMBRACE_JOIN_FOR_TEAMS === '*' || - process.env.POE_EMBRACE_JOIN_FOR_TEAMS?.split(',').includes(event.team_id.toString()) - ) { + if (this.hub.poeEmbraceJoinForTeams?.(event.team_id)) { // https://docs.google.com/document/d/12Q1KcJ41TicIwySCfNJV5ZPKXWVtxT7pzpB3r9ivz_0 // We're not using the buffer anymore // instead we'll (if within timeframe) merge into the newer personId diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts index a48e743c31b48..7e4f0f5f5e266 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts @@ -2,6 +2,7 @@ import { PluginEvent } from '@posthog/plugin-scaffold' import { DateTime } from 'luxon' import { ISOTimestamp, Person, PipelineEvent, PreIngestionEvent } from '../../../../src/types' +import { createEventsToDropByToken } from '../../../../src/utils/db/hub' import { createEventStep } from '../../../../src/worker/ingestion/event-pipeline/createEventStep' import { pluginsProcessEventStep } from '../../../../src/worker/ingestion/event-pipeline/pluginsProcessEventStep' import { populateTeamDataStep } from '../../../../src/worker/ingestion/event-pipeline/populateTeamDataStep' @@ -91,8 +92,8 @@ describe('EventPipelineRunner', () => { increment: jest.fn(), timing: jest.fn(), }, + eventsToDropByToken: createEventsToDropByToken('drop_token:drop_id,drop_token_all:*'), } - process.env.DROP_EVENTS_BY_TOKEN_DISTINCT_ID = 'drop_token:drop_id,drop_token_all:*' runner = new TestEventPipelineRunner(hub, pluginEvent) jest.mocked(populateTeamDataStep).mockResolvedValue(pluginEvent)