Skip to content

Commit

Permalink
fix(hoghooks): send back http response status on kafka for bad respon… (
Browse files Browse the repository at this point in the history
  • Loading branch information
bretthoerner authored and silentninja committed Aug 8, 2024
1 parent af748e8 commit 9daacb1
Showing 1 changed file with 160 additions and 69 deletions.
229 changes: 160 additions & 69 deletions rust/hook-worker/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ async fn create_hoghook_kafka_payload(
if let Value::Object(ref mut object) = metadata {
// Add the response or error in the `asyncFunctionResponse` field.
match result {
WebhookResult::Success(response) => {
WebhookResult::Success(response) | WebhookResult::BadResponse(response) => {
let async_function_response = json!({
"timings": [{
"kind": "async_function",
Expand All @@ -392,9 +392,9 @@ async fn create_hoghook_kafka_payload(

object.insert("asyncFunctionResponse".to_owned(), async_function_response);
}
WebhookResult::Failed(error) => {
WebhookResult::Error(error) => {
let async_function_response = json!({
"error": error.error.to_string(),
"error": error,
});

object.insert("asyncFunctionResponse".to_owned(), async_function_response);
Expand All @@ -410,26 +410,17 @@ async fn create_hoghook_kafka_payload(
Some(serde_json::to_string(&metadata).expect("unable to serialize metadata"))
}

struct WebhookSuccess {
status_code: StatusCode,
struct WebhookResponse {
duration: Duration,
body: Option<String>,
}

struct WebhookFailed {
error: String,
#[allow(dead_code)]
status_code: Option<StatusCode>,
#[allow(dead_code)]
duration: Duration,
#[allow(dead_code)]
status_code: StatusCode,
body: Option<String>,
}

enum WebhookResult {
Success(WebhookSuccess),
Success(WebhookResponse),
BadResponse(WebhookResponse),
WillRetry,
Failed(WebhookFailed),
Error(String),
}

/// Process a webhook job by transitioning it to its appropriate state after its request is sent.
Expand Down Expand Up @@ -512,12 +503,9 @@ async fn process_webhook_job<W: WebhookJob>(

metrics::counter!("webhook_jobs_failed", &labels).increment(1);

Ok(WebhookResult::Failed(WebhookFailed {
error: "timeout while reading response body".to_owned(),
status_code: Some(status),
duration: now.elapsed(),
body: None,
}))
Ok(WebhookResult::Error(
"timeout while reading response body".to_owned(),
))
}
Err(RetryError::DatabaseError(job_error)) => {
metrics::counter!("webhook_jobs_database_error", &labels)
Expand Down Expand Up @@ -556,7 +544,7 @@ async fn process_webhook_job<W: WebhookJob>(
metrics::histogram!("webhook_jobs_processing_duration_seconds", &labels)
.record(duration.as_secs_f64());

Ok(WebhookResult::Success(WebhookSuccess {
Ok(WebhookResult::Success(WebhookResponse {
status_code: status,
duration,
body,
Expand All @@ -573,12 +561,7 @@ async fn process_webhook_job<W: WebhookJob>(

metrics::counter!("webhook_jobs_failed", &labels).increment(1);

Ok(WebhookResult::Failed(WebhookFailed {
error: e.to_string(),
status_code: None,
duration: now.elapsed(),
body: None,
}))
Ok(WebhookResult::Error(e.to_string()))
}
Err(WebhookError::Parse(WebhookParseError::ParseHttpMethodError(e))) => {
webhook_job
Expand All @@ -591,12 +574,7 @@ async fn process_webhook_job<W: WebhookJob>(

metrics::counter!("webhook_jobs_failed", &labels).increment(1);

Ok(WebhookResult::Failed(WebhookFailed {
error: e.to_string(),
status_code: None,
duration: now.elapsed(),
body: None,
}))
Ok(WebhookResult::Error(e.to_string()))
}
Err(WebhookError::Parse(WebhookParseError::ParseUrlError(e))) => {
webhook_job
Expand All @@ -609,12 +587,7 @@ async fn process_webhook_job<W: WebhookJob>(

metrics::counter!("webhook_jobs_failed", &labels).increment(1);

Ok(WebhookResult::Failed(WebhookFailed {
error: e.to_string(),
status_code: None,
duration: now.elapsed(),
body: None,
}))
Ok(WebhookResult::Error(e.to_string()))
}
Err(WebhookError::Request(request_error)) => {
let webhook_job_error = WebhookJobError::from(&request_error);
Expand Down Expand Up @@ -652,12 +625,14 @@ async fn process_webhook_job<W: WebhookJob>(

metrics::counter!("webhook_jobs_failed", &labels).increment(1);

Ok(WebhookResult::Failed(WebhookFailed {
error: error.to_string(),
status_code: None,
duration: now.elapsed(),
body: None,
}))
match error.status() {
Some(status) => Ok(WebhookResult::BadResponse(WebhookResponse {
duration: now.elapsed(),
status_code: status,
body: None,
})),
None => Ok(WebhookResult::Error(error.to_string())),
}
}
Err(RetryError::DatabaseError(job_error)) => {
metrics::counter!("webhook_jobs_database_error", &labels).increment(1);
Expand All @@ -676,12 +651,14 @@ async fn process_webhook_job<W: WebhookJob>(

metrics::counter!("webhook_jobs_failed", &labels).increment(1);

Ok(WebhookResult::Failed(WebhookFailed {
error: error.to_string(),
status_code: None,
duration: now.elapsed(),
body: None,
}))
match error.status() {
Some(status) => Ok(WebhookResult::BadResponse(WebhookResponse {
duration: now.elapsed(),
status_code: status,
body: None,
})),
None => Ok(WebhookResult::Error(error.to_string())),
}
}
}
}
Expand Down Expand Up @@ -948,7 +925,7 @@ mod tests {
}

#[sqlx::test(migrations = "../migrations")]
async fn test_hoghook_sends_kafka_payload(db: PgPool) {
async fn test_hoghook_sends_kafka_payload_for_success(db: PgPool) {
use httpmock::prelude::*;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::{ClientConfig, Message};
Expand All @@ -960,33 +937,122 @@ mod tests {

let server = MockServer::start();

let registry = HealthRegistry::new("liveness");
let liveness = registry
.register("worker".to_string(), ::time::Duration::seconds(30))
.await;

let (mock_cluster, mock_producer) = create_mock_kafka().await;
let hog_mode = true;
let worker = WebhookWorker::new(
&worker_id,
&queue,
1,
time::Duration::from_millis(100),
time::Duration::from_millis(5000),
10,
RetryPolicy::default(),
false,
mock_producer,
topic.to_string(),
hog_mode,
liveness,
);

// Enqueue and run a successful job.

server.mock(|when, then| {
when.method(POST).path("/");
when.method(POST).path("/200");
then.status(200)
.header("content-type", "application/json; charset=UTF-8")
.body(r#"{"message": "hello, world"}"#);
});

let mock_url = server.url("/");

let webhook_job_parameters = WebhookJobParameters {
let success_webhook_job_parameters = WebhookJobParameters {
body: "".to_owned(),
headers: collections::HashMap::new(),
method: HttpMethod::POST,
url: mock_url,
url: server.url("/200"),
};

let webhook_job_metadata = json!({"someOtherField": true});

enqueue_job(
&queue,
1,
webhook_job_parameters.clone(),
serde_json::to_value(webhook_job_metadata).unwrap(),
success_webhook_job_parameters.clone(),
serde_json::to_value(json!({"someOtherField": true})).unwrap(),
)
.await
.expect("failed to enqueue job");

let batch = worker.wait_for_jobs_tx().await;

process_batch(
batch,
worker.http_client.clone(),
worker.retry_policy.clone(),
worker.kafka_producer.clone(),
worker.cdp_function_callbacks_topic,
hog_mode,
)
.await;

let consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", mock_cluster.bootstrap_servers())
.set("group.id", "mock")
.set("auto.offset.reset", "earliest")
.create()
.expect("failed to create mock consumer");
consumer.subscribe(&[topic]).unwrap();

let kafka_msg = consumer.recv().await.unwrap();
let kafka_payload_str = String::from_utf8(kafka_msg.payload().unwrap().to_vec()).unwrap();

let received = serde_json::from_str::<Value>(&kafka_payload_str).unwrap();

// Verify data is passed through, and that response and timings are correct.
assert!(received.get("someOtherField").unwrap().as_bool().unwrap());

let async_function_response = received.get("asyncFunctionResponse").unwrap();
let received_response = async_function_response.get("response").unwrap();
assert_eq!(
json!({
"body": "{\"message\": \"hello, world\"}",
"status": 200
}),
*received_response
);

let first_timing = async_function_response
.get("timings")
.unwrap()
.as_array()
.unwrap()
.get(0)
.unwrap();
first_timing
.get("duration_ms")
.unwrap()
.as_number()
.unwrap();
assert_eq!(
"async_function",
first_timing.get("kind").unwrap().as_str().unwrap()
);
}

#[sqlx::test(migrations = "../migrations")]
async fn test_hoghook_sends_kafka_payload_for_bad_response(db: PgPool) {
use httpmock::prelude::*;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::{ClientConfig, Message};

let worker_id = worker_id();
let queue_name = "test_hoghook_sends_kafka_payload".to_string();
let queue = PgQueue::new_from_pool(&queue_name, db).await;
let topic = "cdp_function_callbacks";

let server = MockServer::start();

let registry = HealthRegistry::new("liveness");
let liveness = registry
.register("worker".to_string(), ::time::Duration::seconds(30))
Expand All @@ -1009,13 +1075,38 @@ mod tests {
liveness,
);

// Enqueue and run a job that returns a bad HTTP response.

server.mock(|when, then| {
when.method(POST).path("/500");
then.status(500)
.header("content-type", "application/json; charset=UTF-8")
.body(r#"{"message": "bad response"}"#);
});

let bad_webhook_job_parameters = WebhookJobParameters {
body: "".to_owned(),
headers: collections::HashMap::new(),
method: HttpMethod::POST,
url: server.url("/500"),
};

enqueue_job(
&queue,
1,
bad_webhook_job_parameters.clone(),
serde_json::to_value(json!({"someOtherField": true})).unwrap(),
)
.await
.expect("failed to enqueue job");

let batch = worker.wait_for_jobs_tx().await;

process_batch(
batch,
worker.http_client,
worker.retry_policy,
worker.kafka_producer,
worker.http_client.clone(),
worker.retry_policy.clone(),
worker.kafka_producer.clone(),
worker.cdp_function_callbacks_topic,
hog_mode,
)
Expand All @@ -1041,8 +1132,8 @@ mod tests {
let received_response = async_function_response.get("response").unwrap();
assert_eq!(
json!({
"body": "{\"message\": \"hello, world\"}",
"status": 200
"body": None::<String>, // TODO: We should still return the response.
"status": 500
}),
*received_response
);
Expand Down

0 comments on commit 9daacb1

Please sign in to comment.