Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[crashtracking]: add named socket support back in #722

Merged
merged 14 commits into from
Nov 13, 2024
Merged
1 change: 1 addition & 0 deletions bin_tests/src/bin/crashtracker_bin_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ mod unix {
resolve_frames: crashtracker::StacktraceCollection::WithoutSymbols,
endpoint,
timeout_ms: TEST_COLLECTOR_TIMEOUT_MS,
unix_socket_path: Some("".to_string()),
};

let metadata = CrashtrackerMetadata {
Expand Down
4 changes: 4 additions & 0 deletions crashtracker-ffi/src/collector/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ pub struct Config<'a> {
/// This is given as a uint32_t, but the actual timeout needs to fit inside of an i32 (max
/// 2^31-1). This is a limitation of the various interfaces used to guarantee the timeout.
pub timeout_ms: u32,
/// Optional filename for a unix domain socket if the receiver is used asynchonously
pub optional_unix_socket_filename: CharSlice<'a>,
}

impl<'a> TryFrom<Config<'a>> for datadog_crashtracker::CrashtrackerConfiguration {
Expand All @@ -87,13 +89,15 @@ impl<'a> TryFrom<Config<'a>> for datadog_crashtracker::CrashtrackerConfiguration
let endpoint = value.endpoint.cloned();
let resolve_frames = value.resolve_frames;
let timeout_ms = value.timeout_ms;
let unix_socket_path = option_from_char_slice(value.optional_unix_socket_filename)?;
Self::new(
additional_files,
create_alt_stack,
use_alt_stack,
endpoint,
resolve_frames,
timeout_ms,
unix_socket_path,
)
}
}
Expand Down
40 changes: 40 additions & 0 deletions crashtracker-ffi/src/collector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ use crate::Result;
use anyhow::Context;
pub use counters::*;
pub use datatypes::*;
use ddcommon_ffi::{
slice::{AsBytes, CharSlice},
Slice,
};
pub use spans::*;

#[no_mangle]
Expand Down Expand Up @@ -94,3 +98,39 @@ pub unsafe extern "C" fn ddog_crasht_init(
.context("ddog_crasht_init failed")
.into()
}

#[no_mangle]
#[must_use]
/// Initialize the crash-tracking infrastructure, writing to an unix socket in case of crash.
///
/// # Preconditions
/// None.
/// # Safety
/// Crash-tracking functions are not reentrant.
/// No other crash-handler functions should be called concurrently.
/// # Atomicity
/// This function is not atomic. A crash during its execution may lead to
/// unexpected crash-handling behaviour.
pub unsafe extern "C" fn ddog_crasht_init_with_unix_socket(
config: Config,
socket_path: CharSlice,
metadata: Metadata,
) -> Result {
(|| {
let mut config: datadog_crashtracker::CrashtrackerConfiguration = config.try_into()?;
let socket_path = socket_path.try_to_utf8()?;
config.unix_socket_path = Some(socket_path.to_string());
let metadata = metadata.try_into()?;
let receiver_config = ReceiverConfig {
args: Slice::empty(),
env: Slice::empty(),
path_to_receiver_binary: CharSlice::empty(),
optional_stdout_filename: CharSlice::empty(),
optional_stderr_filename: CharSlice::empty(),
};
let receiver_config = receiver_config.try_into()?;
datadog_crashtracker::init(config, receiver_config, metadata)
})()
.context("ddog_crasht_init failed")
.into()
}
8 changes: 6 additions & 2 deletions crashtracker/src/collector/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ pub fn init(
receiver_config: CrashtrackerReceiverConfig,
metadata: CrashtrackerMetadata,
) -> anyhow::Result<()> {
// Setup the receiver first, so that if there is a crash detected it has
// somewhere to go.
update_metadata(metadata)?;
update_config(config)?;
configure_receiver(receiver_config);
Expand Down Expand Up @@ -131,6 +129,7 @@ fn test_crash() -> anyhow::Result<()> {
endpoint,
resolve_frames,
timeout_ms,
None,
)?;
let metadata = CrashtrackerMetadata::new(
"libname".to_string(),
Expand Down Expand Up @@ -187,6 +186,7 @@ fn test_altstack_paradox() -> anyhow::Result<()> {
endpoint,
resolve_frames,
timeout_ms,
None,
);

// This is slightly over-tuned to the language of the error message, but it'd require some
Expand Down Expand Up @@ -253,6 +253,7 @@ fn test_altstack_use_create() -> anyhow::Result<()> {
endpoint,
resolve_frames,
timeout_ms,
None,
)?;
let metadata = CrashtrackerMetadata::new(
"libname".to_string(),
Expand Down Expand Up @@ -379,6 +380,7 @@ fn test_altstack_use_nocreate() -> anyhow::Result<()> {
endpoint,
resolve_frames,
timeout_ms,
None,
)?;
let metadata = CrashtrackerMetadata::new(
"libname".to_string(),
Expand Down Expand Up @@ -505,6 +507,7 @@ fn test_altstack_nouse() -> anyhow::Result<()> {
endpoint,
resolve_frames,
timeout_ms,
None,
)?;
let metadata = CrashtrackerMetadata::new(
"libname".to_string(),
Expand Down Expand Up @@ -665,6 +668,7 @@ fn test_waitall_nohang() -> anyhow::Result<()> {
endpoint,
resolve_frames,
timeout_ms,
None,
)?;

let metadata = CrashtrackerMetadata::new(
Expand Down
102 changes: 70 additions & 32 deletions crashtracker/src/collector/crash_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ struct OldHandlers {
struct Receiver {
receiver_uds: RawFd,
receiver_pid: i32,
oneshot: bool,
}

// The args_cstrings and env_vars_strings fields are just storage. Even though they're
Expand Down Expand Up @@ -291,6 +292,7 @@ fn make_receiver(config: &CrashtrackerReceiverConfig) -> anyhow::Result<Receiver
Ok(Receiver {
receiver_uds: uds_parent,
receiver_pid: pid,
oneshot: true,
})
}
_ => {
Expand Down Expand Up @@ -434,6 +436,50 @@ extern "C" fn handle_posix_sigaction(signum: i32, sig_info: *mut siginfo_t, ucon
};
}

fn receiver_from_socket(unix_socket_path: &str) -> anyhow::Result<Receiver> {
// Creates a fake "Receiver", which can be waited on like a normal receiver.
// This is intended to support configurations where the collector is speaking to a long-lived,
// async receiver process.
if unix_socket_path.is_empty() {
return Err(anyhow::anyhow!("No receiver path provided"));
}
let socket_path = std::path::Path::new(unix_socket_path);
let unix_stream = UnixStream::connect(socket_path).context("Failed to connect to receiver")?;
let receiver_uds = unix_stream.into_raw_fd();
Ok(Receiver {
receiver_uds,
receiver_pid: 0,
oneshot: false,
})
}

fn receiver_finish(receiver: Receiver, start_time: Instant, timeout_ms: u32) {
let pollhup_allowed_ms = timeout_ms
.saturating_sub(start_time.elapsed().as_millis() as u32)
.min(i32::MAX as u32) as i32;
let _ = wait_for_pollhup(receiver.receiver_uds, pollhup_allowed_ms);

// If this is a oneshot-type receiver (i.e., we spawned it), then we now need to ensure it gets
// cleaned up.
// We explicitly avoid the case where the receiver PID is 1. This is unbelievably unlikely, but
// should the situation arise we just walk away and let the PID leak.
if receiver.oneshot && receiver.receiver_pid > 1 {
// Either the receiver is done, it timed out, or something failed.
// In any case, can't guarantee that the receiver will exit.
// SIGKILL will ensure that the process ends eventually, but there's
// no bound on that time.
// We emit SIGKILL and try to reap its exit status for the remaining time, then give up.
unsafe {
libc::kill(receiver.receiver_pid, libc::SIGKILL);
}

let receiver_pid_as_pid = Pid::from_raw(receiver.receiver_pid);
let reaping_allowed_ms = timeout_ms.saturating_sub(start_time.elapsed().as_millis() as u32);

let _ = reap_child_non_blocking(receiver_pid_as_pid, reaping_allowed_ms);
}
}

fn handle_posix_signal_impl(signum: i32, sig_info: *mut siginfo_t) -> anyhow::Result<()> {
// If this is a SIGSEGV signal, it could be called due to a stack overflow. In that case, since
// this signal allocates to the stack and cannot guarantee it is running without SA_NODEFER, it
Expand All @@ -453,6 +499,8 @@ fn handle_posix_signal_impl(signum: i32, sig_info: *mut siginfo_t) -> anyhow::Re
// Leak config and metadata to avoid calling `drop` during a crash
// Note that these operations also replace the global states. When the one-time guard is
// passed, all global configuration and metadata becomes invalid.
// In a perfet world, we'd also grab the receiver config in this section, but since the
// execution forks based on whether or not the receiver is configured, we check that later.
let config = CONFIG.swap(ptr::null_mut(), SeqCst);
anyhow::ensure!(!config.is_null(), "No crashtracking config");
let (config, config_str) = unsafe { config.as_ref().context("No crashtracking receiver")? };
Expand All @@ -461,12 +509,10 @@ fn handle_posix_signal_impl(signum: i32, sig_info: *mut siginfo_t) -> anyhow::Re
anyhow::ensure!(!metadata_ptr.is_null(), "No crashtracking metadata");
let (_metadata, metadata_string) = unsafe { metadata_ptr.as_ref().context("metadata ptr")? };

let receiver_config = RECEIVER_CONFIG.swap(ptr::null_mut(), SeqCst);
anyhow::ensure!(
!receiver_config.is_null(),
"No crashtracking receiver config"
);
let receiver_config = unsafe { receiver_config.as_ref().context("receiver config")? };
let receiver_config = RECEIVER_CONFIG.load(SeqCst);
if receiver_config.is_null() {
return Err(anyhow::anyhow!("No receiver config"));
}

// Since we've gotten this far, we're going to start working on the crash report. This
// operation needs to be mindful of the total walltime elapsed during handling. This isn't only
Expand All @@ -492,13 +538,23 @@ fn handle_posix_signal_impl(signum: i32, sig_info: *mut siginfo_t) -> anyhow::Re
// disrupted.
let _guard = SaGuard::<2>::new(&[signal::SIGCHLD, signal::SIGPIPE])?;

// Even though we just set a guard, we'll have to undo part of it in the receiver process in
// order to let it reap its own children properly. We have to do this anyway, so do it here in
// order to ensure that _this_ process has the right flags (especially for SIGCHLD).
let receiver = make_receiver(receiver_config)?;
// Optionally, create the receiver. This all hinges on whether or not the configuration has a
// non-null unix domain socket specified. If it doesn't, then we need to check the receiver
// configuration. If it does, then we just connect to the socket.
let unix_socket_path = config.unix_socket_path.clone().unwrap_or_default();

let receiver = if !unix_socket_path.is_empty() {
receiver_from_socket(&unix_socket_path)?
} else {
let receiver_config = RECEIVER_CONFIG.load(SeqCst);
if receiver_config.is_null() {
return Err(anyhow::anyhow!("No receiver config"));
}
let receiver_config = unsafe { receiver_config.as_ref().context("receiver config")? };
make_receiver(receiver_config)?
};

// Creating this stream means the underlying RawFD is now owned by the stream, so
// we shouldn't close it manually.
// No matter how the receiver was creatd, attach to its stream
sanchda marked this conversation as resolved.
Show resolved Hide resolved
let mut unix_stream = unsafe { UnixStream::from_raw_fd(receiver.receiver_uds) };

// Currently the emission of the crash report doesn't have a firm time guarantee
Expand All @@ -518,26 +574,8 @@ fn handle_posix_signal_impl(signum: i32, sig_info: *mut siginfo_t) -> anyhow::Re
.shutdown(std::net::Shutdown::Write)
.context("Could not shutdown writing on the stream")?;

// We have to wait for the receiver process and reap its exit status.
let pollhup_allowed_ms = timeout_ms
.saturating_sub(start_time.elapsed().as_millis() as u32)
.min(i32::MAX as u32) as i32;
let _ = wait_for_pollhup(receiver.receiver_uds, pollhup_allowed_ms)
.context("Failed to wait for pollhup")?;

// Either the receiver is done, it timed out, or something failed.
// In any case, can't guarantee that the receiver will exit.
// SIGKILL will ensure that the process ends eventually, but there's
// no bound on that time.
// We emit SIGKILL and try to reap its exit status for the remaining time, then just give
// up.
unsafe {
libc::kill(receiver.receiver_pid, libc::SIGKILL);
}
let receiver_pid_as_pid = Pid::from_raw(receiver.receiver_pid);
let reaping_allowed_ms = timeout_ms.saturating_sub(start_time.elapsed().as_millis() as u32);
let _ = reap_child_non_blocking(receiver_pid_as_pid, reaping_allowed_ms)
.context("Failed to reap receiver process")?;
// We're done. Wrap up our interaction with the receiver.
receiver_finish(receiver, start_time, timeout_ms);

res
}
Expand Down
1 change: 1 addition & 0 deletions crashtracker/src/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ mod tests {
None,
StacktraceCollection::Disabled,
3000,
None,
)?)?,
)
.await?;
Expand Down
5 changes: 5 additions & 0 deletions crashtracker/src/shared/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub struct CrashtrackerConfiguration {
pub endpoint: Option<Endpoint>,
pub resolve_frames: StacktraceCollection,
pub timeout_ms: u32,
pub unix_socket_path: Option<String>,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
Expand Down Expand Up @@ -76,6 +77,7 @@ impl CrashtrackerConfiguration {
endpoint: Option<Endpoint>,
resolve_frames: StacktraceCollection,
timeout_ms: u32,
unix_socket_path: Option<String>,
) -> anyhow::Result<Self> {
// Requesting to create, but not use, the altstack is considered paradoxical.
anyhow::ensure!(
Expand All @@ -89,13 +91,16 @@ impl CrashtrackerConfiguration {
} else {
timeout_ms
};
// Note: don't check the receiver socket upfront, since a configuration can be interned
// before the receiver is started when using an async-receiver.
Ok(Self {
additional_files,
create_alt_stack,
use_alt_stack,
endpoint,
resolve_frames,
timeout_ms,
unix_socket_path,
})
}
}
Loading