Skip to content

Commit

Permalink
Load shed control plane and metastore (quickwit-oss#4859)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Apr 19, 2024
1 parent 45bb963 commit 1077acb
Show file tree
Hide file tree
Showing 34 changed files with 485 additions and 84 deletions.
2 changes: 1 addition & 1 deletion quickwit/quickwit-codegen/example/src/codegen/hello.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions quickwit/quickwit-codegen/example/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ pub enum HelloError {
InvalidArgument(String),
#[error("request timed out: {0}")]
Timeout(String),
#[error("too many requests")]
TooManyRequests,
#[error("service unavailable: {0}")]
Unavailable(String),
}
Expand All @@ -44,6 +46,7 @@ impl ServiceError for HelloError {
Self::Internal(_) => ServiceErrorCode::Internal,
Self::InvalidArgument(_) => ServiceErrorCode::BadRequest,
Self::Timeout(_) => ServiceErrorCode::Timeout,
Self::TooManyRequests => ServiceErrorCode::TooManyRequests,
Self::Unavailable(_) => ServiceErrorCode::Unavailable,
}
}
Expand All @@ -58,6 +61,10 @@ impl GrpcServiceError for HelloError {
Self::Timeout(message)
}

fn new_too_many_requests() -> Self {
Self::TooManyRequests
}

fn new_unavailable(message: String) -> Self {
Self::Unavailable(message)
}
Expand Down
103 changes: 103 additions & 0 deletions quickwit/quickwit-codegen/example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ impl Hello for HelloImpl {

#[cfg(test)]
mod tests {
use std::fmt;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::atomic::Ordering;
Expand Down Expand Up @@ -538,6 +539,108 @@ mod tests {
assert_eq!(ping_layer.counter.load(Ordering::Relaxed), 1);
}

#[tokio::test]
async fn test_hello_codegen_tower_stack_layer_ordering() {
trait AppendSuffix {
fn append_suffix(&mut self, suffix: &'static str);
}

impl AppendSuffix for HelloRequest {
fn append_suffix(&mut self, suffix: &'static str) {
self.name.push_str(suffix);
}
}

impl AppendSuffix for GoodbyeRequest {
fn append_suffix(&mut self, suffix: &'static str) {
self.name.push_str(suffix);
}
}

impl AppendSuffix for PingRequest {
fn append_suffix(&mut self, suffix: &'static str) {
self.name.push_str(suffix);
}
}

impl AppendSuffix for ServiceStream<PingRequest> {
fn append_suffix(&mut self, _suffix: &'static str) {}
}

#[derive(Debug, Clone)]
struct AppendSuffixService<S> {
inner: S,
suffix: &'static str,
}

impl<S, R> Service<R> for AppendSuffixService<S>
where
S: Service<R, Error = HelloError>,
S::Response: fmt::Debug,
S::Future: Send + 'static,
R: AppendSuffix,
{
type Response = S::Response;
type Error = HelloError;
type Future = BoxFuture<S::Response, S::Error>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, mut req: R) -> Self::Future {
req.append_suffix(self.suffix);
let inner = self.inner.call(req);
Box::pin(inner)
}
}

#[derive(Debug, Clone)]
struct AppendSuffixLayer {
suffix: &'static str,
}

impl AppendSuffixLayer {
fn new(suffix: &'static str) -> Self {
Self { suffix }
}
}

impl<S> Layer<S> for AppendSuffixLayer {
type Service = AppendSuffixService<S>;

fn layer(&self, inner: S) -> Self::Service {
AppendSuffixService {
inner,
suffix: self.suffix,
}
}
}
let mut hello_tower = HelloClient::tower()
.stack_layer(AppendSuffixLayer::new("->foo"))
.stack_hello_layer(AppendSuffixLayer::new("->bar"))
.stack_layer(AppendSuffixLayer::new("->qux"))
.stack_hello_layer(AppendSuffixLayer::new("->tox"))
.stack_goodbye_layer(AppendSuffixLayer::new("->moo"))
.build(HelloImpl::default());

let response = hello_tower
.hello(HelloRequest {
name: "".to_string(),
})
.await
.unwrap();
assert_eq!(response.message, "Hello, ->foo->bar->qux->tox!");

let response = hello_tower
.goodbye(GoodbyeRequest {
name: "".to_string(),
})
.await
.unwrap();
assert_eq!(response.message, "Goodbye, ->foo->qux->moo!");
}

#[tokio::test]
async fn test_from_channel() {
let balance_channed = BalanceChannel::from_channel(
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-codegen/src/codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,8 +524,8 @@ fn generate_client(context: &CodegenContext) -> TokenStream {
let mock_name = &context.mock_name;
let mock_wrapper_name = quote::format_ident!("{}Wrapper", mock_name);
let error_mesage = format!(
"`{}` must be wrapped in a `{}`. Use `{}::from(mock)` to instantiate the client.",
mock_name, mock_wrapper_name, mock_name
"`{}` must be wrapped in a `{}`: use `{}::from_mock(mock)` to instantiate the client",
mock_name, mock_wrapper_name, client_name
);
let extra_client_methods = if context.generate_extra_service_methods {
generate_extra_methods_calling_inner()
Expand Down
46 changes: 33 additions & 13 deletions quickwit/quickwit-common/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@ use futures::Future;
use rand::Rng;
use tracing::{debug, warn};

const DEFAULT_MAX_ATTEMPTS: usize = 30;
const DEFAULT_BASE_DELAY: Duration = Duration::from_millis(250);
const DEFAULT_MAX_DELAY: Duration = Duration::from_secs(20);

pub trait Retryable {
fn is_retryable(&self) -> bool {
false
Expand Down Expand Up @@ -66,17 +62,37 @@ pub struct RetryParams {
pub max_attempts: usize,
}

impl Default for RetryParams {
fn default() -> Self {
impl RetryParams {
/// Creates a new [`RetryParams`] instance using the same settings as the standard retry policy
/// defined in the AWS SDK for Rust.
pub fn standard() -> Self {
Self {
base_delay: DEFAULT_BASE_DELAY,
max_delay: DEFAULT_MAX_DELAY,
max_attempts: DEFAULT_MAX_ATTEMPTS,
base_delay: Duration::from_secs(1),
max_delay: Duration::from_secs(20),
max_attempts: 3,
}
}

/// Creates a new [`RetryParams`] instance using settings that are more aggressive than those of
/// the standard policy for services that are more resilient to retries, usually managed
/// cloud services.
pub fn aggressive() -> Self {
Self {
base_delay: Duration::from_millis(250),
max_delay: Duration::from_secs(20),
max_attempts: 5,
}
}

/// Creates a new [`RetryParams`] instance that does not perform any retries.
pub fn no_retries() -> Self {
Self {
base_delay: Duration::ZERO,
max_delay: Duration::ZERO,
max_attempts: 1,
}
}
}

impl RetryParams {
/// Computes the delay after which a new attempt should be performed. The randomized delay
/// increases after each attempt (exponential backoff and full jitter). Implementation and
/// default values originate from the Java SDK. See also: <https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/>.
Expand Down Expand Up @@ -104,7 +120,7 @@ impl RetryParams {
Self {
base_delay: Duration::from_millis(1),
max_delay: Duration::from_millis(2),
..Default::default()
max_attempts: 3,
}
}
}
Expand Down Expand Up @@ -211,7 +227,11 @@ mod tests {
let noop_mock = NoopSleep;
let values_it = RwLock::new(values.into_iter());
retry_with_mockable_sleep(
&RetryParams::default(),
&RetryParams {
base_delay: Duration::from_millis(1),
max_delay: Duration::from_millis(2),
max_attempts: 30,
},
|| ready(values_it.write().unwrap().next().unwrap()),
noop_mock,
)
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-common/src/tower/delay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ mod tests {
use std::time::Instant;

use tokio::time::Duration;
use tower::ServiceBuilder;
use tower::{ServiceBuilder, ServiceExt};

use super::*;

Expand All @@ -122,7 +122,7 @@ mod tests {
.service_fn(|_| async { Ok::<_, ()>(()) });

let start = Instant::now();
service.call(()).await.unwrap();
service.ready().await.unwrap().call(()).await.unwrap();

let elapsed = start.elapsed();
assert!(elapsed >= delay);
Expand Down
Loading

0 comments on commit 1077acb

Please sign in to comment.