Skip to content

Commit

Permalink
Support redis max_receives config option
Browse files Browse the repository at this point in the history
This adds a `max_receives` option to both Redis queue implementations.
This is the first step in supporting deadletter queuing.

`InternalPayload` type aliases have been added to represent Omniqueue
items that track the current number of times a message has been
received (`num_receives`). `num_receives` is incremented whenever
an item is re-queued from the pending/processing queues until it
hits `max_receives`, at which point the message is abandoned. Later
we will support putting this in an optional deadletter queue.

I originally tried adding a proper struct for `InternalPayload`, but
I don't think that clarified anything and was less memory optimal,
so the relevant logic has been captured in simple functions instead.
  • Loading branch information
jaymell committed Aug 30, 2024
1 parent 6fbecdb commit 5d6fcc6
Show file tree
Hide file tree
Showing 6 changed files with 364 additions and 105 deletions.
94 changes: 63 additions & 31 deletions omniqueue/src/backends/redis/fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use svix_ksuid::{KsuidLike as _, KsuidMs};
use time::OffsetDateTime;
use tracing::{error, trace};

use super::{from_key, to_key, RawPayload, RedisConnection, RedisConsumer, RedisProducer};
use super::{
internal_from_list, internal_to_fallback_delivery, internal_to_list_payload, RawPayload,
RedisConnection, RedisConsumer, RedisProducer,
};
use crate::{queue::Acker, Delivery, QueueError, Result};

pub(super) async fn send_raw<R: RedisConnection>(
Expand All @@ -21,7 +24,7 @@ pub(super) async fn send_raw<R: RedisConnection>(
.get()
.await
.map_err(QueueError::generic)?
.lpush(&producer.queue_key, to_key(payload))
.lpush(&producer.queue_key, internal_to_list_payload((payload, 0)))
.await
.map_err(QueueError::generic)
}
Expand All @@ -45,7 +48,7 @@ async fn receive_with_timeout<R: RedisConnection>(
consumer: &RedisConsumer<R>,
timeout: Duration,
) -> Result<Option<Delivery>> {
let key: Option<Vec<u8>> = consumer
let payload: Option<Vec<u8>> = consumer
.redis
.get()
.await
Expand All @@ -61,29 +64,33 @@ async fn receive_with_timeout<R: RedisConnection>(
.await
.map_err(QueueError::generic)?;

key.map(|key| make_delivery(consumer, &key)).transpose()
match payload {
Some(old_payload) => {
let (payload, num_receives) = internal_from_list(&old_payload)?;
Some(internal_to_fallback_delivery(
(payload.to_vec(), num_receives),
consumer,
old_payload,
))
.transpose()
}
None => Ok(None),
}
}

fn make_delivery<R: RedisConnection>(consumer: &RedisConsumer<R>, key: &[u8]) -> Result<Delivery> {
let (_, payload) = from_key(key)?;

Ok(Delivery::new(
payload.to_owned(),
RedisFallbackAcker {
redis: consumer.redis.clone(),
processing_queue_key: consumer.processing_queue_key.clone(),
key: key.to_owned(),
already_acked_or_nacked: false,
},
))
}
pub(super) struct RedisFallbackAcker<M: ManageConnection> {
pub(super) redis: bb8::Pool<M>,
pub(super) processing_queue_key: String,
// We delete based on the payload -- and since the
// `num_receives` changes after receiving it's the
// `old_payload`, since `num_receives` is part of the
// payload. Make sense?
pub(super) old_payload: RawPayload,

struct RedisFallbackAcker<M: ManageConnection> {
redis: bb8::Pool<M>,
processing_queue_key: String,
key: RawPayload,
pub(super) already_acked_or_nacked: bool,

already_acked_or_nacked: bool,
pub(super) max_receives: usize,
pub(super) num_receives: usize,
}

impl<R: RedisConnection> Acker for RedisFallbackAcker<R> {
Expand All @@ -97,7 +104,7 @@ impl<R: RedisConnection> Acker for RedisFallbackAcker<R> {
.get()
.await
.map_err(QueueError::generic)?
.lrem(&self.processing_queue_key, 1, &self.key)
.lrem(&self.processing_queue_key, 1, &self.old_payload)
.await
.map_err(QueueError::generic)?;

Expand All @@ -107,6 +114,11 @@ impl<R: RedisConnection> Acker for RedisFallbackAcker<R> {
}

async fn nack(&mut self) -> Result<()> {
if self.num_receives >= self.max_receives {
trace!("Maximum attempts reached");
return self.ack().await;
}

if self.already_acked_or_nacked {
return Err(QueueError::CannotAckOrNackTwice);
}
Expand Down Expand Up @@ -144,13 +156,19 @@ pub(super) async fn background_task_processing<R: RedisConnection>(
queue_key: String,
processing_queue_key: String,
ack_deadline_ms: i64,
max_receives: usize,
) -> Result<()> {
// FIXME: ack_deadline_ms should be unsigned
let ack_deadline = Duration::from_millis(ack_deadline_ms as _);
loop {
if let Err(err) =
reenqueue_timed_out_messages(&pool, &queue_key, &processing_queue_key, ack_deadline)
.await
if let Err(err) = reenqueue_timed_out_messages(
&pool,
&queue_key,
&processing_queue_key,
ack_deadline,
max_receives,
)
.await
{
error!("{err}");
tokio::time::sleep(Duration::from_millis(500)).await;
Expand All @@ -164,6 +182,7 @@ async fn reenqueue_timed_out_messages<R: RedisConnection>(
queue_key: &str,
processing_queue_key: &str,
ack_deadline: Duration,
max_receives: usize,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
const BATCH_SIZE: isize = 50;

Expand All @@ -180,10 +199,24 @@ async fn reenqueue_timed_out_messages<R: RedisConnection>(
let keys: Vec<RawPayload> = conn.lrange(processing_queue_key, 0, BATCH_SIZE).await?;
for key in keys {
if key <= validity_limit {
let internal = internal_from_list(&key)?;
let num_receives = internal.1;
if num_receives >= max_receives {
trace!(
num_receives = num_receives,
"Maximum attempts reached for message, not reenqueuing",
);
} else {
trace!(
num_receives = num_receives,
"Pushing back overdue task to queue"
);
let _: () = conn
.rpush(queue_key, internal_to_list_payload(internal))
.await?;
}

// We use LREM to be sure we only delete the keys we should be deleting
trace!("Pushing back overdue task to queue");
let refreshed_key = regenerate_key(&key)?;
let _: () = conn.rpush(queue_key, &refreshed_key).await?;
let _: () = conn.lrem(processing_queue_key, 1, &key).await?;
}
}
Expand All @@ -196,6 +229,5 @@ async fn reenqueue_timed_out_messages<R: RedisConnection>(
}

fn regenerate_key(key: &[u8]) -> Result<RawPayload> {
let (_, payload) = from_key(key)?;
Ok(to_key(payload))
Ok(internal_to_list_payload(internal_from_list(key)?))
}
Loading

0 comments on commit 5d6fcc6

Please sign in to comment.