Skip to content

Commit

Permalink
feat(plugin-server): project_id in clickhouse_events_json
Browse files Browse the repository at this point in the history
  • Loading branch information
Twixes committed Oct 29, 2024
1 parent f04c43b commit c0345d0
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 26 deletions.
8 changes: 8 additions & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,14 @@ export interface RawClickHouseEvent extends BaseEvent {
person_mode: PersonMode
}

/** Raw event as written to Kafka: every `RawClickHouseEvent` field plus `project_id`. */
export interface RawKafkaEvent extends RawClickHouseEvent {
/**
* ID of the project that the event's team belongs to.
* This field is only included in messages on the `clickhouse_events_json` topic; it is not present in
* ClickHouse. That's because it is needed by `property-defs-rs` and not elsewhere.
*/
project_id: number
}

/** Parsed event row from ClickHouse. */
export interface ClickHouseEvent extends BaseEvent {
timestamp: DateTime
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { Person, PreIngestionEvent, RawClickHouseEvent } from '../../../types'
import { EventPipelineRunner } from './runner'

/**
 * Event pipeline step that builds the raw event by delegating to
 * `runner.eventsProcessor.createEvent()`.
 *
 * NOTE(review): this span is a rendered diff, so the previous synchronous signature/return and the
 * new `async` ones appear side by side. In the new form the step awaits `createEvent()`.
 *
 * @param runner - Pipeline runner whose `eventsProcessor` creates the event.
 * @param event - Pre-ingestion event to convert.
 * @param person - Person passed through to `createEvent()`.
 * @param processPerson - Flag passed through to `createEvent()`.
 * @returns The tuple produced by `createEvent()`: the raw event plus a promise - presumably the
 * pending Kafka produce acknowledgement; confirm against `createEvent()`.
 */
export function createEventStep(
export async function createEventStep(
runner: EventPipelineRunner,
event: PreIngestionEvent,
person: Person,
processPerson: boolean
): [RawClickHouseEvent, Promise<void>] {
return runner.eventsProcessor.createEvent(event, person, processPerson)
): Promise<[RawClickHouseEvent, Promise<void>]> {
return await runner.eventsProcessor.createEvent(event, person, processPerson)
}
23 changes: 19 additions & 4 deletions plugin-server/src/worker/ingestion/process-event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
Person,
PersonMode,
PreIngestionEvent,
RawClickHouseEvent,
RawKafkaEvent,
Team,
TimestampFormat,
} from '../../types'
Expand Down Expand Up @@ -201,13 +201,27 @@ export class EventsProcessor {
return res
}

createEvent(
async createEvent(
preIngestionEvent: PreIngestionEvent,
person: Person,
processPerson: boolean
): [RawClickHouseEvent, Promise<void>] {
): Promise<[RawKafkaEvent, Promise<void>]> {
const { eventUuid: uuid, event, teamId, distinctId, properties, timestamp } = preIngestionEvent

let team = this.teamManager.getCachedTeam(teamId)
if (team === undefined) {
Sentry.captureException(
new Error(
"Team cache wasn't warmed by eventsProcessor.processEvent() in prepareEventStep - this should not be the case in production"
),
{ tags: { team_id: teamId } }
)
team = await this.teamManager.fetchTeam(teamId)
}
if (team === null) {
throw new Error(`No team found with ID ${teamId}. Can't ingest event.`)
}

let elementsChain = ''
try {
elementsChain = this.getElementsChain(properties)
Expand Down Expand Up @@ -245,12 +259,13 @@ export class EventsProcessor {
personMode = 'propertyless'
}

const rawEvent: RawClickHouseEvent = {
const rawEvent: RawKafkaEvent = {
uuid,
event: safeClickhouseString(event),
properties: JSON.stringify(properties ?? {}),
timestamp: castTimestampOrNow(timestamp, TimestampFormat.ClickHouse),
team_id: teamId,
project_id: team.project_id,
distinct_id: safeClickhouseString(distinctId),
elements_chain: safeClickhouseString(elementsChain),
created_at: castTimestampOrNow(null, TimestampFormat.ClickHouse),
Expand Down
6 changes: 5 additions & 1 deletion plugin-server/src/worker/ingestion/team-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export class TeamManager {
}

public async fetchTeam(teamId: number): Promise<Team | null> {
const cachedTeam = this.teamCache.get(teamId)
const cachedTeam = this.getCachedTeam(teamId)
if (cachedTeam !== undefined) {
return cachedTeam
}
Expand All @@ -56,6 +56,10 @@ export class TeamManager {
}
}

/**
 * Synchronous, cache-only team lookup: returns whatever `teamCache` currently holds for the ID and
 * never queries the database.
 *
 * @param teamId - ID of the team to look up.
 * @returns The cached value as-is. `undefined` means the cache has no entry (`fetchTeam` treats that
 * as a miss); a cached `null` is returned unchanged - presumably it marks a team known not to exist,
 * confirm against where the cache is populated.
 */
public getCachedTeam(teamId: TeamId): Team | null | undefined {
    const cachedEntry = this.teamCache.get(teamId)
    return cachedEntry
}

public async getTeamByToken(token: string): Promise<Team | null> {
/**
* Validates and resolves the api token from an incoming event.
Expand Down
5 changes: 4 additions & 1 deletion plugin-server/tests/helpers/kafka.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Kafka, logLevel } from 'kafkajs'
import { CompressionCodecs, CompressionTypes, Kafka, logLevel } from 'kafkajs'
import SnappyCodec from 'kafkajs-snappy'

import { defaultConfig, overrideWithEnv } from '../../src/config/config'
import {
Expand All @@ -16,6 +17,8 @@ import {
import { PluginsServerConfig } from '../../src/types'
import { KAFKA_EVENTS_DEAD_LETTER_QUEUE } from './../../src/config/kafka-topics'

// Register the Snappy codec with kafkajs as a module-load side effect. kafkajs does not ship a Snappy
// implementation, so consumers created by these helpers could not decode Snappy-compressed messages
// without it. NOTE(review): presumably the producer under test compresses with Snappy - confirm.
CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec

/** Clear the Kafka queue and return Kafka object */
export async function resetKafka(extraServerConfig?: Partial<PluginsServerConfig>): Promise<Kafka> {
const config = { ...overrideWithEnv(defaultConfig, process.env), ...extraServerConfig }
Expand Down
60 changes: 43 additions & 17 deletions plugin-server/tests/worker/ingestion/process-event.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import * as IORedis from 'ioredis'
import { Consumer, Kafka, KafkaMessage } from 'kafkajs'
import { DateTime } from 'luxon'

import { KAFKA_EVENTS_JSON } from '../../../src/config/kafka-topics'
import { Hub, ISOTimestamp, Person, PreIngestionEvent } from '../../../src/types'
import { closeHub, createHub } from '../../../src/utils/db/hub'
import { UUIDT } from '../../../src/utils/utils'
Expand All @@ -13,11 +15,12 @@ jest.mock('../../../src/utils/status')
jest.setTimeout(600000) // 600 sec timeout.

let hub: Hub
let kafka: Kafka
let redis: IORedis.Redis
let eventsProcessor: EventsProcessor

beforeAll(async () => {
await resetKafka()
kafka = await resetKafka()
})

beforeEach(async () => {
Expand Down Expand Up @@ -45,14 +48,31 @@ describe('EventsProcessor#createEvent()', () => {
eventUuid,
timestamp,
distinctId: 'my_id',
ip: '127.0.0.1',
teamId: 2,
event: '$pageview',
properties: { event: 'property', $set: { foo: 'onEvent' } },
elementsList: [],
}

let kafkaEvents: KafkaMessage[]
let kafkaEventsConsumer: Consumer

// Start one consumer on the events topic for the whole suite. Every message it receives is appended
// to `kafkaEvents`, which the tests inspect after producing.
beforeAll(async () => {
    const consumer = kafka.consumer({ groupId: 'process-event-test' })
    kafkaEventsConsumer = consumer
    await consumer.subscribe({ topic: KAFKA_EVENTS_JSON })
    await consumer.run({
        eachMessage: (payload) => {
            kafkaEvents.push(payload.message)
            return Promise.resolve()
        },
    })
})

// Disconnect the suite-wide consumer once all tests in this describe block have finished.
afterAll(async () => {
    const consumer = kafkaEventsConsumer
    await consumer.disconnect()
})

beforeEach(async () => {
kafkaEvents = []
person = await hub.db.createPerson(
DateTime.fromISO(timestamp).toUTC(),
{ foo: 'onPerson', pprop: 5 },
Expand All @@ -68,13 +88,25 @@ describe('EventsProcessor#createEvent()', () => {

it('emits event with person columns, re-using event properties', async () => {
const processPerson = true
eventsProcessor.createEvent(preIngestionEvent, person, processPerson)
await eventsProcessor.createEvent(preIngestionEvent, person, processPerson)

await eventsProcessor.kafkaProducer.flush()

const events = await delayUntilEventIngested(() => hub.db.fetchEvents())
expect(events.length).toEqual(1)
expect(events[0]).toEqual(
// Waiting until we see the event in both Kafka and ClickHouse
const chEvents = await delayUntilEventIngested(() => (kafkaEvents.length ? hub.db.fetchEvents() : []))
expect(kafkaEvents.length).toEqual(1)
expect(JSON.parse(kafkaEvents[0].value!.toString())).toEqual(
expect.objectContaining({
uuid: eventUuid,
event: '$pageview',
team_id: 2,
project_id: 2,
distinct_id: 'my_id',
person_id: personUuid,
})
)
expect(chEvents.length).toEqual(1)
expect(chEvents[0]).toEqual(
expect.objectContaining({
uuid: eventUuid,
event: '$pageview',
Expand Down Expand Up @@ -114,7 +146,7 @@ describe('EventsProcessor#createEvent()', () => {
)

const processPerson = true
eventsProcessor.createEvent(
await eventsProcessor.createEvent(
{ ...preIngestionEvent, properties: { $group_0: 'group_key' } },
person,
processPerson
Expand All @@ -136,7 +168,7 @@ describe('EventsProcessor#createEvent()', () => {

it('when $process_person_profile=false, emits event with without person properties or groups', async () => {
const processPerson = false
eventsProcessor.createEvent(
await eventsProcessor.createEvent(
{ ...preIngestionEvent, properties: { $group_0: 'group_key' } },
person,
processPerson
Expand Down Expand Up @@ -166,7 +198,7 @@ describe('EventsProcessor#createEvent()', () => {
it('force_upgrade persons are recorded as such', async () => {
const processPerson = false
person.force_upgrade = true
eventsProcessor.createEvent(
await eventsProcessor.createEvent(
{ ...preIngestionEvent, properties: { $group_0: 'group_key' } },
person,
processPerson
Expand Down Expand Up @@ -198,18 +230,12 @@ describe('EventsProcessor#createEvent()', () => {
const uuid = new UUIDT().toString()
const nonExistingPerson: Person = {
created_at: DateTime.fromISO(timestamp).toUTC(),
version: 0,
id: 0,
team_id: 0,
properties: { random: 'x' },
is_user_id: 0,
is_identified: false,
uuid: uuid,
properties_last_updated_at: {},
properties_last_operation: {},
}
const processPerson = true
eventsProcessor.createEvent(
await eventsProcessor.createEvent(
{ ...preIngestionEvent, distinctId: 'no-such-person' },
nonExistingPerson,
processPerson
Expand Down

0 comments on commit c0345d0

Please sign in to comment.