Commit 38bcfeb: Fix
benjackwhite committed Dec 25, 2024
1 parent 9e5d1dc commit 38bcfeb
Showing 20 changed files with 134 additions and 184 deletions.
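In short: KafkaProducerWrapper.queueMessages now accepts a TopicMessage | TopicMessage[] argument directly instead of a { kafkaMessages: ProducerRecord | ProducerRecord[] } wrapper object, the TopicMessages type is renamed to TopicMessage (value widened to string | Buffer | null, key made optional), and every call site drops the kafkaMessages envelope.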

plugin-server/src/cdp/cdp-consumers.ts (8 changes: 4 additions & 4 deletions)
@@ -157,17 +157,17 @@ abstract class CdpConsumerBase {
         const messages = [...this.messagesToProduce]
         this.messagesToProduce = []

-        await this.kafkaProducer!.queueMessages({
-            kafkaMessages: messages.map((x) => ({
+        await this.kafkaProducer!.queueMessages(
+            messages.map((x) => ({
                 topic: x.topic,
                 messages: [
                     {
                         value: safeClickhouseString(JSON.stringify(x.value)),
                         key: x.key,
                     },
                 ],
-            })),
-        }).catch((reason) => {
+            }))
+        ).catch((reason) => {
             status.error('⚠️', `failed to produce message: ${reason}`)
         })
     }

plugin-server/src/kafka/producer.ts (19 changes: 8 additions & 11 deletions)
@@ -1,4 +1,4 @@
-import { Message, ProducerRecord } from 'kafkajs'
+import { Message } from 'kafkajs'
 import {
     ClientMetrics,
     HighLevelProducer,
@@ -29,11 +29,11 @@ import { createRdConnectionConfigFromEnvVars } from './config'

 export type MessageKey = Exclude<RdKafkaMessageKey, undefined>

-export type TopicMessages = {
+export type TopicMessage = {
     topic: string
     messages: {
-        value: string | Buffer
-        key: MessageKey
+        value: string | Buffer | null
+        key?: MessageKey
     }[]
 }

@@ -145,15 +145,11 @@ export class KafkaProducerWrapper {
         }
     }

-    async queueMessages({
-        kafkaMessages: kafkaMessage,
-    }: {
-        kafkaMessages: ProducerRecord | ProducerRecord[]
-    }): Promise<void> {
-        const records = Array.isArray(kafkaMessage) ? kafkaMessage : [kafkaMessage]
+    async queueMessages(topicMessages: TopicMessage | TopicMessage[]): Promise<void> {
+        topicMessages = Array.isArray(topicMessages) ? topicMessages : [topicMessages]

         await Promise.all(
-            records.map((record) => {
+            topicMessages.map((record) => {
                 return Promise.all(
                     record.messages.map((message) =>
                         this.produce({
@@ -167,6 +163,7 @@
             })
         )
     }
+
     public async flush() {
         status.debug('📤', 'flushing_producer')

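Taken together, the producer.ts hunks change the wrapper's public call shape. A minimal before/after sketch of the new API; the topic names and payloads are illustrative, not taken from the diff:

import { KafkaProducerWrapper, TopicMessage } from './producer'

async function example(producer: KafkaProducerWrapper): Promise<void> {
    // Old shape (removed above): kafkajs ProducerRecords inside an envelope object.
    // await producer.queueMessages({ kafkaMessages: { topic: 'example_topic', messages: [{ key: 'k', value: 'v' }] } })

    // New shape: a TopicMessage, or an array of them, passed directly.
    const single: TopicMessage = { topic: 'example_topic', messages: [{ key: 'k', value: 'v' }] }
    await producer.queueMessages(single)

    // value may now be null and key may be omitted, which the DLQ call sites below rely on.
    await producer.queueMessages([single, { topic: 'example_topic_dlq', messages: [{ value: null }] }])
}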

plugin-server/src/main/graphile-worker/schedule.ts (6 changes: 2 additions & 4 deletions)
@@ -56,10 +56,8 @@ export async function runScheduledTasks(
     for (const pluginConfigId of server.pluginSchedule?.[taskType] || []) {
         status.info('⏲️', 'queueing_schedule_task', { taskType, pluginConfigId })
         await server.kafkaProducer.queueMessages({
-            kafkaMessages: {
-                topic: KAFKA_SCHEDULED_TASKS,
-                messages: [{ key: pluginConfigId.toString(), value: JSON.stringify({ taskType, pluginConfigId }) }],
-            },
+            topic: KAFKA_SCHEDULED_TASKS,
+            messages: [{ key: pluginConfigId.toString(), value: JSON.stringify({ taskType, pluginConfigId }) }],
         })
         graphileScheduledTaskCounter.labels({ status: 'queued', task: taskType }).inc()
     }

plugin-server/src/main/ingestion-queues/jobs-consumer.ts (12 changes: 4 additions & 8 deletions)
@@ -55,10 +55,8 @@ export const startJobsConsumer = async ({
             })
             // TODO: handle resolving offsets asynchronously
             await producer.queueMessages({
-                kafkaMessages: {
-                    topic: KAFKA_JOBS_DLQ,
-                    messages: [{ value: message.value, key: message.key }],
-                },
+                topic: KAFKA_JOBS_DLQ,
+                messages: [{ value: message.value, key: message.key }],
             })
             resolveOffset(message.offset)
             continue
@@ -74,10 +72,8 @@
             })
             // TODO: handle resolving offsets asynchronously
             await producer.queueMessages({
-                kafkaMessages: {
-                    topic: KAFKA_JOBS_DLQ,
-                    messages: [{ value: message.value, key: message.key }],
-                },
+                topic: KAFKA_JOBS_DLQ,
+                messages: [{ value: message.value, key: message.key }],
             })
             resolveOffset(message.offset)
             continue
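Both DLQ branches forward the consumed message verbatim; the loosened TopicMessage type (nullable value, optional key) is what lets the raw message fields pass the type check. A self-contained sketch of that pattern, with a stand-in topic name:

import { KafkaProducerWrapper } from '../../kafka/producer'

// Forward a consumed message to a dead-letter topic unchanged.
// 'example_dlq' stands in for KAFKA_JOBS_DLQ; key and value mirror the
// shape a Kafka consumer hands back, where both may be null.
async function sendToDlq(
    producer: KafkaProducerWrapper,
    message: { key: Buffer | null; value: Buffer | null }
): Promise<void> {
    await producer.queueMessages({
        topic: 'example_dlq',
        messages: [{ value: message.value, key: message.key ?? undefined }],
    })
}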
@@ -165,10 +165,8 @@ const getTasksFromBatch = async (batch: Batch, producer: KafkaProducerWrapper) =
                 value: message.value,
             })
             await producer.queueMessages({
-                kafkaMessages: {
-                    topic: KAFKA_SCHEDULED_TASKS_DLQ,
-                    messages: [{ value: message.value, key: message.key }],
-                },
+                topic: KAFKA_SCHEDULED_TASKS_DLQ,
+                messages: [{ value: message.value, key: message.key }],
             })
             continue
         }
@@ -185,21 +183,17 @@
                 error: error.stack ?? error,
             })
             await producer.queueMessages({
-                kafkaMessages: {
-                    topic: KAFKA_SCHEDULED_TASKS_DLQ,
-                    messages: [{ value: message.value, key: message.key }],
-                },
+                topic: KAFKA_SCHEDULED_TASKS_DLQ,
+                messages: [{ value: message.value, key: message.key }],
             })
             continue
         }

         if (!taskTypes.includes(task.taskType) || isNaN(task.pluginConfigId)) {
             status.warn('⚠️', `Invalid schema for partition ${batch.partition} offset ${message.offset}.`, task)
             await producer.queueMessages({
-                kafkaMessages: {
-                    topic: KAFKA_SCHEDULED_TASKS_DLQ,
-                    messages: [{ value: message.value, key: message.key }],
-                },
+                topic: KAFKA_SCHEDULED_TASKS_DLQ,
+                messages: [{ value: message.value, key: message.key }],
             })
             continue
         }
@@ -156,13 +156,11 @@ export class ConsoleLogsIngester {

             return [
                 this.producer.queueMessages({
-                    kafkaMessages: {
-                        topic: KAFKA_LOG_ENTRIES,
-                        messages: consoleLogEvents.map((cle: ConsoleLogEntry) => ({
-                            value: JSON.stringify(cle),
-                            key: event.session_id,
-                        })),
-                    },
+                    topic: KAFKA_LOG_ENTRIES,
+                    messages: consoleLogEvents.map((cle: ConsoleLogEntry) => ({
+                        value: JSON.stringify(cle),
+                        key: event.session_id,
+                    })),
                 }),
             ]
         } catch (error) {
@@ -171,15 +171,13 @@ export class ReplayEventsIngester {

             return [
                 this.producer.queueMessages({
-                    kafkaMessages: {
-                        topic: KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS,
-                        messages: [
-                            {
-                                value: JSON.stringify(replayRecord),
-                                key: event.session_id,
-                            },
-                        ],
-                    },
+                    topic: KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS,
+                    messages: [
+                        {
+                            value: JSON.stringify(replayRecord),
+                            key: event.session_id,
+                        },
+                    ],
                 }),
             ]
         } catch (error) {

plugin-server/src/utils/db/db.ts (51 changes: 23 additions & 28 deletions)
@@ -3,12 +3,11 @@ import { CacheOptions, Properties } from '@posthog/plugin-scaffold'
 import { captureException } from '@sentry/node'
 import { Pool as GenericPool } from 'generic-pool'
 import Redis from 'ioredis'
-import { ProducerRecord } from 'kafkajs'
 import { DateTime } from 'luxon'
 import { QueryResult } from 'pg'

 import { KAFKA_GROUPS, KAFKA_PERSON_DISTINCT_ID, KAFKA_PLUGIN_LOG_ENTRIES } from '../../config/kafka-topics'
-import { KafkaProducerWrapper } from '../../kafka/producer'
+import { KafkaProducerWrapper, TopicMessage } from '../../kafka/producer'
 import {
     Action,
     ClickHouseEvent,
@@ -697,7 +696,7 @@ export class DB {
             })
         }

-        await this.kafkaProducer.queueMessages({ kafkaMessages })
+        await this.kafkaProducer.queueMessages(kafkaMessages)
         return person
     }

@@ -706,7 +705,7 @@
         person: InternalPerson,
         update: Partial<InternalPerson>,
         tx?: TransactionClient
-    ): Promise<[InternalPerson, ProducerRecord[]]> {
+    ): Promise<[InternalPerson, TopicMessage[]]> {
         let versionString = 'COALESCE(version, 0)::numeric + 1'
         if (update.version) {
             versionString = update.version.toString()
@@ -758,15 +757,15 @@
         return [updatedPerson, [kafkaMessage]]
     }

-    public async deletePerson(person: InternalPerson, tx?: TransactionClient): Promise<ProducerRecord[]> {
+    public async deletePerson(person: InternalPerson, tx?: TransactionClient): Promise<TopicMessage[]> {
         const { rows } = await this.postgres.query<{ version: string }>(
             tx ?? PostgresUse.COMMON_WRITE,
             'DELETE FROM posthog_person WHERE team_id = $1 AND id = $2 RETURNING version',
             [person.team_id, person.id],
             'deletePerson'
         )

-        let kafkaMessages: ProducerRecord[] = []
+        let kafkaMessages: TopicMessage[] = []

         if (rows.length > 0) {
             const [row] = rows
@@ -881,7 +880,7 @@
     ): Promise<void> {
         const kafkaMessages = await this.addDistinctIdPooled(person, distinctId, version, tx)
         if (kafkaMessages.length) {
-            await this.kafkaProducer.queueMessages({ kafkaMessages })
+            await this.kafkaProducer.queueMessages(kafkaMessages)
         }
     }

@@ -890,7 +889,7 @@
         distinctId: string,
         version: number,
         tx?: TransactionClient
-    ): Promise<ProducerRecord[]> {
+    ): Promise<TopicMessage[]> {
         const insertResult = await this.postgres.query(
             tx ?? PostgresUse.COMMON_WRITE,
             // NOTE: Keep this in sync with the posthog_persondistinctid INSERT in `createPerson`
@@ -923,7 +922,7 @@
         source: InternalPerson,
         target: InternalPerson,
         tx?: TransactionClient
-    ): Promise<ProducerRecord[]> {
+    ): Promise<TopicMessage[]> {
         let movedDistinctIdResult: QueryResult<any> | null = null
         try {
             movedDistinctIdResult = await this.postgres.query(
@@ -1133,10 +1132,8 @@
         // disk.
         void this.kafkaProducer
             .queueMessages({
-                kafkaMessages: {
-                    topic: KAFKA_PLUGIN_LOG_ENTRIES,
-                    messages: [{ key: parsedEntry.id, value: JSON.stringify(parsedEntry) }],
-                },
+                topic: KAFKA_PLUGIN_LOG_ENTRIES,
+                messages: [{ key: parsedEntry.id, value: JSON.stringify(parsedEntry) }],
             })
             .catch((error) => {
                 status.warn('⚠️', 'Failed to produce plugin log entry', {
@@ -1424,21 +1421,19 @@
         version: number
     ): Promise<void> {
         await this.kafkaProducer.queueMessages({
-            kafkaMessages: {
-                topic: KAFKA_GROUPS,
-                messages: [
-                    {
-                        value: JSON.stringify({
-                            group_type_index: groupTypeIndex,
-                            group_key: groupKey,
-                            team_id: teamId,
-                            group_properties: JSON.stringify(properties),
-                            created_at: castTimestampOrNow(createdAt, TimestampFormat.ClickHouseSecondPrecision),
-                            version,
-                        }),
-                    },
-                ],
-            },
+            topic: KAFKA_GROUPS,
+            messages: [
+                {
+                    value: JSON.stringify({
+                        group_type_index: groupTypeIndex,
+                        group_key: groupKey,
+                        team_id: teamId,
+                        group_properties: JSON.stringify(properties),
+                        created_at: castTimestampOrNow(createdAt, TimestampFormat.ClickHouseSecondPrecision),
+                        version,
+                    }),
+                },
+            ],
         })
     }

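A pattern worth noting in the db.ts hunks: helpers such as deletePerson and addDistinctIdPooled now return TopicMessage[] rather than kafkajs ProducerRecord[], so callers can collect the arrays and queue everything with a single queueMessages(kafkaMessages) call, as the createPerson path above does. A self-contained sketch with a hypothetical helper and topic name:

import { KafkaProducerWrapper, TopicMessage } from '../../kafka/producer'

// Hypothetical helper mirroring the db.ts pattern: perform the Postgres
// write elsewhere, and return the ClickHouse mirror messages to the caller
// instead of producing them inline.
function personDeletedMessage(personId: string): TopicMessage {
    return {
        topic: 'example_person_topic', // stand-in; db.ts uses its real topic constants
        messages: [{ key: personId, value: JSON.stringify({ id: personId, is_deleted: 1 }) }],
    }
}

async function deletePersons(producer: KafkaProducerWrapper, personIds: string[]): Promise<void> {
    const kafkaMessages: TopicMessage[] = personIds.map(personDeletedMessage)
    await producer.queueMessages(kafkaMessages) // one call, no { kafkaMessages } wrapper
}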

plugin-server/src/utils/db/hub.ts (16 changes: 7 additions & 9 deletions)
@@ -149,15 +149,13 @@ export async function createHub(
         // chained, and if we do not manage to produce then the chain will be
         // broken.
         await kafkaProducer.queueMessages({
-            kafkaMessages: {
-                topic: KAFKA_JOBS,
-                messages: [
-                    {
-                        value: Buffer.from(JSON.stringify(job)),
-                        key: Buffer.from(job.pluginConfigTeam.toString()),
-                    },
-                ],
-            },
+            topic: KAFKA_JOBS,
+            messages: [
+                {
+                    value: Buffer.from(JSON.stringify(job)),
+                    key: Buffer.from(job.pluginConfigTeam.toString()),
+                },
+            ],
         })
     }


plugin-server/src/utils/db/utils.ts (4 changes: 2 additions & 2 deletions)
@@ -1,6 +1,6 @@
 import { Properties } from '@posthog/plugin-scaffold'
 import * as Sentry from '@sentry/node'
-import { ProducerRecord } from 'kafkajs'
+import { TopicMessage } from 'kafka/producer'
 import { Counter } from 'prom-client'

 import { defaultConfig } from '../../config/config'
@@ -179,7 +179,7 @@ export function hasDifferenceWithProposedNewNormalisationMode(properties: Proper
     return !areMapsEqual(setOnce, filteredSetOnce)
 }

-export function generateKafkaPersonUpdateMessage(person: InternalPerson, isDeleted = false): ProducerRecord {
+export function generateKafkaPersonUpdateMessage(person: InternalPerson, isDeleted = false): TopicMessage {
     return {
         topic: KAFKA_PERSON,
         messages: [

plugin-server/src/worker/ingestion/app-metrics.ts (8 changes: 3 additions & 5 deletions)
@@ -164,7 +164,7 @@ export class AppMetrics {
         this.queueSize = 0
         this.queuedData = {}

-        const kafkaMessages: Message[] = Object.values(queue).map((value) => ({
+        const messages: Message[] = Object.values(queue).map((value) => ({
             value: JSON.stringify({
                 timestamp: castTimestampOrNow(DateTime.fromMillis(value.lastTimestamp), TimestampFormat.ClickHouse),
                 team_id: value.metric.teamId,
@@ -183,10 +181,8 @@
         }))

         await this.kafkaProducer.queueMessages({
-            kafkaMessages: {
-                topic: KAFKA_APP_METRICS,
-                messages: kafkaMessages,
-            },
+            topic: KAFKA_APP_METRICS,
+            messages: messages,
         })
         status.debug('🚽', `Finished flushing app metrics, took ${Date.now() - startTime}ms`)
     }
@@ -36,13 +36,11 @@ export async function extractHeatmapDataStep(

     acks.push(
         runner.hub.kafkaProducer.queueMessages({
-            kafkaMessages: {
-                topic: runner.hub.CLICKHOUSE_HEATMAPS_KAFKA_TOPIC,
-                messages: heatmapEvents.map((rawEvent) => ({
-                    key: eventUuid,
-                    value: JSON.stringify(rawEvent),
-                })),
-            },
+            topic: runner.hub.CLICKHOUSE_HEATMAPS_KAFKA_TOPIC,
+            messages: heatmapEvents.map((rawEvent) => ({
+                key: eventUuid,
+                value: JSON.stringify(rawEvent),
+            })),
         })
     )
 }