From cfcc7328f532788a7d8cee9849f7e5becf848a69 Mon Sep 17 00:00:00 2001 From: Ben White Date: Wed, 4 Sep 2024 16:46:17 +0200 Subject: [PATCH 1/6] Fixed serialization to not include the hog function --- plugin-server/src/cdp/cdp-consumers.ts | 8 ++------ plugin-server/src/cdp/fetch-executor.ts | 4 ++-- plugin-server/src/cdp/utils.ts | 11 +++++++++++ 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts index 16eff91ceeb0f..927457d6a7580 100644 --- a/plugin-server/src/cdp/cdp-consumers.ts +++ b/plugin-server/src/cdp/cdp-consumers.ts @@ -44,6 +44,7 @@ import { createInvocation, gzipObject, prepareLogEntriesForClickhouse, + serializeInvocation, unGzipObject, } from './utils' @@ -217,12 +218,7 @@ abstract class CdpConsumerBase { // For now we just enqueue to kafka // For kafka style this is overkill to enqueue this way but it simplifies migrating to the new system - const serializedInvocation: HogFunctionInvocationSerialized = { - ...invocation, - hogFunctionId: invocation.hogFunction.id, - } - - delete (serializedInvocation as any).hogFunction + const serializedInvocation = serializeInvocation(invocation) const request: HogFunctionInvocationSerializedCompressed = { state: await gzipObject(serializedInvocation), diff --git a/plugin-server/src/cdp/fetch-executor.ts b/plugin-server/src/cdp/fetch-executor.ts index b2e99ef0a1836..89900215ec1fd 100644 --- a/plugin-server/src/cdp/fetch-executor.ts +++ b/plugin-server/src/cdp/fetch-executor.ts @@ -12,7 +12,7 @@ import { HogFunctionQueueParametersFetchRequest, HogFunctionQueueParametersFetchResponse, } from './types' -import { gzipObject } from './utils' +import { gzipObject, serializeInvocation } from './utils' export const BUCKETS_KB_WRITTEN = [0, 128, 512, 1024, 2024, 4096, 10240, Infinity] @@ -52,7 +52,7 @@ export class FetchExecutor { if (this.hogHookEnabledForTeams(invocation.teamId)) { // This is very temporary until we are commited 
to Cyclotron const payload: HogFunctionInvocationAsyncRequest = { - state: await gzipObject(invocation), + state: await gzipObject(serializeInvocation(invocation)), teamId: invocation.teamId, hogFunctionId: invocation.hogFunction.id, asyncFunctionRequest: { diff --git a/plugin-server/src/cdp/utils.ts b/plugin-server/src/cdp/utils.ts index da1d64273f7aa..158814718dc0a 100644 --- a/plugin-server/src/cdp/utils.ts +++ b/plugin-server/src/cdp/utils.ts @@ -12,6 +12,7 @@ import { HogFunctionInvocation, HogFunctionInvocationGlobals, HogFunctionInvocationResult, + HogFunctionInvocationSerialized, HogFunctionLogEntrySerialized, HogFunctionType, ParsedClickhouseEvent, @@ -224,3 +225,13 @@ export function createInvocation( timings: [], } } + +export function serializeInvocation(invocation: HogFunctionInvocation): HogFunctionInvocationSerialized { + const serializedInvocation: HogFunctionInvocationSerialized = { + ...invocation, + hogFunctionId: invocation.hogFunction.id, + } + + delete (serializedInvocation as any).hogFunction + return serializedInvocation // the stripped copy (hogFunctionId only), not the original invocation +} From fa8c4895b2e7ebeaf1496c10416115249cb9153c Mon Sep 17 00:00:00 2001 From: Ben White Date: Wed, 4 Sep 2024 17:21:04 +0200 Subject: [PATCH 2/6] Fix up parallel sending --- plugin-server/src/cdp/cdp-consumers.ts | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts index 927457d6a7580..a08875e7bd7ee 100644 --- a/plugin-server/src/cdp/cdp-consumers.ts +++ b/plugin-server/src/cdp/cdp-consumers.ts @@ -548,8 +548,15 @@ export class CdpFunctionCallbackConsumer extends CdpConsumerBase { // NOTE: In the future this service will never do fetching (unless we decide we want to do it in node at some point) // This is just "for now" to support the transition to cyclotron const fetchQueue = invocations.filter((item) => item.queue === 'fetch') - const fetchResults = await this.runManyWithHeartbeat(fetchQueue, (item) => - 
this.fetchExecutor.execute(item) + + const fetchResults = await Promise.all( + fetchQueue.map((item) => { + return runInstrumentedFunction({ + statsKey: `cdpConsumer.handleEachBatch.fetchExecutor.execute`, + func: () => this.fetchExecutor.execute(item), + timeout: 1000, + }) + }) ) const hogQueue = invocations.filter((item) => item.queue === 'hog') From a06fefe7677ad644ac30c5c74f1904f7b3ce805c Mon Sep 17 00:00:00 2001 From: Ben White Date: Wed, 4 Sep 2024 17:21:56 +0200 Subject: [PATCH 3/6] Fixes --- plugin-server/src/cdp/cdp-consumers.ts | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts index a08875e7bd7ee..9b11c320b5fd9 100644 --- a/plugin-server/src/cdp/cdp-consumers.ts +++ b/plugin-server/src/cdp/cdp-consumers.ts @@ -296,7 +296,7 @@ abstract class CdpConsumerBase { // queuedMinMessages: this.hub.KAFKA_QUEUE_SIZE, consumerMaxWaitMs: this.hub.KAFKA_CONSUMPTION_MAX_WAIT_MS, consumerErrorBackoffMs: this.hub.KAFKA_CONSUMPTION_ERROR_BACKOFF_MS, - fetchBatchSize: this.hub.INGESTION_BATCH_SIZE, + fetchBatchSize: 10, // this.hub.INGESTION_BATCH_SIZE, batchingTimeoutMs: this.hub.KAFKA_CONSUMPTION_BATCHING_TIMEOUT_MS, topicCreationTimeoutMs: this.hub.KAFKA_TOPIC_CREATION_TIMEOUT_MS, topicMetadataRefreshInterval: this.hub.KAFKA_TOPIC_METADATA_REFRESH_INTERVAL_MS, @@ -607,11 +607,11 @@ export class CdpFunctionCallbackConsumer extends CdpConsumerBase { invocationSerialized.queueParameters = item.asyncFunctionResponse } - const hogFunction = - invocationSerialized.hogFunction ?? - (invocationSerialized.hogFunctionId - ? this.hogFunctionManager.getHogFunction(invocationSerialized.hogFunctionId) - : undefined) + const hogFunctionId = + invocationSerialized.hogFunctionId ?? invocationSerialized.hogFunction?.id + const hogFunction = hogFunctionId + ? 
this.hogFunctionManager.getHogFunction(hogFunctionId) + : undefined if (!hogFunction) { status.error('Error finding hog function', { From 60f36b951f1d7e73d2033632f6a185ccf9f77dd3 Mon Sep 17 00:00:00 2001 From: Ben White Date: Wed, 4 Sep 2024 17:23:21 +0200 Subject: [PATCH 4/6] Fixes --- plugin-server/src/cdp/cdp-consumers.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts index 9b11c320b5fd9..3c26cf00ef75c 100644 --- a/plugin-server/src/cdp/cdp-consumers.ts +++ b/plugin-server/src/cdp/cdp-consumers.ts @@ -229,7 +229,7 @@ abstract class CdpConsumerBase { this.messagesToProduce.push({ topic: KAFKA_CDP_FUNCTION_CALLBACKS, value: request, - key: invocation.hogFunction.id, + key: `${invocation.hogFunction.id}:${invocation.id}`, }) } From 456de207e8f2991c492ba038ad6031aa97b178fd Mon Sep 17 00:00:00 2001 From: Ben White Date: Wed, 4 Sep 2024 17:23:55 +0200 Subject: [PATCH 5/6] Fix --- plugin-server/src/cdp/cdp-consumers.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts index 3c26cf00ef75c..499b030e61564 100644 --- a/plugin-server/src/cdp/cdp-consumers.ts +++ b/plugin-server/src/cdp/cdp-consumers.ts @@ -296,7 +296,7 @@ abstract class CdpConsumerBase { // queuedMinMessages: this.hub.KAFKA_QUEUE_SIZE, consumerMaxWaitMs: this.hub.KAFKA_CONSUMPTION_MAX_WAIT_MS, consumerErrorBackoffMs: this.hub.KAFKA_CONSUMPTION_ERROR_BACKOFF_MS, - fetchBatchSize: 10, // this.hub.INGESTION_BATCH_SIZE, + fetchBatchSize: this.hub.INGESTION_BATCH_SIZE, batchingTimeoutMs: this.hub.KAFKA_CONSUMPTION_BATCHING_TIMEOUT_MS, topicCreationTimeoutMs: this.hub.KAFKA_TOPIC_CREATION_TIMEOUT_MS, topicMetadataRefreshInterval: this.hub.KAFKA_TOPIC_METADATA_REFRESH_INTERVAL_MS, From 97bb0d2ebe0fc6ca51af71be325aae8b5eb3e457 Mon Sep 17 00:00:00 2001 From: Ben White Date: Wed, 4 Sep 2024 17:30:26 +0200 Subject: [PATCH 6/6] 
Fixes --- plugin-server/src/cdp/cdp-consumers.ts | 8 -------- 1 file changed, 8 deletions(-) diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts index 499b030e61564..8c4eec5e11951 100644 --- a/plugin-server/src/cdp/cdp-consumers.ts +++ b/plugin-server/src/cdp/cdp-consumers.ts @@ -237,8 +237,6 @@ abstract class CdpConsumerBase { await runInstrumentedFunction({ statsKey: `cdpConsumer.handleEachBatch.produceResults`, func: async () => { - console.log('Processing invocations results', results.length) - await Promise.all( results.map(async (result) => { // Tricky: We want to pull all the logs out as we don't want them to be passed around to any subsequent functions @@ -635,12 +633,6 @@ export class CdpFunctionCallbackConsumer extends CdpConsumerBase { }) ) - invocations.forEach((item) => { - if (!item.hogFunction?.id) { - console.error('No hog function id', item) - } - }) - return invocations }, })