Skip to content

Commit

Permalink
WIP delay support - SQS, Mem, and Rabbit work, Redis is _rough_.
Browse files Browse the repository at this point in the history
  • Loading branch information
svix-onelson committed Oct 31, 2023
1 parent eab67e0 commit a78dd7b
Show file tree
Hide file tree
Showing 12 changed files with 449 additions and 18 deletions.
1 change: 1 addition & 0 deletions _rabbit/enabled_plugins
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[rabbitmq_management, rabbitmq_delayed_message_exchange].
Binary file not shown.
12 changes: 8 additions & 4 deletions omniqueue/src/backends/memory_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,11 +430,15 @@ mod tests {
let delay = Duration::from_millis(100);
let now = Instant::now();
p.send_serde_json_scheduled(&payload1, delay).await.unwrap();
let delivery = c.receive().await.unwrap();

// `receive` will wait until the next message is available, so this assertion should be
// true if the delay was observed.
let delivery = c
.receive_all(1, delay * 2)
.await
.unwrap()
.into_iter()
.next()
.unwrap();
assert!(now.elapsed() >= delay);
assert!(now.elapsed() < delay * 2);
assert_eq!(Some(payload1), delivery.payload_serde_json().unwrap());
}
}
38 changes: 35 additions & 3 deletions omniqueue/src/backends/rabbitmq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{any::TypeId, collections::HashMap};
use async_trait::async_trait;
use futures::StreamExt;
use futures_util::FutureExt;
use lapin::types::AMQPValue;
pub use lapin::{
acker::Acker as LapinAcker,
options::{
Expand All @@ -18,6 +19,7 @@ use crate::{
decoding::DecoderRegistry,
encoding::{CustomEncoder, EncoderRegistry},
queue::{consumer::QueueConsumer, producer::QueueProducer, Acker, Delivery, QueueBackend},
scheduled::ScheduledProducer,
QueueError,
};

Expand Down Expand Up @@ -89,13 +91,13 @@ async fn producer(

#[async_trait]
impl QueueBackend for RabbitMqBackend {
type Config = RabbitMqConfig;

type PayloadIn = Vec<u8>;

type PayloadOut = Vec<u8>;
type Producer = RabbitMqProducer;

type Consumer = RabbitMqConsumer;
type Producer = RabbitMqProducer;
type Config = RabbitMqConfig;

async fn new_pair(
cfg: RabbitMqConfig,
Expand Down Expand Up @@ -168,6 +170,36 @@ impl QueueProducer for RabbitMqProducer {
}
}

#[async_trait]
impl ScheduledProducer for RabbitMqProducer {
async fn send_raw_scheduled(
&self,
payload: &Self::Payload,
delay: Duration,
) -> Result<(), QueueError> {
let mut headers = FieldTable::default();

let delay_ms: u32 = delay
.as_millis()
.try_into()
.map_err(|_| QueueError::Generic("delay is too large".into()))?;
headers.insert("x-delay".into(), AMQPValue::LongUInt(delay_ms));

self.channel
.basic_publish(
&self.exchange,
&self.routing_key,
self.options,
payload,
self.properties.clone().with_headers(headers),
)
.await
.map_err(QueueError::generic)?;

Ok(())
}
}

pub struct RabbitMqConsumer {
registry: DecoderRegistry<Vec<u8>>,
consumer: Consumer,
Expand Down
Loading

0 comments on commit a78dd7b

Please sign in to comment.