Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
robbie-c committed Dec 4, 2024
1 parent fb84481 commit d2180e6
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 2 deletions.
2 changes: 2 additions & 0 deletions plugin-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@
"prom-client": "^14.2.0",
"re2": "^1.20.3",
"safe-stable-stringify": "^2.4.0",
"siphash": "^1.1.0",
"tail": "^2.2.6",
"tldts": "^6.1.57",
"uuid": "^9.0.1",
"v8-profiler-next": "^1.9.0",
"vm2": "3.9.18"
Expand Down
21 changes: 21 additions & 0 deletions plugin-server/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,7 @@ export interface Team {
test_account_filters:
| (EventPropertyFilter | PersonPropertyFilter | ElementPropertyFilter | CohortPropertyFilter)[]
| null
cookieless_server_hash_opt_in?: boolean
}

/** Properties shared by RawEventMessage and EventMessage. */
Expand Down
31 changes: 31 additions & 0 deletions plugin-server/src/utils/db/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,37 @@ export class DB {
})
}

public redisSAdd(key: string, value: Redis.ValueType): Promise<number> {
return instrumentQuery('query.redisSAdd', undefined, async () => {
const client = await this.redisPool.acquire()
const timeout = timeoutGuard('SADD delayed. Waiting over 30 sec to perform SADD', {
key,
value,
})
try {
return await client.sadd(key, value)
} finally {
clearTimeout(timeout)
await this.redisPool.release(client)
}
})
}

public redisSCard(key: string): Promise<number> {
return instrumentQuery('query.redisSCard', undefined, async () => {
const client = await this.redisPool.acquire()
const timeout = timeoutGuard('SCARD delayed. Waiting over 30 sec to perform SADD', {
key,
})
try {
return await client.scard(key)
} finally {
clearTimeout(timeout)
await this.redisPool.release(client)
}
})
}

public redisPublish(channel: string, message: string): Promise<number> {
return instrumentQuery('query.redisPublish', undefined, async () => {
const client = await this.redisPool.acquire()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import { PluginEvent } from '@posthog/plugin-scaffold'
import { DateTime } from 'luxon'
// @ts-expect-error no types
import * as siphashDouble from 'siphash/lib/siphash-double'
import { getDomain } from 'tldts'

import { UUID7 } from '../../../utils/utils'
import { EventPipelineRunner } from './runner'

const TIMEZONE_FALLBACK = 'UTC'
const SENTINEL_COOKIELESS_SERVER_HASH_DISTINCT_ID = '$sentinel_cookieless_server_hash'

export function getSaltForDay(timestamp: number, timezone: string | undefined): string {
// get the day based on the timezone
const datetime = new Date(timestamp)
// use the esperanto locale code to get the day of this timestamp in the timezone in YYYY-MM-DD format
const day = datetime.toLocaleDateString('eo', { timeZone: timezone || TIMEZONE_FALLBACK })

// lookup the salt for this day
// TODO
return 'salt' + day
}

export function doHash(
timestamp: number,
timezone: string | undefined,
teamId: number,
ip: string,
host: string,
userAgent: string,
nIdentifies: number = 0
) {
const salt = getSaltForDay(timestamp, timezone)
const key = siphashDouble.string16_to_key(salt)
const rootDomain = getDomain(host) || host
// use the 128-bit version of siphash to get the result
return siphashDouble.hash_hex(key, `${teamId.toString()}-${ip}-${rootDomain}-${userAgent}-${nIdentifies}`)
}

export async function cookielessServerHashStep(
runner: EventPipelineRunner,
event: PluginEvent
): Promise<[PluginEvent | undefined]> {
// if events aren't using this mode, skip all processing
if (event.properties?.['$device_id'] !== SENTINEL_COOKIELESS_SERVER_HASH_DISTINCT_ID) {
return [event]
}
// drop some events that aren't valid in this mode
if (!event.timestamp) {
// TODO log
return [undefined]
}
const sessionId = event.properties['$session_id']
if (sessionId !== SENTINEL_COOKIELESS_SERVER_HASH_DISTINCT_ID) {
// TODO log
return [undefined]
}
// ensure that the distinct id is also the sentinel value
if (
(event.event === '$identify' &&
event.properties['$anon_distinct_id'] !== SENTINEL_COOKIELESS_SERVER_HASH_DISTINCT_ID) ||
(event.event !== '$identify' && event.distinct_id !== SENTINEL_COOKIELESS_SERVER_HASH_DISTINCT_ID)
) {
// TODO log
return [undefined]
}

// drop events that don't have the necessary properties
const userAgent = event.properties['$raw_user_agent']
const ip = event.properties['$ip']
const host = event.properties['$host']
const timezone = event.properties['$timezone']
const timestamp = DateTime.fromISO(event.timestamp).toMillis()
const teamId = event.team_id
if (!userAgent || !ip || !host) {
// TODO log
return [undefined]
}

// drop events from teams that aren't allowed to use this mode
const team = await runner.hub.teamManager.getTeamForEvent(event)
if (!team?.cookieless_server_hash_opt_in) {
// TODO log
return [undefined]
}

const hashValue = doHash(timestamp, timezone, teamId, ip, host, userAgent)
event.properties['$device_id'] = hashValue

// TRICKY: if a user were to log in and out, to avoid collisions, we would want a different hash value, so we store the set of identify event uuids for identifies
// ASSUMPTION: all events are processed in order, and are processed exactly once
const identifiesRedisKey = `cookieless_i:${hashValue}`
// how many identifies have happened with that hash value?
const numIdentifies = await runner.hub.db.redisSCard(identifiesRedisKey)
// rehash with the number of identifies, so that each 'user' has a unique hash value
const hashValue2 =
numIdentifies === 0 ? hashValue : doHash(timestamp, timezone, teamId, ip, host, userAgent, numIdentifies)

if (event.event === '$identify') {
// add this identify event id to redis
await runner.hub.db.redisSAdd(identifiesRedisKey, event.uuid)
await runner.hub.db.redisExpire(identifiesRedisKey, 60 * 60 * 24) // 24 hours // TODO this is the max but could be less, given we looked at the timestamp 10 lines of code ago

// set the distinct id to the new hash value
event.properties[`$anon_distinct_id`] = hashValue2
} else {
// set the distinct id to the new hash value
event.distinct_id = hashValue2
}

const sessionRedisKey = `cookieless_s:${hashValue2}`
// do we have a session id for this user already?
let sessionInfo = await runner.hub.db.redisGet<{ s: string; t: number } | null>(
sessionRedisKey,
null,
'cookielessServerHashStep',
{
jsonSerialize: true,
}
)
// if not, or the TTL has expired, create a new one. Don't rely on redis TTL, as ingestion lag could approach the 30-minute session inactivity timeout
if (!sessionInfo || timestamp - sessionInfo.t > 60 * 30 * 1000) {
const sessionId = new UUID7(timestamp).toString()
sessionInfo = { s: sessionId, t: timestamp }
await runner.hub.db.redisSet(sessionRedisKey, sessionInfo, 'cookielessServerHashStep', 60 * 60 * 24)
} else {
// otherwise, update the timestamp
await runner.hub.db.redisSet(
sessionRedisKey,
{ s: sessionInfo.s, t: timestamp },
'cookielessServerHashStep',
60 * 60 * 24
)
}

event.properties['$session_id'] = sessionInfo.s

return [event]
}
10 changes: 8 additions & 2 deletions plugin-server/src/worker/ingestion/event-pipeline/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { normalizeProcessPerson } from '../../../utils/event'
import { status } from '../../../utils/status'
import { EventsProcessor } from '../process-event'
import { captureIngestionWarning, generateEventDeadLetterQueueMessage } from '../utils'
import { cookielessServerHashStep } from './cookielessServerHashStep'
import { createEventStep } from './createEventStep'
import { emitEventStep } from './emitEventStep'
import { extractHeatmapDataStep } from './extractHeatmapDataStep'
Expand Down Expand Up @@ -215,10 +216,15 @@ export class EventPipelineRunner {
return this.runHeatmapPipelineSteps(event, kafkaAcks)
}

const processedEvent = await this.runStep(pluginsProcessEventStep, [this, event], event.team_id)
const [postCookielessEvent] = await this.runStep(cookielessServerHashStep, [this, event], event.team_id)
if (postCookielessEvent == null) {
return this.registerLastStep('cookielessServerHashStep', [event], kafkaAcks)
}

const processedEvent = await this.runStep(pluginsProcessEventStep, [this, postCookielessEvent], event.team_id)
if (processedEvent == null) {
// A plugin dropped the event.
return this.registerLastStep('pluginsProcessEventStep', [event], kafkaAcks)
return this.registerLastStep('pluginsProcessEventStep', [postCookielessEvent], kafkaAcks)
}

const [normalizedEvent, timestamp] = await this.runStep(
Expand Down

0 comments on commit d2180e6

Please sign in to comment.