cleanup(plugin-server): make waitForAck explicit/required

bretthoerner committed Mar 22, 2024
1 parent 493f14f commit 48fddc0
Showing 14 changed files with 136 additions and 90 deletions.
4 changes: 2 additions & 2 deletions plugin-server/src/kafka/producer.ts
@@ -71,14 +71,14 @@ export const produce = async ({
value,
key,
headers = [],
waitForAck = true,
waitForAck,
}: {
producer: RdKafkaProducer
topic: string
value: MessageValue
key: MessageKey
headers?: MessageHeader[]
waitForAck?: boolean
waitForAck: boolean
}): Promise<number | null | undefined> => {
status.debug('📤', 'Producing message', { topic: topic })
const produceSpan = getSpan()?.startChild({ op: 'kafka_produce' })
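Because the flattened view above drops the +/- markers, note that the only change in this hunk is that `waitForAck` loses its `= true` default and becomes a required field of the options object. A minimal sketch of `produce` after the change; the `RdKafkaProducer` stub and its delivery callback below are assumptions for illustration, not the real node-rdkafka API:

// Sketch only: local stubs stand in for the real kafka types in
// plugin-server/src/kafka/producer.ts.
type MessageValue = Buffer | null
type MessageKey = Buffer | string | null
type MessageHeader = Record<string, string | Buffer>

// Assumed shape: a producer whose produce() reports delivery via a callback.
interface RdKafkaProducer {
    produce(
        topic: string,
        value: MessageValue,
        key: MessageKey,
        headers: MessageHeader[],
        onDelivery: (err: Error | null, offset?: number) => void
    ): void
}

export const produce = async ({
    producer,
    topic,
    value,
    key,
    headers = [],
    waitForAck, // required: every caller now states its durability intent
}: {
    producer: RdKafkaProducer
    topic: string
    value: MessageValue
    key: MessageKey
    headers?: MessageHeader[]
    waitForAck: boolean
}): Promise<number | undefined> => {
    if (waitForAck) {
        // Resolve only after the broker acknowledges the write.
        return await new Promise<number | undefined>((resolve, reject) => {
            producer.produce(topic, value, key, headers, (err, offset) =>
                err ? reject(err) : resolve(offset)
            )
        })
    }
    // Fire-and-forget: hand the message to the local queue and return.
    producer.produce(topic, value, key, headers, () => {})
    return undefined
}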
7 changes: 5 additions & 2 deletions plugin-server/src/main/graphile-worker/schedule.ts
@@ -56,8 +56,11 @@ export async function runScheduledTasks(
for (const pluginConfigId of server.pluginSchedule?.[taskType] || []) {
status.info('⏲️', 'queueing_schedule_task', { taskType, pluginConfigId })
await server.kafkaProducer.queueMessage({
topic: KAFKA_SCHEDULED_TASKS,
messages: [{ key: pluginConfigId.toString(), value: JSON.stringify({ taskType, pluginConfigId }) }],
kafkaMessage: {
topic: KAFKA_SCHEDULED_TASKS,
messages: [{ key: pluginConfigId.toString(), value: JSON.stringify({ taskType, pluginConfigId }) }],
},
waitForAck: true,
})
graphileScheduledTaskCounter.labels({ status: 'queued', task: taskType }).inc()
}
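This hunk shows the call-site half of the change: `queueMessage` previously took a `ProducerRecord` directly, with the ack behavior implied by the producer's old `waitForAck = true` default, and now takes a single object naming both the record and the ack choice. A minimal sketch, with the wrapper import path and topic string assumed:

import { KafkaProducerWrapper } from '../../utils/db/kafka-producer-wrapper' // path assumed

async function queueScheduledTask(producer: KafkaProducerWrapper, pluginConfigId: number): Promise<void> {
    const record = {
        topic: 'scheduled_tasks', // stand-in for KAFKA_SCHEDULED_TASKS
        messages: [{ key: pluginConfigId.toString(), value: JSON.stringify({ taskType: 'runEveryMinute', pluginConfigId }) }],
    }
    // Before this commit: await producer.queueMessage(record)
    // After: the record moves under `kafkaMessage` and the ack wait is explicit.
    await producer.queueMessage({ kafkaMessage: record, waitForAck: true })
}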
14 changes: 10 additions & 4 deletions plugin-server/src/main/ingestion-queues/jobs-consumer.ts
@@ -54,8 +54,11 @@ export const startJobsConsumer = async ({
})
// TODO: handle resolving offsets asynchronously
await producer.queueMessage({
topic: KAFKA_JOBS_DLQ,
messages: [{ value: message.value, key: message.key }],
kafkaMessage: {
topic: KAFKA_JOBS_DLQ,
messages: [{ value: message.value, key: message.key }],
},
waitForAck: true,
})
resolveOffset(message.offset)
continue
@@ -71,8 +74,11 @@
})
// TODO: handle resolving offsets asynchronously
await producer.queueMessage({
topic: KAFKA_JOBS_DLQ,
messages: [{ value: message.value, key: message.key }],
kafkaMessage: {
topic: KAFKA_JOBS_DLQ,
messages: [{ value: message.value, key: message.key }],
},
waitForAck: true,
})
resolveOffset(message.offset)
continue
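The DLQ hunks in this consumer all follow the same shape: produce the bad message to the dead-letter topic with `waitForAck: true`, and only then resolve the Kafka offset, so a message cannot be marked consumed before its DLQ copy is durable. A hedged sketch of that ordering (the `queueMessage`/`resolveOffset` signatures are inferred from the surrounding diff):

// Minimal sketch of the dead-letter path; types are assumptions.
interface ProducerLike {
    queueMessage(args: {
        kafkaMessage: { topic: string; messages: { key: Buffer | null; value: Buffer | null }[] }
        waitForAck: boolean
    }): Promise<unknown>
}

async function handleUnparseableMessage(
    producer: ProducerLike,
    message: { key: Buffer | null; value: Buffer | null; offset: string },
    resolveOffset: (offset: string) => void
): Promise<void> {
    // 1. Durably copy the bad message to the dead-letter topic...
    await producer.queueMessage({
        kafkaMessage: {
            topic: 'jobs_dlq', // stand-in for KAFKA_JOBS_DLQ
            messages: [{ value: message.value, key: message.key }],
        },
        waitForAck: true, // do not proceed until the broker has it
    })
    // 2. ...and only then mark the offset as consumed.
    resolveOffset(message.offset)
}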
@@ -163,8 +163,11 @@ const getTasksFromBatch = async (batch: Batch, producer: KafkaProducerWrapper) =
value: message.value,
})
await producer.queueMessage({
topic: KAFKA_SCHEDULED_TASKS_DLQ,
messages: [{ value: message.value, key: message.key }],
kafkaMessage: {
topic: KAFKA_SCHEDULED_TASKS_DLQ,
messages: [{ value: message.value, key: message.key }],
},
waitForAck: true,
})
continue
}
@@ -181,17 +184,23 @@
error: error.stack ?? error,
})
await producer.queueMessage({
topic: KAFKA_SCHEDULED_TASKS_DLQ,
messages: [{ value: message.value, key: message.key }],
kafkaMessage: {
topic: KAFKA_SCHEDULED_TASKS_DLQ,
messages: [{ value: message.value, key: message.key }],
},
waitForAck: true,
})
continue
}

if (!taskTypes.includes(task.taskType) || isNaN(task.pluginConfigId)) {
status.warn('⚠️', `Invalid schema for partition ${batch.partition} offset ${message.offset}.`, task)
await producer.queueMessage({
topic: KAFKA_SCHEDULED_TASKS_DLQ,
messages: [{ value: message.value, key: message.key }],
kafkaMessage: {
topic: KAFKA_SCHEDULED_TASKS_DLQ,
messages: [{ value: message.value, key: message.key }],
},
waitForAck: true,
})
continue
}
@@ -163,6 +163,7 @@ export class ConsoleLogsIngester {
topic: KAFKA_LOG_ENTRIES,
value: Buffer.from(JSON.stringify(cle)),
key: event.session_id,
waitForAck: true,
})
)
} catch (error) {
@@ -171,6 +171,7 @@ export class ReplayEventsIngester {
topic: KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS,
value: Buffer.from(JSON.stringify(replayRecord)),
key: event.session_id,
waitForAck: true,
}),
]
} catch (error) {
47 changes: 25 additions & 22 deletions plugin-server/src/utils/db/db.ts
@@ -707,7 +707,7 @@ export class DB {
})
}

await this.kafkaProducer.queueMessages(kafkaMessages)
await this.kafkaProducer.queueMessages({ kafkaMessages, waitForAck: true })
return person
}

@@ -759,7 +759,7 @@
if (tx) {
kafkaMessages.push(message)
} else {
await this.kafkaProducer.queueMessage(message)
await this.kafkaProducer.queueMessage({ kafkaMessage: message, waitForAck: true })
}

status.debug(
@@ -829,7 +829,7 @@
public async addDistinctId(person: Person, distinctId: string): Promise<void> {
const kafkaMessages = await this.addDistinctIdPooled(person, distinctId)
if (kafkaMessages.length) {
await this.kafkaProducer.queueMessages(kafkaMessages)
await this.kafkaProducer.queueMessages({ kafkaMessages, waitForAck: true })
}
}

@@ -1072,15 +1072,15 @@
pluginLogEntryCounter.labels({ plugin_id: String(pluginConfig.plugin_id), source }).inc()

try {
await this.kafkaProducer.queueSingleJsonMessage(
KAFKA_PLUGIN_LOG_ENTRIES,
parsedEntry.id,
parsedEntry,
await this.kafkaProducer.queueSingleJsonMessage({
topic: KAFKA_PLUGIN_LOG_ENTRIES,
key: parsedEntry.id,
object: parsedEntry,
// For logs, we relax our durability requirements a little and
// do not wait for acks that Kafka has persisted the message to
// disk.
false
)
waitForAck: false,
})
} catch (e) {
captureException(e, { tags: { team_id: entry.pluginConfig.team_id } })
console.error('Failed to produce message', e, parsedEntry)
@@ -1409,19 +1409,22 @@
version: number
): Promise<void> {
await this.kafkaProducer.queueMessage({
topic: KAFKA_GROUPS,
messages: [
{
value: JSON.stringify({
group_type_index: groupTypeIndex,
group_key: groupKey,
team_id: teamId,
group_properties: JSON.stringify(properties),
created_at: castTimestampOrNow(createdAt, TimestampFormat.ClickHouseSecondPrecision),
version,
}),
},
],
kafkaMessage: {
topic: KAFKA_GROUPS,
messages: [
{
value: JSON.stringify({
group_type_index: groupTypeIndex,
group_key: groupKey,
team_id: teamId,
group_properties: JSON.stringify(properties),
created_at: castTimestampOrNow(createdAt, TimestampFormat.ClickHouseSecondPrecision),
version,
}),
},
],
},
waitForAck: true,
})
}

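Almost every call in this commit pins `waitForAck: true`; the plugin-log write above is the one place that opts out, per its inline comment, because losing a log line is considered an acceptable trade against the ack round-trip. A sketch contrasting the two modes; the wrapper interface is stubbed and the topic strings are stand-ins for KAFKA_GROUPS and KAFKA_PLUGIN_LOG_ENTRIES:

// Sketch of the two durability modes used in db.ts; types are assumptions.
interface WrapperLike {
    queueMessage(args: {
        kafkaMessage: { topic: string; messages: { key?: string; value: string }[] }
        waitForAck: boolean
    }): Promise<unknown>
    queueSingleJsonMessage(args: {
        topic: string
        key: string
        object: Record<string, any>
        waitForAck: boolean
    }): Promise<void>
}

async function example(producer: WrapperLike): Promise<void> {
    // Group state feeds ClickHouse ingestion: block until Kafka acks.
    await producer.queueMessage({
        kafkaMessage: {
            topic: 'clickhouse_groups',
            messages: [{ value: JSON.stringify({ group_key: 'g1', version: 1 }) }],
        },
        waitForAck: true,
    })
    // Plugin log entries: relaxed durability, so skip the ack wait.
    await producer.queueSingleJsonMessage({
        topic: 'plugin_log_entries',
        key: 'entry-id',
        object: { message: 'hello from a plugin' },
        waitForAck: false,
    })
}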
17 changes: 10 additions & 7 deletions plugin-server/src/utils/db/hub.ts
@@ -159,13 +159,16 @@ export async function createHub(
// chained, and if we do not manage to produce then the chain will be
// broken.
await kafkaProducer.queueMessage({
topic: KAFKA_JOBS,
messages: [
{
value: Buffer.from(JSON.stringify(job)),
key: Buffer.from(job.pluginConfigTeam.toString()),
},
],
kafkaMessage: {
topic: KAFKA_JOBS,
messages: [
{
value: Buffer.from(JSON.stringify(job)),
key: Buffer.from(job.pluginConfigTeam.toString()),
},
],
},
waitForAck: true,
})
}

39 changes: 25 additions & 14 deletions plugin-server/src/utils/db/kafka-producer-wrapper.ts
@@ -35,7 +35,7 @@ export class KafkaProducerWrapper {
key: MessageKey
topic: string
headers?: MessageHeader[]
waitForAck?: boolean
waitForAck: boolean
}): Promise<void> {
try {
kafkaProducerMessagesQueuedCounter.labels({ topic_name: topic }).inc()
@@ -66,7 +66,7 @@
}
}

async queueMessage(kafkaMessage: ProducerRecord, waitForAck?: boolean) {
async queueMessage({ kafkaMessage, waitForAck }: { kafkaMessage: ProducerRecord; waitForAck: boolean }) {
return await Promise.all(
kafkaMessage.messages.map((message) =>
this.produce({
@@ -80,23 +80,34 @@
)
}

async queueMessages(kafkaMessages: ProducerRecord[], waitForAck?: boolean): Promise<void> {
await Promise.all(kafkaMessages.map((message) => this.queueMessage(message, waitForAck)))
async queueMessages({
kafkaMessages,
waitForAck,
}: {
kafkaMessages: ProducerRecord[]
waitForAck: boolean
}): Promise<void> {
await Promise.all(kafkaMessages.map((kafkaMessage) => this.queueMessage({ kafkaMessage, waitForAck })))
}

async queueSingleJsonMessage(
topic: string,
key: Message['key'],
object: Record<string, any>,
waitForAck?: boolean
): Promise<void> {
await this.queueMessage(
{
async queueSingleJsonMessage({
topic,
key,
object,
waitForAck,
}: {
topic: string
key: Message['key']
object: Record<string, any>
waitForAck: boolean
}): Promise<void> {
await this.queueMessage({
kafkaMessage: {
topic,
messages: [{ key, value: JSON.stringify(object) }],
},
waitForAck
)
waitForAck,
})
}

public async flush() {
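Putting the wrapper hunks together, the public surface after this commit looks roughly like the sketch below. The kafkajs types are simplified to string payloads, and metrics and error handling are omitted; only the method shapes mirror the diff:

// Reduced sketch of KafkaProducerWrapper after this commit.
type Message = { key?: string; value: string }
type ProducerRecord = { topic: string; messages: Message[] }

// Stand-in for the low-level produce() in src/kafka/producer.ts.
async function produce(args: { topic: string; value: string; key: string | null; waitForAck: boolean }): Promise<void> {
    console.log(`produce -> ${args.topic} (waitForAck=${args.waitForAck})`)
}

export class KafkaProducerWrapperSketch {
    async queueMessage({ kafkaMessage, waitForAck }: { kafkaMessage: ProducerRecord; waitForAck: boolean }): Promise<void> {
        await Promise.all(
            kafkaMessage.messages.map((message) =>
                produce({
                    topic: kafkaMessage.topic,
                    value: message.value,
                    key: message.key ?? null,
                    waitForAck, // threaded straight through; no hidden default
                })
            )
        )
    }

    async queueMessages({ kafkaMessages, waitForAck }: { kafkaMessages: ProducerRecord[]; waitForAck: boolean }): Promise<void> {
        await Promise.all(kafkaMessages.map((kafkaMessage) => this.queueMessage({ kafkaMessage, waitForAck })))
    }

    async queueSingleJsonMessage({ topic, key, object, waitForAck }: {
        topic: string
        key: string
        object: Record<string, any>
        waitForAck: boolean
    }): Promise<void> {
        await this.queueMessage({
            kafkaMessage: { topic, messages: [{ key, value: JSON.stringify(object) }] },
            waitForAck,
        })
    }
}

The design point worth keeping: none of the three methods defaults `waitForAck`, so the decision made at the outermost call site survives all the way down to the broker interaction.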
7 changes: 5 additions & 2 deletions plugin-server/src/worker/ingestion/app-metrics.ts
@@ -183,8 +183,11 @@ export class AppMetrics {
}))

await this.kafkaProducer.queueMessage({
topic: KAFKA_APP_METRICS,
messages: kafkaMessages,
kafkaMessage: {
topic: KAFKA_APP_METRICS,
messages: kafkaMessages,
},
waitForAck: true,
})
status.debug('🚽', `Finished flushing app metrics, took ${Date.now() - startTime}ms`)
}
@@ -213,7 +213,7 @@ export class EventPipelineRunner {
teamId,
`plugin_server_ingest_event:${currentStepName}`
)
await this.hub.db.kafkaProducer!.queueMessage(message)
await this.hub.db.kafkaProducer!.queueMessage({ kafkaMessage: message, waitForAck: true })
} catch (dlqError) {
status.info('🔔', `Errored trying to add event to dead letter queue. Error: ${dlqError}`)
Sentry.captureException(dlqError, {
4 changes: 2 additions & 2 deletions plugin-server/src/worker/ingestion/person-state.ts
@@ -453,7 +453,7 @@ export class PersonState {
olderCreatedAt, // Keep the oldest created_at (i.e. the first time we've seen either person)
properties
)
await this.db.kafkaProducer.queueMessages(kafkaMessages)
await this.db.kafkaProducer.queueMessages({ kafkaMessages, waitForAck: true })
return mergedPerson
}

@@ -767,7 +767,7 @@
// Postgres for some reason -- the same row state should be
// generated each call, and the receiving ReplacingMergeTree will
// ensure we keep only the latest version after all writes settle.)
await this.kafkaProducer.queueMessages(messages, true)
await this.kafkaProducer.queueMessages({ kafkaMessages: messages, waitForAck: true })

return rows.length
}
27 changes: 15 additions & 12 deletions plugin-server/src/worker/ingestion/utils.ts
@@ -80,18 +80,21 @@ export async function captureIngestionWarning(
const limiter_key = `${teamId}:${type}:${debounce?.key || ''}`
if (!!debounce?.alwaysSend || IngestionWarningLimiter.consume(limiter_key, 1)) {
await kafkaProducer.queueMessage({
topic: KAFKA_INGESTION_WARNINGS,
messages: [
{
value: JSON.stringify({
team_id: teamId,
type: type,
source: 'plugin-server',
details: JSON.stringify(details),
timestamp: castTimestampOrNow(null, TimestampFormat.ClickHouse),
}),
},
],
kafkaMessage: {
topic: KAFKA_INGESTION_WARNINGS,
messages: [
{
value: JSON.stringify({
team_id: teamId,
type: type,
source: 'plugin-server',
details: JSON.stringify(details),
timestamp: castTimestampOrNow(null, TimestampFormat.ClickHouse),
}),
},
],
},
waitForAck: true,
})
} else {
return Promise.resolve()
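Taken together, the payoff of requiring the field is visible at the type level: a call site that forgets to decide no longer compiles, rather than silently inheriting the old `waitForAck = true` default. A small self-contained illustration; the argument shape follows the diff, and the topic name is a stand-in:

type QueueMessageArgs = {
    kafkaMessage: { topic: string; messages: { value: string }[] }
    waitForAck: boolean
}

async function queueMessage(args: QueueMessageArgs): Promise<void> {
    console.log(`queued to ${args.kafkaMessage.topic}, waitForAck=${args.waitForAck}`)
}

// OK: durability intent is explicit.
void queueMessage({ kafkaMessage: { topic: 'ingestion_warnings', messages: [{ value: '{}' }] }, waitForAck: true })

// After this commit the old style no longer type-checks:
//   queueMessage({ kafkaMessage: { topic: 'ingestion_warnings', messages: [{ value: '{}' }] } })
//   // error TS2345: Property 'waitForAck' is missing ...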