Skip to content

Commit

Permalink
feat(plugin-server): project_id in clickhouse_events_json (#25873)
Browse files Browse the repository at this point in the history
  • Loading branch information
Twixes committed Nov 4, 2024
1 parent e02ede3 commit d74cb47
Show file tree
Hide file tree
Showing 17 changed files with 122 additions and 53 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { EachBatchPayload } from 'kafkajs'

import { PostIngestionEvent, RawClickHouseEvent } from '../../../types'
import { PostIngestionEvent, RawKafkaEvent } from '../../../types'
import { convertToPostIngestionEvent } from '../../../utils/event'
import {
processComposeWebhookStep,
Expand Down Expand Up @@ -42,7 +42,7 @@ export async function handleComposeWebhookPlugins(
}

export async function eachMessageAppsOnEventHandlers(
clickHouseEvent: RawClickHouseEvent,
clickHouseEvent: RawKafkaEvent,
queue: KafkaJSIngestionConsumer
): Promise<void> {
const pluginConfigs = queue.pluginsServer.pluginConfigsPerTeam.get(clickHouseEvent.team_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { ActionMatcher } from 'worker/ingestion/action-matcher'
import { GroupTypeManager } from 'worker/ingestion/group-type-manager'
import { OrganizationManager } from 'worker/ingestion/organization-manager'

import { GroupTypeToColumnIndex, PostIngestionEvent, RawClickHouseEvent } from '../../../types'
import { GroupTypeToColumnIndex, PostIngestionEvent, RawKafkaEvent } from '../../../types'
import { DependencyUnavailableError } from '../../../utils/db/error'
import { PostgresRouter, PostgresUse } from '../../../utils/db/postgres'
import { convertToPostIngestionEvent } from '../../../utils/event'
Expand All @@ -30,17 +30,17 @@ export function groupIntoBatchesByUsage(
array: KafkaMessage[],
batchSize: number,
shouldProcess: (teamId: number) => boolean
): { eventBatch: RawClickHouseEvent[]; lastOffset: string; lastTimestamp: string }[] {
): { eventBatch: RawKafkaEvent[]; lastOffset: string; lastTimestamp: string }[] {
// Most events will not trigger a webhook call, so we want to filter them out as soon as possible
// to achieve the highest effective concurrency when executing the actual HTTP calls.
// actionMatcher holds an in-memory set of all teams with enabled webhooks, that we use to
// drop events based on that signal. To use it we must parse the message, as there aren't that many
// webhooks, we can keep batches of the parsed messages in memory with the offsets of the last message
const result: { eventBatch: RawClickHouseEvent[]; lastOffset: string; lastTimestamp: string }[] = []
let currentBatch: RawClickHouseEvent[] = []
const result: { eventBatch: RawKafkaEvent[]; lastOffset: string; lastTimestamp: string }[] = []
let currentBatch: RawKafkaEvent[] = []
let currentCount = 0
array.forEach((message, index) => {
const clickHouseEvent = JSON.parse(message.value!.toString()) as RawClickHouseEvent
const clickHouseEvent = JSON.parse(message.value!.toString()) as RawKafkaEvent
if (shouldProcess(clickHouseEvent.team_id)) {
currentBatch.push(clickHouseEvent)
currentCount++
Expand Down Expand Up @@ -90,7 +90,7 @@ export async function eachBatchWebhooksHandlers(
export async function eachBatchHandlerHelper(
payload: EachBatchPayload,
shouldProcess: (teamId: number) => boolean,
eachMessageHandler: (event: RawClickHouseEvent) => Promise<void>,
eachMessageHandler: (event: RawKafkaEvent) => Promise<void>,
concurrency: number,
stats_key: string
): Promise<void> {
Expand Down Expand Up @@ -123,7 +123,7 @@ export async function eachBatchHandlerHelper(
}

await Promise.all(
eventBatch.map((event: RawClickHouseEvent) => eachMessageHandler(event).finally(() => heartbeat()))
eventBatch.map((event: RawKafkaEvent) => eachMessageHandler(event).finally(() => heartbeat()))
)

resolveOffset(lastOffset)
Expand Down Expand Up @@ -202,19 +202,19 @@ async function addGroupPropertiesToPostIngestionEvent(
}

export async function eachMessageWebhooksHandlers(
clickHouseEvent: RawClickHouseEvent,
kafkaEvent: RawKafkaEvent,
actionMatcher: ActionMatcher,
hookCannon: HookCommander,
groupTypeManager: GroupTypeManager,
organizationManager: OrganizationManager,
postgres: PostgresRouter
): Promise<void> {
if (!actionMatcher.hasWebhooks(clickHouseEvent.team_id)) {
if (!actionMatcher.hasWebhooks(kafkaEvent.team_id)) {
// exit early if no webhooks nor resthooks
return
}

const eventWithoutGroups = convertToPostIngestionEvent(clickHouseEvent)
const eventWithoutGroups = convertToPostIngestionEvent(kafkaEvent)
// This is very inefficient, we always pull group properties for all groups (up to 5) for this event
// from PG if a webhook is defined for this team.
// Instead we should be lazily loading group properties only when needed, but this is the fastest way to fix this consumer
Expand Down
9 changes: 9 additions & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,14 @@ export interface RawClickHouseEvent extends BaseEvent {
person_mode: PersonMode
}

export interface RawKafkaEvent extends RawClickHouseEvent {
/**
* The project ID field is only included in the `clickhouse_events_json` topic, not present in ClickHouse.
* That's because we need it in `property-defs-rs` and not elsewhere.
*/
project_id: number
}

/** Parsed event row from ClickHouse. */
export interface ClickHouseEvent extends BaseEvent {
timestamp: DateTime
Expand Down Expand Up @@ -732,6 +740,7 @@ export interface PreIngestionEvent {
eventUuid: string
event: string
teamId: TeamId
projectId: TeamId
distinctId: string
properties: Properties
timestamp: ISOTimestamp
Expand Down
12 changes: 10 additions & 2 deletions plugin-server/src/utils/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@ import { Message } from 'node-rdkafka'
import { Counter } from 'prom-client'

import { setUsageInNonPersonEventsCounter } from '../main/ingestion-queues/metrics'
import { ClickHouseEvent, HookPayload, PipelineEvent, PostIngestionEvent, RawClickHouseEvent } from '../types'
import {
ClickHouseEvent,
HookPayload,
PipelineEvent,
PostIngestionEvent,
RawClickHouseEvent,
RawKafkaEvent,
} from '../types'
import { chainToElements } from './db/elements-chain'
import {
hasDifferenceWithProposedNewNormalisationMode,
Expand Down Expand Up @@ -115,7 +122,7 @@ export function convertToPostHogEvent(event: PostIngestionEvent): PostHogEvent {

// NOTE: PostIngestionEvent is our context event - it should never be sent directly to an output, but rather transformed into a lightweight schema
// that we can keep to as a contract
export function convertToPostIngestionEvent(event: RawClickHouseEvent): PostIngestionEvent {
export function convertToPostIngestionEvent(event: RawKafkaEvent): PostIngestionEvent {
const properties = event.properties ? JSON.parse(event.properties) : {}
if (event.elements_chain) {
properties['$elements_chain'] = event.elements_chain
Expand All @@ -125,6 +132,7 @@ export function convertToPostIngestionEvent(event: RawClickHouseEvent): PostInge
eventUuid: event.uuid,
event: event.event!,
teamId: event.team_id,
projectId: event.project_id,
distinctId: event.distinct_id,
properties,
timestamp: clickHouseTimestampToISO(event.timestamp),
Expand Down
10 changes: 6 additions & 4 deletions plugin-server/src/worker/ingestion/process-event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
Person,
PersonMode,
PreIngestionEvent,
RawClickHouseEvent,
RawKafkaEvent,
Team,
TimestampFormat,
} from '../../types'
Expand Down Expand Up @@ -187,6 +187,7 @@ export class EventsProcessor {
properties,
timestamp: timestamp.toISO() as ISOTimestamp,
teamId: team.id,
projectId: team.project_id,
}
}

Expand All @@ -205,8 +206,8 @@ export class EventsProcessor {
preIngestionEvent: PreIngestionEvent,
person: Person,
processPerson: boolean
): [RawClickHouseEvent, Promise<void>] {
const { eventUuid: uuid, event, teamId, distinctId, properties, timestamp } = preIngestionEvent
): [RawKafkaEvent, Promise<void>] {
const { eventUuid: uuid, event, teamId, projectId, distinctId, properties, timestamp } = preIngestionEvent

let elementsChain = ''
try {
Expand Down Expand Up @@ -245,12 +246,13 @@ export class EventsProcessor {
personMode = 'propertyless'
}

const rawEvent: RawClickHouseEvent = {
const rawEvent: RawKafkaEvent = {
uuid,
event: safeClickhouseString(event),
properties: JSON.stringify(properties ?? {}),
timestamp: castTimestampOrNow(timestamp, TimestampFormat.ClickHouse),
team_id: teamId,
project_id: projectId,
distinct_id: safeClickhouseString(distinctId),
elements_chain: safeClickhouseString(elementsChain),
created_at: castTimestampOrNow(null, TimestampFormat.ClickHouse),
Expand Down
6 changes: 5 additions & 1 deletion plugin-server/src/worker/ingestion/team-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export class TeamManager {
}

public async fetchTeam(teamId: number): Promise<Team | null> {
const cachedTeam = this.teamCache.get(teamId)
const cachedTeam = this.getCachedTeam(teamId)
if (cachedTeam !== undefined) {
return cachedTeam
}
Expand All @@ -56,6 +56,10 @@ export class TeamManager {
}
}

public getCachedTeam(teamId: TeamId): Team | null | undefined {
return this.teamCache.get(teamId)
}

public async getTeamByToken(token: string): Promise<Team | null> {
/**
* Validates and resolves the api token from an incoming event.
Expand Down
5 changes: 4 additions & 1 deletion plugin-server/tests/helpers/kafka.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Kafka, logLevel } from 'kafkajs'
import { CompressionCodecs, CompressionTypes, Kafka, logLevel } from 'kafkajs'
import SnappyCodec from 'kafkajs-snappy'

import { defaultConfig, overrideWithEnv } from '../../src/config/config'
import {
Expand All @@ -16,6 +17,8 @@ import {
import { PluginsServerConfig } from '../../src/types'
import { KAFKA_EVENTS_DEAD_LETTER_QUEUE } from './../../src/config/kafka-topics'

CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec

/** Clear the Kafka queue and return Kafka object */
export async function resetKafka(extraServerConfig?: Partial<PluginsServerConfig>): Promise<Kafka> {
const config = { ...overrideWithEnv(defaultConfig, process.env), ...extraServerConfig }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { eachMessageWebhooksHandlers } from '../../../src/main/ingestion-queues/batch-processing/each-batch-webhooks'
import { ClickHouseTimestamp, ClickHouseTimestampSecondPrecision, Hub, RawClickHouseEvent } from '../../../src/types'
import { ClickHouseTimestamp, ClickHouseTimestampSecondPrecision, Hub, RawKafkaEvent } from '../../../src/types'
import { closeHub, createHub } from '../../../src/utils/db/hub'
import { PostgresUse } from '../../../src/utils/db/postgres'
import { ActionManager } from '../../../src/worker/ingestion/action-manager'
Expand All @@ -11,7 +11,7 @@ import { resetTestDatabase } from '../../helpers/sql'

jest.mock('../../../src/utils/status')

const clickhouseEvent: RawClickHouseEvent = {
const kafkaEvent: RawKafkaEvent = {
event: '$pageview',
properties: JSON.stringify({
$ip: '127.0.0.1',
Expand All @@ -23,6 +23,7 @@ const clickhouseEvent: RawClickHouseEvent = {
elements_chain: '',
timestamp: '2020-02-23 02:15:00.00' as ClickHouseTimestamp,
team_id: 2,
project_id: 1,
distinct_id: 'my_id',
created_at: '2020-02-23 02:15:00.00' as ClickHouseTimestamp,
person_id: 'F99FA0A1-E0C2-4CFE-A09A-4C3C4327A4CC',
Expand Down Expand Up @@ -138,7 +139,7 @@ describe('eachMessageWebhooksHandlers', () => {
const postWebhookSpy = jest.spyOn(hookCannon.rustyHook, 'enqueueIfEnabledForTeam')

await eachMessageWebhooksHandlers(
clickhouseEvent,
kafkaEvent,
actionMatcher,
hookCannon,
groupTypeManager,
Expand Down Expand Up @@ -168,6 +169,7 @@ describe('eachMessageWebhooksHandlers', () => {
"person_created_at": "2020-02-20T02:15:00.000Z",
"person_id": "F99FA0A1-E0C2-4CFE-A09A-4C3C4327A4CC",
"person_properties": Object {},
"projectId": 1,
"properties": Object {
"$groups": Object {
"organization": "org_posthog",
Expand Down
32 changes: 17 additions & 15 deletions plugin-server/tests/main/ingestion-queues/each-batch.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
ClickHouseTimestampSecondPrecision,
ISOTimestamp,
PostIngestionEvent,
RawClickHouseEvent,
RawKafkaEvent,
} from '../../../src/types'
import { ActionManager } from '../../../src/worker/ingestion/action-manager'
import { ActionMatcher } from '../../../src/worker/ingestion/action-matcher'
Expand Down Expand Up @@ -50,6 +50,7 @@ const event: PostIngestionEvent = {
eventUuid: 'uuid1',
distinctId: 'my_id',
teamId: 2,
projectId: 1,
timestamp: '2020-02-23T02:15:00.000Z' as ISOTimestamp,
event: '$pageview',
properties: {},
Expand All @@ -59,7 +60,7 @@ const event: PostIngestionEvent = {
person_properties: {},
}

const clickhouseEvent: RawClickHouseEvent = {
const kafkaEvent: RawKafkaEvent = {
event: '$pageview',
properties: JSON.stringify({
$ip: '127.0.0.1',
Expand All @@ -68,6 +69,7 @@ const clickhouseEvent: RawClickHouseEvent = {
elements_chain: '',
timestamp: '2020-02-23 02:15:00.00' as ClickHouseTimestamp,
team_id: 2,
project_id: 1,
distinct_id: 'my_id',
created_at: '2020-02-23 02:15:00.00' as ClickHouseTimestamp,
person_id: 'F99FA0A1-E0C2-4CFE-A09A-4C3C4327A4CC',
Expand Down Expand Up @@ -146,7 +148,7 @@ describe('eachBatchX', () => {
describe('eachBatchAppsOnEventHandlers', () => {
it('calls runOnEvent when useful', async () => {
queue.pluginsServer.pluginConfigsPerTeam.set(2, [pluginConfig39])
await eachBatchAppsOnEventHandlers(createKafkaJSBatch(clickhouseEvent), queue)
await eachBatchAppsOnEventHandlers(createKafkaJSBatch(kafkaEvent), queue)
// TODO fix to jest spy on the actual function
expect(runOnEvent).toHaveBeenCalledWith(
expect.anything(),
Expand All @@ -159,7 +161,7 @@ describe('eachBatchX', () => {
})
it('skip runOnEvent when no pluginconfig for team', async () => {
queue.pluginsServer.pluginConfigsPerTeam.clear()
await eachBatchAppsOnEventHandlers(createKafkaJSBatch(clickhouseEvent), queue)
await eachBatchAppsOnEventHandlers(createKafkaJSBatch(kafkaEvent), queue)
expect(runOnEvent).not.toHaveBeenCalled()
})
})
Expand Down Expand Up @@ -191,7 +193,7 @@ describe('eachBatchX', () => {
// mock hasWebhooks to return true
actionMatcher.hasWebhooks = jest.fn(() => true)
await eachBatchWebhooksHandlers(
createKafkaJSBatch(clickhouseEvent),
createKafkaJSBatch(kafkaEvent),
actionMatcher,
hookCannon,
10,
Expand All @@ -215,61 +217,61 @@ describe('eachBatchX', () => {
// create a batch with 10 events each having teamId the same as offset, timestamp which all increment by 1
const batch = createKafkaJSBatchWithMultipleEvents([
{
...clickhouseEvent,
...kafkaEvent,
team_id: 1,
offset: 1,
kafkaTimestamp: '2020-02-23 00:01:00.00' as ClickHouseTimestamp,
},
{
...clickhouseEvent,
...kafkaEvent,
team_id: 2,
offset: 2,
kafkaTimestamp: '2020-02-23 00:02:00.00' as ClickHouseTimestamp,
},
{
...clickhouseEvent,
...kafkaEvent,
team_id: 3,
offset: 3,
kafkaTimestamp: '2020-02-23 00:03:00.00' as ClickHouseTimestamp,
},
{
...clickhouseEvent,
...kafkaEvent,
team_id: 4,
offset: 4,
kafkaTimestamp: '2020-02-23 00:04:00.00' as ClickHouseTimestamp,
},
{
...clickhouseEvent,
...kafkaEvent,
team_id: 5,
offset: 5,
kafkaTimestamp: '2020-02-23 00:05:00.00' as ClickHouseTimestamp,
},
{
...clickhouseEvent,
...kafkaEvent,
team_id: 6,
offset: 6,
kafkaTimestamp: '2020-02-23 00:06:00.00' as ClickHouseTimestamp,
},
{
...clickhouseEvent,
...kafkaEvent,
team_id: 7,
offset: 7,
kafkaTimestamp: '2020-02-23 00:07:00.00' as ClickHouseTimestamp,
},
{
...clickhouseEvent,
...kafkaEvent,
team_id: 8,
offset: 8,
kafkaTimestamp: '2020-02-23 00:08:00.00' as ClickHouseTimestamp,
},
{
...clickhouseEvent,
...kafkaEvent,
team_id: 9,
offset: 9,
kafkaTimestamp: '2020-02-23 00:09:00.00' as ClickHouseTimestamp,
},
{
...clickhouseEvent,
...kafkaEvent,
team_id: 10,
offset: 10,
kafkaTimestamp: '2020-02-23 00:10:00.00' as ClickHouseTimestamp,
Expand Down
Loading

0 comments on commit d74cb47

Please sign in to comment.