Skip to content

Commit

Permalink
feat: Added heatmaps opt in check to processing (#23328)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite authored Jun 28, 2024
1 parent c7c8892 commit a27381d
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 12 deletions.
1 change: 1 addition & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ export interface Team {
api_token: string
slack_incoming_webhook: string | null
session_recording_opt_in: boolean
heatmaps_opt_in: boolean | null
ingested_event: boolean
person_display_name_properties: string[] | null
test_account_filters:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type HeatmapDataItem = {

type HeatmapData = Record<string, HeatmapDataItem[]>

export function extractHeatmapDataStep(
export async function extractHeatmapDataStep(
runner: EventPipelineRunner,
event: PreIngestionEvent
): Promise<[PreIngestionEvent, Promise<void>[]]> {
Expand All @@ -29,17 +29,21 @@ export function extractHeatmapDataStep(
let acks: Promise<void>[] = []

try {
const heatmapEvents = extractScrollDepthHeatmapData(event) ?? []

// eslint-disable-next-line @typescript-eslint/no-floating-promises
acks = heatmapEvents.map((rawEvent) => {
return runner.hub.kafkaProducer.produce({
topic: runner.hub.CLICKHOUSE_HEATMAPS_KAFKA_TOPIC,
key: eventUuid,
value: Buffer.from(JSON.stringify(rawEvent)),
waitForAck: true,
const team = await runner.hub.teamManager.fetchTeam(teamId)

if (team?.heatmaps_opt_in !== false) {
const heatmapEvents = extractScrollDepthHeatmapData(event) ?? []

// eslint-disable-next-line @typescript-eslint/no-floating-promises
acks = heatmapEvents.map((rawEvent) => {
return runner.hub.kafkaProducer.produce({
topic: runner.hub.CLICKHOUSE_HEATMAPS_KAFKA_TOPIC,
key: eventUuid,
value: Buffer.from(JSON.stringify(rawEvent)),
waitForAck: true,
})
})
})
}
} catch (e) {
acks.push(
captureIngestionWarning(runner.hub.kafkaProducer, teamId, 'invalid_heatmap_data', {
Expand All @@ -51,7 +55,7 @@ export function extractHeatmapDataStep(
// We don't want to ingest this data to the events table
delete event.properties['$heatmap_data']

return Promise.resolve([event, acks])
return [event, acks]
}

function replacePathInUrl(url: string, newPath: string): string {
Expand Down
2 changes: 2 additions & 0 deletions plugin-server/src/worker/ingestion/team-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ export async function fetchTeam(client: PostgresRouter, teamId: Team['id']): Pro
api_token,
slack_incoming_webhook,
session_recording_opt_in,
heatmaps_opt_in,
ingested_event,
person_display_name_properties,
test_account_filters
Expand All @@ -180,6 +181,7 @@ export async function fetchTeamByToken(client: PostgresRouter, token: string): P
api_token,
slack_incoming_webhook,
session_recording_opt_in,
heatmaps_opt_in,
ingested_event,
test_account_filters
FROM posthog_team
Expand Down
2 changes: 2 additions & 0 deletions plugin-server/tests/main/db.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,7 @@ describe('DB', () => {
name: 'TEST PROJECT',
organization_id: organizationId,
session_recording_opt_in: true,
heatmaps_opt_in: null,
slack_incoming_webhook: null,
uuid: expect.any(String),
person_display_name_properties: [],
Expand Down Expand Up @@ -1026,6 +1027,7 @@ describe('DB', () => {
name: 'TEST PROJECT',
organization_id: organizationId,
session_recording_opt_in: true,
heatmaps_opt_in: null,
slack_incoming_webhook: null,
uuid: expect.any(String),
test_account_filters: {} as any, // NOTE: Test insertion data gets set as an object weirdly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ describe('extractHeatmapDataStep()', () => {
hub: {
kafkaProducer: {
produce: jest.fn((e) => Promise.resolve(e)),
queueMessage: jest.fn((e) => Promise.resolve(e)),
},
teamManager: {
fetchTeam: jest.fn(() => Promise.resolve({ heatmaps_opt_in: true })),
},
},
nextStep: (...args: any[]) => args,
Expand Down Expand Up @@ -209,6 +213,15 @@ describe('extractHeatmapDataStep()', () => {
`)
})

it('drops if the associated team has explicit opt out', async () => {
runner.hub.teamManager.fetchTeam = jest.fn(() => Promise.resolve({ heatmaps_opt_in: false }))
const response = await extractHeatmapDataStep(runner, event)
expect(response[0]).toEqual(event)
expect(response[0].properties.$heatmap_data).toBeUndefined()
expect(response[1]).toHaveLength(0)
expect(runner.hub.kafkaProducer.produce).toBeCalledTimes(0)
})

describe('validation', () => {
it('handles empty array $heatmap_data', async () => {
event.properties.$heatmap_data = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ describe('EventPipelineRunner', () => {

beforeEach(() => {
hub = {
kafkaProducer: { queueMessage: jest.fn() },
teamManager: { fetchTeam: jest.fn(() => {}) },
db: {
kafkaProducer: { queueMessage: jest.fn() },
fetchPerson: jest.fn(),
Expand Down

0 comments on commit a27381d

Please sign in to comment.