From b2d1f5a26bcf77bff9974fd5370f6c122c779390 Mon Sep 17 00:00:00 2001 From: "Dmitriy D. Volkov" Date: Wed, 10 Jul 2024 12:46:24 +0300 Subject: [PATCH 01/13] use nats jetstream as an option --- src/sinks/nats/config.rs | 54 +++++++++++++++++++++++++++++++++++++++ src/sinks/nats/mod.rs | 2 ++ src/sinks/nats/service.rs | 9 +++---- src/sinks/nats/sink.rs | 10 ++++---- 4 files changed, 65 insertions(+), 10 deletions(-) diff --git a/src/sinks/nats/config.rs b/src/sinks/nats/config.rs index a9f52f4ec88c0..661c5ba3ef0aa 100644 --- a/src/sinks/nats/config.rs +++ b/src/sinks/nats/config.rs @@ -1,3 +1,4 @@ +use bytes::Bytes; use futures_util::TryFutureExt; use snafu::ResultExt; use vector_lib::codecs::JsonSerializerConfig; @@ -76,6 +77,10 @@ pub struct NatsSinkConfig { #[configurable(derived)] #[serde(default)] pub(super) request: TowerRequestConfig, + + #[configurable(derived)] + #[serde(default)] + pub(super) jetstream: bool, } fn default_name() -> String { @@ -93,6 +98,7 @@ impl GenerateConfig for NatsSinkConfig { tls: None, url: "nats://127.0.0.1:4222".into(), request: Default::default(), + jetstream: Default::default(), }) .unwrap() } @@ -130,8 +136,56 @@ impl NatsSinkConfig { options.connect(&self.url).await.context(ConnectSnafu) } + + pub(super) async fn publisher(&self) -> Result { + let connection = self.connect().await?; + + match self.jetstream { + true => Ok(NatsPublisher::JetStream(async_nats::jetstream::new( + connection, + ))), + false => Ok(NatsPublisher::Core(connection)), + } + } } async fn healthcheck(config: NatsSinkConfig) -> crate::Result<()> { config.connect().map_ok(|_| ()).map_err(|e| e.into()).await } + +pub enum NatsPublisher { + Core(async_nats::Client), + JetStream(async_nats::jetstream::Context), +} + +impl NatsPublisher { + pub(super) async fn publish( + &self, + subject: S, + payload: Bytes, + ) -> Result<(), NatsError> { + match self { + NatsPublisher::Core(client) => { + client + .publish(subject, payload) + .await + .map_err(|e| NatsError::PublishError { + source: Box::new(e), + })?; + client.flush().await.map_err(|e| NatsError::PublishError { + source: Box::new(e), + }) + } + NatsPublisher::JetStream(jetstream) => { + let ack = jetstream.publish(subject, payload).await.map_err(|e| { + NatsError::PublishError { + source: Box::new(e), + } + })?; + ack.await.map(|_| ()).map_err(|e| NatsError::PublishError { + source: Box::new(e), + }) + } + } + } +} diff --git a/src/sinks/nats/mod.rs b/src/sinks/nats/mod.rs index a1729c84d574a..affaf422531d4 100644 --- a/src/sinks/nats/mod.rs +++ b/src/sinks/nats/mod.rs @@ -27,4 +27,6 @@ enum NatsError { Connect { source: async_nats::ConnectError }, #[snafu(display("NATS Server Error: {}", source))] ServerError { source: async_nats::Error }, + #[snafu(display("NATS Publish Error: {}", source))] + PublishError { source: async_nats::Error }, } diff --git a/src/sinks/nats/service.rs b/src/sinks/nats/service.rs index 0eb2407ab5738..1aeccced639f9 100644 --- a/src/sinks/nats/service.rs +++ b/src/sinks/nats/service.rs @@ -7,11 +7,11 @@ use futures_util::TryFutureExt; use crate::sinks::prelude::*; -use super::{request_builder::NatsRequest, NatsError}; +use super::{config::NatsPublisher, request_builder::NatsRequest, NatsError}; #[derive(Clone)] pub(super) struct NatsService { - pub(super) connection: Arc, + pub(super) publisher: Arc, } pub(super) struct NatsResponse { @@ -44,13 +44,12 @@ impl Service for NatsService { } fn call(&mut self, req: NatsRequest) -> Self::Future { - let connection = Arc::clone(&self.connection); + let publisher = Arc::clone(&self.publisher); Box::pin(async move { - match connection + match publisher .publish(req.subject, req.bytes) .map_err(async_nats::Error::from) - .and_then(|_| connection.flush().map_err(Into::into)) .await { Err(error) => Err(NatsError::ServerError { source: error }), diff --git a/src/sinks/nats/sink.rs b/src/sinks/nats/sink.rs index f2f4524b6ecfb..a5b94d4a9c759 100644 --- a/src/sinks/nats/sink.rs +++ b/src/sinks/nats/sink.rs @@ -5,7 +5,7 @@ use snafu::ResultExt; use crate::sinks::prelude::*; use super::{ - config::{NatsSinkConfig, NatsTowerRequestConfigDefaults}, + config::{NatsPublisher, NatsSinkConfig, NatsTowerRequestConfigDefaults}, request_builder::{NatsEncoder, NatsRequestBuilder}, service::{NatsResponse, NatsService}, EncodingSnafu, NatsError, @@ -20,7 +20,7 @@ pub(super) struct NatsSink { request: TowerRequestConfig, transformer: Transformer, encoder: Encoder<()>, - connection: Arc, + publisher: Arc, subject: Template, } @@ -42,7 +42,7 @@ impl NatsSink { } pub(super) async fn new(config: NatsSinkConfig) -> Result { - let connection = Arc::new(config.connect().await?); + let publisher = Arc::new(config.publisher().await?); let transformer = config.encoding.transformer(); let serializer = config.encoding.build().context(EncodingSnafu)?; let encoder = Encoder::<()>::new(serializer); @@ -51,9 +51,9 @@ impl NatsSink { Ok(NatsSink { request, - connection, transformer, encoder, + publisher, subject, }) } @@ -71,7 +71,7 @@ impl NatsSink { let service = ServiceBuilder::new() .settings(request, NatsRetryLogic) .service(NatsService { - connection: Arc::clone(&self.connection), + publisher: Arc::clone(&self.publisher), }); input From 30de051e3e61155829846f3ae2e2c52c24af7755 Mon Sep 17 00:00:00 2001 From: "Dmitriy D. Volkov" Date: Wed, 10 Jul 2024 13:33:06 +0300 Subject: [PATCH 02/13] no need to manually flush messages --- src/sinks/nats/config.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/sinks/nats/config.rs b/src/sinks/nats/config.rs index 661c5ba3ef0aa..3f87a11f2e8cc 100644 --- a/src/sinks/nats/config.rs +++ b/src/sinks/nats/config.rs @@ -171,10 +171,7 @@ impl NatsPublisher { .await .map_err(|e| NatsError::PublishError { source: Box::new(e), - })?; - client.flush().await.map_err(|e| NatsError::PublishError { - source: Box::new(e), - }) + }) } NatsPublisher::JetStream(jetstream) => { let ack = jetstream.publish(subject, payload).await.map_err(|e| { From 3209e57291b62c42d7591851a8d899d98d9448aa Mon Sep 17 00:00:00 2001 From: "Dmitriy D. Volkov" Date: Thu, 11 Jul 2024 17:11:56 +0300 Subject: [PATCH 03/13] fix jetstream option annotation + prettify code --- src/sinks/nats/config.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/sinks/nats/config.rs b/src/sinks/nats/config.rs index 3f87a11f2e8cc..e9250ad54212d 100644 --- a/src/sinks/nats/config.rs +++ b/src/sinks/nats/config.rs @@ -78,7 +78,9 @@ pub struct NatsSinkConfig { #[serde(default)] pub(super) request: TowerRequestConfig, - #[configurable(derived)] + /// Send messages using [Jetstream][jetstream]. + /// + /// [jetstream]: https://docs.nats.io/nats-concepts/jetstream #[serde(default)] pub(super) jetstream: bool, } @@ -140,11 +142,12 @@ impl NatsSinkConfig { pub(super) async fn publisher(&self) -> Result { let connection = self.connect().await?; - match self.jetstream { - true => Ok(NatsPublisher::JetStream(async_nats::jetstream::new( + if self.jetstream { + Ok(NatsPublisher::JetStream(async_nats::jetstream::new( connection, - ))), - false => Ok(NatsPublisher::Core(connection)), + ))) + } else { + Ok(NatsPublisher::Core(connection)) } } } From 11afa3f5266978cba11beaea97e6609442a2cfde Mon Sep 17 00:00:00 2001 From: "Dmitriy D. Volkov" Date: Thu, 11 Jul 2024 18:02:51 +0300 Subject: [PATCH 04/13] generate component docs --- website/cue/reference/components/sinks/base/nats.cue | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/website/cue/reference/components/sinks/base/nats.cue b/website/cue/reference/components/sinks/base/nats.cue index 175089fdf9324..3b60213df9d1a 100644 --- a/website/cue/reference/components/sinks/base/nats.cue +++ b/website/cue/reference/components/sinks/base/nats.cue @@ -383,6 +383,15 @@ base: components: sinks: nats: configuration: { } } } + jetstream: { + description: """ + Send messages using [Jetstream][jetstream]. + + [jetstream]: https://docs.nats.io/nats-concepts/jetstream + """ + required: false + type: bool: default: false + } request: { description: """ Middleware settings for outbound requests. From ce4e8c2904d83bf94255bd805f5c6986bd0a09c5 Mon Sep 17 00:00:00 2001 From: "Dmitriy D. Volkov" Date: Fri, 12 Jul 2024 17:18:24 +0300 Subject: [PATCH 05/13] add more precise field description + generate docs --- src/sinks/nats/config.rs | 2 ++ website/cue/reference/components/sinks/base/nats.cue | 2 ++ 2 files changed, 4 insertions(+) diff --git a/src/sinks/nats/config.rs b/src/sinks/nats/config.rs index e9250ad54212d..74ef78f75f1cb 100644 --- a/src/sinks/nats/config.rs +++ b/src/sinks/nats/config.rs @@ -80,6 +80,8 @@ pub struct NatsSinkConfig { /// Send messages using [Jetstream][jetstream]. /// + /// If set, the `subject` must belong to an existing JetStream stream. + /// /// [jetstream]: https://docs.nats.io/nats-concepts/jetstream #[serde(default)] pub(super) jetstream: bool, diff --git a/website/cue/reference/components/sinks/base/nats.cue b/website/cue/reference/components/sinks/base/nats.cue index 3b60213df9d1a..b1709d2b7d64a 100644 --- a/website/cue/reference/components/sinks/base/nats.cue +++ b/website/cue/reference/components/sinks/base/nats.cue @@ -387,6 +387,8 @@ base: components: sinks: nats: configuration: { description: """ Send messages using [Jetstream][jetstream]. + If set, the `subject` must belong to an existing JetStream stream. + [jetstream]: https://docs.nats.io/nats-concepts/jetstream """ required: false From 86b9956d46964fabc2ff7f751af53e0fd2cba063 Mon Sep 17 00:00:00 2001 From: "Dmitriy D. Volkov" Date: Mon, 15 Jul 2024 12:40:07 +0300 Subject: [PATCH 06/13] check nats stream existence in healthcheck --- src/sinks/nats/config.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/sinks/nats/config.rs b/src/sinks/nats/config.rs index 74ef78f75f1cb..969eeed122963 100644 --- a/src/sinks/nats/config.rs +++ b/src/sinks/nats/config.rs @@ -155,7 +155,21 @@ impl NatsSinkConfig { } async fn healthcheck(config: NatsSinkConfig) -> crate::Result<()> { - config.connect().map_ok(|_| ()).map_err(|e| e.into()).await + let connection = config.connect().await?; + if !config.jetstream { + return Ok(()); + } + + async_nats::jetstream::new(connection) + .get_stream(config.subject.to_string()) + .map_ok(|_| ()) + .map_err(|e| { + NatsError::ServerError { + source: Box::new(e), + } + .into() + }) + .await } pub enum NatsPublisher { From 56cf84451bf0b135faedddccb61be2805caed32c Mon Sep 17 00:00:00 2001 From: "Dmitriy D. Volkov" Date: Wed, 17 Jul 2024 17:41:14 +0300 Subject: [PATCH 07/13] do not check stream existance --- src/sinks/nats/config.rs | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/src/sinks/nats/config.rs b/src/sinks/nats/config.rs index 969eeed122963..74ef78f75f1cb 100644 --- a/src/sinks/nats/config.rs +++ b/src/sinks/nats/config.rs @@ -155,21 +155,7 @@ impl NatsSinkConfig { } async fn healthcheck(config: NatsSinkConfig) -> crate::Result<()> { - let connection = config.connect().await?; - if !config.jetstream { - return Ok(()); - } - - async_nats::jetstream::new(connection) - .get_stream(config.subject.to_string()) - .map_ok(|_| ()) - .map_err(|e| { - NatsError::ServerError { - source: Box::new(e), - } - .into() - }) - .await + config.connect().map_ok(|_| ()).map_err(|e| e.into()).await } pub enum NatsPublisher { From 0721bf5cecf9098981ef67820109943b8e4bc23d Mon Sep 17 00:00:00 2001 From: "Dmitriy D. Volkov" Date: Mon, 22 Jul 2024 12:25:33 +0300 Subject: [PATCH 08/13] add new field to struct in tests --- src/sinks/nats/integration_tests.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/sinks/nats/integration_tests.rs b/src/sinks/nats/integration_tests.rs index cf2c620db9401..bc0c8dce0611d 100644 --- a/src/sinks/nats/integration_tests.rs +++ b/src/sinks/nats/integration_tests.rs @@ -78,6 +78,7 @@ async fn nats_no_auth() { tls: None, auth: None, request: Default::default(), + jetstream: false, }; let r = publish_and_check(conf).await; @@ -110,6 +111,7 @@ async fn nats_userpass_auth_valid() { }, }), request: Default::default(), + jetstream: false, }; publish_and_check(conf) @@ -139,6 +141,7 @@ async fn nats_userpass_auth_invalid() { }, }), request: Default::default(), + jetstream: false, }; let r = publish_and_check(conf).await; @@ -170,6 +173,7 @@ async fn nats_token_auth_valid() { }, }), request: Default::default(), + jetstream: false, }; let r = publish_and_check(conf).await; @@ -201,6 +205,7 @@ async fn nats_token_auth_invalid() { }, }), request: Default::default(), + jetstream: false, }; let r = publish_and_check(conf).await; @@ -233,6 +238,7 @@ async fn nats_nkey_auth_valid() { }, }), request: Default::default(), + jetstream: false, }; let r = publish_and_check(conf).await; @@ -265,6 +271,7 @@ async fn nats_nkey_auth_invalid() { }, }), request: Default::default(), + jetstream: false, }; let r = publish_and_check(conf).await; @@ -298,6 +305,7 @@ async fn nats_tls_valid() { }), auth: None, request: Default::default(), + jetstream: false, }; let r = publish_and_check(conf).await; @@ -325,6 +333,7 @@ async fn nats_tls_invalid() { tls: None, auth: None, request: Default::default(), + jetstream: false, }; let r = publish_and_check(conf).await; @@ -360,6 +369,7 @@ async fn nats_tls_client_cert_valid() { }), auth: None, request: Default::default(), + jetstream: false, }; let r = publish_and_check(conf).await; @@ -393,6 +403,7 @@ async fn nats_tls_client_cert_invalid() { }), auth: None, request: Default::default(), + jetstream: false, }; let r = publish_and_check(conf).await; @@ -430,6 +441,7 @@ async fn nats_tls_jwt_auth_valid() { }, }), request: Default::default(), + jetstream: false, }; let r = publish_and_check(conf).await; @@ -467,6 +479,7 @@ async fn nats_tls_jwt_auth_invalid() { }, }), request: Default::default(), + jetstream: false, }; let r = publish_and_check(conf).await; From dca5a3fbacdb69014c42d8c6736b6c575477ce34 Mon Sep 17 00:00:00 2001 From: "Dmitriy D. Volkov" Date: Mon, 22 Jul 2024 12:31:58 +0300 Subject: [PATCH 09/13] add changelog --- .../20834_nats_jetstream_sink.feature.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 changelog.d/20834_nats_jetstream_sink.feature.md diff --git a/changelog.d/20834_nats_jetstream_sink.feature.md b/changelog.d/20834_nats_jetstream_sink.feature.md new file mode 100644 index 0000000000000..e01e5cb461d53 --- /dev/null +++ b/changelog.d/20834_nats_jetstream_sink.feature.md @@ -0,0 +1,18 @@ +Add possibility to use NATS JetStream in NATS sink. Can be turned on/off via `jetstream` option (default is false). + +### Example + +#### Config + +``` +sinks: + nats: + type: nats + inputs: + - in + subject: nork + url: "nats://localhost:4222" + jetstream: true + encoding: + codec: json +``` \ No newline at end of file From 7eeeb52dcea54fd5a154d11ddabca4383a3f93b3 Mon Sep 17 00:00:00 2001 From: "Dmitriy D. Volkov" Date: Mon, 22 Jul 2024 18:02:01 +0300 Subject: [PATCH 10/13] add author to changelog --- changelog.d/20834_nats_jetstream_sink.feature.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/changelog.d/20834_nats_jetstream_sink.feature.md b/changelog.d/20834_nats_jetstream_sink.feature.md index e01e5cb461d53..ae0f42f5d2d08 100644 --- a/changelog.d/20834_nats_jetstream_sink.feature.md +++ b/changelog.d/20834_nats_jetstream_sink.feature.md @@ -15,4 +15,6 @@ sinks: jetstream: true encoding: codec: json -``` \ No newline at end of file +``` + +authors: whatcouldbepizza From 7189c7fea655c462e11b20860e8785ec97279378 Mon Sep 17 00:00:00 2001 From: "Dmitriy D. Volkov" Date: Mon, 22 Jul 2024 18:02:58 +0300 Subject: [PATCH 11/13] remove example config from changelog --- .../20834_nats_jetstream_sink.feature.md | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/changelog.d/20834_nats_jetstream_sink.feature.md b/changelog.d/20834_nats_jetstream_sink.feature.md index ae0f42f5d2d08..eb8593e17219d 100644 --- a/changelog.d/20834_nats_jetstream_sink.feature.md +++ b/changelog.d/20834_nats_jetstream_sink.feature.md @@ -1,20 +1,3 @@ Add possibility to use NATS JetStream in NATS sink. Can be turned on/off via `jetstream` option (default is false). -### Example - -#### Config - -``` -sinks: - nats: - type: nats - inputs: - - in - subject: nork - url: "nats://localhost:4222" - jetstream: true - encoding: - codec: json -``` - authors: whatcouldbepizza From 602ba0d42287f2548a15cb066370ccff85dea104 Mon Sep 17 00:00:00 2001 From: "Dmitriy D. Volkov" Date: Tue, 23 Jul 2024 18:07:26 +0300 Subject: [PATCH 12/13] flush core messages --- src/sinks/nats/config.rs | 7 +++++++ src/sinks/nats/sink.rs | 7 +++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/sinks/nats/config.rs b/src/sinks/nats/config.rs index 74ef78f75f1cb..7bdba9db801bc 100644 --- a/src/sinks/nats/config.rs +++ b/src/sinks/nats/config.rs @@ -190,4 +190,11 @@ impl NatsPublisher { } } } + + pub(super) async fn flush(&self) -> Result<(), ()> { + if let NatsPublisher::Core(client) = self { + return client.flush().map_err(|_| ()).await; + } + Ok(()) + } } diff --git a/src/sinks/nats/sink.rs b/src/sinks/nats/sink.rs index a5b94d4a9c759..9cc2048a0d615 100644 --- a/src/sinks/nats/sink.rs +++ b/src/sinks/nats/sink.rs @@ -74,7 +74,7 @@ impl NatsSink { publisher: Arc::clone(&self.publisher), }); - input + let result = input .filter_map(|event| std::future::ready(self.make_nats_event(event))) .request_builder(default_request_builder_concurrency_limit(), request_builder) .filter_map(|request| async move { @@ -89,7 +89,10 @@ impl NatsSink { .into_driver(service) .protocol("nats") .run() - .await + .await; + + self.publisher.flush().await?; + return result; } } From 0188122f3ed15e54c07d33d43458a131a7f4f9ac Mon Sep 17 00:00:00 2001 From: "Dmitriy D. Volkov" Date: Wed, 24 Jul 2024 12:27:12 +0300 Subject: [PATCH 13/13] flush after each message --- src/sinks/nats/config.rs | 14 +++++++------- src/sinks/nats/sink.rs | 7 ++----- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/src/sinks/nats/config.rs b/src/sinks/nats/config.rs index 7bdba9db801bc..8388648a08c48 100644 --- a/src/sinks/nats/config.rs +++ b/src/sinks/nats/config.rs @@ -174,9 +174,16 @@ impl NatsPublisher { client .publish(subject, payload) .await + .map_err(|e| NatsError::PublishError { + source: Box::new(e), + })?; + client + .flush() + .map_ok(|_| ()) .map_err(|e| NatsError::PublishError { source: Box::new(e), }) + .await } NatsPublisher::JetStream(jetstream) => { let ack = jetstream.publish(subject, payload).await.map_err(|e| { @@ -190,11 +197,4 @@ impl NatsPublisher { } } } - - pub(super) async fn flush(&self) -> Result<(), ()> { - if let NatsPublisher::Core(client) = self { - return client.flush().map_err(|_| ()).await; - } - Ok(()) - } } diff --git a/src/sinks/nats/sink.rs b/src/sinks/nats/sink.rs index 9cc2048a0d615..a5b94d4a9c759 100644 --- a/src/sinks/nats/sink.rs +++ b/src/sinks/nats/sink.rs @@ -74,7 +74,7 @@ impl NatsSink { publisher: Arc::clone(&self.publisher), }); - let result = input + input .filter_map(|event| std::future::ready(self.make_nats_event(event))) .request_builder(default_request_builder_concurrency_limit(), request_builder) .filter_map(|request| async move { @@ -89,10 +89,7 @@ impl NatsSink { .into_driver(service) .protocol("nats") .run() - .await; - - self.publisher.flush().await?; - return result; + .await } }