diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 7c06dfc8ce87d..d78b2ba924b4a 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -600,6 +600,7 @@ export interface Team { api_token: string slack_incoming_webhook: string | null session_recording_opt_in: boolean + heatmaps_opt_in: boolean | null ingested_event: boolean person_display_name_properties: string[] | null test_account_filters: diff --git a/plugin-server/src/worker/ingestion/event-pipeline/extractHeatmapDataStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/extractHeatmapDataStep.ts index 8f3cf3b2f22d8..93c759a1770a4 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/extractHeatmapDataStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/extractHeatmapDataStep.ts @@ -20,7 +20,7 @@ type HeatmapDataItem = { type HeatmapData = Record -export function extractHeatmapDataStep( +export async function extractHeatmapDataStep( runner: EventPipelineRunner, event: PreIngestionEvent ): Promise<[PreIngestionEvent, Promise[]]> { @@ -29,17 +29,21 @@ export function extractHeatmapDataStep( let acks: Promise[] = [] try { - const heatmapEvents = extractScrollDepthHeatmapData(event) ?? [] - - // eslint-disable-next-line @typescript-eslint/no-floating-promises - acks = heatmapEvents.map((rawEvent) => { - return runner.hub.kafkaProducer.produce({ - topic: runner.hub.CLICKHOUSE_HEATMAPS_KAFKA_TOPIC, - key: eventUuid, - value: Buffer.from(JSON.stringify(rawEvent)), - waitForAck: true, + const team = await runner.hub.teamManager.fetchTeam(teamId) + + if (team?.heatmaps_opt_in !== false) { + const heatmapEvents = extractScrollDepthHeatmapData(event) ?? [] + + // eslint-disable-next-line @typescript-eslint/no-floating-promises + acks = heatmapEvents.map((rawEvent) => { + return runner.hub.kafkaProducer.produce({ + topic: runner.hub.CLICKHOUSE_HEATMAPS_KAFKA_TOPIC, + key: eventUuid, + value: Buffer.from(JSON.stringify(rawEvent)), + waitForAck: true, + }) }) - }) + } } catch (e) { acks.push( captureIngestionWarning(runner.hub.kafkaProducer, teamId, 'invalid_heatmap_data', { @@ -51,7 +55,7 @@ export function extractHeatmapDataStep( // We don't want to ingest this data to the events table delete event.properties['$heatmap_data'] - return Promise.resolve([event, acks]) + return [event, acks] } function replacePathInUrl(url: string, newPath: string): string { diff --git a/plugin-server/src/worker/ingestion/team-manager.ts b/plugin-server/src/worker/ingestion/team-manager.ts index 72258e72d2a50..79135df9309fe 100644 --- a/plugin-server/src/worker/ingestion/team-manager.ts +++ b/plugin-server/src/worker/ingestion/team-manager.ts @@ -155,6 +155,7 @@ export async function fetchTeam(client: PostgresRouter, teamId: Team['id']): Pro api_token, slack_incoming_webhook, session_recording_opt_in, + heatmaps_opt_in, ingested_event, person_display_name_properties, test_account_filters @@ -180,6 +181,7 @@ export async function fetchTeamByToken(client: PostgresRouter, token: string): P api_token, slack_incoming_webhook, session_recording_opt_in, + heatmaps_opt_in, ingested_event, test_account_filters FROM posthog_team diff --git a/plugin-server/tests/main/db.test.ts b/plugin-server/tests/main/db.test.ts index 94b8d8d310363..8d419b0b9fdb1 100644 --- a/plugin-server/tests/main/db.test.ts +++ b/plugin-server/tests/main/db.test.ts @@ -999,6 +999,7 @@ describe('DB', () => { name: 'TEST PROJECT', organization_id: organizationId, session_recording_opt_in: true, + heatmaps_opt_in: null, slack_incoming_webhook: null, uuid: expect.any(String), person_display_name_properties: [], @@ -1026,6 +1027,7 @@ describe('DB', () => { name: 'TEST PROJECT', organization_id: organizationId, session_recording_opt_in: true, + heatmaps_opt_in: null, slack_incoming_webhook: null, uuid: expect.any(String), test_account_filters: {} as any, // NOTE: Test insertion data gets set as an object weirdly diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/extractHeatmapDataStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/extractHeatmapDataStep.test.ts index f41e30dbf89dd..c98c085298417 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/extractHeatmapDataStep.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/extractHeatmapDataStep.test.ts @@ -132,6 +132,10 @@ describe('extractHeatmapDataStep()', () => { hub: { kafkaProducer: { produce: jest.fn((e) => Promise.resolve(e)), + queueMessage: jest.fn((e) => Promise.resolve(e)), + }, + teamManager: { + fetchTeam: jest.fn(() => Promise.resolve({ heatmaps_opt_in: true })), }, }, nextStep: (...args: any[]) => args, @@ -209,6 +213,15 @@ describe('extractHeatmapDataStep()', () => { `) }) + it('drops if the associated team has explicit opt out', async () => { + runner.hub.teamManager.fetchTeam = jest.fn(() => Promise.resolve({ heatmaps_opt_in: false })) + const response = await extractHeatmapDataStep(runner, event) + expect(response[0]).toEqual(event) + expect(response[0].properties.$heatmap_data).toBeUndefined() + expect(response[1]).toHaveLength(0) + expect(runner.hub.kafkaProducer.produce).toBeCalledTimes(0) + }) + describe('validation', () => { it('handles empty array $heatmap_data', async () => { event.properties.$heatmap_data = [] diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts index 0cf0fcdb08cd5..53fafaf31ade9 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts @@ -91,6 +91,8 @@ describe('EventPipelineRunner', () => { beforeEach(() => { hub = { + kafkaProducer: { queueMessage: jest.fn() }, + teamManager: { fetchTeam: jest.fn(() => {}) }, db: { kafkaProducer: { queueMessage: jest.fn() }, fetchPerson: jest.fn(),