Skip to content

Commit

Permalink
Give InternalPayload* structs named fields
Browse files Browse the repository at this point in the history
  • Loading branch information
jaymell committed Sep 5, 2024
1 parent 2c26551 commit a864781
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 30 deletions.
13 changes: 8 additions & 5 deletions omniqueue/src/backends/redis/fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub(super) async fn send_raw<R: RedisConnection>(
.map_err(QueueError::generic)?
.lpush(
&producer.queue_key,
internal_to_list_payload(InternalPayload(payload, 0)),
internal_to_list_payload(InternalPayload::new(payload)),
)
.await
.map_err(QueueError::generic)
Expand Down Expand Up @@ -79,7 +79,10 @@ async fn receive_with_timeout<R: RedisConnection>(
}

fn internal_to_delivery<R: RedisConnection>(
InternalPayloadOwned(payload, num_receives): InternalPayloadOwned,
InternalPayloadOwned {
payload,
num_receives,
}: InternalPayloadOwned,
consumer: &RedisConsumer<R>,
old_payload: Vec<u8>,
) -> Result<Delivery> {
Expand Down Expand Up @@ -139,7 +142,7 @@ impl<R: RedisConnection> Acker for RedisFallbackAcker<R> {
// seems possible given that we're already in a failure
// scenario), just push the full `InternalPayload` onto the DLQ:
let payload = match internal_from_list(&self.old_payload) {
Ok(InternalPayload(payload, _)) => payload,
Ok(InternalPayload { payload, .. }) => payload,
Err(e) => {
warn!(error = ?e, "Failed to get original payload, sending to DLQ with internal payload");
&self.old_payload
Expand Down Expand Up @@ -249,15 +252,15 @@ async fn reenqueue_timed_out_messages<R: RedisConnection>(
for key in keys {
if key <= validity_limit {
let internal = internal_from_list(&key)?;
let num_receives = internal.num_receives();
let num_receives = internal.num_receives;

match &dlq_config {
Some(dlq_config) if dlq_config.max_retries_reached(num_receives) => {
trace!(
num_receives = num_receives,
"Maximum attempts reached for message, moving item to DLQ",
);
send_to_dlq(pool, dlq_config, internal.payload()).await?;
send_to_dlq(pool, dlq_config, internal.payload).await?;
}
_ => {
trace!(
Expand Down
48 changes: 33 additions & 15 deletions omniqueue/src/backends/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,25 +88,38 @@ impl RedisConnection for RedisClusterConnectionManager {
// First element is the raw payload slice, second
// is `num_receives`, the number of the times
// the message has previously been received.
struct InternalPayload<'a>(&'a [u8], usize);
struct InternalPayload<'a> {
payload: &'a [u8],
num_receives: usize,
}

impl<'a> InternalPayload<'a> {
fn payload(&self) -> &[u8] {
self.0
}

fn num_receives(&self) -> usize {
self.1
fn new(payload: &'a [u8]) -> Self {
Self {
payload,
num_receives: 0,
}
}
}

// The same as `InternalPayload` but with an
// owned payload.
struct InternalPayloadOwned(Vec<u8>, usize);
struct InternalPayloadOwned {
payload: Vec<u8>,
num_receives: usize,
}

impl From<InternalPayload<'_>> for InternalPayloadOwned {
fn from(InternalPayload(payload, num_receives): InternalPayload) -> Self {
Self(payload.to_vec(), num_receives)
fn from(
InternalPayload {
payload,
num_receives,
}: InternalPayload,
) -> Self {
Self {
payload: payload.to_vec(),
num_receives,
}
}
}

Expand Down Expand Up @@ -141,13 +154,18 @@ fn internal_from_list(payload: &[u8]) -> Result<InternalPayload<'_>> {
1
};

Ok(InternalPayload(
&payload[payload_sep_pos + 1..],
Ok(InternalPayload {
payload: &payload[payload_sep_pos + 1..],
num_receives,
))
})
}

fn internal_to_list_payload(InternalPayload(payload, num_receives): InternalPayload) -> Vec<u8> {
fn internal_to_list_payload(
InternalPayload {
payload,
num_receives,
}: InternalPayload,
) -> Vec<u8> {
let id = delayed_key_id();
let num_receives = num_receives.to_string();
let mut result =
Expand Down Expand Up @@ -654,7 +672,7 @@ impl<R: RedisConnection> RedisProducer<R> {
.map_err(QueueError::generic)?
.zadd(
&self.delayed_queue_key,
internal_to_list_payload(InternalPayload(payload, 0)),
internal_to_list_payload(InternalPayload::new(payload)),
timestamp,
)
.await
Expand Down
34 changes: 24 additions & 10 deletions omniqueue/src/backends/redis/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ const PENDING_BATCH_SIZE: usize = 1000;
macro_rules! internal_to_stream_payload {
($internal_payload:expr, $payload_key:expr) => {
&[
($payload_key, $internal_payload.payload()),
($payload_key, $internal_payload.payload),
(
NUM_RECEIVES,
$internal_payload.num_receives().to_string().as_bytes(),
$internal_payload.num_receives.to_string().as_bytes(),
),
]
};
Expand All @@ -52,7 +52,10 @@ pub(super) async fn send_raw<R: RedisConnection>(
.xadd(
&producer.queue_key,
GENERATE_STREAM_ID,
internal_to_stream_payload!(InternalPayload(payload, 0), producer.payload_key.as_str()),
internal_to_stream_payload!(
InternalPayload::new(payload),
producer.payload_key.as_str()
),
)
.await
.map_err(QueueError::generic)
Expand Down Expand Up @@ -141,11 +144,17 @@ fn internal_from_stream(stream_id: &StreamId, payload_key: &str) -> Result<Inter
.ok_or(QueueError::NoData)
.and_then(|x| redis::from_redis_value(x).map_err(QueueError::generic))?;

Ok(InternalPayloadOwned(payload, num_receives))
Ok(InternalPayloadOwned {
payload,
num_receives,
})
}

fn internal_to_delivery<R: RedisConnection>(
InternalPayloadOwned(payload, num_receives): InternalPayloadOwned,
InternalPayloadOwned {
payload,
num_receives,
}: InternalPayloadOwned,
consumer: &RedisConsumer<R>,
entry_id: String,
) -> Delivery {
Expand Down Expand Up @@ -241,11 +250,11 @@ pub(super) async fn add_to_main_queue(
for key in keys {
// We don't care about `num_receives` here since we're
// re-queuing from delayed queue:
let InternalPayload(payload, _) = internal_from_list(key)?;
let InternalPayload { payload, .. } = internal_from_list(key)?;
let _ = pipe.xadd(
main_queue_name,
GENERATE_STREAM_ID,
internal_to_stream_payload!(InternalPayload(payload, 0), payload_key),
internal_to_stream_payload!(InternalPayload::new(payload), payload_key),
);
}

Expand Down Expand Up @@ -376,8 +385,10 @@ async fn reenqueue_timed_out_messages<R: RedisConnection>(

// And reinsert the map of KV pairs into the MAIN queue with a new stream ID
for stream_id in &ids {
let InternalPayloadOwned(payload, num_receives) =
internal_from_stream(stream_id, payload_key)?;
let InternalPayloadOwned {
payload,
num_receives,
} = internal_from_stream(stream_id, payload_key)?;

if let Some(dlq_config) = &dlq_config {
if num_receives >= dlq_config.max_receives {
Expand All @@ -400,7 +411,10 @@ async fn reenqueue_timed_out_messages<R: RedisConnection>(
main_queue_name,
GENERATE_STREAM_ID,
internal_to_stream_payload!(
InternalPayload(payload.as_slice(), num_receives),
InternalPayload {
payload: payload.as_slice(),
num_receives
},
payload_key
),
);
Expand Down

0 comments on commit a864781

Please sign in to comment.