From 8f4df4598470494a00540c479e6c2f798c71aea7 Mon Sep 17 00:00:00 2001 From: Brett Hoerner Date: Tue, 16 Jul 2024 11:12:48 -0600 Subject: [PATCH] feat: make rusty-hook Hog-aware, sending responses back via kafka (#23619) Co-authored-by: Ben White --- .vscode/launch.json | 4 +- bin/start | 1 + .../src/cdp/async-function-executor.ts | 15 +- plugin-server/src/config/config.ts | 2 + plugin-server/src/types.ts | 2 + plugin-server/src/worker/rusty-hook.ts | 52 +- .../cdp/cdp-processed-events-consumer.test.ts | 9 +- rust/Cargo.lock | 743 +++++++++++++++++- rust/Cargo.toml | 5 +- rust/bin/start-hoghooks | 25 + rust/hook-api/src/config.rs | 5 +- rust/hook-api/src/handlers/app.rs | 28 +- rust/hook-api/src/handlers/webhook.rs | 258 +++++- rust/hook-api/src/main.rs | 7 +- rust/hook-common/Cargo.toml | 3 + rust/hook-common/src/config.rs | 22 + .../src/kafka_producer.rs | 0 rust/hook-common/src/lib.rs | 3 + rust/hook-common/src/test.rs | 33 + rust/hook-janitor/src/config.rs | 30 +- rust/hook-janitor/src/main.rs | 6 +- rust/hook-janitor/src/webhooks.rs | 94 +-- rust/hook-worker/Cargo.toml | 5 + rust/hook-worker/src/config.rs | 13 +- rust/hook-worker/src/main.rs | 11 + rust/hook-worker/src/worker.rs | 423 ++++++++-- 26 files changed, 1622 insertions(+), 177 deletions(-) create mode 100755 rust/bin/start-hoghooks create mode 100644 rust/hook-common/src/config.rs rename rust/{hook-janitor => hook-common}/src/kafka_producer.rs (100%) create mode 100644 rust/hook-common/src/test.rs diff --git a/.vscode/launch.json b/.vscode/launch.json index 07424fab49cf2..3e1e1c075a362 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -105,7 +105,9 @@ "DATABASE_URL": "postgres://posthog:posthog@localhost:5432/posthog", "KAFKA_HOSTS": "localhost:9092", "WORKER_CONCURRENCY": "2", - "OBJECT_STORAGE_ENABLED": "True" + "OBJECT_STORAGE_ENABLED": "True", + "HOG_HOOK_URL": "http://localhost:3300/hoghook", + "CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS": "*" }, "presentation": { "group": "main" diff 
--git a/bin/start b/bin/start index 38f83accb3968..171656ed0e3ec 100755 --- a/bin/start +++ b/bin/start @@ -7,6 +7,7 @@ trap "trap - SIGTERM && kill -- -$$" SIGINT SIGTERM EXIT export DEBUG=${DEBUG:-1} export SKIP_SERVICE_VERSION_REQUIREMENTS=1 export BILLING_SERVICE_URL=${BILLING_SERVICE_URL:-https://billing.dev.posthog.dev} +export HOG_HOOK_URL=${HOG_HOOK_URL:-http://localhost:3300/hoghook} service_warning() { echo -e "\033[0;31m$1 isn't ready. You can run the stack with:\ndocker compose -f docker-compose.dev.yml up\nIf you have already ran that, just make sure that services are starting properly, and sit back.\nWaiting for $1 to start...\033[0m" diff --git a/plugin-server/src/cdp/async-function-executor.ts b/plugin-server/src/cdp/async-function-executor.ts index e90782bf4bda2..8c221cde25ff6 100644 --- a/plugin-server/src/cdp/async-function-executor.ts +++ b/plugin-server/src/cdp/async-function-executor.ts @@ -1,4 +1,5 @@ -import { PluginsServerConfig } from '../types' +import { buildIntegerMatcher } from '../config/config' +import { PluginsServerConfig, ValueMatcher } from '../types' import { trackedFetch } from '../utils/fetch' import { status } from '../utils/status' import { RustyHook } from '../worker/rusty-hook' @@ -9,7 +10,11 @@ export type AsyncFunctionExecutorOptions = { } export class AsyncFunctionExecutor { - constructor(private serverConfig: PluginsServerConfig, private rustyHook: RustyHook) {} + hogHookEnabledForTeams: ValueMatcher + + constructor(private serverConfig: PluginsServerConfig, private rustyHook: RustyHook) { + this.hogHookEnabledForTeams = buildIntegerMatcher(serverConfig.CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS, true) + } async execute( request: HogFunctionInvocationResult, @@ -74,8 +79,10 @@ export class AsyncFunctionExecutor { // Finally overwrite the args with the sanitized ones request.asyncFunctionRequest.args = [url, { method, headers, body }] - if (!options?.sync === false) { - // TODO: Add rusty hook support + // If the caller 
hasn't forced it to be synchronous and the team has the rustyhook enabled, enqueue it + if (!options?.sync && this.hogHookEnabledForTeams(request.teamId)) { + await this.rustyHook.enqueueForHog(request) + return } status.info('🦔', `[HogExecutor] Webhook not sent via rustyhook, sending directly instead`) diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index 17feeaeb1b2c9..f0a646e47d92e 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -133,6 +133,7 @@ export function getDefaultConfig(): PluginsServerConfig { RUSTY_HOOK_FOR_TEAMS: '', RUSTY_HOOK_ROLLOUT_PERCENTAGE: 0, RUSTY_HOOK_URL: '', + HOG_HOOK_URL: '', CAPTURE_CONFIG_REDIS_HOST: null, STARTUP_PROFILE_DURATION_SECONDS: 300, // 5 minutes @@ -180,6 +181,7 @@ export function getDefaultConfig(): PluginsServerConfig { CDP_WATCHER_MIN_OBSERVATIONS: 3, CDP_WATCHER_OVERFLOW_RATING_THRESHOLD: 0.8, CDP_WATCHER_DISABLED_RATING_THRESHOLD: 0.5, + CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS: '', } } diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 29672a19db6fc..b0281437d197e 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -102,6 +102,7 @@ export type CdpConfig = { CDP_WATCHER_MIN_OBSERVATIONS: number CDP_WATCHER_OVERFLOW_RATING_THRESHOLD: number CDP_WATCHER_DISABLED_RATING_THRESHOLD: number + CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS: string } export interface PluginsServerConfig extends CdpConfig { @@ -219,6 +220,7 @@ export interface PluginsServerConfig extends CdpConfig { RUSTY_HOOK_FOR_TEAMS: string RUSTY_HOOK_ROLLOUT_PERCENTAGE: number RUSTY_HOOK_URL: string + HOG_HOOK_URL: string SKIP_UPDATE_EVENT_AND_PROPERTIES_STEP: boolean PIPELINE_STEP_STALLED_LOG_TIMEOUT: number CAPTURE_CONFIG_REDIS_HOST: string | null // Redis cluster to use to coordinate with capture (overflow, routing) diff --git a/plugin-server/src/worker/rusty-hook.ts b/plugin-server/src/worker/rusty-hook.ts index 
a4d1c6c6b2d81..85a32964c10a6 100644 --- a/plugin-server/src/worker/rusty-hook.ts +++ b/plugin-server/src/worker/rusty-hook.ts @@ -2,6 +2,7 @@ import { Webhook } from '@posthog/plugin-scaffold' import * as Sentry from '@sentry/node' import fetch from 'node-fetch' +import { HogFunctionInvocationResult } from '../cdp/types' import { buildIntegerMatcher } from '../config/config' import { PluginsServerConfig, ValueMatcher } from '../types' import { isProdEnv } from '../utils/env-utils' @@ -29,7 +30,11 @@ export class RustyHook { constructor( private serverConfig: Pick< PluginsServerConfig, - 'RUSTY_HOOK_URL' | 'RUSTY_HOOK_FOR_TEAMS' | 'RUSTY_HOOK_ROLLOUT_PERCENTAGE' | 'EXTERNAL_REQUEST_TIMEOUT_MS' + | 'RUSTY_HOOK_URL' + | 'HOG_HOOK_URL' + | 'RUSTY_HOOK_FOR_TEAMS' + | 'RUSTY_HOOK_ROLLOUT_PERCENTAGE' + | 'EXTERNAL_REQUEST_TIMEOUT_MS' > ) { this.enabledForTeams = buildIntegerMatcher(serverConfig.RUSTY_HOOK_FOR_TEAMS, true) @@ -122,4 +127,49 @@ export class RustyHook { return true } + + public async enqueueForHog(payload: HogFunctionInvocationResult): Promise { + // This is a temporary copy of `enqueueIfEnabledForTeam` above for Hog fetches because the + // API differs. It will likely be replaced with a Kafka topic soon. + + const body = JSON.stringify(payload) + + // We attempt to enqueue into the rusty-hook service until we succeed. This is deliberatly + // designed to block up the consumer if rusty-hook is down or if we deploy code that + // sends malformed requests. The entire purpose of rusty-hook is to reliably deliver webhooks, + // so we'd rather leave items in the Kafka topic until we manage to get them into rusty-hook. + let attempt = 0 + while (true) { + try { + attempt += 1 + const response = await fetch(this.serverConfig.HOG_HOOK_URL, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body, + + // Sure, it's not an external request, but we should have a timeout and this is as + // good as any. 
+ timeout: this.serverConfig.EXTERNAL_REQUEST_TIMEOUT_MS, + }) + + if (response.ok) { + // Success, exit the loop. + break + } + + // Throw to unify error handling below. + throw new Error( + `rusty-hook for Hog returned ${response.status} ${response.statusText}: ${await response.text()}` + ) + } catch (error) { + status.error('🔴', 'Webhook enqueue to rusty-hook for Hog failed', { error, attempt }) + Sentry.captureException(error) + } + + const delayMs = Math.min(2 ** (attempt - 1) * RUSTY_HOOK_BASE_DELAY_MS, MAX_RUSTY_HOOK_DELAY_MS) + await sleep(delayMs) + } + + return true + } } diff --git a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts b/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts index 7acc90f65042b..0050446c026a9 100644 --- a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts +++ b/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts @@ -200,7 +200,10 @@ describe('CDP Processed Events Consuner', () => { }, }) - expect(decodeKafkaMessage(mockProducer.produce.mock.calls[2][0])).toEqual({ + const msg = decodeKafkaMessage(mockProducer.produce.mock.calls[2][0]) + // Parse body so it can match by object equality rather than exact string equality + msg.value.asyncFunctionRequest.args[1].body = JSON.parse(msg.value.asyncFunctionRequest.args[1].body) + expect(msg).toEqual({ key: expect.any(String), topic: 'cdp_function_callbacks_test', value: { @@ -225,7 +228,7 @@ describe('CDP Processed Events Consuner', () => { 'https://example.com/posthog-webhook', { headers: { version: 'v=1.0.0' }, - body: JSON.stringify({ + body: { event: { uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0', name: '$pageview', @@ -241,7 +244,7 @@ describe('CDP Processed Events Consuner', () => { person: null, event_url: 'http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null-test', - }), + }, method: 'POST', }, ], diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 804ab47416080..634f682cba98d 100644 --- a/rust/Cargo.lock 
+++ b/rust/Cargo.lock @@ -66,6 +66,15 @@ version = "1.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca" +[[package]] +name = "ascii-canvas" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8824ecca2e851cec16968d54a01dd372ef8f95b244fb84b84e70128be347c3c6" +dependencies = [ + "term", +] + [[package]] name = "assert-json-diff" version = "2.0.2" @@ -76,6 +85,198 @@ dependencies = [ "serde_json", ] +[[package]] +name = "async-attributes" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3203e79f4dd9bdda415ed03cf14dae5a2bf775c683a00f94e9cd1faf0f596e5" +dependencies = [ + "quote", + "syn 1.0.109", +] + +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener 2.5.3", + "futures-core", +] + +[[package]] +name = "async-channel" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-executor" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8828ec6e544c02b0d6691d21ed9f9218d0384a82542855073c2a3f58304aaf0" +dependencies = [ + "async-task", + "concurrent-queue", + "fastrand 2.0.1", + "futures-lite 2.3.0", + "slab", +] + +[[package]] +name = "async-global-executor" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" +dependencies = [ + "async-channel 2.3.1", + "async-executor", + 
"async-io 2.3.3", + "async-lock 3.4.0", + "blocking", + "futures-lite 2.3.0", + "once_cell", +] + +[[package]] +name = "async-io" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" +dependencies = [ + "async-lock 2.8.0", + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-lite 1.13.0", + "log", + "parking", + "polling 2.8.0", + "rustix 0.37.27", + "slab", + "socket2 0.4.10", + "waker-fn", +] + +[[package]] +name = "async-io" +version = "2.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d6baa8f0178795da0e71bc42c9e5d13261aac7ee549853162e66a241ba17964" +dependencies = [ + "async-lock 3.4.0", + "cfg-if", + "concurrent-queue", + "futures-io", + "futures-lite 2.3.0", + "parking", + "polling 3.7.2", + "rustix 0.38.31", + "slab", + "tracing", + "windows-sys 0.52.0", +] + +[[package]] +name = "async-lock" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" +dependencies = [ + "event-listener 2.5.3", +] + +[[package]] +name = "async-lock" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" +dependencies = [ + "event-listener 5.3.1", + "event-listener-strategy", + "pin-project-lite", +] + +[[package]] +name = "async-object-pool" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aeb901c30ebc2fc4ab46395bbfbdba9542c16559d853645d75190c3056caf3bc" +dependencies = [ + "async-std", +] + +[[package]] +name = "async-process" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea6438ba0a08d81529c69b36700fa2f95837bfe3e776ab39cde9c14d9149da88" +dependencies = [ + "async-io 1.13.0", + "async-lock 2.8.0", + "async-signal", 
+ "blocking", + "cfg-if", + "event-listener 3.1.0", + "futures-lite 1.13.0", + "rustix 0.38.31", + "windows-sys 0.48.0", +] + +[[package]] +name = "async-signal" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "794f185324c2f00e771cd9f1ae8b5ac68be2ca7abb129a87afd6e86d228bc54d" +dependencies = [ + "async-io 2.3.3", + "async-lock 3.4.0", + "atomic-waker", + "cfg-if", + "futures-core", + "futures-io", + "rustix 0.38.31", + "signal-hook-registry", + "slab", + "windows-sys 0.52.0", +] + +[[package]] +name = "async-std" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" +dependencies = [ + "async-attributes", + "async-channel 1.9.0", + "async-global-executor", + "async-io 1.13.0", + "async-lock 2.8.0", + "async-process", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite 1.13.0", + "gloo-timers", + "kv-log-macro", + "log", + "memchr", + "once_cell", + "pin-project-lite", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + [[package]] name = "async-stream" version = "0.3.5" @@ -98,6 +299,12 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "async-task" +version = "4.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" + [[package]] name = "async-trait" version = "0.1.77" @@ -124,6 +331,12 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba" +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "atomic-write-file" version = "0.1.2" @@ -314,6 +527,32 @@ version = "1.6.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "basic-cookies" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67bd8fd42c16bdb08688243dc5f0cc117a3ca9efeeaba3a345a18a6159ad96f7" +dependencies = [ + "lalrpop", + "lalrpop-util", + "regex", +] + +[[package]] +name = "bit-set" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0700ddab506f33b20a03b13996eccd309a48e5ff77d0d95926aa0210fb4e95f1" +dependencies = [ + "bit-vec", +] + +[[package]] +name = "bit-vec" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb" + [[package]] name = "bitflags" version = "1.3.2" @@ -338,6 +577,19 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "703f41c54fc768e63e091340b424302bb1c29ef4aa0c7f10fe849dfb114d29ea" +dependencies = [ + "async-channel 2.3.1", + "async-task", + "futures-io", + "futures-lite 2.3.0", + "piper", +] + [[package]] name = "bumpalo" version = "3.14.0" @@ -448,6 +700,15 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "const-oid" version = "0.9.6" @@ -542,6 +803,12 @@ version = "0.8.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" +[[package]] +name = "crunchy" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" + [[package]] name = "crypto-common" version = "0.1.6" @@ -598,6 +865,27 @@ dependencies = [ "subtle", ] +[[package]] +name = "dirs-next" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b98cf8ebf19c3d1b223e151f99a4f9f0690dca41414773390fc824184ac833e1" +dependencies = [ + "cfg-if", + "dirs-sys-next", +] + +[[package]] +name = "dirs-sys-next" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ebda144c4fe02d1f7ea1a7d9641b6fc6b580adcfa024ae48797ecdeb6825b4d" +dependencies = [ + "libc", + "redox_users", + "winapi", +] + [[package]] name = "dotenvy" version = "0.15.7" @@ -613,6 +901,15 @@ dependencies = [ "serde", ] +[[package]] +name = "ena" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d248bdd43ce613d87415282f69b9bb99d947d290b10962dd6c56233312c2ad5" +dependencies = [ + "log", +] + [[package]] name = "encoding_rs" version = "0.8.33" @@ -675,6 +972,38 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" +[[package]] +name = "event-listener" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d93877bcde0eb80ca09131a08d23f0a5c18a620b01db137dba666d18cd9b30c2" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener" +version = "5.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"0f214dc438f977e6d4e3500aaa277f5ad94ca83fbbd9b1a15713ce2344ccc5a1" +dependencies = [ + "event-listener 5.3.1", + "pin-project-lite", +] + [[package]] name = "eyre" version = "0.6.12" @@ -685,6 +1014,15 @@ dependencies = [ "once_cell", ] +[[package]] +name = "fastrand" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" +dependencies = [ + "instant", +] + [[package]] name = "fastrand" version = "2.0.1" @@ -723,6 +1061,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fcfdc7a0362c9f4444381a9e697c79d435fe65b52a37466fc2c1184cee9edc6" +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + [[package]] name = "flate2" version = "1.0.28" @@ -843,6 +1187,34 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand 1.9.0", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + +[[package]] +name = "futures-lite" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52527eb5074e35e9339c6b4e8d12600c7128b68fb25dcb9fa9dec18f7c25f3a5" +dependencies = [ + "fastrand 2.0.1", + "futures-core", + "futures-io", + "parking", + "pin-project-lite", +] + [[package]] name = "futures-macro" version = "0.3.30" @@ -923,6 +1295,18 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "gloo-timers" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "governor" version = "0.5.1" @@ -1038,6 +1422,12 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" +[[package]] +name = "hermit-abi" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbf6a919d6cf397374f7dfeeea91d974c7c0a7221d0d0f4f20d859d329e53fcc" + [[package]] name = "hex" version = "0.4.3" @@ -1100,9 +1490,12 @@ dependencies = [ "async-trait", "axum 0.7.5", "chrono", + "envconfig", + "health", "http 1.1.0", "metrics", "metrics-exporter-prometheus", + "rdkafka", "reqwest 0.12.3", "serde", "serde_json", @@ -1147,8 +1540,11 @@ dependencies = [ "health", "hook-common", "http 1.1.0", + "httpmock", "metrics", + "rdkafka", "reqwest 0.12.3", + "serde_json", "sqlx", "thiserror", "time", @@ -1226,6 +1622,34 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "httpmock" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08ec9586ee0910472dec1a1f0f8acf52f0fdde93aea74d70d4a3107b4be0fd5b" +dependencies = [ + "assert-json-diff", + "async-object-pool", + "async-std", + "async-trait", + "base64 0.21.7", + "basic-cookies", + "crossbeam-utils", + "form_urlencoded", + "futures-util", + "hyper 0.14.28", + "lazy_static", + "levenshtein", + "log", + "regex", + "serde", + "serde_json", + "serde_regex", + "similar", + "tokio", + "url", +] + [[package]] name = "hyper" version = 
"0.14.28" @@ -1392,6 +1816,26 @@ dependencies = [ "hashbrown 0.14.3", ] +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "io-lifetimes" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" +dependencies = [ + "hermit-abi 0.3.9", + "libc", + "windows-sys 0.48.0", +] + [[package]] name = "ipnet" version = "2.9.0" @@ -1404,6 +1848,15 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c397ca3ea05ad509c4ec451fea28b4771236a376ca1c69fd5143aae0cf8f93c4" +[[package]] +name = "itertools" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.12.1" @@ -1428,6 +1881,46 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] + +[[package]] +name = "lalrpop" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55cb077ad656299f160924eb2912aa147d7339ea7d69e1b5517326fdcec3c1ca" +dependencies = [ + "ascii-canvas", + "bit-set", + "ena", + "itertools 0.11.0", + "lalrpop-util", + "petgraph", + "pico-args", + "regex", + "regex-syntax 0.8.2", + "string_cache", + "term", + "tiny-keccak", + "unicode-xid", + "walkdir", +] + +[[package]] +name = "lalrpop-util" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"507460a910eb7b32ee961886ff48539633b788a36b65692b95f225b844c82553" +dependencies = [ + "regex-automata 0.4.5", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -1437,6 +1930,12 @@ dependencies = [ "spin 0.5.2", ] +[[package]] +name = "levenshtein" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db13adb97ab515a3691f56e4dbab09283d0b86cb45abd991d8634a9d6f501760" + [[package]] name = "libc" version = "0.2.153" @@ -1449,6 +1948,16 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +[[package]] +name = "libredox" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" +dependencies = [ + "bitflags 2.4.2", + "libc", +] + [[package]] name = "libsqlite3-sys" version = "0.27.0" @@ -1472,6 +1981,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "linux-raw-sys" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" + [[package]] name = "linux-raw-sys" version = "0.4.13" @@ -1493,6 +2008,9 @@ name = "log" version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +dependencies = [ + "value-bag", +] [[package]] name = "mach" @@ -1640,6 +2158,12 @@ dependencies = [ "tempfile", ] +[[package]] +name = "new_debug_unreachable" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086" + [[package]] name = "nix" version = "0.27.1" @@ -1758,7 +2282,7 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" dependencies = [ - "hermit-abi", + "hermit-abi 0.3.9", "libc", ] @@ -1931,6 +2455,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "parking" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" + [[package]] name = "parking_lot" version = "0.12.1" @@ -1975,6 +2505,31 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "petgraph" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" +dependencies = [ + "fixedbitset", + "indexmap 2.2.2", +] + +[[package]] +name = "phf_shared" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6796ad771acdc0123d2a88dc428b5e38ef24456743ddb1744ed628f9815c096" +dependencies = [ + "siphasher", +] + +[[package]] +name = "pico-args" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5be167a7af36ee22fe3115051bc51f6e6c7054c9348e28deb4f49bd6f705a315" + [[package]] name = "pin-project" version = "1.1.4" @@ -2007,6 +2562,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "piper" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae1d5c74c9876f070d3e8fd503d748c7d974c3e48da8f41350fa5222ef9b4391" +dependencies = [ + "atomic-waker", + "fastrand 2.0.1", + "futures-io", +] + [[package]] name = "pkcs1" version = "0.7.5" @@ 
-2034,6 +2600,37 @@ version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2900ede94e305130c13ddd391e0ab7cbaeb783945ae07a279c268cb05109c6cb" +[[package]] +name = "polling" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" +dependencies = [ + "autocfg", + "bitflags 1.3.2", + "cfg-if", + "concurrent-queue", + "libc", + "log", + "pin-project-lite", + "windows-sys 0.48.0", +] + +[[package]] +name = "polling" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3ed00ed3fbf728b5816498ecd316d1716eecaced9c0c8d2c5a6740ca214985b" +dependencies = [ + "cfg-if", + "concurrent-queue", + "hermit-abi 0.4.0", + "pin-project-lite", + "rustix 0.38.31", + "tracing", + "windows-sys 0.52.0", +] + [[package]] name = "portable-atomic" version = "1.6.0" @@ -2052,6 +2649,12 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "precomputed-hash" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c" + [[package]] name = "proc-macro-crate" version = "1.3.1" @@ -2088,7 +2691,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19de2de2a00075bf566bee3bd4db014b11587e84184d3f7a791bc17f1a8e9e48" dependencies = [ "anyhow", - "itertools", + "itertools 0.12.1", "proc-macro2", "quote", "syn 2.0.48", @@ -2249,6 +2852,17 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "redox_users" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd283d9651eeda4b2a83a43c1c91b266c40fd76ecd39a50a8c630ae69dc72891" +dependencies = [ + "getrandom", + "libredox", + "thiserror", +] + [[package]] name = 
"regex" version = "1.10.4" @@ -2422,6 +3036,20 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustix" +version = "0.37.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea8ca367a3a01fe35e6943c400addf443c0f57670e6ec51196f71a4b8762dd2" +dependencies = [ + "bitflags 1.3.2", + "errno", + "io-lifetimes", + "libc", + "linux-raw-sys 0.3.8", + "windows-sys 0.48.0", +] + [[package]] name = "rustix" version = "0.38.31" @@ -2431,7 +3059,7 @@ dependencies = [ "bitflags 2.4.2", "errno", "libc", - "linux-raw-sys", + "linux-raw-sys 0.4.13", "windows-sys 0.52.0", ] @@ -2494,6 +3122,15 @@ version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.23" @@ -2596,6 +3233,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_regex" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8136f1a4ea815d7eac4101cfd0b16dc0cb5e1fe1b8609dfd728058656b7badf" +dependencies = [ + "regex", + "serde", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -2664,6 +3311,18 @@ dependencies = [ "rand_core", ] +[[package]] +name = "similar" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa42c91313f1d05da9b26f267f931cf178d4aba455b4c4622dd7355eb80c6640" + +[[package]] +name = "siphasher" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" + [[package]] name = "sketches-ddsketch" version = "0.2.2" @@ -2736,7 +3395,7 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ce81b7bd7c4493975347ef60d8c7e8b742d4694f4c49f93e0a12ea263938176c" dependencies = [ - "itertools", + "itertools 0.12.1", "nom", "unicode_categories", ] @@ -2769,7 +3428,7 @@ dependencies = [ "crossbeam-queue", "dotenvy", "either", - "event-listener", + "event-listener 2.5.3", "futures-channel", "futures-core", "futures-intrusive", @@ -2947,6 +3606,19 @@ dependencies = [ "uuid", ] +[[package]] +name = "string_cache" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f91138e76242f575eb1d3b38b4f1362f10d3a43f47d182a5b359af488a02293b" +dependencies = [ + "new_debug_unreachable", + "once_cell", + "parking_lot", + "phf_shared", + "precomputed-hash", +] + [[package]] name = "stringprep" version = "0.1.4" @@ -3026,11 +3698,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a365e8cd18e44762ef95d87f284f4b5cd04107fec2ff3052bd6a3e6069669e67" dependencies = [ "cfg-if", - "fastrand", - "rustix", + "fastrand 2.0.1", + "rustix 0.38.31", "windows-sys 0.52.0", ] +[[package]] +name = "term" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c59df8ac95d96ff9bede18eb7300b0fda5e5d8d90960e76f8e14ae765eedbf1f" +dependencies = [ + "dirs-next", + "rustversion", + "winapi", +] + [[package]] name = "thiserror" version = "1.0.56" @@ -3092,6 +3775,15 @@ dependencies = [ "time-core", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinyvec" version = "1.6.0" @@ -3413,6 +4105,12 @@ version = "1.10.1" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "1dd624098567895118886609431a7c3b8f516e41d30e0643f03d94592a147e36" +[[package]] +name = "unicode-xid" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" + [[package]] name = "unicode_categories" version = "0.1.1" @@ -3459,6 +4157,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "value-bag" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a84c137d37ab0142f0f2ddfe332651fdbf252e7b7dbb4e67b6c1f1b2e925101" + [[package]] name = "vcpkg" version = "0.2.15" @@ -3471,6 +4175,22 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "waker-fn" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "317211a0dc0ceedd78fb2ca9a44aed3d7b9b26f81870d485c07122b4350673b7" + +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -3619,6 +4339,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d4cc384e1e73b93bafa6fb4f1df8c41695c8a91cf9c4c64358067d15a7b6c6b" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" 
diff --git a/rust/Cargo.toml b/rust/Cargo.toml index ea5d041027ad8..58b0b2b1715ce 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -42,10 +42,11 @@ futures = { version = "0.3.29" } governor = { version = "0.5.1", features = ["dashmap"] } http = { version = "1.1.0" } http-body-util = "0.1.0" +httpmock = "0.7.0" metrics = "0.22.0" metrics-exporter-prometheus = "0.14.0" once_cell = "1.18.0" -opentelemetry = { version = "0.22.0", features = ["trace"]} +opentelemetry = { version = "0.22.0", features = ["trace"] } opentelemetry-otlp = "0.15.0" opentelemetry_sdk = { version = "0.22.1", features = ["trace", "rt-tokio"] } rand = "0.8.5" @@ -76,6 +77,6 @@ tower = "0.4.13" tower-http = { version = "0.5.2", features = ["cors", "limit", "trace"] } tracing = "0.1.40" tracing-opentelemetry = "0.23.0" -tracing-subscriber = { version="0.3.18", features = ["env-filter"] } +tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } url = { version = "2.5.0 " } uuid = { version = "1.6.1", features = ["v7", "serde"] } diff --git a/rust/bin/start-hoghooks b/rust/bin/start-hoghooks new file mode 100755 index 0000000000000..ba620aaac5340 --- /dev/null +++ b/rust/bin/start-hoghooks @@ -0,0 +1,25 @@ +#!/bin/bash + +set -ex + +trap "trap - SIGTERM && kill -- -$$" SIGINT SIGTERM EXIT + +cargo build + +export RUST_LOG=${DEBUG:-debug} +SQLX_QUERY_LEVEL=${SQLX_QUERY_LEVEL:-warn} +export RUST_LOG=$RUST_LOG,sqlx::query=$SQLX_QUERY_LEVEL + +export HOG_MODE=true +DATABASE_NAME=${DEBUG:-hoghooks} +export DATABASE_URL=postgres://posthog:posthog@localhost:5432/$DATABASE_NAME +export ALLOW_INTERNAL_IPS=true + +sqlx database create -D "$DATABASE_URL" +sqlx migrate run -D "$DATABASE_URL" + +./target/debug/hook-api & +./target/debug/hook-worker & +./target/debug/hook-janitor & + +wait \ No newline at end of file diff --git a/rust/hook-api/src/config.rs b/rust/hook-api/src/config.rs index e15f0d3fac77a..fa1bbb3c7e484 100644 --- a/rust/hook-api/src/config.rs +++ b/rust/hook-api/src/config.rs 
@@ -2,7 +2,7 @@ use envconfig::Envconfig; #[derive(Envconfig)] pub struct Config { - #[envconfig(from = "BIND_HOST", default = "0.0.0.0")] + #[envconfig(from = "BIND_HOST", default = "::")] pub host: String, #[envconfig(from = "BIND_PORT", default = "3300")] @@ -19,6 +19,9 @@ pub struct Config { #[envconfig(default = "5000000")] pub max_body_size: usize, + + #[envconfig(default = "false")] + pub hog_mode: bool, } impl Config { diff --git a/rust/hook-api/src/handlers/app.rs b/rust/hook-api/src/handlers/app.rs index 7cbbc449e424d..1dea37c1bdc5b 100644 --- a/rust/hook-api/src/handlers/app.rs +++ b/rust/hook-api/src/handlers/app.rs @@ -5,17 +5,32 @@ use hook_common::pgqueue::PgQueue; use super::webhook; -pub fn add_routes(router: Router, pg_pool: PgQueue, max_body_size: usize) -> Router { - router +pub fn add_routes( + router: Router, + pg_pool: PgQueue, + hog_mode: bool, + max_body_size: usize, +) -> Router { + let router = router .route("/", routing::get(index)) .route("/_readiness", routing::get(index)) - .route("/_liveness", routing::get(index)) // No async loop for now, just check axum health - .route( + .route("/_liveness", routing::get(index)); // No async loop for now, just check axum health + + if hog_mode { + router.route( + "/hoghook", + routing::post(webhook::post_hoghook) + .with_state(pg_pool) + .layer(RequestBodyLimitLayer::new(max_body_size)), + ) + } else { + router.route( "/webhook", - routing::post(webhook::post) + routing::post(webhook::post_webhook) .with_state(pg_pool) .layer(RequestBodyLimitLayer::new(max_body_size)), ) + } } pub async fn index() -> &'static str { @@ -37,8 +52,9 @@ mod tests { #[sqlx::test(migrations = "../migrations")] async fn index(db: PgPool) { let pg_queue = PgQueue::new_from_pool("test_index", db).await; + let hog_mode = false; - let app = add_routes(Router::new(), pg_queue, 1_000_000); + let app = add_routes(Router::new(), pg_queue, hog_mode, 1_000_000); let response = app 
.oneshot(Request::builder().uri("/").body(Body::empty()).unwrap()) diff --git a/rust/hook-api/src/handlers/webhook.rs b/rust/hook-api/src/handlers/webhook.rs index 808c94878291b..c0e5f41313005 100644 --- a/rust/hook-api/src/handlers/webhook.rs +++ b/rust/hook-api/src/handlers/webhook.rs @@ -1,11 +1,14 @@ +use std::collections::HashMap; use std::time::Instant; use axum::{extract::State, http::StatusCode, Json}; use hook_common::webhook::{WebhookJobMetadata, WebhookJobParameters}; use serde_derive::Deserialize; +use serde_json::Value; use url::Url; use hook_common::pgqueue::{NewJob, PgQueue}; +use hook_common::webhook::HttpMethod; use serde::Serialize; use tracing::{debug, error}; @@ -29,7 +32,7 @@ fn default_max_attempts() -> u32 { 3 } -pub async fn post( +pub async fn post_webhook( State(pg_queue): State, Json(payload): Json, ) -> Result, (StatusCode, Json)> { @@ -62,6 +65,106 @@ pub async fn post( Ok(Json(WebhookPostResponse { error: None })) } +#[derive(Debug, Deserialize)] +pub struct HogFetchParameters { + pub body: Option, + pub headers: Option>, + pub method: Option, +} + +// Hoghook expects a JSON payload in the format of `HogFunctionInvocationResult` (as seen in +// plugin-server), but we accept a plain `Json` via Axum here, and this is why: +// * The reason we don't decode that into a `HogFunctionInvocationResult`-shaped Rust struct is that +// there's no benefit in mirroring the exact shape of that type (and keeping it sync with the +// plugin-server type). +// * Hoghook only cares about a small subset of the payload (the `asyncFunctionRequest` field), and +// the reason we don't decode *that* into a Rust struct is because the function args are a simple +// array (because this type is used for more than just `fetch` requests), and so we would need to +// manually validate and destructure the array elements anyway. 
+// * Additionally, we don't want to discard the rest of the payload because we pass it back to the +// plugin-server after receiving the response body from the remote server. By accepting a plain +// `Json` we only decode the JSON once, we can do our minimal validation/extraction, and we +// can save the rest of the payload for later. +pub async fn post_hoghook( + State(pg_queue): State, + Json(mut payload): Json, +) -> Result, (StatusCode, Json)> { + debug!("received payload: {:?}", payload); + + let parameters: WebhookJobParameters = match &mut payload { + Value::Object(object) => { + let async_fn_request = object + .get("asyncFunctionRequest") + .ok_or_else(|| bad_request("missing required field 'asyncFunctionRequest'"))?; + + let name = async_fn_request + .get("name") + .ok_or_else(|| bad_request("missing required field 'asyncFunctionRequest.name'"))?; + + if name != "fetch" { + return Err(bad_request("asyncFunctionRequest.name must be 'fetch'")); + } + + let args = async_fn_request + .get("args") + .ok_or_else(|| bad_request("missing required field 'asyncFunctionRequest.args'"))?; + + // Note that the URL is parsed (and thus validated as a valid URL) as part of + // `get_hostname` below. + let url = args.get(0).ok_or_else(|| { + bad_request("missing required field 'asyncFunctionRequest.args[0]'") + })?; + + let fetch_options: HogFetchParameters = if let Some(value) = args.get(1) { + serde_json::from_value(value.clone()).map_err(|_| { + bad_request("failed to deserialize asyncFunctionRequest.args[1]") + })? + } else { + HogFetchParameters { + body: None, + headers: None, + method: None, + } + }; + + WebhookJobParameters { + body: fetch_options.body.unwrap_or("".to_owned()), + headers: fetch_options.headers.unwrap_or_default(), + method: fetch_options.method.unwrap_or(HttpMethod::POST), + url: url + .as_str() + .ok_or_else(|| bad_request("url must be a string"))? 
+ .to_owned(), + } + } + _ => return Err(bad_request("expected JSON object")), + }; + + let url_hostname = get_hostname(&parameters.url)?; + let max_attempts = default_max_attempts() as i32; + + let job = NewJob::new(max_attempts, payload, parameters, url_hostname.as_str()); + + let start_time = Instant::now(); + + pg_queue.enqueue(job).await.map_err(internal_error)?; + + let elapsed_time = start_time.elapsed().as_secs_f64(); + metrics::histogram!("webhook_api_enqueue").record(elapsed_time); + + Ok(Json(WebhookPostResponse { error: None })) +} + +fn bad_request(msg: &str) -> (StatusCode, Json) { + error!(msg); + ( + StatusCode::BAD_REQUEST, + Json(WebhookPostResponse { + error: Some(msg.to_owned()), + }), + ) +} + fn internal_error(err: E) -> (StatusCode, Json) where E: std::error::Error, @@ -76,23 +179,11 @@ where } fn get_hostname(url_str: &str) -> Result)> { - let url = Url::parse(url_str).map_err(|_| { - ( - StatusCode::BAD_REQUEST, - Json(WebhookPostResponse { - error: Some("could not parse url".to_owned()), - }), - ) - })?; + let url = Url::parse(url_str).map_err(|_| bad_request("could not parse url"))?; match url.host_str() { Some(hostname) => Ok(hostname.to_owned()), - None => Err(( - StatusCode::BAD_REQUEST, - Json(WebhookPostResponse { - error: Some("couldn't extract hostname from url".to_owned()), - }), - )), + None => Err(bad_request("couldn't extract hostname from url")), } } @@ -119,8 +210,9 @@ mod tests { #[sqlx::test(migrations = "../migrations")] async fn webhook_success(db: PgPool) { let pg_queue = PgQueue::new_from_pool("test_index", db).await; + let hog_mode = false; - let app = add_routes(Router::new(), pg_queue, MAX_BODY_SIZE); + let app = add_routes(Router::new(), pg_queue, hog_mode, MAX_BODY_SIZE); let mut headers = collections::HashMap::new(); headers.insert("Content-Type".to_owned(), "application/json".to_owned()); @@ -161,8 +253,9 @@ mod tests { #[sqlx::test(migrations = "../migrations")] async fn webhook_bad_url(db: PgPool) { let pg_queue = 
PgQueue::new_from_pool("test_index", db).await; + let hog_mode = false; - let app = add_routes(Router::new(), pg_queue, MAX_BODY_SIZE); + let app = add_routes(Router::new(), pg_queue, hog_mode, MAX_BODY_SIZE); let response = app .oneshot( @@ -198,8 +291,9 @@ mod tests { #[sqlx::test(migrations = "../migrations")] async fn webhook_payload_missing_fields(db: PgPool) { let pg_queue = PgQueue::new_from_pool("test_index", db).await; + let hog_mode = false; - let app = add_routes(Router::new(), pg_queue, MAX_BODY_SIZE); + let app = add_routes(Router::new(), pg_queue, hog_mode, MAX_BODY_SIZE); let response = app .oneshot( @@ -219,8 +313,9 @@ mod tests { #[sqlx::test(migrations = "../migrations")] async fn webhook_payload_not_json(db: PgPool) { let pg_queue = PgQueue::new_from_pool("test_index", db).await; + let hog_mode = false; - let app = add_routes(Router::new(), pg_queue, MAX_BODY_SIZE); + let app = add_routes(Router::new(), pg_queue, hog_mode, MAX_BODY_SIZE); let response = app .oneshot( @@ -240,8 +335,9 @@ mod tests { #[sqlx::test(migrations = "../migrations")] async fn webhook_payload_body_too_large(db: PgPool) { let pg_queue = PgQueue::new_from_pool("test_index", db).await; + let hog_mode = false; - let app = add_routes(Router::new(), pg_queue, MAX_BODY_SIZE); + let app = add_routes(Router::new(), pg_queue, hog_mode, MAX_BODY_SIZE); let bytes: Vec = vec![b'a'; MAX_BODY_SIZE + 1]; let long_string = String::from_utf8_lossy(&bytes); @@ -276,4 +372,126 @@ mod tests { assert_eq!(response.status(), StatusCode::PAYLOAD_TOO_LARGE); } + + #[derive(sqlx::FromRow, Debug)] + struct TestJobRow { + parameters: Value, + metadata: Value, + target: String, + } + + #[sqlx::test(migrations = "../migrations")] + async fn hoghook_success(db: PgPool) { + let pg_queue = PgQueue::new_from_pool("test_index", db.clone()).await; + let hog_mode = true; + + let app = add_routes(Router::new(), pg_queue, hog_mode, MAX_BODY_SIZE); + + let valid_payloads = vec![ + ( + 
r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com"]}}"#, + r#"{"body": "", "headers": {}, "method": "POST", "url": "http://example.com"}"#, + ), + ( + r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com", {"method": "GET"}]}}"#, + r#"{"body": "", "headers": {}, "method": "GET", "url": "http://example.com"}"#, + ), + ( + r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com", {"body": "hello, world"}]}}"#, + r#"{"body": "hello, world", "headers": {}, "method": "POST", "url": "http://example.com"}"#, + ), + ( + r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com", {"headers": {"k": "v"}}]}}"#, + r#"{"body": "", "headers": {"k": "v"}, "method": "POST", "url": "http://example.com"}"#, + ), + ( + r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com", {"method": "GET", "body": "hello, world", "headers": {"k": "v"}}]}, "otherField": true}"#, + r#"{"body": "hello, world", "headers": {"k": "v"}, "method": "GET", "url": "http://example.com"}"#, + ), + ]; + + for (payload, expected_parameters) in valid_payloads { + let mut headers = collections::HashMap::new(); + headers.insert("Content-Type".to_owned(), "application/json".to_owned()); + let response = app + .clone() + .oneshot( + Request::builder() + .method(http::Method::POST) + .uri("/hoghook") + .header(http::header::CONTENT_TYPE, "application/json") + .body(Body::from(payload.to_owned())) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + + let body = response.into_body().collect().await.unwrap().to_bytes(); + assert_eq!(&body[..], b"{}"); + + let mut conn = db.acquire().await.unwrap(); + + let row = sqlx::query_as::<_, TestJobRow>( + "SELECT parameters, metadata, target FROM job_queue;", + ) + .fetch_one(&mut *conn) + .await + .unwrap(); + + assert_eq!( + row.parameters, + serde_json::from_str::(expected_parameters).unwrap() + ); + assert_eq!( + row.metadata, + 
serde_json::from_str::(payload).unwrap() + ); + assert_eq!(row.target, "example.com"); + + sqlx::query("DELETE FROM job_queue") + .execute(&mut *conn) + .await + .unwrap(); + } + } + + #[sqlx::test(migrations = "../migrations")] + async fn hoghook_bad_requests(db: PgPool) { + let pg_queue = PgQueue::new_from_pool("test_index", db.clone()).await; + let hog_mode = true; + + let app = add_routes(Router::new(), pg_queue, hog_mode, MAX_BODY_SIZE); + + let valid_payloads = vec![ + r#"{}"#, + r#"{"asyncFunctionRequest":{}"#, + r#"{"asyncFunctionRequest":{"name":"not-fetch","args":[]}}"#, + r#"{"asyncFunctionRequest":{"name":"fetch"}}"#, + r#"{"asyncFunctionRequest":{"name":"fetch","args":{}}}"#, + r#"{"asyncFunctionRequest":{"name":"fetch","args":[]}}"#, + r#"{"asyncFunctionRequest":{"name":"fetch","args":["not-url"]}}"#, + r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com", {"method": "not-method"}]}}"#, + ]; + + for payload in valid_payloads { + let mut headers = collections::HashMap::new(); + headers.insert("Content-Type".to_owned(), "application/json".to_owned()); + let response = app + .clone() + .oneshot( + Request::builder() + .method(http::Method::POST) + .uri("/hoghook") + .header(http::header::CONTENT_TYPE, "application/json") + .body(Body::from(payload.to_owned())) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::BAD_REQUEST); + } + } } diff --git a/rust/hook-api/src/main.rs b/rust/hook-api/src/main.rs index ad05edef1ff98..d20c5a11a37e1 100644 --- a/rust/hook-api/src/main.rs +++ b/rust/hook-api/src/main.rs @@ -34,7 +34,12 @@ async fn main() { .await .expect("failed to initialize queue"); - let app = handlers::add_routes(Router::new(), pg_queue, config.max_body_size); + let app = handlers::add_routes( + Router::new(), + pg_queue, + config.hog_mode, + config.max_body_size, + ); let app = setup_metrics_routes(app); match listen(app, config.bind()).await { diff --git a/rust/hook-common/Cargo.toml 
b/rust/hook-common/Cargo.toml index 58232a80fe17d..e5c27fd598245 100644 --- a/rust/hook-common/Cargo.toml +++ b/rust/hook-common/Cargo.toml @@ -10,9 +10,12 @@ workspace = true async-trait = { workspace = true } axum = { workspace = true, features = ["http2"] } chrono = { workspace = true } +envconfig = { workspace = true } +health = { path = "../common/health" } http = { workspace = true } metrics = { workspace = true } metrics-exporter-prometheus = { workspace = true } +rdkafka = { workspace = true } reqwest = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/rust/hook-common/src/config.rs b/rust/hook-common/src/config.rs new file mode 100644 index 0000000000000..8154a1d3fb7c5 --- /dev/null +++ b/rust/hook-common/src/config.rs @@ -0,0 +1,22 @@ +use envconfig::Envconfig; + +#[derive(Envconfig, Clone)] +pub struct KafkaConfig { + #[envconfig(default = "20")] + pub kafka_producer_linger_ms: u32, // Maximum time between producer batches during low traffic + + #[envconfig(default = "400")] + pub kafka_producer_queue_mib: u32, // Size of the in-memory producer queue in mebibytes + + #[envconfig(default = "20000")] + pub kafka_message_timeout_ms: u32, // Time before we stop retrying producing a message: 20 seconds + + #[envconfig(default = "none")] + pub kafka_compression_codec: String, // none, gzip, snappy, lz4, zstd + + #[envconfig(default = "false")] + pub kafka_tls: bool, + + #[envconfig(default = "localhost:9092")] + pub kafka_hosts: String, +} diff --git a/rust/hook-janitor/src/kafka_producer.rs b/rust/hook-common/src/kafka_producer.rs similarity index 100% rename from rust/hook-janitor/src/kafka_producer.rs rename to rust/hook-common/src/kafka_producer.rs diff --git a/rust/hook-common/src/lib.rs b/rust/hook-common/src/lib.rs index 8e63ded5a7bf2..5531ceb7346de 100644 --- a/rust/hook-common/src/lib.rs +++ b/rust/hook-common/src/lib.rs @@ -1,5 +1,8 @@ +pub mod config; pub mod kafka_messages; +pub mod kafka_producer; pub 
mod metrics; pub mod pgqueue; pub mod retry; +pub mod test; pub mod webhook; diff --git a/rust/hook-common/src/test.rs b/rust/hook-common/src/test.rs new file mode 100644 index 0000000000000..33bb2b5d4e213 --- /dev/null +++ b/rust/hook-common/src/test.rs @@ -0,0 +1,33 @@ +use health::HealthRegistry; +use rdkafka::mocking::MockCluster; +use rdkafka::producer::{DefaultProducerContext, FutureProducer}; + +use crate::config::KafkaConfig; +use crate::kafka_producer::{create_kafka_producer, KafkaContext}; + +pub async fn create_mock_kafka() -> ( + MockCluster<'static, DefaultProducerContext>, + FutureProducer, +) { + let registry = HealthRegistry::new("liveness"); + let handle = registry + .register("one".to_string(), time::Duration::seconds(30)) + .await; + let cluster = MockCluster::new(1).expect("failed to create mock brokers"); + + let config = KafkaConfig { + kafka_producer_linger_ms: 0, + kafka_producer_queue_mib: 50, + kafka_message_timeout_ms: 5000, + kafka_compression_codec: "none".to_string(), + kafka_hosts: cluster.bootstrap_servers(), + kafka_tls: false, + }; + + ( + cluster, + create_kafka_producer(&config, handle) + .await + .expect("failed to create mocked kafka producer"), + ) +} diff --git a/rust/hook-janitor/src/config.rs b/rust/hook-janitor/src/config.rs index 389de0342e03a..166fb8067a056 100644 --- a/rust/hook-janitor/src/config.rs +++ b/rust/hook-janitor/src/config.rs @@ -1,8 +1,10 @@ use envconfig::Envconfig; +use hook_common::config::KafkaConfig; + #[derive(Envconfig)] pub struct Config { - #[envconfig(from = "BIND_HOST", default = "0.0.0.0")] + #[envconfig(from = "BIND_HOST", default = "::")] pub host: String, #[envconfig(from = "BIND_PORT", default = "3302")] @@ -20,34 +22,14 @@ pub struct Config { #[envconfig(default = "webhooks")] pub mode: String, - #[envconfig(nested = true)] - pub kafka: KafkaConfig, -} - -#[derive(Envconfig, Clone)] -pub struct KafkaConfig { - #[envconfig(default = "20")] - pub kafka_producer_linger_ms: u32, // Maximum time 
between producer batches during low traffic - - #[envconfig(default = "400")] - pub kafka_producer_queue_mib: u32, // Size of the in-memory producer queue in mebibytes - - #[envconfig(default = "20000")] - pub kafka_message_timeout_ms: u32, // Time before we stop retrying producing a message: 20 seconds - - #[envconfig(default = "none")] - pub kafka_compression_codec: String, // none, gzip, snappy, lz4, zstd - #[envconfig(default = "false")] - pub kafka_tls: bool, + pub hog_mode: bool, #[envconfig(default = "clickhouse_app_metrics")] pub app_metrics_topic: String, - #[envconfig(default = "plugin_log_entries")] - pub plugin_log_entries_topic: String, - - pub kafka_hosts: String, + #[envconfig(nested = true)] + pub kafka: KafkaConfig, } impl Config { diff --git a/rust/hook-janitor/src/main.rs b/rust/hook-janitor/src/main.rs index 325aa098ed6fe..891532d35dae0 100644 --- a/rust/hook-janitor/src/main.rs +++ b/rust/hook-janitor/src/main.rs @@ -5,17 +5,16 @@ use envconfig::Envconfig; use eyre::Result; use futures::future::{select, Either}; use health::{HealthHandle, HealthRegistry}; -use kafka_producer::create_kafka_producer; use std::{str::FromStr, time::Duration}; use tokio::sync::Semaphore; use webhooks::WebhookCleaner; +use hook_common::kafka_producer::create_kafka_producer; use hook_common::metrics::setup_metrics_routes; mod cleanup; mod config; mod handlers; -mod kafka_producer; mod webhooks; async fn listen(app: Router, bind: String) -> Result<()> { @@ -63,7 +62,8 @@ async fn main() { WebhookCleaner::new( &config.database_url, kafka_producer, - config.kafka.app_metrics_topic.to_owned(), + config.app_metrics_topic.to_owned(), + config.hog_mode, ) .expect("unable to create webhook cleaner"), ) diff --git a/rust/hook-janitor/src/webhooks.rs b/rust/hook-janitor/src/webhooks.rs index c1dfbba51aa35..67d6550fe8a19 100644 --- a/rust/hook-janitor/src/webhooks.rs +++ b/rust/hook-janitor/src/webhooks.rs @@ -15,9 +15,9 @@ use thiserror::Error; use tracing::{debug, error, 
info}; use crate::cleanup::Cleaner; -use crate::kafka_producer::KafkaContext; use hook_common::kafka_messages::app_metrics::{AppMetric, AppMetricCategory}; +use hook_common::kafka_producer::KafkaContext; use hook_common::metrics::get_current_timestamp_seconds; #[derive(Error, Debug)] @@ -58,6 +58,7 @@ pub struct WebhookCleaner { pg_pool: PgPool, kafka_producer: FutureProducer, app_metrics_topic: String, + hog_mode: bool, } #[derive(sqlx::FromRow, Debug)] @@ -155,6 +156,7 @@ impl WebhookCleaner { database_url: &str, kafka_producer: FutureProducer, app_metrics_topic: String, + hog_mode: bool, ) -> Result { let options = PgConnectOptions::from_str(database_url) .map_err(|error| WebhookCleanerError::PoolCreationError { error })? @@ -167,6 +169,7 @@ impl WebhookCleaner { pg_pool, kafka_producer, app_metrics_topic, + hog_mode, }) } @@ -175,11 +178,13 @@ impl WebhookCleaner { pg_pool: PgPool, kafka_producer: FutureProducer, app_metrics_topic: String, + hog_mode: bool, ) -> Result { Ok(Self { pg_pool, kafka_producer, app_metrics_topic, + hog_mode, }) } @@ -394,7 +399,13 @@ impl WebhookCleaner { let (completed_row_count, completed_agg_row_count) = { let completed_row_count = self.get_row_count_for_status(&mut tx, "completed").await?; - let completed_agg_rows = self.get_completed_agg_rows(&mut tx).await?; + let completed_agg_rows = if self.hog_mode { + // Hog mode doesn't need to send metrics to Kafka (and can't aggregate by + // plugin anyway), so we can skip this. + vec![] + } else { + self.get_completed_agg_rows(&mut tx).await? 
+ }; let agg_row_count = completed_agg_rows.len() as u64; let completed_app_metrics: Vec = completed_agg_rows.into_iter().map(Into::into).collect(); @@ -404,7 +415,13 @@ impl WebhookCleaner { let (failed_row_count, failed_agg_row_count) = { let failed_row_count = self.get_row_count_for_status(&mut tx, "failed").await?; - let failed_agg_rows = self.get_failed_agg_rows(&mut tx).await?; + let failed_agg_rows = if self.hog_mode { + // Hog mode doesn't need to send metrics to Kafka (and can't aggregate by + // plugin anyway), so we can skip this. + vec![] + } else { + self.get_failed_agg_rows(&mut tx).await? + }; let agg_row_count = failed_agg_rows.len() as u64; let failed_app_metrics: Vec = failed_agg_rows.into_iter().map(Into::into).collect(); @@ -413,7 +430,7 @@ impl WebhookCleaner { }; let mut rows_deleted = 0; - if completed_agg_row_count + failed_agg_row_count != 0 { + if completed_row_count + failed_row_count != 0 { rows_deleted = self.delete_observed_rows(&mut tx).await?; if rows_deleted != completed_row_count + failed_row_count { @@ -493,18 +510,15 @@ impl Cleaner for WebhookCleaner { #[cfg(test)] mod tests { use super::*; - use crate::config; - use crate::kafka_producer::{create_kafka_producer, KafkaContext}; - use health::HealthRegistry; + use hook_common::kafka_messages::app_metrics::{ Error as WebhookError, ErrorDetails, ErrorType, }; use hook_common::pgqueue::PgQueueJob; use hook_common::pgqueue::{NewJob, PgQueue, PgTransactionBatch}; + use hook_common::test::create_mock_kafka; use hook_common::webhook::{HttpMethod, WebhookJobMetadata, WebhookJobParameters}; use rdkafka::consumer::{Consumer, StreamConsumer}; - use rdkafka::mocking::MockCluster; - use rdkafka::producer::{DefaultProducerContext, FutureProducer}; use rdkafka::types::{RDKafkaApiKey, RDKafkaRespErr}; use rdkafka::{ClientConfig, Message}; use sqlx::{PgPool, Row}; @@ -513,35 +527,6 @@ mod tests { const APP_METRICS_TOPIC: &str = "app_metrics"; - async fn create_mock_kafka() -> ( - 
MockCluster<'static, DefaultProducerContext>, - FutureProducer, - ) { - let registry = HealthRegistry::new("liveness"); - let handle = registry - .register("one".to_string(), time::Duration::seconds(30)) - .await; - let cluster = MockCluster::new(1).expect("failed to create mock brokers"); - - let config = config::KafkaConfig { - kafka_producer_linger_ms: 0, - kafka_producer_queue_mib: 50, - kafka_message_timeout_ms: 5000, - kafka_compression_codec: "none".to_string(), - kafka_hosts: cluster.bootstrap_servers(), - app_metrics_topic: APP_METRICS_TOPIC.to_string(), - plugin_log_entries_topic: "plugin_log_entries".to_string(), - kafka_tls: false, - }; - - ( - cluster, - create_kafka_producer(&config, handle) - .await - .expect("failed to create mocked kafka producer"), - ) - } - fn check_app_metric_vector_equality(v1: &[AppMetric], v2: &[AppMetric]) { // Ignores `error_uuid`s. assert_eq!(v1.len(), v2.len()); @@ -569,9 +554,14 @@ mod tests { .expect("failed to create mock consumer"); consumer.subscribe(&[APP_METRICS_TOPIC]).unwrap(); - let webhook_cleaner = - WebhookCleaner::new_from_pool(db, mock_producer, APP_METRICS_TOPIC.to_owned()) - .expect("unable to create webhook cleaner"); + let hog_mode = false; + let webhook_cleaner = WebhookCleaner::new_from_pool( + db, + mock_producer, + APP_METRICS_TOPIC.to_owned(), + hog_mode, + ) + .expect("unable to create webhook cleaner"); let cleanup_stats = webhook_cleaner .cleanup_impl() @@ -762,9 +752,14 @@ mod tests { .expect("failed to create mock consumer"); consumer.subscribe(&[APP_METRICS_TOPIC]).unwrap(); - let webhook_cleaner = - WebhookCleaner::new_from_pool(db, mock_producer, APP_METRICS_TOPIC.to_owned()) - .expect("unable to create webhook cleaner"); + let hog_mode = false; + let webhook_cleaner = WebhookCleaner::new_from_pool( + db, + mock_producer, + APP_METRICS_TOPIC.to_owned(), + hog_mode, + ) + .expect("unable to create webhook cleaner"); let cleanup_stats = webhook_cleaner .cleanup_impl() @@ -782,9 +777,14 @@ mod 
tests { #[sqlx::test(migrations = "../migrations", fixtures("webhook_cleanup"))] async fn test_serializable_isolation(db: PgPool) { let (_, mock_producer) = create_mock_kafka().await; - let webhook_cleaner = - WebhookCleaner::new_from_pool(db.clone(), mock_producer, APP_METRICS_TOPIC.to_owned()) - .expect("unable to create webhook cleaner"); + let hog_mode = false; + let webhook_cleaner = WebhookCleaner::new_from_pool( + db.clone(), + mock_producer, + APP_METRICS_TOPIC.to_owned(), + hog_mode, + ) + .expect("unable to create webhook cleaner"); let queue = PgQueue::new_from_pool("webhooks", db.clone()).await; diff --git a/rust/hook-worker/Cargo.toml b/rust/hook-worker/Cargo.toml index 79416f9004a10..d09a241206911 100644 --- a/rust/hook-worker/Cargo.toml +++ b/rust/hook-worker/Cargo.toml @@ -15,7 +15,9 @@ health = { path = "../common/health" } hook-common = { path = "../hook-common" } http = { workspace = true } metrics = { workspace = true } +rdkafka = { workspace = true } reqwest = { workspace = true } +serde_json = { workspace = true } sqlx = { workspace = true } thiserror = { workspace = true } time = { workspace = true } @@ -23,3 +25,6 @@ tokio = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } url = { version = "2.2" } + +[dev-dependencies] +httpmock = { workspace = true } diff --git a/rust/hook-worker/src/config.rs b/rust/hook-worker/src/config.rs index 51b23b7f273c5..1fa6c04638698 100644 --- a/rust/hook-worker/src/config.rs +++ b/rust/hook-worker/src/config.rs @@ -3,9 +3,11 @@ use std::time; use envconfig::Envconfig; +use hook_common::config::KafkaConfig; + #[derive(Envconfig, Clone)] pub struct Config { - #[envconfig(from = "BIND_HOST", default = "0.0.0.0")] + #[envconfig(from = "BIND_HOST", default = "::")] pub host: String, #[envconfig(from = "BIND_PORT", default = "3301")] @@ -40,6 +42,15 @@ pub struct Config { #[envconfig(default = "false")] pub allow_internal_ips: bool, + + #[envconfig(default = "false")] + 
pub hog_mode: bool, + + #[envconfig(default = "cdp_function_callbacks")] + pub cdp_function_callbacks_topic: String, + + #[envconfig(nested = true)] + pub kafka: KafkaConfig, } impl Config { diff --git a/rust/hook-worker/src/main.rs b/rust/hook-worker/src/main.rs index 050e2b947c780..5400ff93bf6a4 100644 --- a/rust/hook-worker/src/main.rs +++ b/rust/hook-worker/src/main.rs @@ -5,6 +5,7 @@ use envconfig::Envconfig; use std::future::ready; use health::HealthRegistry; +use hook_common::kafka_producer::create_kafka_producer; use hook_common::{ metrics::serve, metrics::setup_metrics_routes, pgqueue::PgQueue, retry::RetryPolicy, }; @@ -44,6 +45,13 @@ async fn main() -> Result<(), WorkerError> { .await .expect("failed to initialize queue"); + let kafka_liveness = liveness + .register("rdkafka".to_string(), time::Duration::seconds(30)) + .await; + let kafka_producer = create_kafka_producer(&config.kafka, kafka_liveness) + .await + .expect("failed to create kafka producer"); + let worker = WebhookWorker::new( &config.worker_name, &queue, @@ -53,6 +61,9 @@ async fn main() -> Result<(), WorkerError> { config.max_concurrent_jobs, retry_policy_builder.provide(), config.allow_internal_ips, + kafka_producer, + config.cdp_function_callbacks_topic.to_owned(), + config.hog_mode, worker_liveness, ); diff --git a/rust/hook-worker/src/worker.rs b/rust/hook-worker/src/worker.rs index 9dcc4a2f4b7b0..422030d2a48b5 100644 --- a/rust/hook-worker/src/worker.rs +++ b/rust/hook-worker/src/worker.rs @@ -1,20 +1,27 @@ -use std::collections; use std::sync::Arc; use std::time; +use std::{collections, iter}; use chrono::Utc; +use futures::channel::oneshot::Canceled; use futures::future::join_all; use health::HealthHandle; +use http::StatusCode; +use rdkafka::error::KafkaError; +use rdkafka::producer::{FutureProducer, FutureRecord}; +use reqwest::{header, Client}; +use serde_json::{json, Value}; +use tokio::sync; +use tokio::time::{sleep, Duration}; +use tracing::error; + +use 
hook_common::kafka_producer::KafkaContext; use hook_common::pgqueue::PgTransactionBatch; use hook_common::{ pgqueue::{Job, PgQueue, PgQueueJob, PgTransactionJob, RetryError, RetryInvalidError}, retry::RetryPolicy, - webhook::{HttpMethod, WebhookJobError, WebhookJobMetadata, WebhookJobParameters}, + webhook::{HttpMethod, WebhookJobError, WebhookJobParameters}, }; -use http::StatusCode; -use reqwest::{header, Client}; -use tokio::sync; -use tracing::error; use crate::dns::{NoPublicIPv4Error, PublicIPv4Resolver}; use crate::error::{ @@ -22,11 +29,11 @@ use crate::error::{ }; use crate::util::first_n_bytes_of_response; -/// A WebhookJob is any `PgQueueJob` with `WebhookJobParameters` and `WebhookJobMetadata`. +/// A WebhookJob is any `PgQueueJob` with `WebhookJobParameters` and `Value`. trait WebhookJob: PgQueueJob + std::marker::Send { fn parameters(&self) -> &WebhookJobParameters; - fn metadata(&self) -> &WebhookJobMetadata; - fn job(&self) -> &Job; + fn take_metadata(&mut self) -> Value; + fn job(&self) -> &Job; fn attempt(&self) -> i32 { self.job().attempt @@ -36,21 +43,22 @@ trait WebhookJob: PgQueueJob + std::marker::Send { self.job().queue.to_owned() } + #[allow(dead_code)] fn target(&self) -> String { self.job().target.to_owned() } } -impl WebhookJob for PgTransactionJob<'_, WebhookJobParameters, WebhookJobMetadata> { +impl WebhookJob for PgTransactionJob<'_, WebhookJobParameters, Value> { fn parameters(&self) -> &WebhookJobParameters { &self.job.parameters } - fn metadata(&self) -> &WebhookJobMetadata { - &self.job.metadata + fn take_metadata(&mut self) -> Value { + self.job.metadata.take() } - fn job(&self) -> &Job { + fn job(&self) -> &Job { &self.job } } @@ -66,11 +74,17 @@ pub struct WebhookWorker<'p> { /// The interval for polling the queue. poll_interval: time::Duration, /// The client used for HTTP requests. - client: reqwest::Client, + http_client: reqwest::Client, /// Maximum number of concurrent jobs being processed. 
max_concurrent_jobs: usize, /// The retry policy used to calculate retry intervals when a job fails with a retryable error. retry_policy: RetryPolicy, + /// Kafka producer used to send results when in Hog mode + kafka_producer: FutureProducer, + /// The topic to send results to when in Hog mode + cdp_function_callbacks_topic: &'static str, + /// Whether we are running in Hog mode or not + hog_mode: bool, /// The liveness check handle, to call on a schedule to report healthy liveness: HealthHandle, } @@ -105,9 +119,12 @@ impl<'p> WebhookWorker<'p> { max_concurrent_jobs: usize, retry_policy: RetryPolicy, allow_internal_ips: bool, + kafka_producer: FutureProducer, + cdp_function_callbacks_topic: String, + hog_mode: bool, liveness: HealthHandle, ) -> Self { - let client = build_http_client(request_timeout, allow_internal_ips) + let http_client = build_http_client(request_timeout, allow_internal_ips) .expect("failed to construct reqwest client for webhook worker"); Self { @@ -115,17 +132,18 @@ impl<'p> WebhookWorker<'p> { queue, dequeue_batch_size, poll_interval, - client, + http_client, max_concurrent_jobs, retry_policy, + kafka_producer, + cdp_function_callbacks_topic: cdp_function_callbacks_topic.leak(), + hog_mode, liveness, } } /// Wait until at least one job becomes available in our queue in transactional mode. - async fn wait_for_jobs_tx<'a>( - &self, - ) -> PgTransactionBatch<'a, WebhookJobParameters, WebhookJobMetadata> { + async fn wait_for_jobs_tx<'a>(&self) -> PgTransactionBatch<'a, WebhookJobParameters, Value> { let mut interval = tokio::time::interval(self.poll_interval); loop { @@ -163,7 +181,7 @@ impl<'p> WebhookWorker<'p> { // `min(semaphore.available_permits(), dequeue_batch_size)` // And then dequeue only up to that many jobs. We'd then need to hand back the // difference in permits based on how many jobs were dequeued. 
- let mut batch = self.wait_for_jobs_tx().await; + let batch = self.wait_for_jobs_tx().await; dequeue_batch_size_histogram.record(batch.jobs.len() as f64); // Get enough permits for the jobs before spawning a task. @@ -173,39 +191,176 @@ impl<'p> WebhookWorker<'p> { .await .expect("semaphore has been closed"); - let client = self.client.clone(); + let http_client = self.http_client.clone(); let retry_policy = self.retry_policy.clone(); + let kafka_producer = self.kafka_producer.clone(); + let cdp_function_callbacks_topic = self.cdp_function_callbacks_topic; + let hog_mode = self.hog_mode; tokio::spawn(async move { - let mut futures = Vec::new(); + // Move `permits` into the closure so they will be dropped when the scope ends. + let _permits = permits; + + process_batch( + batch, + http_client, + retry_policy, + kafka_producer, + cdp_function_callbacks_topic, + hog_mode, + ) + .await + }); + } + } +} - // We have to `take` the Vec of jobs from the batch to avoid a borrow checker - // error below when we commit. - for job in std::mem::take(&mut batch.jobs) { - let client = client.clone(); - let retry_policy = retry_policy.clone(); +async fn log_kafka_error_and_sleep(step: &str, error: Option) { + match error { + Some(error) => error!("error sending hog message to kafka ({}): {}", step, error), + None => error!("error sending hog message to kafka ({})", step), + } - let future = - async move { process_webhook_job(client, job, &retry_policy).await }; + // Errors producing to Kafka *should* be exceedingly rare, but when they happen we don't want + // to enter a tight loop where we re-send the hook payload, fail to produce to Kafka, and + // repeat over and over again. We also don't want to commit the job as done and not produce + // something to Kafka, as the Hog task would then be lost. + // + // For this reason, we sleep before aborting the batch, in hopes that Kafka has recovered by the + // time we retry. 
+ // + // In the future we may want to consider dequeueing completed jobs from PG itself rather than + // using a Kafka intermediary. + sleep(Duration::from_secs(30)).await; +} - futures.push(future); - } +async fn process_batch<'a>( + mut batch: PgTransactionBatch<'a, WebhookJobParameters, Value>, + http_client: Client, + retry_policy: RetryPolicy, + kafka_producer: FutureProducer, + cdp_function_callbacks_topic: &'static str, + hog_mode: bool, +) { + let mut futures = Vec::with_capacity(batch.jobs.len()); + let mut metadata_vec = Vec::with_capacity(batch.jobs.len()); + + // We have to `take` the Vec of jobs from the batch to avoid a borrow checker + // error below when we commit. + for mut job in std::mem::take(&mut batch.jobs) { + let http_client = http_client.clone(); + let retry_policy = retry_policy.clone(); + + metadata_vec.push(job.take_metadata()); + + let read_body = hog_mode; + let future = + async move { process_webhook_job(http_client, job, &retry_policy, read_body).await }; + + futures.push(future); + } + + let results = join_all(futures).await; - let results = join_all(futures).await; - for result in results { - if let Err(e) = result { - error!("error processing webhook job: {}", e); + let mut kafka_ack_futures = Vec::new(); + for (result, mut metadata) in iter::zip(results, metadata_vec) { + match result { + Ok(result) => { + if hog_mode { + if let Some(payload) = create_hoghook_kafka_payload(result, &mut metadata).await + { + match kafka_producer.send_result(FutureRecord { + topic: cdp_function_callbacks_topic, + payload: Some(&payload), + partition: None, + key: None::<&str>, + timestamp: None, + headers: None, + }) { + Ok(future) => kafka_ack_futures.push(future), + Err((error, _)) => { + // Return early to avoid committing the batch. 
+ return log_kafka_error_and_sleep("send", Some(error)).await; + } + }; } } + } + Err(e) => { + error!("error processing webhook job: {}", e) + } + } + } - let _ = batch.commit().await.map_err(|e| { - error!("error committing transactional batch: {}", e); + for result in join_all(kafka_ack_futures).await { + match result { + Ok(Ok(_)) => {} + Ok(Err((error, _))) => { + // Return early to avoid committing the batch. + return log_kafka_error_and_sleep("ack", Some(error)).await; + } + Err(Canceled) => { + // Cancelled due to timeout while retrying + // Return early to avoid committing the batch. + return log_kafka_error_and_sleep("timeout", None).await; + } + } + } + + let _ = batch.commit().await.map_err(|e| { + error!("error committing transactional batch: {}", e); + }); +} + +async fn create_hoghook_kafka_payload( + result: WebhookResult, + metadata: &mut Value, +) -> Option { + if let Value::Object(ref mut object) = metadata { + // Add the response or error in the `asyncFunctionResponse` field. + match result { + WebhookResult::Success(response) => { + let async_function_response = json!({ + "timings": [{ + "kind": "async_function", + "duration_ms": response.duration.as_millis().try_into().unwrap_or(u32::MAX) + }], + "response": { + "status": response.status_code, + "body": response.body + } }); - drop(permits); - }); + object.insert("asyncFunctionResponse".to_owned(), async_function_response); + } + WebhookResult::Failed(error) => { + let async_function_response = json!({ + "error": error.to_string(), + }); + + object.insert("asyncFunctionResponse".to_owned(), async_function_response); + } + WebhookResult::WillRetry => { + // Nothing to do, and we don't want to produce anything + // to Kafka. 
+ return None; + } } } + + Some(serde_json::to_string(&metadata).expect("unable to serialize metadata")) +} + +struct WebhookSuccess { + status_code: u16, + duration: Duration, + body: Option, +} + +enum WebhookResult { + Success(WebhookSuccess), + WillRetry, + Failed(String), } /// Process a webhook job by transitioning it to its appropriate state after its request is sent. @@ -223,10 +378,11 @@ impl<'p> WebhookWorker<'p> { /// * `webhook_job`: The webhook job to process as dequeued from `hook_common::pgqueue::PgQueue`. /// * `retry_policy`: The retry policy used to set retry parameters if a job fails and has remaining attempts. async fn process_webhook_job( - client: reqwest::Client, + http_client: reqwest::Client, webhook_job: W, retry_policy: &RetryPolicy, -) -> Result<(), WorkerError> { + read_body: bool, +) -> Result { let parameters = webhook_job.parameters(); let labels = [("queue", webhook_job.queue())]; @@ -235,7 +391,7 @@ async fn process_webhook_job( let now = tokio::time::Instant::now(); let send_result = send_webhook( - client, + http_client, ¶meters.method, ¶meters.url, ¶meters.headers, @@ -243,10 +399,36 @@ async fn process_webhook_job( ) .await; - let elapsed = now.elapsed().as_secs_f64(); - match send_result { - Ok(_) => { + Ok(response) => { + // First, read the body if needed so that the read time is included in `duration`. + let status = response.status(); + let body = if read_body { + match first_n_bytes_of_response(response, 1024 * 1024).await { + Ok(body) => Some(body), // Once told me... 
+ Err(e) => { + webhook_job + .fail(WebhookJobError::new_parse(&e.to_string())) + .await + .map_err(|job_error| { + metrics::counter!("webhook_jobs_database_error", &labels) + .increment(1); + job_error + })?; + + metrics::counter!("webhook_jobs_failed", &labels).increment(1); + + return Ok(WebhookResult::Failed( + "failed to read response body".to_owned(), + )); + } + } + } else { + None + }; + + let duration = now.elapsed(); + let created_at = webhook_job.job().created_at; let retries = webhook_job.job().attempt - 1; let labels_with_retries = [ @@ -267,9 +449,13 @@ async fn process_webhook_job( .record((insert_to_complete_duration.num_milliseconds() as f64) / 1_000_f64); metrics::counter!("webhook_jobs_completed", &labels).increment(1); metrics::histogram!("webhook_jobs_processing_duration_seconds", &labels) - .record(elapsed); + .record(duration.as_secs_f64()); - Ok(()) + Ok(WebhookResult::Success(WebhookSuccess { + status_code: status.as_u16(), + duration, + body, + })) } Err(WebhookError::Parse(WebhookParseError::ParseHeadersError(e))) => { webhook_job @@ -282,7 +468,7 @@ async fn process_webhook_job( metrics::counter!("webhook_jobs_failed", &labels).increment(1); - Ok(()) + Ok(WebhookResult::Failed(e.to_string())) } Err(WebhookError::Parse(WebhookParseError::ParseHttpMethodError(e))) => { webhook_job @@ -295,7 +481,7 @@ async fn process_webhook_job( metrics::counter!("webhook_jobs_failed", &labels).increment(1); - Ok(()) + Ok(WebhookResult::Failed(e.to_string())) } Err(WebhookError::Parse(WebhookParseError::ParseUrlError(e))) => { webhook_job @@ -308,7 +494,7 @@ async fn process_webhook_job( metrics::counter!("webhook_jobs_failed", &labels).increment(1); - Ok(()) + Ok(WebhookResult::Failed(e.to_string())) } Err(WebhookError::Request(request_error)) => { let webhook_job_error = WebhookJobError::from(&request_error); @@ -329,7 +515,7 @@ async fn process_webhook_job( Ok(_) => { metrics::counter!("webhook_jobs_retried", &labels).increment(1); - Ok(()) + 
Ok(WebhookResult::WillRetry) } Err(RetryError::RetryInvalidError(RetryInvalidError { job: webhook_job, @@ -346,7 +532,7 @@ async fn process_webhook_job( metrics::counter!("webhook_jobs_failed", &labels).increment(1); - Ok(()) + Ok(WebhookResult::Failed(error.to_string())) } Err(RetryError::DatabaseError(job_error)) => { metrics::counter!("webhook_jobs_database_error", &labels).increment(1); @@ -354,7 +540,7 @@ async fn process_webhook_job( } } } - WebhookRequestError::NonRetryableRetryableRequestError { .. } => { + WebhookRequestError::NonRetryableRetryableRequestError { error, .. } => { webhook_job .fail(webhook_job_error) .await @@ -365,7 +551,7 @@ async fn process_webhook_job( metrics::counter!("webhook_jobs_failed", &labels).increment(1); - Ok(()) + Ok(WebhookResult::Failed(error.to_string())) } } } @@ -496,6 +682,8 @@ mod tests { // See: https://github.com/rust-lang/rust/issues/46379. use health::HealthRegistry; use hook_common::pgqueue::{DatabaseError, NewJob}; + use hook_common::test::create_mock_kafka; + use hook_common::webhook::WebhookJobMetadata; use sqlx::PgPool; /// Use process id as a worker id for tests. 
@@ -512,7 +700,7 @@ mod tests { queue: &PgQueue, max_attempts: i32, job_parameters: WebhookJobParameters, - job_metadata: WebhookJobMetadata, + job_metadata: Value, ) -> Result<(), DatabaseError> { let job_target = job_parameters.url.to_owned(); let new_job = NewJob::new(max_attempts, job_metadata, job_parameters, &job_target); @@ -579,10 +767,12 @@ mod tests { &queue, 1, webhook_job_parameters.clone(), - webhook_job_metadata, + serde_json::to_value(webhook_job_metadata).unwrap(), ) .await .expect("failed to enqueue job"); + let (_mock_cluster, mock_producer) = create_mock_kafka().await; + let hog_mode = false; let worker = WebhookWorker::new( &worker_id, &queue, @@ -592,6 +782,9 @@ mod tests { 10, RetryPolicy::default(), false, + mock_producer, + "cdp_function_callbacks".to_string(), + hog_mode, liveness, ); @@ -617,6 +810,124 @@ mod tests { assert!(registry.get_status().healthy) } + #[sqlx::test(migrations = "../migrations")] + async fn test_hoghook_sends_kafka_payload(db: PgPool) { + use httpmock::prelude::*; + use rdkafka::consumer::{Consumer, StreamConsumer}; + use rdkafka::{ClientConfig, Message}; + + let worker_id = worker_id(); + let queue_name = "test_hoghook_sends_kafka_payload".to_string(); + let queue = PgQueue::new_from_pool(&queue_name, db).await; + let topic = "cdp_function_callbacks"; + + let server = MockServer::start(); + + server.mock(|when, then| { + when.method(POST).path("/"); + then.status(200) + .header("content-type", "application/json; charset=UTF-8") + .body(r#"{"message": "hello, world"}"#); + }); + + let mock_url = server.url("/"); + + let webhook_job_parameters = WebhookJobParameters { + body: "".to_owned(), + headers: collections::HashMap::new(), + method: HttpMethod::POST, + url: mock_url, + }; + + let webhook_job_metadata = json!({"someOtherField": true}); + + enqueue_job( + &queue, + 1, + webhook_job_parameters.clone(), + serde_json::to_value(webhook_job_metadata).unwrap(), + ) + .await + .expect("failed to enqueue job"); + + let 
registry = HealthRegistry::new("liveness"); + let liveness = registry + .register("worker".to_string(), ::time::Duration::seconds(30)) + .await; + + let (mock_cluster, mock_producer) = create_mock_kafka().await; + let hog_mode = true; + let worker = WebhookWorker::new( + &worker_id, + &queue, + 1, + time::Duration::from_millis(100), + time::Duration::from_millis(5000), + 10, + RetryPolicy::default(), + false, + mock_producer, + topic.to_string(), + hog_mode, + liveness, + ); + + let batch = worker.wait_for_jobs_tx().await; + + process_batch( + batch, + worker.http_client, + worker.retry_policy, + worker.kafka_producer, + worker.cdp_function_callbacks_topic, + hog_mode, + ) + .await; + + let consumer: StreamConsumer = ClientConfig::new() + .set("bootstrap.servers", mock_cluster.bootstrap_servers()) + .set("group.id", "mock") + .set("auto.offset.reset", "earliest") + .create() + .expect("failed to create mock consumer"); + consumer.subscribe(&[topic]).unwrap(); + + let kafka_msg = consumer.recv().await.unwrap(); + let kafka_payload_str = String::from_utf8(kafka_msg.payload().unwrap().to_vec()).unwrap(); + + let received = serde_json::from_str::(&kafka_payload_str).unwrap(); + + // Verify data is passed through, and that response and timings are correct. + assert!(received.get("someOtherField").unwrap().as_bool().unwrap()); + + let async_function_response = received.get("asyncFunctionResponse").unwrap(); + let received_response = async_function_response.get("response").unwrap(); + assert_eq!( + json!({ + "body": "{\"message\": \"hello, world\"}", + "status": 200 + }), + *received_response + ); + + let first_timing = async_function_response + .get("timings") + .unwrap() + .as_array() + .unwrap() + .get(0) + .unwrap(); + first_timing + .get("duration_ms") + .unwrap() + .as_number() + .unwrap(); + assert_eq!( + "async_function", + first_timing.get("kind").unwrap().as_str().unwrap() + ); + } + #[tokio::test] async fn test_send_webhook() { let method = HttpMethod::POST;