From b7323c4da35a0f11c3796c67fedc66ede52ee711 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Fri, 30 Jun 2023 14:39:17 +0200 Subject: [PATCH 1/2] Add Pull Consumer concrete errors Signed-off-by: Tomasz Pietrek --- async-nats/src/jetstream/consumer/pull.rs | 89 +++++++++++++++++++---- 1 file changed, 74 insertions(+), 15 deletions(-) diff --git a/async-nats/src/jetstream/consumer/pull.rs b/async-nats/src/jetstream/consumer/pull.rs index fcbdb5a9d..23a5ee847 100644 --- a/async-nats/src/jetstream/consumer/pull.rs +++ b/async-nats/src/jetstream/consumer/pull.rs @@ -34,7 +34,7 @@ use tracing::{debug, trace}; use crate::{ connection::State, jetstream::{self, Context}, - Error, StatusCode, Subscriber, + Error, StatusCode, SubscribeError, Subscriber, }; use super::{ @@ -315,7 +315,7 @@ impl Consumer { /// Ok(()) /// # } /// ``` - pub fn sequence(&self, batch: usize) -> Result { + pub fn sequence(&self, batch: usize) -> Result { let context = self.context.clone(); let subject = format!( "{}.CONSUMER.MSG.NEXT.{}.{}", @@ -324,10 +324,11 @@ impl Consumer { let request = serde_json::to_vec(&BatchConfig { batch, - expires: Some(Duration::from_secs(60).as_millis().try_into()?), + expires: Some(Duration::from_secs(60).as_millis().try_into().unwrap()), ..Default::default() }) - .map(Bytes::from)?; + .map(Bytes::from) + .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Serialize, err))?; Ok(Sequence { context, @@ -348,7 +349,7 @@ pub struct Batch { } impl<'a> Batch { - async fn batch(batch: BatchConfig, consumer: &Consumer) -> Result { + async fn batch(batch: BatchConfig, consumer: &Consumer) -> Result { let inbox = consumer.context.client.new_inbox(); let subscription = consumer.context.client.subscribe(inbox.clone()).await?; consumer.request_batch(batch, inbox.clone()).await?; @@ -442,11 +443,11 @@ pub struct Sequence<'a> { subject: String, request: Bytes, pending_messages: usize, - next: Option>>, + next: Option>>, } impl<'a> futures::Stream for Sequence<'a> { - type Item = Result; + type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, @@ -461,12 +462,22 @@ impl<'a> futures::Stream for Sequence<'a> { self.next = Some(Box::pin(async move { let inbox = context.client.new_inbox(); - let subscriber = context.client.subscribe(inbox.clone()).await?; + let subscriber = + context + .client + .subscribe(inbox.clone()) + .await + .map_err(|err| { + MessagesError::with_source(MessagesErrorKind::PullFailed, err) + })?; context .client .publish_with_reply(subject, inbox, request) - .await?; + .await + .map_err(|err| { + MessagesError::with_source(MessagesErrorKind::PullFailed, err) + })?; // TODO(tp): Add timeout config and defaults. Ok(Batch { @@ -481,7 +492,9 @@ impl<'a> futures::Stream for Sequence<'a> { match self.next.as_mut().unwrap().as_mut().poll(cx) { Poll::Ready(result) => { self.next = None; - Poll::Ready(Some(result)) + Poll::Ready(Some(result.map_err(|err| { + MessagesError::with_source(MessagesErrorKind::PullFailed, err) + }))) } Poll::Pending => Poll::Pending, } @@ -490,7 +503,9 @@ impl<'a> futures::Stream for Sequence<'a> { Some(next) => match next.as_mut().poll(cx) { Poll::Ready(result) => { self.next = None; - Poll::Ready(Some(result)) + Poll::Ready(Some(result.map_err(|err| { + MessagesError::with_source(MessagesErrorKind::PullFailed, err) + }))) } Poll::Pending => Poll::Pending, }, @@ -1739,7 +1754,7 @@ impl<'a> FetchBuilder<'a> { /// # Ok(()) /// # } /// ``` - pub async fn messages(self) -> Result { + pub async fn messages(self) -> Result { Batch::batch( BatchConfig { batch: self.batch, @@ -1987,7 +2002,7 @@ impl<'a> BatchBuilder<'a> { /// # Ok(()) /// # } /// ``` - pub async fn messages(self) -> Result { + pub async fn messages(self) -> Result { Batch::batch( BatchConfig { batch: self.batch, @@ -2202,7 +2217,7 @@ impl FromConsumer for Config { } #[derive(Debug)] -pub(crate) struct BatchRequestError { +pub struct BatchRequestError { kind: BatchRequestErrorKind, source: Option, } @@ -2225,12 +2240,56 @@ impl std::fmt::Display for BatchRequestError { } #[derive(Debug, Clone, Copy, PartialEq)] -pub(crate) enum BatchRequestErrorKind { +pub enum BatchRequestErrorKind { Publish, Flush, Serialize, } +#[derive(Debug)] +pub struct BatchError { + kind: BatchErrorKind, + source: Option>, +} +crate::error_impls!(BatchError, BatchErrorKind); + +impl std::fmt::Display for BatchError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match &self.kind() { + BatchErrorKind::PullFailed => { + write!(f, "pull request failed: {}", self.format_source()) + } + BatchErrorKind::Flush => { + write!(f, "flush failed: {}", self.format_source()) + } + BatchErrorKind::Serialize => { + write!(f, "serialize failed: {}", self.format_source()) + } + BatchErrorKind::Subscribe => write!(f, "subscribe failed: {}", self.format_source()), + } + } +} + +impl From for BatchError { + fn from(err: SubscribeError) -> Self { + BatchError::with_source(BatchErrorKind::Subscribe, err) + } +} + +impl From for BatchError { + fn from(err: BatchRequestError) -> Self { + BatchError::with_source(BatchErrorKind::PullFailed, err) + } +} + +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum BatchErrorKind { + Subscribe, + PullFailed, + Flush, + Serialize, +} + #[derive(Debug)] pub struct ConsumerRecreateError { kind: ConsumerRecreateErrorKind, From e6e2078dca6e86dda802369d3102f6c5fe984960 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Fri, 30 Jun 2023 14:51:03 +0200 Subject: [PATCH 2/2] Remove manual error Signed-off-by: Tomasz Pietrek --- async-nats/src/jetstream/consumer/pull.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/async-nats/src/jetstream/consumer/pull.rs b/async-nats/src/jetstream/consumer/pull.rs index 23a5ee847..9c581bc91 100644 --- a/async-nats/src/jetstream/consumer/pull.rs +++ b/async-nats/src/jetstream/consumer/pull.rs @@ -2249,7 +2249,7 @@ pub enum BatchRequestErrorKind { #[derive(Debug)] pub struct BatchError { kind: BatchErrorKind, - source: Option>, + source: Option, } crate::error_impls!(BatchError, BatchErrorKind);