Skip to content

Commit

Permalink
fix(socket sink): gracefully shutdown on reload when stream is termin…
Browse files Browse the repository at this point in the history
…ated (#21455)

* fix(socket sink): gracefully shutdown on reload when stream is terminated

* fix configured out dep
  • Loading branch information
neuronull authored Oct 8, 2024
1 parent 5581b24 commit 597ebd1
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 25 deletions.
3 changes: 3 additions & 0 deletions changelog.d/21455-socket-sink-graceful-reload.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
All TCP based socket sinks now gracefully handle config reloads under load. Previously, when a configuration reload occurred and data was flowing through the topology, the vector process crashed due to the TCP sink attempting to access the stream when it had been terminated.

authors: neuronull
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub mod serde;
#[cfg(windows)]
pub mod service;
pub mod signal;
#[cfg(all(any(feature = "sinks-socket", feature = "sinks-statsd"), unix))]
#[cfg(unix)]
pub(crate) mod sink_ext;
#[allow(unreachable_pub)]
pub mod sinks;
Expand Down
48 changes: 24 additions & 24 deletions src/sinks/util/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use std::{
use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use futures::{stream::BoxStream, task::noop_waker_ref, SinkExt, StreamExt};
use futures_util::{future::ready, stream};
use snafu::{ResultExt, Snafu};
use tokio::{
io::{AsyncRead, ReadBuf},
Expand All @@ -29,6 +28,7 @@ use crate::{
ConnectionOpen, OpenGauge, SocketMode, SocketSendError, TcpSocketConnectionEstablished,
TcpSocketConnectionShutdown, TcpSocketOutgoingConnectionError,
},
sink_ext::VecSinkExt,
sinks::{
util::{
retries::ExponentialBackoff,
Expand Down Expand Up @@ -281,34 +281,34 @@ where
// We need [Peekable](https://docs.rs/futures/0.3.6/futures/stream/struct.Peekable.html) for initiating
// connection only when we have something to send.
let mut encoder = self.encoder.clone();
let mut input = input.map(|mut event| {
let byte_size = event.size_of();
let json_byte_size = event.estimated_json_encoded_size_of();
let finalizers = event.metadata_mut().take_finalizers();
self.transformer.transform(&mut event);
let mut bytes = BytesMut::new();

// Errors are handled by `Encoder`.
if encoder.encode(event, &mut bytes).is_ok() {
let item = bytes.freeze();
EncodedEvent {
item,
finalizers,
byte_size,
json_byte_size,
let mut input = input
.map(|mut event| {
let byte_size = event.size_of();
let json_byte_size = event.estimated_json_encoded_size_of();
let finalizers = event.metadata_mut().take_finalizers();
self.transformer.transform(&mut event);
let mut bytes = BytesMut::new();

// Errors are handled by `Encoder`.
if encoder.encode(event, &mut bytes).is_ok() {
let item = bytes.freeze();
EncodedEvent {
item,
finalizers,
byte_size,
json_byte_size,
}
} else {
EncodedEvent::new(Bytes::new(), 0, JsonSize::zero())
}
} else {
EncodedEvent::new(Bytes::new(), 0, JsonSize::zero())
}
});
})
.peekable();

while let Some(item) = input.next().await {
while Pin::new(&mut input).peek().await.is_some() {
let mut sink = self.connect().await;
let _open_token = OpenGauge::new().open(|count| emit!(ConnectionOpen { count }));

let mut mapped_input = stream::once(ready(item)).chain(&mut input).map(Ok);

let result = match sink.send_all(&mut mapped_input).await {
let result = match sink.send_all_peekable(&mut (&mut input).peekable()).await {
Ok(()) => sink.close().await,
Err(error) => Err(error),
};
Expand Down

0 comments on commit 597ebd1

Please sign in to comment.