From de8d3b689e65f22ebd40d320f604d409507df591 Mon Sep 17 00:00:00 2001 From: Edmund Kump Date: Wed, 29 May 2024 10:25:35 -0400 Subject: [PATCH 1/2] Add retry logic to trace_utils::SendData (#433) Add retry logic to trace_utils::SendData RetryStrategy is added to trace_utils and is enabled by default with a strategy of 100ms delay, exponential backoff, 5 retries, and no jitter. SendData.set_retry_strategy() has been added to allow overriding these settings. The Bytes crate has been added to trace_utils and is used to do a shallow copy of the payload for each retry attempt instead of a full clone of the data. Some minor code reorg of SendData has been done to support retry logic, but the implementation has been left mostly untouched. --- Cargo.lock | 3 + sidecar/src/service/tracing/trace_flusher.rs | 22 +- trace-utils/Cargo.toml | 7 +- trace-utils/src/send_data.rs | 711 +++++++++++++++++-- 4 files changed, 663 insertions(+), 80 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1821a903f..81f100688 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1414,15 +1414,18 @@ name = "datadog-trace-utils" version = "9.0.0" dependencies = [ "anyhow", + "bytes", "datadog-trace-normalization", "datadog-trace-protobuf", "ddcommon", "flate2", "futures", + "httpmock", "hyper", "hyper-rustls", "log", "prost 0.11.9", + "rand", "rmp-serde", "serde", "serde_json", diff --git a/sidecar/src/service/tracing/trace_flusher.rs b/sidecar/src/service/tracing/trace_flusher.rs index fa9058386..df365592b 100644 --- a/sidecar/src/service/tracing/trace_flusher.rs +++ b/sidecar/src/service/tracing/trace_flusher.rs @@ -306,11 +306,11 @@ mod tests { use std::sync::Arc; // This function will poll the mock server for "hits" until the expected number of hits is - // observed. In its current form it may not correctly report if more than the asserted number of - // hits occurred. More attempts at lower sleep intervals is preferred to reduce flakiness and - // test runtime. + // observed. Then it will delete the mock. In its current form it may not correctly report if + // more than the asserted number of hits occurred. More attempts at lower sleep intervals is + // preferred to reduce flakiness and test runtime. async fn poll_for_mock_hit( - mock: &Mock<'_>, + mock: &mut Mock<'_>, poll_attempts: i32, sleep_interval_ms: u64, expected_hits: usize, @@ -360,7 +360,7 @@ mod tests { let server = MockServer::start(); - let mock = server + let mut mock = server .mock_async(|_when, then| { then.status(202) .header("content-type", "application/json") @@ -386,12 +386,12 @@ mod tests { trace_flusher.enqueue(send_data_1); trace_flusher.enqueue(send_data_2); - assert!(poll_for_mock_hit(&mock, 10, 150, 0).await); + assert!(poll_for_mock_hit(&mut mock, 10, 150, 0).await); // enqueue a trace that exceeds the min force flush size trace_flusher.enqueue(send_data_3); - assert!(poll_for_mock_hit(&mock, 5, 250, 1).await); + assert!(poll_for_mock_hit(&mut mock, 25, 100, 1).await); } #[cfg_attr(miri, ignore)] @@ -403,7 +403,7 @@ mod tests { ..TraceFlusher::default() }); let server = MockServer::start(); - let mock = server + let mut mock = server .mock_async(|_when, then| { then.status(202) .header("content-type", "application/json") @@ -427,7 +427,7 @@ mod tests { trace_flusher.interval_ms.load(Ordering::Relaxed) + 1, )) .await; - assert!(poll_for_mock_hit(&mock, 25, 100, 1).await); + assert!(poll_for_mock_hit(&mut mock, 25, 100, 1).await); } #[cfg_attr(miri, ignore)] @@ -439,7 +439,7 @@ mod tests { ..TraceFlusher::default() }); let server = MockServer::start(); - let mock = server + let mut mock = server .mock_async(|_when, then| { then.status(202) .header("content-type", "application/json") @@ -459,6 +459,6 @@ mod tests { trace_flusher.enqueue(send_data_1); - assert!(poll_for_mock_hit(&mock, 5, 250, 0).await); + assert!(poll_for_mock_hit(&mut mock, 5, 250, 0).await); } } diff --git a/trace-utils/Cargo.toml b/trace-utils/Cargo.toml index 96c2dfb85..f3c8f372b 100644 --- a/trace-utils/Cargo.toml +++ b/trace-utils/Cargo.toml @@ -20,7 +20,10 @@ futures = { version = "0.3", default-features = false } ddcommon = { path = "../ddcommon" } datadog-trace-protobuf = { path = "../trace-protobuf" } datadog-trace-normalization = { path = "../trace-normalization" } +tokio = { version = "1", features = ["macros", "rt-multi-thread"] } +rand = "0.8.5" +bytes = "1.6.0" [dev-dependencies] -tokio = { version = "1", features = ["macros", "rt-multi-thread"] } -serde_json = "1.0" \ No newline at end of file +httpmock = "0.7.0" +serde_json = "1.0" diff --git a/trace-utils/src/send_data.rs b/trace-utils/src/send_data.rs index fb6b8dc33..eff43dbe8 100644 --- a/trace-utils/src/send_data.rs +++ b/trace-utils/src/send_data.rs @@ -2,10 +2,13 @@ // SPDX-License-Identifier: Apache-2.0 use anyhow::{anyhow, Context}; +use bytes::Bytes; use futures::stream::FuturesUnordered; use futures::StreamExt; use hyper::{Body, Client, Method, Response, StatusCode}; use std::collections::HashMap; +use std::time::Duration; +use tokio::time::sleep; use crate::tracer_header_tags::TracerHeaderTags; use datadog_trace_protobuf::pb; @@ -107,12 +110,94 @@ where Ok(buf) } +/// Enum representing the type of backoff to use for the delay between retries. +/// +/// ``` +#[derive(Debug, Clone)] +pub enum RetryBackoffType { + /// Increases the delay by a fixed increment each attempt. + Linear, + /// The delay is constant for each attempt. + Constant, + /// The delay is doubled for each attempt. + Exponential, +} + +// TODO: APMSP-1076 - RetryStrategy should be moved to a separate file when send_data is refactored. +/// Struct representing the retry strategy for sending data. +/// +/// This struct contains the parameters that define how retries should be handled when sending data. +/// It includes the maximum number of retries, the delay between retries, the type of backoff to +/// use, and an optional jitter to add randomness to the delay. +/// +/// # Examples +/// +/// ```rust +/// use datadog_trace_utils::send_data::{RetryBackoffType, RetryStrategy}; +/// use std::time::Duration; +/// +/// let retry_strategy = RetryStrategy { +/// max_retries: 5, +/// delay_ms: Duration::from_millis(100), +/// backoff_type: RetryBackoffType::Exponential, +/// jitter: Some(Duration::from_millis(50)), +/// }; +/// ``` +#[derive(Debug, Clone)] +pub struct RetryStrategy { + /// The maximum number of retries to attempt. + pub max_retries: u32, + /// The minimum delay between retries. + pub delay_ms: Duration, + /// The type of backoff to use for the delay between retries. + pub backoff_type: RetryBackoffType, + /// An optional jitter to add randomness to the delay. + pub jitter: Option, +} + +impl Default for RetryStrategy { + fn default() -> Self { + RetryStrategy { + max_retries: 5, + delay_ms: Duration::from_millis(100), + backoff_type: RetryBackoffType::Exponential, + jitter: None, + } + } +} + +impl RetryStrategy { + /// Delays the next request attempt based on the retry strategy. + /// + /// If a jitter duration is specified in the retry strategy, a random duration up to the jitter + /// value is added to the delay. + /// + /// # Arguments + /// + /// * `attempt`: The number of the current attempt (1-indexed). + pub(crate) async fn delay(&self, attempt: u32) { + let delay = match self.backoff_type { + RetryBackoffType::Exponential => self.delay_ms * 2u32.pow(attempt - 1), + RetryBackoffType::Constant => self.delay_ms, + RetryBackoffType::Linear => self.delay_ms + (self.delay_ms * (attempt - 1)), + }; + + if let Some(jitter) = self.jitter { + let jitter = rand::random::() % jitter.as_millis() as u64; + sleep(delay + Duration::from_millis(jitter)).await; + } else { + sleep(delay).await; + } + } +} + #[derive(Debug, Clone)] pub struct SendData { pub tracer_payloads: Vec, pub size: usize, // have a rough size estimate to force flushing if it's large pub target: Endpoint, headers: HashMap<&'static str, String>, + retry_strategy: RetryStrategy, } impl SendData { @@ -133,14 +218,76 @@ impl SendData { size, target: target.clone(), headers, + retry_strategy: RetryStrategy::default(), } } - pub async fn send<'a>(self) -> SendDataResult { - let target = &self.target; + /// Overrides the default RetryStrategy with user-defined values. + /// + /// # Arguments + /// + /// * `retry_strategy`: The new retry strategy to be used. + pub fn set_retry_strategy(&mut self, retry_strategy: RetryStrategy) { + self.retry_strategy = retry_strategy; + } + pub async fn send(self) -> SendDataResult { + if self.use_protobuf() { + self.send_with_protobuf().await + } else { + self.send_with_msgpack().await + } + } + + // This function wraps send_data with a retry strategy and the building of the request. + // Hyper doesn't allow you to send a ref to a request, and you can't clone it. So we have to + // build a new one for every send attempt. + async fn send_payload( + &self, + content_type: &'static str, + payload: Vec, + ) -> Result, SendRequestError> { + let mut request_attempt = 0; + let payload = Bytes::from(payload); + loop { + request_attempt += 1; + let mut req = self.create_request_builder(); + req = req.header("Content-type", content_type); + + let result = self.send_request(req, payload.clone()).await; + + // If the request was successful, or if we have exhausted retries then return the + // result. Otherwise, delay and try again. + match &result { + Ok(response) => { + if response.status().is_client_error() || response.status().is_server_error() { + if request_attempt >= self.retry_strategy.max_retries { + return result; + } else { + self.retry_strategy.delay(request_attempt).await; + } + } else { + return result; + } + } + Err(_) => { + if request_attempt >= self.retry_strategy.max_retries { + return result; + } else { + self.retry_strategy.delay(request_attempt).await; + } + } + } + } + } + + fn use_protobuf(&self) -> bool { + self.target.api_key.is_some() + } + + fn create_request_builder(&self) -> HttpRequestBuilder { let mut req = hyper::Request::builder() - .uri(target.url.clone()) + .uri(self.target.url.clone()) .header( hyper::header::USER_AGENT, concat!("Tracer/", env!("CARGO_PKG_VERSION")), @@ -151,79 +298,509 @@ impl SendData { req = req.header(*key, value); } - async fn send_request( - req: HttpRequestBuilder, - payload: Vec, - ) -> Result, SendRequestError> { - let req = req - .body(Body::from(payload)) - .map_err(|e| SendRequestError::Any(anyhow!(e)))?; - - Client::builder() - .build(connector::Connector::default()) - .request(req) - .await - .map_err(SendRequestError::Hyper) - } + req + } + + async fn send_request( + &self, + req: HttpRequestBuilder, + payload: Bytes, + ) -> Result, SendRequestError> { + let req = req + .body(Body::from(payload)) + .map_err(|e| SendRequestError::Any(anyhow!(e)))?; + + Client::builder() + .build(connector::Connector::default()) + .request(req) + .await + .map_err(SendRequestError::Hyper) + } + + async fn send_with_protobuf(&self) -> SendDataResult { + let mut result = SendDataResult::new(); + + let agent_payload = construct_agent_payload(self.tracer_payloads.clone()); + let serialized_trace_payload = match serialize_proto_payload(&agent_payload) + .context("Failed to serialize trace agent payload, dropping traces") + { + Ok(p) => p, + Err(e) => return result.error(e), + }; + + result + .update( + self.send_payload("application/x-protobuf", serialized_trace_payload) + .await, + StatusCode::ACCEPTED, + ) + .await; + + result + } + async fn send_with_msgpack(&self) -> SendDataResult { let mut result = SendDataResult::new(); - if target.api_key.is_some() { - req = req.header("Content-type", "application/x-protobuf"); + let mut req = self.create_request_builder(); + req = req.header("Content-type", "application/msgpack"); - let agent_payload = construct_agent_payload(self.tracer_payloads); - let serialized_trace_payload = match serialize_proto_payload(&agent_payload) - .context("Failed to serialize trace agent payload, dropping traces") - { + let (template, _) = req.body(()).unwrap().into_parts(); + + let mut futures = FuturesUnordered::new(); + for tracer_payload in self.tracer_payloads.iter() { + let mut builder = HttpRequestBuilder::new() + .method(template.method.clone()) + .uri(template.uri.clone()) + .version(template.version) + .header( + "X-Datadog-Trace-Count", + tracer_payload.chunks.len().to_string(), + ); + builder + .headers_mut() + .unwrap() + .extend(template.headers.clone()); + + let payload = match rmp_serde::to_vec_named(&tracer_payload) { Ok(p) => p, - Err(e) => return result.error(e), + Err(e) => return result.error(anyhow!(e)), }; - - result - .update( - send_request(req, serialized_trace_payload).await, - StatusCode::ACCEPTED, - ) - .await; - result - } else { - req = req.header("Content-type", "application/msgpack"); - - let (template, _) = req.body(()).unwrap().into_parts(); - - let mut futures = FuturesUnordered::new(); - for tracer_payload in self.tracer_payloads.into_iter() { - let mut builder = HttpRequestBuilder::new() - .method(template.method.clone()) - .uri(template.uri.clone()) - .version(template.version) - .header( - "X-Datadog-Trace-Count", - tracer_payload.chunks.len().to_string(), - ); - builder - .headers_mut() - .unwrap() - .extend(template.headers.clone()); - - let payload = match rmp_serde::to_vec_named(&tracer_payload) { - Ok(p) => p, - Err(e) => return result.error(anyhow!(e)), - }; - - futures.push(send_request(builder, payload)); - } - loop { - match futures.next().await { - Some(response) => { - result.update(response, StatusCode::OK).await; - if result.last_result.is_err() { - return result; - } + futures.push(self.send_payload("application/msgpack", payload)); + } + loop { + match futures.next().await { + Some(response) => { + result.update(response, StatusCode::OK).await; + if result.last_result.is_err() { + return result; } - None => return result, } + None => return result, + } + } + } +} + +#[cfg(test)] +// For RetryStrategy tests the observed delay should be approximate. +// There may be a small amount of overhead, so we check that the elapsed time is within +// a tolerance of the expected delay. +// TODO: APMSP-1079 - We should have more comprehensive tests for SendData logic beyond retry logic. +mod tests { + use super::*; + use httpmock::{Mock, MockServer}; + use std::time::Duration; + use tokio::time::Instant; + + const RETRY_STRATEGY_TIME_TOLERANCE_MS: u64 = 25; + + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn test_retry_strategy_constant() { + let retry_strategy = RetryStrategy { + max_retries: 5, + delay_ms: Duration::from_millis(100), + backoff_type: RetryBackoffType::Constant, + jitter: None, + }; + + let start = Instant::now(); + retry_strategy.delay(1).await; + let elapsed = start.elapsed(); + + assert!( + elapsed >= retry_strategy.delay_ms + && elapsed + <= retry_strategy.delay_ms + + Duration::from_millis(RETRY_STRATEGY_TIME_TOLERANCE_MS), + "Elapsed time was not within expected range" + ); + + let start = Instant::now(); + retry_strategy.delay(2).await; + let elapsed = start.elapsed(); + + assert!( + elapsed >= retry_strategy.delay_ms + && elapsed + <= retry_strategy.delay_ms + + Duration::from_millis(RETRY_STRATEGY_TIME_TOLERANCE_MS), + "Elapsed time was not within expected range" + ); + } + + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn test_retry_strategy_linear() { + let retry_strategy = RetryStrategy { + max_retries: 5, + delay_ms: Duration::from_millis(100), + backoff_type: RetryBackoffType::Linear, + jitter: None, + }; + + let start = Instant::now(); + retry_strategy.delay(1).await; + let elapsed = start.elapsed(); + + assert!( + elapsed >= retry_strategy.delay_ms + && elapsed + <= retry_strategy.delay_ms + + Duration::from_millis(RETRY_STRATEGY_TIME_TOLERANCE_MS), + "Elapsed time was not within expected range" + ); + + let start = Instant::now(); + retry_strategy.delay(3).await; + let elapsed = start.elapsed(); + + // For the Linear strategy, the delay for the 3rd attempt should be delay_ms + (delay_ms * + // 2). + assert!( + elapsed >= retry_strategy.delay_ms + (retry_strategy.delay_ms * 2) + && elapsed + <= retry_strategy.delay_ms + + (retry_strategy.delay_ms * 2) + + Duration::from_millis(RETRY_STRATEGY_TIME_TOLERANCE_MS), + "Elapsed time was not within expected range" + ); + } + + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn test_retry_strategy_exponential() { + let retry_strategy = RetryStrategy { + max_retries: 5, + delay_ms: Duration::from_millis(100), + backoff_type: RetryBackoffType::Exponential, + jitter: None, + }; + + let start = Instant::now(); + retry_strategy.delay(1).await; + let elapsed = start.elapsed(); + + assert!( + elapsed >= retry_strategy.delay_ms + && elapsed + <= retry_strategy.delay_ms + + Duration::from_millis(RETRY_STRATEGY_TIME_TOLERANCE_MS), + "Elapsed time was not within expected range" + ); + + let start = Instant::now(); + retry_strategy.delay(3).await; + let elapsed = start.elapsed(); + // For the Exponential strategy, the delay for the 3rd attempt should be delay_ms * 2^(3-1) + // = delay_ms * 4. + assert!( + elapsed >= retry_strategy.delay_ms * 4 + && elapsed + <= retry_strategy.delay_ms * 4 + + Duration::from_millis(RETRY_STRATEGY_TIME_TOLERANCE_MS), + "Elapsed time was not within expected range" + ); + } + + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn test_retry_strategy_jitter() { + let retry_strategy = RetryStrategy { + max_retries: 5, + delay_ms: Duration::from_millis(100), + backoff_type: RetryBackoffType::Constant, + jitter: Some(Duration::from_millis(50)), + }; + + let start = Instant::now(); + retry_strategy.delay(1).await; + let elapsed = start.elapsed(); + + // The delay should be between delay_ms and delay_ms + jitter + assert!( + elapsed >= retry_strategy.delay_ms + && elapsed + <= retry_strategy.delay_ms + + retry_strategy.jitter.unwrap() + + Duration::from_millis(RETRY_STRATEGY_TIME_TOLERANCE_MS), + "Elapsed time was not within expected range" + ); + } + + // TODO: APMSP-1153 - This function also exists in + // sidecar::service::tracing::trace_flusher::tests. It should be moved to a common + // trace_test_utils module when it is properly gated to just test dependency. + async fn poll_for_mock_hit( + mock: &mut Mock<'_>, + poll_attempts: i32, + sleep_interval_ms: u64, + expected_hits: usize, + delete_after_hit: bool, + ) -> bool { + let mut mock_hit = mock.hits_async().await == expected_hits; + + let mut mock_observations_remaining = poll_attempts; + + while !mock_hit { + sleep(Duration::from_millis(sleep_interval_ms)).await; + mock_hit = mock.hits_async().await == expected_hits; + mock_observations_remaining -= 1; + if mock_observations_remaining == 0 || mock_hit { + if delete_after_hit { + mock.delete(); + } + break; } } + + mock_hit + } + + // TODO: APMSP-1153 - This function also exists in + // sidecar::service::tracing::trace_flusher::tests. It should be moved to a common + // trace_test_utils module when it is properly gated to just test dependency. + fn create_send_data(size: usize, target_endpoint: &Endpoint) -> SendData { + let tracer_header_tags = TracerHeaderTags::default(); + + let tracer_payload = pb::TracerPayload { + container_id: "container_id_1".to_owned(), + language_name: "php".to_owned(), + language_version: "4.0".to_owned(), + tracer_version: "1.1".to_owned(), + runtime_id: "runtime_1".to_owned(), + chunks: vec![], + tags: Default::default(), + env: "test".to_owned(), + hostname: "test_host".to_owned(), + app_version: "2.0".to_owned(), + }; + + SendData::new(size, tracer_payload, tracer_header_tags, target_endpoint) + } + + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn test_zero_retries_on_error() { + let server = MockServer::start(); + + let mut mock_503 = server + .mock_async(|_when, then| { + then.status(503) + .header("content-type", "application/json") + .body(r#"{"status":"error"}"#); + }) + .await; + + // We add this mock so that if a second request was made it would be a success and our + // assertion below that last_result is an error would fail. + let _mock_202 = server + .mock_async(|_when, then| { + then.status(202) + .header("content-type", "application/json") + .body(r#"{"status":"ok"}"#); + }) + .await; + + let target_endpoint = Endpoint { + url: server.url("").to_owned().parse().unwrap(), + api_key: Some("test-key".into()), + }; + + let size = 512; + + let mut send_data = create_send_data(size, &target_endpoint); + send_data.set_retry_strategy(RetryStrategy { + max_retries: 0, + delay_ms: Duration::from_millis(2), + backoff_type: RetryBackoffType::Constant, + jitter: None, + }); + + tokio::spawn(async move { + let result = send_data.send().await; + assert!(result.last_result.is_err(), "Expected an error result"); + }); + + assert!(poll_for_mock_hit(&mut mock_503, 10, 100, 1, true).await); + } + + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn test_retry_logic_error_then_success() { + let server = MockServer::start(); + + let mut mock_503 = server + .mock_async(|_when, then| { + then.status(503) + .header("content-type", "application/json") + .body(r#"{"status":"error"}"#); + }) + .await; + + let mut mock_202 = server + .mock_async(|_when, then| { + then.status(202) + .header("content-type", "application/json") + .body(r#"{"status":"ok"}"#); + }) + .await; + + let target_endpoint = Endpoint { + url: server.url("").to_owned().parse().unwrap(), + api_key: Some("test-key".into()), + }; + + let size = 512; + + let mut send_data = create_send_data(size, &target_endpoint); + send_data.set_retry_strategy(RetryStrategy { + max_retries: 2, + delay_ms: Duration::from_millis(250), + backoff_type: RetryBackoffType::Constant, + jitter: None, + }); + + tokio::spawn(async move { + let result = send_data.send().await; + assert!(result.last_result.is_ok(), "Expected a successful result"); + }); + + assert!(poll_for_mock_hit(&mut mock_503, 10, 100, 1, true).await); + assert!( + poll_for_mock_hit(&mut mock_202, 10, 100, 1, true).await, + "Expected a retry request after a 5xx error" + ); + } + + #[cfg_attr(miri, ignore)] + #[tokio::test] + // Ensure at least one test exists for msgpack with retry logic + async fn test_retry_logic_error_then_success_msgpack() { + let server = MockServer::start(); + + let mut mock_503 = server + .mock_async(|_when, then| { + then.status(503) + .header("content-type", "application/json") + .body(r#"{"status":"error"}"#); + }) + .await; + + let mut mock_202 = server + .mock_async(|_when, then| { + then.status(202) + .header("content-type", "application/json") + .body(r#"{"status":"ok"}"#); + }) + .await; + + let target_endpoint = Endpoint { + url: server.url("").to_owned().parse().unwrap(), + api_key: None, + }; + + let size = 512; + + let mut send_data = create_send_data(size, &target_endpoint); + send_data.set_retry_strategy(RetryStrategy { + max_retries: 2, + delay_ms: Duration::from_millis(250), + backoff_type: RetryBackoffType::Constant, + jitter: None, + }); + + tokio::spawn(async move { + let result = send_data.send().await; + assert!(result.last_result.is_ok(), "Expected a successful result"); + }); + + assert!(poll_for_mock_hit(&mut mock_503, 10, 100, 1, true).await); + assert!( + poll_for_mock_hit(&mut mock_202, 10, 100, 1, true).await, + "Expected a retry request after a 5xx error" + ); + } + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn test_retry_logic_max_errors() { + let server = MockServer::start(); + let expected_retry_attempts = 3; + let mut mock_503 = server + .mock_async(|_when, then| { + then.status(503) + .header("content-type", "application/json") + .body(r#"{"status":"error"}"#); + }) + .await; + + let target_endpoint = Endpoint { + url: server.url("").to_owned().parse().unwrap(), + api_key: Some("test-key".into()), + }; + + let size = 512; + + let mut send_data = create_send_data(size, &target_endpoint); + send_data.set_retry_strategy(RetryStrategy { + max_retries: expected_retry_attempts, + delay_ms: Duration::from_millis(10), + backoff_type: RetryBackoffType::Constant, + jitter: None, + }); + + tokio::spawn(async move { + send_data.send().await; + }); + + assert!( + poll_for_mock_hit( + &mut mock_503, + 10, + 100, + expected_retry_attempts as usize, + true + ) + .await, + "Expected max retry attempts" + ); + } + + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn test_retry_logic_no_errors() { + let server = MockServer::start(); + let mut mock_202 = server + .mock_async(|_when, then| { + then.status(202) + .header("content-type", "application/json") + .body(r#"{"status":"Ok"}"#); + }) + .await; + + let target_endpoint = Endpoint { + url: server.url("").to_owned().parse().unwrap(), + api_key: Some("test-key".into()), + }; + + let size = 512; + + let mut send_data = create_send_data(size, &target_endpoint); + send_data.set_retry_strategy(RetryStrategy { + max_retries: 2, + delay_ms: Duration::from_millis(10), + backoff_type: RetryBackoffType::Constant, + jitter: None, + }); + + tokio::spawn(async move { + send_data.send().await; + }); + + assert!( + poll_for_mock_hit(&mut mock_202, 10, 250, 1, true).await, + "Expected only one request attempt" + ); } } From 5128ad691f4fed5da9e8a955475e1945aed8fabd Mon Sep 17 00:00:00 2001 From: Tae Gyun Kim Date: Wed, 29 May 2024 14:32:28 -0400 Subject: [PATCH 2/2] use matrix --- .github/workflows/fuzz.yml | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/.github/workflows/fuzz.yml b/.github/workflows/fuzz.yml index 75c849fa2..dbc3ae8cf 100644 --- a/.github/workflows/fuzz.yml +++ b/.github/workflows/fuzz.yml @@ -5,6 +5,9 @@ on: jobs: run-fuzz: runs-on: ubuntu-latest + strategy: + matrix: + directory: [alloc, profiling] env: CARGO_TERM_COLOR: always steps: @@ -20,20 +23,16 @@ jobs: tool: cargo-bolero - run: | set -e - DIRS="alloc profiling" - for dir in $DIRS; + # cargo bolero list outputs {"package":"package-name","test":"test-name"} + pushd ${{ matrix.directory }} + cargo bolero list | \ + # And the following command will parse package-name's and test-name's one in each line + grep -oP '"(package|test)"\s*:\s*"\K[^"]+' | \ + # awk will stitch package and test names back separated by a tab + awk 'NR%2{printf "%s\t", $0; next}1' | \ + while read -r package test; do - # cargo bolero list outputs {"package":"package-name","test":"test-name"} - pushd $dir - cargo bolero list | \ - # And the following command will parse package-name's and test-name's one in each line - grep -oP '"(package|test)"\s*:\s*"\K[^"]+' | \ - # awk will stitch package and test names back separated by a tab - awk 'NR%2{printf "%s\t", $0; next}1' | \ - while read -r package test; - do - echo "****** Starting bolero test for $package $test ******" 1>&2 - cargo bolero test -T 1min --package $package $test - done - popd - done \ No newline at end of file + echo "****** Starting bolero test for $package $test ******" 1>&2 + cargo bolero test -T 1min --package $package $test + done + popd