Skip to content

Commit

Permalink
chore(plugin-server): move runner process.env calculations to Hub (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
bretthoerner authored Oct 13, 2023
1 parent 5813380 commit 63b6736
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 17 deletions.
2 changes: 2 additions & 0 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ export function getDefaultConfig(): PluginsServerConfig {
USE_KAFKA_FOR_SCHEDULED_TASKS: true,
CLOUD_DEPLOYMENT: null,
EXTERNAL_REQUEST_TIMEOUT_MS: 10 * 1000, // 10 seconds
DROP_EVENTS_BY_TOKEN_DISTINCT_ID: '',
POE_EMBRACE_JOIN_FOR_TEAMS: '',

STARTUP_PROFILE_DURATION_SECONDS: 300, // 5 minutes
STARTUP_PROFILE_CPU: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ export async function eachMessageAppsOnEventHandlers(
if (pluginConfigs) {
// Elements parsing can be extremely slow, so we skip it for some plugins
// # SKIP_ELEMENTS_PARSING_PLUGINS
const skipElementsChain = pluginConfigs.every(
(pluginConfig) =>
queue.pluginsServer.pluginConfigsToSkipElementsParsing &&
queue.pluginsServer.pluginConfigsToSkipElementsParsing(pluginConfig.plugin_id)
const skipElementsChain = pluginConfigs.every((pluginConfig) =>
queue.pluginsServer.pluginConfigsToSkipElementsParsing?.(pluginConfig.plugin_id)
)

const event = convertToIngestionEvent(clickHouseEvent, skipElementsChain)
Expand Down
5 changes: 5 additions & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ export interface PluginsServerConfig {
/** Label of the PostHog Cloud environment. Null if not running PostHog Cloud. @example 'US' */
CLOUD_DEPLOYMENT: string | null
EXTERNAL_REQUEST_TIMEOUT_MS: number
DROP_EVENTS_BY_TOKEN_DISTINCT_ID: string
POE_EMBRACE_JOIN_FOR_TEAMS: string

// dump profiles to disk, covering the first N seconds of runtime
STARTUP_PROFILE_DURATION_SECONDS: number
Expand Down Expand Up @@ -275,6 +277,9 @@ export interface Hub extends PluginsServerConfig {
enqueuePluginJob: (job: EnqueuedPluginJob) => Promise<void>
// ValueMatchers used for various opt-in/out features
pluginConfigsToSkipElementsParsing: ValueMatcher<number>
poeEmbraceJoinForTeams: ValueMatcher<number>
// lookups
eventsToDropByToken: Map<string, string[]>
}

export interface PluginServerCapabilities {
Expand Down
13 changes: 13 additions & 0 deletions plugin-server/src/utils/db/hub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,17 @@ export async function createKafkaProducerWrapper(serverConfig: PluginsServerConf
return new KafkaProducerWrapper(producer)
}

/**
 * Parses a comma-separated list of `token:distinctID` pairs into a lookup
 * from token to the list of distinct IDs registered for that token.
 *
 * Example: `'tokA:id1,tokA:id2,tokB:*'` becomes
 * `Map { 'tokA' => ['id1', 'id2'], 'tokB' => ['*'] }`.
 *
 * Only the FIRST colon of each pair separates the token from the distinct ID,
 * so distinct IDs that themselves contain colons are preserved intact
 * (`'tok:user:123'` maps `tok` to `['user:123']`).
 *
 * Entries without a token/ID separator, or with an empty token (for example
 * the empty entry produced by a trailing comma), are ignored instead of
 * being stored as `undefined` values.
 *
 * @param eventsToDropByTokenStr - raw setting value; `undefined` or an empty
 *   string yields an empty map.
 * @returns a new map from token to distinct IDs, in the order they were listed.
 */
export function createEventsToDropByToken(eventsToDropByTokenStr?: string): Map<string, string[]> {
    const eventsToDropByToken: Map<string, string[]> = new Map()
    if (!eventsToDropByTokenStr) {
        return eventsToDropByToken
    }

    for (const pair of eventsToDropByTokenStr.split(',')) {
        // Split on the first colon only: the distinct ID may legitimately contain colons.
        const separatorIndex = pair.indexOf(':')
        if (separatorIndex <= 0) {
            // No separator, or an empty token: malformed entry, nothing sensible to register.
            continue
        }
        const token = pair.slice(0, separatorIndex)
        const distinctID = pair.slice(separatorIndex + 1)

        // Append in place rather than re-spreading the array on every insertion.
        const distinctIDs = eventsToDropByToken.get(token)
        if (distinctIDs) {
            distinctIDs.push(distinctID)
        } else {
            eventsToDropByToken.set(token, [distinctID])
        }
    }
    return eventsToDropByToken
}

export async function createHub(
config: Partial<PluginsServerConfig> = {},
threadId: number | null = null,
Expand Down Expand Up @@ -184,6 +195,8 @@ export async function createHub(
conversionBufferEnabledTeams,
fetchHostnameGuardTeams,
pluginConfigsToSkipElementsParsing: buildIntegerMatcher(process.env.SKIP_ELEMENTS_PARSING_PLUGINS, true),
poeEmbraceJoinForTeams: buildIntegerMatcher(process.env.POE_EMBRACE_JOIN_FOR_TEAMS, true),
eventsToDropByToken: createEventsToDropByToken(process.env.DROP_EVENTS_BY_TOKEN_DISTINCT_ID),
}

// :TODO: This is only used on worker threads, not main
Expand Down
14 changes: 2 additions & 12 deletions plugin-server/src/worker/ingestion/event-pipeline/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ export class EventPipelineRunner {
// See https://docs.google.com/document/d/12Q1KcJ41TicIwySCfNJV5ZPKXWVtxT7pzpB3r9ivz_0
poEEmbraceJoin: boolean
private delayAcks: boolean
private eventsToDropByToken: Map<string, string[]>

constructor(hub: Hub, originalEvent: PipelineEvent | ProcessedPluginEvent, poEEmbraceJoin = false) {
this.hub = hub
Expand All @@ -74,12 +73,6 @@ export class EventPipelineRunner {

// TODO: remove after successful rollout
this.delayAcks = stringToBoolean(process.env.INGESTION_DELAY_WRITE_ACKS)

this.eventsToDropByToken = new Map()
process.env.DROP_EVENTS_BY_TOKEN_DISTINCT_ID?.split(',').forEach((pair) => {
const [token, distinctID] = pair.split(':')
this.eventsToDropByToken.set(token, [...(this.eventsToDropByToken.get(token) || []), distinctID])
})
}

isEventBlacklisted(event: PipelineEvent): boolean {
Expand All @@ -89,7 +82,7 @@ export class EventPipelineRunner {
if (!key) {
return false // for safety don't drop events here, they are later dropped in teamDataPopulation
}
const dropIds = this.eventsToDropByToken.get(key)
const dropIds = this.hub.eventsToDropByToken?.get(key)
return dropIds?.includes(event.distinct_id) || dropIds?.includes('*') || false
}

Expand Down Expand Up @@ -131,10 +124,7 @@ export class EventPipelineRunner {
}

async runEventPipelineSteps(event: PluginEvent): Promise<EventPipelineResult> {
if (
process.env.POE_EMBRACE_JOIN_FOR_TEAMS === '*' ||
process.env.POE_EMBRACE_JOIN_FOR_TEAMS?.split(',').includes(event.team_id.toString())
) {
if (this.hub.poeEmbraceJoinForTeams?.(event.team_id)) {
// https://docs.google.com/document/d/12Q1KcJ41TicIwySCfNJV5ZPKXWVtxT7pzpB3r9ivz_0
// We're not using the buffer anymore
// instead we'll (if within timeframe) merge into the newer personId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { PluginEvent } from '@posthog/plugin-scaffold'
import { DateTime } from 'luxon'

import { ISOTimestamp, Person, PipelineEvent, PreIngestionEvent } from '../../../../src/types'
import { createEventsToDropByToken } from '../../../../src/utils/db/hub'
import { createEventStep } from '../../../../src/worker/ingestion/event-pipeline/createEventStep'
import { pluginsProcessEventStep } from '../../../../src/worker/ingestion/event-pipeline/pluginsProcessEventStep'
import { populateTeamDataStep } from '../../../../src/worker/ingestion/event-pipeline/populateTeamDataStep'
Expand Down Expand Up @@ -91,8 +92,8 @@ describe('EventPipelineRunner', () => {
increment: jest.fn(),
timing: jest.fn(),
},
eventsToDropByToken: createEventsToDropByToken('drop_token:drop_id,drop_token_all:*'),
}
process.env.DROP_EVENTS_BY_TOKEN_DISTINCT_ID = 'drop_token:drop_id,drop_token_all:*'
runner = new TestEventPipelineRunner(hub, pluginEvent)

jest.mocked(populateTeamDataStep).mockResolvedValue(pluginEvent)
Expand Down

0 comments on commit 63b6736

Please sign in to comment.