diff --git a/libafl/Cargo.toml b/libafl/Cargo.toml index 2592a2c7c1..7a21b39727 100644 --- a/libafl/Cargo.toml +++ b/libafl/Cargo.toml @@ -142,6 +142,9 @@ unicode = ["libafl_bolts/alloc", "ahash/std", "serde/rc", "bitvec"] ## Enable multi-part input formats and mutators multipart_inputs = ["arrayvec", "rand_trait"] +## Share objectives across nodes +share_objectives = [] + #! ## LibAFL-Bolts Features ## Provide the `#[derive(SerdeAny)]` macro. diff --git a/libafl/src/events/centralized.rs b/libafl/src/events/centralized.rs index e0b1a441f5..e73090117d 100644 --- a/libafl/src/events/centralized.rs +++ b/libafl/src/events/centralized.rs @@ -287,7 +287,7 @@ where if !self.is_main { // secondary node let mut is_tc = false; - // Forward to main only if new tc or heartbeat + // Forward to main only if new tc, heartbeat, or optionally, a new objective let should_be_forwarded = match &mut event { Event::NewTestcase { forward_id, .. } => { *forward_id = Some(ClientId(self.inner.mgr_id().0 as u32)); @@ -295,6 +295,10 @@ where true } Event::UpdateExecStats { .. } => true, // send it but this guy won't be handled. the only purpose is to keep this client alive else the broker thinks it is dead and will dc it + + #[cfg(feature = "share_objectives")] + Event::Objective { .. } => true, + Event::Stop => true, _ => false, }; @@ -679,6 +683,12 @@ where log::debug!("[{}] {} was discarded...)", process::id(), event_name); } } + + #[cfg(feature = "share_objectives")] + Event::Objective { .. } => { + log::debug!("Received new Objective"); + } + Event::Stop => { state.request_stop(); } diff --git a/libafl/src/events/llmp/mgr.rs b/libafl/src/events/llmp/mgr.rs index 687d7e10d4..87b454e435 100644 --- a/libafl/src/events/llmp/mgr.rs +++ b/libafl/src/events/llmp/mgr.rs @@ -461,6 +461,12 @@ where } } } + + #[cfg(feature = "share_objectives")] + Event::Objective { .. } => { + log::debug!("Received new Objective"); + } + Event::CustomBuf { tag, buf } => { for handler in &mut self.custom_buf_handlers { if handler(state, &tag, &buf)? == CustomBufEventResult::Handled { diff --git a/libafl/src/events/llmp/mod.rs b/libafl/src/events/llmp/mod.rs index 0175d33c6f..76bbe90d33 100644 --- a/libafl/src/events/llmp/mod.rs +++ b/libafl/src/events/llmp/mod.rs @@ -24,6 +24,11 @@ use crate::{ state::{HasCorpus, HasExecutions, NopState, State, Stoppable, UsesState}, Error, HasMetadata, }; +#[cfg(feature = "share_objectives")] +use crate::{ + corpus::Testcase, + state::{HasCurrentTestcase, HasSolutions}, +}; /// The llmp event manager pub mod mgr; @@ -284,6 +289,7 @@ where } // Handle arriving events in the client + #[cfg(not(feature = "share_objectives"))] fn handle_in_client( &mut self, fuzzer: &mut Z, @@ -340,6 +346,150 @@ where } } + /// Handle arriving events in the client + #[cfg(not(feature = "share_objectives"))] + #[allow(clippy::unused_self)] + pub fn process( + &mut self, + fuzzer: &mut Z, + state: &mut S, + executor: &mut E, + manager: &mut EM, + ) -> Result + where + E: Executor + HasObservers, + EM: UsesState + EventFirer, + S::Corpus: Corpus, + for<'a> E::Observers: Deserialize<'a>, + Z: ExecutionProcessor::Input, E::Observers, S> + + EvaluatorObservers::Input, S>, + { + // TODO: Get around local event copy by moving handle_in_client + let self_id = self.llmp.sender().id(); + let mut count = 0; + while let Some((client_id, tag, _flags, msg)) = self.llmp.recv_buf_with_flags()? { + assert!( + tag != _LLMP_TAG_EVENT_TO_BROKER, + "EVENT_TO_BROKER parcel should not have arrived in the client!" + ); + + if client_id == self_id { + continue; + } + #[cfg(not(feature = "llmp_compression"))] + let event_bytes = msg; + #[cfg(feature = "llmp_compression")] + let compressed; + #[cfg(feature = "llmp_compression")] + let event_bytes = if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED { + compressed = self.compressor.decompress(msg)?; + &compressed + } else { + msg + }; + + let event: Event = postcard::from_bytes(event_bytes)?; + log::debug!("Processor received message {}", event.name_detailed()); + self.handle_in_client(fuzzer, executor, state, manager, client_id, event)?; + count += 1; + } + Ok(count) + } +} + +#[cfg(feature = "share_objectives")] +impl LlmpEventConverter +where + S: UsesInput + + HasExecutions + + HasSolutions + + HasMetadata + + Stoppable + + HasCurrentTestcase + + HasCorpus, + S::Solutions: Corpus, + SP: ShMemProvider, + IC: InputConverter, + ICB: InputConverter, + DI: Input, +{ + // Handle arriving events in the client + fn handle_in_client( + &mut self, + fuzzer: &mut Z, + executor: &mut E, + state: &mut S, + manager: &mut EM, + client_id: ClientId, + event: Event, + ) -> Result<(), Error> + where + E: Executor + HasObservers, + EM: UsesState + EventFirer, + S::Corpus: Corpus, + for<'a> E::Observers: Deserialize<'a>, + Z: ExecutionProcessor::Input, E::Observers, S> + + EvaluatorObservers::Input, S>, + { + match event { + Event::NewTestcase { + input, forward_id, .. + } => { + log::debug!("Received new Testcase to convert from {client_id:?} (forward {forward_id:?}, forward {forward_id:?})"); + + let Some(converter) = self.converter_back.as_mut() else { + return Ok(()); + }; + + let res = fuzzer.evaluate_input_with_observers( + state, + executor, + manager, + converter.convert(input)?, + false, + )?; + + if let Some(item) = res.1 { + log::info!("Added received Testcase as item #{item}"); + } + Ok(()) + } + Event::Objective { input, .. } => { + log::debug!("Received new Objective"); + + let Some(converter) = self.converter_back.as_mut() else { + return Ok(()); + }; + + let converted_input = converter.convert(input)?; + let mut testcase = Testcase::from(converted_input); + testcase.set_parent_id_optional(*state.corpus().current()); + + if let Ok(mut tc) = state.current_testcase_mut() { + tc.found_objective(); + } + + state.solutions_mut().add(testcase)?; + log::info!("Added received Objective to Corpus"); + + Ok(()) + } + Event::CustomBuf { tag, buf } => { + for handler in &mut self.custom_buf_handlers { + if handler(state, &tag, &buf)? == CustomBufEventResult::Handled { + break; + } + } + Ok(()) + } + Event::Stop => Ok(()), + _ => Err(Error::unknown(format!( + "Received illegal message that message should not have arrived: {:?}.", + event.name() + ))), + } + } + /// Handle arriving events in the client #[allow(clippy::unused_self)] pub fn process( diff --git a/libafl/src/events/mod.rs b/libafl/src/events/mod.rs index 81ab7f7d32..dd48c1c203 100644 --- a/libafl/src/events/mod.rs +++ b/libafl/src/events/mod.rs @@ -325,6 +325,9 @@ where }, /// A new objective was found Objective { + /// Input of newly found Objective + #[cfg(feature = "share_objectives")] + input: I, /// Objective corpus size objective_size: usize, /// The time when this event was created diff --git a/libafl/src/events/tcp.rs b/libafl/src/events/tcp.rs index 5b4ac815db..b00381f6c9 100644 --- a/libafl/src/events/tcp.rs +++ b/libafl/src/events/tcp.rs @@ -645,6 +645,12 @@ where log::info!("Added received Testcase as item #{item}"); } } + + #[cfg(feature = "share_objectives")] + Event::Objective { .. } => { + log::info!("Received new Objective"); + } + Event::CustomBuf { tag, buf } => { for handler in &mut self.custom_buf_handlers { if handler(state, &tag, &buf)? == CustomBufEventResult::Handled { diff --git a/libafl/src/executors/inprocess/mod.rs b/libafl/src/executors/inprocess/mod.rs index 366532cbf7..bbcb733afd 100644 --- a/libafl/src/executors/inprocess/mod.rs +++ b/libafl/src/executors/inprocess/mod.rs @@ -476,6 +476,9 @@ pub fn run_observers_and_save_state( .fire( state, Event::Objective { + #[cfg(feature = "share_objectives")] + input: input.clone(), + objective_size: state.solutions().count(), time: libafl_bolts::current_time(), }, diff --git a/libafl/src/fuzzer/mod.rs b/libafl/src/fuzzer/mod.rs index 4ac5aeedb6..8974895b2b 100644 --- a/libafl/src/fuzzer/mod.rs +++ b/libafl/src/fuzzer/mod.rs @@ -426,6 +426,9 @@ where manager.fire( state, Event::Objective { + #[cfg(feature = "share_objectives")] + input, + objective_size: state.solutions().count(), time: current_time(), }, @@ -610,6 +613,9 @@ where manager.fire( state, Event::Objective { + #[cfg(feature = "share_objectives")] + input, + objective_size: state.solutions().count(), time: current_time(), }, diff --git a/libafl/src/stages/sync.rs b/libafl/src/stages/sync.rs index 7f51ed6f13..0316b3b400 100644 --- a/libafl/src/stages/sync.rs +++ b/libafl/src/stages/sync.rs @@ -10,6 +10,8 @@ use std::path::{Path, PathBuf}; use libafl_bolts::{current_time, fs::find_new_files_rec, shmem::ShMemProvider, Named}; use serde::{Deserialize, Serialize}; +#[cfg(feature = "share_objectives")] +use crate::state::HasSolutions; use crate::{ corpus::{Corpus, CorpusId, HasCurrentCorpusId}, events::{llmp::LlmpEventConverter, Event, EventConfig, EventFirer}, @@ -232,6 +234,8 @@ where client: LlmpEventConverter, } +// Do not include trait bound HasSolutions to S if share_objectives is disabled +#[cfg(not(feature = "share_objectives"))] impl Stage for SyncFromBrokerStage where EM: EventFirer, @@ -323,6 +327,102 @@ where } } +// Add trait bound HasSolutions to S if share_objectives is enabled +#[cfg(feature = "share_objectives")] +impl Stage for SyncFromBrokerStage +where + EM: EventFirer, + S: State + + HasExecutions + + HasCorpus + + HasRand + + HasMetadata + + HasSolutions + + UsesInput::Input> + + Stoppable, + SP: ShMemProvider, + E: HasObservers + Executor, + for<'a> E::Observers: Deserialize<'a>, + Z: EvaluatorObservers::Input, S> + + ExecutionProcessor::Input, E::Observers, S>, + IC: InputConverter::Input, To = DI>, + ICB: InputConverter::Input>, + DI: Input, + <::Corpus as Corpus>::Input: Input + Clone, + // S::Corpus: Corpus, // delete me + S::Solutions: Corpus, +{ + #[inline] + fn perform( + &mut self, + fuzzer: &mut Z, + executor: &mut E, + state: &mut S, + manager: &mut EM, + ) -> Result<(), Error> { + if self.client.can_convert() { + let last_id = state + .metadata_map() + .get::() + .and_then(|m| m.last_id); + + let mut cur_id = + last_id.map_or_else(|| state.corpus().first(), |id| state.corpus().next(id)); + + while let Some(id) = cur_id { + let input = state.corpus().cloned_input_for_id(id)?; + + self.client.fire( + state, + Event::NewTestcase { + input, + observers_buf: None, + exit_kind: ExitKind::Ok, + corpus_size: 0, // TODO choose if sending 0 or the actual real value + client_config: EventConfig::AlwaysUnique, + time: current_time(), + forward_id: None, + #[cfg(all(unix, feature = "std", feature = "multi_machine"))] + node_id: None, + }, + )?; + + cur_id = state.corpus().next(id); + } + + let last = state.corpus().last(); + if last_id.is_none() { + state + .metadata_map_mut() + .insert(SyncFromBrokerMetadata::new(last)); + } else { + state + .metadata_map_mut() + .get_mut::() + .unwrap() + .last_id = last; + } + } + + self.client.process(fuzzer, state, executor, manager)?; + #[cfg(feature = "introspection")] + state.introspection_monitor_mut().finish_stage(); + Ok(()) + } + + #[inline] + fn should_restart(&mut self, _state: &mut S) -> Result { + // No restart handling needed - does not execute the target. + Ok(true) + } + + #[inline] + fn clear_progress(&mut self, _state: &mut S) -> Result<(), Error> { + // Not needed - does not execute the target. + Ok(()) + } +} + impl SyncFromBrokerStage where SP: ShMemProvider + 'static,