From e31229ed9e72db5751036be4abd1b560d6c30381 Mon Sep 17 00:00:00 2001 From: Jonas Platte <158304798+svix-jplatte@users.noreply.github.com> Date: Fri, 3 May 2024 16:03:39 +0200 Subject: [PATCH] redis: Allow customizing the processing queue key (#81) --- omniqueue/src/backends/redis/mod.rs | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/omniqueue/src/backends/redis/mod.rs b/omniqueue/src/backends/redis/mod.rs index 9f838ec..83fa1b2 100644 --- a/omniqueue/src/backends/redis/mod.rs +++ b/omniqueue/src/backends/redis/mod.rs @@ -197,6 +197,7 @@ impl QueueBackend for RedisBackend { pub struct RedisBackendBuilder { config: RedisConfig, use_redis_streams: bool, + processing_queue_key: Option, _phantom: PhantomData (R, S)>, } @@ -208,6 +209,7 @@ impl RedisBackendBuilder { Self { config, use_redis_streams: true, + processing_queue_key: None, _phantom: PhantomData, } } @@ -216,6 +218,7 @@ impl RedisBackendBuilder { RedisBackendBuilder { config: self.config, use_redis_streams: self.use_redis_streams, + processing_queue_key: self.processing_queue_key, _phantom: PhantomData, } } @@ -249,6 +252,24 @@ impl RedisBackendBuilder { self } + /// Set a custom redis key for the processing queue. + /// + /// This secondary queue is only used if you set + /// [`use_redis_streams`][Self::use_redis_streams](false). + /// If you don't set a custom key, one will be selected based on the main + /// queue key. You only have to call this if you need precise control over + /// which keys omniqueue uses in redis. + pub fn processing_queue_key(mut self, value: String) -> Self { + self.processing_queue_key = Some(value); + self + } + + fn get_processing_queue_key(&self) -> String { + self.processing_queue_key + .clone() + .unwrap_or_else(|| format!("{}_processing", self.config.queue_key)) + } + pub async fn build_pair(self) -> Result<(RedisProducer, RedisConsumer)> { let redis = R::from_dsn(&self.config.dsn)?; let redis = bb8::Pool::builder() @@ -258,7 +279,7 @@ impl RedisBackendBuilder { .map_err(QueueError::generic)?; let background_tasks = self.start_background_tasks(redis.clone()).await; - let processing_queue_key = format!("{}_processing", self.config.queue_key); + let processing_queue_key = self.get_processing_queue_key(); Ok(( RedisProducer { @@ -310,7 +331,7 @@ impl RedisBackendBuilder { .map_err(QueueError::generic)?; let _background_tasks = self.start_background_tasks(redis.clone()).await; - let processing_queue_key = format!("{}_processing", self.config.queue_key); + let processing_queue_key = self.get_processing_queue_key(); Ok(RedisConsumer { redis, @@ -388,7 +409,7 @@ impl RedisBackendBuilder { join_set.spawn(fallback::background_task_processing( redis.clone(), self.config.queue_key.to_owned(), - format!("{}_processing", self.config.queue_key), + self.get_processing_queue_key(), self.config.ack_deadline_ms, )); }