Skip to content

Commit

Permalink
Allow the SidecarTransport to recreate its underlying transport (#440)
Browse files Browse the repository at this point in the history
* Allow the SidecarTransport to recreate its underlying transport in case it is broken.

This is handled in the Rust code so that the sidecar's clients (like the PHP tracer)
can use the SidecarTransport in a thread-safe way.

* Expose reconnect function in sidecar-ffi

* Add tests

* Add comments

* Fix clippy errors

* Fix windows CI

---------

Co-authored-by: Bob Weinand <[email protected]>
  • Loading branch information
iamluc and bwoebi authored May 27, 2024
1 parent 315a093 commit 259ca9c
Show file tree
Hide file tree
Showing 2 changed files with 182 additions and 5 deletions.
32 changes: 32 additions & 0 deletions sidecar-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ pub unsafe extern "C" fn ddog_sidecar_runtimeMeta_drop(meta: Box<RuntimeMetadata
drop(meta)
}

/// Reports the runtime configuration to the telemetry.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_sidecar_telemetry_enqueueConfig(
Expand All @@ -282,6 +283,7 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_enqueueConfig(
MaybeError::None
}

/// Reports a dependency to the telemetry.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_sidecar_telemetry_addDependency(
Expand Down Expand Up @@ -309,6 +311,7 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_addDependency(
MaybeError::None
}

/// Reports an integration to the telemetry.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_sidecar_telemetry_addIntegration(
Expand Down Expand Up @@ -340,6 +343,7 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_addIntegration(
MaybeError::None
}

/// Registers a service and flushes any queued actions.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_sidecar_telemetry_flushServiceData(
Expand All @@ -362,6 +366,7 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_flushServiceData(
MaybeError::None
}

/// Enqueues a list of actions to be performed.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_sidecar_telemetry_end(
Expand All @@ -381,11 +386,13 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_end(
MaybeError::None
}

/// Returns whether the sidecar transport is closed or not.
///
/// # Arguments
///
/// * `transport` - The transport whose state is queried.
#[no_mangle]
pub extern "C" fn ddog_sidecar_is_closed(transport: &mut Box<SidecarTransport>) -> bool {
transport.is_closed()
}

/// Sets the configuration for a session.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_sidecar_session_set_config(
Expand Down Expand Up @@ -456,6 +463,7 @@ impl<'a> TryInto<SerializedTracerHeaderTags> for &'a TracerHeaderTags<'a> {
}
}

/// Sends a trace to the sidecar via shared memory.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_sidecar_send_trace_v04_shm(
Expand All @@ -476,6 +484,7 @@ pub unsafe extern "C" fn ddog_sidecar_send_trace_v04_shm(
MaybeError::None
}

/// Sends a trace as bytes to the sidecar.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_sidecar_send_trace_v04_bytes(
Expand All @@ -496,6 +505,7 @@ pub unsafe extern "C" fn ddog_sidecar_send_trace_v04_bytes(
MaybeError::None
}

/// Dumps the current state of the sidecar.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_sidecar_dump(
Expand All @@ -512,6 +522,7 @@ pub unsafe extern "C" fn ddog_sidecar_dump(
ffi::CharSlice::from_raw_parts(malloced as *mut c_char, size)
}

/// Retrieves the current statistics of the sidecar.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_sidecar_stats(
Expand All @@ -528,6 +539,7 @@ pub unsafe extern "C" fn ddog_sidecar_stats(
ffi::CharSlice::from_raw_parts(malloced as *mut c_char, size)
}

/// Send a DogStatsD "count" metric.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_sidecar_dogstatsd_count(
Expand All @@ -551,6 +563,7 @@ pub unsafe extern "C" fn ddog_sidecar_dogstatsd_count(
MaybeError::None
}

/// Send a DogStatsD "distribution" metric.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_sidecar_dogstatsd_distribution(
Expand All @@ -574,6 +587,7 @@ pub unsafe extern "C" fn ddog_sidecar_dogstatsd_distribution(
MaybeError::None
}

/// Send a DogStatsD "gauge" metric.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_sidecar_dogstatsd_gauge(
Expand All @@ -597,6 +611,7 @@ pub unsafe extern "C" fn ddog_sidecar_dogstatsd_gauge(
MaybeError::None
}

/// Send a DogStatsD "histogram" metric.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_sidecar_dogstatsd_histogram(
Expand All @@ -620,6 +635,7 @@ pub unsafe extern "C" fn ddog_sidecar_dogstatsd_histogram(
MaybeError::None
}

/// Send a DogStatsD "set" metric.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_sidecar_dogstatsd_set(
Expand All @@ -642,3 +658,19 @@ pub unsafe extern "C" fn ddog_sidecar_dogstatsd_set(

MaybeError::None
}

/// This function creates a new transport using the provided callback function when the current
/// transport is closed.
///
/// The replacement happens in place: `transport` keeps referring to the same `SidecarTransport`
/// and only its underlying connection is swapped. Nothing happens when the current transport is
/// still open or when `factory` returns `None` (a null pointer on the C side).
///
/// # Arguments
///
/// * `transport` - The transport used for communication.
/// * `factory` - A C function that must return a pointer to "ddog_SidecarTransport"
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub extern "C" fn ddog_sidecar_reconnect(
transport: &mut Box<SidecarTransport>,
factory: unsafe extern "C" fn() -> Option<Box<SidecarTransport>>,
) {
// SAFETY: relies on the caller passing a valid `factory` whose result is either null or an
// owned `SidecarTransport` that Rust may take over and free (presumably one obtained from this
// library's own constructors - TODO confirm with callers).
transport.reconnect(|| unsafe { factory() });
}
155 changes: 150 additions & 5 deletions sidecar/src/service/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,94 @@ use super::{
SidecarInterfaceRequest, SidecarInterfaceResponse,
};
use crate::dogstatsd::DogStatsDAction;
use datadog_ipc::platform::ShmHandle;
use datadog_ipc::platform::{Channel, ShmHandle};
use datadog_ipc::transport::blocking::BlockingTransport;
use std::sync::Mutex;
use std::{
borrow::Cow,
io,
time::{Duration, Instant},
};
use tracing::info;

/// `SidecarTransport` is a type alias for the `BlockingTransport` struct from the `datadog_ipc`
/// crate. It is used for sending `SidecarInterfaceRequest` and receiving
/// `SidecarInterfaceResponse`.
/// `SidecarTransport` is a wrapper around a BlockingTransport struct from the `datadog_ipc` crate
/// that handles transparent reconnection.
/// It is used for sending `SidecarInterfaceRequest` and receiving `SidecarInterfaceResponse`.
///
/// This transport is used for communication between different parts of the sidecar service.
/// It is a blocking transport, meaning that it will block the current thread until the operation is
/// complete.
pub type SidecarTransport = BlockingTransport<SidecarInterfaceResponse, SidecarInterfaceRequest>;
pub struct SidecarTransport {
/// The underlying blocking transport. It sits behind a mutex so that `reconnect` can replace
/// it in place while every other method locks it for the duration of a single operation.
pub inner: Mutex<BlockingTransport<SidecarInterfaceResponse, SidecarInterfaceRequest>>,
}

impl SidecarTransport {
pub fn reconnect<F>(&mut self, factory: F)
where
F: FnOnce() -> Option<Box<SidecarTransport>>,
{
let mut transport = match self.inner.lock() {
Ok(t) => t,
Err(_) => return,
};
if transport.is_closed() {
info!("The sidecar transport is closed. Reconnecting...");
let new = match factory() {
None => return,
Some(n) => n.inner.into_inner(),
};
if new.is_err() {
return;
}
*transport = new.unwrap();
}
}

pub fn set_read_timeout(&mut self, timeout: Option<Duration>) -> io::Result<()> {
match self.inner.lock() {
Ok(mut t) => t.set_read_timeout(timeout),
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())),
}
}

pub fn set_write_timeout(&mut self, timeout: Option<Duration>) -> io::Result<()> {
match self.inner.lock() {
Ok(mut t) => t.set_write_timeout(timeout),
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())),
}
}

pub fn is_closed(&self) -> bool {
match self.inner.lock() {
Ok(t) => t.is_closed(),
// Should happen only during the "reconnection" phase. During this phase the transport
// is always closed.
Err(_) => true,
}
}

pub fn send(&mut self, item: SidecarInterfaceRequest) -> io::Result<()> {
match self.inner.lock() {
Ok(mut t) => t.send(item),
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())),
}
}

pub fn call(&mut self, item: SidecarInterfaceRequest) -> io::Result<SidecarInterfaceResponse> {
match self.inner.lock() {
Ok(mut t) => t.call(item),
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())),
}
}
}

impl From<Channel> for SidecarTransport {
fn from(c: Channel) -> Self {
SidecarTransport {
inner: Mutex::new(c.into()),
}
}
}

/// Shuts down a runtime.
///
Expand Down Expand Up @@ -261,3 +333,76 @@ pub fn ping(transport: &mut SidecarTransport) -> io::Result<Duration> {
.checked_duration_since(start)
.unwrap_or_default())
}

#[cfg(test)]
#[cfg(unix)]
mod tests {
    use crate::service::blocking::SidecarTransport;
    use datadog_ipc::platform::Channel;
    use std::net::Shutdown;
    use std::os::unix::net::{UnixListener, UnixStream};
    use std::time::Duration;

    /// Builds a socket path that is unique to this test process.
    ///
    /// A fixed path under `/tmp` can collide with a concurrent run of the same test suite, or
    /// with a stale socket file left behind by another user that this process cannot remove.
    fn unique_socket_path(test_name: &str) -> String {
        format!("/tmp/{}_{}.sock", test_name, std::process::id())
    }

    /// A closed transport must become usable again once `reconnect` installs a new connection.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_reconnect() {
        let bind_addr = unique_socket_path("test_reconnect");
        let _ = std::fs::remove_file(&bind_addr);

        let listener = UnixListener::bind(&bind_addr).expect("Cannot bind");
        let sock = UnixStream::connect_addr(&listener.local_addr().unwrap()).unwrap();

        let mut transport = SidecarTransport::from(Channel::from(sock.try_clone().unwrap()));
        assert!(!transport.is_closed());

        // Shutting down the shared socket makes the transport observe a closed connection.
        sock.shutdown(Shutdown::Both)
            .expect("shutdown function failed");
        assert!(transport.is_closed());

        transport.reconnect(|| {
            let new_sock = UnixStream::connect_addr(&listener.local_addr().unwrap()).unwrap();
            Some(Box::new(SidecarTransport::from(Channel::from(new_sock))))
        });
        assert!(!transport.is_closed());

        let _ = std::fs::remove_file(&bind_addr);
    }

    /// Timeouts set through the transport must be visible on the socket it wraps.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_set_timeout() {
        let bind_addr = unique_socket_path("test_set_timeout");
        let _ = std::fs::remove_file(&bind_addr);

        let listener = UnixListener::bind(&bind_addr).expect("Cannot bind");
        let sock = UnixStream::connect_addr(&listener.local_addr().unwrap()).unwrap();

        // `sock` and the transport share the same underlying socket (via `try_clone`), so the
        // timeouts can be read back through `sock`.
        let mut transport = SidecarTransport::from(Channel::from(sock.try_clone().unwrap()));
        assert_eq!(
            Duration::default(),
            sock.read_timeout().unwrap().unwrap_or_default()
        );
        assert_eq!(
            Duration::default(),
            sock.write_timeout().unwrap().unwrap_or_default()
        );

        transport
            .set_read_timeout(Some(Duration::from_millis(200)))
            .expect("set_read_timeout function failed");
        transport
            .set_write_timeout(Some(Duration::from_millis(300)))
            .expect("set_write_timeout function failed");

        assert_eq!(
            Duration::from_millis(200),
            sock.read_timeout().unwrap().unwrap_or_default()
        );
        assert_eq!(
            Duration::from_millis(300),
            sock.write_timeout().unwrap().unwrap_or_default()
        );

        let _ = std::fs::remove_file(&bind_addr);
    }
}

0 comments on commit 259ca9c

Please sign in to comment.