Skip to content

Commit

Permalink
feat: Refactor Hog executor (#22977)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite authored Jun 17, 2024
1 parent a0eaff1 commit 5ab011b
Show file tree
Hide file tree
Showing 7 changed files with 362 additions and 196 deletions.
32 changes: 14 additions & 18 deletions plugin-server/src/cdp/async-function-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,19 @@ import { Webhook } from '@posthog/plugin-scaffold'

import { KAFKA_CDP_FUNCTION_CALLBACKS } from '../config/kafka-topics'
import { PluginsServerConfig } from '../types'
import { KafkaProducerWrapper } from '../utils/db/kafka-producer-wrapper'
import { trackedFetch } from '../utils/fetch'
import { status } from '../utils/status'
import { RustyHook } from '../worker/rusty-hook'
import { HogFunctionInvocationAsyncRequest, HogFunctionInvocationAsyncResponse } from './types'
import {
HogFunctionInvocationAsyncRequest,
HogFunctionInvocationAsyncResponse,
HogFunctionMessageToQueue,
} from './types'

export class AsyncFunctionExecutor {
constructor(
private serverConfig: PluginsServerConfig,
private rustyHook: RustyHook,
private kafkaProducer: KafkaProducerWrapper
) {}
constructor(private serverConfig: PluginsServerConfig, private rustyHook: RustyHook) {}

async execute(request: HogFunctionInvocationAsyncRequest): Promise<HogFunctionInvocationAsyncRequest> {
async execute(request: HogFunctionInvocationAsyncRequest): Promise<HogFunctionMessageToQueue | undefined> {
const loggingContext = {
hogFunctionId: request.hogFunctionId,
invocationId: request.id,
Expand All @@ -25,16 +24,15 @@ export class AsyncFunctionExecutor {

switch (request.asyncFunctionName) {
case 'fetch':
await this.asyncFunctionFetch(request)
break
return await this.asyncFunctionFetch(request)
default:
status.error('🦔', `[HogExecutor] Unknown async function: ${request.asyncFunctionName}`, loggingContext)
}

return request
}

private async asyncFunctionFetch(request: HogFunctionInvocationAsyncRequest): Promise<any> {
private async asyncFunctionFetch(
request: HogFunctionInvocationAsyncRequest
): Promise<HogFunctionMessageToQueue | undefined> {
// TODO: validate the args
const args = request.asyncFunctionArgs ?? []
const url: string = args[0]
Expand Down Expand Up @@ -95,13 +93,11 @@ export class AsyncFunctionExecutor {
response.error = 'Something went wrong with the fetch request.'
}

// NOTE: This feels like overkill but is basically simulating rusty hook's callback that will eventually be implemented
await this.kafkaProducer!.produce({
return {
topic: KAFKA_CDP_FUNCTION_CALLBACKS,
value: Buffer.from(JSON.stringify(response)),
value: response,
key: response.id,
waitForAck: true,
})
}
}
}
}
109 changes: 51 additions & 58 deletions plugin-server/src/cdp/cdp-processed-events-consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import {
HogFunctionInvocationAsyncResponse,
HogFunctionInvocationGlobals,
HogFunctionInvocationResult,
HogFunctionLogEntry,
HogFunctionMessageToQueue,
} from './types'
import { convertToHogFunctionInvocationGlobals } from './utils'

Expand Down Expand Up @@ -57,7 +57,7 @@ abstract class CdpConsumerBase {
groupTypeManager: GroupTypeManager
hogFunctionManager: HogFunctionManager
asyncFunctionExecutor?: AsyncFunctionExecutor
hogExecutor?: HogExecutor
hogExecutor: HogExecutor
appMetrics?: AppMetrics
isStopping = false

Expand All @@ -73,10 +73,54 @@ abstract class CdpConsumerBase {
this.organizationManager = new OrganizationManager(postgres, this.teamManager)
this.groupTypeManager = new GroupTypeManager(postgres, this.teamManager)
this.hogFunctionManager = new HogFunctionManager(postgres, config)
this.hogExecutor = new HogExecutor(this.config, this.hogFunctionManager)
}

public abstract handleEachBatch(messages: Message[], heartbeat: () => void): Promise<void>

/**
 * Turns a batch of Hog function invocation results into Kafka messages and produces them.
 *
 * For every result, each log entry is queued for the log entries topic, keyed by the
 * entry's `instance_id`. If the result carries a pending async function request, it is run
 * through the async function executor and any message the executor returns is queued too.
 * Once every result has been handled, all queued messages are produced with their values
 * JSON-serialized, waiting for an ack on each.
 *
 * @param results - invocation results whose logs and async function requests should be handled
 * @returns a promise that resolves once every queued message has been produced
 */
protected async processInvocationResults(results: HogFunctionInvocationResult[]): Promise<void> {
    // TODO: Follow up - process metrics from the invocationResults
    await runInstrumentedFunction({
        statsKey: `cdpFunctionExecutor.handleEachBatch.produceResults`,
        func: async () => {
            const pendingMessages: HogFunctionMessageToQueue[] = []

            // Gathers the messages for a single invocation result: its log entries first,
            // then (if present) whatever the async function executor hands back.
            const collectMessages = async (invocationResult: HogFunctionInvocationResult): Promise<void> => {
                for (const logEntry of invocationResult.logs) {
                    pendingMessages.push({
                        topic: KAFKA_LOG_ENTRIES,
                        value: logEntry,
                        key: logEntry.instance_id,
                    })
                }

                if (!invocationResult.asyncFunction) {
                    return
                }

                const callbackMessage = await this.asyncFunctionExecutor!.execute(invocationResult.asyncFunction)

                // The executor may return nothing, in which case there is nothing to queue
                if (callbackMessage) {
                    pendingMessages.push(callbackMessage)
                }
            }

            await Promise.all(results.map((invocationResult) => collectMessages(invocationResult)))

            // Serialization happens here so that queued messages can carry plain objects
            const produceCalls = pendingMessages.map(({ topic, value, key }) =>
                this.kafkaProducer!.produce({
                    topic,
                    value: Buffer.from(JSON.stringify(value)),
                    key,
                    waitForAck: true,
                })
            )

            await Promise.all(produceCalls)
        },
    })
}

public async start(): Promise<void> {
status.info('🔁', `${this.name} - starting`, {
librdKafkaVersion: librdkafkaVersion,
Expand All @@ -94,8 +138,7 @@ abstract class CdpConsumerBase {
)

const rustyHook = this.hub?.rustyHook ?? new RustyHook(this.config)
this.asyncFunctionExecutor = new AsyncFunctionExecutor(this.config, rustyHook, this.kafkaProducer!)
this.hogExecutor = new HogExecutor(this.config, this.hogFunctionManager, this.asyncFunctionExecutor)
this.asyncFunctionExecutor = new AsyncFunctionExecutor(this.config, rustyHook)

this.appMetrics =
this.hub?.appMetrics ??
Expand Down Expand Up @@ -199,40 +242,15 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase {
statsKey: `cdpFunctionExecutor.handleEachBatch.consumeBatch`,
func: async () => {
const results = await Promise.all(
events.map((e) => this.hogExecutor!.executeMatchingFunctions(e))
events.map((e) => this.hogExecutor.executeMatchingFunctions(e))
)
invocationResults.push(...results.flat())
},
})

heartbeat()

// TODO: Follow up - process metrics from the invocationResults
await runInstrumentedFunction({
statsKey: `cdpFunctionExecutor.handleEachBatch.queueMetrics`,
func: async () => {
const allLogs = invocationResults.reduce((acc, result) => {
return [...acc, ...result.logs]
}, [] as HogFunctionLogEntry[])

await Promise.all(
allLogs.map((x) =>
this.kafkaProducer!.produce({
topic: KAFKA_LOG_ENTRIES,
value: Buffer.from(JSON.stringify(x)),
key: x.instance_id,
waitForAck: true,
})
)
)

if (allLogs.length) {
status.info('🔁', `${this.name} - produced logs`, {
size: allLogs.length,
})
}
},
})
await this.processInvocationResults(invocationResults)
},
})
}
Expand Down Expand Up @@ -310,39 +328,14 @@ export class CdpFunctionCallbackConsumer extends CdpConsumerBase {
await runInstrumentedFunction({
statsKey: `cdpFunctionExecutor.handleEachBatch.consumeBatch`,
func: async () => {
const results = await Promise.all(events.map((e) => this.hogExecutor!.executeAsyncResponse(e)))
const results = await Promise.all(events.map((e) => this.hogExecutor.executeAsyncResponse(e)))
invocationResults.push(...results.flat())
},
})

heartbeat()

// TODO: Follow up - process metrics from the invocationResults
await runInstrumentedFunction({
statsKey: `cdpFunctionExecutor.handleEachBatch.queueMetrics`,
func: async () => {
const allLogs = invocationResults.reduce((acc, result) => {
return [...acc, ...result.logs]
}, [] as HogFunctionLogEntry[])

await Promise.all(
allLogs.map((x) =>
this.kafkaProducer!.produce({
topic: KAFKA_LOG_ENTRIES,
value: Buffer.from(JSON.stringify(x)),
key: x.instance_id,
waitForAck: true,
})
)
)

if (allLogs.length) {
status.info('🔁', `${this.name} - produced logs`, {
size: allLogs.length,
})
}
},
})
await this.processInvocationResults(invocationResults)
},
})
}
Expand Down
Loading

0 comments on commit 5ab011b

Please sign in to comment.