diff --git a/bin_tests/src/bin/crashtracker_bin_test.rs b/bin_tests/src/bin/crashtracker_bin_test.rs index df141ce63..425240b8e 100644 --- a/bin_tests/src/bin/crashtracker_bin_test.rs +++ b/bin_tests/src/bin/crashtracker_bin_test.rs @@ -57,6 +57,7 @@ mod unix { resolve_frames: crashtracker::StacktraceCollection::WithoutSymbols, endpoint, timeout_ms: TEST_COLLECTOR_TIMEOUT_MS, + unix_socket_path: Some("".to_string()), }; let metadata = CrashtrackerMetadata { diff --git a/crashtracker-ffi/src/collector/datatypes.rs b/crashtracker-ffi/src/collector/datatypes.rs index ca5d39b07..c73fa354b 100644 --- a/crashtracker-ffi/src/collector/datatypes.rs +++ b/crashtracker-ffi/src/collector/datatypes.rs @@ -70,6 +70,8 @@ pub struct Config<'a> { /// This is given as a uint32_t, but the actual timeout needs to fit inside of an i32 (max /// 2^31-1). This is a limitation of the various interfaces used to guarantee the timeout. pub timeout_ms: u32, + /// Optional filename for a unix domain socket if the receiver is used asynchonously + pub optional_unix_socket_filename: CharSlice<'a>, } impl<'a> TryFrom> for datadog_crashtracker::CrashtrackerConfiguration { @@ -87,6 +89,7 @@ impl<'a> TryFrom> for datadog_crashtracker::CrashtrackerConfiguration let endpoint = value.endpoint.cloned(); let resolve_frames = value.resolve_frames; let timeout_ms = value.timeout_ms; + let unix_socket_path = option_from_char_slice(value.optional_unix_socket_filename)?; Self::new( additional_files, create_alt_stack, @@ -94,6 +97,7 @@ impl<'a> TryFrom> for datadog_crashtracker::CrashtrackerConfiguration endpoint, resolve_frames, timeout_ms, + unix_socket_path, ) } } diff --git a/crashtracker-ffi/src/collector/mod.rs b/crashtracker-ffi/src/collector/mod.rs index 7a8343b6e..830279694 100644 --- a/crashtracker-ffi/src/collector/mod.rs +++ b/crashtracker-ffi/src/collector/mod.rs @@ -8,6 +8,7 @@ use super::crash_info::Metadata; use crate::Result; use anyhow::Context; pub use counters::*; +use datadog_crashtracker::CrashtrackerReceiverConfig; pub use datatypes::*; pub use spans::*; @@ -94,3 +95,47 @@ pub unsafe extern "C" fn ddog_crasht_init( .context("ddog_crasht_init failed") .into() } + +#[no_mangle] +#[must_use] +/// Initialize the crash-tracking infrastructure without launching the receiver. +/// +/// # Preconditions +/// Requires `config` to be given with a `unix_socket_path`, which is normally optional. +/// # Safety +/// Crash-tracking functions are not reentrant. +/// No other crash-handler functions should be called concurrently. +/// # Atomicity +/// This function is not atomic. A crash during its execution may lead to +/// unexpected crash-handling behaviour. +pub unsafe extern "C" fn ddog_crasht_init_without_receiver( + config: Config, + metadata: Metadata, +) -> Result { + (|| { + let config: datadog_crashtracker::CrashtrackerConfiguration = config.try_into()?; + let metadata = metadata.try_into()?; + + // If the unix domain socket path is not set, then we throw an error--there's currently no + // other way to specify communication between an async receiver and a collector, so this + // isn't a valid configuration. + if config.unix_socket_path.is_none() { + return Err(anyhow::anyhow!("config.unix_socket_path must be set")); + } + if config.unix_socket_path.as_ref().unwrap().is_empty() { + return Err(anyhow::anyhow!("config.unix_socket_path can't be empty")); + } + + // Populate an empty receiver config + let receiver_config = CrashtrackerReceiverConfig { + args: vec![], + env: vec![], + path_to_receiver_binary: "".to_string(), + stderr_filename: None, + stdout_filename: None, + }; + datadog_crashtracker::init(config, receiver_config, metadata) + })() + .context("ddog_crasht_init failed") + .into() +} diff --git a/crashtracker/src/collector/api.rs b/crashtracker/src/collector/api.rs index 984369bb2..94f05050e 100644 --- a/crashtracker/src/collector/api.rs +++ b/crashtracker/src/collector/api.rs @@ -79,8 +79,6 @@ pub fn init( receiver_config: CrashtrackerReceiverConfig, metadata: CrashtrackerMetadata, ) -> anyhow::Result<()> { - // Setup the receiver first, so that if there is a crash detected it has - // somewhere to go. update_metadata(metadata)?; update_config(config)?; configure_receiver(receiver_config); @@ -131,6 +129,7 @@ fn test_crash() -> anyhow::Result<()> { endpoint, resolve_frames, timeout_ms, + None, )?; let metadata = CrashtrackerMetadata::new( "libname".to_string(), @@ -187,6 +186,7 @@ fn test_altstack_paradox() -> anyhow::Result<()> { endpoint, resolve_frames, timeout_ms, + None, ); // This is slightly over-tuned to the language of the error message, but it'd require some @@ -253,6 +253,7 @@ fn test_altstack_use_create() -> anyhow::Result<()> { endpoint, resolve_frames, timeout_ms, + None, )?; let metadata = CrashtrackerMetadata::new( "libname".to_string(), @@ -379,6 +380,7 @@ fn test_altstack_use_nocreate() -> anyhow::Result<()> { endpoint, resolve_frames, timeout_ms, + None, )?; let metadata = CrashtrackerMetadata::new( "libname".to_string(), @@ -505,6 +507,7 @@ fn test_altstack_nouse() -> anyhow::Result<()> { endpoint, resolve_frames, timeout_ms, + None, )?; let metadata = CrashtrackerMetadata::new( "libname".to_string(), @@ -666,6 +669,7 @@ fn test_waitall_nohang() -> anyhow::Result<()> { endpoint, resolve_frames, timeout_ms, + None, )?; let metadata = CrashtrackerMetadata::new( diff --git a/crashtracker/src/collector/crash_handler.rs b/crashtracker/src/collector/crash_handler.rs index 617bc38d1..c0ab68fa0 100644 --- a/crashtracker/src/collector/crash_handler.rs +++ b/crashtracker/src/collector/crash_handler.rs @@ -62,6 +62,7 @@ struct OldHandlers { struct Receiver { receiver_uds: RawFd, receiver_pid: i32, + oneshot: bool, } // The args_cstrings and env_vars_strings fields are just storage. Even though they're @@ -291,6 +292,7 @@ fn make_receiver(config: &CrashtrackerReceiverConfig) -> anyhow::Result { @@ -434,6 +436,60 @@ extern "C" fn handle_posix_sigaction(signum: i32, sig_info: *mut siginfo_t, ucon }; } +fn receiver_from_socket(unix_socket_path: &str) -> anyhow::Result { + // Creates a fake "Receiver", which can be waited on like a normal receiver. + // This is intended to support configurations where the collector is speaking to a long-lived, + // async receiver process. + if unix_socket_path.is_empty() { + return Err(anyhow::anyhow!("No receiver path provided")); + } + #[cfg(target_os = "linux")] + let unix_stream = if unix_socket_path.starts_with(['.', '/']) { + UnixStream::connect(unix_socket_path) + } else { + use std::os::linux::net::SocketAddrExt; + let addr = std::os::unix::net::SocketAddr::from_abstract_name(unix_socket_path)?; + UnixStream::connect_addr(&addr) + }; + #[cfg(not(target_os = "linux"))] + let unix_stream = UnixStream::connect(unix_socket_path); + let receiver_uds = unix_stream + .context("Failed to connect to receiver")? + .into_raw_fd(); + Ok(Receiver { + receiver_uds, + receiver_pid: 0, + oneshot: false, + }) +} + +fn receiver_finish(receiver: Receiver, start_time: Instant, timeout_ms: u32) { + let pollhup_allowed_ms = timeout_ms + .saturating_sub(start_time.elapsed().as_millis() as u32) + .min(i32::MAX as u32) as i32; + let _ = wait_for_pollhup(receiver.receiver_uds, pollhup_allowed_ms); + + // If this is a oneshot-type receiver (i.e., we spawned it), then we now need to ensure it gets + // cleaned up. + // We explicitly avoid the case where the receiver PID is 1. This is unbelievably unlikely, but + // should the situation arise we just walk away and let the PID leak. + if receiver.oneshot && receiver.receiver_pid > 1 { + // Either the receiver is done, it timed out, or something failed. + // In any case, can't guarantee that the receiver will exit. + // SIGKILL will ensure that the process ends eventually, but there's + // no bound on that time. + // We emit SIGKILL and try to reap its exit status for the remaining time, then give up. + unsafe { + libc::kill(receiver.receiver_pid, libc::SIGKILL); + } + + let receiver_pid_as_pid = Pid::from_raw(receiver.receiver_pid); + let reaping_allowed_ms = timeout_ms.saturating_sub(start_time.elapsed().as_millis() as u32); + + let _ = reap_child_non_blocking(receiver_pid_as_pid, reaping_allowed_ms); + } +} + fn handle_posix_signal_impl(signum: i32, sig_info: *mut siginfo_t) -> anyhow::Result<()> { // If this is a SIGSEGV signal, it could be called due to a stack overflow. In that case, since // this signal allocates to the stack and cannot guarantee it is running without SA_NODEFER, it @@ -453,6 +509,8 @@ fn handle_posix_signal_impl(signum: i32, sig_info: *mut siginfo_t) -> anyhow::Re // Leak config and metadata to avoid calling `drop` during a crash // Note that these operations also replace the global states. When the one-time guard is // passed, all global configuration and metadata becomes invalid. + // In a perfet world, we'd also grab the receiver config in this section, but since the + // execution forks based on whether or not the receiver is configured, we check that later. let config = CONFIG.swap(ptr::null_mut(), SeqCst); anyhow::ensure!(!config.is_null(), "No crashtracking config"); let (config, config_str) = unsafe { config.as_ref().context("No crashtracking receiver")? }; @@ -461,12 +519,10 @@ fn handle_posix_signal_impl(signum: i32, sig_info: *mut siginfo_t) -> anyhow::Re anyhow::ensure!(!metadata_ptr.is_null(), "No crashtracking metadata"); let (_metadata, metadata_string) = unsafe { metadata_ptr.as_ref().context("metadata ptr")? }; - let receiver_config = RECEIVER_CONFIG.swap(ptr::null_mut(), SeqCst); - anyhow::ensure!( - !receiver_config.is_null(), - "No crashtracking receiver config" - ); - let receiver_config = unsafe { receiver_config.as_ref().context("receiver config")? }; + let receiver_config = RECEIVER_CONFIG.load(SeqCst); + if receiver_config.is_null() { + return Err(anyhow::anyhow!("No receiver config")); + } // Since we've gotten this far, we're going to start working on the crash report. This // operation needs to be mindful of the total walltime elapsed during handling. This isn't only @@ -492,13 +548,23 @@ fn handle_posix_signal_impl(signum: i32, sig_info: *mut siginfo_t) -> anyhow::Re // disrupted. let _guard = SaGuard::<2>::new(&[signal::SIGCHLD, signal::SIGPIPE])?; - // Even though we just set a guard, we'll have to undo part of it in the receiver process in - // order to let it reap its own children properly. We have to do this anyway, so do it here in - // order to ensure that _this_ process has the right flags (especially for SIGCHLD). - let receiver = make_receiver(receiver_config)?; + // Optionally, create the receiver. This all hinges on whether or not the configuration has a + // non-null unix domain socket specified. If it doesn't, then we need to check the receiver + // configuration. If it does, then we just connect to the socket. + let unix_socket_path = config.unix_socket_path.clone().unwrap_or_default(); - // Creating this stream means the underlying RawFD is now owned by the stream, so - // we shouldn't close it manually. + let receiver = if !unix_socket_path.is_empty() { + receiver_from_socket(&unix_socket_path)? + } else { + let receiver_config = RECEIVER_CONFIG.load(SeqCst); + if receiver_config.is_null() { + return Err(anyhow::anyhow!("No receiver config")); + } + let receiver_config = unsafe { receiver_config.as_ref().context("receiver config")? }; + make_receiver(receiver_config)? + }; + + // No matter how the receiver was created, attach to its stream let mut unix_stream = unsafe { UnixStream::from_raw_fd(receiver.receiver_uds) }; // Currently the emission of the crash report doesn't have a firm time guarantee @@ -518,26 +584,8 @@ fn handle_posix_signal_impl(signum: i32, sig_info: *mut siginfo_t) -> anyhow::Re .shutdown(std::net::Shutdown::Write) .context("Could not shutdown writing on the stream")?; - // We have to wait for the receiver process and reap its exit status. - let pollhup_allowed_ms = timeout_ms - .saturating_sub(start_time.elapsed().as_millis() as u32) - .min(i32::MAX as u32) as i32; - let _ = wait_for_pollhup(receiver.receiver_uds, pollhup_allowed_ms) - .context("Failed to wait for pollhup")?; - - // Either the receiver is done, it timed out, or something failed. - // In any case, can't guarantee that the receiver will exit. - // SIGKILL will ensure that the process ends eventually, but there's - // no bound on that time. - // We emit SIGKILL and try to reap its exit status for the remaining time, then just give - // up. - unsafe { - libc::kill(receiver.receiver_pid, libc::SIGKILL); - } - let receiver_pid_as_pid = Pid::from_raw(receiver.receiver_pid); - let reaping_allowed_ms = timeout_ms.saturating_sub(start_time.elapsed().as_millis() as u32); - let _ = reap_child_non_blocking(receiver_pid_as_pid, reaping_allowed_ms) - .context("Failed to reap receiver process")?; + // We're done. Wrap up our interaction with the receiver. + receiver_finish(receiver, start_time, timeout_ms); res } diff --git a/crashtracker/src/receiver.rs b/crashtracker/src/receiver.rs index 5b0ab902c..aaef56de7 100644 --- a/crashtracker/src/receiver.rs +++ b/crashtracker/src/receiver.rs @@ -414,6 +414,7 @@ mod tests { None, StacktraceCollection::Disabled, 3000, + None, )?)?, ) .await?; diff --git a/crashtracker/src/shared/configuration.rs b/crashtracker/src/shared/configuration.rs index ccd4f8e1c..6f92932de 100644 --- a/crashtracker/src/shared/configuration.rs +++ b/crashtracker/src/shared/configuration.rs @@ -28,6 +28,7 @@ pub struct CrashtrackerConfiguration { pub endpoint: Option, pub resolve_frames: StacktraceCollection, pub timeout_ms: u32, + pub unix_socket_path: Option, } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] @@ -48,12 +49,7 @@ impl CrashtrackerReceiverConfig { stdout_filename: Option, ) -> anyhow::Result { anyhow::ensure!( - !path_to_receiver_binary.is_empty(), - "Expected a receiver binary" - ); - anyhow::ensure!( - stderr_filename.is_none() && stdout_filename.is_none() - || stderr_filename != stdout_filename, + stderr_filename.is_some() && stderr_filename != stdout_filename, "Can't give the same filename for stderr and stdout, they will conflict with each other" ); @@ -76,6 +72,7 @@ impl CrashtrackerConfiguration { endpoint: Option, resolve_frames: StacktraceCollection, timeout_ms: u32, + unix_socket_path: Option, ) -> anyhow::Result { // Requesting to create, but not use, the altstack is considered paradoxical. anyhow::ensure!( @@ -89,6 +86,8 @@ impl CrashtrackerConfiguration { } else { timeout_ms }; + // Note: don't check the receiver socket upfront, since a configuration can be interned + // before the receiver is started when using an async-receiver. Ok(Self { additional_files, create_alt_stack, @@ -96,6 +95,7 @@ impl CrashtrackerConfiguration { endpoint, resolve_frames, timeout_ms, + unix_socket_path, }) } }