From a86478148dcd89a3c691b522e9e876f16ca92b93 Mon Sep 17 00:00:00 2001 From: James Lucas Date: Thu, 5 Sep 2024 09:52:54 -0500 Subject: [PATCH] Give InternalPayload* structs named fields --- omniqueue/src/backends/redis/fallback.rs | 13 ++++--- omniqueue/src/backends/redis/mod.rs | 48 ++++++++++++++++-------- omniqueue/src/backends/redis/streams.rs | 34 ++++++++++++----- 3 files changed, 65 insertions(+), 30 deletions(-) diff --git a/omniqueue/src/backends/redis/fallback.rs b/omniqueue/src/backends/redis/fallback.rs index afe3126..aeefc1d 100644 --- a/omniqueue/src/backends/redis/fallback.rs +++ b/omniqueue/src/backends/redis/fallback.rs @@ -26,7 +26,7 @@ pub(super) async fn send_raw( .map_err(QueueError::generic)? .lpush( &producer.queue_key, - internal_to_list_payload(InternalPayload(payload, 0)), + internal_to_list_payload(InternalPayload::new(payload)), ) .await .map_err(QueueError::generic) @@ -79,7 +79,10 @@ async fn receive_with_timeout( } fn internal_to_delivery( - InternalPayloadOwned(payload, num_receives): InternalPayloadOwned, + InternalPayloadOwned { + payload, + num_receives, + }: InternalPayloadOwned, consumer: &RedisConsumer, old_payload: Vec, ) -> Result { @@ -139,7 +142,7 @@ impl Acker for RedisFallbackAcker { // seems possible given that we're already in a failure // scenario), just push the full `InternalPayload` onto the DLQ: let payload = match internal_from_list(&self.old_payload) { - Ok(InternalPayload(payload, _)) => payload, + Ok(InternalPayload { payload, .. }) => payload, Err(e) => { warn!(error = ?e, "Failed to get original payload, sending to DLQ with internal payload"); &self.old_payload @@ -249,7 +252,7 @@ async fn reenqueue_timed_out_messages( for key in keys { if key <= validity_limit { let internal = internal_from_list(&key)?; - let num_receives = internal.num_receives(); + let num_receives = internal.num_receives; match &dlq_config { Some(dlq_config) if dlq_config.max_retries_reached(num_receives) => { @@ -257,7 +260,7 @@ async fn reenqueue_timed_out_messages( num_receives = num_receives, "Maximum attempts reached for message, moving item to DLQ", ); - send_to_dlq(pool, dlq_config, internal.payload()).await?; + send_to_dlq(pool, dlq_config, internal.payload).await?; } _ => { trace!( diff --git a/omniqueue/src/backends/redis/mod.rs b/omniqueue/src/backends/redis/mod.rs index 73534c4..46b9225 100644 --- a/omniqueue/src/backends/redis/mod.rs +++ b/omniqueue/src/backends/redis/mod.rs @@ -88,25 +88,38 @@ impl RedisConnection for RedisClusterConnectionManager { // First element is the raw payload slice, second // is `num_receives`, the number of the times // the message has previously been received. -struct InternalPayload<'a>(&'a [u8], usize); +struct InternalPayload<'a> { + payload: &'a [u8], + num_receives: usize, +} impl<'a> InternalPayload<'a> { - fn payload(&self) -> &[u8] { - self.0 - } - - fn num_receives(&self) -> usize { - self.1 + fn new(payload: &'a [u8]) -> Self { + Self { + payload, + num_receives: 0, + } } } // The same as `InternalPayload` but with an // owned payload. -struct InternalPayloadOwned(Vec, usize); +struct InternalPayloadOwned { + payload: Vec, + num_receives: usize, +} impl From> for InternalPayloadOwned { - fn from(InternalPayload(payload, num_receives): InternalPayload) -> Self { - Self(payload.to_vec(), num_receives) + fn from( + InternalPayload { + payload, + num_receives, + }: InternalPayload, + ) -> Self { + Self { + payload: payload.to_vec(), + num_receives, + } } } @@ -141,13 +154,18 @@ fn internal_from_list(payload: &[u8]) -> Result> { 1 }; - Ok(InternalPayload( - &payload[payload_sep_pos + 1..], + Ok(InternalPayload { + payload: &payload[payload_sep_pos + 1..], num_receives, - )) + }) } -fn internal_to_list_payload(InternalPayload(payload, num_receives): InternalPayload) -> Vec { +fn internal_to_list_payload( + InternalPayload { + payload, + num_receives, + }: InternalPayload, +) -> Vec { let id = delayed_key_id(); let num_receives = num_receives.to_string(); let mut result = @@ -654,7 +672,7 @@ impl RedisProducer { .map_err(QueueError::generic)? .zadd( &self.delayed_queue_key, - internal_to_list_payload(InternalPayload(payload, 0)), + internal_to_list_payload(InternalPayload::new(payload)), timestamp, ) .await diff --git a/omniqueue/src/backends/redis/streams.rs b/omniqueue/src/backends/redis/streams.rs index 0b157a6..32b1ab4 100644 --- a/omniqueue/src/backends/redis/streams.rs +++ b/omniqueue/src/backends/redis/streams.rs @@ -31,10 +31,10 @@ const PENDING_BATCH_SIZE: usize = 1000; macro_rules! internal_to_stream_payload { ($internal_payload:expr, $payload_key:expr) => { &[ - ($payload_key, $internal_payload.payload()), + ($payload_key, $internal_payload.payload), ( NUM_RECEIVES, - $internal_payload.num_receives().to_string().as_bytes(), + $internal_payload.num_receives.to_string().as_bytes(), ), ] }; @@ -52,7 +52,10 @@ pub(super) async fn send_raw( .xadd( &producer.queue_key, GENERATE_STREAM_ID, - internal_to_stream_payload!(InternalPayload(payload, 0), producer.payload_key.as_str()), + internal_to_stream_payload!( + InternalPayload::new(payload), + producer.payload_key.as_str() + ), ) .await .map_err(QueueError::generic) @@ -141,11 +144,17 @@ fn internal_from_stream(stream_id: &StreamId, payload_key: &str) -> Result( - InternalPayloadOwned(payload, num_receives): InternalPayloadOwned, + InternalPayloadOwned { + payload, + num_receives, + }: InternalPayloadOwned, consumer: &RedisConsumer, entry_id: String, ) -> Delivery { @@ -241,11 +250,11 @@ pub(super) async fn add_to_main_queue( for key in keys { // We don't care about `num_receives` here since we're // re-queuing from delayed queue: - let InternalPayload(payload, _) = internal_from_list(key)?; + let InternalPayload { payload, .. } = internal_from_list(key)?; let _ = pipe.xadd( main_queue_name, GENERATE_STREAM_ID, - internal_to_stream_payload!(InternalPayload(payload, 0), payload_key), + internal_to_stream_payload!(InternalPayload::new(payload), payload_key), ); } @@ -376,8 +385,10 @@ async fn reenqueue_timed_out_messages( // And reinsert the map of KV pairs into the MAIN queue with a new stream ID for stream_id in &ids { - let InternalPayloadOwned(payload, num_receives) = - internal_from_stream(stream_id, payload_key)?; + let InternalPayloadOwned { + payload, + num_receives, + } = internal_from_stream(stream_id, payload_key)?; if let Some(dlq_config) = &dlq_config { if num_receives >= dlq_config.max_receives { @@ -400,7 +411,10 @@ async fn reenqueue_timed_out_messages( main_queue_name, GENERATE_STREAM_ID, internal_to_stream_payload!( - InternalPayload(payload.as_slice(), num_receives), + InternalPayload { + payload: payload.as_slice(), + num_receives + }, payload_key ), );