From b20d6f86baf986ce6c6b91354a3efee08ed200ef Mon Sep 17 00:00:00 2001 From: Ben White Date: Wed, 4 Sep 2024 10:44:34 +0200 Subject: [PATCH] feat(cdp): Pre-cyclotron refactor (#24681) --- plugin-server/src/capabilities.ts | 6 - .../src/cdp/async-function-executor.ts | 210 ----- plugin-server/src/cdp/cdp-api.ts | 119 +-- plugin-server/src/cdp/cdp-consumers.ts | 753 +++++++++--------- plugin-server/src/cdp/fetch-executor.ts | 143 ++++ plugin-server/src/cdp/hog-executor.ts | 214 +++-- plugin-server/src/cdp/hog-masker.ts | 17 +- plugin-server/src/cdp/hog-watcher.ts | 4 +- plugin-server/src/cdp/types.ts | 76 +- plugin-server/src/cdp/utils.ts | 27 +- plugin-server/src/config/config.ts | 1 + plugin-server/src/main/pluginsServer.ts | 35 +- plugin-server/src/types.ts | 3 +- ...backs-consumer.test.ts => cdp-api.test.ts} | 33 +- .../tests/cdp/cdp-consumer.e2e.test.ts | 222 ++++++ .../cdp/cdp-processed-events-consumer.test.ts | 333 +++++--- plugin-server/tests/cdp/fixtures.ts | 60 +- .../tests/cdp/groups-manager.test.ts | 8 + plugin-server/tests/cdp/hog-executor.test.ts | 286 +++---- .../tests/cdp/hog-function-manager.test.ts | 2 +- plugin-server/tests/cdp/hog-masker.test.ts | 86 +- plugin-server/tests/cdp/hog-watcher.test.ts | 4 +- plugin-server/tests/cdp/utils.test.ts | 8 +- plugin-server/tests/server.test.ts | 1 - 24 files changed, 1430 insertions(+), 1221 deletions(-) delete mode 100644 plugin-server/src/cdp/async-function-executor.ts create mode 100644 plugin-server/src/cdp/fetch-executor.ts rename plugin-server/tests/cdp/{cdp-function-callbacks-consumer.test.ts => cdp-api.test.ts} (91%) create mode 100644 plugin-server/tests/cdp/cdp-consumer.e2e.test.ts diff --git a/plugin-server/src/capabilities.ts b/plugin-server/src/capabilities.ts index 7b8c8461b78ad..6a9d30af15ff4 100644 --- a/plugin-server/src/capabilities.ts +++ b/plugin-server/src/capabilities.ts @@ -25,7 +25,6 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin preflightSchedules: true, cdpProcessedEvents: true, cdpFunctionCallbacks: true, - cdpFunctionOverflow: true, cdpCyclotronWorker: true, syncInlinePlugins: true, ...sharedCapabilities, @@ -104,11 +103,6 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin cdpFunctionCallbacks: true, ...sharedCapabilities, } - case PluginServerMode.cdp_function_overflow: - return { - cdpFunctionOverflow: true, - ...sharedCapabilities, - } case PluginServerMode.cdp_cyclotron_worker: return { cdpCyclotronWorker: true, diff --git a/plugin-server/src/cdp/async-function-executor.ts b/plugin-server/src/cdp/async-function-executor.ts deleted file mode 100644 index fe6df753dc723..0000000000000 --- a/plugin-server/src/cdp/async-function-executor.ts +++ /dev/null @@ -1,210 +0,0 @@ -import cyclotron from '@posthog/cyclotron' -import { Histogram } from 'prom-client' - -import { buildIntegerMatcher } from '../config/config' -import { PluginsServerConfig, ValueMatcher } from '../types' -import { trackedFetch } from '../utils/fetch' -import { status } from '../utils/status' -import { RustyHook } from '../worker/rusty-hook' -import { HogFunctionInvocationAsyncRequest, HogFunctionInvocationAsyncResponse } from './types' - -export const BUCKETS_KB_WRITTEN = [0, 128, 512, 1024, 2024, 4096, 10240, Infinity] - -const histogramFetchPayloadSize = new Histogram({ - name: 'cdp_async_function_fetch_payload_size_kb', - help: 'The size in kb of the batches we are receiving from Kafka', - buckets: BUCKETS_KB_WRITTEN, -}) - -const histogramHogHooksPayloadSize = new Histogram({ - name: 'cdp_async_function_hoghooks_payload_size_kb', - help: 'The size in kb of the batches we are receiving from Kafka', - buckets: BUCKETS_KB_WRITTEN, -}) - -export type AsyncFunctionExecutorOptions = { - sync?: boolean -} - -export class AsyncFunctionExecutor { - hogHookEnabledForTeams: ValueMatcher - cyclotronEnabledForTeams: ValueMatcher - - constructor(private serverConfig: PluginsServerConfig, private rustyHook: RustyHook) { - this.hogHookEnabledForTeams = buildIntegerMatcher(serverConfig.CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS, true) - this.cyclotronEnabledForTeams = buildIntegerMatcher(serverConfig.CDP_ASYNC_FUNCTIONS_CYCLOTRON_TEAMS, true) - } - - async execute( - request: HogFunctionInvocationAsyncRequest, - options: AsyncFunctionExecutorOptions = { sync: false } - ): Promise { - if (!request.asyncFunctionRequest) { - throw new Error('No async function request provided') - } - - const loggingContext = { - hogFunctionId: request.hogFunctionId, - asyncFunctionName: request.asyncFunctionRequest.name, - } - status.info('🦔', `[AsyncFunctionExecutor] Executing async function`, loggingContext) - - switch (request.asyncFunctionRequest.name) { - // TODO: Add error case here - if we don't get a valid queued message then we should log something against the function - case 'fetch': - return await this.asyncFunctionFetch(request, options) - default: - status.error( - '🦔', - `[HogExecutor] Unknown async function: ${request.asyncFunctionRequest.name}`, - loggingContext - ) - } - } - - private async asyncFunctionFetch( - request: HogFunctionInvocationAsyncRequest, - options?: AsyncFunctionExecutorOptions - ): Promise { - if (!request.asyncFunctionRequest) { - return - } - - const asyncFunctionResponse: HogFunctionInvocationAsyncResponse['asyncFunctionResponse'] = { - response: null, - timings: [], - } - - try { - // Sanitize the args - const [url, fetchOptions] = request.asyncFunctionRequest.args as [ - string | undefined, - Record | undefined - ] - - if (typeof url !== 'string') { - status.error('🦔', `[HogExecutor] Invalid URL`, { ...request, url }) - return - } - - const method = fetchOptions?.method || 'POST' - const headers = fetchOptions?.headers || { - 'Content-Type': 'application/json', - } - let body = fetchOptions?.body - // Modify the body to ensure it is a string (we allow Hog to send an object to keep things simple) - body = body ? (typeof body === 'string' ? body : JSON.stringify(body)) : body - - // Finally overwrite the args with the sanitized ones - request.asyncFunctionRequest.args = [url, { method, headers, body }] - - if (body) { - histogramFetchPayloadSize.observe(body.length / 1024) - } - - // If the caller hasn't forced it to be synchronous and the team has the cyclotron or - // rustyhook enabled, enqueue it in one of those services. - if (!options?.sync && this.cyclotronEnabledForTeams(request.teamId)) { - try { - await cyclotron.createJob({ - teamId: request.teamId, - functionId: request.hogFunctionId, - queueName: 'fetch', - // TODO: The async function compression changes happen upstream of this - // function. I guess we'll want to unwind that change because we actually - // want the `vmState` (and the rest of state) so we can put it into PG here. - vmState: '', - parameters: JSON.stringify({ - return_queue: 'hog', - url, - method, - headers, - // The body is passed in the `blob` field below. - }), - metadata: JSON.stringify({}), - // Fetch bodies are passed in the binary blob column/field. - blob: toUint8Array(body), - }) - } catch (e) { - status.error( - '🦔', - `[HogExecutor] Cyclotron failed to enqueue async fetch function, sending directly instead`, - { - error: e, - } - ) - } - } else if (!options?.sync && this.hogHookEnabledForTeams(request.teamId)) { - const hoghooksPayload = JSON.stringify(request) - - histogramHogHooksPayloadSize.observe(hoghooksPayload.length / 1024) - - const enqueued = await this.rustyHook.enqueueForHog(JSON.stringify(request)) - if (enqueued) { - return - } - } - - status.info('🦔', `[HogExecutor] Webhook not sent via rustyhook, sending directly instead`) - - const start = performance.now() - const fetchResponse = await trackedFetch(url, { - method, - body, - headers, - timeout: this.serverConfig.EXTERNAL_REQUEST_TIMEOUT_MS, - }) - - let responseBody = await fetchResponse.text() - try { - responseBody = JSON.parse(responseBody) - } catch (err) { - // Ignore - } - - const duration = performance.now() - start - - asyncFunctionResponse.timings!.push({ - kind: 'async_function', - duration_ms: duration, - }) - - asyncFunctionResponse.response = { - status: fetchResponse.status, - body: responseBody, - } - } catch (err) { - status.error('🦔', `[HogExecutor] Error during fetch`, { error: String(err) }) - asyncFunctionResponse.error = 'Something went wrong with the fetch request.' - } - - const response: HogFunctionInvocationAsyncResponse = { - state: request.state, - teamId: request.teamId, - hogFunctionId: request.hogFunctionId, - asyncFunctionResponse, - } - - return response - } -} - -function toUint8Array(data: any): Uint8Array | undefined { - if (data === null || data === undefined) { - return undefined - } - - if (data instanceof Uint8Array) { - return data - } - - if (data instanceof ArrayBuffer) { - return new Uint8Array(data) - } - - if (typeof data === 'string') { - return new TextEncoder().encode(data) - } - - return new TextEncoder().encode(JSON.stringify(data)) -} diff --git a/plugin-server/src/cdp/cdp-api.ts b/plugin-server/src/cdp/cdp-api.ts index 943091af13814..34de05942471e 100644 --- a/plugin-server/src/cdp/cdp-api.ts +++ b/plugin-server/src/cdp/cdp-api.ts @@ -4,16 +4,17 @@ import { DateTime } from 'luxon' import { Hub } from '../types' import { status } from '../utils/status' import { delay } from '../utils/utils' -import { AsyncFunctionExecutor } from './async-function-executor' +import { FetchExecutor } from './fetch-executor' import { HogExecutor } from './hog-executor' import { HogFunctionManager } from './hog-function-manager' import { HogWatcher, HogWatcherState } from './hog-watcher' -import { HogFunctionInvocation, HogFunctionInvocationAsyncRequest, HogFunctionType, LogEntry } from './types' +import { HogFunctionInvocationResult, HogFunctionType, LogEntry } from './types' +import { createInvocation } from './utils' export class CdpApi { private hogExecutor: HogExecutor private hogFunctionManager: HogFunctionManager - private asyncFunctionExecutor: AsyncFunctionExecutor + private fetchExecutor: FetchExecutor private hogWatcher: HogWatcher constructor( @@ -21,13 +22,13 @@ export class CdpApi { dependencies: { hogExecutor: HogExecutor hogFunctionManager: HogFunctionManager - asyncFunctionExecutor: AsyncFunctionExecutor + fetchExecutor: FetchExecutor hogWatcher: HogWatcher } ) { this.hogExecutor = dependencies.hogExecutor this.hogFunctionManager = dependencies.hogFunctionManager - this.asyncFunctionExecutor = dependencies.asyncFunctionExecutor + this.fetchExecutor = dependencies.fetchExecutor this.hogWatcher = dependencies.hogWatcher } @@ -104,14 +105,6 @@ export class CdpApi { return } - const invocation: HogFunctionInvocation = { - id, - globals: globals, - teamId: team.id, - hogFunctionId: id, - timings: [], - } - // We use the provided config if given, otherwise the function's config // We use the provided config if given, otherwise the function's config const compoundConfiguration: HogFunctionType = { @@ -119,64 +112,72 @@ export class CdpApi { ...(configuration ?? {}), } - // TODO: Type the configuration better so we don't make mistakes here await this.hogFunctionManager.enrichWithIntegrations([compoundConfiguration]) - let response = this.hogExecutor.execute(compoundConfiguration, invocation) + let lastResponse: HogFunctionInvocationResult | null = null const logs: LogEntry[] = [] - while (response.asyncFunctionRequest) { - invocation.vmState = response.invocation.vmState - - const asyncFunctionRequest = response.asyncFunctionRequest - - if (mock_async_functions || asyncFunctionRequest.name !== 'fetch') { - response.logs.push({ - level: 'info', - timestamp: DateTime.now(), - message: `Async function '${asyncFunctionRequest.name}' was mocked with arguments:`, - }) - - response.logs.push({ - level: 'info', - timestamp: DateTime.now(), - message: `${asyncFunctionRequest.name}(${asyncFunctionRequest.args - .map((x) => JSON.stringify(x, null, 2)) - .join(', ')})`, - }) + let count = 0 - // Add the state, simulating what executeAsyncResponse would do - invocation.vmState!.stack.push({ status: 200, body: {} }) - } else { - const asyncInvocationRequest: HogFunctionInvocationAsyncRequest = { - state: '', // WE don't care about the state for this level of testing - teamId: team.id, - hogFunctionId: hogFunction.id, - asyncFunctionRequest, - } - const asyncRes = await this.asyncFunctionExecutor!.execute(asyncInvocationRequest, { - sync: true, - }) - - if (!asyncRes || asyncRes.asyncFunctionResponse.error) { - response.logs.push({ - level: 'error', - timestamp: DateTime.now(), - message: 'Failed to execute async function', - }) + while (!lastResponse || !lastResponse.finished) { + if (count > 5) { + throw new Error('Too many iterations') + } + count += 1 + + let response: HogFunctionInvocationResult + + const invocation = + lastResponse?.invocation || + createInvocation( + { + ...globals, + project: { + id: team.id, + name: team.name, + url: `${this.hub.SITE_URL ?? 'http://localhost:8000'}/project/${team.id}`, + }, + }, + compoundConfiguration + ) + + if (invocation.queue === 'fetch') { + if (mock_async_functions) { + // Add the state, simulating what executeAsyncResponse would do + response = { + invocation: { + ...invocation, + queue: 'hog', + queueParameters: { response: { status: 200, body: {} } }, + }, + finished: false, + logs: [ + { + level: 'info', + timestamp: DateTime.now(), + message: `Async function 'fetch' was mocked with arguments:`, + }, + { + level: 'info', + timestamp: DateTime.now(), + message: `fetch(${JSON.stringify(invocation.queueParameters, null, 2)})`, + }, + ], + } + } else { + response = await this.fetchExecutor!.executeLocally(invocation) } - invocation.vmState!.stack.push(asyncRes?.asyncFunctionResponse.response ?? null) + } else { + response = this.hogExecutor.execute(invocation) } logs.push(...response.logs) - response = this.hogExecutor.execute(compoundConfiguration, invocation) + lastResponse = response } - logs.push(...response.logs) - res.json({ - status: response.finished ? 'success' : 'error', - error: String(response.error), + status: lastResponse.finished ? 'success' : 'error', + error: String(lastResponse.error), logs: logs, }) } catch (e) { diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts index 544947ba6a5be..010180bc784bb 100644 --- a/plugin-server/src/cdp/cdp-consumers.ts +++ b/plugin-server/src/cdp/cdp-consumers.ts @@ -1,28 +1,27 @@ import cyclotron from '@posthog/cyclotron' import { captureException } from '@sentry/node' -import { features, librdkafkaVersion, Message } from 'node-rdkafka' +import { Message } from 'node-rdkafka' import { Counter, Histogram } from 'prom-client' import { KAFKA_APP_METRICS_2, KAFKA_CDP_FUNCTION_CALLBACKS, - KAFKA_CDP_FUNCTION_OVERFLOW, KAFKA_EVENTS_JSON, KAFKA_EVENTS_PLUGIN_INGESTION, KAFKA_LOG_ENTRIES, } from '../config/kafka-topics' import { BatchConsumer, startBatchConsumer } from '../kafka/batch-consumer' -import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../kafka/config' -import { createKafkaProducer } from '../kafka/producer' +import { createRdConnectionConfigFromEnvVars } from '../kafka/config' import { addSentryBreadcrumbsEventListeners } from '../main/ingestion-queues/kafka-metrics' import { runInstrumentedFunction } from '../main/utils' import { AppMetric2Type, Hub, PluginServerService, RawClickHouseEvent, TeamId, TimestampFormat } from '../types' +import { createKafkaProducerWrapper } from '../utils/db/hub' import { KafkaProducerWrapper } from '../utils/db/kafka-producer-wrapper' import { captureTeamEvent } from '../utils/posthog' import { status } from '../utils/status' import { castTimestampOrNow } from '../utils/utils' import { RustyHook } from '../worker/rusty-hook' -import { AsyncFunctionExecutor } from './async-function-executor' +import { FetchExecutor } from './fetch-executor' import { GroupsManager } from './groups-manager' import { HogExecutor } from './hog-executor' import { HogFunctionManager } from './hog-function-manager' @@ -30,20 +29,19 @@ import { HogMasker } from './hog-masker' import { HogWatcher, HogWatcherState } from './hog-watcher' import { CdpRedis, createCdpRedisPool } from './redis' import { - CdpOverflowMessage, - HogFunctionAsyncFunctionResponse, HogFunctionInvocation, - HogFunctionInvocationAsyncRequest, - HogFunctionInvocationAsyncResponse, HogFunctionInvocationGlobals, HogFunctionInvocationResult, + HogFunctionInvocationSerialized, + HogFunctionInvocationSerializedCompressed, HogFunctionMessageToProduce, - HogFunctionOverflowedGlobals, HogFunctionType, + HogHooksFetchResponse, } from './types' import { convertToCaptureEvent, convertToHogFunctionInvocationGlobals, + createInvocation, gzipObject, prepareLogEntriesForClickhouse, unGzipObject, @@ -70,12 +68,6 @@ const counterFunctionInvocation = new Counter({ labelNames: ['outcome'], // One of 'failed', 'succeeded', 'overflowed', 'disabled', 'filtered' }) -const counterAsyncFunctionResponse = new Counter({ - name: 'cdp_async_function_response', - help: 'An async function response was received with an outcome', - labelNames: ['outcome'], // One of 'failed', 'succeeded', 'overflowed', 'disabled', 'filtered' -}) - export interface TeamIDWithConfig { teamId: TeamId | null consoleLogIngestionEnabled: boolean @@ -84,7 +76,7 @@ export interface TeamIDWithConfig { abstract class CdpConsumerBase { batchConsumer?: BatchConsumer hogFunctionManager: HogFunctionManager - asyncFunctionExecutor: AsyncFunctionExecutor + fetchExecutor: FetchExecutor hogExecutor: HogExecutor hogWatcher: HogWatcher hogMasker: HogMasker @@ -109,7 +101,7 @@ abstract class CdpConsumerBase { this.hogMasker = new HogMasker(this.redis) this.hogExecutor = new HogExecutor(this.hogFunctionManager) const rustyHook = this.hub?.rustyHook ?? new RustyHook(this.hub) - this.asyncFunctionExecutor = new AsyncFunctionExecutor(this.hub, rustyHook) + this.fetchExecutor = new FetchExecutor(this.hub, rustyHook) this.groupsManager = new GroupsManager(this.hub) } @@ -163,29 +155,9 @@ abstract class CdpConsumerBase { return results } - public async handleEachBatch(messages: Message[], heartbeat: () => void): Promise { - status.info('🔁', `${this.name} - handling batch`, { - size: messages.length, - }) - - this.heartbeat = heartbeat + protected abstract _handleKafkaBatch(messages: Message[]): Promise - histogramKafkaBatchSize.observe(messages.length) - histogramKafkaBatchSizeKb.observe(messages.reduce((acc, m) => (m.value?.length ?? 0) + acc, 0) / 1024) - - return await runInstrumentedFunction({ - statsKey: `cdpConsumer.handleEachBatch`, - sendTimeoutGuardToSentry: false, - func: async () => { - await this._handleEachBatch(messages) - await this.produceQueuedMessages() - }, - }) - } - - protected abstract _handleEachBatch(messages: Message[]): Promise - - private async produceQueuedMessages() { + protected async produceQueuedMessages() { const messages = [...this.messagesToProduce] this.messagesToProduce = [] await Promise.all( @@ -232,21 +204,57 @@ abstract class CdpConsumerBase { }) } + protected async queueInvocations(invocation: HogFunctionInvocation[]) { + await Promise.all( + invocation.map(async (item) => { + await this.queueInvocation(item) + }) + ) + } + + protected async queueInvocation(invocation: HogFunctionInvocation) { + // TODO: Add cylcotron check here and enqueue that way + // For now we just enqueue to kafka + // For kafka style this is overkill to enqueue this way but it simplifies migrating to the new system + + const serializedInvocation: HogFunctionInvocationSerialized = { + ...invocation, + hogFunctionId: invocation.hogFunction.id, + } + + delete (serializedInvocation as any).hogFunction + + const request: HogFunctionInvocationSerializedCompressed = { + state: await gzipObject(serializedInvocation), + } + + // NOTE: This is very temporary as it is producing the response. the response will actually be produced by the 3rd party service + // Later this will actually be the _request_ which we will push to the async function topic if we make one + this.messagesToProduce.push({ + topic: KAFKA_CDP_FUNCTION_CALLBACKS, + value: request, + key: invocation.hogFunction.id, + }) + } + protected async processInvocationResults(results: HogFunctionInvocationResult[]): Promise { await runInstrumentedFunction({ statsKey: `cdpConsumer.handleEachBatch.produceResults`, func: async () => { + console.log('Processing invocations results', results.length) + await Promise.all( results.map(async (result) => { // Tricky: We want to pull all the logs out as we don't want them to be passed around to any subsequent functions - - this.produceAppMetric({ - team_id: result.invocation.teamId, - app_source_id: result.invocation.hogFunctionId, - metric_kind: result.error ? 'failure' : 'success', - metric_name: result.error ? 'failed' : 'succeeded', - count: 1, - }) + if (result.finished || result.error) { + this.produceAppMetric({ + team_id: result.invocation.teamId, + app_source_id: result.invocation.hogFunction.id, + metric_kind: result.error ? 'failure' : 'success', + metric_name: result.error ? 'failed' : 'succeeded', + count: 1, + }) + } this.produceLogs(result) @@ -266,24 +274,9 @@ abstract class CdpConsumerBase { }) } - if (result.asyncFunctionRequest) { - const request: HogFunctionInvocationAsyncRequest = { - state: await gzipObject(result.invocation), - teamId: result.invocation.teamId, - hogFunctionId: result.invocation.hogFunctionId, - asyncFunctionRequest: result.asyncFunctionRequest, - } - const res = await this.runWithHeartbeat(() => this.asyncFunctionExecutor.execute(request)) - - // NOTE: This is very temporary as it is producing the response. the response will actually be produced by the 3rd party service - // Later this will actually be the _request_ which we will push to the async function topic if we make one - if (res) { - this.messagesToProduce.push({ - topic: KAFKA_CDP_FUNCTION_CALLBACKS, - value: res, - key: res.hogFunctionId, - }) - } + if (!result.finished) { + // If it isn't finished then we need to put it back on the queue + await this.queueInvocation(result.invocation) } }) ) @@ -291,53 +284,149 @@ abstract class CdpConsumerBase { }) } - protected async executeAsyncResponses( - asyncResponses: HogFunctionInvocationAsyncResponse[] - ): Promise { - return await runInstrumentedFunction({ - statsKey: `cdpConsumer.handleEachBatch.executeAsyncResponses`, - func: async () => { - asyncResponses.forEach((x) => { - counterAsyncFunctionResponse.inc({ - outcome: x.asyncFunctionResponse.error ? 'failed' : 'succeeded', - }) + protected async startKafkaConsumer() { + this.batchConsumer = await startBatchConsumer({ + connectionConfig: createRdConnectionConfigFromEnvVars(this.hub), + groupId: this.consumerGroupId, + topic: this.topic, + autoCommit: true, + sessionTimeout: this.hub.KAFKA_CONSUMPTION_SESSION_TIMEOUT_MS, + maxPollIntervalMs: this.hub.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS, + // the largest size of a message that can be fetched by the consumer. + // the largest size our MSK cluster allows is 20MB + // we only use 9 or 10MB but there's no reason to limit this 🤷️ + consumerMaxBytes: this.hub.KAFKA_CONSUMPTION_MAX_BYTES, + consumerMaxBytesPerPartition: this.hub.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION, + // our messages are very big, so we don't want to buffer too many + // queuedMinMessages: this.hub.KAFKA_QUEUE_SIZE, + consumerMaxWaitMs: this.hub.KAFKA_CONSUMPTION_MAX_WAIT_MS, + consumerErrorBackoffMs: this.hub.KAFKA_CONSUMPTION_ERROR_BACKOFF_MS, + fetchBatchSize: this.hub.INGESTION_BATCH_SIZE, + batchingTimeoutMs: this.hub.KAFKA_CONSUMPTION_BATCHING_TIMEOUT_MS, + topicCreationTimeoutMs: this.hub.KAFKA_TOPIC_CREATION_TIMEOUT_MS, + topicMetadataRefreshInterval: this.hub.KAFKA_TOPIC_METADATA_REFRESH_INTERVAL_MS, + eachBatch: async (messages, { heartbeat }) => { + status.info('🔁', `${this.name} - handling batch`, { + size: messages.length, }) - const invocationsWithResponses: [HogFunctionInvocation, HogFunctionAsyncFunctionResponse][] = [] - - // Deserialize the compressed data - await Promise.all( - asyncResponses.map(async (item) => { - try { - const invocation = await unGzipObject(item.state) - invocationsWithResponses.push([invocation, item.asyncFunctionResponse]) - } catch (e) { - status.error('Error unzipping message', e, item.state) - captureException(e, { - extra: { hogFunctionId: item.hogFunctionId, teamId: item.teamId }, - }) - } - }) - ) + this.heartbeat = heartbeat - const results = await this.runManyWithHeartbeat(invocationsWithResponses, (item) => - this.hogExecutor.executeAsyncResponse(...item) - ) + histogramKafkaBatchSize.observe(messages.length) + histogramKafkaBatchSizeKb.observe(messages.reduce((acc, m) => (m.value?.length ?? 0) + acc, 0) / 1024) - await this.hogWatcher.observeResults(results) - return results + return await runInstrumentedFunction({ + statsKey: `cdpConsumer.handleEachBatch`, + sendTimeoutGuardToSentry: false, + func: async () => { + await this._handleKafkaBatch(messages) + }, + }) }, + callEachBatchWhenEmpty: false, + }) + + addSentryBreadcrumbsEventListeners(this.batchConsumer.consumer) + + this.batchConsumer.consumer.on('disconnected', async (err) => { + // since we can't be guaranteed that the consumer will be stopped before some other code calls disconnect + // we need to listen to disconnect and make sure we're stopped + status.info('🔁', `${this.name} batch consumer disconnected, cleaning up`, { err }) + await this.stop() }) } - protected async executeMatchingFunctions( + public async start(): Promise { + // NOTE: This is only for starting shared services + await Promise.all([ + this.hogFunctionManager.start(), + this.hub.CYCLOTRON_DATABASE_URL + ? cyclotron.initManager({ shards: [{ dbUrl: this.hub.CYCLOTRON_DATABASE_URL }] }) + : Promise.resolve(), + ]) + + this.kafkaProducer = await createKafkaProducerWrapper(this.hub) + this.kafkaProducer.producer.connect() + + await this.startKafkaConsumer() + } + + public async stop(): Promise { + status.info('🔁', `${this.name} - stopping`) + this.isStopping = true + + // Mark as stopping so that we don't actually process any more incoming messages, but still keep the process alive + status.info('🔁', `${this.name} - stopping batch consumer`) + await this.batchConsumer?.stop() + status.info('🔁', `${this.name} - stopping kafka producer`) + await this.kafkaProducer?.disconnect() + status.info('🔁', `${this.name} - stopping hog function manager and hog watcher`) + await Promise.all([this.hogFunctionManager.stop()]) + + status.info('👍', `${this.name} - stopped!`) + } + + public isHealthy() { + // TODO: Check either kafka consumer or cyclotron worker exists + // and that whatever exists is healthy + return this.batchConsumer?.isHealthy() + } +} + +/** + * This consumer handles incoming events from the main clickhouse topic + */ + +export class CdpProcessedEventsConsumer extends CdpConsumerBase { + protected name = 'CdpProcessedEventsConsumer' + protected topic = KAFKA_EVENTS_JSON + protected consumerGroupId = 'cdp-processed-events-consumer' + + public async processBatch(invocationGlobals: HogFunctionInvocationGlobals[]): Promise { + if (!invocationGlobals.length) { + return [] + } + + const invocationsToBeQueued = await this.runWithHeartbeat(() => + this.createHogFunctionInvocations(invocationGlobals) + ) + + if (this.hub.CDP_EVENT_PROCESSOR_EXECUTE_FIRST_STEP) { + // NOTE: This is for testing the two ways of enqueueing processing. It will be swapped out for a cyclotron env check + // Kafka based workflow + const invocationResults = await runInstrumentedFunction({ + statsKey: `cdpConsumer.handleEachBatch.executeInvocations`, + func: async () => { + const hogResults = await this.runManyWithHeartbeat(invocationsToBeQueued, (item) => + this.hogExecutor.execute(item) + ) + return [...hogResults] + }, + }) + + await this.hogWatcher.observeResults(invocationResults) + await this.processInvocationResults(invocationResults) + } else { + await this.queueInvocations(invocationsToBeQueued) + } + + await this.produceQueuedMessages() + + return invocationsToBeQueued + } + + /** + * Finds all matching hog functions for the given globals. + * Filters them for their disabled state as well as masking configs + * + */ + protected async createHogFunctionInvocations( invocationGlobals: HogFunctionInvocationGlobals[] - ): Promise { + ): Promise { return await runInstrumentedFunction({ - statsKey: `cdpConsumer.handleEachBatch.executeMatchingFunctions`, + statsKey: `cdpConsumer.handleEachBatch.queueMatchingFunctions`, func: async () => { - const possibleInvocations: { globals: HogFunctionInvocationGlobals; hogFunction: HogFunctionType }[] = - [] + const possibleInvocations: HogFunctionInvocation[] = [] // TODO: Add a helper to hog functions to determine if they require groups or not and then only load those await this.groupsManager.enrichGroups(invocationGlobals) @@ -347,10 +436,7 @@ abstract class CdpConsumerBase { const { matchingFunctions, nonMatchingFunctions } = this.hogExecutor.findMatchingFunctions(globals) possibleInvocations.push( - ...matchingFunctions.map((hogFunction) => ({ - globals, - hogFunction, - })) + ...matchingFunctions.map((hogFunction) => createInvocation(globals, hogFunction)) ) nonMatchingFunctions.forEach((item) => @@ -400,334 +486,209 @@ abstract class CdpConsumerBase { }) }) - const results = ( - await this.runManyWithHeartbeat(notMaskedInvocations, (item) => - this.hogExecutor.executeFunction(item.globals, item.hogFunction) - ) - ).filter((x) => !!x) as HogFunctionInvocationResult[] - - await this.hogWatcher.observeResults(results) - return results - }, - }) - } - - public async start(): Promise { - status.info('🔁', `${this.name} - starting`, { - librdKafkaVersion: librdkafkaVersion, - kafkaCapabilities: features, - }) - - // NOTE: This is the only place where we need to use the shared server config - const globalConnectionConfig = createRdConnectionConfigFromEnvVars(this.hub) - const globalProducerConfig = createRdProducerConfigFromEnvVars(this.hub) - - await Promise.all([ - this.hogFunctionManager.start(), - this.hub.CYCLOTRON_DATABASE_URL - ? cyclotron.initManager({ shards: [{ dbUrl: this.hub.CYCLOTRON_DATABASE_URL }] }) - : Promise.resolve(), - ]) - - this.kafkaProducer = new KafkaProducerWrapper( - await createKafkaProducer(globalConnectionConfig, globalProducerConfig) - ) - - this.kafkaProducer.producer.connect() - - this.batchConsumer = await startBatchConsumer({ - connectionConfig: createRdConnectionConfigFromEnvVars(this.hub), - groupId: this.consumerGroupId, - topic: this.topic, - autoCommit: true, - sessionTimeout: this.hub.KAFKA_CONSUMPTION_SESSION_TIMEOUT_MS, - maxPollIntervalMs: this.hub.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS, - // the largest size of a message that can be fetched by the consumer. - // the largest size our MSK cluster allows is 20MB - // we only use 9 or 10MB but there's no reason to limit this 🤷️ - consumerMaxBytes: this.hub.KAFKA_CONSUMPTION_MAX_BYTES, - consumerMaxBytesPerPartition: this.hub.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION, - // our messages are very big, so we don't want to buffer too many - // queuedMinMessages: this.hub.KAFKA_QUEUE_SIZE, - consumerMaxWaitMs: this.hub.KAFKA_CONSUMPTION_MAX_WAIT_MS, - consumerErrorBackoffMs: this.hub.KAFKA_CONSUMPTION_ERROR_BACKOFF_MS, - fetchBatchSize: this.hub.INGESTION_BATCH_SIZE, - batchingTimeoutMs: this.hub.KAFKA_CONSUMPTION_BATCHING_TIMEOUT_MS, - topicCreationTimeoutMs: this.hub.KAFKA_TOPIC_CREATION_TIMEOUT_MS, - topicMetadataRefreshInterval: this.hub.KAFKA_TOPIC_METADATA_REFRESH_INTERVAL_MS, - eachBatch: async (messages, { heartbeat }) => { - return await this.handleEachBatch(messages, heartbeat) + return notMaskedInvocations }, - callEachBatchWhenEmpty: false, }) - - addSentryBreadcrumbsEventListeners(this.batchConsumer.consumer) - - this.batchConsumer.consumer.on('disconnected', async (err) => { - // since we can't be guaranteed that the consumer will be stopped before some other code calls disconnect - // we need to listen to disconnect and make sure we're stopped - status.info('🔁', `${this.name} batch consumer disconnected, cleaning up`, { err }) - await this.stop() - }) - } - - public async stop(): Promise { - status.info('🔁', `${this.name} - stopping`) - this.isStopping = true - - // Mark as stopping so that we don't actually process any more incoming messages, but still keep the process alive - status.info('🔁', `${this.name} - stopping batch consumer`) - await this.batchConsumer?.stop() - status.info('🔁', `${this.name} - stopping kafka producer`) - await this.kafkaProducer?.disconnect() - status.info('🔁', `${this.name} - stopping hog function manager and hog watcher`) - await Promise.all([this.hogFunctionManager.stop()]) - - status.info('👍', `${this.name} - stopped!`) - } - - public isHealthy() { - return this.batchConsumer?.isHealthy() } -} - -/** - * This consumer handles incoming events from the main clickhouse topic - */ -export class CdpProcessedEventsConsumer extends CdpConsumerBase { - protected name = 'CdpProcessedEventsConsumer' - protected topic = KAFKA_EVENTS_JSON - protected consumerGroupId = 'cdp-processed-events-consumer' - - public async _handleEachBatch(messages: Message[]): Promise { + // This consumer always parses from kafka + public async _handleKafkaBatch(messages: Message[]): Promise { const invocationGlobals = await this.runWithHeartbeat(() => runInstrumentedFunction({ statsKey: `cdpConsumer.handleEachBatch.parseKafkaMessages`, - func: async () => await this.parseMessages(messages), - }) - ) - - if (!invocationGlobals.length) { - return - } - - const invocationResults = await this.runWithHeartbeat(() => this.executeMatchingFunctions(invocationGlobals)) - - await this.processInvocationResults(invocationResults) - } - - private async parseMessages(messages: Message[]): Promise { - const events: HogFunctionInvocationGlobals[] = [] - await Promise.all( - messages.map(async (message) => { - try { - const clickHouseEvent = JSON.parse(message.value!.toString()) as RawClickHouseEvent - - if (!this.hogFunctionManager.teamHasHogFunctions(clickHouseEvent.team_id)) { - // No need to continue if the team doesn't have any functions - return - } - - const team = await this.hub.teamManager.fetchTeam(clickHouseEvent.team_id) - if (!team) { - return - } - events.push( - convertToHogFunctionInvocationGlobals( - clickHouseEvent, - team, - this.hub.SITE_URL ?? 'http://localhost:8000' - ) + func: async () => { + const events: HogFunctionInvocationGlobals[] = [] + await Promise.all( + messages.map(async (message) => { + try { + const clickHouseEvent = JSON.parse(message.value!.toString()) as RawClickHouseEvent + + if (!this.hogFunctionManager.teamHasHogFunctions(clickHouseEvent.team_id)) { + // No need to continue if the team doesn't have any functions + return + } + + const team = await this.hub.teamManager.fetchTeam(clickHouseEvent.team_id) + if (!team) { + return + } + events.push( + convertToHogFunctionInvocationGlobals( + clickHouseEvent, + team, + this.hub.SITE_URL ?? 'http://localhost:8000' + ) + ) + } catch (e) { + status.error('Error parsing message', e) + } + }) ) - } catch (e) { - status.error('Error parsing message', e) - } + + return events + }, }) ) - return events + await this.processBatch(invocationGlobals) } } /** - * This consumer handles callbacks from async functions. + * This consumer handles actually invoking hog in a loop */ export class CdpFunctionCallbackConsumer extends CdpConsumerBase { protected name = 'CdpFunctionCallbackConsumer' protected topic = KAFKA_CDP_FUNCTION_CALLBACKS protected consumerGroupId = 'cdp-function-callback-consumer' - public async _handleEachBatch(messages: Message[]): Promise { - const events = await this.runWithHeartbeat(() => - runInstrumentedFunction({ - statsKey: `cdpConsumer.handleEachBatch.parseKafkaMessages`, - func: () => Promise.resolve(this.parseMessages(messages)), - }) - ) - - if (!events.length) { + public async processBatch(invocations: HogFunctionInvocation[]): Promise { + if (!invocations.length) { return } - const invocationResults = await this.runWithHeartbeat(() => this.executeAsyncResponses(events)) - - await this.processInvocationResults(invocationResults) - } + const invocationResults = await runInstrumentedFunction({ + statsKey: `cdpConsumer.handleEachBatch.executeInvocations`, + func: async () => { + // NOTE: In the future this service will never do fetching (unless we decide we want to do it in node at some point) + // This is just "for now" to support the transition to cyclotron + const fetchQueue = invocations.filter((item) => item.queue === 'fetch') + const fetchResults = await this.runManyWithHeartbeat(fetchQueue, (item) => + this.fetchExecutor.execute(item) + ) - private parseMessages(messages: Message[]): HogFunctionInvocationAsyncResponse[] { - const events: HogFunctionInvocationAsyncResponse[] = [] - messages.map((message) => { - try { - const event = JSON.parse(message.value!.toString()) - events.push(event as HogFunctionInvocationAsyncResponse) - } catch (e) { - status.error('Error parsing message', e) - } + const hogQueue = invocations.filter((item) => item.queue === 'hog') + const hogResults = await this.runManyWithHeartbeat(hogQueue, (item) => this.hogExecutor.execute(item)) + return [...hogResults, ...(fetchResults.filter(Boolean) as HogFunctionInvocationResult[])] + }, }) - return events + await this.hogWatcher.observeResults(invocationResults) + await this.processInvocationResults(invocationResults) + await this.produceQueuedMessages() } -} - -/** - * This consumer handles overflow for both incoming events as well as callbacks. - * In the future we might want multiple consumers but for now this is fine. - */ -export class CdpOverflowConsumer extends CdpConsumerBase { - protected name = 'CdpOverflowConsumer' - protected topic = KAFKA_CDP_FUNCTION_OVERFLOW - protected consumerGroupId = 'cdp-overflow-consumer' - - public async _handleEachBatch(messages: Message[]): Promise { - const overflowedGlobals = await this.runWithHeartbeat(() => + public async _handleKafkaBatch(messages: Message[]): Promise { + const events = await this.runWithHeartbeat(() => runInstrumentedFunction({ statsKey: `cdpConsumer.handleEachBatch.parseKafkaMessages`, - func: () => Promise.resolve(this.parseMessages(messages)), - }) - ) - - const invocationResults = await this.executeOverflowedFunctions(overflowedGlobals) - - await this.processInvocationResults(invocationResults) - } + func: async () => { + // TRICKY: In the future we won't use kafka. For now though we need to parse messages as Cyclotron style jobs + // or hoghooks async callbacks + + const invocations: HogFunctionInvocation[] = [] + + // Parse the base message value + const entries: (HogHooksFetchResponse | HogFunctionInvocationSerializedCompressed)[] = messages + .map((message) => { + try { + return JSON.parse(message.value!.toString()) + } catch (e) { + status.error('Error parsing message', e) + } - protected async executeOverflowedFunctions( - invocationGlobals: HogFunctionOverflowedGlobals[] - ): Promise { - return await runInstrumentedFunction({ - statsKey: `cdpConsumer.handleEachBatch.executeOverflowedFunctions`, - func: async () => { - // TODO: Add a helper to hog functions to determine if they require groups or not and then only load those - await this.groupsManager.enrichGroups(invocationGlobals.map((x) => x.globals)) - - const invocations = invocationGlobals - .map((item) => - item.hogFunctionIds.map((hogFunctionId) => ({ - globals: item.globals, - hogFunctionId, - })) + return undefined + }) + .filter(Boolean) + + // Deserialize the compressed data + await Promise.all( + entries.map(async (item) => { + try { + const invocationSerialized = await unGzipObject( + item.state + ) + + if ('asyncFunctionResponse' in item) { + // This means it is a callback from hoghooks so we need to add the response to the invocation + invocationSerialized.queue = 'hog' + invocationSerialized.queueParameters = item.asyncFunctionResponse + } + + const hogFunction = this.hogFunctionManager.getHogFunction( + invocationSerialized.hogFunctionId + ) + if (!hogFunction) { + status.error('Error finding hog function', { + id: invocationSerialized.hogFunctionId, + }) + return + } + + const invocation: HogFunctionInvocation = { + ...invocationSerialized, + hogFunction, + } + + delete (invocation as any).hogFunctionId + + invocations.push(invocation) + } catch (e) { + status.error('Error unzipping message', e, item.state) + captureException(e) + } + }) ) - .flat() - - const states = await this.hogWatcher.getStates(invocationGlobals.map((x) => x.hogFunctionIds).flat()) - const results = ( - await this.runManyWithHeartbeat(invocations, (item) => { - const state = states[item.hogFunctionId].state - if (state >= HogWatcherState.disabledForPeriod) { - this.produceAppMetric({ - team_id: item.globals.project.id, - app_source_id: item.hogFunctionId, - metric_kind: 'failure', - metric_name: - state === HogWatcherState.disabledForPeriod - ? 'disabled_temporarily' - : 'disabled_permanently', - count: 1, - }) - return + invocations.forEach((item) => { + if (!item.hogFunction?.id) { + console.error('No hog function id', item) } - return this.hogExecutor.executeFunction(item.globals, item.hogFunctionId) }) - ).filter((x) => !!x) as HogFunctionInvocationResult[] - - await this.hogWatcher.observeResults(results) - return results - }, - }) - } - private parseMessages(messages: Message[]): HogFunctionOverflowedGlobals[] { - const invocationGlobals: HogFunctionOverflowedGlobals[] = [] - messages.map((message) => { - try { - const parsed = JSON.parse(message.value!.toString()) as CdpOverflowMessage - - if (parsed.source === 'event_invocations') { - invocationGlobals.push(parsed.payload) - } - } catch (e) { - // TODO: We probably want to crash here right as this means something went really wrong and needs investigating? - status.error('Error parsing message', e) - } - }) + return invocations + }, + }) + ) - return invocationGlobals + await this.processBatch(events) } } -// TODO: Split out non-Kafka specific parts of CdpConsumerBase so that it can be used by the -// Cyclotron worker below. Or maybe we can just wait, and rip the Kafka bits out once Cyclotron is -// shipped (and rename it something other than consomer, probably). For now, this is an easy way to -// use existing code and get an end-to-end demo shipped. -export class CdpCyclotronWorker extends CdpConsumerBase { - protected name = 'CdpCyclotronWorker' - protected topic = 'UNUSED-CdpCyclotronWorker' - protected consumerGroupId = 'UNUSED-CdpCyclotronWorker' - private runningWorker: Promise | undefined - private isUnhealthy = false - - public async _handleEachBatch(_: Message[]): Promise { - // Not called, we override `start` below to use Cyclotron instead. - } - - private async innerStart() { - try { - const limit = 100 // TODO: Make configurable. - while (!this.isStopping) { - const jobs = await cyclotron.dequeueJobsWithVmState('hog', limit) - for (const job of jobs) { - // TODO: Reassemble a HogFunctionInvocationAsyncResponse (or whatever proper type) - // from the fields on the job, and then execute the next Hog step. - console.log(job.id) - } - } - } catch (err) { - this.isUnhealthy = true - console.error('Error in Cyclotron worker', err) - throw err - } - } - - public async start() { - await cyclotron.initManager({ shards: [{ dbUrl: this.hub.CYCLOTRON_DATABASE_URL }] }) - await cyclotron.initWorker({ dbUrl: this.hub.CYCLOTRON_DATABASE_URL }) - - // Consumer `start` expects an async task is started, and not that `start` itself blocks - // indefinitely. - this.runningWorker = this.innerStart() - - return Promise.resolve() - } - - public async stop() { - await super.stop() - await this.runningWorker - } - - public isHealthy() { - return this.isUnhealthy - } -} +// // TODO: Split out non-Kafka specific parts of CdpConsumerBase so that it can be used by the +// // Cyclotron worker below. Or maybe we can just wait, and rip the Kafka bits out once Cyclotron is +// // shipped (and rename it something other than consumer, probably). For now, this is an easy way to +// // use existing code and get an end-to-end demo shipped. +// export class CdpCyclotronWorker extends CdpFunctionCallbackConsumer { +// protected name = 'CdpCyclotronWorker' +// protected topic = 'UNUSED-CdpCyclotronWorker' +// protected consumerGroupId = 'UNUSED-CdpCyclotronWorker' +// private runningWorker: Promise | undefined +// private isUnhealthy = false + +// private async innerStart() { +// try { +// const limit = 100 // TODO: Make configurable. +// while (!this.isStopping) { +// const jobs = await cyclotron.dequeueJobsWithVmState('hog', limit) +// // TODO: Decode jobs into the right types + +// await this.processBatch(jobs) +// } +// } catch (err) { +// this.isUnhealthy = true +// console.error('Error in Cyclotron worker', err) +// throw err +// } +// } + +// public async start() { +// await cyclotron.initManager({ shards: [{ dbUrl: this.hub.CYCLOTRON_DATABASE_URL }] }) +// await cyclotron.initWorker({ dbUrl: this.hub.CYCLOTRON_DATABASE_URL }) + +// // Consumer `start` expects an async task is started, and not that `start` itself blocks +// // indefinitely. +// this.runningWorker = this.innerStart() + +// return Promise.resolve() +// } + +// public async stop() { +// await super.stop() +// await this.runningWorker +// } + +// public isHealthy() { +// return this.isUnhealthy +// } +// } diff --git a/plugin-server/src/cdp/fetch-executor.ts b/plugin-server/src/cdp/fetch-executor.ts new file mode 100644 index 0000000000000..b2e99ef0a1836 --- /dev/null +++ b/plugin-server/src/cdp/fetch-executor.ts @@ -0,0 +1,143 @@ +import { Histogram } from 'prom-client' + +import { buildIntegerMatcher } from '../config/config' +import { PluginsServerConfig, ValueMatcher } from '../types' +import { trackedFetch } from '../utils/fetch' +import { status } from '../utils/status' +import { RustyHook } from '../worker/rusty-hook' +import { + HogFunctionInvocation, + HogFunctionInvocationAsyncRequest, + HogFunctionInvocationResult, + HogFunctionQueueParametersFetchRequest, + HogFunctionQueueParametersFetchResponse, +} from './types' +import { gzipObject } from './utils' + +export const BUCKETS_KB_WRITTEN = [0, 128, 512, 1024, 2024, 4096, 10240, Infinity] + +const histogramFetchPayloadSize = new Histogram({ + name: 'cdp_async_function_fetch_payload_size_kb', + help: 'The size in kb of the batches we are receiving from Kafka', + buckets: BUCKETS_KB_WRITTEN, +}) + +const histogramHogHooksPayloadSize = new Histogram({ + name: 'cdp_async_function_hoghooks_payload_size_kb', + help: 'The size in kb of the batches we are receiving from Kafka', + buckets: BUCKETS_KB_WRITTEN, +}) + +/** + * This class is only used by the kafka based queuing system. For the Cyclotron system there is no need for this. + */ +export class FetchExecutor { + hogHookEnabledForTeams: ValueMatcher + + constructor(private serverConfig: PluginsServerConfig, private rustyHook: RustyHook) { + this.hogHookEnabledForTeams = buildIntegerMatcher(serverConfig.CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS, true) + } + + async execute(invocation: HogFunctionInvocation): Promise { + if (invocation.queue !== 'fetch' || !invocation.queueParameters) { + throw new Error('Bad invocation') + } + + const params = invocation.queueParameters as HogFunctionQueueParametersFetchRequest + if (params.body) { + histogramFetchPayloadSize.observe(params.body.length / 1024) + } + + try { + if (this.hogHookEnabledForTeams(invocation.teamId)) { + // This is very temporary until we are commited to Cyclotron + const payload: HogFunctionInvocationAsyncRequest = { + state: await gzipObject(invocation), + teamId: invocation.teamId, + hogFunctionId: invocation.hogFunction.id, + asyncFunctionRequest: { + name: 'fetch', + args: [ + params.url, + { + ...params, + }, + ], + }, + } + const hoghooksPayload = JSON.stringify(payload) + histogramHogHooksPayloadSize.observe(hoghooksPayload.length / 1024) + const enqueued = await this.rustyHook.enqueueForHog(hoghooksPayload) + if (enqueued) { + // We return nothing here hoghooks will take care of that + return + } + } + + status.info('🦔', `[HogExecutor] Webhook not sent via rustyhook, sending directly instead`) + } catch (err) { + status.error('🦔', `[HogExecutor] Error during fetch`, { error: String(err) }) + } + + return await this.executeLocally(invocation) + } + + async executeLocally(invocation: HogFunctionInvocation): Promise { + if (invocation.queue !== 'fetch' || !invocation.queueParameters) { + throw new Error('Bad invocation') + } + + const params = invocation.queueParameters as HogFunctionQueueParametersFetchRequest + + const resParams: HogFunctionQueueParametersFetchResponse = { + response: { + status: 0, + body: {}, + }, + error: null, + timings: [], + } + + try { + const start = performance.now() + const fetchResponse = await trackedFetch(params.url, { + method: params.method, + body: params.body, + headers: params.headers, + timeout: this.serverConfig.EXTERNAL_REQUEST_TIMEOUT_MS, + }) + + let responseBody = await fetchResponse.text() + try { + responseBody = JSON.parse(responseBody) + } catch (err) { + // Ignore + } + + const duration = performance.now() - start + + resParams.timings!.push({ + kind: 'async_function', + duration_ms: duration, + }) + + resParams.response = { + status: fetchResponse.status, + body: responseBody, + } + } catch (err) { + status.error('🦔', `[HogExecutor] Error during fetch`, { error: String(err) }) + resParams.error = 'Something went wrong with the fetch request.' + } + + return { + invocation: { + ...invocation, + queue: 'hog', + queueParameters: resParams, + }, + finished: false, + logs: [], + } + } +} diff --git a/plugin-server/src/cdp/hog-executor.ts b/plugin-server/src/cdp/hog-executor.ts index 1e9fb6719aaa3..382f6b3fc3549 100644 --- a/plugin-server/src/cdp/hog-executor.ts +++ b/plugin-server/src/cdp/hog-executor.ts @@ -5,14 +5,13 @@ import { Histogram } from 'prom-client' import RE2 from 're2' import { status } from '../utils/status' -import { UUIDT } from '../utils/utils' import { HogFunctionManager } from './hog-function-manager' import { HogFunctionInvocation, - HogFunctionInvocationAsyncResponse, HogFunctionInvocationGlobals, HogFunctionInvocationGlobalsWithInputs, HogFunctionInvocationResult, + HogFunctionQueueParametersFetchResponse, HogFunctionType, } from './types' import { convertToHogFunctionFilterGlobal } from './utils' @@ -128,115 +127,11 @@ export class HogExecutor { } } - /** - * Intended to be invoked as a starting point from an event - */ - executeFunction( - event: HogFunctionInvocationGlobals, - functionOrId: HogFunctionType | HogFunctionType['id'] - ): HogFunctionInvocationResult | undefined { - const hogFunction = - typeof functionOrId === 'string' - ? this.hogFunctionManager.getTeamHogFunction(event.project.id, functionOrId) - : functionOrId - - if (!hogFunction) { - return - } - - // Add the source of the trigger to the globals - const modifiedGlobals: HogFunctionInvocationGlobals = { - ...event, - source: { - name: hogFunction.name ?? `Hog function: ${hogFunction.id}`, - url: `${event.project.url}/pipeline/destinations/hog-${hogFunction.id}/configuration/`, - }, - } - - return this.execute(hogFunction, { - id: new UUIDT().toString(), - globals: modifiedGlobals, - teamId: hogFunction.team_id, - hogFunctionId: hogFunction.id, - timings: [], - }) - } - - /** - * Intended to be invoked as a continuation from an async function - */ - executeAsyncResponse( - invocation: HogFunctionInvocation, - asyncFunctionResponse: HogFunctionInvocationAsyncResponse['asyncFunctionResponse'] - ): HogFunctionInvocationResult { - if (!invocation.hogFunctionId) { - throw new Error('No hog function id provided') - } - - const { logs = [], response = null, error: asyncError, timings = [] } = asyncFunctionResponse - - if (response?.status && response.status >= 400) { - // Generic warn log for bad status codes - logs.push({ - level: 'warn', - timestamp: DateTime.now(), - message: `Fetch returned bad status: ${response.status}`, - }) - } - - const errorRes = (error = 'Something went wrong'): HogFunctionInvocationResult => ({ - invocation, - finished: false, - error, - logs: [ - ...logs, - { - level: 'error', - timestamp: DateTime.now(), - message: error, - }, - ], - }) - - const hogFunction = this.hogFunctionManager.getTeamHogFunction( - invocation.globals.project.id, - invocation.hogFunctionId - ) - - if (!hogFunction || !invocation.vmState || asyncError) { - return errorRes( - !hogFunction - ? `Hog Function with ID ${invocation.hogFunctionId} not found` - : asyncError - ? asyncError - : 'No VM state provided for async response' - ) - } - - if (typeof response?.body === 'string') { - try { - response.body = JSON.parse(response.body) - } catch (e) { - // pass - if it isn't json we just pass it on - } - } - - // Add the response to the stack to continue execution - invocation.vmState.stack.push(response) - invocation.timings.push(...timings) - - const res = this.execute(hogFunction, invocation) - - // Add any timings and logs from the async function - res.logs = [...(logs ?? []), ...res.logs] - - return res - } - - execute(hogFunction: HogFunctionType, invocation: HogFunctionInvocation): HogFunctionInvocationResult { + execute(invocation: HogFunctionInvocation): HogFunctionInvocationResult { const loggingContext = { - hogFunctionId: hogFunction.id, - hogFunctionName: hogFunction.name, + invocationId: invocation.id, + hogFunctionId: invocation.hogFunction.id, + hogFunctionName: invocation.hogFunction.name, hogFunctionUrl: invocation.globals.source?.url, } @@ -244,7 +139,6 @@ export class HogExecutor { const result: HogFunctionInvocationResult = { invocation, - asyncFunctionRequest: undefined, finished: false, capturedPostHogEvents: [], logs: [], @@ -257,12 +151,58 @@ export class HogExecutor { }) try { + // If the queueParameter is set then we have an expected format that we want to parse and add to the stack + if (invocation.queueParameters) { + const { + logs = [], + response = null, + error, + timings = [], + } = invocation.queueParameters as HogFunctionQueueParametersFetchResponse + + // Reset the queue parameters to be sure + invocation.queue = 'hog' + invocation.queueParameters = undefined + + // Special handling for fetch + // TODO: Would be good to have a dedicated value in the fetch response for the status code + if (response?.status && response.status >= 400) { + // Generic warn log for bad status codes + logs.push({ + level: 'warn', + timestamp: DateTime.now(), + message: `Fetch returned bad status: ${response.status}`, + }) + } + + if (!invocation.vmState) { + throw new Error("VM state wasn't provided for queue parameters") + } + + if (error) { + throw new Error(error) + } + + if (typeof response?.body === 'string') { + try { + response.body = JSON.parse(response.body) + } catch (e) { + // pass - if it isn't json we just pass it on + } + } + + // Add the response to the stack to continue execution + invocation.vmState!.stack.push(response) + invocation.timings.push(...timings) + result.logs = [...logs, ...result.logs] + } + const start = performance.now() let globals: HogFunctionInvocationGlobalsWithInputs let execRes: ExecResult | undefined = undefined try { - globals = this.buildHogFunctionGlobals(hogFunction, invocation) + globals = this.buildHogFunctionGlobals(invocation) } catch (e) { result.logs.push({ level: 'error', @@ -273,11 +213,11 @@ export class HogExecutor { throw e } - const sensitiveValues = this.getSensitiveValues(hogFunction, globals.inputs) + const sensitiveValues = this.getSensitiveValues(invocation.hogFunction, globals.inputs) try { let hogLogs = 0 - execRes = execHog(invocation.vmState ?? hogFunction.bytecode, { + execRes = execHog(invocation.vmState ?? invocation.hogFunction.bytecode, { globals, maxAsyncSteps: MAX_ASYNC_STEPS, // NOTE: This will likely be configurable in the future asyncFunctions: { @@ -353,6 +293,7 @@ export class HogExecutor { hogExecutionDuration.observe(duration) result.finished = execRes.finished + result.invocation.vmState = execRes.state invocation.timings.push({ kind: 'hog', duration_ms: duration, @@ -373,10 +314,34 @@ export class HogExecutor { }) if (execRes.asyncFunctionName) { - result.invocation.vmState = execRes.state - result.asyncFunctionRequest = { - name: execRes.asyncFunctionName, - args: args, + switch (execRes.asyncFunctionName) { + case 'fetch': + // Sanitize the args + const [url, fetchOptions] = args as [string | undefined, Record | undefined] + + if (typeof url !== 'string') { + throw new Error('fetch: Invalid URL') + } + + const method = fetchOptions?.method || 'POST' + const headers = fetchOptions?.headers || { + 'Content-Type': 'application/json', + } + let body = fetchOptions?.body + // Modify the body to ensure it is a string (we allow Hog to send an object to keep things simple) + body = body ? (typeof body === 'string' ? body : JSON.stringify(body)) : body + + result.invocation.queue = 'fetch' + result.invocation.queueParameters = { + url, + method, + headers, + body, + } + + break + default: + throw new Error(`Unknown async function '${execRes.asyncFunctionName}'`) } } else { result.logs.push({ @@ -401,19 +366,20 @@ export class HogExecutor { } } catch (err) { result.error = err.message - status.error('🦔', `[HogExecutor] Error executing function ${hogFunction.id} - ${hogFunction.name}`, err) + status.error( + '🦔', + `[HogExecutor] Error executing function ${invocation.hogFunction.id} - ${invocation.hogFunction.name}`, + err + ) } return result } - buildHogFunctionGlobals( - hogFunction: HogFunctionType, - invocation: HogFunctionInvocation - ): HogFunctionInvocationGlobalsWithInputs { + buildHogFunctionGlobals(invocation: HogFunctionInvocation): HogFunctionInvocationGlobalsWithInputs { const builtInputs: Record = {} - Object.entries(hogFunction.inputs ?? {}).forEach(([key, item]) => { + Object.entries(invocation.hogFunction.inputs ?? {}).forEach(([key, item]) => { builtInputs[key] = item.value if (item.bytecode) { diff --git a/plugin-server/src/cdp/hog-masker.ts b/plugin-server/src/cdp/hog-masker.ts index 0ab4cfcb5c299..2f204ab855552 100644 --- a/plugin-server/src/cdp/hog-masker.ts +++ b/plugin-server/src/cdp/hog-masker.ts @@ -2,7 +2,7 @@ import { createHash } from 'crypto' import { execHog } from './hog-executor' import { CdpRedis } from './redis' -import { HogFunctionInvocationGlobals, HogFunctionType } from './types' +import { HogFunctionInvocation } from './types' export const BASE_REDIS_KEY = process.env.NODE_ENV == 'test' ? '@posthog-test/hog-masker' : '@posthog/hog-masker' const REDIS_KEY_TOKENS = `${BASE_REDIS_KEY}/mask` @@ -20,12 +20,7 @@ type MaskContext = { threshold: number | null } -type HogInvocationContext = { - globals: HogFunctionInvocationGlobals - hogFunction: HogFunctionType -} - -type HogInvocationContextWithMasker = HogInvocationContext & { +type HogInvocationContextWithMasker = HogFunctionInvocation & { masker?: MaskContext } @@ -39,9 +34,9 @@ type HogInvocationContextWithMasker = HogInvocationContext & { export class HogMasker { constructor(private redis: CdpRedis) {} - public async filterByMasking(invocations: HogInvocationContext[]): Promise<{ - masked: HogInvocationContext[] - notMasked: HogInvocationContext[] + public async filterByMasking(invocations: HogFunctionInvocation[]): Promise<{ + masked: HogFunctionInvocation[] + notMasked: HogFunctionInvocation[] }> { const invocationsWithMasker: HogInvocationContextWithMasker[] = [...invocations] const masks: Record = {} @@ -123,7 +118,7 @@ export class HogMasker { } return acc }, - { masked: [], notMasked: [] } as { masked: HogInvocationContext[]; notMasked: HogInvocationContext[] } + { masked: [], notMasked: [] } as { masked: HogFunctionInvocation[]; notMasked: HogFunctionInvocation[] } ) } } diff --git a/plugin-server/src/cdp/hog-watcher.ts b/plugin-server/src/cdp/hog-watcher.ts index 0e80c1a3cac41..38a2a3901b776 100644 --- a/plugin-server/src/cdp/hog-watcher.ts +++ b/plugin-server/src/cdp/hog-watcher.ts @@ -124,7 +124,7 @@ export class HogWatcher { const costs: Record = {} results.forEach((result) => { - let cost = (costs[result.invocation.hogFunctionId] = costs[result.invocation.hogFunctionId] || 0) + let cost = (costs[result.invocation.hogFunction.id] = costs[result.invocation.hogFunction.id] || 0) if (result.finished) { // If it is finished we can calculate the score based off of the timings @@ -142,7 +142,7 @@ export class HogWatcher { cost += this.hub.CDP_WATCHER_COST_ERROR } - costs[result.invocation.hogFunctionId] = cost + costs[result.invocation.hogFunction.id] = cost }) const res = await this.redis.usePipeline({ name: 'checkRateLimits' }, (pipeline) => { diff --git a/plugin-server/src/cdp/types.ts b/plugin-server/src/cdp/types.ts index b4b8155d971f0..19740db81306a 100644 --- a/plugin-server/src/cdp/types.ts +++ b/plugin-server/src/cdp/types.ts @@ -7,6 +7,7 @@ import { ElementPropertyFilter, EventPropertyFilter, PersonPropertyFilter, + Team, } from '../types' export type HogBytecode = any[] @@ -99,11 +100,6 @@ export type HogFunctionInvocationGlobalsWithInputs = HogFunctionInvocationGlobal inputs: Record } -export type HogFunctionOverflowedGlobals = { - hogFunctionIds: HogFunctionType['id'][] - globals: HogFunctionInvocationGlobals -} - export type HogFunctionFilterGlobals = { // Filter Hog is built in the same way as analytics so the global object is meant to be an event event: string @@ -157,23 +153,14 @@ export interface HogFunctionTiming { duration_ms: number } -// This is the "persistent" state of a hog function invocation -export type HogFunctionInvocation = { - id: string - globals: HogFunctionInvocationGlobals - teamId: number - hogFunctionId: HogFunctionType['id'] - // The current vmstate (set if the invocation is paused) - vmState?: VMState - timings: HogFunctionTiming[] -} - -export type HogFunctionAsyncFunctionRequest = { - name: string - args: any[] +export type HogFunctionQueueParametersFetchRequest = { + url: string + method: string + body: string + headers: Record } -export type HogFunctionAsyncFunctionResponse = { +export type HogFunctionQueueParametersFetchResponse = { /** An error message to indicate something went wrong and the invocation should be stopped */ error?: any /** The data to be passed to the Hog function from the response */ @@ -185,12 +172,33 @@ export type HogFunctionAsyncFunctionResponse = { logs?: LogEntry[] } +export type HogFunctionInvocationQueueParameters = + | HogFunctionQueueParametersFetchRequest + | HogFunctionQueueParametersFetchResponse + +export type HogFunctionInvocation = { + id: string + globals: HogFunctionInvocationGlobals + teamId: Team['id'] + hogFunction: HogFunctionType + queue: 'hog' | 'fetch' + queueParameters?: HogFunctionInvocationQueueParameters + // The current vmstate (set if the invocation is paused) + vmState?: VMState + timings: HogFunctionTiming[] +} + +export type HogFunctionAsyncFunctionRequest = { + name: string + args: any[] +} + // The result of an execution export type HogFunctionInvocationResult = { invocation: HogFunctionInvocation finished: boolean error?: any - asyncFunctionRequest?: HogFunctionAsyncFunctionRequest + // asyncFunctionRequest?: HogFunctionAsyncFunctionRequest logs: LogEntry[] capturedPostHogEvents?: HogFunctionCapturedEvent[] } @@ -202,13 +210,20 @@ export type HogFunctionInvocationAsyncRequest = { asyncFunctionRequest?: HogFunctionAsyncFunctionRequest } -export type HogFunctionInvocationAsyncResponse = { +export type HogHooksFetchResponse = { state: string // Serialized HogFunctionInvocation teamId: number hogFunctionId: HogFunctionType['id'] + asyncFunctionResponse: HogFunctionQueueParametersFetchResponse +} + +export type HogFunctionInvocationSerialized = Omit & { + // When serialized to kafka / cyclotron we only store the ID + hogFunctionId: HogFunctionType['id'] +} - // FOLLOWUP: do we want to type this more strictly? - asyncFunctionResponse: HogFunctionAsyncFunctionResponse +export type HogFunctionInvocationSerializedCompressed = { + state: string // Serialized HogFunctionInvocation } // Mostly copied from frontend types @@ -259,16 +274,13 @@ export type IntegrationType = { created_by_id?: number } -type CdpOverflowMessageInvocations = { - source: 'event_invocations' - payload: HogFunctionOverflowedGlobals -} - -export type CdpOverflowMessage = CdpOverflowMessageInvocations - export type HogFunctionMessageToProduce = { topic: string - value: CdpOverflowMessage | HogFunctionLogEntrySerialized | HogFunctionInvocationAsyncResponse | AppMetric2Type + value: + | HogFunctionLogEntrySerialized + | HogHooksFetchResponse + | AppMetric2Type + | HogFunctionInvocationSerializedCompressed key: string } diff --git a/plugin-server/src/cdp/utils.ts b/plugin-server/src/cdp/utils.ts index cc49b63fc1eb6..da1d64273f7aa 100644 --- a/plugin-server/src/cdp/utils.ts +++ b/plugin-server/src/cdp/utils.ts @@ -9,9 +9,11 @@ import { castTimestampOrNow, clickHouseTimestampToISO, UUIDT } from '../utils/ut import { HogFunctionCapturedEvent, HogFunctionFilterGlobals, + HogFunctionInvocation, HogFunctionInvocationGlobals, HogFunctionInvocationResult, HogFunctionLogEntrySerialized, + HogFunctionType, ParsedClickhouseEvent, } from './types' @@ -190,7 +192,7 @@ export const prepareLogEntriesForClickhouse = ( ...logEntry, team_id: result.invocation.teamId, log_source: 'hog_function', - log_source_id: result.invocation.hogFunctionId, + log_source_id: result.invocation.hogFunction.id, instance_id: result.invocation.id, timestamp: castTimestampOrNow(logEntry.timestamp, TimestampFormat.ClickHouse), } @@ -199,3 +201,26 @@ export const prepareLogEntriesForClickhouse = ( return preparedLogs } + +export function createInvocation( + globals: HogFunctionInvocationGlobals, + hogFunction: HogFunctionType +): HogFunctionInvocation { + // Add the source of the trigger to the globals + const modifiedGlobals: HogFunctionInvocationGlobals = { + ...globals, + source: { + name: hogFunction.name ?? `Hog function: ${hogFunction.id}`, + url: `${globals.project.url}/pipeline/destinations/hog-${hogFunction.id}/configuration/`, + }, + } + + return { + id: new UUIDT().toString(), + globals: modifiedGlobals, + teamId: hogFunction.team_id, + hogFunction, + queue: 'hog', + timings: [], + } +} diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index 936cde4e65656..afa2ba1d72fe3 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -185,6 +185,7 @@ export function getDefaultConfig(): PluginsServerConfig { CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS: '', CDP_ASYNC_FUNCTIONS_CYCLOTRON_TEAMS: '', CDP_REDIS_PASSWORD: '', + CDP_EVENT_PROCESSOR_EXECUTE_FIRST_STEP: true, CDP_REDIS_HOST: '', CDP_REDIS_PORT: 6479, diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index b2dacc0b9767c..cafdc0451806d 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -10,12 +10,7 @@ import v8Profiler from 'v8-profiler-next' import { getPluginServerCapabilities } from '../capabilities' import { CdpApi } from '../cdp/cdp-api' -import { - CdpCyclotronWorker, - CdpFunctionCallbackConsumer, - CdpOverflowConsumer, - CdpProcessedEventsConsumer, -} from '../cdp/cdp-consumers' +import { CdpFunctionCallbackConsumer, CdpProcessedEventsConsumer } from '../cdp/cdp-consumers' import { defaultConfig, sessionRecordingConsumerConfig } from '../config/config' import { Hub, PluginServerCapabilities, PluginServerService, PluginsServerConfig } from '../types' import { closeHub, createHub, createKafkaClient, createKafkaProducerWrapper } from '../utils/db/hub' @@ -463,24 +458,16 @@ export async function startPluginsServer( } } - if (capabilities.cdpFunctionOverflow) { - const hub = await setupHub() - const consumer = new CdpOverflowConsumer(hub) - await consumer.start() - services.push(consumer.service) - } - - if (capabilities.cdpCyclotronWorker) { - const hub = await setupHub() - if (hub.CYCLOTRON_DATABASE_URL) { - const worker = new CdpCyclotronWorker(hub) - await worker.start() - services.push(worker.service) - } else { - // This is a temporary solution until we *require* Cyclotron to be configured. - status.warn('💥', 'CYCLOTRON_DATABASE_URL is not set, not running Cyclotron worker') - } - } + // if (capabilities.cdpCyclotronWorker) { + // ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities) + // if (hub.CYCLOTRON_DATABASE_URL) { + // const worker = new CdpCyclotronWorker(hub) + // await worker.start() + // } else { + // // This is a temporary solution until we *require* Cyclotron to be configured. + // status.warn('💥', 'CYCLOTRON_DATABASE_URL is not set, not running Cyclotron worker') + // } + // } if (capabilities.http) { const app = setupCommonRoutes(services) diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 4cf4f66deeb6b..58253a210abd3 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -83,7 +83,6 @@ export enum PluginServerMode { recordings_blob_ingestion_overflow = 'recordings-blob-ingestion-overflow', cdp_processed_events = 'cdp-processed-events', cdp_function_callbacks = 'cdp-function-callbacks', - cdp_function_overflow = 'cdp-function-overflow', cdp_cyclotron_worker = 'cdp-cyclotron-worker', functional_tests = 'functional-tests', } @@ -118,6 +117,7 @@ export type CdpConfig = { CDP_REDIS_HOST: string CDP_REDIS_PORT: number CDP_REDIS_PASSWORD: string + CDP_EVENT_PROCESSOR_EXECUTE_FIRST_STEP: boolean } export interface PluginsServerConfig extends CdpConfig { @@ -342,7 +342,6 @@ export interface PluginServerCapabilities { sessionRecordingBlobOverflowIngestion?: boolean cdpProcessedEvents?: boolean cdpFunctionCallbacks?: boolean - cdpFunctionOverflow?: boolean cdpCyclotronWorker?: boolean appManagementSingleton?: boolean preflightSchedules?: boolean // Used for instance health checks on hobby deploy, not useful on cloud diff --git a/plugin-server/tests/cdp/cdp-function-callbacks-consumer.test.ts b/plugin-server/tests/cdp/cdp-api.test.ts similarity index 91% rename from plugin-server/tests/cdp/cdp-function-callbacks-consumer.test.ts rename to plugin-server/tests/cdp/cdp-api.test.ts index 278132739e951..f3c7dc3b22e05 100644 --- a/plugin-server/tests/cdp/cdp-function-callbacks-consumer.test.ts +++ b/plugin-server/tests/cdp/cdp-api.test.ts @@ -3,7 +3,7 @@ import supertest from 'supertest' import { CdpApi } from '../../src/cdp/cdp-api' import { CdpFunctionCallbackConsumer } from '../../src/cdp/cdp-consumers' -import { HogFunctionType } from '../../src/cdp/types' +import { HogFunctionInvocationGlobals, HogFunctionType } from '../../src/cdp/types' import { Hub, Team } from '../../src/types' import { closeHub, createHub } from '../../src/utils/db/hub' import { getFirstTeam, resetTestDatabase } from '../helpers/sql' @@ -64,7 +64,7 @@ const mockFetch: jest.Mock = require('../../src/utils/fetch').trackedFetch jest.setTimeout(1000) -describe('CDP Processed Events Consuner', () => { +describe('CDP API', () => { let processor: CdpFunctionCallbackConsumer let hub: Hub let team: Team @@ -102,20 +102,24 @@ describe('CDP Processed Events Consuner', () => { let app: express.Express let hogFunction: HogFunctionType - const globals = { - event: { - uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0', - name: '$pageview', + const globals: Partial = { + groups: {}, + person: { + uuid: '123', + name: 'Jane Doe', + url: 'https://example.com/person/123', properties: { - $lib_version: '1.0.0', + email: 'example@posthog.com', }, }, - groups: {}, - person: { + event: { uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0', - distinct_ids: ['b3a1fe86-b10c-43cc-acaf-d208977608d0'], + name: '$pageview', + distinct_id: '123', + timestamp: '2021-09-28T14:00:00Z', + url: 'https://example.com/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/2021-09-28T14:00:00Z', properties: { - email: 'test@posthog.com', + $lib_version: '1.0.0', }, }, } @@ -158,6 +162,7 @@ describe('CDP Processed Events Consuner', () => { .send({ globals, mock_async_functions: true }) expect(res.status).toEqual(200) + console.log(res.body.logs[3].message) expect(res.body).toMatchObject({ status: 'success', error: 'undefined', @@ -168,7 +173,7 @@ describe('CDP Processed Events Consuner', () => { }, { level: 'debug', - message: "Suspending function due to async function call 'fetch'. Payload: 1689 bytes", + message: "Suspending function due to async function call 'fetch'. Payload: 2010 bytes", }, { level: 'info', @@ -176,7 +181,7 @@ describe('CDP Processed Events Consuner', () => { }, { level: 'info', - message: expect.stringContaining('fetch("https://example.com/posthog-webhook",'), + message: expect.stringContaining('fetch({'), }, { level: 'debug', @@ -216,7 +221,7 @@ describe('CDP Processed Events Consuner', () => { }, { level: 'debug', - message: "Suspending function due to async function call 'fetch'. Payload: 1689 bytes", + message: "Suspending function due to async function call 'fetch'. Payload: 2010 bytes", }, { level: 'debug', diff --git a/plugin-server/tests/cdp/cdp-consumer.e2e.test.ts b/plugin-server/tests/cdp/cdp-consumer.e2e.test.ts new file mode 100644 index 0000000000000..8d6581aef9ef0 --- /dev/null +++ b/plugin-server/tests/cdp/cdp-consumer.e2e.test.ts @@ -0,0 +1,222 @@ +import { CdpFunctionCallbackConsumer, CdpProcessedEventsConsumer } from '../../src/cdp/cdp-consumers' +import { HogFunctionInvocationGlobals, HogFunctionType } from '../../src/cdp/types' +import { Hub, Team } from '../../src/types' +import { closeHub, createHub } from '../../src/utils/db/hub' +import { getFirstTeam, resetTestDatabase } from '../helpers/sql' +import { HOG_EXAMPLES, HOG_FILTERS_EXAMPLES, HOG_INPUTS_EXAMPLES } from './examples' +import { createHogExecutionGlobals, insertHogFunction as _insertHogFunction } from './fixtures' + +const mockConsumer = { + on: jest.fn(), + commitSync: jest.fn(), + commit: jest.fn(), + queryWatermarkOffsets: jest.fn(), + committed: jest.fn(), + assignments: jest.fn(), + isConnected: jest.fn(() => true), + getMetadata: jest.fn(), +} + +jest.mock('../../src/kafka/batch-consumer', () => { + return { + startBatchConsumer: jest.fn(() => + Promise.resolve({ + join: () => ({ + finally: jest.fn(), + }), + stop: jest.fn(), + consumer: mockConsumer, + }) + ), + } +}) + +jest.mock('../../src/utils/fetch', () => { + return { + trackedFetch: jest.fn(() => + Promise.resolve({ + status: 200, + text: () => Promise.resolve(JSON.stringify({ success: true })), + json: () => Promise.resolve({ success: true }), + }) + ), + } +}) + +jest.mock('../../src/utils/db/kafka-producer-wrapper', () => { + const mockKafkaProducer = { + producer: { + connect: jest.fn(), + }, + disconnect: jest.fn(), + produce: jest.fn(() => Promise.resolve()), + } + return { + KafkaProducerWrapper: jest.fn(() => mockKafkaProducer), + } +}) + +const mockFetch: jest.Mock = require('../../src/utils/fetch').trackedFetch + +const mockProducer = require('../../src/utils/db/kafka-producer-wrapper').KafkaProducerWrapper() + +jest.setTimeout(1000) + +const decodeKafkaMessage = (message: any): any => { + return { + ...message, + value: JSON.parse(message.value.toString()), + } +} + +const decodeAllKafkaMessages = (): any[] => { + return mockProducer.produce.mock.calls.map((x) => decodeKafkaMessage(x[0])) +} + +const convertToKafkaMessage = (message: any): any => { + return { + ...message, + value: Buffer.from(JSON.stringify(message.value)), + } +} + +/** + * NOTE: This isn't fully e2e... We still mock kafka but we trigger one queue from the other in a loop + */ +describe('CDP Consumers E2E', () => { + let processedEventsConsumer: CdpProcessedEventsConsumer + let functionProcessor: CdpFunctionCallbackConsumer + let hub: Hub + let team: Team + + const insertHogFunction = async (hogFunction: Partial) => { + const item = await _insertHogFunction(hub.postgres, team.id, hogFunction) + // Trigger the reload that django would do + await processedEventsConsumer.hogFunctionManager.reloadAllHogFunctions() + await functionProcessor.hogFunctionManager.reloadAllHogFunctions() + return item + } + + beforeEach(async () => { + await resetTestDatabase() + hub = await createHub() + team = await getFirstTeam(hub) + + processedEventsConsumer = new CdpProcessedEventsConsumer(hub) + await processedEventsConsumer.start() + functionProcessor = new CdpFunctionCallbackConsumer(hub) + await functionProcessor.start() + + mockFetch.mockClear() + }) + + afterEach(async () => { + jest.setTimeout(10000) + await processedEventsConsumer.stop() + await functionProcessor.stop() + await closeHub(hub) + }) + + afterAll(() => { + jest.useRealTimers() + }) + + describe('e2e fetch function', () => { + /** + * Tests here are somewhat expensive so should mostly simulate happy paths and the more e2e scenarios + */ + + let fnFetchNoFilters: HogFunctionType + let globals: HogFunctionInvocationGlobals + + let kafkaMessages = { + metrics: [] as any[], + logs: [] as any[], + invocations: [] as any[], + } + + beforeEach(async () => { + fnFetchNoFilters = await insertHogFunction({ + ...HOG_EXAMPLES.simple_fetch, + ...HOG_INPUTS_EXAMPLES.simple_fetch, + ...HOG_FILTERS_EXAMPLES.no_filters, + }) + + globals = createHogExecutionGlobals({ + project: { + id: team.id, + } as any, + event: { + uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0', + name: '$pageview', + properties: { + $current_url: 'https://posthog.com', + $lib_version: '1.0.0', + }, + } as any, + }) + + kafkaMessages = { + metrics: [], + logs: [], + invocations: [], + } + }) + + const gatherProducedMessages = () => { + const allMessages = decodeAllKafkaMessages() + + allMessages.forEach((message) => { + if (message.topic === 'clickhouse_app_metrics2_test') { + kafkaMessages.metrics.push(message) + } else if (message.topic === 'log_entries_test') { + kafkaMessages.logs.push(message) + } else if (message.topic === 'cdp_function_callbacks_test') { + kafkaMessages.invocations.push(message) + } else { + throw new Error(`Unknown topic: ${message.topic}`) + } + }) + + mockProducer.produce.mockClear() + } + + it('should invoke a function via kafka transportation until completed', async () => { + // NOTE: We can skip kafka as the entry point + const invocations = await processedEventsConsumer.processBatch([globals]) + expect(invocations).toHaveLength(1) + gatherProducedMessages() + + expect(kafkaMessages.invocations).toHaveLength(1) + expect(kafkaMessages.invocations[0].topic).toEqual('cdp_function_callbacks_test') + mockProducer.produce.mockClear() + + while (kafkaMessages.invocations.length) { + await functionProcessor._handleKafkaBatch([convertToKafkaMessage(kafkaMessages.invocations[0])]) + kafkaMessages.invocations = [] + gatherProducedMessages() + } + + expect(kafkaMessages.metrics).toMatchObject([ + { + key: fnFetchNoFilters.id.toString(), + value: { + app_source: 'hog_function', + app_source_id: fnFetchNoFilters.id.toString(), + count: 1, + metric_kind: 'success', + metric_name: 'succeeded', + team_id: 2, + }, + }, + ]) + expect(kafkaMessages.logs.map((x) => x.value.message)).toEqual([ + 'Executing function', + "Suspending function due to async function call 'fetch'. Payload: 1902 bytes", + 'Resuming function', + 'Fetch response:, {"status":200,"body":{"success":true}}', + expect.stringContaining('Function completed'), + ]) + }) + }) +}) diff --git a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts b/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts index a5534036f848b..b0a1c09f15d6f 100644 --- a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts +++ b/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts @@ -1,10 +1,16 @@ import { CdpProcessedEventsConsumer } from '../../src/cdp/cdp-consumers' -import { HogFunctionType } from '../../src/cdp/types' +import { HogWatcherState } from '../../src/cdp/hog-watcher' +import { HogFunctionInvocationGlobals, HogFunctionType } from '../../src/cdp/types' import { Hub, Team } from '../../src/types' import { closeHub, createHub } from '../../src/utils/db/hub' import { getFirstTeam, resetTestDatabase } from '../helpers/sql' import { HOG_EXAMPLES, HOG_FILTERS_EXAMPLES, HOG_INPUTS_EXAMPLES } from './examples' -import { createIncomingEvent, createMessage, insertHogFunction as _insertHogFunction } from './fixtures' +import { + createHogExecutionGlobals, + createIncomingEvent, + createMessage, + insertHogFunction as _insertHogFunction, +} from './fixtures' const mockConsumer = { on: jest.fn(), @@ -62,8 +68,6 @@ const mockProducer = require('../../src/utils/db/kafka-producer-wrapper').KafkaP jest.setTimeout(1000) -const noop = () => {} - const decodeKafkaMessage = (message: any): any => { return { ...message, @@ -71,7 +75,11 @@ const decodeKafkaMessage = (message: any): any => { } } -describe('CDP Processed Events Consuner', () => { +const decodeAllKafkaMessages = (): any[] => { + return mockProducer.produce.mock.calls.map((x) => decodeKafkaMessage(x[0])) +} + +describe('CDP Processed Events Consumer', () => { let processor: CdpProcessedEventsConsumer let hub: Hub let team: Team @@ -105,19 +113,163 @@ describe('CDP Processed Events Consuner', () => { }) describe('general event processing', () => { - /** - * Tests here are somewhat expensive so should mostly simulate happy paths and the more e2e scenarios - */ - it('can parse incoming messages correctly', async () => { - await insertHogFunction({ - ...HOG_EXAMPLES.simple_fetch, - ...HOG_INPUTS_EXAMPLES.simple_fetch, - ...HOG_FILTERS_EXAMPLES.no_filters, + beforeEach(() => { + hub.CDP_EVENT_PROCESSOR_EXECUTE_FIRST_STEP = false + }) + + describe('common processing', () => { + let fnFetchNoFilters: HogFunctionType + let fnPrinterPageviewFilters: HogFunctionType + let globals: HogFunctionInvocationGlobals + + beforeEach(async () => { + fnFetchNoFilters = await insertHogFunction({ + ...HOG_EXAMPLES.simple_fetch, + ...HOG_INPUTS_EXAMPLES.simple_fetch, + ...HOG_FILTERS_EXAMPLES.no_filters, + }) + + fnPrinterPageviewFilters = await insertHogFunction({ + ...HOG_EXAMPLES.input_printer, + ...HOG_INPUTS_EXAMPLES.secret_inputs, + ...HOG_FILTERS_EXAMPLES.pageview_or_autocapture_filter, + }) + + globals = createHogExecutionGlobals({ + project: { + id: team.id, + } as any, + event: { + uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0', + name: '$pageview', + properties: { + $current_url: 'https://posthog.com', + $lib_version: '1.0.0', + }, + } as any, + }) + }) + + const matchInvocation = (hogFunction: HogFunctionType, globals: HogFunctionInvocationGlobals) => { + return { + hogFunction: { + id: hogFunction.id, + }, + globals: { + event: globals.event, + }, + } + } + + it('should process events', async () => { + const invocations = await processor.processBatch([globals]) + + expect(invocations).toHaveLength(2) + expect(invocations).toMatchObject([ + matchInvocation(fnFetchNoFilters, globals), + matchInvocation(fnPrinterPageviewFilters, globals), + ]) + + expect(mockProducer.produce).toHaveBeenCalledTimes(2) + + expect(decodeAllKafkaMessages()).toMatchObject([ + { + key: expect.any(String), + topic: 'cdp_function_callbacks_test', + value: { + state: expect.any(String), + }, + waitForAck: true, + }, + { + key: expect.any(String), + topic: 'cdp_function_callbacks_test', + value: { + state: expect.any(String), + }, + waitForAck: true, + }, + ]) + }) + + it("should filter out functions that don't match the filter", async () => { + globals.event.properties.$current_url = 'https://nomatch.com' + + const invocations = await processor.processBatch([globals]) + + expect(invocations).toHaveLength(1) + expect(invocations).toMatchObject([matchInvocation(fnFetchNoFilters, globals)]) + expect(mockProducer.produce).toHaveBeenCalledTimes(2) + + expect(decodeAllKafkaMessages()).toMatchObject([ + { + key: expect.any(String), + topic: 'clickhouse_app_metrics2_test', + value: { + app_source: 'hog_function', + app_source_id: fnPrinterPageviewFilters.id, + count: 1, + metric_kind: 'other', + metric_name: 'filtered', + team_id: 2, + timestamp: expect.any(String), + }, + }, + { + topic: 'cdp_function_callbacks_test', + }, + ]) }) - // Create a message that should be processed by this function - // Run the function and check that it was executed - await processor.handleEachBatch( - [ + + it.each([ + [HogWatcherState.disabledForPeriod, 'disabled_temporarily'], + [HogWatcherState.disabledIndefinitely, 'disabled_permanently'], + ])('should filter out functions that are disabled', async (state, metric_name) => { + await processor.hogWatcher.forceStateChange(fnFetchNoFilters.id, state) + await processor.hogWatcher.forceStateChange(fnPrinterPageviewFilters.id, state) + + const invocations = await processor.processBatch([globals]) + + expect(invocations).toHaveLength(0) + expect(mockProducer.produce).toHaveBeenCalledTimes(2) + + expect(decodeAllKafkaMessages()).toMatchObject([ + { + topic: 'clickhouse_app_metrics2_test', + value: { + app_source: 'hog_function', + app_source_id: fnFetchNoFilters.id, + count: 1, + metric_kind: 'failure', + metric_name: metric_name, + team_id: 2, + }, + }, + { + topic: 'clickhouse_app_metrics2_test', + value: { + app_source: 'hog_function', + app_source_id: fnPrinterPageviewFilters.id, + count: 1, + metric_kind: 'failure', + metric_name: metric_name, + team_id: 2, + }, + }, + ]) + }) + }) + + describe('kafka parsing', () => { + it('can parse incoming messages correctly', async () => { + await insertHogFunction({ + ...HOG_EXAMPLES.simple_fetch, + ...HOG_INPUTS_EXAMPLES.simple_fetch, + ...HOG_FILTERS_EXAMPLES.no_filters, + }) + // Create a message that should be processed by this function + // Run the function and check that it was executed + await processor._handleKafkaBatch([ createMessage( createIncomingEvent(team.id, { uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0', @@ -127,37 +279,36 @@ describe('CDP Processed Events Consuner', () => { }), }) ), - ], - noop - ) - - expect(mockFetch).toHaveBeenCalledTimes(1) - expect(mockFetch.mock.calls[0]).toMatchInlineSnapshot(` - Array [ - "https://example.com/posthog-webhook", - Object { - "body": "{\\"event\\":{\\"uuid\\":\\"b3a1fe86-b10c-43cc-acaf-d208977608d0\\",\\"name\\":\\"$pageview\\",\\"distinct_id\\":\\"distinct_id_1\\",\\"properties\\":{\\"$lib_version\\":\\"1.0.0\\",\\"$elements_chain\\":\\"[]\\"},\\"timestamp\\":null,\\"url\\":\\"http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null\\"},\\"groups\\":{},\\"nested\\":{\\"foo\\":\\"http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null\\"},\\"person\\":null,\\"event_url\\":\\"http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null-test\\"}", - "headers": Object { - "version": "v=1.0.0", + ]) + + // Generall check that the message seemed to get processed + expect(decodeAllKafkaMessages()).toMatchObject([ + { + key: expect.any(String), + topic: 'cdp_function_callbacks_test', + value: { + state: expect.any(String), + }, + waitForAck: true, }, - "method": "POST", - "timeout": 10000, - }, - ] - `) + ]) + }) }) - it('generates logs and metrics and produces them to kafka', async () => { - const hogFunction = await insertHogFunction({ - ...HOG_EXAMPLES.simple_fetch, - ...HOG_INPUTS_EXAMPLES.simple_fetch, - ...HOG_FILTERS_EXAMPLES.no_filters, + describe('no delayed execution', () => { + beforeEach(() => { + hub.CDP_EVENT_PROCESSOR_EXECUTE_FIRST_STEP = true }) - // Create a message that should be processed by this function - // Run the function and check that it was executed - await processor.handleEachBatch( - [ + it('should invoke the initial function before enqueuing', async () => { + await insertHogFunction({ + ...HOG_EXAMPLES.simple_fetch, + ...HOG_INPUTS_EXAMPLES.simple_fetch, + ...HOG_FILTERS_EXAMPLES.no_filters, + }) + // Create a message that should be processed by this function + // Run the function and check that it was executed + await processor._handleKafkaBatch([ createMessage( createIncomingEvent(team.id, { uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0', @@ -167,77 +318,37 @@ describe('CDP Processed Events Consuner', () => { }), }) ), - ], - noop - ) - - expect(mockFetch).toHaveBeenCalledTimes(1) - // Once for the async callback, twice for the logs, once for metrics - expect(mockProducer.produce).toHaveBeenCalledTimes(4) - - expect(decodeKafkaMessage(mockProducer.produce.mock.calls[0][0])).toEqual({ - key: expect.any(String), - topic: 'clickhouse_app_metrics2_test', - value: { - app_source: 'hog_function', - team_id: 2, - app_source_id: hogFunction.id, - metric_kind: 'success', - metric_name: 'succeeded', - count: 1, - timestamp: expect.any(String), - }, - waitForAck: true, - }) - - expect(decodeKafkaMessage(mockProducer.produce.mock.calls[1][0])).toEqual({ - key: expect.any(String), - topic: 'log_entries_test', - value: { - instance_id: expect.any(String), - level: 'debug', - log_source: 'hog_function', - log_source_id: expect.any(String), - message: 'Executing function', - team_id: 2, - timestamp: expect.any(String), - }, - - waitForAck: true, - }) + ]) - expect(decodeKafkaMessage(mockProducer.produce.mock.calls[2][0])).toMatchObject({ - topic: 'log_entries_test', - value: { - log_source: 'hog_function', - message: "Suspending function due to async function call 'fetch'. Payload: 1855 bytes", - team_id: 2, - }, - }) - - const msg = decodeKafkaMessage(mockProducer.produce.mock.calls[3][0]) - - expect(msg).toEqual({ - key: expect.any(String), - topic: 'cdp_function_callbacks_test', - value: { - state: expect.any(String), - hogFunctionId: hogFunction.id, - teamId: 2, - asyncFunctionResponse: { - response: { - status: 200, - body: { success: true }, + // General check that the message seemed to get processed + expect(decodeAllKafkaMessages()).toMatchObject([ + { + key: expect.any(String), + topic: 'log_entries_test', + value: { + message: 'Executing function', + }, + waitForAck: true, + }, + { + key: expect.any(String), + topic: 'log_entries_test', + value: { + message: expect.stringContaining( + "Suspending function due to async function call 'fetch'. Payload" + ), + }, + waitForAck: true, + }, + { + key: expect.any(String), + topic: 'cdp_function_callbacks_test', + value: { + state: expect.any(String), }, - timings: [ - { - kind: 'async_function', - duration_ms: expect.any(Number), - }, - ], + waitForAck: true, }, - }, - waitForAck: true, + ]) }) }) }) diff --git a/plugin-server/tests/cdp/fixtures.ts b/plugin-server/tests/cdp/fixtures.ts index 8f3fed7da62dc..52b8c20cf4b3d 100644 --- a/plugin-server/tests/cdp/fixtures.ts +++ b/plugin-server/tests/cdp/fixtures.ts @@ -1,22 +1,25 @@ import { randomUUID } from 'crypto' import { Message } from 'node-rdkafka' -import { HogFunctionInvocationGlobals, HogFunctionType, IntegrationType } from '../../src/cdp/types' +import { + HogFunctionInvocation, + HogFunctionInvocationGlobals, + HogFunctionType, + IntegrationType, +} from '../../src/cdp/types' import { ClickHouseTimestamp, RawClickHouseEvent, Team } from '../../src/types' import { PostgresRouter } from '../../src/utils/db/postgres' +import { UUIDT } from '../../src/utils/utils' import { insertRow } from '../helpers/sql' export const createHogFunction = (hogFunction: Partial) => { const item: HogFunctionType = { id: randomUUID(), + name: 'Hog Function', team_id: 1, - created_at: new Date().toISOString(), - updated_at: new Date().toISOString(), - created_by_id: 1001, enabled: true, - deleted: false, - description: '', hog: '', + bytecode: [], ...hogFunction, } @@ -68,14 +71,19 @@ export const insertHogFunction = async ( team_id: Team['id'], hogFunction: Partial = {} ): Promise => { - const res = await insertRow( - postgres, - 'posthog_hogfunction', - createHogFunction({ + // This is only used for testing so we need to override some values + + const res = await insertRow(postgres, 'posthog_hogfunction', { + ...createHogFunction({ ...hogFunction, team_id: team_id, - }) - ) + }), + description: '', + created_at: new Date().toISOString(), + updated_at: new Date().toISOString(), + created_by_id: 1001, + deleted: false, + }) return res } @@ -99,13 +107,14 @@ export const createHogExecutionGlobals = ( data: Partial = {} ): HogFunctionInvocationGlobals => { return { + groups: {}, ...data, person: { uuid: 'uuid', name: 'test', url: 'http://localhost:8000/persons/1', properties: { - $lib_version: '1.2.3', + email: 'test@posthog.com', }, ...(data.person ?? {}), }, @@ -128,3 +137,28 @@ export const createHogExecutionGlobals = ( }, } } + +export const createInvocation = ( + _hogFunction: Partial = {}, + _globals: Partial = {} +): HogFunctionInvocation => { + const hogFunction = createHogFunction(_hogFunction) + // Add the source of the trigger to the globals + let globals = createHogExecutionGlobals(_globals) + globals = { + ...globals, + source: { + name: hogFunction.name ?? `Hog function: ${hogFunction.id}`, + url: `${globals.project.url}/pipeline/destinations/hog-${hogFunction.id}/configuration/`, + }, + } + + return { + id: new UUIDT().toString(), + globals, + teamId: hogFunction.team_id, + hogFunction, + queue: 'hog', + timings: [], + } +} diff --git a/plugin-server/tests/cdp/groups-manager.test.ts b/plugin-server/tests/cdp/groups-manager.test.ts index eb3d718211ce1..f489d6b019045 100644 --- a/plugin-server/tests/cdp/groups-manager.test.ts +++ b/plugin-server/tests/cdp/groups-manager.test.ts @@ -50,6 +50,7 @@ describe('Groups Manager', () => { it('does nothing if no group properties found', async () => { const globals = createHogExecutionGlobals({ + groups: undefined, event: { properties: { $groups: { GroupA: 'id-1', GroupB: 'id-2' }, @@ -84,6 +85,7 @@ describe('Groups Manager', () => { { team_id: 1, group_type_index: 1, group_key: 'id-2', group_properties: { prop: 'value-2' } }, ] const globals = createHogExecutionGlobals({ + groups: undefined, event: { properties: { $groups: { GroupA: 'id-1', GroupB: 'id-2' }, @@ -125,14 +127,17 @@ describe('Groups Manager', () => { const items = [ // Should get both groups enriched createHogExecutionGlobals({ + groups: undefined, event: { properties: { $groups: { GroupA: 'id-1', GroupB: 'id-2' } } } as any, }), // Should get its group enriched (via reference) createHogExecutionGlobals({ + groups: undefined, event: { properties: { $groups: { GroupA: 'id-1' } } } as any, }), // Should get the right group for its team createHogExecutionGlobals({ + groups: undefined, project: { id: 2 } as any, event: { properties: { $groups: { GroupA: 'id-1' } } } as any, }), @@ -191,10 +196,12 @@ describe('Groups Manager', () => { it('cached group type queries', async () => { const globals = [ createHogExecutionGlobals({ + groups: undefined, project: { id: 1 } as any, event: { properties: { $groups: { GroupA: 'id-1', GroupB: 'id-2' } } } as any, }), createHogExecutionGlobals({ + groups: undefined, project: { id: 2 } as any, event: { properties: { $groups: { GroupA: 'id-1', GroupB: 'id-2' } } } as any, }), @@ -209,6 +216,7 @@ describe('Groups Manager', () => { globals.push( createHogExecutionGlobals({ + groups: undefined, project: { id: 3 } as any, event: { properties: { $groups: { GroupA: 'id-1', GroupB: 'id-2' } } } as any, }) diff --git a/plugin-server/tests/cdp/hog-executor.test.ts b/plugin-server/tests/cdp/hog-executor.test.ts index c7e48ed81a5ae..dc6350e0bb3d2 100644 --- a/plugin-server/tests/cdp/hog-executor.test.ts +++ b/plugin-server/tests/cdp/hog-executor.test.ts @@ -2,14 +2,14 @@ import { DateTime } from 'luxon' import { HogExecutor } from '../../src/cdp/hog-executor' import { HogFunctionManager } from '../../src/cdp/hog-function-manager' -import { - HogFunctionAsyncFunctionResponse, - HogFunctionInvocationResult, - HogFunctionType, - LogEntry, -} from '../../src/cdp/types' +import { HogFunctionAsyncFunctionResponse, HogFunctionType } from '../../src/cdp/types' import { HOG_EXAMPLES, HOG_FILTERS_EXAMPLES, HOG_INPUTS_EXAMPLES } from './examples' -import { createHogExecutionGlobals, createHogFunction, insertHogFunction as _insertHogFunction } from './fixtures' +import { + createHogExecutionGlobals, + createHogFunction, + createInvocation, + insertHogFunction as _insertHogFunction, +} from './fixtures' const createAsyncFunctionResponse = (response?: Record): HogFunctionAsyncFunctionResponse => { return { @@ -57,28 +57,35 @@ describe('Hog Executor', () => { mockFunctionManager.getTeamHogFunction.mockReturnValue(hogFunction) }) - it('can execute messages', () => { - const globals = createHogExecutionGlobals({ groups: {} }) - const results = executor - .findMatchingFunctions(createHogExecutionGlobals({ groups: {} })) - .matchingFunctions.map((x) => executor.executeFunction(globals, x) as HogFunctionInvocationResult) - expect(results).toHaveLength(1) - expect(results[0]).toMatchObject({ + it('can execute an invocation', () => { + const invocation = createInvocation(hogFunction) + const result = executor.execute(invocation) + expect(result).toEqual({ + capturedPostHogEvents: [], invocation: { id: expect.any(String), - hogFunctionId: hogFunction.id, + teamId: 1, + globals: invocation.globals, + hogFunction: invocation.hogFunction, + queue: 'fetch', + queueParameters: expect.any(Object), + timings: [ + { + kind: 'hog', + duration_ms: 0, + }, + ], + vmState: expect.any(Object), }, finished: false, - asyncFunctionRequest: {}, + logs: expect.any(Array), }) }) it('collects logs from the function', () => { - const globals = createHogExecutionGlobals({ groups: {} }) - const results = executor - .findMatchingFunctions(createHogExecutionGlobals({ groups: {} })) - .matchingFunctions.map((x) => executor.executeFunction(globals, x) as HogFunctionInvocationResult) - expect(results[0].logs).toMatchObject([ + const invocation = createInvocation(hogFunction) + const result = executor.execute(invocation) + expect(result.logs).toMatchObject([ { timestamp: expect.any(DateTime), level: 'debug', @@ -87,7 +94,7 @@ describe('Hog Executor', () => { { timestamp: expect.any(DateTime), level: 'debug', - message: "Suspending function due to async function call 'fetch'. Payload: 1814 bytes", + message: "Suspending function due to async function call 'fetch'. Payload: 1818 bytes", }, ]) }) @@ -97,13 +104,9 @@ describe('Hog Executor', () => { ...HOG_EXAMPLES.input_printer, ...HOG_INPUTS_EXAMPLES.secret_inputs, }) + const invocation = createInvocation(fn) + const result = executor.execute(invocation) - mockFunctionManager.getTeamHogFunctions.mockReturnValue([fn]) - - const result = executor.executeFunction( - createHogExecutionGlobals({ groups: {} }), - fn - ) as HogFunctionInvocationResult expect(result.logs.map((x) => x.message)).toMatchInlineSnapshot(` Array [ "Executing function", @@ -118,120 +121,87 @@ describe('Hog Executor', () => { }) it('queues up an async function call', () => { - const globals = createHogExecutionGlobals({ groups: {} }) - const results = executor - .findMatchingFunctions(createHogExecutionGlobals({ groups: {} })) - .matchingFunctions.map((x) => executor.executeFunction(globals, x) as HogFunctionInvocationResult) - expect(results[0]).toMatchObject({ - invocation: { - id: results[0].invocation.id, - teamId: 1, - hogFunctionId: hogFunction.id, - vmState: expect.any(Object), - globals: { - project: { id: 1, name: 'test', url: 'http://localhost:8000/projects/1' }, - event: { - uuid: 'uuid', - name: 'test', - distinct_id: 'distinct_id', - url: 'http://localhost:8000/events/1', - properties: { $lib_version: '1.2.3' }, - timestamp: '2024-06-07T12:00:00.000Z', - }, - source: { - name: 'Test hog function', - url: `http://localhost:8000/projects/1/pipeline/destinations/hog-${hogFunction.id}/configuration/`, - }, - }, - timings: [ - { - kind: 'hog', - duration_ms: 0, - }, - ], + const invocation = createInvocation(hogFunction) + const result = executor.execute(invocation) + + expect(result.invocation).toMatchObject({ + queue: 'fetch', + queueParameters: { + url: 'https://example.com/posthog-webhook', + method: 'POST', + headers: { version: 'v=1.2.3' }, }, + }) - asyncFunctionRequest: { - name: 'fetch', - args: [ - 'https://example.com/posthog-webhook', - { - headers: { version: 'v=1.2.3' }, - body: { - event: { - uuid: 'uuid', - name: 'test', - distinct_id: 'distinct_id', - url: 'http://localhost:8000/events/1', - properties: { $lib_version: '1.2.3' }, - timestamp: '2024-06-07T12:00:00.000Z', - }, - groups: {}, - nested: { foo: 'http://localhost:8000/events/1' }, - person: { - uuid: 'uuid', - name: 'test', - url: 'http://localhost:8000/persons/1', - properties: { $lib_version: '1.2.3' }, - }, - event_url: 'http://localhost:8000/events/1-test', - }, - method: 'POST', - }, - ], + expect(JSON.parse(result.invocation.queueParameters!.body)).toEqual({ + event: { + uuid: 'uuid', + name: 'test', + distinct_id: 'distinct_id', + url: 'http://localhost:8000/events/1', + properties: { $lib_version: '1.2.3' }, + timestamp: '2024-06-07T12:00:00.000Z', + }, + groups: {}, + nested: { foo: 'http://localhost:8000/events/1' }, + person: { + uuid: 'uuid', + name: 'test', + url: 'http://localhost:8000/persons/1', + properties: { email: 'test@posthog.com' }, }, + event_url: 'http://localhost:8000/events/1-test', }) }) it('executes the full function in a loop', () => { - const logs: LogEntry[] = [] - const globals = createHogExecutionGlobals({ groups: {} }) - const results = executor - .findMatchingFunctions(createHogExecutionGlobals({ groups: {} })) - .matchingFunctions.map((x) => executor.executeFunction(globals, x) as HogFunctionInvocationResult) - const splicedLogs = results[0].logs.splice(0, 100) - logs.push(...splicedLogs) - - const asyncExecResult = executor.executeAsyncResponse(results[0].invocation, createAsyncFunctionResponse()) - - logs.push(...asyncExecResult.logs) - expect(asyncExecResult.error).toBeUndefined() - expect(asyncExecResult.finished).toBe(true) - expect(logs.map((log) => log.message)).toEqual([ - 'Executing function', - "Suspending function due to async function call 'fetch'. Payload: 1814 bytes", - 'Resuming function', - 'Fetch response:, {"status":200,"body":"success"}', - 'Function completed in 100ms. Sync: 0ms. Mem: 746 bytes. Ops: 22.', - ]) + const result = executor.execute(createInvocation(hogFunction)) + const logs = result.logs.splice(0, 100) + + expect(result.finished).toBe(false) + expect(result.invocation.queue).toBe('fetch') + expect(result.invocation.vmState).toBeDefined() + + // Simulate what the callback does + result.invocation.queue = 'hog' + result.invocation.queueParameters = createAsyncFunctionResponse() + + const secondResult = executor.execute(result.invocation) + logs.push(...secondResult.logs) + + expect(secondResult.finished).toBe(true) + expect(secondResult.error).toBeUndefined() + expect(logs.map((log) => log.message)).toMatchInlineSnapshot(` + Array [ + "Executing function", + "Suspending function due to async function call 'fetch'. Payload: 1818 bytes", + "Resuming function", + "Fetch response:, {\\"status\\":200,\\"body\\":\\"success\\"}", + "Function completed in 100ms. Sync: 0ms. Mem: 750 bytes. Ops: 22.", + ] + `) }) it('parses the responses body if a string', () => { - const logs: LogEntry[] = [] - const globals = createHogExecutionGlobals({ groups: {} }) - const results = executor - .findMatchingFunctions(createHogExecutionGlobals({ groups: {} })) - .matchingFunctions.map((x) => executor.executeFunction(globals, x) as HogFunctionInvocationResult) - const splicedLogs = results[0].logs.splice(0, 100) - logs.push(...splicedLogs) - - const asyncExecResult = executor.executeAsyncResponse( - results[0].invocation, - createAsyncFunctionResponse({ - body: JSON.stringify({ foo: 'bar' }), - }) - ) + const result = executor.execute(createInvocation(hogFunction)) + const logs = result.logs.splice(0, 100) + result.invocation.queue = 'hog' + result.invocation.queueParameters = createAsyncFunctionResponse({ + body: JSON.stringify({ foo: 'bar' }), + }) - logs.push(...asyncExecResult.logs) - expect(asyncExecResult.error).toBeUndefined() - expect(asyncExecResult.finished).toBe(true) - expect(logs.map((log) => log.message)).toEqual([ - 'Executing function', - "Suspending function due to async function call 'fetch'. Payload: 1814 bytes", - 'Resuming function', - 'Fetch response:, {"status":200,"body":{"foo":"bar"}}', // The body is parsed - 'Function completed in 100ms. Sync: 0ms. Mem: 746 bytes. Ops: 22.', - ]) + const secondResult = executor.execute(result.invocation) + logs.push(...secondResult.logs) + + expect(logs.map((log) => log.message)).toMatchInlineSnapshot(` + Array [ + "Executing function", + "Suspending function due to async function call 'fetch'. Payload: 1818 bytes", + "Resuming function", + "Fetch response:, {\\"status\\":200,\\"body\\":{\\"foo\\":\\"bar\\"}}", + "Function completed in 100ms. Sync: 0ms. Mem: 750 bytes. Ops: 22.", + ] + `) }) }) @@ -265,7 +235,7 @@ describe('Hog Executor', () => { }) }) - describe('async function responses', () => { + describe('async functions', () => { it('prevents large looped fetch calls', () => { const fn = createHogFunction({ ...HOG_EXAMPLES.recursive_fetch, @@ -273,27 +243,26 @@ describe('Hog Executor', () => { ...HOG_FILTERS_EXAMPLES.no_filters, }) - mockFunctionManager.getTeamHogFunctions.mockReturnValue([fn]) - // Simulate the recusive loop - const globals = createHogExecutionGlobals({ groups: {} }) - const results = executor - .findMatchingFunctions(createHogExecutionGlobals({ groups: {} })) - .matchingFunctions.map((x) => executor.executeFunction(globals, x) as HogFunctionInvocationResult) - expect(results).toHaveLength(1) - - // Run the result one time simulating a successful fetch - const asyncResult1 = executor.executeAsyncResponse(results[0].invocation, createAsyncFunctionResponse()) - expect(asyncResult1.finished).toBe(false) - expect(asyncResult1.error).toBe(undefined) - expect(asyncResult1.asyncFunctionRequest).toBeDefined() - - // Run the result one more time simulating a second successful fetch - const asyncResult2 = executor.executeAsyncResponse(asyncResult1.invocation, createAsyncFunctionResponse()) + const invocation = createInvocation(fn) + + // Start the function + const result1 = executor.execute(invocation) + // Run the response one time simulating a successful fetch + result1.invocation.queue = 'hog' + result1.invocation.queueParameters = createAsyncFunctionResponse() + const result2 = executor.execute(result1.invocation) + expect(result2.finished).toBe(false) + expect(result2.error).toBe(undefined) + expect(result2.invocation.queue).toBe('fetch') + // This time we should see an error for hitting the loop limit - expect(asyncResult2.finished).toBe(false) - expect(asyncResult2.error).toEqual('Exceeded maximum number of async steps: 2') - expect(asyncResult2.logs.map((log) => log.message)).toEqual([ + result2.invocation.queue = 'hog' + result2.invocation.queueParameters = createAsyncFunctionResponse() + const result3 = executor.execute(result1.invocation) + expect(result3.finished).toBe(false) + expect(result3.error).toEqual('Exceeded maximum number of async steps: 2') + expect(result3.logs.map((log) => log.message)).toEqual([ 'Resuming function', 'Error executing function: HogVMException: Exceeded maximum number of async steps: 2', ]) @@ -314,14 +283,10 @@ describe('Hog Executor', () => { mockFunctionManager.getTeamHogFunctions.mockReturnValue([fn]) - const globals = createHogExecutionGlobals({ groups: {} }) - const results = executor - .findMatchingFunctions(createHogExecutionGlobals({ groups: {} })) - .matchingFunctions.map((x) => executor.executeFunction(globals, x) as HogFunctionInvocationResult) - expect(results).toHaveLength(1) - expect(results[0].error).toContain('Execution timed out after 0.1 seconds. Performed ') + const result = executor.execute(createInvocation(fn)) + expect(result.error).toContain('Execution timed out after 0.1 seconds. Performed ') - expect(results[0].logs.map((log) => log.message)).toEqual([ + expect(result.logs.map((log) => log.message)).toEqual([ 'Executing function', 'I AM FIBONACCI', 'I AM FIBONACCI', @@ -348,8 +313,7 @@ describe('Hog Executor', () => { ...HOG_FILTERS_EXAMPLES.no_filters, }) - const globals = createHogExecutionGlobals({ groups: {} }) - const result = executor.executeFunction(globals, fn) + const result = executor.execute(createInvocation(fn)) expect(result?.capturedPostHogEvents).toEqual([ { distinct_id: 'distinct_id', @@ -378,7 +342,7 @@ describe('Hog Executor', () => { }, }, } as any) - const result = executor.executeFunction(globals, fn) + const result = executor.execute(createInvocation(fn, globals)) expect(result?.capturedPostHogEvents).toEqual([]) expect(result?.logs[1].message).toMatchInlineSnapshot( `"postHogCapture was called from an event that already executed this function. To prevent infinite loops, the event was not captured."` diff --git a/plugin-server/tests/cdp/hog-function-manager.test.ts b/plugin-server/tests/cdp/hog-function-manager.test.ts index 2ed100711a74d..1624999c93058 100644 --- a/plugin-server/tests/cdp/hog-function-manager.test.ts +++ b/plugin-server/tests/cdp/hog-function-manager.test.ts @@ -93,7 +93,7 @@ describe('HogFunctionManager', () => { team_id: teamId1, name: 'Test Hog Function team 1', enabled: true, - bytecode: null, + bytecode: {}, filters: null, inputs_schema: [ { diff --git a/plugin-server/tests/cdp/hog-masker.test.ts b/plugin-server/tests/cdp/hog-masker.test.ts index 7e667e64b9141..df05043290e63 100644 --- a/plugin-server/tests/cdp/hog-masker.test.ts +++ b/plugin-server/tests/cdp/hog-masker.test.ts @@ -10,7 +10,7 @@ import { Hub } from '../../src/types' import { closeHub, createHub } from '../../src/utils/db/hub' import { delay } from '../../src/utils/utils' import { HOG_MASK_EXAMPLES } from './examples' -import { createHogExecutionGlobals, createHogFunction } from './fixtures' +import { createHogExecutionGlobals, createHogFunction, createInvocation } from './fixtures' import { deleteKeysWithPrefix } from './helpers/redis' const mockNow: jest.Mock = require('../../src/utils/now').now as any @@ -50,7 +50,7 @@ describe('HogMasker', () => { it('should return all functions without masks', async () => { const normalFunction = createHogFunction({}) - const invocations = [{ globals: createHogExecutionGlobals(), hogFunction: normalFunction }] + const invocations = [createInvocation(normalFunction)] const res = await masker.filterByMasking(invocations) expect(res.notMasked).toHaveLength(1) @@ -61,21 +61,27 @@ describe('HogMasker', () => { const functionWithAllMasking = createHogFunction({ ...HOG_MASK_EXAMPLES.all, }) - const globals1 = createHogExecutionGlobals({ event: { uuid: '1' } as any }) - const globals2 = createHogExecutionGlobals({ event: { uuid: '2' } as any }) - const globals3 = createHogExecutionGlobals({ event: { uuid: '3' } as any }) - const invocations = [ - { globals: globals1, hogFunction: functionWithAllMasking }, - { globals: globals2, hogFunction: functionWithAllMasking }, - { globals: globals3, hogFunction: functionWithAllMasking }, - ] + + const invocation1 = createInvocation( + functionWithAllMasking, + createHogExecutionGlobals({ event: { uuid: '1' } as any }) + ) + const invocation2 = createInvocation( + functionWithAllMasking, + createHogExecutionGlobals({ event: { uuid: '2' } as any }) + ) + const invocation3 = createInvocation( + functionWithAllMasking, + createHogExecutionGlobals({ event: { uuid: '3' } as any }) + ) + const invocations = [invocation1, invocation2, invocation3] const res = await masker.filterByMasking(invocations) expect(res.notMasked).toHaveLength(1) expect(res.masked).toHaveLength(2) - expect(res.notMasked[0].globals).toEqual(globals1) - expect(res.masked[0].globals).toEqual(globals2) - expect(res.masked[1].globals).toEqual(globals3) + expect(res.notMasked[0].globals).toEqual(invocation1.globals) + expect(res.masked[0].globals).toEqual(invocation2.globals) + expect(res.masked[1].globals).toEqual(invocation3.globals) const res2 = await masker.filterByMasking(invocations) expect(res2.notMasked).toHaveLength(0) @@ -92,9 +98,9 @@ describe('HogMasker', () => { const functionWithNoMasking = createHogFunction({}) const globals = createHogExecutionGlobals() const invocations = [ - { globals, hogFunction: functionWithAllMasking }, - { globals, hogFunction: functionWithAllMasking2 }, - { globals, hogFunction: functionWithNoMasking }, + createInvocation(functionWithAllMasking, globals), + createInvocation(functionWithAllMasking2, globals), + createInvocation(functionWithNoMasking, globals), ] const res = await masker.filterByMasking(invocations) @@ -129,7 +135,7 @@ describe('HogMasker', () => { }) }) it('should re-allow after the ttl expires', async () => { - const invocations = [{ globals: createHogExecutionGlobals(), hogFunction: hogFunctionAll }] + const invocations = [createInvocation(hogFunctionAll)] expect((await masker.filterByMasking(invocations)).notMasked).toHaveLength(1) expect((await masker.filterByMasking(invocations)).notMasked).toHaveLength(0) expect((await masker.filterByMasking(invocations)).notMasked).toHaveLength(0) @@ -143,10 +149,10 @@ describe('HogMasker', () => { const globalsPerson2 = createHogExecutionGlobals({ person: { uuid: '2' } as any }) const invocations = [ - { globals: globalsPerson1, hogFunction: hogFunctionPerson }, - { globals: globalsPerson1, hogFunction: hogFunctionAll }, - { globals: globalsPerson2, hogFunction: hogFunctionPerson }, - { globals: globalsPerson2, hogFunction: hogFunctionAll }, + createInvocation(hogFunctionPerson, globalsPerson1), + createInvocation(hogFunctionAll, globalsPerson1), + createInvocation(hogFunctionPerson, globalsPerson2), + createInvocation(hogFunctionAll, globalsPerson2), ] const res = await masker.filterByMasking(invocations) expect(res.masked.length).toEqual(1) @@ -159,24 +165,24 @@ describe('HogMasker', () => { it('should mask until threshold passed', async () => { hogFunctionAll.masking!.threshold = 5 - const invocations = [{ globals: createHogExecutionGlobals(), hogFunction: hogFunctionAll }] + const invocation = createInvocation(hogFunctionAll) // First one goes through - expect((await masker.filterByMasking(invocations)).notMasked).toHaveLength(1) + expect((await masker.filterByMasking([invocation])).notMasked).toHaveLength(1) // Next 4 should be masked - expect((await masker.filterByMasking(invocations)).notMasked).toHaveLength(0) - expect((await masker.filterByMasking(invocations)).notMasked).toHaveLength(0) - expect((await masker.filterByMasking(invocations)).notMasked).toHaveLength(0) - expect((await masker.filterByMasking(invocations)).notMasked).toHaveLength(0) + expect((await masker.filterByMasking([invocation])).notMasked).toHaveLength(0) + expect((await masker.filterByMasking([invocation])).notMasked).toHaveLength(0) + expect((await masker.filterByMasking([invocation])).notMasked).toHaveLength(0) + expect((await masker.filterByMasking([invocation])).notMasked).toHaveLength(0) // Now we have hit the threshold so it should not be masked - expect((await masker.filterByMasking(invocations)).notMasked).toHaveLength(1) + expect((await masker.filterByMasking([invocation])).notMasked).toHaveLength(1) // Next 4 should be masked - expect((await masker.filterByMasking(invocations)).notMasked).toHaveLength(0) - expect((await masker.filterByMasking(invocations)).notMasked).toHaveLength(0) - expect((await masker.filterByMasking(invocations)).notMasked).toHaveLength(0) - expect((await masker.filterByMasking(invocations)).notMasked).toHaveLength(0) + expect((await masker.filterByMasking([invocation])).notMasked).toHaveLength(0) + expect((await masker.filterByMasking([invocation])).notMasked).toHaveLength(0) + expect((await masker.filterByMasking([invocation])).notMasked).toHaveLength(0) + expect((await masker.filterByMasking([invocation])).notMasked).toHaveLength(0) // Again the Nth one shouldn't be masked - expect((await masker.filterByMasking(invocations)).notMasked).toHaveLength(1) + expect((await masker.filterByMasking([invocation])).notMasked).toHaveLength(1) }) it('should mask threshold based in a batch', async () => { @@ -185,21 +191,11 @@ describe('HogMasker', () => { // If we have 10 invocations in a batch then we should have 2 invocations that are not masked expect( - ( - await masker.filterByMasking( - Array(10).fill({ globals: createHogExecutionGlobals(), hogFunction: hogFunctionAll }) - ) - ).notMasked + (await masker.filterByMasking(Array(10).fill(createInvocation(hogFunctionAll)))).notMasked ).toHaveLength(2) // Next one should cross the threshold - expect( - ( - await masker.filterByMasking([ - { globals: createHogExecutionGlobals(), hogFunction: hogFunctionAll }, - ]) - ).notMasked - ).toHaveLength(1) + expect((await masker.filterByMasking([createInvocation(hogFunctionAll)])).notMasked).toHaveLength(1) }) }) }) diff --git a/plugin-server/tests/cdp/hog-watcher.test.ts b/plugin-server/tests/cdp/hog-watcher.test.ts index dfcf4128c8187..4cbc48cec65b8 100644 --- a/plugin-server/tests/cdp/hog-watcher.test.ts +++ b/plugin-server/tests/cdp/hog-watcher.test.ts @@ -9,6 +9,7 @@ import { HogFunctionInvocationResult } from '../../src/cdp/types' import { Hub } from '../../src/types' import { closeHub, createHub } from '../../src/utils/db/hub' import { delay } from '../../src/utils/utils' +import { createInvocation } from './fixtures' import { deleteKeysWithPrefix } from './helpers/redis' const mockNow: jest.Mock = require('../../src/utils/now').now as any @@ -21,10 +22,9 @@ const createResult = (options: { }): HogFunctionInvocationResult => { return { invocation: { + ...createInvocation({ id: options.id }), id: 'invocation-id', teamId: 2, - hogFunctionId: options.id, - globals: {} as any, timings: [ { kind: 'async_function', diff --git a/plugin-server/tests/cdp/utils.test.ts b/plugin-server/tests/cdp/utils.test.ts index 9e1cf795e36b5..6640662b2e79e 100644 --- a/plugin-server/tests/cdp/utils.test.ts +++ b/plugin-server/tests/cdp/utils.test.ts @@ -2,7 +2,7 @@ import { DateTime } from 'luxon' import { HogFunctionInvocationResult } from '../../src/cdp/types' import { gzipObject, prepareLogEntriesForClickhouse, unGzipObject } from '../../src/cdp/utils' -import { insertHogFunction as _insertHogFunction } from './fixtures' +import { createHogFunction, createInvocation, insertHogFunction as _insertHogFunction } from './fixtures' describe('Utils', () => { describe('gzip compressions', () => { @@ -19,12 +19,8 @@ describe('Utils', () => { const startTime = DateTime.fromMillis(1620000000000) const example: HogFunctionInvocationResult = { invocation: { + ...createInvocation(createHogFunction({ id: 'hog-1' })), id: 'inv-1', - globals: {} as any, - teamId: 1, - hogFunctionId: 'hog-1', - vmState: undefined, - timings: [], }, finished: false, logs: [ diff --git a/plugin-server/tests/server.test.ts b/plugin-server/tests/server.test.ts index 009416547b36d..3b8f7d58dda29 100644 --- a/plugin-server/tests/server.test.ts +++ b/plugin-server/tests/server.test.ts @@ -96,7 +96,6 @@ describe('server', () => { processAsyncWebhooksHandlers: true, cdpProcessedEvents: true, cdpFunctionCallbacks: true, - cdpFunctionOverflow: true, cdpCyclotronWorker: true, syncInlinePlugins: true, }