Replace own in-memory queue implementation by omniqueue implementation
svix-jplatte committed Feb 8, 2024
1 parent 3dfe78d commit dd732ce
Showing 2 changed files with 14 additions and 242 deletions.
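
The diff below swaps svix-server's hand-rolled in-memory queue for omniqueue's `MemoryQueueBackend`. As a rough, standalone sketch of the new construction path (derived from the added `QueueBackend::Memory` arm below; the function name and local bindings are illustrative, not part of the commit):

```rust
// Rough sketch only: mirrors the in-memory pair construction added in
// server/svix-server/src/queue/mod.rs below. The omniqueue calls
// (`MemoryQueueBackend::builder`, `build_pair`, `into_dyn_scheduled`,
// `into_dyn`) are taken from the diff; `memory_pair_sketch` and the local
// bindings are illustrative names, not part of the commit.
use std::sync::Arc;

use omniqueue::{backends::memory_queue::MemoryQueueBackend, queue::QueueBackend as _};

async fn memory_pair_sketch() {
    // Bounded in-memory queue holding up to 64 messages, as in the diff.
    let (producer, consumer) = MemoryQueueBackend::builder(64)
        .build_pair()
        .await
        .expect("building in-memory queue can't fail");

    // Erase the concrete backend types; svix-server wraps these in its
    // TaskQueueProducer::Omni / TaskQueueConsumer::Omni variants.
    let _producer = Arc::new(producer.into_dyn_scheduled(Default::default()));
    let _consumer = consumer.into_dyn(Default::default());
}
```

Erasing the concrete backend behind omniqueue's dynamic producer/consumer types is what allows the old `Memory` variants of `TaskQueueProducer`, `TaskQueueConsumer`, and `Acker` (and the whole `queue/memory.rs` module) to be dropped in this commit.
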
80 changes: 0 additions & 80 deletions server/svix-server/src/queue/memory.rs

This file was deleted.

176 changes: 14 additions & 162 deletions server/svix-server/src/queue/mod.rs
@@ -4,10 +4,11 @@ use axum::async_trait;
use chrono::{DateTime, Utc};
use lapin::options::{BasicAckOptions, BasicNackOptions};
use omniqueue::{
backends::memory_queue::MemoryQueueBackend,
queue::{
consumer::{DynConsumer, QueueConsumer},
producer::QueueProducer,
Delivery,
Delivery, QueueBackend as _,
},
scheduled::ScheduledProducer,
};
@@ -24,12 +25,8 @@ use crate::{
error::{Error, ErrorType, Result},
};

use self::{
memory::{MemoryQueueConsumer, MemoryQueueProducer},
redis::{RedisQueueConsumer, RedisQueueInner, RedisQueueProducer},
};
use self::redis::{RedisQueueConsumer, RedisQueueInner, RedisQueueProducer};

pub mod memory;
pub mod rabbitmq;
pub mod redis;

@@ -56,7 +53,17 @@ pub async fn new_pair(
let pool = crate::redis::new_redis_pool_clustered(dsn, cfg).await;
redis::new_pair(pool, prefix).await
}
QueueBackend::Memory => memory::new_pair().await,
QueueBackend::Memory => {
let (producer, consumer) = MemoryQueueBackend::builder(64)
.build_pair()
.await
.expect("building in-memory queue can't fail");

(
TaskQueueProducer::Omni(Arc::new(producer.into_dyn_scheduled(Default::default()))),
TaskQueueConsumer::Omni(consumer.into_dyn(Default::default())),
)
}
QueueBackend::RabbitMq(dsn) => {
let prefix = prefix.unwrap_or("");
let queue = format!("{prefix}-message-queue");
@@ -143,7 +150,6 @@ impl QueueTask {

#[derive(Clone)]
pub enum TaskQueueProducer {
Memory(MemoryQueueProducer),
Redis(RedisQueueProducer),
RabbitMq(rabbitmq::Producer),
Omni(Arc<omniqueue::scheduled::DynScheduledProducer>),
@@ -155,7 +161,6 @@ impl TaskQueueProducer {
run_with_retries(
|| async {
match self {
TaskQueueProducer::Memory(q) => q.send(task.clone(), delay).await,
TaskQueueProducer::Redis(q) => q.send(task.clone(), delay).await,
TaskQueueProducer::RabbitMq(q) => q.send(task.clone(), delay).await,
TaskQueueProducer::Omni(q) => if let Some(delay) = delay {
@@ -175,7 +180,6 @@

pub enum TaskQueueConsumer {
Redis(RedisQueueConsumer),
Memory(MemoryQueueConsumer),
RabbitMq(rabbitmq::Consumer),
Omni(DynConsumer),
}
@@ -184,7 +188,6 @@
pub async fn receive_all(&mut self) -> Result<Vec<TaskQueueDelivery>> {
match self {
TaskQueueConsumer::Redis(q) => q.receive_all().await.trace(),
TaskQueueConsumer::Memory(q) => q.receive_all().await.trace(),
TaskQueueConsumer::RabbitMq(q) => q.receive_all().await.trace(),
// FIXME(onelson): need to figure out what deadline/duration to use here
TaskQueueConsumer::Omni(q) => q
@@ -202,7 +205,6 @@
/// Used by TaskQueueDeliveries to Ack/Nack itself
#[derive(Debug)]
enum Acker {
Memory(MemoryQueueProducer),
Redis(Arc<RedisQueueInner>),
RabbitMQ(lapin::message::Delivery),
Omni(Delivery),
@@ -238,7 +240,6 @@ impl TaskQueueDelivery {
.as_ref()
.expect("acker is always Some when trying to ack");
match acker_ref {
Acker::Memory(_) => Ok(()), // nothing to do
Acker::Redis(q) => q.ack(&self.id, &self.task).await.trace(),
Acker::RabbitMQ(delivery) => {
delivery
@@ -280,10 +281,6 @@ impl TaskQueueDelivery {
.as_ref()
.expect("acker is always Some when trying to ack");
match acker_ref {
Acker::Memory(q) => {
tracing::debug!("nack {}", self.id);
q.send(self.task.clone(), None).await.trace()
}
Acker::Redis(q) => q.nack(&self.id, &self.task).await.trace(),
Acker::RabbitMQ(delivery) => {
// See https://www.rabbitmq.com/confirms.html#consumer-nacks-requeue
@@ -349,148 +346,3 @@ trait TaskQueueSend: Sync + Send {
trait TaskQueueReceive {
async fn receive_all(&mut self) -> Result<Vec<TaskQueueDelivery>>;
}

#[cfg(test)]
mod tests {
use super::*;

// TODO: Test Redis impl too

/// Creates a [`MessageTask`] with filler information and the given MessageId inner String
fn mock_message(message_id: String) -> QueueTask {
MessageTask::new_task(
MessageId(message_id),
ApplicationId("TestEndpointID".to_owned()),
EndpointId("TestEndpointID".to_owned()),
MessageAttemptTriggerType::Scheduled,
)
}

/// Sends a message with the given TaskQueueProducer reference and asserts that the result is OK
async fn assert_send(tx: &TaskQueueProducer, message_id: &str) {
assert!(tx
.send(mock_message(message_id.to_owned()), None)
.await
.is_ok());
}

/// Receives a message with the given TaskQueueConsumer mutable reference and asserts that it is
/// equal to the mock message with the given message_id.
async fn assert_recv(rx: &mut TaskQueueConsumer, message_id: &str) {
assert_eq!(
*rx.receive_all().await.unwrap().first().unwrap().task,
mock_message(message_id.to_owned())
)
}

#[tokio::test]
async fn test_single_producer_single_consumer() {
let (tx_mem, mut rx_mem) = memory::new_pair().await;

let msg_id = "TestMessageID1";

assert_send(&tx_mem, msg_id).await;
assert_recv(&mut rx_mem, msg_id).await;
}

#[tokio::test]
async fn test_multiple_producer_single_consumer() {
let (tx_mem, mut rx_mem) = memory::new_pair().await;

let msg_1 = "TestMessageID1";
let msg_2 = "TestMessageID2";

tokio::spawn({
let tx_mem = tx_mem.clone();
async move {
assert_send(&tx_mem, msg_1).await;
}
});
tokio::spawn(async move {
assert_send(&tx_mem, msg_2).await;
});

let tasks = rx_mem.receive_all().await.unwrap();
assert_eq!(*tasks[0].task, mock_message(msg_1.to_owned()));
assert_eq!(*tasks[1].task, mock_message(msg_2.to_owned()));
}

#[tokio::test]
async fn test_delay() {
let (tx_mem, mut rx_mem) = memory::new_pair().await;

let msg_1 = "TestMessageID1";
let msg_2 = "TestMessageID2";

assert!(tx_mem
.send(
mock_message(msg_1.to_owned()),
Some(Duration::from_millis(200))
)
.await
.is_ok());
assert_send(&tx_mem, msg_2).await;

assert_recv(&mut rx_mem, msg_2).await;
assert_recv(&mut rx_mem, msg_1).await;
}

#[tokio::test]
async fn test_ack() {
let (tx_mem, mut rx_mem) = memory::new_pair().await;
assert!(tx_mem
.send(mock_message("test".to_owned()), None)
.await
.is_ok());

let recv = rx_mem
.receive_all()
.await
.unwrap()
.into_iter()
.next()
.unwrap();

assert_eq!(*recv.task, mock_message("test".to_owned()));

assert!(recv.ack().await.is_ok());

tokio::select! {
_ = rx_mem.receive_all() => {
panic!("`rx_mem` received second message");
}

// FIXME: Find out correct timeout duration
_ = tokio::time::sleep(Duration::from_millis(500)) => {}
}
}

#[tokio::test]
async fn test_nack() {
let (tx_mem, mut rx_mem) = memory::new_pair().await;
assert!(tx_mem
.send(mock_message("test".to_owned()), None)
.await
.is_ok());

let recv = rx_mem
.receive_all()
.await
.unwrap()
.into_iter()
.next()
.unwrap();
assert_eq!(*recv.task, mock_message("test".to_owned()));

assert!(recv.nack().await.is_ok());

tokio::select! {
_ = rx_mem.receive_all() => {}

// FIXME: Find out correct timeout duration
_ = tokio::time::sleep(Duration::from_millis(500)) => {
panic!("`rx_mem` did not receive second message");
}
}
}
}
