Skip to content

Commit

Permalink
Make Delivery fields private, provider a constructor
Browse files Browse the repository at this point in the history
  • Loading branch information
svix-jplatte committed Aug 9, 2024
1 parent 9a65015 commit 8d8e552
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 37 deletions.
10 changes: 5 additions & 5 deletions omniqueue/src/backends/azure_queue_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,14 @@ impl Acker for AqsAcker {

impl AqsConsumer {
fn wrap_message(&self, message: &Message) -> Delivery {
Delivery {
acker: Box::new(AqsAcker {
Delivery::new(
message.message_text.as_bytes().to_owned(),
AqsAcker {
client: self.client.clone(),
pop_receipt: message.pop_receipt(),
already_acked_or_nacked: false,
}),
payload: Some(message.message_text.as_bytes().to_owned()),
}
},
)
}

/// Note that blocking receives are not supported by Azure Queue Storage.
Expand Down
10 changes: 5 additions & 5 deletions omniqueue/src/backends/gcp_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,13 @@ impl GcpPubSubConsumer {
// returned _outside of the Acker_.
let payload = recv_msg.message.data.drain(..).collect();

Delivery {
acker: Box::new(GcpPubSubAcker {
Delivery::new(
payload,
GcpPubSubAcker {
recv_msg,
subscription_id: self.subscription_id.clone(),
}),
payload: Some(payload),
}
},
)
}
}

Expand Down
10 changes: 5 additions & 5 deletions omniqueue/src/backends/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,14 @@ pub struct InMemoryConsumer {

impl InMemoryConsumer {
fn wrap_payload(&self, payload: Vec<u8>) -> Delivery {
Delivery {
payload: Some(payload.clone()),
acker: Box::new(InMemoryAcker {
Delivery::new(
payload.clone(),
InMemoryAcker {
tx: self.tx.clone(),
payload_copy: Some(payload),
already_acked_or_nacked: false,
}),
}
},
)
}

pub async fn receive(&mut self) -> Result<Delivery> {
Expand Down
10 changes: 5 additions & 5 deletions omniqueue/src/backends/rabbitmq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,13 +215,13 @@ pub struct RabbitMqConsumer {

impl RabbitMqConsumer {
fn wrap_delivery(&self, delivery: lapin::message::Delivery) -> Delivery {
Delivery {
payload: Some(delivery.data),
acker: Box::new(RabbitMqAcker {
Delivery::new(
delivery.data,
RabbitMqAcker {
acker: Some(delivery.acker),
requeue_on_nack: self.requeue_on_nack,
}),
}
},
)
}

pub async fn receive(&mut self) -> Result<Delivery> {
Expand Down
10 changes: 5 additions & 5 deletions omniqueue/src/backends/redis/fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,15 @@ async fn receive_with_timeout<R: RedisConnection>(
fn make_delivery<R: RedisConnection>(consumer: &RedisConsumer<R>, key: &[u8]) -> Result<Delivery> {
let (_, payload) = from_key(key)?;

Ok(Delivery {
payload: Some(payload.to_owned()),
acker: Box::new(RedisFallbackAcker {
Ok(Delivery::new(
payload.to_owned(),
RedisFallbackAcker {
redis: consumer.redis.clone(),
processing_queue_key: consumer.processing_queue_key.clone(),
key: key.to_owned(),
already_acked_or_nacked: false,
}),
})
},
))
}

struct RedisFallbackAcker<M: ManageConnection> {
Expand Down
10 changes: 5 additions & 5 deletions omniqueue/src/backends/redis/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,16 +113,16 @@ fn wrap_entry<R: RedisConnection>(
.ok_or(QueueError::NoData)?;
let payload: Vec<u8> = redis::from_redis_value(payload).map_err(QueueError::generic)?;

Ok(Delivery {
payload: Some(payload),
acker: Box::new(RedisStreamsAcker {
Ok(Delivery::new(
payload,
RedisStreamsAcker {
redis: consumer.redis.clone(),
queue_key: consumer.queue_key.to_owned(),
consumer_group: consumer.consumer_group.to_owned(),
entry_id,
already_acked_or_nacked: false,
}),
})
},
))
}

struct RedisStreamsAcker<M: ManageConnection> {
Expand Down
10 changes: 5 additions & 5 deletions omniqueue/src/backends/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,15 +319,15 @@ pub struct SqsConsumer {

impl SqsConsumer {
fn wrap_message(&self, message: &Message) -> Delivery {
Delivery {
acker: Box::new(SqsAcker {
Delivery::new(
message.body().unwrap_or_default().as_bytes().to_owned(),
SqsAcker {
ack_client: self.client.clone(),
queue_dsn: self.queue_dsn.clone(),
receipt_handle: message.receipt_handle().map(ToOwned::to_owned),
has_been_acked_or_nacked: false,
}),
payload: Some(message.body().unwrap_or_default().as_bytes().to_owned()),
}
},
)
}

pub async fn receive(&self) -> Result<Delivery> {
Expand Down
11 changes: 9 additions & 2 deletions omniqueue/src/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,18 @@ pub trait QueueBackend {

/// The output of queue backends
pub struct Delivery {
pub(crate) payload: Option<Vec<u8>>,
pub(crate) acker: Box<dyn Acker>,
payload: Option<Vec<u8>>,
acker: Box<dyn Acker>,
}

impl Delivery {
pub(crate) fn new(payload: Vec<u8>, acker: impl Acker + 'static) -> Self {
Self {
payload: Some(payload),
acker: Box::new(acker),
}
}

/// Acknowledges the receipt and successful processing of this [`Delivery`].
///
/// On failure, `self` is returned alongside the error to allow retrying.
Expand Down

0 comments on commit 8d8e552

Please sign in to comment.