diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts index bbaa04a0416fe..ac97bd67c5727 100644 --- a/plugin-server/src/cdp/cdp-consumers.ts +++ b/plugin-server/src/cdp/cdp-consumers.ts @@ -92,9 +92,6 @@ abstract class CdpConsumerBase { public abstract handleEachBatch(messages: Message[], heartbeat: () => void): Promise protected async processInvocationResults(results: HogFunctionInvocationResult[]): Promise { - // Processes any async functions and queues up produced messages - await this.hogWatcher.observeResults(results) - // TODO: Follow up - process metrics from theĀ invocationResults await runInstrumentedFunction({ statsKey: `cdpConsumer.handleEachBatch.produceResults`, @@ -156,7 +153,7 @@ abstract class CdpConsumerBase { return await runInstrumentedFunction({ statsKey: `cdpConsumer.handleEachBatch.executeAsyncResponses`, func: async () => { - await this.hogWatcher.observeAsyncFunctionResponses(asyncResponses) + this.hogWatcher.observeAsyncFunctionResponses(asyncResponses) // Filter for blocked functions asyncResponses = asyncResponses.filter((e) => { if (this.hogWatcher.isHogFunctionOverflowed(e.hogFunctionId)) { @@ -171,7 +168,7 @@ abstract class CdpConsumerBase { }) const results = await Promise.all(asyncResponses.map((e) => this.hogExecutor.executeAsyncResponse(e))) - await this.hogWatcher.observeResults(results) + this.hogWatcher.observeResults(results) return results }, }) @@ -195,7 +192,7 @@ abstract class CdpConsumerBase { }) ) ).flat() - await this.hogWatcher.observeResults(results) + this.hogWatcher.observeResults(results) return results }, })