diff --git a/plugin-server/package.json b/plugin-server/package.json index 3699ce1bbc769a..764d63ec5fe22c 100644 --- a/plugin-server/package.json +++ b/plugin-server/package.json @@ -89,7 +89,9 @@ "prom-client": "^14.2.0", "re2": "^1.20.3", "safe-stable-stringify": "^2.4.0", + "siphash": "^1.1.0", "tail": "^2.2.6", + "tldts": "^6.1.57", "uuid": "^9.0.1", "v8-profiler-next": "^1.9.0", "vm2": "3.9.18" diff --git a/plugin-server/pnpm-lock.yaml b/plugin-server/pnpm-lock.yaml index e49da8ba134910..4eff36b9e84fcf 100644 --- a/plugin-server/pnpm-lock.yaml +++ b/plugin-server/pnpm-lock.yaml @@ -151,9 +151,15 @@ dependencies: safe-stable-stringify: specifier: ^2.4.0 version: 2.4.3 + siphash: + specifier: ^1.1.0 + version: 1.1.0 tail: specifier: ^2.2.6 version: 2.2.6 + tldts: + specifier: ^6.1.57 + version: 6.1.57 uuid: specifier: ^9.0.1 version: 9.0.1 @@ -9659,6 +9665,10 @@ packages: string-width: 1.0.2 dev: true + /siphash@1.1.0: + resolution: {integrity: sha512-QXQOIeN7Lq1uAVfppZukylZ2tAGedZ49Xpu39Zfyb6JJqVFrP7GfbVc7kxTAyoHGi3/c0y7yIG6lmSwxapEKqA==} + dev: false + /sisteransi@1.0.5: resolution: {integrity: sha512-bLGGlR1QxBcynn2d5YmDX4MGjlZvy2MRBDRNHLJ8VI6l6+9FUiyTFNJ0IveOSP0bcXgVDPRcfGqA0pjaqUpfVg==} dev: true @@ -10115,6 +10125,17 @@ packages: engines: {node: '>=12'} dev: false + /tldts-core@6.1.57: + resolution: {integrity: sha512-lXnRhuQpx3zU9EONF9F7HfcRLvN1uRYUBIiKL+C/gehC/77XTU+Jye6ui86GA3rU6FjlJ0triD1Tkjt2F/2lEg==} + dev: false + + /tldts@6.1.57: + resolution: {integrity: sha512-Oy7yDXK8meJl8vPMOldzA+MtueAJ5BrH4l4HXwZuj2AtfoQbLjmTJmjNWPUcAo+E/ibHn7QlqMS0BOcXJFJyHQ==} + hasBin: true + dependencies: + tldts-core: 6.1.57 + dev: false + /tmp@0.2.1: resolution: {integrity: sha512-76SUhtfqR2Ijn+xllcI5P1oyannHNHByD80W1q447gU3mp9G9PSpGdWmjUOHRDPiHYacIk66W7ubDTuPF3BEtQ==} engines: {node: '>=8.17.0'} diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index e8682da3b337be..2a311f3a1344e4 100644 --- a/plugin-server/src/types.ts +++ 
/**
 * Adds `value` to the Redis set stored at `key` (SADD).
 *
 * A client is borrowed from the pool for the duration of the command and is
 * always released, even when the command rejects. A timeout guard is armed
 * while the command is in flight and cleared afterwards.
 *
 * @param key - Redis key of the set.
 * @param value - Member to add to the set.
 * @returns Whatever `client.sadd` resolves to (for Redis SADD: the number of members newly added).
 */
public redisSAdd(key: string, value: Redis.ValueType): Promise<number> {
    return instrumentQuery('query.redisSAdd', undefined, async () => {
        const client = await this.redisPool.acquire()
        const timeout = timeoutGuard('SADD delayed. Waiting over 30 sec to perform SADD', {
            key,
            value,
        })
        try {
            return await client.sadd(key, value)
        } finally {
            clearTimeout(timeout)
            await this.redisPool.release(client)
        }
    })
}

/**
 * Returns the number of members in the Redis set stored at `key` (SCARD).
 *
 * Follows the same acquire / guard / release pattern as `redisSAdd`.
 *
 * @param key - Redis key of the set.
 * @returns Whatever `client.scard` resolves to (for Redis SCARD: the set's cardinality, 0 when the key is absent).
 */
public redisSCard(key: string): Promise<number> {
    return instrumentQuery('query.redisSCard', undefined, async () => {
        const client = await this.redisPool.acquire()
        // The guard message names SCARD so a slow-command report is attributed to the right operation.
        const timeout = timeoutGuard('SCARD delayed. Waiting over 30 sec to perform SCARD', {
            key,
        })
        try {
            return await client.scard(key)
        } finally {
            clearTimeout(timeout)
            await this.redisPool.release(client)
        }
    })
}
const TIMEZONE_FALLBACK = 'UTC'
const SENTINEL_COOKIELESS_SERVER_HASH_DISTINCT_ID = '$sentinel_cookieless_server_hash'

/**
 * Returns the salt for the calendar day that `timestamp` falls on in `timezone`.
 *
 * The time zone is supplied by the event and is therefore untrusted: a missing
 * or empty value, or one the runtime does not recognise, falls back to
 * `TIMEZONE_FALLBACK` instead of throwing.
 *
 * @param timestamp - Event time in milliseconds since the Unix epoch.
 * @param timezone - IANA time zone name used to decide which day the timestamp belongs to.
 * @returns The salt string for that day.
 */
export function getSaltForDay(timestamp: number, timezone: string | undefined): string {
    const datetime = new Date(timestamp)
    // use the esperanto locale code to get the day of this timestamp in the timezone in YYYY-MM-DD format
    const formatDay = (timeZone: string): string => datetime.toLocaleDateString('eo', { timeZone })

    let day: string
    try {
        day = formatDay(timezone || TIMEZONE_FALLBACK)
    } catch (error) {
        // An unrecognised `timeZone` makes toLocaleDateString throw a RangeError.
        // Anything else is unexpected and must not be swallowed.
        if (!(error instanceof RangeError)) {
            throw error
        }
        day = formatDay(TIMEZONE_FALLBACK)
    }

    // lookup the salt for this day
    // TODO
    return 'salt' + day
}
/**
 * Hashes the identifying attributes of a cookieless event.
 *
 * The key is derived from the per-day salt returned by `getSaltForDay`; the
 * hashed message joins team id, IP, root domain, user agent and the identify
 * counter with `-`.
 *
 * @param timestamp - Event time in milliseconds since the Unix epoch.
 * @param timezone - Time zone used to pick the day's salt.
 * @param teamId - Team the event belongs to.
 * @param ip - Client IP address.
 * @param host - Host the event was sent from; reduced to its root domain when tldts can extract one.
 * @param userAgent - Raw user-agent string.
 * @param nIdentifies - Number of identify events already seen for the base hash (defaults to 0).
 * @returns The value produced by siphash-double's `hash_hex` (presumably a hex string — confirm against the library).
 */
export function doHash(
    timestamp: number,
    timezone: string | undefined,
    teamId: number,
    ip: string,
    host: string,
    userAgent: string,
    nIdentifies = 0
): string {
    const salt = getSaltForDay(timestamp, timezone)
    const key = siphashDouble.string16_to_key(salt)
    // tldts returns null when it cannot extract a domain; use the raw host in that case
    const rootDomain = getDomain(host) || host
    // use the 128-bit version of siphash to get the result
    return siphashDouble.hash_hex(key, `${teamId}-${ip}-${rootDomain}-${userAgent}-${nIdentifies}`)
}

/**
 * Pipeline step that replaces the cookieless sentinel ids on an event with
 * server-side hashed ids.
 *
 * Events whose `$device_id` is not the sentinel pass through untouched. A
 * sentinel event is dropped (returns `[undefined]`) when any of these hold:
 * - it has no timestamp, or the timestamp cannot be parsed;
 * - `$session_id` (and the distinct id, or `$anon_distinct_id` for `$identify`) is not also the sentinel;
 * - `$raw_user_agent`, `$ip` or `$host` is missing;
 * - the team has not opted in.
 *
 * Otherwise the step sets `$device_id`, the distinct id (or `$anon_distinct_id`)
 * and `$session_id`, keeping identify and session state in Redis under
 * `cookieless_i:<hash>` and `cookieless_s:<hash>`.
 *
 * @param runner - Pipeline runner; provides the team manager and Redis helpers via `runner.hub`.
 * @param event - Event to process; mutated in place.
 * @returns A one-element tuple holding the event, or `undefined` when it was dropped.
 */
export async function cookielessServerHashStep(
    runner: EventPipelineRunner,
    event: PluginEvent
): Promise<[PluginEvent | undefined]> {
    const ONE_DAY_SECONDS = 60 * 60 * 24
    const SESSION_INACTIVITY_MS = 60 * 30 * 1000

    const properties = event.properties
    // if events aren't using this mode, skip all processing
    if (!properties || properties['$device_id'] !== SENTINEL_COOKIELESS_SERVER_HASH_DISTINCT_ID) {
        return [event]
    }

    // drop some events that aren't valid in this mode
    if (!event.timestamp) {
        // TODO log
        return [undefined]
    }
    if (properties['$session_id'] !== SENTINEL_COOKIELESS_SERVER_HASH_DISTINCT_ID) {
        // TODO log
        return [undefined]
    }

    // ensure that the distinct id is also the sentinel value; identify events carry it in $anon_distinct_id
    const isIdentify = event.event === '$identify'
    const incomingAnonymousId = isIdentify ? properties['$anon_distinct_id'] : event.distinct_id
    if (incomingAnonymousId !== SENTINEL_COOKIELESS_SERVER_HASH_DISTINCT_ID) {
        // TODO log
        return [undefined]
    }

    // drop events that don't have the necessary properties
    const userAgent = properties['$raw_user_agent']
    const ip = properties['$ip']
    const host = properties['$host']
    if (!userAgent || !ip || !host) {
        // TODO log
        return [undefined]
    }

    // $timezone comes from the client; anything that isn't a string is treated as absent
    const rawTimezone = properties['$timezone']
    const timezone = typeof rawTimezone === 'string' ? rawTimezone : undefined

    // luxon yields NaN for an unparseable timestamp; NaN would poison the salt,
    // the session-expiry comparison below and the generated session id
    const timestamp = DateTime.fromISO(event.timestamp).toMillis()
    if (Number.isNaN(timestamp)) {
        // TODO log
        return [undefined]
    }
    const teamId = event.team_id

    // drop events from teams that aren't allowed to use this mode
    const team = await runner.hub.teamManager.getTeamForEvent(event)
    if (!team?.cookieless_server_hash_opt_in) {
        // TODO log
        return [undefined]
    }

    const hashFor = (nIdentifies: number): string =>
        doHash(timestamp, timezone, teamId, ip, host, userAgent, nIdentifies)

    const baseHash = hashFor(0)
    properties['$device_id'] = baseHash

    // TRICKY: if a user were to log in and out, to avoid collisions, we would want a different hash value,
    // so we store the set of identify event uuids for identifies
    // ASSUMPTION: all events are processed in order, and are processed exactly once
    const identifiesRedisKey = `cookieless_i:${baseHash}`
    // how many identifies have happened with that hash value?
    const numIdentifies = await runner.hub.db.redisSCard(identifiesRedisKey)
    // rehash with the number of identifies, so that each 'user' has a unique hash value
    const userHash = numIdentifies === 0 ? baseHash : hashFor(numIdentifies)

    if (isIdentify) {
        // add this identify event id to redis
        await runner.hub.db.redisSAdd(identifiesRedisKey, event.uuid)
        // 24 hours // TODO this is the max but could be less, given we already looked at the timestamp
        await runner.hub.db.redisExpire(identifiesRedisKey, ONE_DAY_SECONDS)

        // set the distinct id to the new hash value
        properties['$anon_distinct_id'] = userHash
    } else {
        // set the distinct id to the new hash value
        event.distinct_id = userHash
    }

    const sessionRedisKey = `cookieless_s:${userHash}`
    // do we have a session id for this user already?
    const storedSession = await runner.hub.db.redisGet<{ s: string; t: number } | null>(
        sessionRedisKey,
        null,
        'cookielessServerHashStep',
        {
            jsonSerialize: true,
        }
    )
    // if not, or the TTL has expired, create a new one. Don't rely on redis TTL, as ingestion lag could
    // approach the 30-minute session inactivity timeout
    const sessionId =
        !storedSession || timestamp - storedSession.t > SESSION_INACTIVITY_MS
            ? new UUID7(timestamp).toString()
            : storedSession.s

    // new and continued sessions both record this event's timestamp as the latest activity
    await runner.hub.db.redisSet(
        sessionRedisKey,
        { s: sessionId, t: timestamp },
        'cookielessServerHashStep',
        ONE_DAY_SECONDS
    )

    properties['$session_id'] = sessionId

    return [event]
}
if (processedEvent == null) { // A plugin dropped the event. - return this.registerLastStep('pluginsProcessEventStep', [event], kafkaAcks) + return this.registerLastStep('pluginsProcessEventStep', [postCookielessEvent], kafkaAcks) } const [normalizedEvent, timestamp] = await this.runStep(