diff --git a/_rabbit/enabled_plugins b/_rabbit/enabled_plugins
new file mode 100644
index 0000000..0dfabd2
--- /dev/null
+++ b/_rabbit/enabled_plugins
@@ -0,0 +1 @@
+[rabbitmq_management, rabbitmq_delayed_message_exchange].
\ No newline at end of file
diff --git a/_rabbit/plugins/rabbitmq_delayed_message_exchange-3.11.1.ez b/_rabbit/plugins/rabbitmq_delayed_message_exchange-3.11.1.ez
new file mode 100644
index 0000000..fb7530f
Binary files /dev/null and b/_rabbit/plugins/rabbitmq_delayed_message_exchange-3.11.1.ez differ
diff --git a/omniqueue/Cargo.toml b/omniqueue/Cargo.toml
index c56c561..7ea5397 100644
--- a/omniqueue/Cargo.toml
+++ b/omniqueue/Cargo.toml
@@ -23,6 +23,7 @@ lapin = { version = "2", optional = true }
 redis = { version = "0.23", features = ["tokio-comp", "tokio-native-tls-comp", "streams"], optional = true }
 serde = { version = "1", features = ["derive", "rc"] }
 serde_json = "1"
+svix-ksuid = { version = "0.7.0", optional = true }
 thiserror = "1"
 tokio = { version = "1", features = ["full"] }
 tokio-util = { version = "0.7", optional = true }
@@ -43,6 +44,6 @@ gcp_pubsub = [
 ]
 memory_queue = []
 rabbitmq = ["dep:lapin"]
-redis = ["dep:bb8", "dep:bb8-redis", "dep:redis"]
+redis = ["dep:bb8", "dep:bb8-redis", "dep:redis", "dep:svix-ksuid"]
 redis_cluster = ["redis", "redis/cluster-async"]
 sqs = ["dep:aws-config", "dep:aws-sdk-sqs"]
diff --git a/omniqueue/src/backends/memory_queue.rs b/omniqueue/src/backends/memory_queue.rs
index abbd923..212972e 100644
--- a/omniqueue/src/backends/memory_queue.rs
+++ b/omniqueue/src/backends/memory_queue.rs
@@ -430,11 +430,15 @@ mod tests {
         let delay = Duration::from_millis(100);
         let now = Instant::now();
         p.send_serde_json_scheduled(&payload1, delay).await.unwrap();
-        let delivery = c.receive().await.unwrap();
-
-        // `receive` will wait until the next message is available, so this assertion should be
-        // true if the delay was observed.
+        let delivery = c
+            .receive_all(1, delay * 2)
+            .await
+            .unwrap()
+            .into_iter()
+            .next()
+            .unwrap();
         assert!(now.elapsed() >= delay);
+        assert!(now.elapsed() < delay * 2);
         assert_eq!(Some(payload1), delivery.payload_serde_json().unwrap());
     }
 }
diff --git a/omniqueue/src/backends/rabbitmq.rs b/omniqueue/src/backends/rabbitmq.rs
index 7b2788b..2185f93 100644
--- a/omniqueue/src/backends/rabbitmq.rs
+++ b/omniqueue/src/backends/rabbitmq.rs
@@ -4,6 +4,7 @@ use std::{any::TypeId, collections::HashMap};
 use async_trait::async_trait;
 use futures::StreamExt;
 use futures_util::FutureExt;
+use lapin::types::AMQPValue;
 pub use lapin::{
     acker::Acker as LapinAcker,
     options::{
@@ -18,6 +19,7 @@ use crate::{
     decoding::DecoderRegistry,
     encoding::{CustomEncoder, EncoderRegistry},
     queue::{consumer::QueueConsumer, producer::QueueProducer, Acker, Delivery, QueueBackend},
+    scheduled::ScheduledProducer,
     QueueError,
 };
@@ -89,13 +91,13 @@ async fn producer(
 #[async_trait]
 impl QueueBackend for RabbitMqBackend {
-    type Config = RabbitMqConfig;
-
     type PayloadIn = Vec<u8>;
 
     type PayloadOut = Vec<u8>;
 
+    type Producer = RabbitMqProducer;
     type Consumer = RabbitMqConsumer;
-    type Producer = RabbitMqProducer;
+
+    type Config = RabbitMqConfig;
 
     async fn new_pair(
         cfg: RabbitMqConfig,
@@ -168,6 +170,36 @@ impl QueueProducer for RabbitMqProducer {
     }
 }
 
+#[async_trait]
+impl ScheduledProducer for RabbitMqProducer {
+    async fn send_raw_scheduled(
+        &self,
+        payload: &Self::Payload,
+        delay: Duration,
+    ) -> Result<(), QueueError> {
+        let mut headers = FieldTable::default();
+
+        let delay_ms: u32 = delay
+            .as_millis()
+            .try_into()
+            .map_err(|_| QueueError::Generic("delay is too large".into()))?;
+        headers.insert("x-delay".into(), AMQPValue::LongUInt(delay_ms));
+
+        self.channel
+            .basic_publish(
+                &self.exchange,
+                &self.routing_key,
+                self.options,
+                payload,
+                self.properties.clone().with_headers(headers),
+            )
+            .await
+            .map_err(QueueError::generic)?;
+
+        Ok(())
+    }
+}
+
 pub struct RabbitMqConsumer {
     registry: DecoderRegistry<Vec<u8>>,
     consumer: Consumer,
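Note: `send_raw_scheduled` above only yields a delayed delivery when the target exchange was declared with the custom `x-delayed-message` type provided by the `rabbitmq_delayed_message_exchange` plugin (enabled via `_rabbit/enabled_plugins` above and declared by the test setup later in this diff). A minimal, standalone lapin sketch of that contract — the exchange name, routing key, and 30s delay are illustrative, not part of the patch:

```rust
use lapin::{
    options::{BasicPublishOptions, ExchangeDeclareOptions},
    types::{AMQPValue, FieldTable},
    BasicProperties, Connection, ConnectionProperties, ExchangeKind,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let conn =
        Connection::connect("amqp://guest:guest@localhost:5672", ConnectionProperties::default())
            .await?;
    let channel = conn.create_channel().await?;

    // The plugin contributes the custom "x-delayed-message" exchange type; the
    // underlying routing behavior is chosen via the "x-delayed-type" argument.
    let mut args = FieldTable::default();
    args.insert("x-delayed-type".into(), AMQPValue::LongString("direct".into()));
    channel
        .exchange_declare(
            "example-delay-exchange", // hypothetical name
            ExchangeKind::Custom("x-delayed-message".to_string()),
            ExchangeDeclareOptions::default(),
            args,
        )
        .await?;

    // Publishing with an "x-delay" header (in milliseconds) makes the exchange
    // hold the message back until the delay elapses.
    let mut headers = FieldTable::default();
    headers.insert("x-delay".into(), AMQPValue::LongUInt(30_000));
    channel
        .basic_publish(
            "example-delay-exchange",
            "example-routing-key",
            BasicPublishOptions::default(),
            b"hello",
            BasicProperties::default().with_headers(headers),
        )
        .await? // returns a publisher-confirm future...
        .await?; // ...which resolves once the broker confirms
    Ok(())
}
```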
diff --git a/omniqueue/src/backends/redis/mod.rs b/omniqueue/src/backends/redis/mod.rs
index 7762060..501610e 100644
--- a/omniqueue/src/backends/redis/mod.rs
+++ b/omniqueue/src/backends/redis/mod.rs
@@ -1,3 +1,29 @@
+//! Redis stream-based queue implementation
+//!
+//! # Redis Streams in Brief
+//! Redis has a built-in stream data type that works well as a queue. With consumer groups and
+//! consumers, messages read from a stream are automatically placed in a pending list until they
+//! are acknowledged, at which point they are deleted.
+//!
+//! # The Implementation
+//! This implementation uses streams to let worker instances race for messages to dispatch, which
+//! are then, ideally, acknowledged. If a message has been processing for more than 45 seconds,
+//! it is reinserted at the back of the queue to be tried again.
+//!
+//! This implementation uses the following data structures:
+//! - A "tasks to be processed" stream, which the consumer listens to for tasks.
+//!   AKA: Main
+//! - A ZSET for delayed tasks, with the sort order being the time-to-be-delivered.
+//!   AKA: Delayed
+//!
+//! The implementation spawns an additional worker that monitors both the delayed-task zset and
+//! the tasks currently processing. It watches the zset for tasks that are now due, and the
+//! currently-processing queue for tasks that have timed out and should be put back on the main
+//! queue.
+
+// This lint warns on `let _: () = ...`, which is used throughout this file for Redis commands
+// that have generic return types. This is cleaner than the turbofish operator in my opinion.
+#![allow(clippy::let_unit_value)]
+
 use std::time::Duration;
 use std::{any::TypeId, collections::HashMap, marker::PhantomData};
@@ -5,11 +31,14 @@ use async_trait::async_trait;
 use bb8::ManageConnection;
 pub use bb8_redis::RedisMultiplexedConnectionManager;
 use redis::streams::{StreamId, StreamReadOptions, StreamReadReply};
+use svix_ksuid::KsuidLike;
+use tokio::task::JoinHandle;
 
 use crate::{
     decoding::DecoderRegistry,
     encoding::{CustomEncoder, EncoderRegistry},
     queue::{consumer::QueueConsumer, producer::QueueProducer, Acker, Delivery, QueueBackend},
+    scheduled::ScheduledProducer,
     QueueError,
 };
@@ -42,6 +71,7 @@ pub struct RedisConfig {
     pub max_connections: u16,
     pub reinsert_on_nack: bool,
     pub queue_key: String,
+    pub delayed_queue_key: String,
     pub consumer_group: String,
     pub consumer_name: String,
     pub payload_key: String,
@@ -50,6 +80,8 @@ pub struct RedisQueueBackend<R = RedisMultiplexedConnectionManager>(PhantomData<R>);
 pub type RedisClusterQueueBackend = RedisQueueBackend<RedisClusterConnectionManager>;
 
+type RawPayload = Vec<u8>;
+
 #[async_trait]
 impl<R> QueueBackend for RedisQueueBackend<R>
 where
@@ -57,14 +89,14 @@
     R: RedisConnection,
     R::Connection: redis::aio::ConnectionLike + Send + Sync,
     R::Error: 'static + std::error::Error + Send + Sync,
 {
-    type Config = RedisConfig;
-
-    // FIXME: Is it possible to use the types Redis actually uses?
-    type PayloadIn = Vec<u8>;
-    type PayloadOut = Vec<u8>;
+    type PayloadIn = RawPayload;
+    type PayloadOut = RawPayload;
 
     type Producer = RedisStreamProducer<R>;
+    type Consumer = RedisStreamConsumer<R>;
+
+    type Config = RedisConfig;
 
     async fn new_pair(
         cfg: RedisConfig,
@@ -78,11 +110,20 @@
         .await
         .map_err(QueueError::generic)?;
 
+        let _ = start_scheduler_background_task(
+            redis.clone(),
+            &cfg.queue_key,
+            &cfg.delayed_queue_key,
+            &cfg.payload_key,
+        )
+        .await;
+
         Ok((
             RedisStreamProducer {
                 registry: custom_encoders,
                 redis: redis.clone(),
                 queue_key: cfg.queue_key.clone(),
+                delayed_queue_key: cfg.delayed_queue_key,
                 payload_key: cfg.payload_key.clone(),
             },
             RedisStreamConsumer {
@@ -107,10 +148,18 @@
         .await
         .map_err(QueueError::generic)?;
 
+        let _ = start_scheduler_background_task(
+            redis.clone(),
+            &cfg.queue_key,
+            &cfg.delayed_queue_key,
+            &cfg.payload_key,
+        )
+        .await;
         Ok(RedisStreamProducer {
             registry: custom_encoders,
             redis,
             queue_key: cfg.queue_key,
+            delayed_queue_key: cfg.delayed_queue_key,
             payload_key: cfg.payload_key,
         })
     }
@@ -126,6 +175,13 @@
         .await
         .map_err(QueueError::generic)?;
 
+        let _ = start_scheduler_background_task(
+            redis.clone(),
+            &cfg.queue_key,
+            &cfg.delayed_queue_key,
+            &cfg.payload_key,
+        )
+        .await;
         Ok(RedisStreamConsumer {
             registry: custom_decoders,
             redis,
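Before the scheduler internals, a minimal standalone sketch of the zset-to-stream handoff the functions below implement. The key names, the 30-second delay, the stand-in KSUID, and the synchronous connection are illustrative only; the real code uses the async pool, a lock, and `svix-ksuid`-prefixed members:

```rust
use std::time::{SystemTime, UNIX_EPOCH};

use redis::Commands; // requires the redis crate with the "streams" feature

fn main() -> redis::RedisResult<()> {
    let client = redis::Client::open("redis://127.0.0.1/")?;
    let mut conn = client.get_connection()?;
    let unix_now =
        || SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() as isize;

    // Producer side: shelve the task in the delayed zset, scored by the time
    // it becomes due (unix seconds).
    let _: () = conn.zadd("example::delayed", r#"SOMEKSUID|"hello""#, unix_now() + 30)?;

    // Scheduler side: grab a batch of members whose due time has passed...
    let due: Vec<String> =
        conn.zrangebyscore_limit("example::delayed", 0, unix_now() - 1, 0, 50)?;
    for member in &due {
        // ...append each payload to the main stream ("*" auto-generates an ID)...
        let payload = member.split('|').nth(1).unwrap();
        let _: () = conn.xadd("example::main", "*", &[("payload", payload)])?;
    }
    // ...and remove them from the zset so they are delivered only once.
    if !due.is_empty() {
        let _: () = conn.zrem("example::delayed", due)?;
    }
    Ok(())
}
```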
@@ -137,6 +193,158 @@
     }
 }
 
+// FIXME(onelson): there's a trait, [`SchedulerBackend`], but no obvious way to implement it in
+// a way that makes good sense here.
+// We need access to the pool and various bits of config to spawn a task, but none of that is
+// available where it matters right now.
+// Doing my own thing for now - a standalone function that takes what it needs.
+async fn start_scheduler_background_task<R>(
+    redis: bb8::Pool<R>,
+    queue_key: &str,
+    delayed_queue_key: &str,
+    payload_key: &str,
+) -> Option<JoinHandle<Result<(), QueueError>>>
+where
+    R: RedisConnection,
+    R::Connection: redis::aio::ConnectionLike + Send + Sync,
+    R::Error: 'static + std::error::Error + Send + Sync,
+{
+    if delayed_queue_key.is_empty() {
+        tracing::warn!("no delayed_queue_key specified - delayed task scheduler disabled");
+        return None;
+    }
+
+    Some(tokio::spawn({
+        let pool = redis.clone();
+        let mqn = queue_key.to_string();
+        let dqn = delayed_queue_key.to_string();
+        let delayed_lock = format!("{delayed_queue_key}__lock");
+        let payload_key = payload_key.to_string();
+        tracing::debug!(
+            "spawning delayed task scheduler: delayed_queue_key=`{delayed_queue_key}`, \
+             delayed_lock=`{delayed_lock}`"
+        );
+
+        async move {
+            loop {
+                if let Err(err) = background_task_delayed(
+                    pool.clone(),
+                    mqn.clone(),
+                    dqn.clone(),
+                    &delayed_lock,
+                    &payload_key,
+                )
+                .await
+                {
+                    tracing::error!("{}", err);
+                    tokio::time::sleep(Duration::from_millis(500)).await;
+                    continue;
+                };
+            }
+        }
+    }))
+}
+
+/// Special ID for XADD commands that tells Redis to auto-generate a stream ID
+const GENERATE_STREAM_ID: &str = "*";
+/// Special ID for XREADGROUP commands that reads only new messages
+const LISTEN_STREAM_ID: &str = ">";
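Each sweep of `background_task_delayed` (below) is guarded by a best-effort lock (`SET ... NX PX 5000`) so that concurrent workers don't move the same zset members twice. A standalone sketch of the pattern, with a hypothetical key name; note that releasing via plain `DEL` could drop a lock another worker has since acquired if a sweep ever outlived the five-second TTL:

```rust
/// Try to take the lock; returns true when this worker won the race.
fn try_acquire(conn: &mut redis::Connection, lock_key: &str) -> redis::RedisResult<bool> {
    // SET <key> true NX PX 5000: set only if the key does not already exist,
    // and expire after five seconds so a crashed holder cannot wedge the
    // scheduler forever.
    let resp: Option<String> = redis::cmd("SET")
        .arg(lock_key)
        .arg(true)
        .arg("NX")
        .arg("PX")
        .arg(5000)
        .query(conn)?;
    Ok(resp.as_deref() == Some("OK"))
}

fn release(conn: &mut redis::Connection, lock_key: &str) -> redis::RedisResult<()> {
    redis::cmd("DEL").arg(lock_key).query(conn)
}

fn main() -> redis::RedisResult<()> {
    let client = redis::Client::open("redis://127.0.0.1/")?;
    let mut conn = client.get_connection()?;
    if try_acquire(&mut conn, "example::delayed__lock")? {
        // ... move due tasks from the zset to the stream here ...
        release(&mut conn, "example::delayed__lock")?;
    }
    Ok(())
}
```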
+
+/// Moves "due" messages from the sorted set, where delayed messages are shelved, back onto the
+/// main queue.
+async fn background_task_delayed<R>(
+    pool: bb8::Pool<R>,
+    main_queue_name: String,
+    delayed_queue_name: String,
+    delayed_lock: &str,
+    payload_key: &str,
+) -> Result<(), QueueError>
+where
+    R: RedisConnection,
+    R::Connection: redis::aio::ConnectionLike + Send + Sync,
+    R::Error: 'static + std::error::Error + Send + Sync,
+{
+    let batch_size: isize = 50;
+
+    let mut conn = pool.get().await.map_err(QueueError::generic)?;
+
+    // There is a lock on the delayed queue processing to avoid race conditions. So first try to
+    // acquire the lock should it not already exist. The lock expires after five seconds in case
+    // a worker crashes while holding the lock.
+    let mut cmd = redis::cmd("SET");
+    cmd.arg(delayed_lock)
+        .arg(true)
+        .arg("NX")
+        .arg("PX")
+        .arg(5000);
+    // Will be Some("OK") when the lock was acquired, or None when it was not
+    let resp: Option<String> = cmd
+        .query_async(&mut *conn)
+        .await
+        .map_err(QueueError::generic)?;
+
+    if resp.as_deref() == Some("OK") {
+        // First look for delayed keys whose time is up and add them to the main queue
+        let timestamp: i64 = std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            .map_err(QueueError::generic)?
+            .as_secs()
+            .try_into()
+            .map_err(QueueError::generic)?;
+
+        let keys: Vec<String> = redis::Cmd::zrangebyscore_limit(
+            &delayed_queue_name,
+            0isize,
+            // Subtract 1 from the timestamp to make the bound exclusive rather than inclusive,
+            // preventing premature delivery.
+            timestamp - 1,
+            0isize,
+            batch_size,
+        )
+        .query_async(&mut *conn)
+        .await
+        .map_err(QueueError::generic)?;
+
+        if !keys.is_empty() {
+            // For each task, XADD them to the MAIN queue
+            let mut pipe = redis::pipe();
+            for key in &keys {
+                let task = from_delayed_queue_key(key)?;
+                let _ = pipe.xadd(&main_queue_name, GENERATE_STREAM_ID, &[(payload_key, task)]);
+            }
+            let _: () = pipe
+                .query_async(&mut *conn)
+                .await
+                .map_err(QueueError::generic)?;
+
+            // Then remove the tasks from the delayed queue so they aren't resent
+            let _: () = redis::Cmd::zrem(&delayed_queue_name, keys)
+                .query_async(&mut *conn)
+                .await
+                .map_err(QueueError::generic)?;
+
+            // Make sure to release the lock after done processing
+            let _: () = redis::Cmd::del(delayed_lock)
+                .query_async(&mut *conn)
+                .await
+                .map_err(QueueError::generic)?;
+        } else {
+            // Make sure to release the lock before sleeping
+            let _: () = redis::Cmd::del(delayed_lock)
+                .query_async(&mut *conn)
+                .await
+                .map_err(QueueError::generic)?;
+
+            // Wait for half a second before attempting to fetch again if nothing was found
+            tokio::time::sleep(Duration::from_millis(500)).await;
+        }
+    } else {
+        // Also sleep half a second if the lock could not be acquired
+        tokio::time::sleep(Duration::from_millis(500)).await;
+    }
+
+    Ok(())
+}
+
 pub struct RedisStreamAcker<M: ManageConnection> {
     redis: bb8::Pool<M>,
     queue_key: String,
@@ -184,6 +392,7 @@
 pub struct RedisStreamProducer<M: ManageConnection> {
     registry: EncoderRegistry<Vec<u8>>,
     redis: bb8::Pool<M>,
     queue_key: String,
+    delayed_queue_key: String,
     payload_key: String,
 }
@@ -202,11 +411,78 @@ where
     async fn send_raw(&self, payload: &Vec<u8>) -> Result<(), QueueError> {
         let mut conn = self.redis.get().await.map_err(QueueError::generic)?;
 
-        redis::Cmd::xadd(&self.queue_key, "*", &[(&self.payload_key, payload)])
-            .query_async(&mut *conn)
-            .await
-            .map_err(QueueError::generic)
+        redis::Cmd::xadd(
+            &self.queue_key,
+            GENERATE_STREAM_ID,
+            &[(&self.payload_key, payload)],
+        )
+        .query_async(&mut *conn)
+        .await
+        .map_err(QueueError::generic)?;
+
+        Ok(())
+    }
+}
+
+/// Acts as a payload prefix for when payloads are written to zset keys.
+///
+/// This ensures that messages with identical payloads:
+/// - are delivered N times when scheduled N times, rather than collapsing into a single zset
+///   member and a single delivery,
+/// - don't overwrite each other's "delivery due" timestamp.
+fn delayed_key_id() -> String {
+    svix_ksuid::Ksuid::new(None, None).to_base62()
+}
+
+/// Prefixes a payload with an ID, separated by a pipe, e.g. `ID|payload`.
+fn to_delayed_queue_key(payload: &RawPayload) -> Result<String, QueueError> {
+    Ok(format!(
+        "{}|{}",
+        delayed_key_id(),
+        serde_json::to_string(payload).map_err(QueueError::generic)?
+    ))
+}
+
+/// Returns the payload portion of a delayed zset key.
+fn from_delayed_queue_key(key: &str) -> Result<RawPayload, QueueError> {
+    // All information is stored in the key, in which the ID and the JSON-formatted task are
+    // separated by a `|`. So, take the key, then take the part after the `|`.
+    serde_json::from_str(
+        key.split('|')
+            .nth(1)
+            .ok_or_else(|| QueueError::Generic("Improper key format".into()))?,
+    )
+    .map_err(QueueError::generic)
+}
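A worked example of the key format produced and consumed by the two helpers above. The KSUID is a stand-in value; since payloads are `Vec<u8>`, their JSON form is a number array and can never itself contain the `|` separator:

```rust
fn main() {
    let payload: Vec<u8> = vec![1, 2, 3];

    // to_delayed_queue_key: a fresh KSUID prefix keeps two schedules of the
    // same bytes distinct zset members with independent scores.
    let key = format!(
        "{}|{}",
        "1srOrx2ZWZBpBUvZwXKQmoEYga2", // stand-in for Ksuid::new(None, None).to_base62()
        serde_json::to_string(&payload).unwrap()
    );
    assert_eq!(key, "1srOrx2ZWZBpBUvZwXKQmoEYga2|[1,2,3]");

    // from_delayed_queue_key: everything after the first `|` parses back.
    let decoded: Vec<u8> = serde_json::from_str(key.split('|').nth(1).unwrap()).unwrap();
    assert_eq!(decoded, payload);
}
```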
+
+#[async_trait]
+impl<M> ScheduledProducer for RedisStreamProducer<M>
+where
+    M: ManageConnection,
+    M::Connection: redis::aio::ConnectionLike + Send + Sync,
+    M::Error: 'static + std::error::Error + Send + Sync,
+{
+    async fn send_raw_scheduled(
+        &self,
+        payload: &Self::Payload,
+        delay: Duration,
+    ) -> Result<(), QueueError> {
+        let timestamp: i64 = (std::time::SystemTime::now() + delay)
+            .duration_since(std::time::UNIX_EPOCH)
+            .map_err(QueueError::generic)?
+            .as_secs()
+            .try_into()
+            .map_err(QueueError::generic)?;
+
+        let mut conn = self.redis.get().await.map_err(QueueError::generic)?;
+        redis::Cmd::zadd(
+            &self.delayed_queue_key,
+            to_delayed_queue_key(payload)?,
+            timestamp,
+        )
+        .query_async(&mut *conn)
+        .await
+        .map_err(QueueError::generic)?;
+        tracing::trace!("RedisQueue: event sent > (delay: {:?})", delay);
+        Ok(())
+    }
+}
@@ -260,7 +536,7 @@ where
         // Ensure an empty vec is never returned
         let read_out: StreamReadReply = redis::Cmd::xread_options(
             &[&self.queue_key],
-            &[">"],
+            &[LISTEN_STREAM_ID],
             &StreamReadOptions::default()
                 .group(&self.consumer_group, &self.consumer_name)
                 .block(100_000)
@@ -285,7 +561,7 @@
 
         let read_out: StreamReadReply = redis::Cmd::xread_options(
             &[&self.queue_key],
-            &[">"],
+            &[LISTEN_STREAM_ID],
             &StreamReadOptions::default()
                 .group(&self.consumer_group, &self.consumer_name)
                 .block(
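The zset scores have whole-second resolution, which is why the sweep queries `ZRANGEBYSCORE delayed 0 (now - 1)` rather than `0 now`. A worked example of the bound (plain arithmetic, no Redis required):

```rust
fn main() {
    // A message scheduled at t = 1000s with a 3s delay gets score 1003 — but
    // because of truncation to seconds, "1003" may really mean "1003.9".
    let due = 1000 + 3;

    // The sweep during second 1003 asks for scores in [0, 1002] and leaves the
    // message alone: delivering within second 1003 could be early.
    assert!(due > 1003 - 1);

    // The sweep during second 1004 asks for [0, 1003] and delivers. Worst case
    // the message is just under a second late, but it is never early.
    assert!(due <= 1004 - 1);
}
```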
diff --git a/omniqueue/src/backends/sqs.rs b/omniqueue/src/backends/sqs.rs
index 5f79ea0..fc6abe5 100644
--- a/omniqueue/src/backends/sqs.rs
+++ b/omniqueue/src/backends/sqs.rs
@@ -221,7 +221,7 @@ impl ScheduledProducer for SqsQueueProducer {
     async fn send_raw_scheduled(
         &self,
         payload: &Self::Payload,
-        delay: std::time::Duration,
+        delay: Duration,
     ) -> Result<(), QueueError> {
         self.client
             .send_message()
diff --git a/omniqueue/src/scheduled/mod.rs b/omniqueue/src/scheduled/mod.rs
index cd0eed1..371fcc6 100644
--- a/omniqueue/src/scheduled/mod.rs
+++ b/omniqueue/src/scheduled/mod.rs
@@ -15,7 +15,7 @@ use crate::{
     QueueError, QueuePayload,
 };
 
-// FIXME(onelson): unused?
+// FIXME(onelson): only used by redis -- is this meant to be called internally or by the caller building the backend?
 #[async_trait]
 pub trait SchedulerBackend: QueueBackend {
     async fn start_scheduler_background_task(
diff --git a/omniqueue/tests/rabbitmq.rs b/omniqueue/tests/rabbitmq.rs
index 8a7efbe..d5e44f0 100644
--- a/omniqueue/tests/rabbitmq.rs
+++ b/omniqueue/tests/rabbitmq.rs
@@ -1,11 +1,14 @@
+use lapin::options::ExchangeDeclareOptions;
+use lapin::types::AMQPValue;
 use lapin::{
     options::{BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions},
     types::FieldTable,
-    BasicProperties, Connection, ConnectionProperties,
+    BasicProperties, Connection, ConnectionProperties, ExchangeKind,
 };
 use omniqueue::{
     backends::rabbitmq::{RabbitMqBackend, RabbitMqConfig},
     queue::{consumer::QueueConsumer, producer::QueueProducer, QueueBackend, QueueBuilder, Static},
+    scheduled::ScheduledProducer,
 };
 use serde::{Deserialize, Serialize};
 use std::time::{Duration, Instant};
@@ -49,10 +52,39 @@ async fn make_test_queue(
         .await
         .unwrap();
 
+    const DELAY_EXCHANGE: &str = "later-alligator";
+    let mut args = FieldTable::default();
+    args.insert(
+        "x-delayed-type".into(),
+        AMQPValue::LongString("direct".into()),
+    );
+    channel
+        .exchange_declare(
+            DELAY_EXCHANGE,
+            ExchangeKind::Custom("x-delayed-message".to_string()),
+            ExchangeDeclareOptions {
+                auto_delete: true,
+                ..Default::default()
+            },
+            args,
+        )
+        .await
+        .unwrap();
+    channel
+        .queue_bind(
+            &queue_name,
+            DELAY_EXCHANGE,
+            &queue_name,
+            Default::default(),
+            Default::default(),
+        )
+        .await
+        .unwrap();
+
     let config = RabbitMqConfig {
         uri: MQ_URI.to_owned(),
         connection_properties: options,
-        publish_exchange: "".to_owned(),
+        publish_exchange: DELAY_EXCHANGE.to_string(),
         publish_routing_key: queue_name.clone(),
         publish_options: BasicPublishOptions::default(),
         publish_properties: BasicProperties::default(),
@@ -272,3 +304,27 @@ async fn test_send_recv_all_late_arriving_items() {
     assert!(elapsed >= deadline);
     assert!(elapsed <= deadline + Duration::from_millis(200));
 }
+
+#[tokio::test]
+async fn test_scheduled() {
+    let payload1 = ExType { a: 1 };
+    let (p, mut c) = make_test_queue(None, false)
+        .await
+        .build_pair()
+        .await
+        .unwrap();
+
+    let delay = Duration::from_secs(3);
+    let now = Instant::now();
+    p.send_serde_json_scheduled(&payload1, delay).await.unwrap();
+    let delivery = c
+        .receive_all(1, delay * 2)
+        .await
+        .unwrap()
+        .into_iter()
+        .next()
+        .unwrap();
+    assert!(now.elapsed() >= delay);
+    assert!(now.elapsed() < delay * 2);
+    assert_eq!(Some(payload1), delivery.payload_serde_json().unwrap());
+}
diff --git a/omniqueue/tests/redis.rs b/omniqueue/tests/redis.rs
index a33edb5..8f5f50e 100644
--- a/omniqueue/tests/redis.rs
+++ b/omniqueue/tests/redis.rs
@@ -1,6 +1,7 @@
 use omniqueue::{
     backends::redis::{RedisConfig, RedisQueueBackend},
     queue::{consumer::QueueConsumer, producer::QueueProducer, QueueBackend, QueueBuilder, Static},
+    scheduled::ScheduledProducer,
 };
 use redis::{AsyncCommands, Client, Commands};
 use serde::{Deserialize, Serialize};
@@ -42,6 +43,7 @@ async fn make_test_queue() -> (QueueBuilder<RedisQueueBackend, Static>, RedisStr
         max_connections: 8,
         reinsert_on_nack: false,
         queue_key: stream_name.clone(),
+        delayed_queue_key: format!("{stream_name}::delayed"),
         consumer_group: "test_cg".to_owned(),
         consumer_name: "test_cn".to_owned(),
         payload_key: "payload".to_owned(),
@@ -243,3 +245,25 @@ async fn test_send_recv_all_late_arriving_items() {
     assert!(elapsed >= deadline);
     assert!(elapsed <= deadline + Duration::from_millis(200));
 }
+
+#[tokio::test]
+async fn test_scheduled() {
+    let payload1 = ExType { a: 1 };
+    let (builder, _drop) = make_test_queue().await;
+
+    let (p, mut c) = builder.build_pair().await.unwrap();
+
+    let delay = Duration::from_secs(3);
+    let now = Instant::now();
+    p.send_serde_json_scheduled(&payload1, delay).await.unwrap();
+    let delivery = c
+        .receive_all(1, delay * 2)
+        .await
+        .unwrap()
+        .into_iter()
+        .next()
+        .unwrap();
+    assert!(now.elapsed() >= delay);
+    assert!(now.elapsed() < delay * 2);
+    assert_eq!(Some(payload1), delivery.payload_serde_json().unwrap());
+}
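For a downstream crate, wiring the new field up looks roughly like the tests above. A hedged sketch: the DSN and key names are illustrative, `dsn` is the one `RedisConfig` field not shown in this diff's hunks, and the `builder`/`build_pair` construction path is assumed from the tests' usage:

```rust
use std::time::Duration;

use omniqueue::{
    backends::redis::{RedisConfig, RedisQueueBackend},
    queue::{consumer::QueueConsumer, QueueBackend},
    scheduled::ScheduledProducer,
};

#[tokio::main]
async fn main() -> Result<(), omniqueue::QueueError> {
    let cfg = RedisConfig {
        dsn: "redis://localhost:6379/".to_owned(),
        max_connections: 8,
        reinsert_on_nack: false,
        queue_key: "example".to_owned(),
        // An empty delayed_queue_key disables the background scheduler, so
        // scheduled sends would sit in the zset forever.
        delayed_queue_key: "example::delayed".to_owned(),
        consumer_group: "example_cg".to_owned(),
        consumer_name: "example_cn".to_owned(),
        payload_key: "payload".to_owned(),
    };

    let (p, mut c) = RedisQueueBackend::builder(cfg).build_pair().await?;

    // Deliver roughly three seconds from now.
    p.send_serde_json_scheduled(&serde_json::json!({"a": 1}), Duration::from_secs(3))
        .await?;

    let delivery = c.receive().await?;
    println!("{:?}", delivery.payload_serde_json::<serde_json::Value>()?);
    Ok(())
}
```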
diff --git a/omniqueue/tests/redis_cluster.rs b/omniqueue/tests/redis_cluster.rs
index 0f0f0b4..24683cb 100644
--- a/omniqueue/tests/redis_cluster.rs
+++ b/omniqueue/tests/redis_cluster.rs
@@ -1,6 +1,7 @@
 use omniqueue::{
     backends::redis::{RedisClusterQueueBackend, RedisConfig},
     queue::{consumer::QueueConsumer, producer::QueueProducer, QueueBackend, QueueBuilder, Static},
+    scheduled::ScheduledProducer,
 };
 use redis::{cluster::ClusterClient, AsyncCommands, Commands};
 use serde::{Deserialize, Serialize};
@@ -45,6 +46,7 @@ async fn make_test_queue() -> (
         max_connections: 8,
         reinsert_on_nack: false,
         queue_key: stream_name.clone(),
+        delayed_queue_key: format!("{stream_name}::delay"),
         consumer_group: "test_cg".to_owned(),
         consumer_name: "test_cn".to_owned(),
         payload_key: "payload".to_owned(),
@@ -246,3 +248,25 @@ async fn test_send_recv_all_late_arriving_items() {
     assert!(elapsed >= deadline);
     assert!(elapsed <= deadline + Duration::from_millis(200));
 }
+
+#[tokio::test]
+async fn test_scheduled() {
+    let payload1 = ExType { a: 1 };
+
+    let (builder, _drop) = make_test_queue().await;
+    let (p, mut c) = builder.build_pair().await.unwrap();
+
+    let delay = Duration::from_secs(3);
+    let now = Instant::now();
+    p.send_serde_json_scheduled(&payload1, delay).await.unwrap();
+    let delivery = c
+        .receive_all(1, delay * 2)
+        .await
+        .unwrap()
+        .into_iter()
+        .next()
+        .unwrap();
+    assert!(now.elapsed() >= delay);
+    assert!(now.elapsed() < delay * 2);
+    assert_eq!(Some(payload1), delivery.payload_serde_json().unwrap());
+}
diff --git a/omniqueue/tests/sqs.rs b/omniqueue/tests/sqs.rs
index f696a4e..54987c0 100644
--- a/omniqueue/tests/sqs.rs
+++ b/omniqueue/tests/sqs.rs
@@ -2,6 +2,7 @@ use aws_sdk_sqs::Client;
 use omniqueue::{
     backends::sqs::{SqsConfig, SqsQueueBackend},
     queue::{consumer::QueueConsumer, producer::QueueProducer, QueueBackend, QueueBuilder, Static},
+    scheduled::ScheduledProducer,
 };
 use serde::{Deserialize, Serialize};
 use std::time::{Duration, Instant};
@@ -223,3 +224,23 @@ async fn test_send_recv_all_late_arriving_items() {
     assert!(elapsed >= deadline);
     assert!(elapsed <= deadline + Duration::from_millis(200));
 }
+
+#[tokio::test]
+async fn test_scheduled() {
+    let payload1 = ExType { a: 1 };
+    let (p, mut c) = make_test_queue().await.build_pair().await.unwrap();
+
+    let delay = Duration::from_secs(3);
+    let now = Instant::now();
+    p.send_serde_json_scheduled(&payload1, delay).await.unwrap();
+    let delivery = c
+        .receive_all(1, delay * 2)
+        .await
+        .unwrap()
+        .into_iter()
+        .next()
+        .unwrap();
+    assert!(now.elapsed() >= delay);
+    assert!(now.elapsed() < delay * 2);
+    assert_eq!(Some(payload1), delivery.payload_serde_json().unwrap());
+}
diff --git a/testing-docker-compose.yml b/testing-docker-compose.yml
index c8b8b6a..a7db678 100644
--- a/testing-docker-compose.yml
+++ b/testing-docker-compose.yml
@@ -1,10 +1,15 @@
 version: "3.7"
 services:
-  mq:
+  rabbitmq:
     image: rabbitmq:3.11.11-management-alpine
     ports:
      - "5672:5672"
      - "15672:15672"
+    environment:
+      RABBITMQ_PLUGINS_DIR: "/opt/rabbitmq/plugins:/usr/lib/rabbitmq/plugins"
+    volumes:
+      - ./_rabbit/enabled_plugins:/etc/rabbitmq/enabled_plugins
+      - ./_rabbit/plugins:/usr/lib/rabbitmq/plugins
 
   elasticmq: # Drop-in SQS replacement
     image: softwaremill/elasticmq:1.3.14