From e64f0fb53698a1a29244e03108e7d4dbd619f540 Mon Sep 17 00:00:00 2001 From: Dominik Maier Date: Tue, 18 Jun 2024 14:58:37 +0100 Subject: [PATCH] Address comments from #2302 (#2322) * Address comments from #2302 * secure? * cleanup * early exit ftw * address clippy * Fix all the things --- libafl/Cargo.toml | 6 + .../broker_hooks/centralized_multi_machine.rs | 16 +- libafl/src/events/centralized.rs | 15 +- libafl/src/events/launcher.rs | 29 +- libafl/src/events/llmp/mgr.rs | 13 +- libafl/src/events/llmp/mod.rs | 5 +- libafl/src/events/llmp/restarting.rs | 4 +- libafl/src/events/mod.rs | 3 + libafl/src/events/multi_machine.rs | 184 ++- libafl/src/events/tcp.rs | 1364 +++++++++++++++++ libafl/src/lib.rs | 1 - libafl_bolts/src/lib.rs | 1 - libafl_bolts/src/llmp.rs | 81 +- libafl_qemu/libafl_qemu_sys/src/lib.rs | 3 + 14 files changed, 1580 insertions(+), 145 deletions(-) create mode 100644 libafl/src/events/tcp.rs diff --git a/libafl/Cargo.toml b/libafl/Cargo.toml index 7d84ca94de..68eb47e82a 100644 --- a/libafl/Cargo.toml +++ b/libafl/Cargo.toml @@ -58,6 +58,12 @@ handle_sigpipe = [] #! ## Additional Components +## Enables `TcpEventManager`, a simple EventManager proxying everything via TCP. This uses `tokio`. +tcp_manager = ["tokio", "std"] + +## Enables compression for the TCP manager +tcp_compression = ["tcp_manager", "libafl_bolts/gzip"] + ## Enable multi-machine support multi_machine = ["tokio", "std", "enumflags2", "ahash/std"] diff --git a/libafl/src/events/broker_hooks/centralized_multi_machine.rs b/libafl/src/events/broker_hooks/centralized_multi_machine.rs index 569e0801f3..4949af7eab 100644 --- a/libafl/src/events/broker_hooks/centralized_multi_machine.rs +++ b/libafl/src/events/broker_hooks/centralized_multi_machine.rs @@ -14,7 +14,6 @@ use libafl_bolts::{ shmem::ShMemProvider, ClientId, Error, }; -use log::debug; use tokio::{ net::ToSocketAddrs, runtime::Runtime, @@ -102,7 +101,7 @@ where A: Clone + Display + ToSocketAddrs + Send + Sync + 'static, I: Input + Send + Sync + 'static, { - /// Should not be created alone. Use [`TcpMultiMachineBuilder`] instead. + /// Should not be created alone. Use [`TcpMultiMachineHooksBuilder`] instead. pub(crate) fn new( shared_state: Arc>>, rt: Arc, @@ -120,8 +119,13 @@ where A: Clone + Display + ToSocketAddrs + Send + Sync + 'static, I: Input + Send + Sync + 'static, { - /// Should not be created alone. Use [`TcpMultiMachineBuilder`] instead. - pub(crate) fn new( + /// Should not be created alone. Use [`TcpMultiMachineHooksBuilder`] instead. + /// + /// # Safety + /// For [`Self::on_new_message`], this struct assumes that the `msg` parameter + /// (or rather, the memory it points to), lives sufficiently long + /// for an async background task to process it. 
+ pub(crate) unsafe fn new( shared_state: Arc>>, rt: Arc, ) -> Self { @@ -200,7 +204,7 @@ where // TODO: do not copy here state_wr_lock.add_past_msg(msg); - debug!("Sending msg..."); + log::debug!("Sending msg..."); state_wr_lock .send_interesting_event_to_nodes(&mm_msg) @@ -239,7 +243,7 @@ where .receive_new_messages_from_nodes(&mut incoming_msgs) .await?; - debug!("received {} new incoming msg(s)", incoming_msgs.len()); + log::debug!("received {} new incoming msg(s)", incoming_msgs.len()); let msgs_to_forward: Result)>, Error> = incoming_msgs .into_iter() diff --git a/libafl/src/events/centralized.rs b/libafl/src/events/centralized.rs index 2e2e62ab35..2ad39e85a7 100644 --- a/libafl/src/events/centralized.rs +++ b/libafl/src/events/centralized.rs @@ -22,7 +22,6 @@ use libafl_bolts::{ tuples::Handle, ClientId, }; -use log::debug; use serde::{Deserialize, Serialize}; use super::NopEventManager; @@ -551,7 +550,7 @@ where }; let event: Event<<::State as UsesInput>::Input> = postcard::from_bytes(event_bytes)?; - debug!("Processor received message {}", event.name_detailed()); + log::debug!("Processor received message {}", event.name_detailed()); self.handle_in_main(fuzzer, executor, state, client_id, event)?; count += 1; } @@ -574,7 +573,7 @@ where Z: ExecutionProcessor::State> + EvaluatorObservers, { - debug!("handle_in_main!"); + log::debug!("handle_in_main!"); let event_name = event.name_detailed(); @@ -591,7 +590,7 @@ where #[cfg(feature = "multi_machine")] node_id, } => { - debug!( + log::debug!( "Received {} from {client_id:?} ({client_config:?}, forward {forward_id:?})", event_name ); @@ -604,7 +603,7 @@ where { state.scalability_monitor_mut().testcase_with_observers += 1; } - debug!( + log::debug!( "[{}] Running fuzzer with event {}", process::id(), event_name @@ -622,7 +621,7 @@ where { state.scalability_monitor_mut().testcase_without_observers += 1; } - debug!( + log::debug!( "[{}] Running fuzzer with event {}", process::id(), event_name @@ -652,7 +651,7 @@ where self.hooks.on_fire_all(state, client_id, &event)?; - debug!( + log::debug!( "[{}] Adding received Testcase {} as item #{item}...", process::id(), event_name @@ -660,7 +659,7 @@ where self.inner.fire(state, event)?; } else { - debug!("[{}] {} was discarded...)", process::id(), event_name); + log::debug!("[{}] {} was discarded...)", process::id(), event_name); } } _ => { diff --git a/libafl/src/events/launcher.rs b/libafl/src/events/launcher.rs index e70f8e02bd..21ce5cc08b 100644 --- a/libafl/src/events/launcher.rs +++ b/libafl/src/events/launcher.rs @@ -46,8 +46,6 @@ use libafl_bolts::{ shmem::ShMemProvider, tuples::{tuple_list, Handle}, }; -#[cfg(all(unix, feature = "std", feature = "fork"))] -use log::debug; #[cfg(feature = "std")] use typed_builder::TypedBuilder; @@ -57,7 +55,7 @@ use super::StdLlmpEventHook; #[cfg(all(unix, feature = "std", feature = "fork", feature = "multi_machine"))] use crate::events::multi_machine::NodeDescriptor; #[cfg(all(unix, feature = "std", feature = "fork", feature = "multi_machine"))] -use crate::events::multi_machine::TcpMultiMachineBuilder; +use crate::events::multi_machine::TcpMultiMachineHooks; #[cfg(all(unix, feature = "std", feature = "fork"))] use crate::events::{centralized::CentralizedEventManager, CentralizedLlmpHook}; #[cfg(all(unix, feature = "std", feature = "fork"))] @@ -657,7 +655,7 @@ where let num_cores = core_ids.len(); let mut handles = vec![]; - debug!("spawning on cores: {:?}", self.cores); + log::debug!("spawning on cores: {:?}", self.cores); self.opened_stdout_file = 
self .stdout_file @@ -700,7 +698,7 @@ where if index == 1 { // Main client - debug!("Running main client on PID {}", std::process::id()); + log::debug!("Running main client on PID {}", std::process::id()); let (state, mgr) = main_inner_mgr_builder.take().unwrap()(self, *bind_to)?; @@ -721,7 +719,7 @@ where self.main_run_client.take().unwrap()(state, c_mgr, *bind_to) } else { // Secondary clients - debug!("Running secondary client on PID {}", std::process::id()); + log::debug!("Running secondary client on PID {}", std::process::id()); let (state, mgr) = secondary_inner_mgr_builder.take().unwrap()(self, *bind_to)?; @@ -744,11 +742,18 @@ where #[cfg(feature = "multi_machine")] // Create this after forks, to avoid problems with tokio runtime - let (multi_machine_sender_hook, multi_machine_receiver_hook) = - TcpMultiMachineBuilder::build::< - SocketAddr, - <::State as UsesInput>::Input, - >(self.multi_machine_node_descriptor.clone())?; + + // # Safety + // The `multi_machine_receiver_hook` needs messages to outlive the receiver. + // The underlying memory region for incoming messages lives longer than the async thread processing them. + let TcpMultiMachineHooks { + sender: multi_machine_sender_hook, + receiver: multi_machine_receiver_hook, + } = unsafe { + TcpMultiMachineHooks::builder() + .node_descriptor(self.multi_machine_node_descriptor.clone()) + .build::<<::State as UsesInput>::Input>()? + }; let mut brokers = Brokers::new(); @@ -812,7 +817,7 @@ where brokers.add(Box::new(broker)); } - debug!( + log::debug!( "Brokers have been initialized on port {}.", std::process::id() ); diff --git a/libafl/src/events/llmp/mgr.rs b/libafl/src/events/llmp/mgr.rs index 8a03545226..b371523415 100644 --- a/libafl/src/events/llmp/mgr.rs +++ b/libafl/src/events/llmp/mgr.rs @@ -25,7 +25,6 @@ use libafl_bolts::{ llmp::{recv_tcp_msg, send_tcp_msg, TcpRequest, TcpResponse}, IP_LOCALHOST, }; -use log::debug; use serde::{Deserialize, Serialize}; #[cfg(feature = "llmp_compression")] @@ -370,7 +369,7 @@ where Ok(_) => (), Err(e) => log::error!("Failed to send tcp message {:#?}", e), } - debug!("Asking he broker to be disconnected"); + log::debug!("Asking he broker to be disconnected"); Ok(()) } @@ -423,11 +422,11 @@ where .. } => { #[cfg(feature = "std")] - debug!("[{}] Received new Testcase {evt_name} from {client_id:?} ({client_config:?}, forward {forward_id:?})", std::process::id()); + log::debug!("[{}] Received new Testcase {evt_name} from {client_id:?} ({client_config:?}, forward {forward_id:?})", std::process::id()); if self.always_interesting { let item = fuzzer.add_input(state, executor, self, input)?; - debug!("Added received Testcase as item #{item}"); + log::debug!("Added received Testcase as item #{item}"); } else { let res = if client_config.match_with(&self.configuration) && observers_buf.is_some() @@ -455,9 +454,9 @@ where )? 
}; if let Some(item) = res.1 { - debug!("Added received Testcase {evt_name} as item #{item}"); + log::debug!("Added received Testcase {evt_name} as item #{item}"); } else { - debug!("Testcase {evt_name} was discarded"); + log::debug!("Testcase {evt_name} was discarded"); } } } @@ -620,7 +619,7 @@ where msg }; let event: Event = postcard::from_bytes(event_bytes)?; - debug!("Received event in normal llmp {}", event.name_detailed()); + log::debug!("Received event in normal llmp {}", event.name_detailed()); self.handle_in_client(fuzzer, executor, state, client_id, event)?; count += 1; } diff --git a/libafl/src/events/llmp/mod.rs b/libafl/src/events/llmp/mod.rs index ddb50f4fa1..6ed2f27dc0 100644 --- a/libafl/src/events/llmp/mod.rs +++ b/libafl/src/events/llmp/mod.rs @@ -13,7 +13,6 @@ use libafl_bolts::{ shmem::{NopShMemProvider, ShMemProvider}, ClientId, }; -use log::debug; use serde::Deserialize; use crate::{ @@ -303,7 +302,7 @@ where Event::NewTestcase { input, forward_id, .. } => { - debug!("Received new Testcase to convert from {client_id:?} (forward {forward_id:?}, forward {forward_id:?})"); + log::debug!("Received new Testcase to convert from {client_id:?} (forward {forward_id:?}, forward {forward_id:?})"); let Some(converter) = self.converter_back.as_mut() else { return Ok(()); @@ -377,7 +376,7 @@ where }; let event: Event = postcard::from_bytes(event_bytes)?; - debug!("Processor received message {}", event.name_detailed()); + log::debug!("Processor received message {}", event.name_detailed()); self.handle_in_client(fuzzer, executor, state, manager, client_id, event)?; count += 1; } diff --git a/libafl/src/events/llmp/restarting.rs b/libafl/src/events/llmp/restarting.rs index 4227f8fea9..c28ddd3c7d 100644 --- a/libafl/src/events/llmp/restarting.rs +++ b/libafl/src/events/llmp/restarting.rs @@ -31,8 +31,6 @@ use libafl_bolts::{ use libafl_bolts::{ llmp::LlmpConnection, os::CTRL_C_EXIT, shmem::StdShMemProvider, staterestore::StateRestorer, }; -#[cfg(all(unix, feature = "fork"))] -use log::debug; use serde::{Deserialize, Serialize}; #[cfg(feature = "std")] use typed_builder::TypedBuilder; @@ -564,7 +562,7 @@ where handle.status() } ForkResult::Child => { - debug!( + log::debug!( "{} has been forked into {}", std::os::unix::process::parent_id(), std::process::id() diff --git a/libafl/src/events/mod.rs b/libafl/src/events/mod.rs index 2f900eb716..1ffd96c22f 100644 --- a/libafl/src/events/mod.rs +++ b/libafl/src/events/mod.rs @@ -16,6 +16,9 @@ pub mod launcher; #[allow(clippy::ignored_unit_patterns)] pub mod llmp; pub use llmp::*; +#[cfg(feature = "tcp_manager")] +#[allow(clippy::ignored_unit_patterns)] +pub mod tcp; pub mod broker_hooks; use alloc::{ diff --git a/libafl/src/events/multi_machine.rs b/libafl/src/events/multi_machine.rs index 45869074db..6587274c27 100644 --- a/libafl/src/events/multi_machine.rs +++ b/libafl/src/events/multi_machine.rs @@ -14,9 +14,8 @@ use std::{ use enumflags2::{bitflags, BitFlags}; #[cfg(feature = "llmp_compression")] -use libafl_bolts::bolts_prelude::GzipCompressor; +use libafl_bolts::compress::GzipCompressor; use libafl_bolts::{current_time, ownedref::OwnedRef, Error}; -use log::{debug, error}; use serde::{Deserialize, Serialize}; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, @@ -30,7 +29,7 @@ use typed_builder::TypedBuilder; use crate::{ events::{Event, TcpMultiMachineLlmpReceiverHook, TcpMultiMachineLlmpSenderHook}, - inputs::Input, + inputs::{Input, NopInput}, }; const MAX_NB_RECEIVED_AT_ONCE: usize = 10; @@ -154,31 +153,85 @@ pub struct 
NodeDescriptor { pub flags: BitFlags, // The policy for shared messages between nodes. } +/// A set of multi-machine `broker_hooks`. +/// +/// Beware, the hooks should run in the same process as the one this function is called. +/// This is because we spawn a tokio runtime underneath. +/// Check `` for more details. +/// +/// Use `TcpMultiMachineHooks::builder()` to initialize the hooks. +/// +/// # Safety +/// The [`TcpMultiMachineLlmpReceiverHook`] assumes that the `msg` parameter +/// passed to the `on_new_message` method (or rather, the memory it points to), +/// lives sufficiently long for an async background task to process it. +#[derive(Debug)] +pub struct TcpMultiMachineHooks +where + I: Input, +{ + /// The sender hooks + pub sender: TcpMultiMachineLlmpSenderHook, + /// The hooks + pub receiver: TcpMultiMachineLlmpReceiverHook, +} + +impl TcpMultiMachineHooks<(), NopInput> { + /// Create the builder to build a new [`TcpMultiMachineHooks`] + /// containing a sender and a receiver from a [`NodeDescriptor`]. + #[must_use] + pub fn builder() -> TcpMultiMachineHooksBuilder<()> { + TcpMultiMachineHooksBuilder::<()> { + node_descriptor: None, + } + } +} + /// A Multi-machine `broker_hooks` builder. #[derive(Debug)] -pub struct TcpMultiMachineBuilder { - _private: (), +pub struct TcpMultiMachineHooksBuilder { + node_descriptor: Option>, +} + +impl TcpMultiMachineHooksBuilder { + /// Set the multi machine [`NodeDescriptor`] used by the resulting [`TcpMultiMachineHooks`]. + pub fn node_descriptor( + self, + node_descriptor: NodeDescriptor, + ) -> TcpMultiMachineHooksBuilder + where + A2: Clone + Display + ToSocketAddrs + Send + Sync + 'static, + { + TcpMultiMachineHooksBuilder:: { + node_descriptor: Some(node_descriptor), + } + } } -impl TcpMultiMachineBuilder { - /// Build a new couple [`TcpMultiMachineLlmpSenderHook`] / [`TcpMultiMachineLlmpReceiverHook`] from a [`NodeDescriptor`]. +impl TcpMultiMachineHooksBuilder +where + A: Clone + Display + ToSocketAddrs + Send + Sync + 'static, +{ + /// Build a new [`TcpMultiMachineHooks`] containing a sender and a receiver from a [`NodeDescriptor`]. /// Everything is initialized and ready to be used. /// Beware, the hooks should run in the same process as the one this function is called. /// This is because we spawn a tokio runtime underneath. /// Check `` for more details. - pub fn build( - node_descriptor: NodeDescriptor, - ) -> Result< - ( - TcpMultiMachineLlmpSenderHook, - TcpMultiMachineLlmpReceiverHook, - ), - Error, - > + /// + /// # Safety + /// The returned [`TcpMultiMachineLlmpReceiverHook`] assumes that the `msg` parameter + /// passed to the `on_new_message` method (or rather, the memory it points to), + /// lives sufficiently long for an async background task to process it. + pub unsafe fn build(mut self) -> Result, Error> where - A: Clone + Display + ToSocketAddrs + Send + Sync + 'static, I: Input + Send + Sync + 'static, { + let node_descriptor = self.node_descriptor.take().ok_or_else(|| { + Error::illegal_state( + "The node descriptor can never be `None` at this point in the code", + ) + })?; + // Create the state of the hook. 
This will be shared with the background server, so we wrap // it with concurrent-safe objects let state = Arc::new(RwLock::new(TcpMultiMachineState { @@ -197,10 +250,10 @@ impl TcpMultiMachineBuilder { TcpMultiMachineState::init::(&state.clone(), &rt.clone())?; } - Ok(( - TcpMultiMachineLlmpSenderHook::new(state.clone(), rt.clone()), - TcpMultiMachineLlmpReceiverHook::new(state, rt), - )) + Ok(TcpMultiMachineHooks { + sender: TcpMultiMachineLlmpSenderHook::new(state.clone(), rt.clone()), + receiver: TcpMultiMachineLlmpReceiverHook::new(state, rt), + }) } } @@ -230,10 +283,10 @@ where let timeout = current_time() + parent_lock.node_descriptor.timeout; parent_lock.parent = loop { - debug!("Trying to connect to parent @ {}..", parent_addr); + log::debug!("Trying to connect to parent @ {}..", parent_addr); match TcpStream::connect(parent_addr).await { Ok(stream) => { - debug!("Connected to parent @ {}", parent_addr); + log::debug!("Connected to parent @ {}", parent_addr); break Some(stream); } @@ -256,7 +309,7 @@ where let bg_state = self_mutex.clone(); let _handle: JoinHandle> = rt.spawn(async move { let addr = format!("0.0.0.0:{listening_port}"); - debug!("Starting background child task on {addr}..."); + log::debug!("Starting background child task on {addr}..."); let listener = TcpListener::bind(addr).await.map_err(|e| { Error::os_error(e, format!("Error while binding to port {listening_port}")) })?; @@ -264,31 +317,30 @@ where // The main listening loop. Should never fail. 'listening: loop { - debug!("listening for children on {:?}...", listener); + log::debug!("listening for children on {:?}...", listener); match listener.accept().await { Ok((mut stream, addr)) => { - debug!("{} joined the children.", addr); + log::debug!("{} joined the children.", addr); let mut state_guard = state.write().await; if let Err(e) = state_guard .send_old_events_to_stream::(&mut stream) .await { - error!("Error while send old messages: {:?}.", e); - error!("The loop will resume"); + log::error!("Error while send old messages: {e:?}."); + log::error!("The loop will resume"); continue 'listening; } state_guard.children.insert(NodeId::new(), stream); - debug!( - "[pid {}]{} added the child. nb children: {}", + log::debug!( + "[pid {}]{addr} added the child. nb children: {}", process::id(), - addr, state_guard.children.len() ); } Err(e) => { - error!("Error while accepting child {:?}.", e); + log::error!("Error while accepting child {e:?}."); } } } @@ -318,7 +370,7 @@ where ) -> Result>, Error> { // 0. Check if we should try to fetch something from the stream let mut dummy_byte: [u8; 1] = [0u8]; - debug!("Starting read msg..."); + log::debug!("Starting read msg..."); let n_read = match stream.try_read(&mut dummy_byte) { Ok(n) => n, @@ -328,14 +380,14 @@ where Err(e) => return Err(Error::os_error(e, "try read failed")), }; - debug!("msg read."); + log::debug!("msg read."); if n_read == 0 { - debug!("No dummy byte received..."); + log::debug!("No dummy byte received..."); return Ok(None); // Nothing to read from this stream } - debug!("Received dummy byte!"); + log::debug!("Received dummy byte!"); // we should always read the dummy byte at this point. assert_eq!(u8::from_le_bytes(dummy_byte), DUMMY_BYTE); @@ -368,15 +420,15 @@ where let msg_len = u32::to_le_bytes(serialized_msg.len() as u32); // 0. Write the dummy byte - debug!("Sending dummy byte"); + log::debug!("Sending dummy byte"); stream.write_all(&[DUMMY_BYTE]).await?; // 1. 
Write msg size - debug!("Sending msg len"); + log::debug!("Sending msg len"); stream.write_all(&msg_len).await?; // 2. Write msg - debug!("Sending msg"); + log::debug!("Sending msg"); stream.write_all(serialized_msg).await?; Ok(()) @@ -386,17 +438,17 @@ where &mut self, stream: &mut TcpStream, ) -> Result<(), Error> { - debug!("Send old events to new child..."); + log::debug!("Send old events to new child..."); for old_msg in &self.old_msgs { let event_ref: MultiMachineMsg = MultiMachineMsg::llmp_msg(OwnedRef::Ref(old_msg.as_slice())); - debug!("Sending an old message..."); + log::debug!("Sending an old message..."); Self::write_msg(stream, &event_ref).await?; - debug!("Old message sent."); + log::debug!("Old message sent."); } - debug!("Sent {} old messages.", self.old_msgs.len()); + log::debug!("Sent {} old messages.", self.old_msgs.len()); Ok(()) } @@ -405,7 +457,7 @@ where &mut self, msg: &MultiMachineMsg<'a, I>, ) -> Result<(), Error> { - debug!("Sending interesting events to nodes..."); + log::debug!("Sending interesting events to nodes..."); if self .node_descriptor @@ -413,10 +465,12 @@ where .intersects(NodePolicy::SendToParent) { if let Some(parent) = &mut self.parent { - debug!("Sending to parent..."); + log::debug!("Sending to parent..."); if let Err(e) = Self::write_msg(parent, msg).await { - error!("The parent disconnected. We won't try to communicate with it again."); - error!("Error: {:?}", e); + log::error!( + "The parent disconnected. We won't try to communicate with it again." + ); + log::error!("Error: {e:?}"); self.parent.take(); } } @@ -429,17 +483,19 @@ where { let mut ids_to_remove: Vec = Vec::new(); for (child_id, child_stream) in &mut self.children { - debug!("Sending to child..."); + log::debug!("Sending to child..."); if (Self::write_msg(child_stream, msg).await).is_err() { // most likely the child disconnected. drop the connection later on and continue. - debug!("The child disconnected. We won't try to communicate with it again."); + log::debug!( + "The child disconnected. We won't try to communicate with it again." + ); ids_to_remove.push(*child_id); } } // Garbage collect disconnected children for id_to_remove in &ids_to_remove { - debug!("Child {:?} has been garbage collected.", id_to_remove); + log::debug!("Child {:?} has been garbage collected.", id_to_remove); self.children.remove(id_to_remove); } } @@ -453,7 +509,7 @@ where &mut self, msgs: &mut Vec>, ) -> Result<(), Error> { - debug!("Checking for new events from other nodes..."); + log::debug!("Checking for new events from other nodes..."); let mut nb_received = 0usize; // Our (potential) parent could have something for us @@ -464,10 +520,10 @@ where return Ok(()); } - debug!("Receiving from parent..."); + log::debug!("Receiving from parent..."); match Self::read_msg(parent).await { Ok(Some(msg)) => { - debug!("Received event from parent"); + log::debug!("Received event from parent"); // The parent has something for us, we store it msgs.push(msg); nb_received += 1; @@ -475,13 +531,13 @@ where Ok(None) => { // nothing from the parent, we continue - debug!("Nothing from parent"); + log::debug!("Nothing from parent"); break; } Err(Error::OsError(_, _, _)) => { // most likely the parent disconnected. drop the connection - debug!( + log::debug!( "The parent disconnected. We won't try to communicate with it again." 
); self.parent.take(); @@ -489,7 +545,7 @@ where } Err(e) => { - debug!("An error occurred and was not expected."); + log::debug!("An error occurred and was not expected."); return Err(e); } } @@ -498,7 +554,7 @@ where // What about the (potential) children? let mut ids_to_remove: Vec = Vec::new(); - debug!( + log::debug!( "[pid {}] Nb children: {}", process::id(), self.children.len() @@ -510,34 +566,34 @@ where return Ok(()); } - debug!("Receiving from child {:?}...", child_id); + log::debug!("Receiving from child {:?}...", child_id); match Self::read_msg(child_stream).await { Ok(Some(msg)) => { // The parent has something for us, we store it - debug!("Received event from child!"); + log::debug!("Received event from child!"); msgs.push(msg); nb_received += 1; } Ok(None) => { // nothing from the parent, we continue - debug!("Nothing from child"); + log::debug!("Nothing from child"); break; } Err(Error::OsError(e, _, _)) => { // most likely the parent disconnected. drop the connection - error!( + log::error!( "The parent disconnected. We won't try to communicate with it again." ); - error!("Error: {:?}", e); + log::error!("Error: {e:?}"); ids_to_remove.push(*child_id); break; } Err(e) => { // Other error - debug!("An error occurred and was not expected."); + log::debug!("An error occurred and was not expected."); return Err(e); } } @@ -546,7 +602,7 @@ where // Garbage collect disconnected children for id_to_remove in &ids_to_remove { - debug!("Child {:?} has been garbage collected.", id_to_remove); + log::debug!("Child {:?} has been garbage collected.", id_to_remove); self.children.remove(id_to_remove); } diff --git a/libafl/src/events/tcp.rs b/libafl/src/events/tcp.rs new file mode 100644 index 0000000000..91e45a0dda --- /dev/null +++ b/libafl/src/events/tcp.rs @@ -0,0 +1,1364 @@ +//! 
TCP-backed event manager for scalable multi-processed fuzzing + +use alloc::{boxed::Box, vec::Vec}; +#[cfg(all(unix, feature = "std", not(miri)))] +use core::ptr::addr_of_mut; +use core::{ + marker::PhantomData, + num::NonZeroUsize, + sync::atomic::{compiler_fence, Ordering}, + time::Duration, +}; +use std::{ + env, + io::{ErrorKind, Read, Write}, + net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs}, + sync::Arc, +}; + +#[cfg(feature = "tcp_compression")] +use libafl_bolts::compress::GzipCompressor; +#[cfg(feature = "std")] +use libafl_bolts::core_affinity::CoreId; +#[cfg(all(feature = "std", any(windows, not(feature = "fork"))))] +use libafl_bolts::os::startable_self; +#[cfg(all(unix, feature = "std", not(miri)))] +use libafl_bolts::os::unix_signals::setup_signal_handler; +#[cfg(feature = "std")] +use libafl_bolts::os::CTRL_C_EXIT; +#[cfg(all(feature = "std", feature = "fork", unix))] +use libafl_bolts::os::{fork, ForkResult}; +use libafl_bolts::{shmem::ShMemProvider, tuples::tuple_list, ClientId}; +#[cfg(feature = "std")] +use libafl_bolts::{shmem::StdShMemProvider, staterestore::StateRestorer}; +use serde::{de::DeserializeOwned, Deserialize}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + sync::{broadcast, broadcast::error::RecvError, mpsc}, + task::{spawn, JoinHandle}, +}; +#[cfg(feature = "std")] +use typed_builder::TypedBuilder; + +use super::{CustomBufEventResult, CustomBufHandlerFn}; +#[cfg(all(unix, feature = "std", not(miri)))] +use crate::events::EVENTMGR_SIGHANDLER_STATE; +use crate::{ + events::{ + BrokerEventResult, Event, EventConfig, EventFirer, EventManager, EventManagerHooksTuple, + EventManagerId, EventProcessor, EventRestarter, HasCustomBufHandlers, HasEventManagerId, + ProgressReporter, + }, + executors::{Executor, HasObservers}, + fuzzer::{EvaluatorObservers, ExecutionProcessor}, + inputs::{Input, UsesInput}, + monitors::Monitor, + state::{HasExecutions, HasLastReportTime, State, UsesState}, + Error, HasMetadata, +}; + +/// Tries to create (synchronously) a [`TcpListener`] that is `nonblocking` (for later use in tokio). +/// Will error if the port is already in use (or other errors occur) +fn create_nonblocking_listener(addr: A) -> Result { + let listener = TcpListener::bind(addr)?; + listener.set_nonblocking(true)?; + Ok(listener) +} + +/// An TCP-backed event manager for simple multi-processed fuzzing +#[derive(Debug)] +pub struct TcpEventBroker +where + I: Input, + MT: Monitor, + //CE: CustomEvent, +{ + monitor: MT, + /// A `nonblocking` [`TcpListener`] that we will `take` and convert to a Tokio listener in [`Self::broker_loop()`]. + listener: Option, + /// Amount of all clients ever, after which (when all are disconnected) this broker should quit. + exit_cleanly_after: Option, + phantom: PhantomData, +} + +const UNDEFINED_CLIENT_ID: ClientId = ClientId(0xffffffff); + +impl TcpEventBroker +where + I: Input, + MT: Monitor, +{ + /// Create a TCP broker, listening on the given address. + pub fn new(addr: A, monitor: MT) -> Result { + Ok(Self::with_listener( + create_nonblocking_listener(addr)?, + monitor, + )) + } + + /// Create a TCP broker, with a listener that needs to already be bound to an address. 
+ pub fn with_listener(listener: TcpListener, monitor: MT) -> Self { + Self { + listener: Some(listener), + monitor, + phantom: PhantomData, + exit_cleanly_after: None, + } + } + + /// Exit the broker process cleanly after at least `n` clients attached and all of them disconnected again + pub fn set_exit_cleanly_after(&mut self, n_clients: NonZeroUsize) { + self.exit_cleanly_after = Some(n_clients); + } + + /// Run in the broker until all clients exit + #[tokio::main(flavor = "current_thread")] + #[allow(clippy::too_many_lines)] + pub async fn broker_loop(&mut self) -> Result<(), Error> { + let (tx_bc, rx) = broadcast::channel(65536); + let (tx, mut rx_mpsc) = mpsc::channel(65536); + + let exit_cleanly_after = self.exit_cleanly_after; + + let listener = self + .listener + .take() + .ok_or_else(|| Error::illegal_state("Listener has already been used / was none"))?; + let listener = tokio::net::TcpListener::from_std(listener)?; + + let tokio_broker = spawn(async move { + let mut recv_handles: Vec> = vec![]; + let mut receivers: Vec>>> = vec![]; + + loop { + let mut reached_max = false; + if let Some(max_clients) = exit_cleanly_after { + if max_clients.get() <= recv_handles.len() { + // we waited for all the clients we wanted to see attached. Now wait for them to close their tcp connections. + reached_max = true; + } + } + + // Asynchronously wait for an inbound socket. + let (socket, _) = listener.accept().await.expect("Accept failed"); + let (mut read, mut write) = tokio::io::split(socket); + + // Protocol: the new client communicate its old ClientId or -1 if new + let mut this_client_id = [0; 4]; + read.read_exact(&mut this_client_id) + .await + .expect("Socket closed?"); + let this_client_id = ClientId(u32::from_le_bytes(this_client_id)); + + let (this_client_id, is_old) = if this_client_id == UNDEFINED_CLIENT_ID { + if reached_max { + (UNDEFINED_CLIENT_ID, false) // Dumb id + } else { + // ClientIds for this broker start at 0. + (ClientId(recv_handles.len().try_into().unwrap()), false) + } + } else { + (this_client_id, true) + }; + + let this_client_id_bytes = this_client_id.0.to_le_bytes(); + + // Protocol: Send the client id for this node; + write.write_all(&this_client_id_bytes).await.unwrap(); + + if !is_old && reached_max { + continue; + } + + let tx_inner = tx.clone(); + + let handle = async move { + // In a loop, read data from the socket and write the data back. + loop { + let mut len_buf = [0; 4]; + + if read.read_exact(&mut len_buf).await.is_err() { + // The socket is closed, the client is restarting + log::info!("Socket closed, client restarting"); + return; + } + + let mut len = u32::from_le_bytes(len_buf); + // we forward the sender id as well, so we add 4 bytes to the message length + len += 4; + + log::debug!("TCP Manager - len +4 = {len:?}"); + + let mut buf = vec![0; len as usize]; + + if read + .read_exact(&mut buf) + .await + // .expect("Failed to read data from socket"); // TODO verify if we have to handle this error + .is_err() + { + // The socket is closed, the client is restarting + log::info!("Socket closed, client restarting"); + return; + } + + log::debug!("TCP Manager - len: {len:?} - {buf:?}"); + tx_inner.send(buf).await.expect("Could not send"); + } + }; + + let client_idx = this_client_id.0 as usize; + + // Keep all handles around. 
+ if is_old { + recv_handles[client_idx].abort(); + recv_handles[client_idx] = spawn(handle); + } else { + recv_handles.push(spawn(handle)); + // Get old messages only if new + let rx_inner = Arc::new(tokio::sync::Mutex::new(rx.resubscribe())); + receivers.push(rx_inner.clone()); + } + + let rx_inner = receivers[client_idx].clone(); + + // The forwarding end. No need to keep a handle to this (TODO: unless they don't quit/get stuck?) + spawn(async move { + // In a loop, read data from the socket and write the data back. + loop { + let buf: Vec = match rx_inner.lock().await.recv().await { + Ok(buf) => buf, + Err(RecvError::Lagged(num)) => { + log::error!("Receiver lagged, skipping {num} messages"); + continue; + } + _ => panic!("Could not receive"), + }; + + log::debug!("TCP Manager - {buf:?}"); + + if buf.len() <= 4 { + log::warn!("We got no contents (or only the length) in a broadcast"); + continue; + } + + if buf[..4] == this_client_id_bytes { + log::debug!("TCP Manager - Not forwarding message from this very client ({this_client_id:?})." + ); + continue; + } + + // subtract 4 since the client_id isn't part of the actual message. + let len = u32::try_from(buf.len() - 4).unwrap(); + let len_buf: [u8; 4] = len.to_le_bytes(); + + // Write message length + if write.write_all(&len_buf).await.is_err() { + // The socket is closed, the client is restarting + log::info!("Socket closed, client restarting"); + return; + } + // Write the rest + if write.write_all(&buf).await.is_err() { + // The socket is closed, the client is restarting + log::info!("Socket closed, client restarting"); + return; + } + } + }); + } + + /*log::info!("Joining handles.."); + // wait for all clients to exit/error out + for recv_handle in recv_handles { + drop(recv_handle.await); + }*/ + }); + + loop { + let buf = rx_mpsc.recv().await.expect("Could not receive"); + + // read client ID. + let mut client_id_buf = [0_u8; 4]; + client_id_buf.copy_from_slice(&buf[..4]); + + let client_id = ClientId(u32::from_le_bytes(client_id_buf)); + + // cut off the ID. + let event_bytes = &buf[4..]; + + #[cfg(feature = "tcp_compression")] + let event_bytes = GzipCompressor::new().decompress(event_bytes)?; + + #[allow(clippy::needless_borrow)] // make decompressed vec and slice compatible + let event: Event = postcard::from_bytes(&event_bytes)?; + match Self::handle_in_broker(&mut self.monitor, client_id, &event)? { + BrokerEventResult::Forward => { + tx_bc.send(buf).expect("Could not send"); + } + BrokerEventResult::Handled => (), + } + + if tokio_broker.is_finished() { + tokio_broker.await.unwrap(); + break; + } + } + log::info!("TCP Manager - The last client quit. Exiting."); + + Err(Error::shutting_down()) + } + + /// Handle arriving events in the broker + #[allow(clippy::unnecessary_wraps)] + fn handle_in_broker( + monitor: &mut MT, + client_id: ClientId, + event: &Event, + ) -> Result { + match &event { + Event::NewTestcase { + corpus_size, + time, + executions, + forward_id, + .. + } => { + let id = if let Some(id) = *forward_id { + id + } else { + client_id + }; + monitor.client_stats_insert(id); + let client = monitor.client_stats_mut_for(id); + client.update_corpus_size(*corpus_size as u64); + client.update_executions(*executions, *time); + monitor.display(event.name(), id); + Ok(BrokerEventResult::Forward) + } + Event::UpdateExecStats { + time, + executions, + phantom: _, + } => { + // TODO: The monitor buffer should be added on client add. 
+ monitor.client_stats_insert(client_id); + let client = monitor.client_stats_mut_for(client_id); + client.update_executions(*executions, *time); + monitor.display(event.name(), client_id); + Ok(BrokerEventResult::Handled) + } + Event::UpdateUserStats { + name, + value, + phantom: _, + } => { + monitor.client_stats_insert(client_id); + let client = monitor.client_stats_mut_for(client_id); + client.update_user_stats(name.clone(), value.clone()); + monitor.aggregate(name); + monitor.display(event.name(), client_id); + Ok(BrokerEventResult::Handled) + } + #[cfg(feature = "introspection")] + Event::UpdatePerfMonitor { + time, + executions, + introspection_monitor, + phantom: _, + } => { + // TODO: The monitor buffer should be added on client add. + + // Get the client for the staterestorer ID + monitor.client_stats_insert(client_id); + let client = monitor.client_stats_mut_for(client_id); + + // Update the normal monitor for this client + client.update_executions(*executions, *time); + + // Update the performance monitor for this client + client.update_introspection_monitor((**introspection_monitor).clone()); + + // Display the monitor via `.display` only on core #1 + monitor.display(event.name(), client_id); + + // Correctly handled the event + Ok(BrokerEventResult::Handled) + } + Event::Objective { + objective_size, + executions, + time, + } => { + monitor.client_stats_insert(client_id); + let client = monitor.client_stats_mut_for(client_id); + client.update_objective_size(*objective_size as u64); + client.update_executions(*executions, *time); + monitor.display(event.name(), client_id); + Ok(BrokerEventResult::Handled) + } + Event::Log { + severity_level, + message, + phantom: _, + } => { + let (_, _) = (severity_level, message); + // TODO rely on Monitor + log::log!((*severity_level).into(), "{message}"); + Ok(BrokerEventResult::Handled) + } + Event::CustomBuf { .. } => Ok(BrokerEventResult::Forward), + //_ => Ok(BrokerEventResult::Forward), + } + } +} + +/// An [`EventManager`] that forwards all events to other attached via tcp. +pub struct TcpEventManager +where + EMH: EventManagerHooksTuple, + S: State, +{ + /// We send message every `throttle` second + throttle: Option, + /// When we sent the last message + last_sent: Duration, + hooks: EMH, + /// The TCP stream for inter process communication + tcp: TcpStream, + /// Our `CientId` + client_id: ClientId, + /// The custom buf handler + custom_buf_handlers: Vec>>, + #[cfg(feature = "tcp_compression")] + compressor: GzipCompressor, + /// The configuration defines this specific fuzzer. + /// A node will not re-use the observer values sent over TCP + /// from nodes with other configurations. 
+ configuration: EventConfig, + phantom: PhantomData, +} + +impl TcpEventManager<(), S> +where + S: State, +{ + /// Create a builder for [`TcpEventManager`] + #[must_use] + pub fn builder() -> TcpEventManagerBuilder<(), S> { + TcpEventManagerBuilder::new() + } +} + +/// Builder for `TcpEventManager` +#[derive(Debug, Copy, Clone)] +pub struct TcpEventManagerBuilder { + throttle: Option, + hooks: EMH, + phantom: PhantomData, +} + +impl Default for TcpEventManagerBuilder<(), S> { + fn default() -> Self { + Self::new() + } +} + +impl TcpEventManagerBuilder<(), S> { + /// Set the constructor + #[must_use] + pub fn new() -> Self { + Self { + throttle: None, + hooks: (), + phantom: PhantomData, + } + } + + /// Set the hooks + #[must_use] + pub fn hooks(self, hooks: EMH) -> TcpEventManagerBuilder { + TcpEventManagerBuilder { + throttle: self.throttle, + hooks, + phantom: PhantomData, + } + } +} + +impl TcpEventManagerBuilder +where + EMH: EventManagerHooksTuple, + S: State + HasExecutions + HasMetadata, +{ + /// Set the throttle + #[must_use] + pub fn throttle(mut self, throttle: Duration) -> Self { + self.throttle = Some(throttle); + self + } + + /// Create a manager from a raw TCP client with hooks + pub fn build_from_client( + self, + addr: &A, + client_id: ClientId, + configuration: EventConfig, + ) -> Result, Error> { + let mut tcp = TcpStream::connect(addr)?; + + let mut our_client_id_buf = client_id.0.to_le_bytes(); + tcp.write_all(&our_client_id_buf) + .expect("Cannot write to the broker"); + + tcp.read_exact(&mut our_client_id_buf) + .expect("Cannot read from the broker"); + let client_id = ClientId(u32::from_le_bytes(our_client_id_buf)); + + log::info!("Our client id: {client_id:?}"); + + Ok(TcpEventManager { + throttle: self.throttle, + last_sent: Duration::from_secs(0), + hooks: self.hooks, + tcp, + client_id, + #[cfg(feature = "tcp_compression")] + compressor: GzipCompressor::new(), + configuration, + phantom: PhantomData, + custom_buf_handlers: vec![], + }) + } + + /// Create an TCP event manager on a port specifying the client id with hooks + /// + /// If the port is not yet bound, it will act as a broker; otherwise, it + /// will act as a client. + pub fn build_on_port( + self, + port: u16, + client_id: ClientId, + configuration: EventConfig, + ) -> Result, Error> { + Self::build_from_client(self, &("127.0.0.1", port), client_id, configuration) + } + + /// Create an TCP event manager on a port specifying the client id from env with hooks + /// + /// If the port is not yet bound, it will act as a broker; otherwise, it + /// will act as a client. 
+ pub fn build_existing_from_env( + self, + addr: &A, + env_name: &str, + configuration: EventConfig, + ) -> Result, Error> { + let this_id = ClientId(str::parse::(&env::var(env_name)?)?); + Self::build_from_client(self, addr, this_id, configuration) + } +} + +impl core::fmt::Debug for TcpEventManager +where + EMH: EventManagerHooksTuple, + S: State, +{ + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let mut debug_struct = f.debug_struct("TcpEventManager"); + let debug = debug_struct.field("tcp", &self.tcp); + //.field("custom_buf_handlers", &self.custom_buf_handlers) + #[cfg(feature = "tcp_compression")] + let debug = debug.field("compressor", &self.compressor); + debug + .field("configuration", &self.configuration) + .field("phantom", &self.phantom) + .finish_non_exhaustive() + } +} + +impl Drop for TcpEventManager +where + EMH: EventManagerHooksTuple, + S: State, +{ + /// TCP clients will have to wait until their pages are mapped by somebody. + fn drop(&mut self) { + self.await_restart_safe(); + } +} + +impl TcpEventManager +where + EMH: EventManagerHooksTuple, + S: State + HasExecutions + HasMetadata, +{ + /// Write the client id for a client [`EventManager`] to env vars + pub fn to_env(&self, env_name: &str) { + env::set_var(env_name, format!("{}", self.client_id.0)); + } + + // Handle arriving events in the client + #[allow(clippy::unused_self)] + fn handle_in_client( + &mut self, + fuzzer: &mut Z, + executor: &mut E, + state: &mut S, + client_id: ClientId, + event: Event, + ) -> Result<(), Error> + where + E: Executor + HasObservers, + for<'a> E::Observers: Deserialize<'a>, + Z: ExecutionProcessor + EvaluatorObservers, + { + if !self.hooks.pre_exec_all(state, client_id, &event)? { + return Ok(()); + } + match event { + Event::NewTestcase { + input, + client_config, + exit_kind, + observers_buf, + forward_id, + .. + } => { + log::info!("Received new Testcase from {client_id:?} ({client_config:?}, forward {forward_id:?})"); + + let _res = if client_config.match_with(&self.configuration) + && observers_buf.is_some() + { + let observers: E::Observers = + postcard::from_bytes(observers_buf.as_ref().unwrap())?; + #[cfg(feature = "scalability_introspection")] + { + state.scalability_monitor_mut().testcase_with_observers += 1; + } + fuzzer.execute_and_process(state, self, input, &observers, &exit_kind, false)? + } else { + #[cfg(feature = "scalability_introspection")] + { + state.scalability_monitor_mut().testcase_without_observers += 1; + } + fuzzer.evaluate_input_with_observers::( + state, executor, self, input, false, + )? + }; + if let Some(item) = _res.1 { + log::info!("Added received Testcase as item #{item}"); + } + } + Event::CustomBuf { tag, buf } => { + for handler in &mut self.custom_buf_handlers { + if handler(state, &tag, &buf)? == CustomBufEventResult::Handled { + break; + } + } + } + _ => { + return Err(Error::unknown(format!( + "Received illegal message that message should not have arrived: {:?}.", + event.name() + ))) + } + } + self.hooks.post_exec_all(state, client_id)?; + Ok(()) + } +} + +impl TcpEventManager +where + EMH: EventManagerHooksTuple, + S: State, +{ + /// Send information that this client is exiting. + /// The other side may free up all allocated memory. + /// We are no longer allowed to send anything afterwards. 
+ pub fn send_exiting(&mut self) -> Result<(), Error> { + //TODO: Should not be needed since TCP does that for us + //self.tcp.sender.send_exiting() + Ok(()) + } +} + +impl UsesState for TcpEventManager +where + EMH: EventManagerHooksTuple, + S: State, +{ + type State = S; +} + +impl EventFirer for TcpEventManager +where + EMH: EventManagerHooksTuple, + S: State, +{ + fn should_send(&self) -> bool { + if let Some(throttle) = self.throttle { + libafl_bolts::current_time() - self.last_sent > throttle + } else { + true + } + } + + fn fire( + &mut self, + _state: &mut Self::State, + event: Event<::Input>, + ) -> Result<(), Error> { + let serialized = postcard::to_allocvec(&event)?; + + #[cfg(feature = "tcp_compression")] + let serialized = self.compressor.compress(&serialized); + + let size = u32::try_from(serialized.len())?; + self.tcp.write_all(&size.to_le_bytes())?; + self.tcp.write_all(&self.client_id.0.to_le_bytes())?; + self.tcp.write_all(&serialized)?; + + self.last_sent = libafl_bolts::current_time(); + Ok(()) + } + + fn configuration(&self) -> EventConfig { + self.configuration + } +} + +impl EventRestarter for TcpEventManager +where + EMH: EventManagerHooksTuple, + S: State, +{ + /// The TCP client needs to wait until a broker has mapped all pages before shutting down. + /// Otherwise, the OS may already have removed the shared maps. + fn await_restart_safe(&mut self) { + // wait until we can drop the message safely. + //self.tcp.await_safe_to_unmap_blocking(); + } +} + +impl EventProcessor for TcpEventManager +where + E: HasObservers + Executor, + for<'a> E::Observers: Deserialize<'a>, + EMH: EventManagerHooksTuple, + S: State + HasExecutions + HasMetadata, + Z: EvaluatorObservers + ExecutionProcessor, +{ + fn process( + &mut self, + fuzzer: &mut Z, + state: &mut Self::State, + executor: &mut E, + ) -> Result { + // TODO: Get around local event copy by moving handle_in_client + let self_id = self.client_id; + let mut len_buf = [0_u8; 4]; + let mut count = 0; + + self.tcp.set_nonblocking(true).expect("set to non-blocking"); + + // read all pending messages + loop { + match self.tcp.read_exact(&mut len_buf) { + Ok(()) => { + self.tcp.set_nonblocking(false).expect("set to blocking"); + let len = u32::from_le_bytes(len_buf); + let mut buf = vec![0_u8; 4_usize + len as usize]; + self.tcp.read_exact(&mut buf)?; + + let mut client_id_buf = [0_u8; 4]; + client_id_buf.copy_from_slice(&buf[..4]); + + let other_client_id = ClientId(u32::from_le_bytes(client_id_buf)); + + self.tcp.set_nonblocking(true).expect("set to non-blocking"); + if self_id == other_client_id { + panic!("Own ID should never have been sent by the broker"); + } else { + log::info!("{self_id:?} (from {other_client_id:?}) Received: {buf:?}"); + + let buf = &buf[4..]; + #[cfg(feature = "tcp_compression")] + let buf = self.compressor.decompress(buf)?; + + // make decompressed vec and slice compatible + #[allow(clippy::needless_borrow)] + let event = postcard::from_bytes(&buf)?; + + self.handle_in_client(fuzzer, executor, state, other_client_id, event)?; + count += 1; + } + } + + Err(e) if e.kind() == ErrorKind::WouldBlock => { + // no new data on the socket + break; + } + Err(e) => { + panic!("Unexpected error {e:?}"); + } + } + } + self.tcp.set_nonblocking(false).expect("set to blocking"); + + Ok(count) + } +} + +impl EventManager for TcpEventManager +where + E: HasObservers + Executor, + for<'a> E::Observers: Deserialize<'a>, + EMH: EventManagerHooksTuple, + S: State + HasExecutions + HasMetadata + HasLastReportTime, + Z: 
EvaluatorObservers + ExecutionProcessor, +{ +} + +impl HasCustomBufHandlers for TcpEventManager +where + EMH: EventManagerHooksTuple, + S: State, +{ + fn add_custom_buf_handler( + &mut self, + handler: Box Result>, + ) { + self.custom_buf_handlers.push(handler); + } +} + +impl ProgressReporter for TcpEventManager +where + EMH: EventManagerHooksTuple, + S: State + HasExecutions + HasMetadata + HasLastReportTime, +{ +} + +impl HasEventManagerId for TcpEventManager +where + EMH: EventManagerHooksTuple, + S: State, +{ + /// Gets the id assigned to this staterestorer. + fn mgr_id(&self) -> EventManagerId { + EventManagerId(self.client_id.0 as usize) + } +} + +/// A manager that can restart on the fly, storing states in-between (in `on_restart`) +#[cfg(feature = "std")] +#[derive(Debug)] +pub struct TcpRestartingEventManager +where + EMH: EventManagerHooksTuple, + S: State, + SP: ShMemProvider + 'static, + //CE: CustomEvent, +{ + /// The embedded TCP event manager + tcp_mgr: TcpEventManager, + /// The staterestorer to serialize the state for the next runner + staterestorer: StateRestorer, + /// Decide if the state restorer must save the serialized state + save_state: bool, +} + +#[cfg(feature = "std")] +impl UsesState for TcpRestartingEventManager +where + EMH: EventManagerHooksTuple, + S: State, + SP: ShMemProvider + 'static, +{ + type State = S; +} + +#[cfg(feature = "std")] +impl ProgressReporter for TcpRestartingEventManager +where + EMH: EventManagerHooksTuple, + S: State + HasExecutions + HasMetadata + HasLastReportTime, + SP: ShMemProvider, +{ +} + +#[cfg(feature = "std")] +impl EventFirer for TcpRestartingEventManager +where + EMH: EventManagerHooksTuple, + SP: ShMemProvider, + S: State, + //CE: CustomEvent, +{ + fn should_send(&self) -> bool { + self.tcp_mgr.should_send() + } + + fn fire( + &mut self, + state: &mut Self::State, + event: Event<::Input>, + ) -> Result<(), Error> { + // Check if we are going to crash in the event, in which case we store our current state for the next runner + self.tcp_mgr.fire(state, event) + } + + fn configuration(&self) -> EventConfig { + self.tcp_mgr.configuration() + } +} + +#[cfg(feature = "std")] +impl EventRestarter for TcpRestartingEventManager +where + EMH: EventManagerHooksTuple, + S: State + HasExecutions, + SP: ShMemProvider, + //CE: CustomEvent, +{ + /// The tcp client needs to wait until a broker mapped all pages, before shutting down. + /// Otherwise, the OS may already have removed the shared maps, + #[inline] + fn await_restart_safe(&mut self) { + self.tcp_mgr.await_restart_safe(); + } + + /// Reset the single page (we reuse it over and over from pos 0), then send the current state to the next runner. + fn on_restart(&mut self, state: &mut S) -> Result<(), Error> { + state.on_restart()?; + + // First, reset the page to 0 so the next iteration can read read from the beginning of this page + self.staterestorer.reset(); + self.staterestorer.save(&if self.save_state { + Some((state, self.tcp_mgr.client_id)) + } else { + None + })?; + + self.await_restart_safe(); + Ok(()) + } + + fn send_exiting(&mut self) -> Result<(), Error> { + self.staterestorer.send_exiting(); + // Also inform the broker that we are about to exit. + // This way, the broker can clean up the pages, and eventually exit. 
+ self.tcp_mgr.send_exiting() + } +} + +#[cfg(feature = "std")] +impl EventProcessor for TcpRestartingEventManager +where + E: HasObservers + Executor, Z>, + for<'a> E::Observers: Deserialize<'a>, + EMH: EventManagerHooksTuple, + S: State + HasExecutions + HasMetadata, + SP: ShMemProvider + 'static, + Z: EvaluatorObservers + ExecutionProcessor, //CE: CustomEvent, +{ + fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result { + self.tcp_mgr.process(fuzzer, state, executor) + } +} + +#[cfg(feature = "std")] +impl EventManager for TcpRestartingEventManager +where + E: HasObservers + Executor, Z>, + for<'a> E::Observers: Deserialize<'a>, + EMH: EventManagerHooksTuple, + S: State + HasExecutions + HasMetadata + HasLastReportTime, + SP: ShMemProvider + 'static, + Z: EvaluatorObservers + ExecutionProcessor, //CE: CustomEvent, +{ +} + +#[cfg(feature = "std")] +impl HasEventManagerId for TcpRestartingEventManager +where + EMH: EventManagerHooksTuple, + S: State, + SP: ShMemProvider + 'static, +{ + fn mgr_id(&self) -> EventManagerId { + self.tcp_mgr.mgr_id() + } +} + +/// The tcp connection from the actual fuzzer to the process supervising it +const _ENV_FUZZER_SENDER: &str = "_AFL_ENV_FUZZER_SENDER"; +const _ENV_FUZZER_RECEIVER: &str = "_AFL_ENV_FUZZER_RECEIVER"; +/// The tcp (2 way) connection from a fuzzer to the broker (broadcasting all other fuzzer messages) +const _ENV_FUZZER_BROKER_CLIENT_INITIAL: &str = "_AFL_ENV_FUZZER_BROKER_CLIENT"; + +#[cfg(feature = "std")] +impl TcpRestartingEventManager +where + EMH: EventManagerHooksTuple, + S: State, + SP: ShMemProvider + 'static, + //CE: CustomEvent, +{ + /// Create a new runner, the executed child doing the actual fuzzing. + pub fn new(tcp_mgr: TcpEventManager, staterestorer: StateRestorer) -> Self { + Self { + tcp_mgr, + staterestorer, + save_state: true, + } + } + + /// Create a new runner specifying if it must save the serialized state on restart. + pub fn with_save_state( + tcp_mgr: TcpEventManager, + staterestorer: StateRestorer, + save_state: bool, + ) -> Self { + Self { + tcp_mgr, + staterestorer, + save_state, + } + } + + /// Get the staterestorer + pub fn staterestorer(&self) -> &StateRestorer { + &self.staterestorer + } + + /// Get the staterestorer (mutable) + pub fn staterestorer_mut(&mut self) -> &mut StateRestorer { + &mut self.staterestorer + } +} + +/// The kind of manager we're creating right now +#[cfg(feature = "std")] +#[derive(Debug, Clone, Copy)] +pub enum TcpManagerKind { + /// Any kind will do + Any, + /// A client, getting messages from a local broker. + Client { + /// The CPU core ID of this client + cpu_core: Option, + }, + /// A broker, forwarding all packets of local clients via TCP. + Broker, +} + +/// Sets up a restarting fuzzer, using the [`StdShMemProvider`], and standard features. +/// The restarting mgr is a combination of restarter and runner, that can be used on systems with and without `fork` support. +/// The restarter will spawn a new process each time the child crashes or timeouts. +#[cfg(feature = "std")] +#[allow(clippy::type_complexity)] +pub fn setup_restarting_mgr_tcp( + monitor: MT, + broker_port: u16, + configuration: EventConfig, +) -> Result< + ( + Option, + TcpRestartingEventManager<(), S, StdShMemProvider>, + ), + Error, +> +where + MT: Monitor + Clone, + S: State + HasExecutions + HasMetadata, +{ + TcpRestartingMgr::builder() + .shmem_provider(StdShMemProvider::new()?) 
+ .monitor(Some(monitor)) + .broker_port(broker_port) + .configuration(configuration) + .hooks(tuple_list!()) + .build() + .launch() +} + +/// Provides a `builder` which can be used to build a [`TcpRestartingMgr`], which is a combination of a +/// `restarter` and `runner`, that can be used on systems both with and without `fork` support. The +/// `restarter` will start a new process each time the child crashes or times out. +#[cfg(feature = "std")] +#[allow(clippy::default_trait_access, clippy::ignored_unit_patterns)] +#[derive(TypedBuilder, Debug)] +pub struct TcpRestartingMgr +where + S: UsesInput + DeserializeOwned, + SP: ShMemProvider + 'static, + MT: Monitor, + //CE: CustomEvent, +{ + /// The shared memory provider to use for the broker or client spawned by the restarting + /// manager. + shmem_provider: SP, + /// The configuration + configuration: EventConfig, + /// The monitor to use + #[builder(default = None)] + monitor: Option, + /// The broker port to use + #[builder(default = 1337_u16)] + broker_port: u16, + /// The address to connect to + #[builder(default = None)] + remote_broker_addr: Option, + /// The type of manager to build + #[builder(default = TcpManagerKind::Any)] + kind: TcpManagerKind, + /// The amount of external clients that should have connected (not counting our own tcp client) + /// before this broker quits _after the last client exited_. + /// If `None`, the broker will never quit when the last client exits, but run forever. + /// + /// So, if this value is `Some(2)`, the broker will not exit after client 1 connected and disconnected, + /// but it will quit after client 2 connected and disconnected. + #[builder(default = None)] + exit_cleanly_after: Option, + /// Tell the manager to serialize or not the state on restart + #[builder(default = true)] + serialize_state: bool, + /// The hooks for `handle_in_client` + hooks: EMH, + #[builder(setter(skip), default = PhantomData)] + phantom_data: PhantomData, +} + +#[cfg(feature = "std")] +#[allow(clippy::type_complexity, clippy::too_many_lines)] +impl TcpRestartingMgr +where + EMH: EventManagerHooksTuple + Copy + Clone, + SP: ShMemProvider, + S: State + HasExecutions + HasMetadata, + MT: Monitor + Clone, +{ + /// Launch the restarting manager + pub fn launch(&mut self) -> Result<(Option, TcpRestartingEventManager), Error> { + // We start ourself as child process to actually fuzz + let (staterestorer, _new_shmem_provider, core_id) = if env::var(_ENV_FUZZER_SENDER).is_err() + { + let broker_things = |mut broker: TcpEventBroker, _remote_broker_addr| { + if let Some(exit_cleanly_after) = self.exit_cleanly_after { + broker.set_exit_cleanly_after(exit_cleanly_after); + } + + broker.broker_loop() + }; + + // We get here if we are on Unix, or we are a broker on Windows (or without forks). + let (mgr, core_id) = match self.kind { + TcpManagerKind::Any => { + let connection = create_nonblocking_listener(("127.0.0.1", self.broker_port)); + match connection { + Ok(listener) => { + let event_broker = TcpEventBroker::::with_listener( + listener, + self.monitor.take().unwrap(), + ); + + // Yep, broker. Just loop here. + log::info!( + "Doing broker things. Run this tool again to start fuzzing in a client." 
+ ); + + broker_things(event_broker, self.remote_broker_addr)?; + + return Err(Error::shutting_down()); + } + Err(Error::OsError(..)) => { + // port was likely already bound + let mgr = TcpEventManagerBuilder::new() + .hooks(self.hooks) + .build_from_client( + &("127.0.0.1", self.broker_port), + UNDEFINED_CLIENT_ID, + self.configuration, + )?; + (mgr, None) + } + Err(e) => { + return Err(e); + } + } + } + TcpManagerKind::Broker => { + let event_broker = TcpEventBroker::::new( + format!("127.0.0.1:{}", self.broker_port), + self.monitor.take().unwrap(), + )?; + + broker_things(event_broker, self.remote_broker_addr)?; + unreachable!("The broker may never return normally, only on errors or when shutting down."); + } + TcpManagerKind::Client { cpu_core } => { + // We are a client + let mgr = TcpEventManagerBuilder::new() + .hooks(self.hooks) + .build_on_port(self.broker_port, UNDEFINED_CLIENT_ID, self.configuration)?; + + (mgr, cpu_core) + } + }; + + if let Some(core_id) = core_id { + let core_id: CoreId = core_id; + log::info!("Setting core affinity to {core_id:?}"); + core_id.set_affinity()?; + } + + // We are the fuzzer respawner in a tcp client + mgr.to_env(_ENV_FUZZER_BROKER_CLIENT_INITIAL); + + // First, create a channel from the current fuzzer to the next to store state between restarts. + #[cfg(unix)] + let staterestorer: StateRestorer = + StateRestorer::new(self.shmem_provider.new_shmem(256 * 1024 * 1024)?); + + #[cfg(not(unix))] + let staterestorer: StateRestorer = + StateRestorer::new(self.shmem_provider.new_shmem(256 * 1024 * 1024)?); + // Store the information to a map. + staterestorer.write_to_env(_ENV_FUZZER_SENDER)?; + + let mut ctr: u64 = 0; + // Client->parent loop + loop { + log::info!("Spawning next client (id {ctr})"); + println!("Spawning next client (id {ctr}) {core_id:?}"); + + // On Unix, we fork (when fork feature is enabled) + #[cfg(all(unix, feature = "fork"))] + let child_status = { + self.shmem_provider.pre_fork()?; + match unsafe { fork() }? { + ForkResult::Parent(handle) => { + unsafe { + libc::signal(libc::SIGINT, libc::SIG_IGN); + } + self.shmem_provider.post_fork(false)?; + handle.status() + } + ForkResult::Child => { + self.shmem_provider.post_fork(true)?; + break (staterestorer, self.shmem_provider.clone(), core_id); + } + } + }; + + // If this guy wants to fork, then ignore sigit + #[cfg(any(windows, not(feature = "fork")))] + unsafe { + #[cfg(windows)] + libafl_bolts::os::windows_exceptions::signal( + libafl_bolts::os::windows_exceptions::SIGINT, + libafl_bolts::os::windows_exceptions::sig_ign(), + ); + + #[cfg(unix)] + libc::signal(libc::SIGINT, libc::SIG_IGN); + } + + // On Windows (or in any case without fork), we spawn ourself again + #[cfg(any(windows, not(feature = "fork")))] + let child_status = startable_self()?.status()?; + #[cfg(any(windows, not(feature = "fork")))] + let child_status = child_status.code().unwrap_or_default(); + + compiler_fence(Ordering::SeqCst); + + if child_status == CTRL_C_EXIT || staterestorer.wants_to_exit() { + return Err(Error::shutting_down()); + } + + #[allow(clippy::manual_assert)] + if !staterestorer.has_content() && self.serialize_state { + #[cfg(unix)] + if child_status == 137 { + // Out of Memory, see https://tldp.org/LDP/abs/html/exitcodes.html + // and https://github.com/AFLplusplus/LibAFL/issues/32 for discussion. + panic!("Fuzzer-respawner: The fuzzed target crashed with an out of memory error! 
+                    }
+
+                    // Storing state in the last round did not work
+                    panic!("Fuzzer-respawner: Storing state in the crashed fuzzer instance did not work, no point in spawning the next client! This can happen if the child calls `exit()` (make sure it uses `abort()` instead), if it was killed unrecoverably (e.g. OOM), or if there is a bug in the fuzzer itself. (Child exited with: {child_status})");
+                }
+
+                ctr = ctr.wrapping_add(1);
+            }
+        } else {
+            // We are the newly started fuzzing instance (i.e., on Windows). First, connect to our own restore map.
+            // We get here *only on Windows*, if we were started by a restarting fuzzer.
+            // A staterestorer and a receiver for single communication
+            (
+                StateRestorer::from_env(&mut self.shmem_provider, _ENV_FUZZER_SENDER)?,
+                self.shmem_provider.clone(),
+                None,
+            )
+        };
+
+        // At this point we are the fuzzer *NOT* the restarter.
+        // We set up signal handlers to clean up shmem segments used by the state restorer
+        #[cfg(all(unix, not(miri)))]
+        if let Err(_e) = unsafe { setup_signal_handler(addr_of_mut!(EVENTMGR_SIGHANDLER_STATE)) } {
+            // We can live without a proper ctrl+c signal handler. Print and ignore.
+            log::error!("Failed to set up signal handlers: {_e}");
+        }
+
+        if let Some(core_id) = core_id {
+            let core_id: CoreId = core_id;
+            core_id.set_affinity()?;
+        }
+
+        // If we're restarting, deserialize the old state.
+        let (state, mut mgr) = if let Some((state_opt, this_id)) = staterestorer.restore()? {
+            (
+                state_opt,
+                TcpRestartingEventManager::with_save_state(
+                    TcpEventManagerBuilder::new()
+                        .hooks(self.hooks)
+                        .build_on_port(self.broker_port, this_id, self.configuration)?,
+                    staterestorer,
+                    self.serialize_state,
+                ),
+            )
+        } else {
+            log::info!("First run. Let's set it all up");
+            // Mgr to send and receive msgs from/to all other fuzzer instances
+            let mgr = TcpEventManagerBuilder::new()
+                .hooks(self.hooks)
+                .build_existing_from_env(
+                    &("127.0.0.1", self.broker_port),
+                    _ENV_FUZZER_BROKER_CLIENT_INITIAL,
+                    self.configuration,
+                )?;
+
+            (
+                None,
+                TcpRestartingEventManager::with_save_state(
+                    mgr,
+                    staterestorer,
+                    self.serialize_state,
+                ),
+            )
+        };
+        // We reset the staterestorer; the next staterestorer and receiver (after a crash) will reuse the page from the initial message.
+        mgr.staterestorer.reset();
+
+        /* TODO: Not sure if this is needed
+        // We commit an empty NO_RESTART message to this buf, to avoid infinite loops
+        // in case something crashes in the fuzzer.
+ staterestorer.send_buf(_TCP_TAG_NO_RESTART, []); + */ + + Ok((state, mgr)) + } +} diff --git a/libafl/src/lib.rs b/libafl/src/lib.rs index 9dcf4de75f..88047386ae 100644 --- a/libafl/src/lib.rs +++ b/libafl/src/lib.rs @@ -32,7 +32,6 @@ Welcome to `LibAFL` clippy::similar_names, clippy::too_many_lines, clippy::into_iter_without_iter, // broken - clippy::type_complexity, )] #![cfg_attr(not(test), warn( missing_debug_implementations, diff --git a/libafl_bolts/src/lib.rs b/libafl_bolts/src/lib.rs index 7da88b1bef..bc966d107d 100644 --- a/libafl_bolts/src/lib.rs +++ b/libafl_bolts/src/lib.rs @@ -31,7 +31,6 @@ clippy::ptr_cast_constness, clippy::negative_feature_names, clippy::too_many_lines, - clippy::if_not_else )] #![cfg_attr(not(test), warn( missing_debug_implementations, diff --git a/libafl_bolts/src/llmp.rs b/libafl_bolts/src/llmp.rs index 8360c1da09..a45f12610a 100644 --- a/libafl_bolts/src/llmp.rs +++ b/libafl_bolts/src/llmp.rs @@ -87,7 +87,6 @@ use std::{ #[cfg(all(debug_assertions, feature = "llmp_debug", feature = "std"))] use backtrace::Backtrace; -use log::debug; #[cfg(all(unix, feature = "std"))] #[cfg(not(any(target_os = "solaris", target_os = "illumos")))] use nix::sys::socket::{self, sockopt::ReusePort}; @@ -1288,7 +1287,7 @@ where self.last_msg_sent = msg; self.has_unsent_message = false; - debug!( + log::debug!( "[{} - {:#x}] Send message with id {}", self.id.0, self as *const Self as u64, mid ); @@ -1702,7 +1701,7 @@ where return Err(Error::illegal_state("Unexpected message in map (out of map bounds) - buggy client or tampered shared map detected!")); } - debug!( + log::debug!( "[{} - {:#x}] Received message with ID {}...", self.id.0, self as *const Self as u64, @@ -2369,47 +2368,49 @@ impl Brokers { loop { self.llmp_brokers.retain_mut(|broker| { - if !broker.is_shutting_down() { - if current_milliseconds() > end_time { - broker - .on_timeout() - .expect("An error occurred in broker timeout. Exiting."); - end_time = current_milliseconds() + timeout; - } - - if broker - .broker_once() - .expect("An error occurred when brokering. Exiting.") - { - end_time = current_milliseconds() + timeout; - } - - if let Some(exit_after_count) = broker.exit_after() { - // log::trace!( - // "Clients connected: {} && > {} - {} >= {}", - // self.has_clients(), - // self.num_clients_seen, - // self.listeners.len(), - // exit_after_count - // ); - if !broker.has_clients() - && (broker.num_clients_seen() - broker.nb_listeners()) - >= exit_after_count.into() - { - // No more clients connected, and the amount of clients we were waiting for was previously connected. - // exit cleanly. - return false; - } - } + if broker.is_shutting_down() { - true - } else { broker.send_buf(LLMP_TAG_EXITING, &[]).expect( "Error when shutting down broker: Could not send LLMP_TAG_EXITING msg.", ); - false + return false + } + + if current_milliseconds() > end_time { + broker + .on_timeout() + .expect("An error occurred in broker timeout. Exiting."); + end_time = current_milliseconds() + timeout; + } + + if broker + .broker_once() + .expect("An error occurred when brokering. 
Exiting.") + { + end_time = current_milliseconds() + timeout; + } + + if let Some(exit_after_count) = broker.exit_after() { + // log::trace!( + // "Clients connected: {} && > {} - {} >= {}", + // self.has_clients(), + // self.num_clients_seen, + // self.listeners.len(), + // exit_after_count + // ); + if !broker.has_clients() + && (broker.num_clients_seen() - broker.nb_listeners()) + >= exit_after_count.into() + { + // No more clients connected, and the amount of clients we were waiting for was previously connected. + // exit cleanly. + return false; + } + } + + true }); if self.llmp_brokers.is_empty() { @@ -2769,7 +2770,7 @@ where self.inner.forward_msg(msg)?; } - debug!("New msg vector: {}", new_msgs.len()); + log::debug!("New msg vector: {}", new_msgs.len()); for (new_msg_tag, new_msg_flag, new_msg) in new_msgs { self.inner.llmp_out.send_buf_with_flags( new_msg_tag, @@ -3696,7 +3697,7 @@ where break stream; } - debug!("Connection Refused. Retrying..."); + log::debug!("Connection Refused. Retrying..."); #[cfg(feature = "std")] thread::sleep(Duration::from_millis(50)); diff --git a/libafl_qemu/libafl_qemu_sys/src/lib.rs b/libafl_qemu/libafl_qemu_sys/src/lib.rs index ab2bcb917a..6ddc509ccd 100644 --- a/libafl_qemu/libafl_qemu_sys/src/lib.rs +++ b/libafl_qemu/libafl_qemu_sys/src/lib.rs @@ -27,6 +27,7 @@ mod bindings { pub use bindings::*; #[cfg(any(feature = "clippy", not(target_os = "linux")))] +#[allow(dead_code)] #[rustfmt::skip] mod x86_64_stub_bindings; @@ -126,6 +127,7 @@ pub type GuestHwAddrInfo = crate::qemu_plugin_hwaddr; #[derive(Debug)] #[repr(C)] +#[cfg(target_os = "linux")] #[cfg_attr(feature = "python", pyclass(unsendable))] pub struct MapInfo { start: GuestAddr, @@ -213,6 +215,7 @@ extern_c_checked! { pub fn libafl_qemu_gdb_reply(buf: *const u8, len: usize); } +#[cfg(target_os = "linux")] #[cfg_attr(feature = "python", pymethods)] impl MapInfo { #[must_use]