Skip to content

Commit

Permalink
fix(cdp): Enrich with groups (#23941)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite authored Jul 25, 2024
1 parent caa98ae commit 9e03d7c
Show file tree
Hide file tree
Showing 6 changed files with 437 additions and 80 deletions.
30 changes: 13 additions & 17 deletions plugin-server/src/cdp/cdp-consumers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars
import { createKafkaProducer } from '../kafka/producer'
import { addSentryBreadcrumbsEventListeners } from '../main/ingestion-queues/kafka-metrics'
import { runInstrumentedFunction } from '../main/utils'
import { AppMetric2Type, GroupTypeToColumnIndex, Hub, RawClickHouseEvent, TeamId, TimestampFormat } from '../types'
import { AppMetric2Type, Hub, RawClickHouseEvent, TeamId, TimestampFormat } from '../types'
import { KafkaProducerWrapper } from '../utils/db/kafka-producer-wrapper'
import { status } from '../utils/status'
import { castTimestampOrNow } from '../utils/utils'
import { RustyHook } from '../worker/rusty-hook'
import { AsyncFunctionExecutor } from './async-function-executor'
import { GroupsManager } from './groups-manager'
import { HogExecutor } from './hog-executor'
import { HogFunctionManager } from './hog-function-manager'
import { HogWatcher } from './hog-watcher/hog-watcher'
Expand All @@ -33,7 +34,7 @@ import {
HogFunctionOverflowedGlobals,
HogFunctionType,
} from './types'
import { convertToCaptureEvent, convertToHogFunctionInvocationGlobals, convertToParsedClickhouseEvent } from './utils'
import { convertToCaptureEvent, convertToHogFunctionInvocationGlobals } from './utils'

// Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals
require('@sentry/tracing')
Expand Down Expand Up @@ -73,6 +74,7 @@ abstract class CdpConsumerBase {
asyncFunctionExecutor: AsyncFunctionExecutor
hogExecutor: HogExecutor
hogWatcher: HogWatcher
groupsManager: GroupsManager
isStopping = false
messagesToProduce: HogFunctionMessageToProduce[] = []

Expand All @@ -89,6 +91,7 @@ abstract class CdpConsumerBase {
this.hogExecutor = new HogExecutor(this.hogFunctionManager)
const rustyHook = this.hub?.rustyHook ?? new RustyHook(this.hub)
this.asyncFunctionExecutor = new AsyncFunctionExecutor(this.hub, rustyHook)
this.groupsManager = new GroupsManager(this.hub)
}

protected async runWithHeartbeat<T>(func: () => Promise<T> | T): Promise<T> {
Expand Down Expand Up @@ -297,6 +300,9 @@ abstract class CdpConsumerBase {
func: async () => {
const invocations: { globals: HogFunctionInvocationGlobals; hogFunction: HogFunctionType }[] = []

// TODO: Add a helper to hog functions to determine if they require groups or not and then only load those
await this.groupsManager.enrichGroups(invocationGlobals)

invocationGlobals.forEach((globals) => {
const { matchingFunctions, nonMatchingFunctions } = this.hogExecutor.findMatchingFunctions(globals)

Expand Down Expand Up @@ -491,28 +497,15 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase {
return
}

let groupTypes: GroupTypeToColumnIndex | undefined = undefined

if (
await this.hub.organizationManager.hasAvailableFeature(
clickHouseEvent.team_id,
'group_analytics'
)
) {
// If the organization has group analytics enabled then we enrich the event with group data
groupTypes = await this.hub.groupTypeManager.fetchGroupTypes(clickHouseEvent.team_id)
}

const team = await this.hub.teamManager.fetchTeam(clickHouseEvent.team_id)
if (!team) {
return
}
events.push(
convertToHogFunctionInvocationGlobals(
convertToParsedClickhouseEvent(clickHouseEvent),
clickHouseEvent,
team,
this.hub.SITE_URL ?? 'http://localhost:8000',
groupTypes
this.hub.SITE_URL ?? 'http://localhost:8000'
)
)
} catch (e) {
Expand Down Expand Up @@ -600,6 +593,9 @@ export class CdpOverflowConsumer extends CdpConsumerBase {
return await runInstrumentedFunction({
statsKey: `cdpConsumer.handleEachBatch.executeOverflowedFunctions`,
func: async () => {
// TODO: Add a helper to hog functions to determine if they require groups or not and then only load those
await this.groupsManager.enrichGroups(invocationGlobals.map((x) => x.globals))

const invocations = invocationGlobals
.map((item) =>
item.hogFunctionIds.map((hogFunctionId) => ({
Expand Down
181 changes: 181 additions & 0 deletions plugin-server/src/cdp/groups-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
import LRUCache from 'lru-cache'

import { Hub, Team } from '../types'
import { PostgresUse } from '../utils/db/postgres'
import { GroupType, HogFunctionInvocationGlobals } from './types'

// Groups keyed by string. NOTE(review): not referenced elsewhere in this file; presumably keyed by group type name
// like HogFunctionInvocationGlobals['groups'] — confirm against callers.
export type GroupsMap = Record<string, GroupType>
// A GroupsMap per team id. NOTE(review): not referenced elsewhere in this file — confirm it is still needed.
export type GroupsCache = Record<Team['id'], GroupsMap>

// Maps to the group type index for easy lookup like: { 'team_id:group_type': group_type_index }
type GroupIndexByTeamType = Record<string, number | undefined>

// Working representation of a group while GroupsManager.enrichGroups is running.
// Same shape as GroupType plus a temporary teamId.
type Group = {
// The group key, i.e. the value found under the group type in the event's $groups property
id: string
// The group type index resolved from the team's group type mapping
index: number
// The group type name, i.e. the key in the event's $groups property
type: string
// Link to the group, built from the project url, the index and the URI-encoded key
url: string
// Starts as {} and is replaced with group_properties when a matching posthog_group row is found
properties: Record<string, any>
// Only set while properties are being loaded (needed for the lookup query); deleted before enrichGroups returns
teamId?: number
}

// Passed as maxAge to the group types mapping LRU cache
const GROUP_TYPES_CACHE_AGE_MS = 60 * 10 * 1000 // 10 minutes

/**
 * Enriches hog function invocation globals with the groups referenced by each event's `$groups` property.
 *
 * Group type mappings (group type name -> group type index) are cached per team in an LRU cache, group
 * properties are loaded from Postgres on every call that actually references a known group.
 */
export class GroupsManager {
    groupTypesMappingCache: LRUCache<number, { type: string; index: number }[]>

    constructor(private hub: Hub) {
        // There are only 5 group types per team, so entries are tiny: we can afford a very large cache and a long max age
        this.groupTypesMappingCache = new LRUCache({ max: 1_000_000, maxAge: GROUP_TYPES_CACHE_AGE_MS })
    }

    /**
     * Returns the subset of `teams` (order preserved) whose organization has the `group_analytics` feature.
     */
    private async filterTeamsWithGroups(teams: Team['id'][]): Promise<Team['id'][]> {
        const hasGroupAnalytics = await Promise.all(
            teams.map((teamId) => this.hub.organizationManager.hasAvailableFeature(teamId, 'group_analytics'))
        )

        return teams.filter((_, position) => hasGroupAnalytics[position])
    }

    /**
     * Builds a `{ 'team_id:group_type': group_type_index }` lookup for the given teams.
     *
     * Teams without `group_analytics` are ignored. Mappings are served from the cache where possible; all
     * remaining teams are loaded with a single query and then cached - including teams that turn out to have
     * no group types at all, so that they do not trigger a query for every batch.
     */
    private async fetchGroupTypesMapping(teams: Team['id'][]): Promise<GroupIndexByTeamType> {
        const teamsWithGroupAnalytics = await this.filterTeamsWithGroups(teams)

        const groupTypesMapping: GroupIndexByTeamType = {}
        const teamsToLoad: Team['id'][] = []

        // Single cache read per team: either copy the cached mapping or remember the team for loading
        for (const teamId of teamsWithGroupAnalytics) {
            const cached = this.groupTypesMappingCache.get(teamId)

            if (!cached) {
                teamsToLoad.push(teamId)
                continue
            }

            for (const row of cached) {
                groupTypesMapping[`${teamId}:${row.type}`] = row.index
            }
        }

        if (!teamsToLoad.length) {
            return groupTypesMapping
        }

        const result = await this.hub.postgres.query(
            PostgresUse.COMMON_READ,
            `SELECT team_id, group_type, group_type_index FROM posthog_grouptypemapping WHERE team_id = ANY($1)`,
            [teamsToLoad],
            'fetchGroupTypes'
        )

        // Seed every requested team with an empty list so that "no group types" is cached as well
        const groupedByTeam = new Map<number, { type: string; index: number }[]>()
        for (const teamId of teamsToLoad) {
            groupedByTeam.set(teamId, [])
        }

        for (const row of result.rows) {
            const teamId = Number(row.team_id)
            let groupTypes = groupedByTeam.get(teamId)
            if (!groupTypes) {
                groupTypes = []
                groupedByTeam.set(teamId, groupTypes)
            }
            groupTypes.push({ type: row.group_type, index: row.group_type_index })
        }

        // Save to cache and add to the returned lookup
        groupedByTeam.forEach((groupTypes, teamId) => {
            this.groupTypesMappingCache.set(teamId, groupTypes)
            for (const row of groupTypes) {
                groupTypesMapping[`${teamId}:${row.type}`] = row.index
            }
        })

        return groupTypesMapping
    }

    /**
     * Loads the `posthog_group` rows for the given groups.
     *
     * The three `ANY` filters are independent, so the result may contain rows for combinations that were not
     * requested - callers must match rows back to groups by (team_id, group_type_index, group_key).
     * Returns an empty list without querying when there is nothing to load.
     */
    private async fetchGroupProperties(
        groups: Group[]
    ): Promise<
        { team_id: number; group_type_index: number; group_key: string; group_properties: Record<string, any> }[]
    > {
        if (!groups.length) {
            return []
        }

        // Sets keep the query parameters free of duplicates (many events usually share team and group type)
        const teamIds = new Set<number>()
        const groupIndexes = new Set<number>()
        const groupKeys = new Set<string>()

        for (const group of groups) {
            if (group.teamId !== undefined) {
                teamIds.add(group.teamId)
            }
            groupIndexes.add(group.index)
            groupKeys.add(group.id)
        }

        const result = await this.hub.postgres.query(
            PostgresUse.COMMON_READ,
            `SELECT team_id, group_type_index, group_key, group_properties
            FROM posthog_group
            WHERE team_id = ANY($1) AND group_type_index = ANY($2) AND group_key = ANY($3)`,
            [Array.from(teamIds), Array.from(groupIndexes), Array.from(groupKeys)],
            'fetchGroups'
        )

        return result.rows
    }

    /**
     * This function looks complex but is trying to be as optimized as possible.
     *
     * It iterates over the globals and creates "Group" objects, tracking them referentially in order to later load the properties.
     * Once loaded, the objects are mutated in place.
     *
     * Items that already have `groups` set are left untouched. Every other item gets a `groups` object (possibly
     * empty) keyed by group type; items of the same team referencing the same group share one group object.
     * A `$groups` value that is not a plain object is ignored, as are group keys that are neither strings nor
     * numbers; numeric keys are converted to strings.
     *
     * @param items the invocation globals to enrich (mutated in place)
     * @returns the same `items` array that was passed in
     */
    public async enrichGroups(items: HogFunctionInvocationGlobals[]): Promise<HogFunctionInvocationGlobals[]> {
        const itemsNeedingGroups = items.filter((x) => !x.groups)

        if (!itemsNeedingGroups.length) {
            return items
        }

        const byTeamType = await this.fetchGroupTypesMapping(
            Array.from(new Set(itemsNeedingGroups.map((item) => item.project.id)))
        )

        // Keyed by 'team_id:group_type_index:group_key' so that each group is only created and loaded once
        const groupsByTeamTypeId: Record<string, Group> = {}

        for (const item of itemsNeedingGroups) {
            const rawGroups: unknown = item.event.properties['$groups']
            const groupsProperty: Record<string, unknown> =
                typeof rawGroups === 'object' && rawGroups !== null && !Array.isArray(rawGroups)
                    ? (rawGroups as Record<string, unknown>)
                    : {}
            const groups: HogFunctionInvocationGlobals['groups'] = {}

            // Add the base group info without properties
            for (const [groupType, rawGroupKey] of Object.entries(groupsProperty)) {
                const groupIndex = byTeamType[`${item.project.id}:${groupType}`]

                if (typeof groupIndex !== 'number') {
                    // Unknown group type for this team (or the team has no group analytics)
                    continue
                }

                if (typeof rawGroupKey !== 'string' && typeof rawGroupKey !== 'number') {
                    continue
                }

                const groupKey = String(rawGroupKey)
                const lookupKey = `${item.project.id}:${groupIndex}:${groupKey}`

                let group = groupsByTeamTypeId[lookupKey]
                if (!group) {
                    group = groupsByTeamTypeId[lookupKey] = {
                        id: groupKey,
                        index: groupIndex,
                        type: groupType,
                        url: `${item.project.url}/groups/${groupIndex}/${encodeURIComponent(groupKey)}`,
                        properties: {},
                        teamId: item.project.id,
                    }
                }

                // Add to the groups to be enriched and the object here
                groups[groupType] = group
            }

            item.groups = groups
        }

        const trackedGroups = Object.values(groupsByTeamTypeId)
        const groupsFromDatabase = await this.fetchGroupProperties(trackedGroups)

        // Add the properties to all the groups
        for (const row of groupsFromDatabase) {
            const group = groupsByTeamTypeId[`${row.team_id}:${row.group_type_index}:${row.group_key}`]

            if (group) {
                group.properties = row.group_properties
            }
        }

        // Finally delete the teamId, it was only needed for loading and is not part of the exposed group
        for (const group of trackedGroups) {
            delete group.teamId
        }

        return items
    }
}
29 changes: 9 additions & 20 deletions plugin-server/src/cdp/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,14 @@ export interface ParsedClickhouseEvent {
properties: Record<string, any>
person_created_at?: string
person_properties: Record<string, any>
group0_properties: Record<string, any>
group1_properties: Record<string, any>
group2_properties: Record<string, any>
group3_properties: Record<string, any>
group4_properties: Record<string, any>
group0_created_at?: string
group1_created_at?: string
group2_created_at?: string
group3_created_at?: string
group4_created_at?: string
}

// A group attached to a hog function invocation, exposed under HogFunctionInvocationGlobals['groups']
export type GroupType = {
id: string // the "key" of the group
type: string // the group type name, i.e. the key used in the event's $groups property
index: number // the group type index the type maps to for the team
url: string // link to the group: `<project url>/groups/<index>/<URI-encoded key>`
properties: Record<string, any> // the group's properties, {} when none were found
}

export type HogFunctionInvocationGlobals = {
Expand All @@ -89,16 +87,7 @@ export type HogFunctionInvocationGlobals = {
url: string
properties: Record<string, any>
}
groups?: Record<
string,
{
id: string // the "key" of the group
type: string
index: number
url: string
properties: Record<string, any>
}
>
groups?: Record<string, GroupType>
}

export type HogFunctionInvocationGlobalsWithInputs = HogFunctionInvocationGlobals & {
Expand Down
Loading

0 comments on commit 9e03d7c

Please sign in to comment.