Skip to content

Commit

Permalink
redrive-dlq
Browse files Browse the repository at this point in the history
  • Loading branch information
jaymell committed Sep 5, 2024
1 parent 5f8584c commit 5d3eb70
Show file tree
Hide file tree
Showing 11 changed files with 118 additions and 11 deletions.
8 changes: 7 additions & 1 deletion omniqueue/src/backends/azure_queue_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,17 @@ impl AqsProducer {
let payload = serde_json::to_string(payload)?;
self.send_raw_scheduled(&payload, delay).await
}

pub async fn redrive_dlq(&self) -> Result<()> {
Err(QueueError::Unsupported(
"redrive_dlq is not supported by AqsBackend",
))
}
}

impl crate::QueueProducer for AqsProducer {
type Payload = String;
omni_delegate!(send_raw, send_serde_json);
omni_delegate!(send_raw, send_serde_json, redrive_dlq);
}
impl crate::ScheduledQueueProducer for AqsProducer {
omni_delegate!(send_raw_scheduled, send_serde_json_scheduled);
Expand Down
8 changes: 7 additions & 1 deletion omniqueue/src/backends/gcp_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ impl GcpPubSubProducer {
pub async fn send_serde_json<P: Serialize + Sync>(&self, payload: &P) -> Result<()> {
self.send_raw(&serde_json::to_vec(&payload)?).await
}

pub async fn redrive_dlq(&self) -> Result<()> {
Err(QueueError::Unsupported(
"redrive_dlq is not supported by GcpPubSubBackend",
))
}
}

impl std::fmt::Debug for GcpPubSubProducer {
Expand All @@ -178,7 +184,7 @@ impl std::fmt::Debug for GcpPubSubProducer {

impl crate::QueueProducer for GcpPubSubProducer {
type Payload = Payload;
omni_delegate!(send_raw, send_serde_json);
omni_delegate!(send_raw, send_serde_json, redrive_dlq);

/// This method is overwritten for the Google Cloud Pub/Sub backend to be
/// more efficient than the default of sequentially publishing `payloads`.
Expand Down
8 changes: 7 additions & 1 deletion omniqueue/src/backends/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,17 @@ impl InMemoryProducer {
let payload = serde_json::to_vec(payload)?;
self.send_raw_scheduled(&payload, delay).await
}

pub async fn redrive_dlq(&self) -> Result<()> {
Err(QueueError::Unsupported(
"redrive_dlq is not supported by InMemoryBackend",
))
}
}

impl crate::QueueProducer for InMemoryProducer {
type Payload = Vec<u8>;
omni_delegate!(send_raw, send_serde_json);
omni_delegate!(send_raw, send_serde_json, redrive_dlq);
}
impl crate::ScheduledQueueProducer for InMemoryProducer {
omni_delegate!(send_raw_scheduled, send_serde_json_scheduled);
Expand Down
8 changes: 7 additions & 1 deletion omniqueue/src/backends/rabbitmq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,17 @@ impl RabbitMqProducer {
let payload = serde_json::to_vec(payload)?;
self.send_raw_scheduled(&payload, delay).await
}

pub async fn redrive_dlq(&self) -> Result<()> {
Err(QueueError::Unsupported(
"redrive_dlq is not supported by RabbitMqBackend",
))
}
}

impl crate::QueueProducer for RabbitMqProducer {
type Payload = Vec<u8>;
omni_delegate!(send_raw, send_serde_json);
omni_delegate!(send_raw, send_serde_json, redrive_dlq);
}
impl crate::ScheduledQueueProducer for RabbitMqProducer {
omni_delegate!(send_raw_scheduled, send_serde_json_scheduled);
Expand Down
2 changes: 1 addition & 1 deletion omniqueue/src/backends/redis/fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ async fn send_to_dlq<R: RedisConnection>(
.get()
.await
.map_err(QueueError::generic)?
.lpush(dlq, payload)
.rpush(dlq, payload)
.await
.map_err(QueueError::generic)?;

Expand Down
53 changes: 51 additions & 2 deletions omniqueue/src/backends/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ impl<R: RedisConnection> RedisBackendBuilder<R> {
payload_key: self.config.payload_key.clone(),
use_redis_streams: self.use_redis_streams,
_background_tasks: background_tasks.clone(),
dlq_config: self.config.dlq_config.clone(),
},
RedisConsumer {
redis,
Expand Down Expand Up @@ -421,6 +422,7 @@ impl<R: RedisConnection> RedisBackendBuilder<R> {
payload_key: self.config.payload_key,
use_redis_streams: self.use_redis_streams,
_background_tasks,
dlq_config: self.config.dlq_config,
})
}

Expand All @@ -444,7 +446,7 @@ impl<R: RedisConnection> RedisBackendBuilder<R> {
payload_key: self.config.payload_key,
use_redis_streams: self.use_redis_streams,
_background_tasks,
dlq_config: self.config.dlq_config.clone(),
dlq_config: self.config.dlq_config,
})
}

Expand Down Expand Up @@ -644,6 +646,7 @@ pub struct RedisProducer<M: ManageConnection> {
payload_key: String,
use_redis_streams: bool,
_background_tasks: Arc<JoinSet<Result<()>>>,
dlq_config: Option<DeadLetterQueueConfig>,
}

impl<R: RedisConnection> RedisProducer<R> {
Expand Down Expand Up @@ -698,11 +701,57 @@ impl<R: RedisConnection> RedisProducer<R> {
let payload = serde_json::to_vec(payload)?;
self.send_raw_scheduled(&payload, delay).await
}

pub async fn redrive_dlq(&self) -> Result<()> {
const BATCH_SIZE: isize = 50;

let DeadLetterQueueConfig { queue_key: dlq, .. } = self.dlq_config.as_ref().ok_or(
QueueError::Unsupported("Cannot redrive if there's no DeadLetterQueueConfig"),
)?;

loop {
let mut conn = self.redis.get().await.map_err(QueueError::generic)?;
let old_payloads: Vec<RawPayload> = conn
.lrange(dlq, 0, BATCH_SIZE)
.await
.map_err(QueueError::generic)?;

if old_payloads.is_empty() {
break;
}

let new_payloads = old_payloads
.iter()
.map(|x| InternalPayload::new(x))
.collect::<Vec<_>>();

if self.use_redis_streams {
streams::add_to_main_queue(
new_payloads,
&self.queue_key,
&self.payload_key,
&mut *conn,
)
.await?;
} else {
fallback::add_to_main_queue(new_payloads, &self.queue_key, &mut *conn).await?;
}

for payload in old_payloads {
let _: () = conn
.lrem(dlq, 1, &payload)
.await
.map_err(QueueError::generic)?;
}
}

Ok(())
}
}

impl<R: RedisConnection> crate::QueueProducer for RedisProducer<R> {
type Payload = Vec<u8>;
omni_delegate!(send_raw, send_serde_json);
omni_delegate!(send_raw, send_serde_json, redrive_dlq);
}
impl<R: RedisConnection> crate::ScheduledQueueProducer for RedisProducer<R> {
omni_delegate!(send_raw_scheduled, send_serde_json_scheduled);
Expand Down
2 changes: 1 addition & 1 deletion omniqueue/src/backends/redis/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ async fn send_to_dlq<R: RedisConnection>(
.get()
.await
.map_err(QueueError::generic)?
.lpush(dlq, &payload)
.rpush(dlq, &payload)
.await
.map_err(QueueError::generic)?;

Expand Down
8 changes: 7 additions & 1 deletion omniqueue/src/backends/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,11 +355,17 @@ impl SqsProducer {

Ok(())
}

pub async fn redrive_dlq(&self) -> Result<()> {
Err(QueueError::Unsupported(
"redrive_dlq is not supported by SqsBackend",
))
}
}

impl crate::QueueProducer for SqsProducer {
type Payload = String;
omni_delegate!(send_raw, send_serde_json);
omni_delegate!(send_raw, send_serde_json, redrive_dlq);

/// This method is overwritten for the SQS backend to be more efficient
/// than the default of sequentially publishing `payloads`.
Expand Down
9 changes: 9 additions & 0 deletions omniqueue/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ macro_rules! omni_delegate {
Self::send_serde_json_scheduled(self, payload, delay)
}
};
( redrive_dlq ) => {
#[deny(unconditional_recursion)] // method call must defer to an inherent method
fn redrive_dlq(
&self,
) -> impl std::future::Future<Output = Result<()>> + Send {
Self::redrive_dlq(self)
}
};

( $method1:ident, $($rest:ident),* $(,)? ) => {
omni_delegate!($method1);
omni_delegate!($($rest),*);
Expand Down
14 changes: 13 additions & 1 deletion omniqueue/src/queue/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ pub trait QueueProducer: Send + Sync + Sized {

fn send_raw(&self, payload: &Self::Payload) -> impl Future<Output = Result<()>> + Send;

fn redrive_dlq(&self) -> impl Future<Output = Result<()>> + Send;

/// Send a batch of raw messages.
///
/// The default implementation of this sends the payloads sequentially using
Expand Down Expand Up @@ -97,6 +99,8 @@ pub(crate) trait ErasedQueueProducer: Send + Sync {
&'a self,
payload: &'a [u8],
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;

fn redrive_dlq<'a>(&'a self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
}

struct DynProducerInner<P> {
Expand All @@ -110,6 +114,10 @@ impl<P: QueueProducer> ErasedQueueProducer for DynProducerInner<P> {
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
Box::pin(async move { self.inner.send_bytes(payload).await })
}

fn redrive_dlq<'a>(&'a self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
Box::pin(async move { self.inner.redrive_dlq().await })
}
}

impl DynProducer {
Expand All @@ -121,9 +129,13 @@ impl DynProducer {
let payload = serde_json::to_vec(payload)?;
self.send_raw(&payload).await
}

pub async fn redrive_dlq(&self) -> Result<()> {
self.0.redrive_dlq().await
}
}

impl crate::QueueProducer for DynProducer {
type Payload = Vec<u8>;
omni_delegate!(send_raw, send_serde_json);
omni_delegate!(send_raw, send_serde_json, redrive_dlq);
}
9 changes: 8 additions & 1 deletion omniqueue/src/scheduled/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ impl<P: ScheduledQueueProducer> ErasedQueueProducer for DynScheduledProducerInne
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
Box::pin(async move { self.inner.send_bytes(payload).await })
}
fn redrive_dlq<'a>(&'a self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
Box::pin(async move { self.inner.redrive_dlq().await })
}
}

impl<P: ScheduledQueueProducer> ErasedScheduledQueueProducer for DynScheduledProducerInner<P> {
Expand Down Expand Up @@ -103,11 +106,15 @@ impl DynScheduledProducer {
let payload = serde_json::to_vec(payload)?;
self.0.send_raw_scheduled(&payload, delay).await
}

pub async fn redrive_dlq(&self) -> Result<()> {
self.0.redrive_dlq().await
}
}

impl crate::QueueProducer for DynScheduledProducer {
type Payload = Vec<u8>;
omni_delegate!(send_raw, send_serde_json);
omni_delegate!(send_raw, send_serde_json, redrive_dlq);
}
impl crate::ScheduledQueueProducer for DynScheduledProducer {
omni_delegate!(send_raw_scheduled, send_serde_json_scheduled);
Expand Down

0 comments on commit 5d3eb70

Please sign in to comment.