Skip to content

Commit

Permalink
add scheduled support for memory backend
Browse files Browse the repository at this point in the history
  • Loading branch information
svix-onelson committed Oct 27, 2023
1 parent 502dd15 commit eab67e0
Showing 1 changed file with 43 additions and 1 deletion.
44 changes: 43 additions & 1 deletion omniqueue/src/backends/memory_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
decoding::DecoderRegistry,
encoding::{CustomEncoder, EncoderRegistry},
queue::{consumer::QueueConsumer, producer::QueueProducer, Acker, Delivery, QueueBackend},
scheduled::ScheduledProducer,
QueueError,
};

Expand Down Expand Up @@ -72,7 +73,7 @@ impl QueueProducer for MemoryQueueProducer {
self.registry.as_ref()
}

async fn send_raw(&self, payload: &Vec<u8>) -> Result<(), QueueError> {
async fn send_raw(&self, payload: &Self::Payload) -> Result<(), QueueError> {
self.tx
.send(payload.clone())
.map(|_| ())
Expand All @@ -85,6 +86,26 @@ impl QueueProducer for MemoryQueueProducer {
}
}

#[async_trait]
impl ScheduledProducer for MemoryQueueProducer {
async fn send_raw_scheduled(
&self,
payload: &Self::Payload,
delay: Duration,
) -> Result<(), QueueError> {
let tx = self.tx.clone();
let payload = payload.clone();
tokio::spawn(async move {
tracing::trace!("MemoryQueue: event sent > (delay: {:?})", delay);
tokio::time::sleep(delay).await;
if tx.send(payload).is_err() {
tracing::error!("Receiver dropped");
}
});
Ok(())
}
}

pub struct MemoryQueueConsumer {
registry: DecoderRegistry<Vec<u8>>,
rx: broadcast::Receiver<Vec<u8>>,
Expand Down Expand Up @@ -182,6 +203,7 @@ mod tests {

use crate::{
queue::{consumer::QueueConsumer, producer::QueueProducer, QueueBuilder},
scheduled::ScheduledProducer,
QueueError,
};

Expand Down Expand Up @@ -395,4 +417,24 @@ mod tests {
assert!(elapsed >= deadline);
assert!(elapsed <= deadline + Duration::from_millis(200));
}

#[tokio::test]
async fn test_scheduled() {
let payload1 = ExType { a: 1 };

let (p, mut c) = QueueBuilder::<MemoryQueueBackend, _>::new(16)
.build_pair()
.await
.unwrap();

let delay = Duration::from_millis(100);
let now = Instant::now();
p.send_serde_json_scheduled(&payload1, delay).await.unwrap();
let delivery = c.receive().await.unwrap();

// `receive` will wait until the next message is available, so this assertion should be
// true if the delay was observed.
assert!(now.elapsed() >= delay);
assert_eq!(Some(payload1), delivery.payload_serde_json().unwrap());
}
}

0 comments on commit eab67e0

Please sign in to comment.