Skip to content

Commit

Permalink
server: Add Omniqueue as an additional queue variant
Browse files Browse the repository at this point in the history
  • Loading branch information
svix-jplatte committed Feb 8, 2024
1 parent 7a65152 commit f15632e
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 43 deletions.
15 changes: 15 additions & 0 deletions server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions server/svix-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ urlencoding = "2.1.2"
form_urlencoded = "1.1.0"
lapin = "2.1.1"
sentry = { version = "0.32.2", features = ["tracing"] }
omniqueue = { git = "https://github.com/svix/omniqueue-rs.git", rev = "32bf5f17209b76ab33902ed149f1890a80dda32a", default-features = false }

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = { version = "0.5", optional = true }
Expand Down
7 changes: 7 additions & 0 deletions server/svix-server/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,13 @@ impl From<redis::RedisError> for Error {
}
}

impl From<omniqueue::QueueError> for Error {
#[track_caller]
fn from(value: omniqueue::QueueError) -> Self {
Error::queue(value)
}
}

impl<E: error::Error + 'static> From<bb8::RunError<E>> for Error {
#[track_caller]
fn from(value: bb8::RunError<E>) -> Self {
Expand Down
175 changes: 132 additions & 43 deletions server/svix-server/src/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,22 @@ use std::{sync::Arc, time::Duration};
use axum::async_trait;
use chrono::{DateTime, Utc};
use lapin::options::{BasicAckOptions, BasicNackOptions};
use omniqueue::{
queue::{
consumer::{DynConsumer, QueueConsumer},
producer::QueueProducer,
Delivery,
},
scheduled::ScheduledProducer,
};
use serde::{Deserialize, Serialize};
use svix_ksuid::*;

use crate::error::Traceable;
use crate::{
cfg::{Configuration, QueueBackend},
core::{
retry::run_with_retries,
retry::{run_with_retries, Retry},
types::{ApplicationId, EndpointId, MessageAttemptTriggerType, MessageId},
},
error::{Error, ErrorType, Result},
Expand Down Expand Up @@ -138,6 +146,7 @@ pub enum TaskQueueProducer {
Memory(MemoryQueueProducer),
Redis(RedisQueueProducer),
RabbitMq(rabbitmq::Producer),
Omni(Arc<omniqueue::scheduled::DynScheduledProducer>),
}

impl TaskQueueProducer {
Expand All @@ -149,6 +158,12 @@ impl TaskQueueProducer {
TaskQueueProducer::Memory(q) => q.send(task.clone(), delay).await,
TaskQueueProducer::Redis(q) => q.send(task.clone(), delay).await,
TaskQueueProducer::RabbitMq(q) => q.send(task.clone(), delay).await,
TaskQueueProducer::Omni(q) => if let Some(delay) = delay {
q.send_serde_json_scheduled(task.as_ref(), delay).await
} else {
q.send_serde_json(task.as_ref()).await
}
.map_err(Into::into),
}
},
should_retry,
Expand All @@ -162,6 +177,7 @@ pub enum TaskQueueConsumer {
Redis(RedisQueueConsumer),
Memory(MemoryQueueConsumer),
RabbitMq(rabbitmq::Consumer),
Omni(DynConsumer),
}

impl TaskQueueConsumer {
Expand All @@ -170,6 +186,17 @@ impl TaskQueueConsumer {
TaskQueueConsumer::Redis(q) => q.receive_all().await.trace(),
TaskQueueConsumer::Memory(q) => q.receive_all().await.trace(),
TaskQueueConsumer::RabbitMq(q) => q.receive_all().await.trace(),
TaskQueueConsumer::Omni(q) => {
const MAX_MESSAGES: usize = 128;
// FIXME(onelson): need to figure out what deadline/duration to use here
q.receive_all(MAX_MESSAGES, Duration::from_secs(30))
.await
.map_err(Into::into)
.trace()?
.into_iter()
.map(TryInto::try_into)
.collect()
}
}
}
}
Expand All @@ -180,6 +207,7 @@ enum Acker {
Memory(MemoryQueueProducer),
Redis(Arc<RedisQueueInner>),
RabbitMQ(lapin::message::Delivery),
Omni(Delivery),
}

#[derive(Debug)]
Expand All @@ -202,54 +230,115 @@ impl TaskQueueDelivery {

pub async fn ack(self) -> Result<()> {
tracing::trace!("ack {}", self.id);
run_with_retries(
|| async {
match &self.acker {
Acker::Memory(_) => Ok(()), // nothing to do
Acker::Redis(q) => q.ack(&self.id, &self.task).await.trace(),
Acker::RabbitMQ(delivery) => {
delivery
.ack(BasicAckOptions {
multiple: false, // Only ack this message, not others
})
.await
.map_err(Into::into)

let mut retry = Retry::new(should_retry, RETRY_SCHEDULE);
let mut acker = Some(self.acker);
loop {
if let Some(result) = retry
.run(|| async {
let acker_ref = acker
.as_ref()
.expect("acker is always Some when trying to ack");
match acker_ref {
Acker::Memory(_) => Ok(()), // nothing to do
Acker::Redis(q) => q.ack(&self.id, &self.task).await.trace(),
Acker::RabbitMQ(delivery) => {
delivery
.ack(BasicAckOptions {
multiple: false, // Only ack this message, not others
})
.await
.map_err(Into::into)
}
Acker::Omni(_) => match acker.take() {
Some(Acker::Omni(delivery)) => {
delivery.ack().await.map_err(|(e, delivery)| {
// Put the delivery back in acker beforr retrying, to
// satisfy the expect above.
acker = Some(Acker::Omni(delivery));
e.into()
})
}
_ => unreachable!(),
},
}
}
},
should_retry,
RETRY_SCHEDULE,
)
.await
})
.await
{
return result;
}
}
}

pub async fn nack(self) -> Result<()> {
tracing::trace!("nack {}", self.id);
run_with_retries(
|| async {
match &self.acker {
Acker::Memory(q) => {
tracing::debug!("nack {}", self.id);
q.send(self.task.clone(), None).await.trace()
}
Acker::Redis(q) => q.nack(&self.id, &self.task).await.trace(),
Acker::RabbitMQ(delivery) => {
// See https://www.rabbitmq.com/confirms.html#consumer-nacks-requeue

delivery
.nack(BasicNackOptions {
requeue: true,
multiple: false, // Only nack this message, not others
})
.await
.map_err(Into::into)

let mut retry = Retry::new(should_retry, RETRY_SCHEDULE);
let mut acker = Some(self.acker);
loop {
if let Some(result) = retry
.run(|| async {
let acker_ref = acker
.as_ref()
.expect("acker is always Some when trying to ack");
match acker_ref {
Acker::Memory(q) => {
tracing::debug!("nack {}", self.id);
q.send(self.task.clone(), None).await.trace()
}
Acker::Redis(q) => q.nack(&self.id, &self.task).await.trace(),
Acker::RabbitMQ(delivery) => {
// See https://www.rabbitmq.com/confirms.html#consumer-nacks-requeue

delivery
.nack(BasicNackOptions {
requeue: true,
multiple: false, // Only nack this message, not others
})
.await
.map_err(Into::into)
}
Acker::Omni(_) => match acker.take() {
Some(Acker::Omni(delivery)) => {
delivery
.nack()
.await
.map_err(|(e, delivery)| {
// Put the delivery back in acker beforr retrying, to
// satisfy the expect above.
acker = Some(Acker::Omni(delivery));
e.into()
})
.trace()
}
_ => unreachable!(),
},
}
}
},
should_retry,
RETRY_SCHEDULE,
)
.await
})
.await
{
return result;
}
}
}
}

impl TryFrom<Delivery> for TaskQueueDelivery {
type Error = Error;
fn try_from(value: Delivery) -> Result<Self> {
Ok(TaskQueueDelivery {
// FIXME(onelson): ksuid for the id?
// Since ack/nack is all handled internally by the omniqueue delivery, maybe it
// doesn't matter.
id: "".to_string(),
task: Arc::new(
value
.payload_serde_json()
.map_err(|_| Error::queue("Failed to decode queue task"))?
.ok_or_else(|| Error::queue("Unexpected empty delivery"))?,
),
acker: Acker::Omni(value),
})
}
}

Expand Down

0 comments on commit f15632e

Please sign in to comment.