Skip to content

Commit

Permalink
fix(cdp): Hearbeat and nextTick processing (#23322)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite authored Jun 28, 2024
1 parent fc80f93 commit d3c247c
Showing 1 changed file with 57 additions and 50 deletions.
107 changes: 57 additions & 50 deletions plugin-server/src/cdp/cdp-consumers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,25 @@ abstract class CdpConsumerBase {
this.asyncFunctionExecutor = new AsyncFunctionExecutor(this.hub, rustyHook)
}

protected async runWithHeartbeat<T>(func: () => Promise<T> | T): Promise<T> {
// Helper function to ensure that looping over lots of hog functions doesn't block up the thread, killing the consumer
const res = await func()
this.heartbeat()
await new Promise((resolve) => process.nextTick(resolve))

return res
}

protected async runManyWithHeartbeat<T, R>(items: T[], func: (item: T) => Promise<R> | R): Promise<R[]> {
// Helper function to ensure that looping over lots of hog functions doesn't block up the thread, killing the consumer
const results = []

for (const item of items) {
results.push(await this.runWithHeartbeat(() => func(item)))
}
return results
}

public async handleEachBatch(messages: Message[], heartbeat: () => void): Promise<void> {
status.info('🔁', `${this.name} - handling batch`, {
size: messages.length,
Expand All @@ -108,13 +127,13 @@ abstract class CdpConsumerBase {
statsKey: `cdpConsumer.handleEachBatch`,
sendTimeoutGuardToSentry: false,
func: async () => {
await this._handleEachBatch(messages, heartbeat)
await this._handleEachBatch(messages)
await this.produceQueuedMessages()
},
})
}

protected abstract _handleEachBatch(messages: Message[], heartbeat: () => void): Promise<void>
protected abstract _handleEachBatch(messages: Message[]): Promise<void>

private async produceQueuedMessages() {
const messages = [...this.messagesToProduce]
Expand Down Expand Up @@ -159,8 +178,7 @@ abstract class CdpConsumerBase {
})

if (result.asyncFunctionRequest) {
const res = await this.asyncFunctionExecutor.execute(result)
this.heartbeat()
const res = await this.runWithHeartbeat(() => this.asyncFunctionExecutor.execute(result))

// NOTE: This is very temporary as it is producing the response. the response will actually be produced by the 3rd party service
// Later this will actually be the _request_ which we will push to the async function topic if we make one
Expand Down Expand Up @@ -217,13 +235,10 @@ abstract class CdpConsumerBase {
}
}

const results = await Promise.all(
asyncResponsesToRun.map((e) => {
const res = this.hogExecutor.executeAsyncResponse(e)
this.heartbeat()
return res
})
const results = await this.runManyWithHeartbeat(asyncResponsesToRun, (item) =>
this.hogExecutor.executeAsyncResponse(item)
)

this.hogWatcher.currentObservations.observeResults(results)
return results
},
Expand Down Expand Up @@ -288,12 +303,9 @@ abstract class CdpConsumerBase {
})
}

return healthy.map((x) => {
// NOTE: Let's see if this works - otherwise we might need a process.nextTick to make sure there is room for events to fire
const res = this.hogExecutor.executeFunction(globals, x)
this.heartbeat()
return res
})
return this.runManyWithHeartbeat(healthy, (x) =>
this.hogExecutor.executeFunction(globals, x)
)
})
)
)
Expand Down Expand Up @@ -396,19 +408,19 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase {
protected topic = KAFKA_EVENTS_JSON
protected consumerGroupId = 'cdp-processed-events-consumer'

public async _handleEachBatch(messages: Message[], heartbeat: () => void): Promise<void> {
const invocationGlobals = await runInstrumentedFunction({
statsKey: `cdpConsumer.handleEachBatch.parseKafkaMessages`,
func: async () => await this.parseMessages(messages),
})
heartbeat()
public async _handleEachBatch(messages: Message[]): Promise<void> {
const invocationGlobals = await this.runWithHeartbeat(() =>
runInstrumentedFunction({
statsKey: `cdpConsumer.handleEachBatch.parseKafkaMessages`,
func: async () => await this.parseMessages(messages),
})
)

if (!invocationGlobals.length) {
return
}

const invocationResults = await this.executeMatchingFunctions(invocationGlobals)
heartbeat()
const invocationResults = await this.runWithHeartbeat(() => this.executeMatchingFunctions(invocationGlobals))

await this.processInvocationResults(invocationResults)
}
Expand Down Expand Up @@ -467,19 +479,19 @@ export class CdpFunctionCallbackConsumer extends CdpConsumerBase {
protected topic = KAFKA_CDP_FUNCTION_CALLBACKS
protected consumerGroupId = 'cdp-function-callback-consumer'

public async _handleEachBatch(messages: Message[], heartbeat: () => void): Promise<void> {
const events = await runInstrumentedFunction({
statsKey: `cdpConsumer.handleEachBatch.parseKafkaMessages`,
func: () => Promise.resolve(this.parseMessages(messages)),
})
heartbeat()
public async _handleEachBatch(messages: Message[]): Promise<void> {
const events = await this.runWithHeartbeat(() =>
runInstrumentedFunction({
statsKey: `cdpConsumer.handleEachBatch.parseKafkaMessages`,
func: () => Promise.resolve(this.parseMessages(messages)),
})
)

if (!events.length) {
return
}

const invocationResults = await this.executeAsyncResponses(events)
heartbeat()
const invocationResults = await this.runWithHeartbeat(() => this.executeAsyncResponses(events))

await this.processInvocationResults(invocationResults)
}
Expand Down Expand Up @@ -510,24 +522,21 @@ export class CdpOverflowConsumer extends CdpConsumerBase {
protected topic = KAFKA_CDP_FUNCTION_OVERFLOW
protected consumerGroupId = 'cdp-overflow-consumer'

public async _handleEachBatch(messages: Message[], heartbeat: () => void): Promise<void> {
public async _handleEachBatch(messages: Message[]): Promise<void> {
// This consumer can receive both events and callbacks so needs to check the message being parsed
const [overflowedGlobals, callbacks] = await runInstrumentedFunction({
statsKey: `cdpConsumer.handleEachBatch.parseKafkaMessages`,
func: () => Promise.resolve(this.parseMessages(messages)),
})

heartbeat()
const [overflowedGlobals, callbacks] = await this.runWithHeartbeat(() =>
runInstrumentedFunction({
statsKey: `cdpConsumer.handleEachBatch.parseKafkaMessages`,
func: () => Promise.resolve(this.parseMessages(messages)),
})
)

const invocationResults = (
await Promise.all([
this.executeAsyncResponses(callbacks),
this.executeOverflowedFunctions(overflowedGlobals),
])
await this.runWithHeartbeat(() =>
Promise.all([this.executeAsyncResponses(callbacks), this.executeOverflowedFunctions(overflowedGlobals)])
)
).flat()

heartbeat()

await this.processInvocationResults(invocationResults)
}

Expand All @@ -540,11 +549,9 @@ export class CdpOverflowConsumer extends CdpConsumerBase {
const results = (
await Promise.all(
invocationGlobals.map((item) => {
return item.hogFunctionIds.map((hogFunctionId) => {
const res = this.hogExecutor.executeFunction(item.globals, hogFunctionId)
this.heartbeat()
return res
})
return this.runManyWithHeartbeat(item.hogFunctionIds, (hogFunctionId) =>
this.hogExecutor.executeFunction(item.globals, hogFunctionId)
)
})
)
)
Expand Down

0 comments on commit d3c247c

Please sign in to comment.