Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(plugin-server): move runner process.env calculations to Hub #17933

Merged
merged 1 commit into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ export function getDefaultConfig(): PluginsServerConfig {
USE_KAFKA_FOR_SCHEDULED_TASKS: true,
CLOUD_DEPLOYMENT: null,
EXTERNAL_REQUEST_TIMEOUT_MS: 10 * 1000, // 10 seconds
DROP_EVENTS_BY_TOKEN_DISTINCT_ID: '',
POE_EMBRACE_JOIN_FOR_TEAMS: '',

STARTUP_PROFILE_DURATION_SECONDS: 300, // 5 minutes
STARTUP_PROFILE_CPU: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ export async function eachMessageAppsOnEventHandlers(
if (pluginConfigs) {
// Elements parsing can be extremely slow, so we skip it for some plugins
// # SKIP_ELEMENTS_PARSING_PLUGINS
const skipElementsChain = pluginConfigs.every(
(pluginConfig) =>
queue.pluginsServer.pluginConfigsToSkipElementsParsing &&
queue.pluginsServer.pluginConfigsToSkipElementsParsing(pluginConfig.plugin_id)
const skipElementsChain = pluginConfigs.every((pluginConfig) =>
queue.pluginsServer.pluginConfigsToSkipElementsParsing?.(pluginConfig.plugin_id)
)

const event = convertToIngestionEvent(clickHouseEvent, skipElementsChain)
Expand Down
5 changes: 5 additions & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ export interface PluginsServerConfig {
/** Label of the PostHog Cloud environment. Null if not running PostHog Cloud. @example 'US' */
CLOUD_DEPLOYMENT: string | null
EXTERNAL_REQUEST_TIMEOUT_MS: number
DROP_EVENTS_BY_TOKEN_DISTINCT_ID: string
POE_EMBRACE_JOIN_FOR_TEAMS: string

// dump profiles to disk, covering the first N seconds of runtime
STARTUP_PROFILE_DURATION_SECONDS: number
Expand Down Expand Up @@ -275,6 +277,9 @@ export interface Hub extends PluginsServerConfig {
enqueuePluginJob: (job: EnqueuedPluginJob) => Promise<void>
// ValueMatchers used for various opt-in/out features
pluginConfigsToSkipElementsParsing: ValueMatcher<number>
poeEmbraceJoinForTeams: ValueMatcher<number>
// lookups
eventsToDropByToken: Map<string, string[]>
}

export interface PluginServerCapabilities {
Expand Down
13 changes: 13 additions & 0 deletions plugin-server/src/utils/db/hub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,17 @@ export async function createKafkaProducerWrapper(serverConfig: PluginsServerConf
return new KafkaProducerWrapper(producer)
}

export function createEventsToDropByToken(eventsToDropByTokenStr?: string): Map<string, string[]> {
const eventsToDropByToken: Map<string, string[]> = new Map()
if (eventsToDropByTokenStr) {
eventsToDropByTokenStr.split(',').forEach((pair) => {
const [token, distinctID] = pair.split(':')
eventsToDropByToken.set(token, [...(eventsToDropByToken.get(token) || []), distinctID])
})
}
return eventsToDropByToken
}

export async function createHub(
config: Partial<PluginsServerConfig> = {},
threadId: number | null = null,
Expand Down Expand Up @@ -184,6 +195,8 @@ export async function createHub(
conversionBufferEnabledTeams,
fetchHostnameGuardTeams,
pluginConfigsToSkipElementsParsing: buildIntegerMatcher(process.env.SKIP_ELEMENTS_PARSING_PLUGINS, true),
poeEmbraceJoinForTeams: buildIntegerMatcher(process.env.POE_EMBRACE_JOIN_FOR_TEAMS, true),
eventsToDropByToken: createEventsToDropByToken(process.env.DROP_EVENTS_BY_TOKEN_DISTINCT_ID),
}

// :TODO: This is only used on worker threads, not main
Expand Down
14 changes: 2 additions & 12 deletions plugin-server/src/worker/ingestion/event-pipeline/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ export class EventPipelineRunner {
// See https://docs.google.com/document/d/12Q1KcJ41TicIwySCfNJV5ZPKXWVtxT7pzpB3r9ivz_0
poEEmbraceJoin: boolean
private delayAcks: boolean
private eventsToDropByToken: Map<string, string[]>

constructor(hub: Hub, originalEvent: PipelineEvent | ProcessedPluginEvent, poEEmbraceJoin = false) {
this.hub = hub
Expand All @@ -74,12 +73,6 @@ export class EventPipelineRunner {

// TODO: remove after successful rollout
this.delayAcks = stringToBoolean(process.env.INGESTION_DELAY_WRITE_ACKS)

this.eventsToDropByToken = new Map()
process.env.DROP_EVENTS_BY_TOKEN_DISTINCT_ID?.split(',').forEach((pair) => {
const [token, distinctID] = pair.split(':')
this.eventsToDropByToken.set(token, [...(this.eventsToDropByToken.get(token) || []), distinctID])
})
}

isEventBlacklisted(event: PipelineEvent): boolean {
Expand All @@ -89,7 +82,7 @@ export class EventPipelineRunner {
if (!key) {
return false // for safety don't drop events here, they are later dropped in teamDataPopulation
}
const dropIds = this.eventsToDropByToken.get(key)
const dropIds = this.hub.eventsToDropByToken?.get(key)
return dropIds?.includes(event.distinct_id) || dropIds?.includes('*') || false
}

Expand Down Expand Up @@ -131,10 +124,7 @@ export class EventPipelineRunner {
}

async runEventPipelineSteps(event: PluginEvent): Promise<EventPipelineResult> {
if (
process.env.POE_EMBRACE_JOIN_FOR_TEAMS === '*' ||
process.env.POE_EMBRACE_JOIN_FOR_TEAMS?.split(',').includes(event.team_id.toString())
) {
if (this.hub.poeEmbraceJoinForTeams?.(event.team_id)) {
// https://docs.google.com/document/d/12Q1KcJ41TicIwySCfNJV5ZPKXWVtxT7pzpB3r9ivz_0
// We're not using the buffer anymore
// instead we'll (if within timeframe) merge into the newer personId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { PluginEvent } from '@posthog/plugin-scaffold'
import { DateTime } from 'luxon'

import { ISOTimestamp, Person, PipelineEvent, PreIngestionEvent } from '../../../../src/types'
import { createEventsToDropByToken } from '../../../../src/utils/db/hub'
import { createEventStep } from '../../../../src/worker/ingestion/event-pipeline/createEventStep'
import { pluginsProcessEventStep } from '../../../../src/worker/ingestion/event-pipeline/pluginsProcessEventStep'
import { populateTeamDataStep } from '../../../../src/worker/ingestion/event-pipeline/populateTeamDataStep'
Expand Down Expand Up @@ -91,8 +92,8 @@ describe('EventPipelineRunner', () => {
increment: jest.fn(),
timing: jest.fn(),
},
eventsToDropByToken: createEventsToDropByToken('drop_token:drop_id,drop_token_all:*'),
}
process.env.DROP_EVENTS_BY_TOKEN_DISTINCT_ID = 'drop_token:drop_id,drop_token_all:*'
runner = new TestEventPipelineRunner(hub, pluginEvent)

jest.mocked(populateTeamDataStep).mockResolvedValue(pluginEvent)
Expand Down
Loading