diff --git a/src/sinks/nats/config.rs b/src/sinks/nats/config.rs index a9f52f4ec88c0b..661c5ba3ef0aae 100644 --- a/src/sinks/nats/config.rs +++ b/src/sinks/nats/config.rs @@ -1,3 +1,4 @@ +use bytes::Bytes; use futures_util::TryFutureExt; use snafu::ResultExt; use vector_lib::codecs::JsonSerializerConfig; @@ -76,6 +77,10 @@ pub struct NatsSinkConfig { #[configurable(derived)] #[serde(default)] pub(super) request: TowerRequestConfig, + + #[configurable(derived)] + #[serde(default)] + pub(super) jetstream: bool, } fn default_name() -> String { @@ -93,6 +98,7 @@ impl GenerateConfig for NatsSinkConfig { tls: None, url: "nats://127.0.0.1:4222".into(), request: Default::default(), + jetstream: Default::default(), }) .unwrap() } @@ -130,8 +136,56 @@ impl NatsSinkConfig { options.connect(&self.url).await.context(ConnectSnafu) } + + pub(super) async fn publisher(&self) -> Result { + let connection = self.connect().await?; + + match self.jetstream { + true => Ok(NatsPublisher::JetStream(async_nats::jetstream::new( + connection, + ))), + false => Ok(NatsPublisher::Core(connection)), + } + } } async fn healthcheck(config: NatsSinkConfig) -> crate::Result<()> { config.connect().map_ok(|_| ()).map_err(|e| e.into()).await } + +pub enum NatsPublisher { + Core(async_nats::Client), + JetStream(async_nats::jetstream::Context), +} + +impl NatsPublisher { + pub(super) async fn publish( + &self, + subject: S, + payload: Bytes, + ) -> Result<(), NatsError> { + match self { + NatsPublisher::Core(client) => { + client + .publish(subject, payload) + .await + .map_err(|e| NatsError::PublishError { + source: Box::new(e), + })?; + client.flush().await.map_err(|e| NatsError::PublishError { + source: Box::new(e), + }) + } + NatsPublisher::JetStream(jetstream) => { + let ack = jetstream.publish(subject, payload).await.map_err(|e| { + NatsError::PublishError { + source: Box::new(e), + } + })?; + ack.await.map(|_| ()).map_err(|e| NatsError::PublishError { + source: Box::new(e), + }) + } + } + } +} diff --git a/src/sinks/nats/mod.rs b/src/sinks/nats/mod.rs index a1729c84d574ad..affaf422531d4e 100644 --- a/src/sinks/nats/mod.rs +++ b/src/sinks/nats/mod.rs @@ -27,4 +27,6 @@ enum NatsError { Connect { source: async_nats::ConnectError }, #[snafu(display("NATS Server Error: {}", source))] ServerError { source: async_nats::Error }, + #[snafu(display("NATS Publish Error: {}", source))] + PublishError { source: async_nats::Error }, } diff --git a/src/sinks/nats/service.rs b/src/sinks/nats/service.rs index 0eb2407ab57389..1aeccced639f91 100644 --- a/src/sinks/nats/service.rs +++ b/src/sinks/nats/service.rs @@ -7,11 +7,11 @@ use futures_util::TryFutureExt; use crate::sinks::prelude::*; -use super::{request_builder::NatsRequest, NatsError}; +use super::{config::NatsPublisher, request_builder::NatsRequest, NatsError}; #[derive(Clone)] pub(super) struct NatsService { - pub(super) connection: Arc, + pub(super) publisher: Arc, } pub(super) struct NatsResponse { @@ -44,13 +44,12 @@ impl Service for NatsService { } fn call(&mut self, req: NatsRequest) -> Self::Future { - let connection = Arc::clone(&self.connection); + let publisher = Arc::clone(&self.publisher); Box::pin(async move { - match connection + match publisher .publish(req.subject, req.bytes) .map_err(async_nats::Error::from) - .and_then(|_| connection.flush().map_err(Into::into)) .await { Err(error) => Err(NatsError::ServerError { source: error }), diff --git a/src/sinks/nats/sink.rs b/src/sinks/nats/sink.rs index f2f4524b6ecfb2..a5b94d4a9c759f 100644 --- a/src/sinks/nats/sink.rs +++ b/src/sinks/nats/sink.rs @@ -5,7 +5,7 @@ use snafu::ResultExt; use crate::sinks::prelude::*; use super::{ - config::{NatsSinkConfig, NatsTowerRequestConfigDefaults}, + config::{NatsPublisher, NatsSinkConfig, NatsTowerRequestConfigDefaults}, request_builder::{NatsEncoder, NatsRequestBuilder}, service::{NatsResponse, NatsService}, EncodingSnafu, NatsError, @@ -20,7 +20,7 @@ pub(super) struct NatsSink { request: TowerRequestConfig, transformer: Transformer, encoder: Encoder<()>, - connection: Arc, + publisher: Arc, subject: Template, } @@ -42,7 +42,7 @@ impl NatsSink { } pub(super) async fn new(config: NatsSinkConfig) -> Result { - let connection = Arc::new(config.connect().await?); + let publisher = Arc::new(config.publisher().await?); let transformer = config.encoding.transformer(); let serializer = config.encoding.build().context(EncodingSnafu)?; let encoder = Encoder::<()>::new(serializer); @@ -51,9 +51,9 @@ impl NatsSink { Ok(NatsSink { request, - connection, transformer, encoder, + publisher, subject, }) } @@ -71,7 +71,7 @@ impl NatsSink { let service = ServiceBuilder::new() .settings(request, NatsRetryLogic) .service(NatsService { - connection: Arc::clone(&self.connection), + publisher: Arc::clone(&self.publisher), }); input