diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index f4bcec82b..9f48668e0 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -21,42 +21,31 @@ use std::{ task::Poll, }; -use crate::{HeaderValue, StatusCode}; +use crate::HeaderValue; use bytes::Bytes; use futures::StreamExt; use once_cell::sync::Lazy; use regex::Regex; -use time::{format_description::well_known::Rfc3339, OffsetDateTime}; +use time::OffsetDateTime; use tracing::debug; use crate::error::Error; -use crate::{header, Message}; +use crate::header; use self::bucket::Status; use super::{ consumer::{push::OrderedError, DeliverPolicy, StreamError, StreamErrorKind}, context::{PublishError, PublishErrorKind}, + message::StreamMessage, stream::{ - self, ConsumerError, ConsumerErrorKind, DirectGetError, DirectGetErrorKind, RawMessage, - Republish, Source, StorageType, Stream, + self, ConsumerError, ConsumerErrorKind, DirectGetError, DirectGetErrorKind, Republish, + Source, StorageType, Stream, }, }; -fn kv_operation_from_stream_message(message: &RawMessage) -> Operation { - match message.headers.as_deref() { - Some(headers) => headers.parse().unwrap_or(Operation::Put), - None => Operation::Put, - } -} - -fn kv_operation_from_message(message: &Message) -> Result { - let headers = message - .headers - .as_ref() - .ok_or_else(|| EntryError::with_source(EntryErrorKind::Other, "missing headers"))?; - - if let Some(op) = headers.get(KV_OPERATION) { +fn kv_operation_from_stream_message(message: &StreamMessage) -> Result { + if let Some(op) = message.headers.get(KV_OPERATION) { Operation::from_str(op.as_str()) .map_err(|err| EntryError::with_source(EntryErrorKind::Other, err)) } else { @@ -66,6 +55,18 @@ fn kv_operation_from_message(message: &Message) -> Result )) } } +fn kv_operation_from_message(message: &crate::message::Message) -> Result { + let headers = match message.headers.as_ref() { + Some(headers) => headers, + None => return Ok(Operation::Put), + }; + if let Some(op) = headers.get(KV_OPERATION) { + Operation::from_str(op.as_str()) + .map_err(|err| EntryError::with_source(EntryErrorKind::Other, err)) + } else { + Ok(Operation::Put) + } +} static VALID_BUCKET_RE: Lazy = Lazy::new(|| Regex::new(r"\A[a-zA-Z0-9_-]+\z").unwrap()); static VALID_KEY_RE: Lazy = Lazy::new(|| Regex::new(r"\A[-/_=\.a-zA-Z0-9]+\z").unwrap()); @@ -313,29 +314,11 @@ impl Store { Ok(ack.sequence) } - /// Retrieves the last [Entry] for a given key from a bucket. - /// - /// # Examples - /// - /// ```no_run - /// # #[tokio::main] - /// # async fn main() -> Result<(), async_nats::Error> { - /// let client = async_nats::connect("demo.nats.io:4222").await?; - /// let jetstream = async_nats::jetstream::new(client); - /// let kv = jetstream - /// .create_key_value(async_nats::jetstream::kv::Config { - /// bucket: "kv".to_string(), - /// history: 10, - /// ..Default::default() - /// }) - /// .await?; - /// let status = kv.put("key", "value".into()).await?; - /// let entry = kv.entry("key").await?; - /// println!("entry: {:?}", entry); - /// # Ok(()) - /// # } - /// ``` - pub async fn entry>(&self, key: T) -> Result, EntryError> { + async fn entry_maybe_revision>( + &self, + key: T, + revision: Option, + ) -> Result, EntryError> { let key: String = key.into(); if !is_valid_key(key.as_ref()) { return Err(EntryError::new(EntryErrorKind::InvalidKey)); @@ -343,60 +326,32 @@ impl Store { let subject = format!("{}{}", self.prefix.as_str(), &key); - let result: Option<(Message, Operation, u64, OffsetDateTime)> = { + let result: Option<(StreamMessage, Operation)> = { if self.stream.info.config.allow_direct { - let message = self - .stream - .direct_get_last_for_subject(subject.as_str()) - .await; + let message = match revision { + Some(revision) => { + let message = self.stream.direct_get(revision).await; + if let Ok(message) = message.as_ref() { + if message.subject.as_str() != subject { + println!("subject mismatch {}", message.subject); + return Ok(None); + } + } + message + } + None => { + self.stream + .direct_get_last_for_subject(subject.as_str()) + .await + } + }; match message { Ok(message) => { - let headers = message.headers.as_ref().ok_or_else(|| { - EntryError::with_source(EntryErrorKind::Other, "missing headers") - })?; - let operation = - kv_operation_from_message(&message).unwrap_or(Operation::Put); - - let sequence = headers - .get_last(header::NATS_SEQUENCE) - .ok_or_else(|| { - EntryError::with_source( - EntryErrorKind::Other, - "missing sequence headers", - ) - })? - .as_str() - .parse() - .map_err(|err| { - EntryError::with_source( - EntryErrorKind::Other, - format!("failed to parse headers sequence value: {}", err), - ) - })?; - - let created = headers - .get_last(header::NATS_TIME_STAMP) - .ok_or_else(|| { - EntryError::with_source( - EntryErrorKind::Other, - "did not found timestamp header", - ) - }) - .and_then(|created| { - OffsetDateTime::parse(created.as_str(), &Rfc3339).map_err(|err| { - EntryError::with_source( - EntryErrorKind::Other, - format!( - "failed to parse headers timestampt value: {}", - err - ), - ) - }) - })?; - - Some((message.message, operation, sequence, created)) + kv_operation_from_stream_message(&message).unwrap_or(Operation::Put); + + Some((message, operation)) } Err(err) => { if err.kind() == DirectGetErrorKind::NotFound { @@ -407,22 +362,28 @@ impl Store { } } } else { - let raw_message = self - .stream - .get_last_raw_message_by_subject(subject.as_str()) - .await; + let raw_message = match revision { + Some(revision) => { + let message = self.stream.get_raw_message(revision).await; + if let Ok(message) = message.as_ref() { + if message.subject.as_str() != subject { + return Ok(None); + } + } + message + } + None => { + self.stream + .get_last_raw_message_by_subject(subject.as_str()) + .await + } + }; match raw_message { Ok(raw_message) => { - let operation = kv_operation_from_stream_message(&raw_message); + let operation = kv_operation_from_stream_message(&raw_message) + .unwrap_or(Operation::Put); // TODO: unnecessary expensive, cloning whole Message. - let nats_message = Message::try_from(raw_message.clone()) - .map_err(|err| EntryError::with_source(EntryErrorKind::Other, err))?; - Some(( - nats_message, - operation, - raw_message.sequence, - raw_message.time, - )) + Some((raw_message, operation)) } Err(err) => match err.kind() { crate::jetstream::stream::LastRawMessageErrorKind::NoMessageFound => None, @@ -438,17 +399,13 @@ impl Store { }; match result { - Some((message, operation, revision, created)) => { - if message.status == Some(StatusCode::NO_RESPONDERS) { - return Ok(None); - } - + Some((message, operation)) => { let entry = Entry { bucket: self.name.clone(), key, value: message.payload, - revision, - created, + revision: message.sequence, + created: message.time, operation, delta: 0, seen_current: false, @@ -460,6 +417,63 @@ impl Store { } } + /// Retrieves the last [Entry] for a given key from a bucket. + /// + /// # Examples + /// + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// let client = async_nats::connect("demo.nats.io:4222").await?; + /// let jetstream = async_nats::jetstream::new(client); + /// let kv = jetstream + /// .create_key_value(async_nats::jetstream::kv::Config { + /// bucket: "kv".to_string(), + /// history: 10, + /// ..Default::default() + /// }) + /// .await?; + /// let status = kv.put("key", "value".into()).await?; + /// let entry = kv.entry("key").await?; + /// println!("entry: {:?}", entry); + /// # Ok(()) + /// # } + /// ``` + pub async fn entry>(&self, key: T) -> Result, EntryError> { + self.entry_maybe_revision(key, None).await + } + + /// Retrieves the [Entry] for a given key revision from a bucket. + /// + /// # Examples + /// + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// let client = async_nats::connect("demo.nats.io:4222").await?; + /// let jetstream = async_nats::jetstream::new(client); + /// let kv = jetstream + /// .create_key_value(async_nats::jetstream::kv::Config { + /// bucket: "kv".to_string(), + /// history: 10, + /// ..Default::default() + /// }) + /// .await?; + /// let status = kv.put("key", "value".into()).await?; + /// let status = kv.put("key", "value2".into()).await?; + /// let entry = kv.entry_for_revision("key", 2).await?; + /// println!("entry: {:?}", entry); + /// # Ok(()) + /// # } + /// ``` + pub async fn entry_for_revision>( + &self, + key: T, + revision: u64, + ) -> Result, EntryError> { + self.entry_maybe_revision(key, Some(revision)).await + } + /// Creates a [futures::Stream] over [Entries][Entry] a given key in the bucket, which yields /// values whenever there are changes for that key. /// @@ -1083,7 +1097,8 @@ impl futures::Stream for Watch { ) })?; - let operation = kv_operation_from_message(&message).unwrap_or(Operation::Put); + let operation = + kv_operation_from_message(&message.message).unwrap_or(Operation::Put); let key = message .subject @@ -1211,7 +1226,7 @@ impl futures::Stream for Keys { } /// An entry in a key-value bucket. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct Entry { /// Name of the bucket the entry is in. pub bucket: String, diff --git a/async-nats/src/jetstream/message.rs b/async-nats/src/jetstream/message.rs index cbeadaf16..bf3ef6c0d 100644 --- a/async-nats/src/jetstream/message.rs +++ b/async-nats/src/jetstream/message.rs @@ -13,20 +13,104 @@ //! A wrapped `crate::Message` with `JetStream` related methods. use super::context::Context; -use crate::subject::Subject; -use crate::Error; +use crate::{error, header, Error}; +use crate::{subject::Subject, HeaderMap}; use bytes::Bytes; use futures::future::TryFutureExt; use futures::StreamExt; +use std::fmt::Display; use std::{mem, time::Duration}; +use time::format_description::well_known::Rfc3339; use time::OffsetDateTime; +/// A message received directly from the stream, without leveraging a consumer. +#[derive(Debug, Clone)] +pub struct StreamMessage { + pub subject: Subject, + pub sequence: u64, + pub headers: HeaderMap, + pub payload: Bytes, + pub time: OffsetDateTime, +} + #[derive(Clone, Debug)] pub struct Message { pub message: crate::Message, pub context: Context, } +impl TryFrom for StreamMessage { + type Error = StreamMessageError; + + fn try_from(message: crate::Message) -> Result { + let headers = message.headers.ok_or_else(|| { + StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "no headers") + })?; + + let sequence = headers + .get_last(header::NATS_SEQUENCE) + .ok_or_else(|| { + StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "sequence") + }) + .and_then(|seq| { + seq.as_str().parse().map_err(|err| { + StreamMessageError::with_source( + StreamMessageErrorKind::ParseError, + format!("could not parse sequence header: {}", err), + ) + }) + })?; + + let time = headers + .get_last(header::NATS_TIME_STAMP) + .ok_or_else(|| { + StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "timestamp") + }) + .and_then(|time| { + OffsetDateTime::parse(time.as_str(), &Rfc3339).map_err(|err| { + StreamMessageError::with_source( + StreamMessageErrorKind::ParseError, + format!("could not parse timestamp header: {}", err), + ) + }) + })?; + + let subject = headers + .get_last(header::NATS_SUBJECT) + .ok_or_else(|| { + StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "subject") + })? + .as_str() + .into(); + + Ok(StreamMessage { + subject, + sequence, + headers, + payload: message.payload, + time, + }) + } +} + +#[derive(Debug, Clone, PartialEq)] +pub enum StreamMessageErrorKind { + MissingHeader, + ParseError, +} + +/// Error returned when library is unable to parse message got directly from the stream. +pub type StreamMessageError = error::Error; + +impl Display for StreamMessageErrorKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + StreamMessageErrorKind::MissingHeader => write!(f, "missing message header"), + StreamMessageErrorKind::ParseError => write!(f, "parse error"), + } + } +} + impl std::ops::Deref for Message { type Target = crate::Message; diff --git a/async-nats/src/jetstream/object_store/mod.rs b/async-nats/src/jetstream/object_store/mod.rs index 73abca56a..2658bfc11 100644 --- a/async-nats/src/jetstream/object_store/mod.rs +++ b/async-nats/src/jetstream/object_store/mod.rs @@ -19,7 +19,7 @@ use std::{cmp, str::FromStr, task::Poll, time::Duration}; use crate::crypto::Sha256; use crate::subject::Subject; use crate::{HeaderMap, HeaderValue}; -use base64::engine::general_purpose::{STANDARD, URL_SAFE}; +use base64::engine::general_purpose::URL_SAFE; use base64::engine::Engine; use bytes::BytesMut; use futures::future::BoxFuture; @@ -239,6 +239,7 @@ impl ObjectStore { // Grab last meta value we have. let subject = format!("$O.{}.M.{}", &self.name, &object_name); + // FIXME(jrm): we should use direct get here when possible. let message = self .stream .get_last_raw_message_by_subject(subject.as_str()) @@ -249,11 +250,8 @@ impl ObjectStore { } _ => InfoError::with_source(InfoErrorKind::Other, err), })?; - let decoded_payload = STANDARD - .decode(message.payload) - .map_err(|err| InfoError::with_source(InfoErrorKind::Other, err))?; let object_info = - serde_json::from_slice::(&decoded_payload).map_err(|err| { + serde_json::from_slice::(&message.payload).map_err(|err| { InfoError::with_source( InfoErrorKind::Other, format!("failed to decode info payload: {}", err), diff --git a/async-nats/src/jetstream/stream.rs b/async-nats/src/jetstream/stream.rs index bf9818f7b..f24128bcf 100755 --- a/async-nats/src/jetstream/stream.rs +++ b/async-nats/src/jetstream/stream.rs @@ -40,6 +40,7 @@ use super::{ consumer::{self, Consumer, FromConsumer, IntoConsumerConfig}, context::{RequestError, RequestErrorKind, StreamsError, StreamsErrorKind}, errors::ErrorCode, + message::{StreamMessage, StreamMessageError}, response::Response, Context, Message, }; @@ -91,6 +92,12 @@ impl From for DirectGetError { } } +impl From for DirectGetError { + fn from(err: StreamMessageError) -> Self { + DirectGetError::with_source(DirectGetErrorKind::Other, err) + } +} + #[derive(Clone, Debug, PartialEq)] pub enum DeleteMessageErrorKind { Request, @@ -372,7 +379,7 @@ impl Stream { /// # Ok(()) /// # } /// ``` - pub async fn direct_get(&self, sequence: u64) -> Result { + pub async fn direct_get(&self, sequence: u64) -> Result { let subject = format!( "{}.DIRECT.GET.{}", &self.context.prefix, &self.info.config.name @@ -385,11 +392,7 @@ impl Stream { .context .client .request(subject, serde_json::to_vec(&payload).map(Bytes::from)?) - .await - .map(|message| Message { - context: self.context.clone(), - message, - })?; + .await?; if let Some(status) = response.status { if let Some(ref description) = response.description { @@ -410,7 +413,7 @@ impl Stream { } } } - Ok(response) + StreamMessage::try_from(response).map_err(Into::into) } /// Gets last message for a given `subject`. @@ -447,7 +450,7 @@ impl Stream { pub async fn direct_get_last_for_subject>( &self, subject: T, - ) -> Result { + ) -> Result { let subject = format!( "{}.DIRECT.GET.{}.{}", &self.context.prefix, @@ -455,15 +458,7 @@ impl Stream { subject.as_ref() ); - let response = self - .context - .client - .request(subject, "".into()) - .await - .map(|message| Message { - context: self.context.clone(), - message, - })?; + let response = self.context.client.request(subject, "".into()).await?; if let Some(status) = response.status { if let Some(ref description) = response.description { match status { @@ -483,7 +478,7 @@ impl Stream { } } } - Ok(response) + StreamMessage::try_from(response).map_err(Into::into) } /// Get a raw message from the stream. /// @@ -512,19 +507,32 @@ impl Stream { /// # Ok(()) /// # } /// ``` - pub async fn get_raw_message(&self, sequence: u64) -> Result { + pub async fn get_raw_message(&self, sequence: u64) -> Result { let subject = format!("STREAM.MSG.GET.{}", &self.info.config.name); let payload = json!({ "seq": sequence, }); - let response: Response = self.context.request(subject, &payload).await?; + let response: Response = self + .context + .request(subject, &payload) + .map_err(|err| LastRawMessageError::with_source(LastRawMessageErrorKind::Other, err)) + .await?; + match response { - Response::Err { error } => Err(Box::new(std::io::Error::new( - ErrorKind::Other, - format!("nats: error while getting message: {}", error), - ))), - Response::Ok(value) => Ok(value.message), + Response::Err { error } => { + if error.error_code() == ErrorCode::NO_MESSAGE_FOUND { + Err(LastRawMessageError::new( + LastRawMessageErrorKind::NoMessageFound, + )) + } else { + Err(LastRawMessageError::new( + LastRawMessageErrorKind::JetStream(error), + )) + } + } + Response::Ok(value) => StreamMessage::try_from(value.message) + .map_err(|err| RawMessageError::with_source(RawMessageErrorKind::Other, err)), } } @@ -558,7 +566,7 @@ impl Stream { pub async fn get_last_raw_message_by_subject( &self, stream_subject: &str, - ) -> Result { + ) -> Result { let subject = format!("STREAM.MSG.GET.{}", &self.info.config.name); let payload = json!({ "last_by_subj": stream_subject, @@ -581,7 +589,9 @@ impl Stream { )) } } - Response::Ok(value) => Ok(value.message), + Response::Ok(value) => Ok(value.message.try_into().map_err(|err| { + LastRawMessageError::with_source(LastRawMessageErrorKind::Other, err) + })?), } } @@ -1300,7 +1310,7 @@ pub struct RawMessage { pub time: time::OffsetDateTime, } -impl TryFrom for crate::Message { +impl TryFrom for StreamMessage { type Error = crate::Error; fn try_from(value: RawMessage) -> Result { @@ -1312,23 +1322,15 @@ impl TryFrom for crate::Message { .map(|header| STANDARD.decode(header)) .map_or(Ok(None), |v| v.map(Some))?; - let length = decoded_headers - .as_ref() - .map_or_else(|| 0, |headers| headers.len()) - + decoded_payload.len() - + value.subject.len(); - - let (headers, status, description) = - decoded_headers.map_or_else(|| Ok((None, None, None)), |h| parse_headers(&h))?; + let (headers, _, _) = decoded_headers + .map_or_else(|| Ok((HeaderMap::new(), None, None)), |h| parse_headers(&h))?; - Ok(crate::Message { + Ok(StreamMessage { subject: value.subject.into(), - reply: None, payload: decoded_payload.into(), headers, - status, - description, - length, + sequence: value.sequence, + time: value.time, }) } } @@ -1341,7 +1343,7 @@ const HEADER_LINE: &str = "NATS/1.0"; #[allow(clippy::type_complexity)] fn parse_headers( buf: &[u8], -) -> Result<(Option, Option, Option), crate::Error> { +) -> Result<(HeaderMap, Option, Option), crate::Error> { let mut headers = HeaderMap::new(); let mut maybe_status: Option = None; let mut maybe_description: Option = None; @@ -1414,9 +1416,9 @@ fn parse_headers( } if headers.is_empty() { - Ok((None, maybe_status, maybe_description)) + Ok((HeaderMap::new(), maybe_status, maybe_description)) } else { - Ok((Some(headers), maybe_status, maybe_description)) + Ok((headers, maybe_status, maybe_description)) } } @@ -1873,6 +1875,8 @@ impl Display for LastRawMessageErrorKind { } pub type LastRawMessageError = Error; +pub type RawMessageErrorKind = LastRawMessageErrorKind; +pub type RawMessageError = LastRawMessageError; #[derive(Clone, Debug, PartialEq)] pub enum ConsumerErrorKind { diff --git a/async-nats/tests/jetstream_tests.rs b/async-nats/tests/jetstream_tests.rs index 0bcdb11ce..9c98ec25a 100755 --- a/async-nats/tests/jetstream_tests.rs +++ b/async-nats/tests/jetstream_tests.rs @@ -661,13 +661,7 @@ mod jetstream { let message = stream.direct_get_last_for_subject("events").await.unwrap(); - let sequence = message - .headers - .as_ref() - .unwrap() - .get(header::NATS_SEQUENCE) - .unwrap() - .as_str(); + let sequence = message.headers.get(header::NATS_SEQUENCE).unwrap().as_str(); assert_eq!(sequence.parse::().unwrap(), publish_ack.sequence); assert_eq!(payload, message.payload.as_ref()); @@ -860,13 +854,7 @@ mod jetstream { let message = stream.direct_get(2).await.unwrap(); - let sequence = message - .headers - .as_ref() - .unwrap() - .get(header::NATS_SEQUENCE) - .unwrap() - .as_str(); + let sequence = message.headers.get(header::NATS_SEQUENCE).unwrap().as_str(); assert_eq!(sequence.parse::().unwrap(), publish_ack.sequence); assert_eq!(payload, message.payload.as_ref()); diff --git a/async-nats/tests/kv_tests.rs b/async-nats/tests/kv_tests.rs index 0d66e47a0..1901ca092 100644 --- a/async-nats/tests/kv_tests.rs +++ b/async-nats/tests/kv_tests.rs @@ -190,10 +190,14 @@ mod kv { let nothing = kv.get("nothing").await.unwrap(); assert_eq!(None, nothing); + let value = kv.entry_for_revision("key", 1).await.unwrap(); + assert_eq!(from_utf8(&value.unwrap().value).unwrap(), "data"); + context .update_stream(async_nats::jetstream::stream::Config { max_messages_per_subject: 10, name: "KV_test".into(), + subjects: vec!["$KV.test.>".into()], deny_delete: true, allow_direct: false, ..Default::default() @@ -208,6 +212,12 @@ mod kv { assert_eq!(None, nothing); let value = kv.entry("key").await.unwrap(); assert_eq!(from_utf8(&value.unwrap().value).unwrap(), payload); + + let value = kv.entry_for_revision("key", 1).await.unwrap(); + assert_eq!(from_utf8(&value.unwrap().value).unwrap(), "data"); + + let value = kv.entry_for_revision("key", 250).await.unwrap(); + assert!(value.is_none()); } #[tokio::test]