Skip to content

Commit

Permalink
cleanup(plugin-server): make waitForAck explicit/required
Browse files Browse the repository at this point in the history
  • Loading branch information
bretthoerner committed Mar 22, 2024
1 parent 3ccfb85 commit 3a7cd0e
Show file tree
Hide file tree
Showing 28 changed files with 318 additions and 214 deletions.
1 change: 1 addition & 0 deletions plugin-server/functional_tests/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ export const capture = async ({
})
),
key: teamId ? teamId.toString() : '',
waitForAck: true,
})
}

Expand Down
6 changes: 3 additions & 3 deletions plugin-server/functional_tests/jobs-consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ describe('dlq handling', () => {
test.concurrent(`handles empty messages`, async () => {
const key = uuidv4()

await produce({ topic: 'jobs', message: null, key })
await produce({ topic: 'jobs', message: null, key, waitForAck: true })

await waitForExpect(() => {
const messages = dlq.filter((message) => message.key?.toString() === key)
Expand All @@ -54,7 +54,7 @@ describe('dlq handling', () => {
test.concurrent(`handles invalid JSON`, async () => {
const key = uuidv4()

await produce({ topic: 'jobs', message: Buffer.from('invalid json'), key })
await produce({ topic: 'jobs', message: Buffer.from('invalid json'), key, waitForAck: true })

await waitForExpect(() => {
const messages = dlq.filter((message) => message.key?.toString() === key)
Expand All @@ -72,7 +72,7 @@ describe('dlq handling', () => {
labels: { topic: 'jobs', partition: '0', groupId: 'jobs-inserter' },
})

await produce({ topic: 'jobs', message: Buffer.from(''), key: '' })
await produce({ topic: 'jobs', message: Buffer.from(''), key: '', waitForAck: true })

await waitForExpect(async () => {
const metricAfter = await getMetric({
Expand Down
14 changes: 12 additions & 2 deletions plugin-server/functional_tests/kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,17 @@ export async function createKafkaProducer() {
return producer
}

export async function produce({ topic, message, key }: { topic: string; message: Buffer | null; key: string }) {
export async function produce({
topic,
message,
key,
waitForAck,
}: {
topic: string
message: Buffer | null
key: string
waitForAck: boolean
}) {
producer = producer ?? (await createKafkaProducer())
await defaultProduce({ producer, topic, value: message, key: Buffer.from(key) })
await defaultProduce({ producer, topic, value: message, key: Buffer.from(key), waitForAck })
}
8 changes: 5 additions & 3 deletions plugin-server/functional_tests/scheduled-tasks-runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ describe('dlq handling', () => {
test.concurrent(`handles empty messages`, async () => {
const key = uuidv4()

await produce({ topic: 'scheduled_tasks', message: null, key })
await produce({ topic: 'scheduled_tasks', message: null, key, waitForAck: true })

await waitForExpect(() => {
const messages = dlq.filter((message) => message.key?.toString() === key)
Expand All @@ -54,7 +54,7 @@ describe('dlq handling', () => {
test.concurrent(`handles invalid JSON`, async () => {
const key = uuidv4()

await produce({ topic: 'scheduled_tasks', message: Buffer.from('invalid json'), key })
await produce({ topic: 'scheduled_tasks', message: Buffer.from('invalid json'), key, waitForAck: true })

await waitForExpect(() => {
const messages = dlq.filter((message) => message.key?.toString() === key)
Expand All @@ -69,6 +69,7 @@ describe('dlq handling', () => {
topic: 'scheduled_tasks',
message: Buffer.from(JSON.stringify({ taskType: 'invalidTaskType', pluginConfigId: 1 })),
key,
waitForAck: true,
})

await waitForExpect(() => {
Expand All @@ -84,6 +85,7 @@ describe('dlq handling', () => {
topic: 'scheduled_tasks',
message: Buffer.from(JSON.stringify({ taskType: 'runEveryMinute', pluginConfigId: 'asdf' })),
key,
waitForAck: true,
})

await waitForExpect(() => {
Expand All @@ -104,7 +106,7 @@ describe('dlq handling', () => {

// NOTE: we don't actually care too much about the contents of the
// message, just that it triggers the consumer to try to process it.
await produce({ topic: 'scheduled_tasks', message: Buffer.from(''), key: '' })
await produce({ topic: 'scheduled_tasks', message: Buffer.from(''), key: '', waitForAck: true })

await waitForExpect(async () => {
const metricAfter = await getMetric({
Expand Down
9 changes: 8 additions & 1 deletion plugin-server/functional_tests/session-recordings.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,12 @@ test.skip('consumer updates timestamp exported to prometheus', async () => {
},
})

await produce({ topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, message: Buffer.from(''), key: '' })
await produce({
topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
message: Buffer.from(''),
key: '',
waitForAck: true,
})

await waitForExpect(async () => {
const metricAfter = await getMetric({
Expand Down Expand Up @@ -245,13 +250,15 @@ test.skip(`handles message with no token or with token and no associated team_id
topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
message: Buffer.from(JSON.stringify({ uuid: noTokenUuid, data: JSON.stringify({}) })),
key: noTokenKey,
waitForAck: true,
})
await produce({
topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
message: Buffer.from(
JSON.stringify({ uuid: noAssociatedTeamUuid, token: 'no associated team', data: JSON.stringify({}) })
),
key: noAssociatedTeamKey,
waitForAck: true,
})

await capture(makeSessionMessage(teamId, 'should be ingested'))
Expand Down
4 changes: 2 additions & 2 deletions plugin-server/src/kafka/producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ export const produce = async ({
value,
key,
headers = [],
waitForAck = true,
waitForAck,
}: {
producer: RdKafkaProducer
topic: string
value: MessageValue
key: MessageKey
headers?: MessageHeader[]
waitForAck?: boolean
waitForAck: boolean
}): Promise<number | null | undefined> => {
status.debug('📤', 'Producing message', { topic: topic })
const produceSpan = getSpan()?.startChild({ op: 'kafka_produce' })
Expand Down
7 changes: 5 additions & 2 deletions plugin-server/src/main/graphile-worker/schedule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,11 @@ export async function runScheduledTasks(
for (const pluginConfigId of server.pluginSchedule?.[taskType] || []) {
status.info('⏲️', 'queueing_schedule_task', { taskType, pluginConfigId })
await server.kafkaProducer.queueMessage({
topic: KAFKA_SCHEDULED_TASKS,
messages: [{ key: pluginConfigId.toString(), value: JSON.stringify({ taskType, pluginConfigId }) }],
kafkaMessage: {
topic: KAFKA_SCHEDULED_TASKS,
messages: [{ key: pluginConfigId.toString(), value: JSON.stringify({ taskType, pluginConfigId }) }],
},
waitForAck: true,
})
graphileScheduledTaskCounter.labels({ status: 'queued', task: taskType }).inc()
}
Expand Down
14 changes: 10 additions & 4 deletions plugin-server/src/main/ingestion-queues/jobs-consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,11 @@ export const startJobsConsumer = async ({
})
// TODO: handle resolving offsets asynchronously
await producer.queueMessage({
topic: KAFKA_JOBS_DLQ,
messages: [{ value: message.value, key: message.key }],
kafkaMessage: {
topic: KAFKA_JOBS_DLQ,
messages: [{ value: message.value, key: message.key }],
},
waitForAck: true,
})
resolveOffset(message.offset)
continue
Expand All @@ -71,8 +74,11 @@ export const startJobsConsumer = async ({
})
// TODO: handle resolving offsets asynchronously
await producer.queueMessage({
topic: KAFKA_JOBS_DLQ,
messages: [{ value: message.value, key: message.key }],
kafkaMessage: {
topic: KAFKA_JOBS_DLQ,
messages: [{ value: message.value, key: message.key }],
},
waitForAck: true,
})
resolveOffset(message.offset)
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,11 @@ const getTasksFromBatch = async (batch: Batch, producer: KafkaProducerWrapper) =
value: message.value,
})
await producer.queueMessage({
topic: KAFKA_SCHEDULED_TASKS_DLQ,
messages: [{ value: message.value, key: message.key }],
kafkaMessage: {
topic: KAFKA_SCHEDULED_TASKS_DLQ,
messages: [{ value: message.value, key: message.key }],
},
waitForAck: true,
})
continue
}
Expand All @@ -181,17 +184,23 @@ const getTasksFromBatch = async (batch: Batch, producer: KafkaProducerWrapper) =
error: error.stack ?? error,
})
await producer.queueMessage({
topic: KAFKA_SCHEDULED_TASKS_DLQ,
messages: [{ value: message.value, key: message.key }],
kafkaMessage: {
topic: KAFKA_SCHEDULED_TASKS_DLQ,
messages: [{ value: message.value, key: message.key }],
},
waitForAck: true,
})
continue
}

if (!taskTypes.includes(task.taskType) || isNaN(task.pluginConfigId)) {
status.warn('⚠️', `Invalid schema for partition ${batch.partition} offset ${message.offset}.`, task)
await producer.queueMessage({
topic: KAFKA_SCHEDULED_TASKS_DLQ,
messages: [{ value: message.value, key: message.key }],
kafkaMessage: {
topic: KAFKA_SCHEDULED_TASKS_DLQ,
messages: [{ value: message.value, key: message.key }],
},
waitForAck: true,
})
continue
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ export class ConsoleLogsIngester {
topic: KAFKA_LOG_ENTRIES,
value: Buffer.from(JSON.stringify(cle)),
key: event.session_id,
waitForAck: true,
})
)
} catch (error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ export class ReplayEventsIngester {
topic: KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS,
value: Buffer.from(JSON.stringify(replayRecord)),
key: event.session_id,
waitForAck: true,
}),
]
} catch (error) {
Expand Down
47 changes: 25 additions & 22 deletions plugin-server/src/utils/db/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ export class DB {
})
}

await this.kafkaProducer.queueMessages(kafkaMessages)
await this.kafkaProducer.queueMessages({ kafkaMessages, waitForAck: true })
return person
}

Expand Down Expand Up @@ -759,7 +759,7 @@ export class DB {
if (tx) {
kafkaMessages.push(message)
} else {
await this.kafkaProducer.queueMessage(message)
await this.kafkaProducer.queueMessage({ kafkaMessage: message, waitForAck: true })
}

status.debug(
Expand Down Expand Up @@ -829,7 +829,7 @@ export class DB {
public async addDistinctId(person: Person, distinctId: string): Promise<void> {
const kafkaMessages = await this.addDistinctIdPooled(person, distinctId)
if (kafkaMessages.length) {
await this.kafkaProducer.queueMessages(kafkaMessages)
await this.kafkaProducer.queueMessages({ kafkaMessages, waitForAck: true })
}
}

Expand Down Expand Up @@ -1072,15 +1072,15 @@ export class DB {
pluginLogEntryCounter.labels({ plugin_id: String(pluginConfig.plugin_id), source }).inc()

try {
await this.kafkaProducer.queueSingleJsonMessage(
KAFKA_PLUGIN_LOG_ENTRIES,
parsedEntry.id,
parsedEntry,
await this.kafkaProducer.queueSingleJsonMessage({
topic: KAFKA_PLUGIN_LOG_ENTRIES,
key: parsedEntry.id,
object: parsedEntry,
// For logs, we relax our durability requirements a little and
// do not wait for acks that Kafka has persisted the message to
// disk.
false
)
waitForAck: false,
})
} catch (e) {
captureException(e, { tags: { team_id: entry.pluginConfig.team_id } })
console.error('Failed to produce message', e, parsedEntry)
Expand Down Expand Up @@ -1409,19 +1409,22 @@ export class DB {
version: number
): Promise<void> {
await this.kafkaProducer.queueMessage({
topic: KAFKA_GROUPS,
messages: [
{
value: JSON.stringify({
group_type_index: groupTypeIndex,
group_key: groupKey,
team_id: teamId,
group_properties: JSON.stringify(properties),
created_at: castTimestampOrNow(createdAt, TimestampFormat.ClickHouseSecondPrecision),
version,
}),
},
],
kafkaMessage: {
topic: KAFKA_GROUPS,
messages: [
{
value: JSON.stringify({
group_type_index: groupTypeIndex,
group_key: groupKey,
team_id: teamId,
group_properties: JSON.stringify(properties),
created_at: castTimestampOrNow(createdAt, TimestampFormat.ClickHouseSecondPrecision),
version,
}),
},
],
},
waitForAck: true,
})
}

Expand Down
17 changes: 10 additions & 7 deletions plugin-server/src/utils/db/hub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,16 @@ export async function createHub(
// chained, and if we do not manage to produce then the chain will be
// broken.
await kafkaProducer.queueMessage({
topic: KAFKA_JOBS,
messages: [
{
value: Buffer.from(JSON.stringify(job)),
key: Buffer.from(job.pluginConfigTeam.toString()),
},
],
kafkaMessage: {
topic: KAFKA_JOBS,
messages: [
{
value: Buffer.from(JSON.stringify(job)),
key: Buffer.from(job.pluginConfigTeam.toString()),
},
],
},
waitForAck: true,
})
}

Expand Down
Loading

0 comments on commit 3a7cd0e

Please sign in to comment.