Commit

WIP

robbie-c committed Dec 4, 2024
1 parent 471735a commit c202179
Showing 2 changed files with 54 additions and 23 deletions.
@@ -52,8 +52,10 @@ export async function cookielessServerHashStep(
// return [event]
// }

+ const timestamp = event.timestamp ?? event.sent_at ?? event.now
+
// drop some events that aren't valid in this mode
- if (!event.timestamp) {
+ if (!timestamp) {
// TODO log
return [undefined]
}
@@ -62,11 +64,10 @@ export async function cookielessServerHashStep(
// TODO log
return [undefined]
}
- // ensure that the distinct id is also the sentinel value
+ // if it's an identify event, it must have the sentinel distinct id
if (
-     (event.event === '$identify' &&
-         event.properties['$anon_distinct_id'] !== SENTINEL_COOKIELESS_SERVER_HASH_DISTINCT_ID) ||
-     (event.event !== '$identify' && event.distinct_id !== SENTINEL_COOKIELESS_SERVER_HASH_DISTINCT_ID)
+     event.event === '$identify' &&
+     event.properties['$anon_distinct_id'] !== SENTINEL_COOKIELESS_SERVER_HASH_DISTINCT_ID
) {
// TODO log
return [undefined]
@@ -77,14 +78,14 @@ export async function cookielessServerHashStep(
const ip = event.properties['$ip']
const host = event.properties['$host']
const timezone = event.properties['$timezone']
- const timestamp = DateTime.fromISO(event.timestamp).toMillis()
+ const timestampMs = DateTime.fromISO(timestamp).toMillis()
const teamId = event.team_id
if (!userAgent || !ip || !host) {
// TODO log
return [undefined]
}

- const hashValue = doHash(timestamp, timezone, teamId, ip, host, userAgent)
+ const hashValue = doHash(timestampMs, timezone, teamId, ip, host, userAgent)
event.properties['$device_id'] = hashValue

// TRICKY: if a user were to log in and out, to avoid collisions, we would want a different hash value, so we store the set of identify event uuids for identifies
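The doHash call above folds the event's timestamp, timezone, team id, IP, host and user agent into a single anonymous device hash that is reused as $device_id. doHash itself is not shown in this diff, so the following is only a rough Python sketch of the idea; the field order, the day-level bucketing of the timestamp and the choice of SHA-256 are assumptions for illustration, not the actual implementation.

import hashlib

def cookieless_device_hash(timestamp_ms: int, timezone: str, team_id: int, ip: str, host: str, user_agent: str) -> str:
    # Assumed bucketing: collapse the timestamp to a calendar day so the hash
    # stays stable for roughly a day of traffic from the same client.
    day_bucket = timestamp_ms // (24 * 60 * 60 * 1000)
    payload = f"{day_bucket}|{timezone}|{team_id}|{ip}|{host}|{user_agent}"
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()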
@@ -96,14 +97,16 @@ export async function cookielessServerHashStep(
const hashValue2 = numIdentifies === 0 ? hashValue : hashValue + '_' + numIdentifies

if (event.event === '$identify') {
+ // identify event, so the anon_distinct_id must be the sentinel and needs to be replaced
+
// add this identify event id to redis
await runner.hub.db.redisSAdd(identifiesRedisKey, event.uuid)
await runner.hub.db.redisExpire(identifiesRedisKey, 60 * 60 * 24) // 24 hours // TODO this is the max but could be less, given we looked at the timestamp 10 lines of code ago

// set the distinct id to the new hash value
event.properties[`$anon_distinct_id`] = hashValue2
- } else {
-     // set the distinct id to the new hash value
+ } else if (event.distinct_id === SENTINEL_COOKIELESS_SERVER_HASH_DISTINCT_ID) {
+     // event before identify has been called, distinct id is the sentinel and needs to be replaced
event.distinct_id = hashValue2
event.properties[`$distinct_id`] = hashValue2
}
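The numIdentifies suffix computed a few lines up is what keeps a logged-out visitor from colliding with their pre-login device id: each $identify recorded in Redis bumps the count, and subsequent events reuse the same base hash with a numeric suffix. A minimal Python illustration of that rule:

def suffixed_hash(hash_value: str, num_identifies: int) -> str:
    # Mirrors hashValue2 above: unchanged before the first identify,
    # then "<hash>_<n>" after n identify events.
    return hash_value if num_identifies == 0 else f"{hash_value}_{num_identifies}"

assert suffixed_hash("abc123", 0) == "abc123"
assert suffixed_hash("abc123", 2) == "abc123_2"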
@@ -119,15 +122,15 @@ export async function cookielessServerHashStep(
}
)
// if not, or the TTL has expired, create a new one. Don't rely on redis TTL, as ingestion lag could approach the 30-minute session inactivity timeout
- if (!sessionInfo || timestamp - sessionInfo.t > 60 * 30 * 1000) {
-     const sessionId = new UUID7(timestamp).toString()
-     sessionInfo = { s: sessionId, t: timestamp }
+ if (!sessionInfo || timestampMs - sessionInfo.t > 60 * 30 * 1000) {
+     const sessionId = new UUID7(timestampMs).toString()
+     sessionInfo = { s: sessionId, t: timestampMs }
await runner.hub.db.redisSet(sessionRedisKey, sessionInfo, 'cookielessServerHashStep', 60 * 60 * 24)
} else {
// otherwise, update the timestamp
await runner.hub.db.redisSet(
sessionRedisKey,
- { s: sessionInfo.s, t: timestamp },
+ { s: sessionInfo.s, t: timestampMs },
'cookielessServerHashStep',
60 * 60 * 24
)
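The session block above keeps a small { s, t } record in Redis keyed by the hash and rotates it on inactivity rather than trusting the Redis TTL, since ingestion lag could eat into the 30-minute window. A rough Python sketch of the rotation rule; the "session-<timestamp>" id is a stand-in for the UUIDv7 the real code mints from the event timestamp, and persistence to Redis is omitted.

from typing import Optional, TypedDict

class SessionInfo(TypedDict):
    s: str  # session id
    t: int  # last-seen timestamp in ms

SESSION_TIMEOUT_MS = 30 * 60 * 1000

def next_session(current: Optional[SessionInfo], timestamp_ms: int) -> SessionInfo:
    if current is None or timestamp_ms - current["t"] > SESSION_TIMEOUT_MS:
        # Missing or stale: start a new session from the event timestamp.
        return {"s": f"session-{timestamp_ms}", "t": timestamp_ms}
    # Still active: keep the id, refresh the last-seen timestamp.
    return {"s": current["s"], "t": timestamp_ms}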
48 changes: 38 additions & 10 deletions posthog/api/capture.py
@@ -327,6 +327,22 @@ def get_distinct_id(data: dict[str, Any]) -> str:
return str(raw_value)[0:200]


+ def get_device_id(data: dict[str, Any]) -> Optional[str]:
+     raw_value: Any = ""
+
+     try:
+         raw_value = data["properties"]["distinct_id"]
+     except KeyError:
+         pass
+     except TypeError:
+         raise ValueError(f'Properties must be a JSON object, received {type(data["properties"]).__name__}!')
+
+     if not raw_value:
+         return None
+
+     return str(raw_value)[0:200]

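A quick usage sketch for the new get_device_id helper above (sample payloads are illustrative): it returns None rather than raising when the value is missing or empty.

assert get_device_id({"properties": {"distinct_id": "device-123"}}) == "device-123"
assert get_device_id({"properties": {}}) is None
assert get_device_id({}) is None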

def drop_performance_events(events: list[Any]) -> list[Any]:
cleaned_list = [event for event in events if event.get("event") != "$performance_event"]
return cleaned_list
@@ -521,11 +537,20 @@ def get_event(request):

with start_span(op="kafka.produce") as span:
span.set_tag("event.count", len(processed_events))
- for event, event_uuid, distinct_id in processed_events:
+ for event, event_uuid, distinct_id, device_id in processed_events:
try:
futures.append(
capture_internal(
-     event, distinct_id, ip, site_url, now, sent_at, event_uuid, token, historical=historical
+     event,
+     distinct_id,
+     device_id,
+     ip,
+     site_url,
+     now,
+     sent_at,
+     event_uuid,
+     token,
+     historical=historical,
)
)
except Exception as exc:
@@ -589,10 +614,11 @@ def get_event(request):
if alternative_replay_events:
processed_events = list(preprocess_events(alternative_replay_events))
with REPLAY_MESSAGE_PRODUCTION_TIMER.time():
- for event, event_uuid, distinct_id in processed_events:
+ for event, event_uuid, distinct_id, device_id in processed_events:
capture_args = (
event,
distinct_id,
+ device_id,
ip,
site_url,
now,
@@ -793,6 +819,7 @@ def preprocess_events(events: list[dict[str, Any]]) -> Iterator[tuple[dict[str,
for event in events:
event_uuid = UUIDT()
distinct_id = get_distinct_id(event)
+ device_id = get_device_id(event)
payload_uuid = event.get("uuid", None)
if payload_uuid:
if UUIDT.is_valid_uuid(payload_uuid):
@@ -805,7 +832,7 @@ def preprocess_events(events: list[dict[str, Any]]) -> Iterator[tuple[dict[str,
if not event:
continue

- yield event, event_uuid, distinct_id
+ yield event, event_uuid, distinct_id, device_id


def parse_event(event):
@@ -826,6 +853,7 @@ def parse_event(event):
def capture_internal(
event,
distinct_id,
+ device_id,
ip,
site_url,
now,
@@ -874,17 +902,17 @@ def capture_internal(
# overriding this to deal with hot partitions in specific cases.
# Setting the partition key to None means using random partitioning.
candidate_partition_key = f"{token}:{distinct_id}"
- if (
+ if device_id == SENTINEL_COOKIELESS_SERVER_HASH_DISTINCT_ID:
+     # This sentinel value will later on be replaced by the actual cookieless server hash, which contains the ip
+     # address, so sending the same ip address to the same partition should mean that every event with the same hash
+     # will end up in the same partition.
+     kafka_partition_key = f"{token}:{ip}"
+ elif (
not historical
and settings.CAPTURE_ALLOW_RANDOM_PARTITIONING
and (distinct_id.lower() in LIKELY_ANONYMOUS_IDS or is_randomly_partitioned(candidate_partition_key))
):
kafka_partition_key = None
- elif distinct_id == SENTINEL_COOKIELESS_SERVER_HASH_DISTINCT_ID:
-     # This sentinel value will later on be replaced by the actual cookieless server hash, which contains the ip
-     # address, so sending the same ip address to the same partition should mean that every event with the same hash
-     # will end up in the same partition.
-     kafka_partition_key = f"{token}:{ip}"
else:
kafka_partition_key = candidate_partition_key

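With the branches reordered above, the cookieless case now takes precedence: when the extracted device_id equals the sentinel, the event is keyed by token and IP so that everything destined for the same server-side hash lands on one partition; otherwise the existing hot-key and random-partitioning logic applies. A condensed Python sketch of that decision; the sentinel placeholder and is_hot_key helper are stand-ins for the module-level definitions in capture.py (the likely-anonymous-id check is folded into is_hot_key here).

from typing import Callable, Optional

COOKIELESS_SENTINEL = "<sentinel distinct id>"  # stand-in for SENTINEL_COOKIELESS_SERVER_HASH_DISTINCT_ID

def choose_partition_key(token: str, distinct_id: str, device_id: Optional[str], ip: str,
                         historical: bool, allow_random: bool,
                         is_hot_key: Callable[[str], bool]) -> Optional[str]:
    candidate = f"{token}:{distinct_id}"
    if device_id == COOKIELESS_SENTINEL:
        # Cookieless events are re-keyed to an IP-derived hash later, so
        # partitioning by IP keeps each eventual hash on a single partition.
        return f"{token}:{ip}"
    if not historical and allow_random and is_hot_key(candidate):
        # Overloaded or likely-anonymous keys are spread randomly (None = random partition).
        return None
    return candidate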
