Skip to content

Commit

Permalink
Improve kv::Watcher without messages
Browse files Browse the repository at this point in the history
Until now, if underlying watcher for given consumer did not have any pending
messages, it would indefinitely wait for the first one.
This commit improves it by checking message pending count on initial
consumer info, and returning `None` if there are no messages.

Signed-off-by: Tomasz Pietrek <[email protected]>
  • Loading branch information
Jarema committed Sep 27, 2024
1 parent ea4c372 commit 33dbbdc
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 0 deletions.
6 changes: 6 additions & 0 deletions async-nats/src/jetstream/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,8 @@ impl Store {
})?;

Ok(Watch {
no_messages: deliver_policy != DeliverPolicy::New
&& consumer.cached_info().num_pending == 0,
subscription: consumer.messages().await.map_err(|err| match err.kind() {
crate::jetstream::consumer::StreamErrorKind::TimedOut => {
WatchError::new(WatchErrorKind::TimedOut)
Expand Down Expand Up @@ -1072,6 +1074,7 @@ impl Store {

/// A structure representing a watch on a key-value bucket, yielding values whenever there are changes.
pub struct Watch {
no_messages: bool,
seen_current: bool,
subscription: super::consumer::push::Ordered,
prefix: String,
Expand All @@ -1085,6 +1088,9 @@ impl futures::Stream for Watch {
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
if self.no_messages {
return Poll::Ready(None);
}
match self.subscription.poll_next_unpin(cx) {
Poll::Ready(message) => match message {
None => Poll::Ready(None),
Expand Down
23 changes: 23 additions & 0 deletions async-nats/tests/kv_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,29 @@ mod kv {
}
}
}

#[tokio::test]
async fn watch_no_messages() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();

let context = async_nats::jetstream::new(client);
let kv = context
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "history".to_string(),
description: "test_description".to_string(),
history: 15,
storage: StorageType::File,
num_replicas: 1,
..Default::default()
})
.await
.unwrap();

let mut watcher = kv.watch_with_history("foo").await.unwrap();
assert!(watcher.next().await.is_none());
}

#[tokio::test]
async fn watch() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
Expand Down

0 comments on commit 33dbbdc

Please sign in to comment.