Skip to content

Commit

Permalink
redis: Allow customizing the processing queue key (#81)
Browse files Browse the repository at this point in the history
  • Loading branch information
svix-jplatte authored May 3, 2024
1 parent f50cf3d commit e31229e
Showing 1 changed file with 24 additions and 3 deletions.
27 changes: 24 additions & 3 deletions omniqueue/src/backends/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ impl<R: RedisConnection> QueueBackend for RedisBackend<R> {
pub struct RedisBackendBuilder<R = RedisConnectionManager, S = Static> {
config: RedisConfig,
use_redis_streams: bool,
processing_queue_key: Option<String>,
_phantom: PhantomData<fn() -> (R, S)>,
}

Expand All @@ -208,6 +209,7 @@ impl<R: RedisConnection> RedisBackendBuilder<R> {
Self {
config,
use_redis_streams: true,
processing_queue_key: None,
_phantom: PhantomData,
}
}
Expand All @@ -216,6 +218,7 @@ impl<R: RedisConnection> RedisBackendBuilder<R> {
RedisBackendBuilder {
config: self.config,
use_redis_streams: self.use_redis_streams,
processing_queue_key: self.processing_queue_key,
_phantom: PhantomData,
}
}
Expand Down Expand Up @@ -249,6 +252,24 @@ impl<R: RedisConnection> RedisBackendBuilder<R> {
self
}

/// Set a custom redis key for the processing queue.
///
/// This secondary queue is only used if you set
/// <code>[`use_redis_streams`][Self::use_redis_streams](false)</code>.
/// If you don't set a custom key, one will be selected based on the main
/// queue key. You only have to call this if you need precise control over
/// which keys omniqueue uses in redis.
pub fn processing_queue_key(mut self, value: String) -> Self {
self.processing_queue_key = Some(value);
self
}

fn get_processing_queue_key(&self) -> String {
self.processing_queue_key
.clone()
.unwrap_or_else(|| format!("{}_processing", self.config.queue_key))
}

pub async fn build_pair(self) -> Result<(RedisProducer<R>, RedisConsumer<R>)> {
let redis = R::from_dsn(&self.config.dsn)?;
let redis = bb8::Pool::builder()
Expand All @@ -258,7 +279,7 @@ impl<R: RedisConnection> RedisBackendBuilder<R> {
.map_err(QueueError::generic)?;

let background_tasks = self.start_background_tasks(redis.clone()).await;
let processing_queue_key = format!("{}_processing", self.config.queue_key);
let processing_queue_key = self.get_processing_queue_key();

Ok((
RedisProducer {
Expand Down Expand Up @@ -310,7 +331,7 @@ impl<R: RedisConnection> RedisBackendBuilder<R> {
.map_err(QueueError::generic)?;

let _background_tasks = self.start_background_tasks(redis.clone()).await;
let processing_queue_key = format!("{}_processing", self.config.queue_key);
let processing_queue_key = self.get_processing_queue_key();

Ok(RedisConsumer {
redis,
Expand Down Expand Up @@ -388,7 +409,7 @@ impl<R: RedisConnection> RedisBackendBuilder<R> {
join_set.spawn(fallback::background_task_processing(
redis.clone(),
self.config.queue_key.to_owned(),
format!("{}_processing", self.config.queue_key),
self.get_processing_queue_key(),
self.config.ack_deadline_ms,
));
}
Expand Down

0 comments on commit e31229e

Please sign in to comment.