From eab67e04fd11b81ecb5b65c5041b0c7ea67e002a Mon Sep 17 00:00:00 2001 From: Owen Nelson Date: Thu, 26 Oct 2023 17:09:53 -0700 Subject: [PATCH] add scheduled support for memory backend --- omniqueue/src/backends/memory_queue.rs | 44 +++++++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/omniqueue/src/backends/memory_queue.rs b/omniqueue/src/backends/memory_queue.rs index e6dd98e..abbd923 100644 --- a/omniqueue/src/backends/memory_queue.rs +++ b/omniqueue/src/backends/memory_queue.rs @@ -9,6 +9,7 @@ use crate::{ decoding::DecoderRegistry, encoding::{CustomEncoder, EncoderRegistry}, queue::{consumer::QueueConsumer, producer::QueueProducer, Acker, Delivery, QueueBackend}, + scheduled::ScheduledProducer, QueueError, }; @@ -72,7 +73,7 @@ impl QueueProducer for MemoryQueueProducer { self.registry.as_ref() } - async fn send_raw(&self, payload: &Vec) -> Result<(), QueueError> { + async fn send_raw(&self, payload: &Self::Payload) -> Result<(), QueueError> { self.tx .send(payload.clone()) .map(|_| ()) @@ -85,6 +86,26 @@ impl QueueProducer for MemoryQueueProducer { } } +#[async_trait] +impl ScheduledProducer for MemoryQueueProducer { + async fn send_raw_scheduled( + &self, + payload: &Self::Payload, + delay: Duration, + ) -> Result<(), QueueError> { + let tx = self.tx.clone(); + let payload = payload.clone(); + tokio::spawn(async move { + tracing::trace!("MemoryQueue: event sent > (delay: {:?})", delay); + tokio::time::sleep(delay).await; + if tx.send(payload).is_err() { + tracing::error!("Receiver dropped"); + } + }); + Ok(()) + } +} + pub struct MemoryQueueConsumer { registry: DecoderRegistry>, rx: broadcast::Receiver>, @@ -182,6 +203,7 @@ mod tests { use crate::{ queue::{consumer::QueueConsumer, producer::QueueProducer, QueueBuilder}, + scheduled::ScheduledProducer, QueueError, }; @@ -395,4 +417,24 @@ mod tests { assert!(elapsed >= deadline); assert!(elapsed <= deadline + Duration::from_millis(200)); } + + #[tokio::test] + async fn test_scheduled() { + let payload1 = ExType { a: 1 }; + + let (p, mut c) = QueueBuilder::::new(16) + .build_pair() + .await + .unwrap(); + + let delay = Duration::from_millis(100); + let now = Instant::now(); + p.send_serde_json_scheduled(&payload1, delay).await.unwrap(); + let delivery = c.receive().await.unwrap(); + + // `receive` will wait until the next message is available, so this assertion should be + // true if the delay was observed. + assert!(now.elapsed() >= delay); + assert_eq!(Some(payload1), delivery.payload_serde_json().unwrap()); + } }