Skip to content

Commit

Permalink
use nats jetstream as an option
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmitriy D. Volkov authored and whatcouldbepizza committed Jul 10, 2024
1 parent 0a72db7 commit 243c492
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 10 deletions.
54 changes: 54 additions & 0 deletions src/sinks/nats/config.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use bytes::Bytes;
use futures_util::TryFutureExt;
use snafu::ResultExt;
use vector_lib::codecs::JsonSerializerConfig;
Expand Down Expand Up @@ -76,6 +77,10 @@ pub struct NatsSinkConfig {
#[configurable(derived)]
#[serde(default)]
pub(super) request: TowerRequestConfig<NatsTowerRequestConfigDefaults>,

#[configurable(derived)]
#[serde(default)]
pub(super) jetstream: bool,
}

fn default_name() -> String {
Expand All @@ -93,6 +98,7 @@ impl GenerateConfig for NatsSinkConfig {
tls: None,
url: "nats://127.0.0.1:4222".into(),
request: Default::default(),
jetstream: Default::default(),
})
.unwrap()
}
Expand Down Expand Up @@ -130,8 +136,56 @@ impl NatsSinkConfig {

options.connect(&self.url).await.context(ConnectSnafu)
}

pub(super) async fn publisher(&self) -> Result<NatsPublisher, NatsError> {
let connection = self.connect().await?;

match self.jetstream {
true => Ok(NatsPublisher::JetStream(async_nats::jetstream::new(
connection,
))),
false => Ok(NatsPublisher::Core(connection)),
}
}
}

async fn healthcheck(config: NatsSinkConfig) -> crate::Result<()> {
config.connect().map_ok(|_| ()).map_err(|e| e.into()).await
}

pub enum NatsPublisher {
Core(async_nats::Client),
JetStream(async_nats::jetstream::Context),
}

impl NatsPublisher {
pub(super) async fn publish<S: async_nats::subject::ToSubject>(
&self,
subject: S,
payload: Bytes,
) -> Result<(), NatsError> {
match self {
NatsPublisher::Core(client) => {
client
.publish(subject, payload)
.await
.map_err(|e| NatsError::PublishError {
source: Box::new(e),
})?;
client.flush().await.map_err(|e| NatsError::PublishError {
source: Box::new(e),
})
}
NatsPublisher::JetStream(jetstream) => {
let ack = jetstream.publish(subject, payload).await.map_err(|e| {
NatsError::PublishError {
source: Box::new(e),
}
})?;
ack.await.map(|_| ()).map_err(|e| NatsError::PublishError {
source: Box::new(e),
})
}
}
}
}
2 changes: 2 additions & 0 deletions src/sinks/nats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ enum NatsError {
Connect { source: async_nats::ConnectError },
#[snafu(display("NATS Server Error: {}", source))]
ServerError { source: async_nats::Error },
#[snafu(display("NATS Publish Error: {}", source))]
PublishError { source: async_nats::Error },
}
9 changes: 4 additions & 5 deletions src/sinks/nats/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ use futures_util::TryFutureExt;

use crate::sinks::prelude::*;

use super::{request_builder::NatsRequest, NatsError};
use super::{config::NatsPublisher, request_builder::NatsRequest, NatsError};

#[derive(Clone)]
pub(super) struct NatsService {
pub(super) connection: Arc<async_nats::Client>,
pub(super) publisher: Arc<NatsPublisher>,
}

pub(super) struct NatsResponse {
Expand Down Expand Up @@ -44,13 +44,12 @@ impl Service<NatsRequest> for NatsService {
}

fn call(&mut self, req: NatsRequest) -> Self::Future {
let connection = Arc::clone(&self.connection);
let publisher = Arc::clone(&self.publisher);

Box::pin(async move {
match connection
match publisher
.publish(req.subject, req.bytes)
.map_err(async_nats::Error::from)
.and_then(|_| connection.flush().map_err(Into::into))
.await
{
Err(error) => Err(NatsError::ServerError { source: error }),
Expand Down
10 changes: 5 additions & 5 deletions src/sinks/nats/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use snafu::ResultExt;
use crate::sinks::prelude::*;

use super::{
config::{NatsSinkConfig, NatsTowerRequestConfigDefaults},
config::{NatsPublisher, NatsSinkConfig, NatsTowerRequestConfigDefaults},
request_builder::{NatsEncoder, NatsRequestBuilder},
service::{NatsResponse, NatsService},
EncodingSnafu, NatsError,
Expand All @@ -20,7 +20,7 @@ pub(super) struct NatsSink {
request: TowerRequestConfig<NatsTowerRequestConfigDefaults>,
transformer: Transformer,
encoder: Encoder<()>,
connection: Arc<async_nats::Client>,
publisher: Arc<NatsPublisher>,
subject: Template,
}

Expand All @@ -42,7 +42,7 @@ impl NatsSink {
}

pub(super) async fn new(config: NatsSinkConfig) -> Result<Self, NatsError> {
let connection = Arc::new(config.connect().await?);
let publisher = Arc::new(config.publisher().await?);
let transformer = config.encoding.transformer();
let serializer = config.encoding.build().context(EncodingSnafu)?;
let encoder = Encoder::<()>::new(serializer);
Expand All @@ -51,9 +51,9 @@ impl NatsSink {

Ok(NatsSink {
request,
connection,
transformer,
encoder,
publisher,
subject,
})
}
Expand All @@ -71,7 +71,7 @@ impl NatsSink {
let service = ServiceBuilder::new()
.settings(request, NatsRetryLogic)
.service(NatsService {
connection: Arc::clone(&self.connection),
publisher: Arc::clone(&self.publisher),
});

input
Expand Down

0 comments on commit 243c492

Please sign in to comment.