From d3c247cef4f04ed6c36723e1485da20f246a46e0 Mon Sep 17 00:00:00 2001 From: Ben White Date: Fri, 28 Jun 2024 14:50:39 +0200 Subject: [PATCH] fix(cdp): Hearbeat and nextTick processing (#23322) --- plugin-server/src/cdp/cdp-consumers.ts | 107 +++++++++++++------------ 1 file changed, 57 insertions(+), 50 deletions(-) diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts index 67f725470a682..a1f731890b5af 100644 --- a/plugin-server/src/cdp/cdp-consumers.ts +++ b/plugin-server/src/cdp/cdp-consumers.ts @@ -94,6 +94,25 @@ abstract class CdpConsumerBase { this.asyncFunctionExecutor = new AsyncFunctionExecutor(this.hub, rustyHook) } + protected async runWithHeartbeat(func: () => Promise | T): Promise { + // Helper function to ensure that looping over lots of hog functions doesn't block up the thread, killing the consumer + const res = await func() + this.heartbeat() + await new Promise((resolve) => process.nextTick(resolve)) + + return res + } + + protected async runManyWithHeartbeat(items: T[], func: (item: T) => Promise | R): Promise { + // Helper function to ensure that looping over lots of hog functions doesn't block up the thread, killing the consumer + const results = [] + + for (const item of items) { + results.push(await this.runWithHeartbeat(() => func(item))) + } + return results + } + public async handleEachBatch(messages: Message[], heartbeat: () => void): Promise { status.info('🔁', `${this.name} - handling batch`, { size: messages.length, @@ -108,13 +127,13 @@ abstract class CdpConsumerBase { statsKey: `cdpConsumer.handleEachBatch`, sendTimeoutGuardToSentry: false, func: async () => { - await this._handleEachBatch(messages, heartbeat) + await this._handleEachBatch(messages) await this.produceQueuedMessages() }, }) } - protected abstract _handleEachBatch(messages: Message[], heartbeat: () => void): Promise + protected abstract _handleEachBatch(messages: Message[]): Promise private async produceQueuedMessages() { const messages = [...this.messagesToProduce] @@ -159,8 +178,7 @@ abstract class CdpConsumerBase { }) if (result.asyncFunctionRequest) { - const res = await this.asyncFunctionExecutor.execute(result) - this.heartbeat() + const res = await this.runWithHeartbeat(() => this.asyncFunctionExecutor.execute(result)) // NOTE: This is very temporary as it is producing the response. the response will actually be produced by the 3rd party service // Later this will actually be the _request_ which we will push to the async function topic if we make one @@ -217,13 +235,10 @@ abstract class CdpConsumerBase { } } - const results = await Promise.all( - asyncResponsesToRun.map((e) => { - const res = this.hogExecutor.executeAsyncResponse(e) - this.heartbeat() - return res - }) + const results = await this.runManyWithHeartbeat(asyncResponsesToRun, (item) => + this.hogExecutor.executeAsyncResponse(item) ) + this.hogWatcher.currentObservations.observeResults(results) return results }, @@ -288,12 +303,9 @@ abstract class CdpConsumerBase { }) } - return healthy.map((x) => { - // NOTE: Let's see if this works - otherwise we might need a process.nextTick to make sure there is room for events to fire - const res = this.hogExecutor.executeFunction(globals, x) - this.heartbeat() - return res - }) + return this.runManyWithHeartbeat(healthy, (x) => + this.hogExecutor.executeFunction(globals, x) + ) }) ) ) @@ -396,19 +408,19 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase { protected topic = KAFKA_EVENTS_JSON protected consumerGroupId = 'cdp-processed-events-consumer' - public async _handleEachBatch(messages: Message[], heartbeat: () => void): Promise { - const invocationGlobals = await runInstrumentedFunction({ - statsKey: `cdpConsumer.handleEachBatch.parseKafkaMessages`, - func: async () => await this.parseMessages(messages), - }) - heartbeat() + public async _handleEachBatch(messages: Message[]): Promise { + const invocationGlobals = await this.runWithHeartbeat(() => + runInstrumentedFunction({ + statsKey: `cdpConsumer.handleEachBatch.parseKafkaMessages`, + func: async () => await this.parseMessages(messages), + }) + ) if (!invocationGlobals.length) { return } - const invocationResults = await this.executeMatchingFunctions(invocationGlobals) - heartbeat() + const invocationResults = await this.runWithHeartbeat(() => this.executeMatchingFunctions(invocationGlobals)) await this.processInvocationResults(invocationResults) } @@ -467,19 +479,19 @@ export class CdpFunctionCallbackConsumer extends CdpConsumerBase { protected topic = KAFKA_CDP_FUNCTION_CALLBACKS protected consumerGroupId = 'cdp-function-callback-consumer' - public async _handleEachBatch(messages: Message[], heartbeat: () => void): Promise { - const events = await runInstrumentedFunction({ - statsKey: `cdpConsumer.handleEachBatch.parseKafkaMessages`, - func: () => Promise.resolve(this.parseMessages(messages)), - }) - heartbeat() + public async _handleEachBatch(messages: Message[]): Promise { + const events = await this.runWithHeartbeat(() => + runInstrumentedFunction({ + statsKey: `cdpConsumer.handleEachBatch.parseKafkaMessages`, + func: () => Promise.resolve(this.parseMessages(messages)), + }) + ) if (!events.length) { return } - const invocationResults = await this.executeAsyncResponses(events) - heartbeat() + const invocationResults = await this.runWithHeartbeat(() => this.executeAsyncResponses(events)) await this.processInvocationResults(invocationResults) } @@ -510,24 +522,21 @@ export class CdpOverflowConsumer extends CdpConsumerBase { protected topic = KAFKA_CDP_FUNCTION_OVERFLOW protected consumerGroupId = 'cdp-overflow-consumer' - public async _handleEachBatch(messages: Message[], heartbeat: () => void): Promise { + public async _handleEachBatch(messages: Message[]): Promise { // This consumer can receive both events and callbacks so needs to check the message being parsed - const [overflowedGlobals, callbacks] = await runInstrumentedFunction({ - statsKey: `cdpConsumer.handleEachBatch.parseKafkaMessages`, - func: () => Promise.resolve(this.parseMessages(messages)), - }) - - heartbeat() + const [overflowedGlobals, callbacks] = await this.runWithHeartbeat(() => + runInstrumentedFunction({ + statsKey: `cdpConsumer.handleEachBatch.parseKafkaMessages`, + func: () => Promise.resolve(this.parseMessages(messages)), + }) + ) const invocationResults = ( - await Promise.all([ - this.executeAsyncResponses(callbacks), - this.executeOverflowedFunctions(overflowedGlobals), - ]) + await this.runWithHeartbeat(() => + Promise.all([this.executeAsyncResponses(callbacks), this.executeOverflowedFunctions(overflowedGlobals)]) + ) ).flat() - heartbeat() - await this.processInvocationResults(invocationResults) } @@ -540,11 +549,9 @@ export class CdpOverflowConsumer extends CdpConsumerBase { const results = ( await Promise.all( invocationGlobals.map((item) => { - return item.hogFunctionIds.map((hogFunctionId) => { - const res = this.hogExecutor.executeFunction(item.globals, hogFunctionId) - this.heartbeat() - return res - }) + return this.runManyWithHeartbeat(item.hogFunctionIds, (hogFunctionId) => + this.hogExecutor.executeFunction(item.globals, hogFunctionId) + ) }) ) )