From 6e19a065059a10dc10372b04b427943a71b9aabc Mon Sep 17 00:00:00 2001
From: Brett Hoerner
Date: Thu, 25 Jul 2024 12:50:54 -0600
Subject: [PATCH] fix(hoghooks): drop payloads that are too large for kafka

---
 rust/hook-worker/src/worker.rs | 99 +++++++++++++++++++++++++++++++++-
 1 file changed, 98 insertions(+), 1 deletion(-)

diff --git a/rust/hook-worker/src/worker.rs b/rust/hook-worker/src/worker.rs
index 422030d2a48b5..678e866ad087c 100644
--- a/rust/hook-worker/src/worker.rs
+++ b/rust/hook-worker/src/worker.rs
@@ -7,7 +7,7 @@ use futures::channel::oneshot::Canceled;
 use futures::future::join_all;
 use health::HealthHandle;
 use http::StatusCode;
-use rdkafka::error::KafkaError;
+use rdkafka::error::{KafkaError, RDKafkaErrorCode};
 use rdkafka::producer::{FutureProducer, FutureRecord};
 use reqwest::{header, Client};
 use serde_json::{json, Value};
@@ -278,6 +278,30 @@ async fn process_batch<'a>(
                 headers: None,
             }) {
                 Ok(future) => kafka_ack_futures.push(future),
+                Err((
+                    KafkaError::MessageProduction(
+                        RDKafkaErrorCode::MessageSizeTooLarge,
+                    ),
+                    _,
+                )) => {
+                    // HACK: While under development, we are dropping messages that
+                    // are too large. This is temporary, as we expect the webhook
+                    // handler for Hog to change soon. In the meantime, nobody needs
+                    // to be alerted about this.
+                    let team_id = metadata
+                        .get("teamId")
+                        .and_then(|t| t.as_number())
+                        .map(|t| t.to_string())
+                        .unwrap_or_else(|| "?".to_string());
+
+                    let hog_function_id = metadata
+                        .get("hogFunctionId")
+                        .and_then(|h| h.as_str())
+                        .map(|h| h.to_string())
+                        .unwrap_or_else(|| "?".to_string());
+
+                    error!("dropping message due to size limit, team_id: {}, hog_function_id: {}", team_id, hog_function_id);
+                }
                 Err((error, _)) => {
                     // Return early to avoid committing the batch.
                     return log_kafka_error_and_sleep("send", Some(error)).await;
@@ -928,6 +952,79 @@ mod tests {
         );
     }
 
+    #[sqlx::test(migrations = "../migrations")]
+    async fn test_hoghook_drops_large_payloads(db: PgPool) {
+        use httpmock::prelude::*;
+
+        let worker_id = worker_id();
+        let queue_name = "test_hoghook_drops_large_payloads".to_string();
+        let queue = PgQueue::new_from_pool(&queue_name, db).await;
+        let topic = "cdp_function_callbacks";
+
+        let server = MockServer::start();
+
+        server.mock(|when, then| {
+            when.method(POST).path("/");
+            then.status(200)
+                .header("content-type", "application/json; charset=UTF-8")
+                .body(r#"{"message": "hello, world"}"#);
+        });
+
+        let mock_url = server.url("/");
+
+        let webhook_job_parameters = WebhookJobParameters {
+            body: "".to_owned(),
+            headers: collections::HashMap::new(),
+            method: HttpMethod::POST,
+            url: mock_url,
+        };
+
+        let webhook_job_metadata = json!({"hugeField": "a".repeat(2 * 1024 * 1024)});
+
+        enqueue_job(
+            &queue,
+            1,
+            webhook_job_parameters.clone(),
+            serde_json::to_value(webhook_job_metadata).unwrap(),
+        )
+        .await
+        .expect("failed to enqueue job");
+
+        let registry = HealthRegistry::new("liveness");
+        let liveness = registry
+            .register("worker".to_string(), ::time::Duration::seconds(30))
+            .await;
+
+        let (_, mock_producer) = create_mock_kafka().await;
+        let hog_mode = true;
+        let worker = WebhookWorker::new(
+            &worker_id,
+            &queue,
+            1,
+            time::Duration::from_millis(100),
+            time::Duration::from_millis(5000),
+            10,
+            RetryPolicy::default(),
+            false,
+            mock_producer,
+            topic.to_string(),
+            hog_mode,
+            liveness,
+        );
+
+        let batch = worker.wait_for_jobs_tx().await;
+
+        process_batch(
+            batch,
+            worker.http_client,
+            worker.retry_policy,
+            worker.kafka_producer,
+            worker.cdp_function_callbacks_topic,
+            hog_mode,
+        )
+        .await;
+    }
+
     #[tokio::test]
     async fn test_send_webhook() {
         let method = HttpMethod::POST;
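
Note on the error match above: the drop decision hinges on one rdkafka error
variant returned when a produced record exceeds the configured message size
limit. A minimal standalone sketch of that check, assuming only the rdkafka
crate; `is_message_too_large` is a hypothetical helper named here for
illustration and is not part of the patch:

    use rdkafka::error::{KafkaError, RDKafkaErrorCode};

    /// Hypothetical helper: true when a produce error means the record
    /// exceeded the message size limit, i.e. retrying the same record
    /// cannot succeed.
    fn is_message_too_large(error: &KafkaError) -> bool {
        matches!(
            error,
            KafkaError::MessageProduction(RDKafkaErrorCode::MessageSizeTooLarge)
        )
    }

Because this error is deterministic for a given record, the patch logs and
drops the message rather than taking the other `Err` arm's path of returning
early, which would leave the batch uncommitted and cause the same oversized
payload to be picked up and rejected again on every retry.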