From fc3b107e46e53a6c15b5da060cda1e50932653d1 Mon Sep 17 00:00:00 2001 From: James Lucas Date: Thu, 29 Aug 2024 21:53:37 -0500 Subject: [PATCH] Cleanup --- omniqueue/src/backends/redis/fallback.rs | 14 +++++------ omniqueue/src/backends/redis/mod.rs | 32 ++++++++++++++---------- omniqueue/src/backends/redis/streams.rs | 6 ++--- 3 files changed, 29 insertions(+), 23 deletions(-) diff --git a/omniqueue/src/backends/redis/fallback.rs b/omniqueue/src/backends/redis/fallback.rs index 2689c0d..dd29e65 100644 --- a/omniqueue/src/backends/redis/fallback.rs +++ b/omniqueue/src/backends/redis/fallback.rs @@ -26,7 +26,7 @@ pub(super) async fn send_raw( .get() .await .map_err(QueueError::generic)? - .lpush(&producer.queue_key, payload.list_payload()) + .lpush(&producer.queue_key, payload.into_list_payload()) .await .map_err(QueueError::generic) } @@ -200,16 +200,16 @@ async fn reenqueue_timed_out_messages( for key in keys { if key <= validity_limit { let payload = InternalPayload::from_list_item(&key)?; - - let refreshed_key = payload.list_payload(); - if payload.num_receives >= max_receives { + let num_receives = payload.num_receives; + let refreshed_key = payload.into_list_payload(); + if num_receives >= max_receives { trace!( - num_receives = payload.num_receives, + num_receives = num_receives, "Maximum attempts reached for message, not reenqueuing", ); } else { trace!( - num_receives = payload.num_receives, + num_receives = num_receives, "Pushing back overdue task to queue" ); let _: () = conn.rpush(queue_key, &refreshed_key).await?; @@ -228,5 +228,5 @@ async fn reenqueue_timed_out_messages( } fn regenerate_key(key: &[u8]) -> Result { - Ok(InternalPayload::from_list_item(key)?.list_payload()) + Ok(InternalPayload::from_list_item(key)?.into_list_payload()) } diff --git a/omniqueue/src/backends/redis/mod.rs b/omniqueue/src/backends/redis/mod.rs index aaf2c4c..ac3c1d5 100644 --- a/omniqueue/src/backends/redis/mod.rs +++ b/omniqueue/src/backends/redis/mod.rs @@ -94,10 +94,11 @@ struct InternalPayload { } impl InternalPayload { - // This method is a bit goofy b/c incrementing `num_receives` is - // done when deserializing. This could also be broken into a separate + // This method is a bit goofy b/c `num_receives` is incremented + // when instantiated. This could also be broken into a separate // method for clarity, but doing so may be more error-prone since - // it would need to be called everywhere this method is called: + // the struct would need to be cloned and the increment method called + // everywhere this method is called: fn from_list_item(payload: &[u8]) -> Result { // All information is stored in the key in which the ID and the [optional] // number of prior receives are separated by a `#`, and the JSON @@ -135,10 +136,10 @@ impl InternalPayload { }) } - // This method is a bit goofy b/c incrementing `num_receives` is - // done when deserializing. This could also be broken into a separate + // This method is a bit goofy b/c `num_receives` is incremented + // when instantiated. This could also be broken into a separate // method for clarity, but doing so may be more error-prone since - // it would need to be called everywhere this method is called: + // it would need to be called most places this method is called: fn from_stream_id(stream_id: &StreamId, payload_key: &str) -> Result { let StreamId { map, .. } = stream_id; @@ -163,19 +164,20 @@ impl InternalPayload { }) } - fn list_payload(&self) -> Vec { + fn into_list_payload(mut self) -> Vec { let id = delayed_key_id(); - - let mut result = Vec::with_capacity(id.len() + self.payload.len() + 1); + let num_receives = self.num_receives.to_string(); + let mut result = + Vec::with_capacity(id.len() + num_receives.as_bytes().len() + self.payload.len() + 3); result.extend(id.as_bytes()); result.push(b'#'); - result.extend(self.num_receives.to_string().as_bytes()); + result.extend(num_receives.as_bytes()); result.push(b'|'); - result.extend(&self.payload); + result.append(&mut self.payload); result } - fn stream_payload(self, payload_key: &str) -> Vec<(&str, Vec)> { + fn into_stream_payload(self, payload_key: &str) -> Vec<(&str, Vec)> { vec![ (payload_key, self.payload), ( @@ -713,7 +715,11 @@ impl RedisProducer { .get() .await .map_err(QueueError::generic)? - .zadd(&self.delayed_queue_key, payload.list_payload(), timestamp) + .zadd( + &self.delayed_queue_key, + payload.into_list_payload(), + timestamp, + ) .await .map_err(QueueError::generic)?; diff --git a/omniqueue/src/backends/redis/streams.rs b/omniqueue/src/backends/redis/streams.rs index bae21da..b85d770 100644 --- a/omniqueue/src/backends/redis/streams.rs +++ b/omniqueue/src/backends/redis/streams.rs @@ -40,7 +40,7 @@ pub(super) async fn send_raw( .xadd( &producer.queue_key, GENERATE_STREAM_ID, - &payload.stream_payload(&producer.payload_key), + &payload.into_stream_payload(&producer.payload_key), ) .await .map_err(QueueError::generic) @@ -178,7 +178,7 @@ pub(super) async fn add_to_main_queue( let _ = pipe.xadd( main_queue_name, GENERATE_STREAM_ID, - &payload.stream_payload(payload_key), + &payload.into_stream_payload(payload_key), ); } @@ -283,7 +283,7 @@ async fn reenqueue_timed_out_messages( let _ = pipe.xadd( main_queue_name, GENERATE_STREAM_ID, - &internal_payload.stream_payload(payload_key), + &internal_payload.into_stream_payload(payload_key), ); }