diff --git a/server/svix-server/src/queue/memory.rs b/server/svix-server/src/queue/memory.rs
deleted file mode 100644
index 2df813bd4..000000000
--- a/server/svix-server/src/queue/memory.rs
+++ /dev/null
@@ -1,80 +0,0 @@
-use std::{sync::Arc, time::Duration};
-
-use axum::async_trait;
-use chrono::Utc;
-use tokio::{sync::mpsc, time::sleep};
-
-use crate::error::Error;
-use crate::error::Result;
-
-use super::{
-    Acker, QueueTask, TaskQueueConsumer, TaskQueueDelivery, TaskQueueProducer, TaskQueueReceive,
-    TaskQueueSend,
-};
-
-pub async fn new_pair() -> (TaskQueueProducer, TaskQueueConsumer) {
-    let (tx, rx) = mpsc::unbounded_channel::<TaskQueueDelivery>();
-    (
-        TaskQueueProducer::Memory(MemoryQueueProducer { tx }),
-        TaskQueueConsumer::Memory(MemoryQueueConsumer { rx }),
-    )
-}
-
-#[derive(Clone, Debug)]
-pub struct MemoryQueueProducer {
-    tx: mpsc::UnboundedSender<TaskQueueDelivery>,
-}
-
-#[async_trait]
-impl TaskQueueSend for MemoryQueueProducer {
-    async fn send(&self, msg: Arc<QueueTask>, delay: Option<Duration>) -> Result<()> {
-        let timestamp = delay.map(|delay| Utc::now() + chrono::Duration::from_std(delay).unwrap());
-        let delivery = TaskQueueDelivery::from_arc(msg, timestamp, Acker::Memory(self.clone()));
-
-        if let Some(delay) = delay {
-            let tx = self.tx.clone();
-            tokio::spawn(async move {
-                // We just assume memory queue always works, so we can defer the error handling
-                tracing::trace!("MemoryQueue: event sent > (delay: {:?})", delay);
-                sleep(delay).await;
-                if tx.send(delivery).is_err() {
-                    tracing::error!("Receiver dropped");
-                }
-            });
-        } else if self.tx.send(delivery).is_err() {
-            tracing::error!("Receiver dropped");
-        }
-
-        Ok(())
-    }
-}
-
-pub struct MemoryQueueConsumer {
-    rx: mpsc::UnboundedReceiver<TaskQueueDelivery>,
-}
-
-#[async_trait]
-impl TaskQueueReceive for MemoryQueueConsumer {
-    async fn receive_all(&mut self) -> Result<Vec<TaskQueueDelivery>> {
-        let mut deliveries = tokio::select! {
-            _ = tokio::time::sleep(Duration::from_secs(30)) => return Ok(Vec::new()),
-            recv = self.rx.recv() => {
-                if let Some(delivery) = recv {
-                    tracing::trace!("MemoryQueue: event recv <");
-                    vec![delivery]
-                } else {
-                    return Err(Error::queue("Failed to fetch from queue"))
-                }
-            }
-        };
-
-        // possible errors are `Empty` or `Disconnected`. Either way,
-        // we want to return the deliveries that could be received.
-        // If it was Disconnected, the next call to receive_all will fail
-        while let Ok(delivery) = self.rx.try_recv() {
-            deliveries.push(delivery);
-        }
-
-        Ok(deliveries)
-    }
-}
diff --git a/server/svix-server/src/queue/mod.rs b/server/svix-server/src/queue/mod.rs
index 7b950909c..6b4506c92 100644
--- a/server/svix-server/src/queue/mod.rs
+++ b/server/svix-server/src/queue/mod.rs
@@ -4,10 +4,11 @@ use axum::async_trait;
 use chrono::{DateTime, Utc};
 use lapin::options::{BasicAckOptions, BasicNackOptions};
 use omniqueue::{
+    backends::memory_queue::MemoryQueueBackend,
     queue::{
         consumer::{DynConsumer, QueueConsumer},
         producer::QueueProducer,
-        Delivery,
+        Delivery, QueueBackend as _,
     },
     scheduled::ScheduledProducer,
 };
@@ -24,12 +25,8 @@ use crate::{
     error::{Error, ErrorType, Result},
 };
 
-use self::{
-    memory::{MemoryQueueConsumer, MemoryQueueProducer},
-    redis::{RedisQueueConsumer, RedisQueueInner, RedisQueueProducer},
-};
+use self::redis::{RedisQueueConsumer, RedisQueueInner, RedisQueueProducer};
 
-pub mod memory;
 pub mod rabbitmq;
 pub mod redis;
 
@@ -56,7 +53,17 @@ pub async fn new_pair(
             let pool = crate::redis::new_redis_pool_clustered(dsn, cfg).await;
             redis::new_pair(pool, prefix).await
         }
-        QueueBackend::Memory => memory::new_pair().await,
+        QueueBackend::Memory => {
+            let (producer, consumer) = MemoryQueueBackend::builder(64)
+                .build_pair()
+                .await
+                .expect("building in-memory queue can't fail");
+
+            (
+                TaskQueueProducer::Omni(Arc::new(producer.into_dyn_scheduled(Default::default()))),
+                TaskQueueConsumer::Omni(consumer.into_dyn(Default::default())),
+            )
+        }
         QueueBackend::RabbitMq(dsn) => {
             let prefix = prefix.unwrap_or("");
             let queue = format!("{prefix}-message-queue");
@@ -143,7 +150,6 @@ impl QueueTask {
 #[derive(Clone)]
 pub enum TaskQueueProducer {
-    Memory(MemoryQueueProducer),
     Redis(RedisQueueProducer),
     RabbitMq(rabbitmq::Producer),
     Omni(Arc<DynScheduledProducer>),
 }
@@ -155,7 +161,6 @@ impl TaskQueueProducer {
         run_with_retries(
             || async {
                 match self {
-                    TaskQueueProducer::Memory(q) => q.send(task.clone(), delay).await,
                     TaskQueueProducer::Redis(q) => q.send(task.clone(), delay).await,
                     TaskQueueProducer::RabbitMq(q) => q.send(task.clone(), delay).await,
                     TaskQueueProducer::Omni(q) => if let Some(delay) = delay {
@@ -175,7 +180,6 @@
 
 pub enum TaskQueueConsumer {
     Redis(RedisQueueConsumer),
-    Memory(MemoryQueueConsumer),
     RabbitMq(rabbitmq::Consumer),
     Omni(DynConsumer),
 }
@@ -184,7 +188,6 @@ impl TaskQueueConsumer {
     pub async fn receive_all(&mut self) -> Result<Vec<TaskQueueDelivery>> {
         match self {
             TaskQueueConsumer::Redis(q) => q.receive_all().await.trace(),
-            TaskQueueConsumer::Memory(q) => q.receive_all().await.trace(),
             TaskQueueConsumer::RabbitMq(q) => q.receive_all().await.trace(),
             // FIXME(onelson): need to figure out what deadline/duration to use here
             TaskQueueConsumer::Omni(q) => q
@@ -202,7 +205,6 @@
 /// Used by TaskQueueDeliveries to Ack/Nack itself
 #[derive(Debug)]
 enum Acker {
-    Memory(MemoryQueueProducer),
     Redis(Arc<RedisQueueInner>),
     RabbitMQ(lapin::message::Delivery),
     Omni(Delivery),
@@ -238,7 +240,6 @@
             .as_ref()
             .expect("acker is always Some when trying to ack");
         match acker_ref {
-            Acker::Memory(_) => Ok(()), // nothing to do
             Acker::Redis(q) => q.ack(&self.id, &self.task).await.trace(),
             Acker::RabbitMQ(delivery) => {
                 delivery
@@ -280,10 +281,6 @@
             .as_ref()
             .expect("acker is always Some when trying to ack");
         match acker_ref {
-            Acker::Memory(q) => {
-                tracing::debug!("nack {}", self.id);
-                q.send(self.task.clone(), None).await.trace()
-            }
             Acker::Redis(q) => q.nack(&self.id, &self.task).await.trace(),
             Acker::RabbitMQ(delivery) => {
                 // See https://www.rabbitmq.com/confirms.html#consumer-nacks-requeue
@@ -349,148 +346,3 @@ trait TaskQueueSend: Sync + Send {
 trait TaskQueueReceive {
     async fn receive_all(&mut self) -> Result<Vec<TaskQueueDelivery>>;
 }
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    // TODO: Test Redis impl too
-
-    /// Creates a [`MessageTask`] with filler information and the given MessageId inner String
-    fn mock_message(message_id: String) -> QueueTask {
-        MessageTask::new_task(
-            MessageId(message_id),
-            ApplicationId("TestEndpointID".to_owned()),
-            EndpointId("TestEndpointID".to_owned()),
-            MessageAttemptTriggerType::Scheduled,
-        )
-    }
-
-    /// Sends a message with the given TaskQueueProducer reference and asserts that the result is OK
-    async fn assert_send(tx: &TaskQueueProducer, message_id: &str) {
-        assert!(tx
-            .send(mock_message(message_id.to_owned()), None)
-            .await
-            .is_ok());
-    }
-
-    /// Receives a message with the given TaskQueueConsumer mutable reference and asserts that it is
-    /// equal to the mock message with the given message_id.
-    async fn assert_recv(rx: &mut TaskQueueConsumer, message_id: &str) {
-        assert_eq!(
-            *rx.receive_all().await.unwrap().first().unwrap().task,
-            mock_message(message_id.to_owned())
-        )
-    }
-
-    #[tokio::test]
-    async fn test_single_producer_single_consumer() {
-        let (tx_mem, mut rx_mem) = memory::new_pair().await;
-
-        let msg_id = "TestMessageID1";
-
-        assert_send(&tx_mem, msg_id).await;
-        assert_recv(&mut rx_mem, msg_id).await;
-    }
-
-    #[tokio::test]
-    async fn test_multiple_producer_single_consumer() {
-        let (tx_mem, mut rx_mem) = memory::new_pair().await;
-
-        let msg_1 = "TestMessageID1";
-        let msg_2 = "TestMessageID2";
-
-        tokio::spawn({
-            let tx_mem = tx_mem.clone();
-            async move {
-                assert_send(&tx_mem, msg_1).await;
-            }
-        });
-        tokio::spawn(async move {
-            assert_send(&tx_mem, msg_2).await;
-        });
-
-        let tasks = rx_mem.receive_all().await.unwrap();
-        assert_eq!(*tasks[0].task, mock_message(msg_1.to_owned()));
-        assert_eq!(*tasks[1].task, mock_message(msg_2.to_owned()));
-    }
-
-    #[tokio::test]
-    async fn test_delay() {
-        let (tx_mem, mut rx_mem) = memory::new_pair().await;
-
-        let msg_1 = "TestMessageID1";
-        let msg_2 = "TestMessageID2";
-
-        assert!(tx_mem
-            .send(
-                mock_message(msg_1.to_owned()),
-                Some(Duration::from_millis(200))
-            )
-            .await
-            .is_ok());
-        assert_send(&tx_mem, msg_2).await;
-
-        assert_recv(&mut rx_mem, msg_2).await;
-        assert_recv(&mut rx_mem, msg_1).await;
-    }
-
-    #[tokio::test]
-    async fn test_ack() {
-        let (tx_mem, mut rx_mem) = memory::new_pair().await;
-        assert!(tx_mem
-            .send(mock_message("test".to_owned()), None)
-            .await
-            .is_ok());
-
-        let recv = rx_mem
-            .receive_all()
-            .await
-            .unwrap()
-            .into_iter()
-            .next()
-            .unwrap();
-
-        assert_eq!(*recv.task, mock_message("test".to_owned()));
-
-        assert!(recv.ack().await.is_ok());
-
-        tokio::select! {
-            _ = rx_mem.receive_all() => {
-                panic!("`rx_mem` received second message");
-            }
-
-            // FIXME: Find out correct timeout duration
-            _ = tokio::time::sleep(Duration::from_millis(500)) => {}
-        }
-    }
-
-    #[tokio::test]
-    async fn test_nack() {
-        let (tx_mem, mut rx_mem) = memory::new_pair().await;
-        assert!(tx_mem
-            .send(mock_message("test".to_owned()), None)
-            .await
-            .is_ok());
-
-        let recv = rx_mem
-            .receive_all()
-            .await
-            .unwrap()
-            .into_iter()
-            .next()
-            .unwrap();
-        assert_eq!(*recv.task, mock_message("test".to_owned()));
-
-        assert!(recv.nack().await.is_ok());
-
-        tokio::select! {
-            _ = rx_mem.receive_all() => {}
-
-            // FIXME: Find out correct timeout duration
-            _ = tokio::time::sleep(Duration::from_millis(500)) => {
-                panic!("`rx_mem` did not receive second message");
-            }
-        }
-    }
-}
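
Note (not part of the diff above): the deleted tests exercised the old `memory::new_pair()` directly. Below is a minimal sketch of the same send/receive roundtrip against the new omniqueue-backed pair, assuming it lives in a `#[cfg(test)] mod tests` inside `queue/mod.rs` (so `use super::*;` picks up the imports shown above); the test name is illustrative and the `mock_message` helper is copied from the removed tests.

    #[cfg(test)]
    mod tests {
        use super::*;

        /// Same filler-task helper the removed tests used.
        fn mock_message(message_id: String) -> QueueTask {
            MessageTask::new_task(
                MessageId(message_id),
                ApplicationId("TestEndpointID".to_owned()),
                EndpointId("TestEndpointID".to_owned()),
                MessageAttemptTriggerType::Scheduled,
            )
        }

        #[tokio::test]
        async fn test_memory_roundtrip() {
            // Build the in-memory pair the same way `new_pair` now does for
            // `QueueBackend::Memory`, then wrap it in the Omni variants.
            let (producer, consumer) = MemoryQueueBackend::builder(64)
                .build_pair()
                .await
                .expect("building in-memory queue can't fail");
            let tx =
                TaskQueueProducer::Omni(Arc::new(producer.into_dyn_scheduled(Default::default())));
            let mut rx = TaskQueueConsumer::Omni(consumer.into_dyn(Default::default()));

            // Send through the producer wrapper and read back through the consumer wrapper.
            tx.send(mock_message("TestMessageID1".to_owned()), None)
                .await
                .unwrap();
            let deliveries = rx.receive_all().await.unwrap();
            assert_eq!(
                *deliveries[0].task,
                mock_message("TestMessageID1".to_owned())
            );
        }
    }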