Skip to content

Commit

Permalink
Adopt omniqueue as a queue backend and remove our own in-memory queue…
Browse files Browse the repository at this point in the history
… implementation (#1188)

~~Depends on #1185 (merged),
svix/omniqueue-rs#24

I have deleted low-level tests for the in-memory queue since its basic
functionality should be tested inside omniqueue, not here. Do we have
tests for higher-level functionality that uses the in-memory queue
backend in some scenarios?

The redis and SQS implementation should probably be replaced by the
implementations found in Omniqueue in a follow-up PR, to keep this one
reasonably small.
  • Loading branch information
svix-onelson authored Feb 12, 2024
2 parents fc5b0ad + 39e16bb commit 3a78289
Show file tree
Hide file tree
Showing 9 changed files with 231 additions and 310 deletions.
15 changes: 15 additions & 0 deletions server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions server/svix-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ urlencoding = "2.1.2"
form_urlencoded = "1.1.0"
lapin = "2.1.1"
sentry = { version = "0.32.2", features = ["tracing"] }
omniqueue = { git = "https://github.com/svix/omniqueue-rs.git", rev = "32bf5f17209b76ab33902ed149f1890a80dda32a", default-features = false, features = ["memory_queue"] }

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = { version = "0.5", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion server/svix-server/src/core/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use axum::async_trait;
use enum_dispatch::enum_dispatch;
use serde::{de::DeserializeOwned, Serialize};

use crate::core::run_with_retries::run_with_retries;
use crate::core::retry::run_with_retries;

pub mod memory;
pub mod none;
Expand Down
2 changes: 1 addition & 1 deletion server/svix-server/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub mod message_app;
pub mod operational_webhooks;
pub mod otel_spans;
pub mod permissions;
pub mod run_with_retries;
pub mod retry;
pub mod security;
pub mod types;
pub mod webhook_http_client;
Expand Down
67 changes: 67 additions & 0 deletions server/svix-server/src/core/retry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use std::{future::Future, time::Duration};

use tracing::warn;

pub async fn run_with_retries<
T,
E: std::error::Error,
F: Future<Output = Result<T, E>>,
FN: FnMut() -> F,
>(
mut fun: FN,
should_retry: impl Fn(&E) -> bool,
retry_schedule: &[Duration],
) -> Result<T, E> {
let mut retry = Retry::new(should_retry, retry_schedule);
loop {
if let Some(result) = retry.run(&mut fun).await {
return result;
}
}
}

/// A state machine for retrying an asynchronous operation.
///
/// Unfortunately needed to get around Rust's lack of `AttachedFn*` traits.
/// For usage, check the implementation of `run_with_retries`.`
pub struct Retry<'a, Re> {
retry_schedule: &'a [Duration],
should_retry: Re,
}

impl<'a, Re> Retry<'a, Re> {
pub fn new(should_retry: Re, retry_schedule: &'a [Duration]) -> Self {
Self {
retry_schedule,
should_retry,
}
}

pub async fn run<T, E, F, Fut>(&mut self, f: F) -> Option<Result<T, E>>
where
E: std::error::Error,
F: FnOnce() -> Fut,
Fut: Future<Output = Result<T, E>>,
Re: Fn(&E) -> bool,
{
match f().await {
// If the function succeeded, we're done
Ok(t) => Some(Ok(t)),
Err(e) => {
let should_retry = &self.should_retry;
if self.retry_schedule.is_empty() || !should_retry(&e) {
// If we already used up all the retries or should_retry returns false,
// return the latest error and stop retrying.
self.retry_schedule = &[];
Some(Err(e))
} else {
// Otherwise, wait and let the caller call retry.run() again.
warn!("Retrying after error: {e}");
tokio::time::sleep(self.retry_schedule[0]).await;
self.retry_schedule = &self.retry_schedule[1..];
None
}
}
}
}
}
30 changes: 0 additions & 30 deletions server/svix-server/src/core/run_with_retries.rs

This file was deleted.

7 changes: 7 additions & 0 deletions server/svix-server/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,13 @@ impl From<redis::RedisError> for Error {
}
}

impl From<omniqueue::QueueError> for Error {
#[track_caller]
fn from(value: omniqueue::QueueError) -> Self {
Error::queue(value)
}
}

impl<E: error::Error + 'static> From<bb8::RunError<E>> for Error {
#[track_caller]
fn from(value: bb8::RunError<E>) -> Self {
Expand Down
80 changes: 0 additions & 80 deletions server/svix-server/src/queue/memory.rs

This file was deleted.

Loading

0 comments on commit 3a78289

Please sign in to comment.