diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index 9f48668e0..439a20d91 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -601,6 +601,8 @@ impl Store { })?; Ok(Watch { + no_messages: deliver_policy != DeliverPolicy::New + && consumer.cached_info().num_pending == 0, subscription: consumer.messages().await.map_err(|err| match err.kind() { crate::jetstream::consumer::StreamErrorKind::TimedOut => { WatchError::new(WatchErrorKind::TimedOut) @@ -1072,6 +1074,7 @@ impl Store { /// A structure representing a watch on a key-value bucket, yielding values whenever there are changes. pub struct Watch { + no_messages: bool, seen_current: bool, subscription: super::consumer::push::Ordered, prefix: String, @@ -1085,6 +1088,9 @@ impl futures::Stream for Watch { mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { + if self.no_messages { + return Poll::Ready(None); + } match self.subscription.poll_next_unpin(cx) { Poll::Ready(message) => match message { None => Poll::Ready(None), diff --git a/async-nats/tests/kv_tests.rs b/async-nats/tests/kv_tests.rs index 1901ca092..19b670478 100644 --- a/async-nats/tests/kv_tests.rs +++ b/async-nats/tests/kv_tests.rs @@ -532,6 +532,29 @@ mod kv { } } } + + #[tokio::test] + async fn watch_no_messages() { + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let client = async_nats::connect(server.client_url()).await.unwrap(); + + let context = async_nats::jetstream::new(client); + let kv = context + .create_key_value(async_nats::jetstream::kv::Config { + bucket: "history".to_string(), + description: "test_description".to_string(), + history: 15, + storage: StorageType::File, + num_replicas: 1, + ..Default::default() + }) + .await + .unwrap(); + + let mut watcher = kv.watch_with_history("foo").await.unwrap(); + assert!(watcher.next().await.is_none()); + } + #[tokio::test] async fn watch() { let server = nats_server::run_server("tests/configs/jetstream.conf");