diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-onevent.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-onevent.ts
index 9389455164e98..8a3035c5b6644 100644
--- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-onevent.ts
+++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-onevent.ts
@@ -1,6 +1,6 @@
 import { EachBatchPayload } from 'kafkajs'

-import { PostIngestionEvent, RawClickHouseEvent } from '../../../types'
+import { PostIngestionEvent, RawKafkaEvent } from '../../../types'
 import { convertToPostIngestionEvent } from '../../../utils/event'
 import {
     processComposeWebhookStep,
@@ -42,7 +42,7 @@ export async function handleComposeWebhookPlugins(
 }

 export async function eachMessageAppsOnEventHandlers(
-    clickHouseEvent: RawClickHouseEvent,
+    clickHouseEvent: RawKafkaEvent,
     queue: KafkaJSIngestionConsumer
 ): Promise<void> {
     const pluginConfigs = queue.pluginsServer.pluginConfigsPerTeam.get(clickHouseEvent.team_id)
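For orientation, the handler above receives the topic payload one already-parsed message at a time. The decode step is just `JSON.parse` over the message value, as seen inline in `groupIntoBatchesByUsage` below — a minimal sketch; the `decodeKafkaEvent` helper is illustrative and not part of this diff:

```ts
import { KafkaMessage } from 'kafkajs'

import { RawKafkaEvent } from '../../../types'

// Illustrative helper: the topic carries JSON-encoded RawKafkaEvent rows, so
// decoding is a straight JSON.parse over the message value.
function decodeKafkaEvent(message: KafkaMessage): RawKafkaEvent {
    if (!message.value) {
        throw new Error('Kafka message has no value')
    }
    return JSON.parse(message.value.toString()) as RawKafkaEvent
}
```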
diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-webhooks.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-webhooks.ts
index 51123f497363b..0cb0bf3f2bbe2 100644
--- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-webhooks.ts
+++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-webhooks.ts
@@ -6,7 +6,7 @@ import { ActionMatcher } from 'worker/ingestion/action-matcher'
 import { GroupTypeManager } from 'worker/ingestion/group-type-manager'
 import { OrganizationManager } from 'worker/ingestion/organization-manager'

-import { GroupTypeToColumnIndex, PostIngestionEvent, RawClickHouseEvent } from '../../../types'
+import { GroupTypeToColumnIndex, PostIngestionEvent, RawKafkaEvent } from '../../../types'
 import { DependencyUnavailableError } from '../../../utils/db/error'
 import { PostgresRouter, PostgresUse } from '../../../utils/db/postgres'
 import { convertToPostIngestionEvent } from '../../../utils/event'
@@ -30,17 +30,17 @@ export function groupIntoBatchesByUsage(
     array: KafkaMessage[],
     batchSize: number,
     shouldProcess: (teamId: number) => boolean
-): { eventBatch: RawClickHouseEvent[]; lastOffset: string; lastTimestamp: string }[] {
+): { eventBatch: RawKafkaEvent[]; lastOffset: string; lastTimestamp: string }[] {
     // Most events will not trigger a webhook call, so we want to filter them out as soon as possible
     // to achieve the highest effective concurrency when executing the actual HTTP calls.
     // actionMatcher holds an in-memory set of all teams with enabled webhooks, that we use to
     // drop events based on that signal. To use it we must parse the message, as there aren't that many
     // webhooks, we can keep batches of the parsed messages in memory with the offsets of the last message
-    const result: { eventBatch: RawClickHouseEvent[]; lastOffset: string; lastTimestamp: string }[] = []
-    let currentBatch: RawClickHouseEvent[] = []
+    const result: { eventBatch: RawKafkaEvent[]; lastOffset: string; lastTimestamp: string }[] = []
+    let currentBatch: RawKafkaEvent[] = []
     let currentCount = 0
     array.forEach((message, index) => {
-        const clickHouseEvent = JSON.parse(message.value!.toString()) as RawClickHouseEvent
+        const clickHouseEvent = JSON.parse(message.value!.toString()) as RawKafkaEvent
         if (shouldProcess(clickHouseEvent.team_id)) {
             currentBatch.push(clickHouseEvent)
             currentCount++
@@ -90,7 +90,7 @@ export async function eachBatchWebhooksHandlers(
 export async function eachBatchHandlerHelper(
     payload: EachBatchPayload,
     shouldProcess: (teamId: number) => boolean,
-    eachMessageHandler: (event: RawClickHouseEvent) => Promise<void>,
+    eachMessageHandler: (event: RawKafkaEvent) => Promise<void>,
     concurrency: number,
     stats_key: string
 ): Promise<void> {
@@ -123,7 +123,7 @@ export async function eachBatchHandlerHelper(
             }

             await Promise.all(
-                eventBatch.map((event: RawClickHouseEvent) => eachMessageHandler(event).finally(() => heartbeat()))
+                eventBatch.map((event: RawKafkaEvent) => eachMessageHandler(event).finally(() => heartbeat()))
             )

             resolveOffset(lastOffset)
@@ -202,19 +202,19 @@ async function addGroupPropertiesToPostIngestionEvent(
 }

 export async function eachMessageWebhooksHandlers(
-    clickHouseEvent: RawClickHouseEvent,
+    kafkaEvent: RawKafkaEvent,
     actionMatcher: ActionMatcher,
     hookCannon: HookCommander,
     groupTypeManager: GroupTypeManager,
     organizationManager: OrganizationManager,
     postgres: PostgresRouter
 ): Promise<void> {
-    if (!actionMatcher.hasWebhooks(clickHouseEvent.team_id)) {
+    if (!actionMatcher.hasWebhooks(kafkaEvent.team_id)) {
         // exit early if no webhooks nor resthooks
         return
     }

-    const eventWithoutGroups = convertToPostIngestionEvent(clickHouseEvent)
+    const eventWithoutGroups = convertToPostIngestionEvent(kafkaEvent)
     // This is very inefficient, we always pull group properties for all groups (up to 5) for this event
     // from PG if a webhook is defined for this team.
     // Instead we should be lazily loading group properties only when needed, but this is the fastest way to fix this consumer
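The comment block in `groupIntoBatchesByUsage` describes a filter-then-batch shape: parse every message, keep only events from teams that can trigger a webhook, but always record the last offset so the consumer can commit past fully-dropped stretches. A generic sketch of that shape under those assumptions — `batchWithFilter`, `parse`, and `keep` are hypothetical names, not the real API:

```ts
// A minimal sketch of filter-then-batch with offset tracking, generic over T.
// The real function works on KafkaMessage/RawKafkaEvent as shown above.
function batchWithFilter<T>(
    messages: { offset: string; parse: () => T; keep: (event: T) => boolean }[],
    batchSize: number
): { batch: T[]; lastOffset: string }[] {
    const result: { batch: T[]; lastOffset: string }[] = []
    let batch: T[] = []
    messages.forEach((message, index) => {
        const event = message.parse()
        if (message.keep(event)) {
            batch.push(event)
        }
        // Flush on size or at the end of input so lastOffset always advances,
        // letting the caller commit offsets even when every event was dropped.
        if (batch.length >= batchSize || index === messages.length - 1) {
            result.push({ batch, lastOffset: message.offset })
            batch = []
        }
    })
    return result
}
```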
diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts
index 42d31b00adea5..e4e3ac8e2aeee 100644
--- a/plugin-server/src/types.ts
+++ b/plugin-server/src/types.ts
@@ -704,6 +704,14 @@ export interface RawClickHouseEvent extends BaseEvent {
     person_mode: PersonMode
 }

+export interface RawKafkaEvent extends RawClickHouseEvent {
+    /**
+     * The project ID field is only included in the `clickhouse_events_json` topic, not present in ClickHouse.
+     * That's because we need it in `property-defs-rs` and not elsewhere.
+     */
+    project_id: number
+}
+
 /** Parsed event row from ClickHouse. */
 export interface ClickHouseEvent extends BaseEvent {
     timestamp: DateTime
@@ -732,6 +740,7 @@ export interface PreIngestionEvent {
     eventUuid: string
     event: string
     teamId: TeamId
+    projectId: TeamId
     distinctId: string
     properties: Properties
     timestamp: ISOTimestamp
diff --git a/plugin-server/src/utils/event.ts b/plugin-server/src/utils/event.ts
index 0d3f59bd8a4d7..26c6ab17dc541 100644
--- a/plugin-server/src/utils/event.ts
+++ b/plugin-server/src/utils/event.ts
@@ -3,7 +3,14 @@ import { Message } from 'node-rdkafka'
 import { Counter } from 'prom-client'

 import { setUsageInNonPersonEventsCounter } from '../main/ingestion-queues/metrics'
-import { ClickHouseEvent, HookPayload, PipelineEvent, PostIngestionEvent, RawClickHouseEvent } from '../types'
+import {
+    ClickHouseEvent,
+    HookPayload,
+    PipelineEvent,
+    PostIngestionEvent,
+    RawClickHouseEvent,
+    RawKafkaEvent,
+} from '../types'
 import { chainToElements } from './db/elements-chain'
 import {
     hasDifferenceWithProposedNewNormalisationMode,
@@ -115,7 +122,7 @@ export function convertToPostHogEvent(event: PostIngestionEvent): PostHogEvent {

 // NOTE: PostIngestionEvent is our context event - it should never be sent directly to an output, but rather transformed into a lightweight schema
 // that we can keep to as a contract
-export function convertToPostIngestionEvent(event: RawClickHouseEvent): PostIngestionEvent {
+export function convertToPostIngestionEvent(event: RawKafkaEvent): PostIngestionEvent {
     const properties = event.properties ? JSON.parse(event.properties) : {}
     if (event.elements_chain) {
         properties['$elements_chain'] = event.elements_chain
@@ -125,6 +132,7 @@ export function convertToPostIngestionEvent(event: RawClickHouseEvent): PostInge
         eventUuid: event.uuid,
         event: event.event!,
         teamId: event.team_id,
+        projectId: event.project_id,
         distinctId: event.distinct_id,
         properties,
         timestamp: clickHouseTimestampToISO(event.timestamp),
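Taken together, the two changes above mean `project_id` survives the snake_case-to-camelCase hop into `PostIngestionEvent`. A usage sketch with a fabricated fixture — only the fields relevant to the plumbing are filled in; the cast stands in for the remaining `RawKafkaEvent` columns:

```ts
import { RawKafkaEvent } from '../src/types'
import { convertToPostIngestionEvent } from '../src/utils/event'

// Fabricated fixture: just enough fields to show the projectId plumbing.
const raw = {
    uuid: 'uuid1',
    event: '$pageview',
    team_id: 2,
    project_id: 1, // present on the Kafka topic, never inserted into ClickHouse
    distinct_id: 'my_id',
    properties: JSON.stringify({ $browser: 'Chrome' }),
    timestamp: '2020-02-23 02:15:00.00',
} as unknown as RawKafkaEvent

const event = convertToPostIngestionEvent(raw)
console.assert(event.projectId === 1) // snake_case in Kafka, camelCase in PostIngestionEvent
```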
diff --git a/plugin-server/src/worker/ingestion/process-event.ts b/plugin-server/src/worker/ingestion/process-event.ts
index fc53331c98eb4..05ebd54c72045 100644
--- a/plugin-server/src/worker/ingestion/process-event.ts
+++ b/plugin-server/src/worker/ingestion/process-event.ts
@@ -12,7 +12,7 @@ import {
     Person,
     PersonMode,
     PreIngestionEvent,
-    RawClickHouseEvent,
+    RawKafkaEvent,
     Team,
     TimestampFormat,
 } from '../../types'
@@ -187,6 +187,7 @@ export class EventsProcessor {
             properties,
             timestamp: timestamp.toISO() as ISOTimestamp,
             teamId: team.id,
+            projectId: team.project_id,
         }
     }

@@ -205,8 +206,8 @@ export class EventsProcessor {
         preIngestionEvent: PreIngestionEvent,
         person: Person,
         processPerson: boolean
-    ): [RawClickHouseEvent, Promise<void>] {
-        const { eventUuid: uuid, event, teamId, distinctId, properties, timestamp } = preIngestionEvent
+    ): [RawKafkaEvent, Promise<void>] {
+        const { eventUuid: uuid, event, teamId, projectId, distinctId, properties, timestamp } = preIngestionEvent

         let elementsChain = ''
         try {
@@ -245,12 +246,13 @@ export class EventsProcessor {
             personMode = 'propertyless'
         }

-        const rawEvent: RawClickHouseEvent = {
+        const rawEvent: RawKafkaEvent = {
             uuid,
             event: safeClickhouseString(event),
             properties: JSON.stringify(properties ?? {}),
             timestamp: castTimestampOrNow(timestamp, TimestampFormat.ClickHouse),
             team_id: teamId,
+            project_id: projectId,
             distinct_id: safeClickhouseString(distinctId),
             elements_chain: safeClickhouseString(elementsChain),
             created_at: castTimestampOrNow(null, TimestampFormat.ClickHouse),
diff --git a/plugin-server/src/worker/ingestion/team-manager.ts b/plugin-server/src/worker/ingestion/team-manager.ts
index 9846071c094fd..46ec9ebcd0a05 100644
--- a/plugin-server/src/worker/ingestion/team-manager.ts
+++ b/plugin-server/src/worker/ingestion/team-manager.ts
@@ -41,7 +41,7 @@ export class TeamManager {
     }

     public async fetchTeam(teamId: number): Promise<Team | null> {
-        const cachedTeam = this.teamCache.get(teamId)
+        const cachedTeam = this.getCachedTeam(teamId)
         if (cachedTeam !== undefined) {
             return cachedTeam
         }
@@ -56,6 +56,10 @@ export class TeamManager {
         }
     }

+    public getCachedTeam(teamId: TeamId): Team | null | undefined {
+        return this.teamCache.get(teamId)
+    }
+
     public async getTeamByToken(token: string): Promise<Team | null> {
         /**
          * Validates and resolves the api token from an incoming event.
diff --git a/plugin-server/tests/helpers/kafka.ts b/plugin-server/tests/helpers/kafka.ts
index 775ae674ce86b..812ac3bdf7550 100644
--- a/plugin-server/tests/helpers/kafka.ts
+++ b/plugin-server/tests/helpers/kafka.ts
@@ -1,4 +1,5 @@
-import { Kafka, logLevel } from 'kafkajs'
+import { CompressionCodecs, CompressionTypes, Kafka, logLevel } from 'kafkajs'
+import SnappyCodec from 'kafkajs-snappy'

 import { defaultConfig, overrideWithEnv } from '../../src/config/config'
 import {
@@ -16,6 +17,8 @@ import {
 import { PluginsServerConfig } from '../../src/types'
 import { KAFKA_EVENTS_DEAD_LETTER_QUEUE } from './../../src/config/kafka-topics'

+CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec
+
 /** Clear the Kafka queue and return Kafka object */
 export async function resetKafka(extraServerConfig?: Partial<PluginsServerConfig>): Promise<Kafka> {
     const config = { ...overrideWithEnv(defaultConfig, process.env), ...extraServerConfig }
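The codec registration matters because kafkajs ships without a Snappy implementation: decoding a Snappy-compressed batch throws until a codec is registered, which would break the new test consumer added further down. Registration is a global, one-time side effect — a sketch, with the broker address assumed (the real helper reads it from config):

```ts
import { CompressionCodecs, CompressionTypes, Kafka } from 'kafkajs'
import SnappyCodec from 'kafkajs-snappy'

// Must run before the first consumer or producer touches a Snappy batch;
// the assignment mutates kafkajs's global codec registry.
CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec

// Hypothetical broker address for the sketch.
const kafka = new Kafka({ clientId: 'tests', brokers: ['localhost:9092'] })
```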
diff --git a/plugin-server/tests/main/ingestion-queues/each-batch-webhooks.test.ts b/plugin-server/tests/main/ingestion-queues/each-batch-webhooks.test.ts
index ef1dbca1e887c..944f35ea4e74d 100644
--- a/plugin-server/tests/main/ingestion-queues/each-batch-webhooks.test.ts
+++ b/plugin-server/tests/main/ingestion-queues/each-batch-webhooks.test.ts
@@ -1,5 +1,5 @@
 import { eachMessageWebhooksHandlers } from '../../../src/main/ingestion-queues/batch-processing/each-batch-webhooks'
-import { ClickHouseTimestamp, ClickHouseTimestampSecondPrecision, Hub, RawClickHouseEvent } from '../../../src/types'
+import { ClickHouseTimestamp, ClickHouseTimestampSecondPrecision, Hub, RawKafkaEvent } from '../../../src/types'
 import { closeHub, createHub } from '../../../src/utils/db/hub'
 import { PostgresUse } from '../../../src/utils/db/postgres'
 import { ActionManager } from '../../../src/worker/ingestion/action-manager'
@@ -11,7 +11,7 @@ import { resetTestDatabase } from '../../helpers/sql'

 jest.mock('../../../src/utils/status')

-const clickhouseEvent: RawClickHouseEvent = {
+const kafkaEvent: RawKafkaEvent = {
     event: '$pageview',
     properties: JSON.stringify({
         $ip: '127.0.0.1',
@@ -23,6 +23,7 @@
     elements_chain: '',
     timestamp: '2020-02-23 02:15:00.00' as ClickHouseTimestamp,
     team_id: 2,
+    project_id: 1,
     distinct_id: 'my_id',
     created_at: '2020-02-23 02:15:00.00' as ClickHouseTimestamp,
     person_id: 'F99FA0A1-E0C2-4CFE-A09A-4C3C4327A4CC',
@@ -138,7 +139,7 @@ describe('eachMessageWebhooksHandlers', () => {
         const postWebhookSpy = jest.spyOn(hookCannon.rustyHook, 'enqueueIfEnabledForTeam')

         await eachMessageWebhooksHandlers(
-            clickhouseEvent,
+            kafkaEvent,
            actionMatcher,
             hookCannon,
             groupTypeManager,
@@ -168,6 +169,7 @@ describe('eachMessageWebhooksHandlers', () => {
               "person_created_at": "2020-02-20T02:15:00.000Z",
               "person_id": "F99FA0A1-E0C2-4CFE-A09A-4C3C4327A4CC",
               "person_properties": Object {},
+              "projectId": 1,
               "properties": Object {
                 "$groups": Object {
                   "organization": "org_posthog",
diff --git a/plugin-server/tests/main/ingestion-queues/each-batch.test.ts b/plugin-server/tests/main/ingestion-queues/each-batch.test.ts
index cb4b33373545c..adac7d05fd456 100644
--- a/plugin-server/tests/main/ingestion-queues/each-batch.test.ts
+++ b/plugin-server/tests/main/ingestion-queues/each-batch.test.ts
@@ -16,7 +16,7 @@ import {
     ClickHouseTimestampSecondPrecision,
     ISOTimestamp,
     PostIngestionEvent,
-    RawClickHouseEvent,
+    RawKafkaEvent,
 } from '../../../src/types'
 import { ActionManager } from '../../../src/worker/ingestion/action-manager'
 import { ActionMatcher } from '../../../src/worker/ingestion/action-matcher'
@@ -50,6 +50,7 @@ const event: PostIngestionEvent = {
     eventUuid: 'uuid1',
     distinctId: 'my_id',
     teamId: 2,
+    projectId: 1,
     timestamp: '2020-02-23T02:15:00.000Z' as ISOTimestamp,
     event: '$pageview',
     properties: {},
@@ -59,7 +60,7 @@
     person_properties: {},
 }

-const clickhouseEvent: RawClickHouseEvent = {
+const kafkaEvent: RawKafkaEvent = {
     event: '$pageview',
     properties: JSON.stringify({
         $ip: '127.0.0.1',
@@ -68,6 +69,7 @@
     elements_chain: '',
     timestamp: '2020-02-23 02:15:00.00' as ClickHouseTimestamp,
     team_id: 2,
+    project_id: 1,
     distinct_id: 'my_id',
     created_at: '2020-02-23 02:15:00.00' as ClickHouseTimestamp,
     person_id: 'F99FA0A1-E0C2-4CFE-A09A-4C3C4327A4CC',
@@ -146,7 +148,7 @@ describe('eachBatchX', () => {
     describe('eachBatchAppsOnEventHandlers', () => {
         it('calls runOnEvent when useful', async () => {
             queue.pluginsServer.pluginConfigsPerTeam.set(2, [pluginConfig39])
-            await eachBatchAppsOnEventHandlers(createKafkaJSBatch(clickhouseEvent), queue)
+            await eachBatchAppsOnEventHandlers(createKafkaJSBatch(kafkaEvent), queue)
             // TODO fix to jest spy on the actual function
             expect(runOnEvent).toHaveBeenCalledWith(
                 expect.anything(),
@@ -159,7 +161,7 @@ describe('eachBatchX', () => {
         })
         it('skip runOnEvent when no pluginconfig for team', async () => {
             queue.pluginsServer.pluginConfigsPerTeam.clear()
-            await eachBatchAppsOnEventHandlers(createKafkaJSBatch(clickhouseEvent), queue)
+            await eachBatchAppsOnEventHandlers(createKafkaJSBatch(kafkaEvent), queue)
             expect(runOnEvent).not.toHaveBeenCalled()
         })
     })
@@ -191,7 +193,7 @@ describe('eachBatchX', () => {
             // mock hasWebhooks to return true
             actionMatcher.hasWebhooks = jest.fn(() => true)
             await eachBatchWebhooksHandlers(
-                createKafkaJSBatch(clickhouseEvent),
+                createKafkaJSBatch(kafkaEvent),
                 actionMatcher,
                 hookCannon,
                 10,
@@ -215,61 +217,61 @@
            // create a batch with 10 events each having teamId the same as offset, timestamp which all increment by 1
             const batch = createKafkaJSBatchWithMultipleEvents([
                 {
-                    ...clickhouseEvent,
+                    ...kafkaEvent,
                     team_id: 1,
                     offset: 1,
                     kafkaTimestamp: '2020-02-23 00:01:00.00' as ClickHouseTimestamp,
                 },
                 {
-                    ...clickhouseEvent,
+                    ...kafkaEvent,
                     team_id: 2,
                     offset: 2,
                     kafkaTimestamp: '2020-02-23 00:02:00.00' as ClickHouseTimestamp,
                 },
                 {
-                    ...clickhouseEvent,
+                    ...kafkaEvent,
                     team_id: 3,
                     offset: 3,
                     kafkaTimestamp: '2020-02-23 00:03:00.00' as ClickHouseTimestamp,
                 },
                 {
-                    ...clickhouseEvent,
+                    ...kafkaEvent,
                     team_id: 4,
                     offset: 4,
                     kafkaTimestamp: '2020-02-23 00:04:00.00' as ClickHouseTimestamp,
                 },
                 {
-                    ...clickhouseEvent,
+                    ...kafkaEvent,
                     team_id: 5,
                     offset: 5,
                     kafkaTimestamp: '2020-02-23 00:05:00.00' as ClickHouseTimestamp,
                 },
                 {
-                    ...clickhouseEvent,
+                    ...kafkaEvent,
                     team_id: 6,
                     offset: 6,
                     kafkaTimestamp: '2020-02-23 00:06:00.00' as ClickHouseTimestamp,
                 },
                 {
-                    ...clickhouseEvent,
+                    ...kafkaEvent,
                     team_id: 7,
                     offset: 7,
                     kafkaTimestamp: '2020-02-23 00:07:00.00' as ClickHouseTimestamp,
                 },
                 {
-                    ...clickhouseEvent,
+                    ...kafkaEvent,
                     team_id: 8,
                     offset: 8,
                     kafkaTimestamp: '2020-02-23 00:08:00.00' as ClickHouseTimestamp,
                 },
                 {
-                    ...clickhouseEvent,
+                    ...kafkaEvent,
                     team_id: 9,
                     offset: 9,
                     kafkaTimestamp: '2020-02-23 00:09:00.00' as ClickHouseTimestamp,
                 },
                 {
-                    ...clickhouseEvent,
+                    ...kafkaEvent,
                     team_id: 10,
                     offset: 10,
                     kafkaTimestamp: '2020-02-23 00:10:00.00' as ClickHouseTimestamp,
diff --git a/plugin-server/tests/utils/event.test.ts b/plugin-server/tests/utils/event.test.ts
index 74bee4ffa23b1..decdb0549cbf7 100644
--- a/plugin-server/tests/utils/event.test.ts
+++ b/plugin-server/tests/utils/event.test.ts
@@ -1,7 +1,7 @@
 import { KafkaMessage } from 'kafkajs'
 import { DateTime } from 'luxon'

-import { ClickHouseTimestamp, RawClickHouseEvent } from '../../src/types'
+import { ClickHouseTimestamp, RawKafkaEvent } from '../../src/types'
 import { formPipelineEvent, normalizeEvent, parseRawClickHouseEvent } from '../../src/utils/event'

 describe('normalizeEvent()', () => {
@@ -48,7 +48,7 @@ describe('normalizeEvent()', () => {

 describe('parseRawClickHouseEvent()', () => {
     it('parses a random event', () => {
-        const clickhouseEvent: RawClickHouseEvent = {
+        const kafkaEvent: RawKafkaEvent = {
             event: '$pageview',
             properties: JSON.stringify({
                 $ip: '127.0.0.1',
@@ -57,6 +57,7 @@
             elements_chain: '',
             timestamp: '2020-02-23 02:15:00.00' as ClickHouseTimestamp,
             team_id: 2,
+            project_id: 1,
             distinct_id: 'my_id',
             created_at: '2020-02-23 02:15:00.00' as ClickHouseTimestamp,
             person_created_at: '2020-02-23 02:10:00.00' as ClickHouseTimestamp,
@@ -65,7 +66,7 @@
             group1_properties: JSON.stringify({ a: 1, b: 2 }),
         }

-        expect(parseRawClickHouseEvent(clickhouseEvent)).toEqual({
+        expect(parseRawClickHouseEvent(kafkaEvent)).toEqual({
             event: '$pageview',
             properties: {
                 $ip: '127.0.0.1',
@@ -73,6 +74,7 @@
             uuid: 'uuid1',
             timestamp: DateTime.fromISO('2020-02-23T02:15:00.000Z').toUTC(),
             team_id: 2,
+            project_id: 1,
             distinct_id: 'my_id',
             created_at: DateTime.fromISO('2020-02-23T02:15:00.000Z').toUTC(),
             elements_chain: null,
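The updated expectations pin down `parseRawClickHouseEvent`'s contract: JSON-encoded string columns are decoded, ClickHouse-format timestamps become UTC luxon `DateTime`s, and `project_id` passes through untouched. A re-implementation sketch of just that contract — `parseRawEventSketch` and `RawLike` are illustrative names, not the real code:

```ts
import { DateTime } from 'luxon'

// Assumed shape, narrowed to the columns the test above exercises.
interface RawLike {
    properties: string
    timestamp: string
    created_at: string
}

// luxon's fromSQL handles the 'yyyy-MM-dd HH:mm:ss.SS' format used by the fixtures.
const fromClickHouse = (ts: string): DateTime => DateTime.fromSQL(ts, { zone: 'utc' })

function parseRawEventSketch(raw: RawLike) {
    return {
        properties: JSON.parse(raw.properties) as Record<string, unknown>,
        timestamp: fromClickHouse(raw.timestamp),
        created_at: fromClickHouse(raw.created_at),
    }
}
```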
diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap b/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap
index f84f2ccc21250..1d1b12faa29ac 100644
--- a/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap
+++ b/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap
@@ -89,6 +89,7 @@ Array [
     "event": "$pageview",
     "eventUuid": "uuid1",
     "ip": "127.0.0.1",
+    "projectId": 1,
     "properties": Object {},
     "teamId": 2,
     "timestamp": "2020-02-23T02:15:00.000Z",
@@ -104,6 +105,7 @@ Array [
     "event": "$pageview",
     "eventUuid": "uuid1",
     "ip": "127.0.0.1",
+    "projectId": 1,
     "properties": Object {},
     "teamId": 2,
     "timestamp": "2020-02-23T02:15:00.000Z",
@@ -119,6 +121,7 @@ Array [
     "event": "$pageview",
     "eventUuid": "uuid1",
     "ip": "127.0.0.1",
+    "projectId": 1,
     "properties": Object {},
     "teamId": 2,
     "timestamp": "2020-02-23T02:15:00.000Z",
diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/enrichExceptionEventStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/enrichExceptionEventStep.test.ts
index 7330865f51bce..1f007b1aff57c 100644
--- a/plugin-server/tests/worker/ingestion/event-pipeline/enrichExceptionEventStep.test.ts
+++ b/plugin-server/tests/worker/ingestion/event-pipeline/enrichExceptionEventStep.test.ts
@@ -43,6 +43,7 @@ const preIngestionEvent: PreIngestionEvent = {
     },
     timestamp: '2024-04-17T12:06:46.861Z' as ISOTimestamp,
     teamId: 1,
+    projectId: 1,
 }

 describe('enrichExceptionEvent()', () => {
diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/extractHeatmapDataStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/extractHeatmapDataStep.test.ts
index 2fe66ad4dfa37..52de14eabf552 100644
--- a/plugin-server/tests/worker/ingestion/event-pipeline/extractHeatmapDataStep.test.ts
+++ b/plugin-server/tests/worker/ingestion/event-pipeline/extractHeatmapDataStep.test.ts
@@ -120,6 +120,7 @@ const preIngestionEvent: PreIngestionEvent = {
     },
     timestamp: '2024-04-17T12:06:46.861Z' as ISOTimestamp,
     teamId: 1,
+    projectId: 1,
 }

 describe('extractHeatmapDataStep()', () => {
diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/prepareEventStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/prepareEventStep.test.ts
index 8c203468348aa..1c817de16bc31 100644
--- a/plugin-server/tests/worker/ingestion/event-pipeline/prepareEventStep.test.ts
+++ b/plugin-server/tests/worker/ingestion/event-pipeline/prepareEventStep.test.ts
@@ -40,6 +40,7 @@ const person: Person = {

 const teamTwo: Team = {
     id: 2,
+    project_id: 1,
     uuid: 'af95d312-1a0a-4208-b80f-562ddafc9bcd',
     organization_id: '66f3f7bf-44e2-45dd-9901-5dbd93744e3a',
     name: 'testTeam',
@@ -90,6 +91,7 @@ describe('prepareEventStep()', () => {
                 $ip: '127.0.0.1',
             },
             teamId: 2,
+            projectId: 1,
             timestamp: '2020-02-23T02:15:00.000Z',
         })
         expect(hub.db.kafkaProducer!.queueMessage).not.toHaveBeenCalled()
@@ -108,6 +110,7 @@ describe('prepareEventStep()', () => {
             eventUuid: '017ef865-19da-0000-3b60-1506093bf40f',
             properties: {},
             teamId: 2,
+            projectId: 1,
             timestamp: '2020-02-23T02:15:00.000Z',
         })
         expect(hub.db.kafkaProducer!.queueMessage).not.toHaveBeenCalled()
diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts
index 2aad4445410f8..78301b2503ad7 100644
--- a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts
+++ b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts
@@ -67,6 +67,7 @@ const preIngestionEvent: PreIngestionEvent = {
     distinctId: 'my_id',
     ip: '127.0.0.1',
     teamId: 2,
+    projectId: 1,
     timestamp: '2020-02-23T02:15:00.000Z' as ISOTimestamp,
     event: '$pageview',
     properties: {},
diff --git a/plugin-server/tests/worker/ingestion/process-event.test.ts b/plugin-server/tests/worker/ingestion/process-event.test.ts
index 1187cf5336d29..4e828ebd8ca62 100644
--- a/plugin-server/tests/worker/ingestion/process-event.test.ts
+++ b/plugin-server/tests/worker/ingestion/process-event.test.ts
@@ -1,6 +1,8 @@
 import * as IORedis from 'ioredis'
+import { Consumer, Kafka, KafkaMessage } from 'kafkajs'
 import { DateTime } from 'luxon'

+import { KAFKA_EVENTS_JSON } from '../../../src/config/kafka-topics'
 import { Hub, ISOTimestamp, Person, PreIngestionEvent } from '../../../src/types'
 import { closeHub, createHub } from '../../../src/utils/db/hub'
 import { UUIDT } from '../../../src/utils/utils'
@@ -13,11 +15,12 @@ jest.mock('../../../src/utils/status')
 jest.setTimeout(600000) // 600 sec timeout.

 let hub: Hub
+let kafka: Kafka
 let redis: IORedis.Redis
 let eventsProcessor: EventsProcessor

 beforeAll(async () => {
-    await resetKafka()
+    kafka = await resetKafka()
 })

 beforeEach(async () => {
@@ -45,14 +48,32 @@ describe('EventsProcessor#createEvent()', () => {
         eventUuid,
         timestamp,
         distinctId: 'my_id',
-        ip: '127.0.0.1',
         teamId: 2,
+        projectId: 1,
         event: '$pageview',
         properties: { event: 'property', $set: { foo: 'onEvent' } },
-        elementsList: [],
     }

+    let kafkaEvents: KafkaMessage[]
+    let kafkaEventsConsumer: Consumer
+
+    beforeAll(async () => {
+        kafkaEventsConsumer = kafka.consumer({ groupId: 'process-event-test' })
+        await kafkaEventsConsumer.subscribe({ topic: KAFKA_EVENTS_JSON })
+        await kafkaEventsConsumer.run({
+            eachMessage: ({ message }) => {
+                kafkaEvents.push(message)
+                return Promise.resolve()
+            },
+        })
+    })
+
+    afterAll(async () => {
+        await kafkaEventsConsumer.disconnect()
+    })
+
     beforeEach(async () => {
+        kafkaEvents = []
         person = await hub.db.createPerson(
             DateTime.fromISO(timestamp).toUTC(),
             { foo: 'onPerson', pprop: 5 },
@@ -72,9 +93,21 @@ describe('EventsProcessor#createEvent()', () => {

         await eventsProcessor.kafkaProducer.flush()

-        const events = await delayUntilEventIngested(() => hub.db.fetchEvents())
-        expect(events.length).toEqual(1)
-        expect(events[0]).toEqual(
+        // Waiting until we see the event in both Kafka and ClickHouse
+        const chEvents = await delayUntilEventIngested(() => (kafkaEvents.length ? hub.db.fetchEvents() : []))
+        expect(kafkaEvents.length).toEqual(1)
+        expect(JSON.parse(kafkaEvents[0].value!.toString())).toEqual(
+            expect.objectContaining({
+                uuid: eventUuid,
+                event: '$pageview',
+                team_id: 2,
+                project_id: 1,
+                distinct_id: 'my_id',
+                person_id: personUuid,
+            })
+        )
+        expect(chEvents.length).toEqual(1)
+        expect(chEvents[0]).toEqual(
             expect.objectContaining({
                 uuid: eventUuid,
                 event: '$pageview',
@@ -198,15 +231,9 @@ describe('EventsProcessor#createEvent()', () => {
         const uuid = new UUIDT().toString()
         const nonExistingPerson: Person = {
             created_at: DateTime.fromISO(timestamp).toUTC(),
-            version: 0,
-            id: 0,
             team_id: 0,
             properties: { random: 'x' },
-            is_user_id: 0,
-            is_identified: false,
             uuid: uuid,
-            properties_last_updated_at: {},
-            properties_last_operation: {},
         }
         const processPerson = true
         eventsProcessor.createEvent(
diff --git a/plugin-server/tests/worker/ingestion/webhook-formatter.test.ts b/plugin-server/tests/worker/ingestion/webhook-formatter.test.ts
index 5e26261087ba6..f851d8a21ca60 100644
--- a/plugin-server/tests/worker/ingestion/webhook-formatter.test.ts
+++ b/plugin-server/tests/worker/ingestion/webhook-formatter.test.ts
@@ -11,6 +11,7 @@ describe('WebhookFormatter', () => {
         event: '$pageview',
         eventUuid: '123',
         teamId: 123,
+        projectId: 1,
         distinctId: 'WALL-E',
         person_properties: { email: 'test@posthog.com' },
         person_created_at: '2021-10-31T00%3A44%3A00.000Z' as ISOTimestamp,
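The new assertions work because the test tees the `clickhouse_events_json` topic into an in-memory array, then polls until both Kafka and ClickHouse have seen the event. The same pattern extracted into a reusable sketch — the `captureTopic` helper is hypothetical; the real test inlines it and gets its `Kafka` instance from `resetKafka()`:

```ts
import { Consumer, Kafka, KafkaMessage } from 'kafkajs'

// Buffer everything a topic receives so assertions can poll the array.
async function captureTopic(
    kafka: Kafka,
    topic: string
): Promise<{ messages: KafkaMessage[]; consumer: Consumer }> {
    const messages: KafkaMessage[] = []
    const consumer = kafka.consumer({ groupId: `capture-${topic}` })
    await consumer.connect()
    await consumer.subscribe({ topic })
    await consumer.run({
        eachMessage: ({ message }) => {
            messages.push(message)
            return Promise.resolve()
        },
    })
    // Caller is responsible for consumer.disconnect() in afterAll.
    return { messages, consumer }
}
```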