Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite committed Jun 20, 2024
1 parent c39992d commit 5cb1482
Showing 1 changed file with 3 additions and 6 deletions.
9 changes: 3 additions & 6 deletions plugin-server/src/cdp/cdp-consumers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,6 @@ abstract class CdpConsumerBase {
public abstract handleEachBatch(messages: Message[], heartbeat: () => void): Promise<void>

protected async processInvocationResults(results: HogFunctionInvocationResult[]): Promise<void> {
// Processes any async functions and queues up produced messages
await this.hogWatcher.observeResults(results)

// TODO: Follow up - process metrics from the invocationResults
await runInstrumentedFunction({
statsKey: `cdpConsumer.handleEachBatch.produceResults`,
Expand Down Expand Up @@ -156,7 +153,7 @@ abstract class CdpConsumerBase {
return await runInstrumentedFunction({
statsKey: `cdpConsumer.handleEachBatch.executeAsyncResponses`,
func: async () => {
await this.hogWatcher.observeAsyncFunctionResponses(asyncResponses)
this.hogWatcher.observeAsyncFunctionResponses(asyncResponses)
// Filter for blocked functions
asyncResponses = asyncResponses.filter((e) => {
if (this.hogWatcher.isHogFunctionOverflowed(e.hogFunctionId)) {
Expand All @@ -171,7 +168,7 @@ abstract class CdpConsumerBase {
})

const results = await Promise.all(asyncResponses.map((e) => this.hogExecutor.executeAsyncResponse(e)))
await this.hogWatcher.observeResults(results)
this.hogWatcher.observeResults(results)
return results
},
})
Expand All @@ -195,7 +192,7 @@ abstract class CdpConsumerBase {
})
)
).flat()
await this.hogWatcher.observeResults(results)
this.hogWatcher.observeResults(results)
return results
},
})
Expand Down

0 comments on commit 5cb1482

Please sign in to comment.