Skip to content

Commit

Permalink
chore(plugin-server): move runner process.env calculations to Hub
Browse files Browse the repository at this point in the history
  • Loading branch information
bretthoerner committed Oct 12, 2023
1 parent 9f8ef65 commit 6fbbce1
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 13 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci-plugin-server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ jobs:
CLICKHOUSE_HOST: 'localhost'
CLICKHOUSE_DATABASE: 'posthog_test'
KAFKA_HOSTS: 'kafka:9092'
DROP_EVENTS_BY_TOKEN_DISTINCT_ID: 'drop_token:drop_id,drop_token_all:*'

steps:
- name: Code check out
Expand Down
2 changes: 2 additions & 0 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ export function getDefaultConfig(): PluginsServerConfig {
USE_KAFKA_FOR_SCHEDULED_TASKS: true,
CLOUD_DEPLOYMENT: null,
EXTERNAL_REQUEST_TIMEOUT_MS: 10 * 1000, // 10 seconds
DROP_EVENTS_BY_TOKEN_DISTINCT_ID: '',
POE_EMBRACE_JOIN_FOR_TEAMS: '',

STARTUP_PROFILE_DURATION_SECONDS: 300, // 5 minutes
STARTUP_PROFILE_CPU: false,
Expand Down
5 changes: 5 additions & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ export interface PluginsServerConfig {
/** Label of the PostHog Cloud environment. Null if not running PostHog Cloud. @example 'US' */
CLOUD_DEPLOYMENT: string | null
EXTERNAL_REQUEST_TIMEOUT_MS: number
DROP_EVENTS_BY_TOKEN_DISTINCT_ID: string
POE_EMBRACE_JOIN_FOR_TEAMS: string

// dump profiles to disk, covering the first N seconds of runtime
STARTUP_PROFILE_DURATION_SECONDS: number
Expand Down Expand Up @@ -275,6 +277,9 @@ export interface Hub extends PluginsServerConfig {
enqueuePluginJob: (job: EnqueuedPluginJob) => Promise<void>
// ValueMatchers used for various opt-in/out features
pluginConfigsToSkipElementsParsing: ValueMatcher<number>
poeEmbraceJoinForTeams: ValueMatcher<number>
// lookups
eventsToDropByToken: Map<string, string[]>
}

export interface PluginServerCapabilities {
Expand Down
10 changes: 10 additions & 0 deletions plugin-server/src/utils/db/hub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,14 @@ export async function createHub(
})
}

const eventsToDropByToken: Map<string, string[]> = new Map()
if (process.env.DROP_EVENTS_BY_TOKEN_DISTINCT_ID) {
process.env.DROP_EVENTS_BY_TOKEN_DISTINCT_ID.split(',').forEach((pair) => {
const [token, distinctID] = pair.split(':')
eventsToDropByToken.set(token, [...(eventsToDropByToken.get(token) || []), distinctID])
})
}

const hub: Partial<Hub> = {
...serverConfig,
instanceId,
Expand Down Expand Up @@ -184,6 +192,8 @@ export async function createHub(
conversionBufferEnabledTeams,
fetchHostnameGuardTeams,
pluginConfigsToSkipElementsParsing: buildIntegerMatcher(process.env.SKIP_ELEMENTS_PARSING_PLUGINS, true),
poeEmbraceJoinForTeams: buildIntegerMatcher(process.env.POE_EMBRACE_JOIN_FOR_TEAMS, true),
eventsToDropByToken,
}

// :TODO: This is only used on worker threads, not main
Expand Down
14 changes: 2 additions & 12 deletions plugin-server/src/worker/ingestion/event-pipeline/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ export class EventPipelineRunner {
// See https://docs.google.com/document/d/12Q1KcJ41TicIwySCfNJV5ZPKXWVtxT7pzpB3r9ivz_0
poEEmbraceJoin: boolean
private delayAcks: boolean
private eventsToDropByToken: Map<string, string[]>

constructor(hub: Hub, originalEvent: PipelineEvent | ProcessedPluginEvent, poEEmbraceJoin = false) {
this.hub = hub
Expand All @@ -74,12 +73,6 @@ export class EventPipelineRunner {

// TODO: remove after successful rollout
this.delayAcks = stringToBoolean(process.env.INGESTION_DELAY_WRITE_ACKS)

this.eventsToDropByToken = new Map()
process.env.DROP_EVENTS_BY_TOKEN_DISTINCT_ID?.split(',').forEach((pair) => {
const [token, distinctID] = pair.split(':')
this.eventsToDropByToken.set(token, [...(this.eventsToDropByToken.get(token) || []), distinctID])
})
}

isEventBlacklisted(event: PipelineEvent): boolean {
Expand All @@ -89,7 +82,7 @@ export class EventPipelineRunner {
if (!key) {
return false // for safety don't drop events here, they are later dropped in teamDataPopulation
}
const dropIds = this.eventsToDropByToken.get(key)
const dropIds = this.hub.eventsToDropByToken?.get(key)
return dropIds?.includes(event.distinct_id) || dropIds?.includes('*') || false
}

Expand Down Expand Up @@ -131,10 +124,7 @@ export class EventPipelineRunner {
}

async runEventPipelineSteps(event: PluginEvent): Promise<EventPipelineResult> {
if (
process.env.POE_EMBRACE_JOIN_FOR_TEAMS === '*' ||
process.env.POE_EMBRACE_JOIN_FOR_TEAMS?.split(',').includes(event.team_id.toString())
) {
if (this.hub.poeEmbraceJoinForTeams(event.team_id)) {
// https://docs.google.com/document/d/12Q1KcJ41TicIwySCfNJV5ZPKXWVtxT7pzpB3r9ivz_0
// We're not using the buffer anymore
// instead we'll (if within timeframe) merge into the newer personId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ describe('EventPipelineRunner', () => {
timing: jest.fn(),
},
}
process.env.DROP_EVENTS_BY_TOKEN_DISTINCT_ID = 'drop_token:drop_id,drop_token_all:*'
runner = new TestEventPipelineRunner(hub, pluginEvent)

jest.mocked(populateTeamDataStep).mockResolvedValue(pluginEvent)
Expand Down

0 comments on commit 6fbbce1

Please sign in to comment.