From 8d8e552fa9151271b13ba57a6162860d34bb1196 Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Fri, 9 Aug 2024 14:36:45 +0200 Subject: [PATCH] Make Delivery fields private, provider a constructor --- omniqueue/src/backends/azure_queue_storage.rs | 10 +++++----- omniqueue/src/backends/gcp_pubsub.rs | 10 +++++----- omniqueue/src/backends/in_memory.rs | 10 +++++----- omniqueue/src/backends/rabbitmq.rs | 10 +++++----- omniqueue/src/backends/redis/fallback.rs | 10 +++++----- omniqueue/src/backends/redis/streams.rs | 10 +++++----- omniqueue/src/backends/sqs.rs | 10 +++++----- omniqueue/src/queue/mod.rs | 11 +++++++++-- 8 files changed, 44 insertions(+), 37 deletions(-) diff --git a/omniqueue/src/backends/azure_queue_storage.rs b/omniqueue/src/backends/azure_queue_storage.rs index b21ab84..341a554 100644 --- a/omniqueue/src/backends/azure_queue_storage.rs +++ b/omniqueue/src/backends/azure_queue_storage.rs @@ -180,14 +180,14 @@ impl Acker for AqsAcker { impl AqsConsumer { fn wrap_message(&self, message: &Message) -> Delivery { - Delivery { - acker: Box::new(AqsAcker { + Delivery::new( + message.message_text.as_bytes().to_owned(), + AqsAcker { client: self.client.clone(), pop_receipt: message.pop_receipt(), already_acked_or_nacked: false, - }), - payload: Some(message.message_text.as_bytes().to_owned()), - } + }, + ) } /// Note that blocking receives are not supported by Azure Queue Storage. diff --git a/omniqueue/src/backends/gcp_pubsub.rs b/omniqueue/src/backends/gcp_pubsub.rs index 6882544..476c38b 100644 --- a/omniqueue/src/backends/gcp_pubsub.rs +++ b/omniqueue/src/backends/gcp_pubsub.rs @@ -223,13 +223,13 @@ impl GcpPubSubConsumer { // returned _outside of the Acker_. let payload = recv_msg.message.data.drain(..).collect(); - Delivery { - acker: Box::new(GcpPubSubAcker { + Delivery::new( + payload, + GcpPubSubAcker { recv_msg, subscription_id: self.subscription_id.clone(), - }), - payload: Some(payload), - } + }, + ) } } diff --git a/omniqueue/src/backends/in_memory.rs b/omniqueue/src/backends/in_memory.rs index 64ab26c..4c97fba 100644 --- a/omniqueue/src/backends/in_memory.rs +++ b/omniqueue/src/backends/in_memory.rs @@ -96,14 +96,14 @@ pub struct InMemoryConsumer { impl InMemoryConsumer { fn wrap_payload(&self, payload: Vec) -> Delivery { - Delivery { - payload: Some(payload.clone()), - acker: Box::new(InMemoryAcker { + Delivery::new( + payload.clone(), + InMemoryAcker { tx: self.tx.clone(), payload_copy: Some(payload), already_acked_or_nacked: false, - }), - } + }, + ) } pub async fn receive(&mut self) -> Result { diff --git a/omniqueue/src/backends/rabbitmq.rs b/omniqueue/src/backends/rabbitmq.rs index 31e3152..70b646a 100644 --- a/omniqueue/src/backends/rabbitmq.rs +++ b/omniqueue/src/backends/rabbitmq.rs @@ -215,13 +215,13 @@ pub struct RabbitMqConsumer { impl RabbitMqConsumer { fn wrap_delivery(&self, delivery: lapin::message::Delivery) -> Delivery { - Delivery { - payload: Some(delivery.data), - acker: Box::new(RabbitMqAcker { + Delivery::new( + delivery.data, + RabbitMqAcker { acker: Some(delivery.acker), requeue_on_nack: self.requeue_on_nack, - }), - } + }, + ) } pub async fn receive(&mut self) -> Result { diff --git a/omniqueue/src/backends/redis/fallback.rs b/omniqueue/src/backends/redis/fallback.rs index 061eb30..3eb88e1 100644 --- a/omniqueue/src/backends/redis/fallback.rs +++ b/omniqueue/src/backends/redis/fallback.rs @@ -68,15 +68,15 @@ async fn receive_with_timeout( fn make_delivery(consumer: &RedisConsumer, key: &[u8]) -> Result { let (_, payload) = from_key(key)?; - Ok(Delivery { - payload: Some(payload.to_owned()), - acker: Box::new(RedisFallbackAcker { + Ok(Delivery::new( + payload.to_owned(), + RedisFallbackAcker { redis: consumer.redis.clone(), processing_queue_key: consumer.processing_queue_key.clone(), key: key.to_owned(), already_acked_or_nacked: false, - }), - }) + }, + )) } struct RedisFallbackAcker { diff --git a/omniqueue/src/backends/redis/streams.rs b/omniqueue/src/backends/redis/streams.rs index ac12b4b..764a2f6 100644 --- a/omniqueue/src/backends/redis/streams.rs +++ b/omniqueue/src/backends/redis/streams.rs @@ -113,16 +113,16 @@ fn wrap_entry( .ok_or(QueueError::NoData)?; let payload: Vec = redis::from_redis_value(payload).map_err(QueueError::generic)?; - Ok(Delivery { - payload: Some(payload), - acker: Box::new(RedisStreamsAcker { + Ok(Delivery::new( + payload, + RedisStreamsAcker { redis: consumer.redis.clone(), queue_key: consumer.queue_key.to_owned(), consumer_group: consumer.consumer_group.to_owned(), entry_id, already_acked_or_nacked: false, - }), - }) + }, + )) } struct RedisStreamsAcker { diff --git a/omniqueue/src/backends/sqs.rs b/omniqueue/src/backends/sqs.rs index 201582c..e42102f 100644 --- a/omniqueue/src/backends/sqs.rs +++ b/omniqueue/src/backends/sqs.rs @@ -319,15 +319,15 @@ pub struct SqsConsumer { impl SqsConsumer { fn wrap_message(&self, message: &Message) -> Delivery { - Delivery { - acker: Box::new(SqsAcker { + Delivery::new( + message.body().unwrap_or_default().as_bytes().to_owned(), + SqsAcker { ack_client: self.client.clone(), queue_dsn: self.queue_dsn.clone(), receipt_handle: message.receipt_handle().map(ToOwned::to_owned), has_been_acked_or_nacked: false, - }), - payload: Some(message.body().unwrap_or_default().as_bytes().to_owned()), - } + }, + ) } pub async fn receive(&self) -> Result { diff --git a/omniqueue/src/queue/mod.rs b/omniqueue/src/queue/mod.rs index d3845b3..fe889c4 100644 --- a/omniqueue/src/queue/mod.rs +++ b/omniqueue/src/queue/mod.rs @@ -44,11 +44,18 @@ pub trait QueueBackend { /// The output of queue backends pub struct Delivery { - pub(crate) payload: Option>, - pub(crate) acker: Box, + payload: Option>, + acker: Box, } impl Delivery { + pub(crate) fn new(payload: Vec, acker: impl Acker + 'static) -> Self { + Self { + payload: Some(payload), + acker: Box::new(acker), + } + } + /// Acknowledges the receipt and successful processing of this [`Delivery`]. /// /// On failure, `self` is returned alongside the error to allow retrying.