diff --git a/benches/main.rs b/benches/main.rs index 9761f87..d6fbf5c 100644 --- a/benches/main.rs +++ b/benches/main.rs @@ -29,7 +29,7 @@ fn run_basic_subscriber(c: &mut Criterion) { OwnedFd::from_raw_fd(raw_fd) }; - event_manager.add(event_fd.as_fd(),EventSet::IN | EventSet::ERROR | EventSet::HANG_UP,Box::new(move |_:&mut EventManager, event_set: EventSet| { + event_manager.add(event_fd.as_fd(), EventSet::IN | EventSet::ERROR | EventSet::HANG_UP, Box::new(move |_:&mut EventManager<()>, event_set: EventSet| { match event_set { EventSet::IN => (), EventSet::ERROR => { @@ -47,9 +47,10 @@ fn run_basic_subscriber(c: &mut Criterion) { event_fd }).collect::>(); + let expected = vec![();usize::try_from(no_of_subscribers).unwrap()]; c.bench_function("process_basic", |b| { b.iter(|| { - assert_eq!(event_manager.wait(Some(0)), Ok(no_of_subscribers)); + assert_eq!(event_manager.wait(Some(0)), Ok(expected.as_slice())); }) }); @@ -75,7 +76,7 @@ fn run_arc_mutex_subscriber(c: &mut Criterion) { let counter = Arc::new(Mutex::new(0u64)); let counter_clone = counter.clone(); - event_manager.add(event_fd.as_fd(),EventSet::IN | EventSet::ERROR | EventSet::HANG_UP,Box::new(move |_:&mut EventManager, event_set: EventSet| { + event_manager.add(event_fd.as_fd(),EventSet::IN | EventSet::ERROR | EventSet::HANG_UP,Box::new(move |_:&mut EventManager<()>, event_set: EventSet| { match event_set { EventSet::IN => { *counter_clone.lock().unwrap() += 1; @@ -95,9 +96,10 @@ fn run_arc_mutex_subscriber(c: &mut Criterion) { (event_fd,counter) }).collect::>(); + let expected = vec![();usize::try_from(no_of_subscribers).unwrap()]; c.bench_function("process_with_arc_mutex", |b| { b.iter(|| { - assert_eq!(event_manager.wait(Some(0)), Ok(no_of_subscribers)); + assert_eq!(event_manager.wait(Some(0)), Ok(expected.as_slice())); }) }); @@ -124,7 +126,7 @@ fn run_subscriber_with_inner_mut(c: &mut Criterion) { let counter = Arc::new(AtomicU64::new(0)); let counter_clone = counter.clone(); - event_manager.add(event_fd.as_fd(),EventSet::IN | EventSet::ERROR | EventSet::HANG_UP,Box::new(move |_:&mut EventManager, event_set: EventSet| { + event_manager.add(event_fd.as_fd(),EventSet::IN | EventSet::ERROR | EventSet::HANG_UP,Box::new(move |_:&mut EventManager<()>, event_set: EventSet| { match event_set { EventSet::IN => { counter_clone.fetch_add(1, Ordering::SeqCst); @@ -144,9 +146,10 @@ fn run_subscriber_with_inner_mut(c: &mut Criterion) { (event_fd,counter) }).collect::>(); + let expected = vec![();usize::try_from(no_of_subscribers).unwrap()]; c.bench_function("process_with_inner_mut", |b| { b.iter(|| { - assert_eq!(event_manager.wait(Some(0)), Ok(no_of_subscribers)); + assert_eq!(event_manager.wait(Some(0)), Ok(expected.as_slice())); }) }); @@ -177,7 +180,7 @@ fn run_multiple_subscriber_types(c: &mut Criterion) { let counter = Arc::new(AtomicU64::new(0)); let counter_clone = counter.clone(); - event_manager.add(event_fd.as_fd(),EventSet::IN | EventSet::ERROR | EventSet::HANG_UP,Box::new(move |_:&mut EventManager, event_set: EventSet| { + event_manager.add(event_fd.as_fd(),EventSet::IN | EventSet::ERROR | EventSet::HANG_UP,Box::new(move |_:&mut EventManager<()>, event_set: EventSet| { match event_set { EventSet::IN => { counter_clone.fetch_add(1, Ordering::SeqCst); @@ -223,7 +226,7 @@ fn run_multiple_subscriber_types(c: &mut Criterion) { inner_subscribers[i].as_fd(), EventSet::IN | EventSet::ERROR | EventSet::HANG_UP, Box::new( - move |_: &mut EventManager, event_set: EventSet| match event_set { + move |_: &mut EventManager<()>, event_set: EventSet| match event_set { EventSet::IN => { data_clone[i].fetch_add(1, Ordering::SeqCst); } @@ -244,9 +247,10 @@ fn run_multiple_subscriber_types(c: &mut Criterion) { }) .collect::>(); + let expected = vec![();usize::try_from(total).unwrap()]; c.bench_function("process_dynamic_dispatch", |b| { b.iter(|| { - assert_eq!(event_manager.wait(Some(0)), Ok(total)); + assert_eq!(event_manager.wait(Some(0)), Ok(expected.as_slice())); }) }); @@ -272,7 +276,7 @@ fn run_with_few_active_events(c: &mut Criterion) { OwnedFd::from_raw_fd(raw_fd) }; - event_manager.add(event_fd.as_fd(),EventSet::IN | EventSet::ERROR | EventSet::HANG_UP,Box::new(move |_:&mut EventManager, event_set: EventSet| { + event_manager.add(event_fd.as_fd(),EventSet::IN | EventSet::ERROR | EventSet::HANG_UP,Box::new(move |_:&mut EventManager<()>, event_set: EventSet| { match event_set { EventSet::IN => (), EventSet::ERROR => { @@ -290,9 +294,10 @@ fn run_with_few_active_events(c: &mut Criterion) { event_fd }).collect::>(); + let expected = vec![();usize::try_from(active).unwrap()]; c.bench_function("process_dispatch_few_events", |b| { b.iter(|| { - assert_eq!(event_manager.wait(Some(0)), Ok(active)); + assert_eq!(event_manager.wait(Some(0)), Ok(expected.as_slice())); }) }); diff --git a/src/lib.rs b/src/lib.rs index 8eba029..4df6727 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,7 +6,7 @@ use std::os::unix::io::{AsRawFd, RawFd}; use vmm_sys_util::epoll::EventSet; /// The function thats runs when an event occurs. -type Action = Box; +type Action = Box, EventSet) -> T>; fn errno() -> i32 { // SAFETY: Always safe. @@ -14,21 +14,24 @@ fn errno() -> i32 { } #[derive(Debug)] -pub struct BufferedEventManager { - event_manager: EventManager, +pub struct BufferedEventManager { + event_manager: EventManager, // TODO The length is always unused, a custom type could thus save `size_of::()` bytes. buffer: Vec, + // TODO The length is always unused, a custom type could thus save `size_of::()` bytes. + output_buffer: Vec, } -impl BufferedEventManager { +impl BufferedEventManager { /// Add an entry to the interest list of the epoll file descriptor. /// /// # Errors /// /// When [`libc::epoll_ctl`] returns `-1`. - pub fn add(&mut self, fd: T, events: EventSet, f: Action) -> Result<(), i32> { + pub fn add(&mut self, fd: Fd, events: EventSet, f: Action) -> Result<(), i32> { let res = self.event_manager.add(fd, events, f); self.buffer.reserve(self.event_manager.events.len()); + self.output_buffer.reserve(self.event_manager.events.len()); res } @@ -39,7 +42,7 @@ impl BufferedEventManager { /// # Errors /// /// When [`libc::epoll_ctl`] returns `-1`. - pub fn del(&mut self, fd: T) -> Result { + pub fn del(&mut self, fd: Fd) -> Result { self.event_manager.del(fd) } @@ -55,13 +58,21 @@ impl BufferedEventManager { /// /// When the value given in timeout does not fit within an `i32` e.g. /// `timeout.map(|u| i32::try_from(u).unwrap())`. - pub fn wait(&mut self, timeout: Option) -> Result { + pub fn wait(&mut self, timeout: Option) -> Result<&[T], i32> { // SAFETY: `EventManager::wait` initializes N element from the start of the slice and only // accesses these, thus it will never access uninitialized memory, making this safe. unsafe { self.buffer.set_len(self.buffer.capacity()); + self.output_buffer.set_len(self.output_buffer.capacity()); + } + let n = self + .event_manager + .wait(timeout, &mut self.buffer, &mut self.output_buffer)?; + unsafe { + Ok(self + .output_buffer + .get_unchecked(..usize::try_from(n).unwrap_unchecked())) } - self.event_manager.wait(timeout, &mut self.buffer) } /// Creates new event manager. @@ -72,29 +83,31 @@ impl BufferedEventManager { pub fn new(close_exec: bool) -> Result { Ok(BufferedEventManager { event_manager: EventManager::new(close_exec)?, - buffer: Vec::new(), + buffer: Vec::with_capacity(0), + output_buffer: Vec::with_capacity(0), }) } pub fn with_capacity(close_exec: bool, capacity: usize) -> Result { Ok(BufferedEventManager { event_manager: EventManager::new(close_exec)?, buffer: Vec::with_capacity(capacity), + output_buffer: Vec::with_capacity(capacity), }) } } -impl Default for BufferedEventManager { +impl Default for BufferedEventManager { fn default() -> Self { Self::new(false).unwrap() } } -pub struct EventManager { +pub struct EventManager { epfd: RawFd, - events: HashMap, + events: HashMap>, } -impl std::fmt::Debug for EventManager { +impl std::fmt::Debug for EventManager { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("EventManager") .field("epfd", &self.epfd) @@ -110,13 +123,13 @@ impl std::fmt::Debug for EventManager { } } -impl EventManager { +impl EventManager { /// Add an entry to the interest list of the epoll file descriptor. /// /// # Errors /// /// When [`libc::epoll_ctl`] returns `-1`. - pub fn add(&mut self, fd: T, events: EventSet, f: Action) -> Result<(), i32> { + pub fn add(&mut self, fd: Fd, events: EventSet, f: Action) -> Result<(), i32> { let mut event = libc::epoll_event { events: events.bits(), r#u64: u64::try_from(fd.as_raw_fd()).unwrap(), @@ -140,7 +153,7 @@ impl EventManager { /// # Errors /// /// When [`libc::epoll_ctl`] returns `-1`. - pub fn del(&mut self, fd: T) -> Result { + pub fn del(&mut self, fd: Fd) -> Result { match self.events.remove(&fd.as_raw_fd()) { Some(_) => { // SAFETY: Safe when `fd` is a valid file descriptor. @@ -177,13 +190,14 @@ impl EventManager { &mut self, timeout: Option, buffer: &mut [libc::epoll_event], + output_buffer: &mut [T], ) -> Result { // SAFETY: Always safe. match unsafe { libc::epoll_wait( self.epfd, buffer.as_mut_ptr(), - buffer.len().try_into().unwrap(), + buffer.len().try_into().unwrap_unchecked(), timeout.map_or(-1i32, |u| i32::try_from(u).unwrap()), ) } { @@ -195,11 +209,11 @@ impl EventManager { let event = buffer[i]; // For all events which can fire there exists an entry within `self.events` thus // it is safe to unwrap here. - let f: *const dyn Fn(&mut EventManager, EventSet) = self + let f: *const dyn Fn(&mut EventManager, EventSet) -> T = self .events .get(&i32::try_from(event.u64).unwrap_unchecked()) .unwrap_unchecked(); - (*f)(self, EventSet::from_bits_unchecked(event.events)); + output_buffer[i] = (*f)(self, EventSet::from_bits_unchecked(event.events)); } Ok(n) }, @@ -224,7 +238,7 @@ impl EventManager { } } -impl Default for EventManager { +impl Default for EventManager { fn default() -> Self { Self::new(false).unwrap() } @@ -241,6 +255,7 @@ mod tests { fn delete() { static COUNT: AtomicBool = AtomicBool::new(false); let mut manager = BufferedEventManager::default(); + // We set value to 1 so it will trigger on a read event. // SAFETY: Always safe. let event_fd = unsafe { @@ -248,13 +263,14 @@ mod tests { assert_ne!(fd, -1); fd }; + manager .add( event_fd, EventSet::IN, // A closure which will flip the atomic boolean then remove the event fd from the // interest list. - Box::new(move |x: &mut EventManager, _: EventSet| { + Box::new(move |x: &mut EventManager<()>, _| { // Flips the atomic. let cur = COUNT.load(Ordering::SeqCst); COUNT.store(!cur, Ordering::SeqCst); @@ -270,14 +286,17 @@ mod tests { // The file descriptor has been pre-armed, this will immediately call the respective // closure. - assert_eq!(manager.wait(Some(10)), Ok(1)); + let vec = vec![()]; + assert_eq!(manager.wait(Some(10)), Ok(vec.as_slice())); + // As the closure will flip the atomic boolean we assert it has flipped correctly. assert!(COUNT.load(Ordering::SeqCst)); // At this point we have called the closure, since the closure removes the event fd from the // interest list of the inner epoll, calling this again should timeout as there are no event // fd in the inner epolls interest list which could trigger. - assert_eq!(manager.wait(Some(10)), Ok(0)); + let vec = vec![]; + assert_eq!(manager.wait(Some(10)), Ok(vec.as_slice())); // As the `EventManager::wait` should timeout the value of the atomic boolean should not be // flipped. assert!(COUNT.load(Ordering::SeqCst)); @@ -298,7 +317,7 @@ mod tests { .add( event_fd, EventSet::IN, - Box::new(|_: &mut EventManager, _: EventSet| { + Box::new(|_, _| { // Flips the atomic. let cur = COUNT.load(Ordering::SeqCst); COUNT.store(!cur, Ordering::SeqCst); @@ -310,13 +329,15 @@ mod tests { assert!(!COUNT.load(Ordering::SeqCst)); // As the closure will flip the atomic boolean we assert it has flipped correctly. - assert_eq!(manager.wait(Some(10)), Ok(1)); + let vec = vec![()]; + assert_eq!(manager.wait(Some(10)), Ok(vec.as_slice())); // As the closure will flip the atomic boolean we assert it has flipped correctly. assert!(COUNT.load(Ordering::SeqCst)); // The file descriptor has been pre-armed, this will immediately call the respective // closure. - assert_eq!(manager.wait(Some(10)), Ok(1)); + let vec = vec![()]; + assert_eq!(manager.wait(Some(10)), Ok(vec.as_slice())); // As the closure will flip the atomic boolean we assert it has flipped correctly. assert!(!COUNT.load(Ordering::SeqCst)); } @@ -332,7 +353,7 @@ mod tests { let mut manager = BufferedEventManager::default(); // Setup eventfd's and counters. - let subscribers = (0..100) + let subscribers = (0..SUBSCRIBERS) .map(|_| { // SAFETY: Always safe. let event_fd = unsafe { @@ -347,9 +368,7 @@ mod tests { .add( event_fd.as_fd(), EventSet::IN, - Box::new(move |_: &mut EventManager, _: EventSet| { - counter_clone.fetch_add(1, Ordering::SeqCst); - }), + Box::new(move |_, _| counter_clone.fetch_add(1, Ordering::SeqCst)), ) .unwrap(); @@ -375,10 +394,42 @@ mod tests { } // Check counter are the correct values - let n = i32::try_from(FIRING).unwrap(); - assert_eq!(manager.wait(None), Ok(n)); + let arr = [0; FIRING]; + assert_eq!(manager.wait(None), Ok(arr.as_slice())); for i in set { assert_eq!(subscribers[i].1.load(Ordering::SeqCst), 1); } } + + #[test] + fn results() { + let mut manager = BufferedEventManager::default(); + + // We set value to 1 so it will trigger on a read event. + // SAFETY: Always safe. + let event_fd = unsafe { + let fd = libc::eventfd(1, 0); + assert_ne!(fd, -1); + fd + }; + + manager + .add(event_fd, EventSet::IN, Box::new(|_, _| Ok(()))) + .unwrap(); + + // We set value to 1 so it will trigger on a read event. + // SAFETY: Always safe. + let event_fd = unsafe { + let fd = libc::eventfd(1, 0); + assert_ne!(fd, -1); + fd + }; + + manager + .add(event_fd, EventSet::IN, Box::new(|_, _| Err(()))) + .unwrap(); + + let arr = [Ok(()), Err(())]; + assert_eq!(manager.wait(None), Ok(arr.as_slice())); + } }