Skip to content

Commit

Permalink
Drop subscription on object read done
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <[email protected]>
  • Loading branch information
Jarema committed Jul 1, 2023
1 parent c2c3b23 commit 650395a
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 55 deletions.
4 changes: 2 additions & 2 deletions async-nats/src/jetstream/consumer/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,8 +651,8 @@ impl<'a> futures::Stream for Ordered<'a> {
}
}
}
if let Some(subject) = message.reply {
warn!("got message with reply subject for ordered consumer");
if let Some(subject) = message.reply.clone() {
warn!("got message with reply subject for ordered consumer: {:?}", message);
// TODO store pending_publish as a future and return errors from it
let client = self.context.client.clone();
tokio::task::spawn(async move {
Expand Down
111 changes: 58 additions & 53 deletions async-nats/src/jetstream/object_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,13 +505,13 @@ pub struct Object<'a> {
remaining_bytes: Vec<u8>,
has_pending_messages: bool,
digest: Option<ring::digest::Context>,
subscription: crate::jetstream::consumer::push::Ordered<'a>,
subscription: Option<crate::jetstream::consumer::push::Ordered<'a>>,
}

impl<'a> Object<'a> {
pub(crate) fn new(subscription: Ordered<'a>, info: ObjectInfo) -> Self {
Object {
subscription,
subscription: Some(subscription),
info,
remaining_bytes: Vec::new(),
has_pending_messages: true,
Expand Down Expand Up @@ -539,63 +539,68 @@ impl tokio::io::AsyncRead for Object<'_> {
}

if self.has_pending_messages {
match self.subscription.poll_next_unpin(cx) {
Poll::Ready(message) => match message {
Some(message) => {
let message = message.map_err(|err| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("error from JetStream subscription: {err}"),
)
})?;
let len = cmp::min(buf.remaining(), message.payload.len());
buf.put_slice(&message.payload[..len]);
if let Some(context) = &mut self.digest {
context.update(&message.payload);
}
self.remaining_bytes
.extend_from_slice(&message.payload[len..]);

let info = message.info().map_err(|err| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("error from JetStream subscription: {err}"),
)
})?;
if info.pending == 0 {
let digest = self.digest.take().map(|context| context.finish());
if let Some(digest) = digest {
if self
.info
.digest
.as_ref()
.map(|digest_self| {
format!("SHA-256={}", URL_SAFE.encode(digest))
!= *digest_self
})
.unwrap_or(false)
{
if let Some(subscription) = self.subscription.as_mut() {
match subscription.poll_next_unpin(cx) {
Poll::Ready(message) => match message {
Some(message) => {
let message = message.map_err(|err| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("error from JetStream subscription: {err}"),
)
})?;
let len = cmp::min(buf.remaining(), message.payload.len());
buf.put_slice(&message.payload[..len]);
if let Some(context) = &mut self.digest {
context.update(&message.payload);
}
self.remaining_bytes
.extend_from_slice(&message.payload[len..]);

let info = message.info().map_err(|err| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("error from JetStream subscription: {err}"),
)
})?;
if info.pending == 0 {
let digest = self.digest.take().map(|context| context.finish());
if let Some(digest) = digest {
if self
.info
.digest
.as_ref()
.map(|digest_self| {
format!("SHA-256={}", URL_SAFE.encode(digest))
!= *digest_self
})
.unwrap_or(false)
{
return Poll::Ready(Err(io::Error::new(
ErrorKind::InvalidData,
"wrong digest",
)));
}
} else {
return Poll::Ready(Err(io::Error::new(
ErrorKind::InvalidData,
"wrong digest",
"digest should be Some",
)));
}
} else {
return Poll::Ready(Err(io::Error::new(
ErrorKind::InvalidData,
"digest should be Some",
)));
self.has_pending_messages = false;
self.subscription = None;
}
self.has_pending_messages = false;
Poll::Ready(Ok(()))
}
Poll::Ready(Ok(()))
}
None => Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::Other,
"subscription ended before reading whole object",
))),
},
Poll::Pending => Poll::Pending,
None => Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::Other,
"subscription ended before reading whole object",
))),
},
Poll::Pending => Poll::Pending,
}
} else {
Poll::Ready(Ok(()))
}
} else {
Poll::Ready(Ok(()))
Expand Down

0 comments on commit 650395a

Please sign in to comment.