diff --git a/Cargo.toml b/Cargo.toml index c892e13..c3f3dcd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,19 +14,8 @@ vmm-sys-util = "0.11.0" libc = "0.2.39" [dev-dependencies] -criterion = "0.3.5" - -[features] -remote_endpoint = [] -test_utilities = [] +criterion = "0.5.1" [[bench]] name = "main" -harness = false - -[lib] -bench = false # https://bheisler.github.io/criterion.rs/book/faq.html#cargo-bench-gives-unrecognized-option-errors-for-valid-command-line-options - -[profile.bench] -lto = true -codegen-units = 1 +harness = false \ No newline at end of file diff --git a/benches/main.rs b/benches/main.rs index a845621..4c95c3e 100644 --- a/benches/main.rs +++ b/benches/main.rs @@ -3,71 +3,154 @@ use criterion::{criterion_group, criterion_main, Criterion}; -use event_manager::utilities::subscribers::{ - CounterInnerMutSubscriber, CounterSubscriber, CounterSubscriberWithData, -}; -use event_manager::{EventManager, EventSubscriber, MutEventSubscriber, SubscriberOps}; +use event_manager::{BufferedEventManager, EventManager}; +use std::os::fd::AsFd; +use std::os::fd::FromRawFd; +use std::os::fd::OwnedFd; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; use std::sync::{Arc, Mutex}; +use vmm_sys_util::epoll::EventSet; // Test the performance of event manager when it manages a single subscriber type. // The performance is assessed under stress, all added subscribers have active events. fn run_basic_subscriber(c: &mut Criterion) { - let no_of_subscribers = 200; + let no_of_subscribers = 200i32; - let mut event_manager = EventManager::::new().unwrap(); - for _ in 0..no_of_subscribers { - let mut counter_subscriber = CounterSubscriber::default(); - counter_subscriber.trigger_event(); - event_manager.add_subscriber(counter_subscriber); - } + let mut event_manager = + BufferedEventManager::with_capacity(false, no_of_subscribers as usize).unwrap(); + + let subscribers = (0..no_of_subscribers).map(|_| { + // Create an eventfd that is initialized with 1 waiting event. + let event_fd = unsafe { + let raw_fd = libc::eventfd(1,0); + assert_ne!(raw_fd, -1); + OwnedFd::from_raw_fd(raw_fd) + }; + + event_manager.add(event_fd.as_fd(),EventSet::IN | EventSet::ERROR | EventSet::HANG_UP,Box::new(move |_:&mut EventManager, event_set: EventSet| { + match event_set { + EventSet::IN => (), + EventSet::ERROR => { + eprintln!("Got error on the monitored event."); + }, + EventSet::HANG_UP => { + // TODO Do this: https://github.com/rust-vmm/event-manager/blob/main/src/utilities/subscribers.rs#L116:L118 + panic!("Cannot continue execution. Associated fd was closed."); + }, + _ => { + eprintln!("Received spurious event from the event manager {event_set:#?}."); + } + } + })).unwrap(); + + event_fd + }).collect::>(); c.bench_function("process_basic", |b| { b.iter(|| { - let ev_count = event_manager.run().unwrap(); - assert_eq!(ev_count, no_of_subscribers) + assert_eq!(event_manager.wait(Some(0)), Ok(no_of_subscribers)); }) }); + + drop(subscribers); } // Test the performance of event manager when the subscribers are wrapped in an Arc. // The performance is assessed under stress, all added subscribers have active events. fn run_arc_mutex_subscriber(c: &mut Criterion) { - let no_of_subscribers = 200; + let no_of_subscribers = 200i32; + + let mut event_manager = + BufferedEventManager::with_capacity(false, no_of_subscribers as usize).unwrap(); + + let subscribers = (0..no_of_subscribers).map(|_| { + // Create an eventfd that is initialized with 1 waiting event. + let event_fd = unsafe { + let raw_fd = libc::eventfd(1,0); + assert_ne!(raw_fd, -1); + OwnedFd::from_raw_fd(raw_fd) + }; + let counter = Arc::new(Mutex::new(0u64)); + let counter_clone = counter.clone(); + + event_manager.add(event_fd.as_fd(),EventSet::IN | EventSet::ERROR | EventSet::HANG_UP,Box::new(move |_:&mut EventManager, event_set: EventSet| { + match event_set { + EventSet::IN => { + *counter_clone.lock().unwrap() += 1; + }, + EventSet::ERROR => { + eprintln!("Got error on the monitored event."); + }, + EventSet::HANG_UP => { + // TODO Do this: https://github.com/rust-vmm/event-manager/blob/main/src/utilities/subscribers.rs#L116:L118 + panic!("Cannot continue execution. Associated fd was closed."); + }, + _ => { + eprintln!("Received spurious event from the event manager {event_set:#?}."); + } + } + })).unwrap(); - let mut event_manager = EventManager::>>::new().unwrap(); - for _ in 0..no_of_subscribers { - let counter_subscriber = Arc::new(Mutex::new(CounterSubscriber::default())); - counter_subscriber.lock().unwrap().trigger_event(); - event_manager.add_subscriber(counter_subscriber); - } + (event_fd,counter) + }).collect::>(); c.bench_function("process_with_arc_mutex", |b| { b.iter(|| { - let ev_count = event_manager.run().unwrap(); - assert_eq!(ev_count, no_of_subscribers) + assert_eq!(event_manager.wait(Some(0)), Ok(no_of_subscribers)); }) }); + + drop(subscribers); } // Test the performance of event manager when the subscribers are wrapped in an Arc, and they // leverage inner mutability to update their internal state. // The performance is assessed under stress, all added subscribers have active events. fn run_subscriber_with_inner_mut(c: &mut Criterion) { - let no_of_subscribers = 200; + let no_of_subscribers = 200i32; + + let mut event_manager = + BufferedEventManager::with_capacity(false, no_of_subscribers as usize).unwrap(); + + let subscribers = (0..no_of_subscribers).map(|_| { + // Create an eventfd that is initialized with 1 waiting event. + let event_fd = unsafe { + let raw_fd = libc::eventfd(1,0); + assert_ne!(raw_fd, -1); + OwnedFd::from_raw_fd(raw_fd) + }; + let counter = Arc::new(AtomicU64::new(0)); + let counter_clone = counter.clone(); + + event_manager.add(event_fd.as_fd(),EventSet::IN | EventSet::ERROR | EventSet::HANG_UP,Box::new(move |_:&mut EventManager, event_set: EventSet| { + match event_set { + EventSet::IN => { + counter_clone.fetch_add(1, Ordering::SeqCst); + }, + EventSet::ERROR => { + eprintln!("Got error on the monitored event."); + }, + EventSet::HANG_UP => { + // TODO Do this: https://github.com/rust-vmm/event-manager/blob/main/src/utilities/subscribers.rs#L116:L118 + panic!("Cannot continue execution. Associated fd was closed."); + }, + _ => { + eprintln!("Received spurious event from the event manager {event_set:#?}."); + } + } + })).unwrap(); - let mut event_manager = EventManager::>::new().unwrap(); - for _ in 0..no_of_subscribers { - let counter_subscriber = CounterInnerMutSubscriber::default(); - counter_subscriber.trigger_event(); - event_manager.add_subscriber(Arc::new(counter_subscriber)); - } + (event_fd,counter) + }).collect::>(); c.bench_function("process_with_inner_mut", |b| { b.iter(|| { - let ev_count = event_manager.run().unwrap(); - assert_eq!(ev_count, no_of_subscribers) + assert_eq!(event_manager.wait(Some(0)), Ok(no_of_subscribers)); }) }); + + drop(subscribers); } // Test the performance of event manager when it manages subscribers of different types, that are @@ -76,63 +159,151 @@ fn run_subscriber_with_inner_mut(c: &mut Criterion) { // The performance is assessed under stress, all added subscribers have active events, and the // CounterSubscriberWithData subscribers have multiple active events. fn run_multiple_subscriber_types(c: &mut Criterion) { - let no_of_subscribers = 100; + let no_of_subscribers = 100i32; + + let total = no_of_subscribers + (no_of_subscribers * i32::try_from(EVENTS).unwrap()); + + let mut event_manager = + BufferedEventManager::with_capacity(false, usize::try_from(total).unwrap()).unwrap(); - let mut event_manager = EventManager::>>::new() - .expect("Cannot create event manager."); + let subscribers = (0..no_of_subscribers).map(|_| { + // Create an eventfd that is initialized with 1 waiting event. + let event_fd = unsafe { + let raw_fd = libc::eventfd(1,0); + assert_ne!(raw_fd, -1); + OwnedFd::from_raw_fd(raw_fd) + }; + let counter = Arc::new(AtomicU64::new(0)); + let counter_clone = counter.clone(); - for i in 0..no_of_subscribers { - // The `CounterSubscriberWithData` expects to receive a number as a parameter that - // represents the number it can use as its inner Events data. - let mut data_subscriber = CounterSubscriberWithData::new(i * no_of_subscribers); - data_subscriber.trigger_all_counters(); - event_manager.add_subscriber(Arc::new(Mutex::new(data_subscriber))); + event_manager.add(event_fd.as_fd(),EventSet::IN | EventSet::ERROR | EventSet::HANG_UP,Box::new(move |_:&mut EventManager, event_set: EventSet| { + match event_set { + EventSet::IN => { + counter_clone.fetch_add(1, Ordering::SeqCst); + }, + EventSet::ERROR => { + eprintln!("Got error on the monitored event."); + }, + EventSet::HANG_UP => { + // TODO Do this: https://github.com/rust-vmm/event-manager/blob/main/src/utilities/subscribers.rs#L116:L118 + panic!("Cannot continue execution. Associated fd was closed."); + }, + _ => { + eprintln!("Received spurious event from the event manager {event_set:#?}."); + } + } + })).unwrap(); - let mut counter_subscriber = CounterSubscriber::default(); - counter_subscriber.trigger_event(); - event_manager.add_subscriber(Arc::new(Mutex::new(counter_subscriber))); - } + (event_fd,counter) + }).collect::>(); + + const EVENTS: usize = 3; + + let subscribers_with_data = (0..no_of_subscribers) + .map(|_| { + let data = Arc::new([AtomicU64::new(0), AtomicU64::new(0), AtomicU64::new(0)]); + assert_eq!(data.len(), EVENTS); + + // Create eventfd's that are initialized with 1 waiting event. + let inner_subscribers = (0..EVENTS) + .map(|_| unsafe { + let raw_fd = libc::eventfd(1, 0); + assert_ne!(raw_fd, -1); + OwnedFd::from_raw_fd(raw_fd) + }) + .collect::>(); + + for i in 0..EVENTS { + let data_clone = data.clone(); + + event_manager + .add( + inner_subscribers[i].as_fd(), + EventSet::IN | EventSet::ERROR | EventSet::HANG_UP, + Box::new(move |_: &mut EventManager, event_set: EventSet| { + match event_set { + EventSet::IN => { + data_clone[i].fetch_add(1, Ordering::SeqCst); + } + EventSet::ERROR => { + eprintln!("Got error on the monitored event."); + } + EventSet::HANG_UP => { + // TODO Do this: https://github.com/rust-vmm/event-manager/blob/main/src/utilities/subscribers.rs#L116:L118 + panic!("Cannot continue execution. Associated fd was closed."); + } + _ => {} + } + }), + ) + .unwrap(); + } + + (inner_subscribers, data) + }) + .collect::>(); c.bench_function("process_dynamic_dispatch", |b| { b.iter(|| { - let _ = event_manager.run().unwrap(); + assert_eq!(event_manager.wait(Some(0)), Ok(total)); }) }); + + drop(subscribers); + drop(subscribers_with_data); } // Test the performance of event manager when it manages a single subscriber type. // Just a few of the events are active in this test scenario. fn run_with_few_active_events(c: &mut Criterion) { - let no_of_subscribers = 200; + let no_of_subscribers = 200i32; + let active = 1 + no_of_subscribers / 23; + + let mut event_manager = + BufferedEventManager::with_capacity(false, no_of_subscribers as usize).unwrap(); - let mut event_manager = EventManager::::new().unwrap(); + let subscribers = (0..no_of_subscribers).map(|i| { + // Create an eventfd that is initialized with 1 waiting event. + let event_fd = unsafe { + let raw_fd = libc::eventfd((i % 23 == 0) as u8 as u32,0); + assert_ne!(raw_fd, -1); + OwnedFd::from_raw_fd(raw_fd) + }; - for i in 0..no_of_subscribers { - let mut counter_subscriber = CounterSubscriber::default(); - // Let's activate the events for a few subscribers (i.e. only the ones that are - // divisible by 23). 23 is a random number that I just happen to like. - if i % 23 == 0 { - counter_subscriber.trigger_event(); - } - event_manager.add_subscriber(counter_subscriber); - } + event_manager.add(event_fd.as_fd(),EventSet::IN | EventSet::ERROR | EventSet::HANG_UP,Box::new(move |_:&mut EventManager, event_set: EventSet| { + match event_set { + EventSet::IN => (), + EventSet::ERROR => { + eprintln!("Got error on the monitored event."); + }, + EventSet::HANG_UP => { + // TODO Do this: https://github.com/rust-vmm/event-manager/blob/main/src/utilities/subscribers.rs#L116:L118 + panic!("Cannot continue execution. Associated fd was closed."); + }, + _ => { + eprintln!("Received spurious event from the event manager {event_set:#?}."); + } + } + })).unwrap(); + + event_fd + }).collect::>(); c.bench_function("process_dispatch_few_events", |b| { b.iter(|| { - let _ = event_manager.run().unwrap(); + assert_eq!(event_manager.wait(Some(0)), Ok(active)); }) }); + + drop(subscribers); } -criterion_group! { +criterion_group!( name = benches; config = Criterion::default() .sample_size(200) .measurement_time(std::time::Duration::from_secs(40)); targets = run_basic_subscriber, run_arc_mutex_subscriber, run_subscriber_with_inner_mut, - run_multiple_subscriber_types, run_with_few_active_events -} - -criterion_main! { - benches -} + run_multiple_subscriber_types, run_with_few_active_events +); +criterion_main!(benches); diff --git a/docs/DESIGN.md b/docs/DESIGN.md index 623076f..7bc587c 100644 --- a/docs/DESIGN.md +++ b/docs/DESIGN.md @@ -1,147 +1,13 @@ # Event Manager Design -## Interest List Updates - -Subscribers can update their interest list when the `EventManager` calls -their `process` function. The EventManager crates a specialized `EventOps` -object. `EventOps` limits the operations that the subscribers may call to the -ones that are related to the interest list as follows: -- Adding a new event that the subscriber is interested in. -- Modifying an existing event (for example: update an event to be - edge-triggered instead of being level-triggered or update the user data - associated with an event). -- Remove an existing event. - -The subscriber is responsible for handling the errors returned from calling -`add`, `modify` or `remove`. - -The `EventManager` knows how to associate these actions to a registered -subscriber because it adds the corresponding `SubscriberId` when it creates the -`EventOps` object. - -## Events - -By default, `Events` wrap a file descriptor, and a bit mask of events -(for example `EPOLLIN | EPOLLOUT`). The `Events` can optionally contain user -defined data. - -The `Events` are used in `add`, `remove` and `modify` functions -in [`EventOps`](../src/events.rs). While their semantic is very similar to that -of `libc::epoll_event`, they come with an additional requirement. When -creating `Events` objects, the subscribers must specify the file descriptor -associated with the event mask. There are a few reasons behind this choice: -- Reducing the number of parameters on the `EventOps` functions. Instead of - always passing the file descriptor along with an `epoll_event` object, the - user only needs to pass `Events`. -- Backing the file descriptor in `Events` provides a simple mapping from a file - descriptor to the subscriber that is watching events on that particular file - descriptor. - -Storing the file descriptor in all `Events` means that there are 32 bits left -for custom user data. -A file descriptor can be registered only once (it can be associated with only -one subscriber). - -### Using Events With Custom Data - -The 32-bits in custom data can be used to map events to internal callbacks -based on user-defined numeric values instead of file descriptors. In the -below example, the user defined values are consecutive so that the match -statement can be optimized to a jump table. - -```rust - struct Painter {} - const PROCESS_GREEN:u32 = 0; - const PROCESS_RED: u32 = 1; - const PROCESS_BLUE: u32 = 2; - - impl Painter { - fn process_green(&self, event: Events) {} - fn process_red(&self, event: Events) {} - fn process_blue(&self, events: Events) {} - } - - impl MutEventSubscriber for Painter { - fn init(&mut self, ops: &mut EventOps) { - let green_eventfd = EventFd::new(0).unwrap(); - let ev_for_green = Events::with_data(&green_eventfd, PROCESS_GREEN, EventSet::IN); - ops.add(ev_for_green).unwrap(); - let red_eventfd = EventFd::new(0).unwrap(); - let ev_for_red = Events::with_data(&red_eventfd, PROCESS_RED, EventSet::IN); - ops.add(ev_for_red).unwrap(); +`EventManager` is a wrapper over [epoll](https://man7.org/linux/man-pages/man7/epoll.7.html) that +allows for more ergonomic usage with many events. - let blue_eventfd = EventFd::new(0).unwrap(); - let ev_for_blue = Events::with_data(&blue_eventfd, PROCESS_BLUE, EventSet::IN); - ops.add(ev_for_blue).unwrap(); - } - - fn process(&mut self, events: Events, ops: &mut EventOps) { - match events.data() { - PROCESS_GREEN => self.process_green(events), - PROCESS_RED => self.process_red(events), - PROCESS_BLUE => self.process_blue(events), - _ => error!("spurious event"), - }; - } - } -``` - -## Remote Endpoint - -A manager remote endpoint allows users to interact with the `EventManger` -(as a `SubscriberOps` trait object) from a different thread of execution. -This is particularly useful when the `EventManager` owns the subscriber object -the user wants to interact with, and the communication happens from a separate -thread. This functionality is gated behind the `remote_endpoint` feature. - -The current implementation relies on passing boxed closures to the manager and -getting back a boxed result. The manager is notified about incoming invocation -requests via an [`EventFd`](https://docs.rs/vmm-sys-util/latest/vmm_sys_util/eventfd/struct.EventFd.html) -which is added by the manager to its internal run loop. The manager runs each -closure to completion, and then returns the boxed result using a sender object -that is part of the initial message that also included the closure. The -following example uses the previously defined `Painter` subscriber type. - -```rust -fn main() { - // Create an event manager object. - let mut event_manager = EventManager::::new().unwrap(); - - // Obtain a remote endpoint object. - let endpoint = event_manager.remote_endpoint(); - - // Move the event manager to a new thread and start running the event loop there. - let thread_handle = thread::spawn(move || loop { - event_manager.run().unwrap(); - }); - - let subscriber = Painter {}; - - // Add the subscriber using the remote endpoint. The subscriber is moved to the event - // manager thread, and is now owned by the manager. In return, we get the subscriber id, - // which can be used to identify the subscriber for subsequent operations. - let id = endpoint - .call_blocking(move |sub_ops| -> Result { - Ok(sub_ops.add_subscriber(subscriber)) - }) - .unwrap(); - // ... - - // Add a new event to the subscriber, using fd 1 as an example. - let events = Events::new_raw(1, EventSet::OUT); - endpoint - .call_blocking(move |sub_ops| -> Result<()> { sub_ops.event_ops(id)?.add(events) }) - .unwrap(); - - // ... +## Interest List Updates - thread_handle.join(); -} -``` +Event actions are represented by a closure, these are given a mutable reference to the +`EventManager`, this can be used to: -The `call_blocking` invocation sends a message over a channel to the event manager on the -other thread, and then blocks until a response is received. The event manager detects the -presence of such messages as with any other event, and handles them as part of the event -loop. This can lead to deadlocks if, for example, `call_blocking` is invoked in the `process` -implmentation of a subscriber to the same event manager. \ No newline at end of file +- Add a new event. +- Remove an existing event. \ No newline at end of file diff --git a/docs/DEVELOPMENT.md b/docs/DEVELOPMENT.md index f38a4a2..a8fa63f 100644 --- a/docs/DEVELOPMENT.md +++ b/docs/DEVELOPMENT.md @@ -2,31 +2,8 @@ ## Testing -The `event-manager` is tested using: -- unit tests - defined in their corresponding modules -- Rust integration tests - defined in the [tests](../tests) directory -- performance tests - defined in the [benches](../benches) directory - -The integration and performance tests share subscribers implementations -which can be found under the [src/utilities](../src/utilities) module. - -The `utilities` module is compiled only when using the `test_utilities` -feature. To run unit tests, integration tests, and performance tests, the user -needs to specify the `test_utilities` feature; otherwise the build fails. - -```bash -cargo test --features test_utilities -cargo bench --features test_utilities -``` - -We recommend running all the tests before submitting a PR as follows: +The `event-manager` is tested using unit tests. ```bash -cargo test --all-features -``` - -Performance tests are implemented using -[criterion](https://docs.rs/crate/criterion/). Running the performance tests -locally should work, but only when they're run as part of the CI performance -improvements/degradations can be noticed. More details about performance tests -[here](https://github.com/rust-vmm/rust-vmm-ci#performance-tests). +cargo test +``` \ No newline at end of file diff --git a/docs/event-manager.png b/docs/event-manager.png deleted file mode 100644 index 5e8808a..0000000 Binary files a/docs/event-manager.png and /dev/null differ diff --git a/rust-vmm-ci b/rust-vmm-ci index 8627b37..c2f8c93 160000 --- a/rust-vmm-ci +++ b/rust-vmm-ci @@ -1 +1 @@ -Subproject commit 8627b3766b2bedde4657c7e9ddfc6f95a20e6942 +Subproject commit c2f8c93e3796d8b3ea7dc339fad211457be9c238 diff --git a/src/endpoint.rs b/src/endpoint.rs deleted file mode 100644 index aae4251..0000000 --- a/src/endpoint.rs +++ /dev/null @@ -1,187 +0,0 @@ -// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 OR BSD-3-Clause - -//! A manager remote endpoint allows user to interact with the `EventManger` (as a `SubscriberOps` -//! trait object) from a different thread of execution. -//! -//! This is particularly useful when the `EventManager` owns (via `EventManager::add_subscriber`) -//! a subscriber object the user needs to work with (via `EventManager::subscriber_mut`), but the -//! `EventManager` being on a different thread requires synchronized handles. -//! -//! Until more sophisticated methods are explored (for example making the `EventManager` offer -//! interior mutability using something like an RCU mechanism), the current approach relies on -//! passing boxed closures to the manager and getting back a boxed result. The manager is notified -//! about incoming invocation requests via an `EventFd` which is added to the epoll event set. -//! The signature of the closures as they are received is the `FnOnceBox` type alias defined -//! below. The actual return type is opaque to the manager, but known to the initiator. The manager -//! runs each closure to completion, and then returns the boxed result using a sender object that -//! is part of the initial message that also included the closure. - -use std::any::Any; -use std::os::unix::io::{AsRawFd, RawFd}; -use std::result; -use std::sync::mpsc::{channel, Receiver, Sender}; -use std::sync::Arc; - -use vmm_sys_util::eventfd::{EventFd, EFD_NONBLOCK}; - -use super::{Errno, Error, MutEventSubscriber, Result, SubscriberOps}; - -// The return type of the closure received by the manager is erased (by placing it into a -// `Box` in order to have a single concrete type definition for the messages that -// contain closures to be executed (the `FnMsg` defined below). The actual return type is -// recovered by the initiator of the remote call (more details in the implementation of -// `RemoteEndpoint::call_blocking` below). The `Send` bound is required to send back the boxed -// result over the return channel. -type ErasedResult = Box; - -// Type alias for the boxed closures received by the manager. The `Send` bound at the end applies -// to the type of the closure, and is required to send the box over the channel. -type FnOnceBox = Box) -> ErasedResult + Send>; - -// The type of the messages received by the manager over its receive mpsc channel. -pub(crate) struct FnMsg { - // The closure to execute. - pub(crate) fnbox: FnOnceBox, - // The sending endpoint of the channel used by the remote called to wait for the result. - pub(crate) sender: Option>, -} - -// Used by the `EventManager` to keep state associated with the channel. -pub(crate) struct EventManagerChannel { - // A clone of this is given to every `RemoteEndpoint` and used to signal the presence of - // an new message on the channel. - pub(crate) event_fd: Arc, - // A clone of this sender is given to every `RemoteEndpoint` and used to send `FnMsg` objects - // to the `EventManager` over the channel. - pub(crate) sender: Sender>, - // The receiving half of the channel, used to receive incoming `FnMsg` objects. - pub(crate) receiver: Receiver>, -} - -impl EventManagerChannel { - pub(crate) fn new() -> Result { - let (sender, receiver) = channel(); - Ok(EventManagerChannel { - event_fd: Arc::new( - EventFd::new(EFD_NONBLOCK).map_err(|e| Error::EventFd(Errno::from(e)))?, - ), - sender, - receiver, - }) - } - - pub(crate) fn fd(&self) -> RawFd { - self.event_fd.as_raw_fd() - } - - pub(crate) fn remote_endpoint(&self) -> RemoteEndpoint { - RemoteEndpoint { - msg_sender: self.sender.clone(), - event_fd: self.event_fd.clone(), - } - } -} - -/// Enables interactions with an `EventManager` that runs on a different thread of execution. -pub struct RemoteEndpoint { - // A sender associated with `EventManager` channel requests are sent over. - msg_sender: Sender>, - // Used to notify the `EventManager` about the arrival of a new request. - event_fd: Arc, -} - -impl Clone for RemoteEndpoint { - fn clone(&self) -> Self { - RemoteEndpoint { - msg_sender: self.msg_sender.clone(), - event_fd: self.event_fd.clone(), - } - } -} - -impl RemoteEndpoint { - // Send a message to the remote EventManger and raise a notification. - fn send(&self, msg: FnMsg) -> Result<()> { - self.msg_sender.send(msg).map_err(|_| Error::ChannelSend)?; - self.event_fd - .write(1) - .map_err(|e| Error::EventFd(Errno::from(e)))?; - Ok(()) - } - - /// Call the specified closure on the associated remote `EventManager` (provided as a - /// `SubscriberOps` trait object), and return the result. This method blocks until the result - /// is received, and calling it from the same thread where the event loop runs leads to - /// a deadlock. - pub fn call_blocking(&self, f: F) -> result::Result - where - F: FnOnce(&mut dyn SubscriberOps) -> result::Result + Send + 'static, - O: Send + 'static, - E: From + Send + 'static, - { - // Create a temporary channel used to get back the result. We keep the receiving end, - // and put the sending end into the message we pass to the remote `EventManager`. - let (sender, receiver) = channel(); - - // We erase the return type of `f` by moving and calling it inside another closure which - // hides the result as an `ErasedResult`. This allows using the same channel to send - // closures with different signatures (and thus different types) to the remote - // `EventManager`. - let fnbox = Box::new( - move |ops: &mut dyn SubscriberOps| -> ErasedResult { Box::new(f(ops)) }, - ); - - // Send the message requesting the closure invocation. - self.send(FnMsg { - fnbox, - sender: Some(sender), - })?; - - // Block until a response is received. We can use unwrap because the downcast cannot fail, - // since the signature of F (more specifically, the return value) constrains the concrete - // type that's in the box. - let result_box = receiver - .recv() - .map_err(|_| Error::ChannelRecv)? - .downcast() - .unwrap(); - - // Turns out the dereference operator has a special behaviour for boxed objects; if we - // own a `b: Box` and call `*b`, the box goes away and we get the `T` inside. - *result_box - } - - /// Call the specified closure on the associated local/remote `EventManager` (provided as a - /// `SubscriberOps` trait object), and discard the result. This method only fires - /// the request but does not wait for result, so it may be called from the same thread where - /// the event loop runs. - pub fn fire(&self, f: F) -> Result<()> - where - F: FnOnce(&mut dyn SubscriberOps) + Send + 'static, - { - // We erase the return type of `f` by moving and calling it inside another closure which - // hides the result as an `ErasedResult`. This allows using the same channel send closures - // with different signatures (and thus different types) to the remote `EventManager`. - let fnbox = Box::new( - move |ops: &mut dyn SubscriberOps| -> ErasedResult { - f(ops); - Box::new(()) - }, - ); - - // Send the message requesting the closure invocation. - self.send(FnMsg { - fnbox, - sender: None, - }) - } - - /// Kick the worker thread to wake up from the epoll event loop. - pub fn kick(&self) -> Result<()> { - self.event_fd - .write(1) - .map(|_| ()) - .map_err(|e| Error::EventFd(Errno::from(e))) - } -} diff --git a/src/epoll.rs b/src/epoll.rs deleted file mode 100644 index b174f8d..0000000 --- a/src/epoll.rs +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 OR BSD-3-Clause - -use std::collections::HashMap; -use std::os::unix::io::RawFd; - -use vmm_sys_util::epoll::{ControlOperation, Epoll, EpollEvent}; - -use super::{Errno, Error, EventOps, Result, SubscriberId}; - -// Internal use structure that keeps the epoll related state of an EventManager. -pub(crate) struct EpollWrapper { - // The epoll wrapper. - pub(crate) epoll: Epoll, - // Records the id of the subscriber associated with the given RawFd. The event event_manager - // does not currently support more than one subscriber being associated with an fd. - pub(crate) fd_dispatch: HashMap, - // Records the set of fds that are associated with the subscriber that has the given id. - // This is used to keep track of all fds associated with a subscriber. - pub(crate) subscriber_watch_list: HashMap>, - // A scratch buffer to avoid allocating/freeing memory on each poll iteration. - pub(crate) ready_events: Vec, -} - -impl EpollWrapper { - pub(crate) fn new(ready_events_capacity: usize) -> Result { - Ok(EpollWrapper { - epoll: Epoll::new().map_err(|e| Error::Epoll(Errno::from(e)))?, - fd_dispatch: HashMap::new(), - subscriber_watch_list: HashMap::new(), - ready_events: vec![EpollEvent::default(); ready_events_capacity], - }) - } - - // Poll the underlying epoll fd for pending IO events. - pub(crate) fn poll(&mut self, milliseconds: i32) -> Result { - let event_count = match self.epoll.wait(milliseconds, &mut self.ready_events[..]) { - Ok(ev) => ev, - // EINTR is not actually an error that needs to be handled. The documentation - // for epoll.run specifies that run exits when it for an event, on timeout, or - // on interrupt. - Err(e) if e.raw_os_error() == Some(libc::EINTR) => return Ok(0), - Err(e) => return Err(Error::Epoll(Errno::from(e))), - }; - - Ok(event_count) - } - - // Remove the fds associated with the provided subscriber id from the epoll set and the - // other structures. The subscriber id must be valid. - pub(crate) fn remove(&mut self, subscriber_id: SubscriberId) { - let fds = self - .subscriber_watch_list - .remove(&subscriber_id) - .unwrap_or_default(); - for fd in fds { - // We ignore the result of the operation since there's nothing we can't do, and its - // not a significant error condition at this point. - let _ = self - .epoll - .ctl(ControlOperation::Delete, fd, EpollEvent::default()); - self.remove_event(fd); - } - } - - // Flush and stop receiving IO events associated with the file descriptor. - pub(crate) fn remove_event(&mut self, fd: RawFd) { - self.fd_dispatch.remove(&fd); - for event in self.ready_events.iter_mut() { - if event.fd() == fd { - // It's a little complex to remove the entry from the Vec, so do soft removal - // by setting it to default value. - *event = EpollEvent::default(); - } - } - } - - // Gets the id of the subscriber associated with the provided fd (if such an association - // exists). - pub(crate) fn subscriber_id(&self, fd: RawFd) -> Option { - self.fd_dispatch.get(&fd).copied() - } - - // Creates and returns an EventOps object for the subscriber associated with the provided - // id. The subscriber id must be valid. - pub(crate) fn ops_unchecked(&mut self, subscriber_id: SubscriberId) -> EventOps { - EventOps::new(self, subscriber_id) - } -} diff --git a/src/events.rs b/src/events.rs deleted file mode 100644 index d8dd8c7..0000000 --- a/src/events.rs +++ /dev/null @@ -1,322 +0,0 @@ -// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 OR BSD-3-Clause - -use std::os::unix::io::{AsRawFd, RawFd}; - -use super::{Errno, Error, Result, SubscriberId}; -use crate::epoll::EpollWrapper; -use vmm_sys_util::epoll::{ControlOperation, EpollEvent, EventSet}; - -/// Wrapper over an `epoll::EpollEvent` object. -/// -/// When working directly with epoll related methods, the user associates an `u64` wide -/// epoll_data_t object with every event. We want to use fds as identifiers, but at the same time -/// keep the ability to associate opaque data with an event. An `Events` object always contains an -/// fd and an `u32` data member that can be supplied by the user. When registering events with the -/// inner epoll event set, the fd and data members of `Events` are used together to generate the -/// underlying `u64` member of the epoll_data union. -#[derive(Clone, Copy, Debug)] -pub struct Events { - inner: EpollEvent, -} - -impl PartialEq for Events { - fn eq(&self, other: &Events) -> bool { - self.fd() == other.fd() - && self.data() == other.data() - && self.event_set() == other.event_set() - } -} - -impl Events { - pub(crate) fn with_inner(inner: EpollEvent) -> Self { - Self { inner } - } - - /// Create an empty event set associated with `source`. - /// - /// No explicit events are monitored for the associated file descriptor. - /// Nevertheless, [`EventSet::ERROR`](struct.EventSet.html#associatedconstant.ERROR) and - /// [`EventSet::HANG_UP`](struct.EventSet.html#associatedconstant.HANG_UP) are implicitly - /// monitored. - /// - /// # Arguments - /// - /// * source: object that wraps a file descriptor to be associated with `events` - /// - /// # Example - /// - /// ```rust - /// # use event_manager::Events; - /// # use vmm_sys_util::eventfd::EventFd; - /// let eventfd = EventFd::new(0).unwrap(); - /// let ev_set = Events::empty(&eventfd); - /// ``` - pub fn empty(source: &T) -> Self { - Self::empty_raw(source.as_raw_fd()) - } - - /// Create an empty event set associated with the supplied `RawFd` value. - /// - /// No explicit events are monitored for the associated file descriptor. - /// Nevertheless, [`EventSet::ERROR`](struct.EventSet.html#associatedconstant.ERROR) and - /// [`EventSet::HANG_UP`](struct.EventSet.html#associatedconstant.HANG_UP) are implicitly - /// monitored. - /// - /// # Example - /// - /// ```rust - /// # use event_manager::Events; - /// # use std::os::unix::io::AsRawFd; - /// # use vmm_sys_util::eventfd::EventFd; - /// let eventfd = EventFd::new(0).unwrap(); - /// let ev_set = Events::empty_raw(eventfd.as_raw_fd()); - /// ``` - pub fn empty_raw(fd: RawFd) -> Self { - Self::new_raw(fd, EventSet::empty()) - } - - /// Create an event with `source` and the associated `events` for monitoring. - /// - /// # Arguments - /// - /// * source: object that wraps a file descriptor to be associated with `events` - /// * events: events to monitor on the provided `source`; - /// [`EventSet::ERROR`](struct.EventSet.html#associatedconstant.ERROR) and - /// [`EventSet::HANG_UP`](struct.EventSet.html#associatedconstant.HANG_UP) are - /// always monitored and don't need to be explicitly added to the list. - /// - /// # Example - /// - /// ```rust - /// # use event_manager::{Events, EventSet}; - /// # use vmm_sys_util::eventfd::EventFd; - /// let eventfd = EventFd::new(0).unwrap(); - /// let event_set = EventSet::IN; - /// let ev_set = Events::new(&eventfd, event_set); - /// ``` - pub fn new(source: &T, events: EventSet) -> Self { - Self::new_raw(source.as_raw_fd(), events) - } - - /// Create an event with the supplied `RawFd` value and `events` for monitoring. - /// - /// # Arguments - /// - /// * source: file descriptor on which to monitor the `events` - /// * events: events to monitor on the provided `source`; - /// [`EventSet::ERROR`](struct.EventSet.html#associatedconstant.ERROR) and - /// [`EventSet::HANG_UP`](struct.EventSet.html#associatedconstant.HANG_UP) are - /// always monitored and don't need to be explicitly added to the list. - /// # Example - /// - /// ```rust - /// # use event_manager::{Events, EventSet}; - /// # use vmm_sys_util::eventfd::EventFd; - /// # use std::os::unix::io::AsRawFd; - /// let eventfd = EventFd::new(0).unwrap(); - /// let event_set = EventSet::IN; - /// let ev_set = Events::new_raw(eventfd.as_raw_fd(), event_set); - /// ``` - pub fn new_raw(source: RawFd, events: EventSet) -> Self { - Self::with_data_raw(source, 0, events) - } - - /// Create an event set associated with the underlying file descriptor of the source, active - /// events, and data. - /// - /// # Arguments - /// * source: object that wraps a file descriptor to be associated with `events` - /// * data: custom user data associated with the file descriptor; the data can be used for - /// uniquely identify monitored events instead of using the file descriptor. - /// * events: events to monitor on the provided `source`; - /// [`EventSet::ERROR`](struct.EventSet.html#associatedconstant.ERROR) and - /// [`EventSet::HANG_UP`](struct.EventSet.html#associatedconstant.HANG_UP) are - /// always monitored and don't need to be explicitly added to the list. - /// - /// # Examples - /// - /// ```rust - /// # use event_manager::{Events, EventSet}; - /// # use vmm_sys_util::eventfd::EventFd; - /// let eventfd = EventFd::new(0).unwrap(); - /// let event_set = EventSet::IN; - /// let custom_data = 42; - /// let ev_set = Events::with_data(&eventfd, custom_data, event_set); - /// ``` - pub fn with_data(source: &T, data: u32, events: EventSet) -> Self { - Self::with_data_raw(source.as_raw_fd(), data, events) - } - - /// Create an event set associated with the supplied `RawFd` value, active events, and data. - /// - /// # Arguments - /// * source: file descriptor to be associated with `events` - /// * data: custom user data associated with the file descriptor; the data can be used for - /// uniquely identify monitored events instead of using the file descriptor. - /// * events: events to monitor on the provided `source`; - /// [`EventSet::ERROR`](struct.EventSet.html#associatedconstant.ERROR) and - /// [`EventSet::HANG_UP`](struct.EventSet.html#associatedconstant.HANG_UP) are - /// always monitored and don't need to be explicitly added to the list. - /// - /// # Examples - /// - /// ```rust - /// # use event_manager::{Events, EventSet}; - /// # use std::os::unix::io::AsRawFd; - /// # use vmm_sys_util::eventfd::EventFd; - /// let eventfd = EventFd::new(0).unwrap(); - /// let event_set = EventSet::IN; - /// let custom_data = 42; - /// let ev_set = Events::with_data_raw(eventfd.as_raw_fd(), custom_data, event_set); - /// ``` - pub fn with_data_raw(source: RawFd, data: u32, events: EventSet) -> Self { - let inner_data = ((data as u64) << 32) + (source as u64); - Events { - inner: EpollEvent::new(events, inner_data), - } - } - - /// Return the inner fd value. - pub fn fd(&self) -> RawFd { - self.inner.data() as RawFd - } - - /// Return the inner data value. - pub fn data(&self) -> u32 { - (self.inner.data() >> 32) as u32 - } - - /// Return the active event set. - pub fn event_set(&self) -> EventSet { - self.inner.event_set() - } - - /// Return the inner `EpollEvent`. - pub fn epoll_event(&self) -> EpollEvent { - self.inner - } -} - -/// Opaque object associated with an `EventSubscriber` that allows the addition, modification, and -/// removal of events in the watchlist. -// Right now this is a concrete object, but going further it can be turned into a trait and -// passed around as a trait object. -pub struct EventOps<'a> { - // Mutable reference to the EpollContext of an EventManager. - epoll_wrapper: &'a mut EpollWrapper, - // The id of the event subscriber this object stands for. - subscriber_id: SubscriberId, -} - -impl<'a> EventOps<'a> { - pub(crate) fn new(epoll_wrapper: &'a mut EpollWrapper, subscriber_id: SubscriberId) -> Self { - EventOps { - epoll_wrapper, - subscriber_id, - } - } - - // Apply the provided control operation for the given events on the inner epoll wrapper. - fn ctl(&self, op: ControlOperation, events: Events) -> Result<()> { - self.epoll_wrapper - .epoll - .ctl(op, events.fd(), events.epoll_event()) - .map_err(|e| Error::Epoll(Errno::from(e))) - } - - /// Add the provided events to the inner epoll event set. - pub fn add(&mut self, events: Events) -> Result<()> { - let fd = events.fd(); - if self.epoll_wrapper.fd_dispatch.contains_key(&fd) { - return Err(Error::FdAlreadyRegistered); - } - - self.ctl(ControlOperation::Add, events)?; - - self.epoll_wrapper - .fd_dispatch - .insert(fd, self.subscriber_id); - - self.epoll_wrapper - .subscriber_watch_list - .entry(self.subscriber_id) - .or_insert_with(Vec::new) - .push(fd); - - Ok(()) - } - - /// Submit the provided changes to the inner epoll event set. - pub fn modify(&self, events: Events) -> Result<()> { - self.ctl(ControlOperation::Modify, events) - } - - /// Remove the specified events from the inner epoll event set. - pub fn remove(&mut self, events: Events) -> Result<()> { - // TODO: Add some more checks here? - self.ctl(ControlOperation::Delete, events)?; - self.epoll_wrapper.remove_event(events.fd()); - - if let Some(watch_list) = self - .epoll_wrapper - .subscriber_watch_list - .get_mut(&self.subscriber_id) - { - if let Some(index) = watch_list.iter().position(|&x| x == events.fd()) { - watch_list.remove(index); - } - } - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - use vmm_sys_util::eventfd::EventFd; - - #[test] - fn test_empty_events() { - let event_fd = EventFd::new(0).unwrap(); - - let events_raw = Events::empty_raw(event_fd.as_raw_fd()); - let events = Events::empty(&event_fd); - - assert_eq!(events, events_raw); - - assert_eq!(events.event_set(), EventSet::empty()); - assert_eq!(events.data(), 0); - assert_eq!(events.fd(), event_fd.as_raw_fd()); - } - - #[test] - fn test_events_no_data() { - let event_fd = EventFd::new(0).unwrap(); - let event_set = EventSet::IN; - - let events_raw = Events::new_raw(event_fd.as_raw_fd(), event_set); - let events = Events::new(&event_fd, event_set); - - assert_eq!(events_raw, events); - - assert_eq!(events.data(), 0); - assert_eq!(events.fd(), event_fd.as_raw_fd()); - assert_eq!(events.event_set(), event_set); - } - - #[test] - fn test_events_data() { - let event_fd = EventFd::new(0).unwrap(); - let event_set = EventSet::IN; - - let events_raw = Events::with_data_raw(event_fd.as_raw_fd(), 42, event_set); - let events = Events::with_data(&event_fd, 43, event_set); - - assert_ne!(events_raw, events); - - assert_eq!(events.data(), 43); - assert_eq!(events_raw.data(), 42); - } -} diff --git a/src/lib.rs b/src/lib.rs index feaaf43..c729516 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,282 +1,299 @@ -// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 OR BSD-3-Clause +use std::collections::HashMap; +use std::os::unix::io::{AsRawFd, RawFd}; -//! Event Manager traits and implementation. -#![deny(missing_docs)] +use vmm_sys_util::epoll::EventSet; -use std::cell::RefCell; -use std::ops::{Deref, DerefMut}; -use std::rc::Rc; -use std::result; -use std::sync::{Arc, Mutex}; +/// The function thats runs when an event occurs. +type Action = Box; -use vmm_sys_util::errno::Error as Errno; - -/// The type of epoll events we can monitor a file descriptor for. -pub use vmm_sys_util::epoll::EventSet; - -mod epoll; -mod events; -mod manager; -mod subscribers; -#[doc(hidden)] -#[cfg(feature = "test_utilities")] -pub mod utilities; - -pub use events::{EventOps, Events}; -pub use manager::{EventManager, MAX_READY_EVENTS_CAPACITY}; - -#[cfg(feature = "remote_endpoint")] -mod endpoint; -#[cfg(feature = "remote_endpoint")] -pub use endpoint::RemoteEndpoint; - -/// Error conditions that may appear during `EventManager` related operations. -#[derive(Debug, Eq, PartialEq)] -pub enum Error { - #[cfg(feature = "remote_endpoint")] - /// Cannot send message on channel. - ChannelSend, - #[cfg(feature = "remote_endpoint")] - /// Cannot receive message on channel. - ChannelRecv, - #[cfg(feature = "remote_endpoint")] - /// Operation on `eventfd` failed. - EventFd(Errno), - /// Operation on `libc::epoll` failed. - Epoll(Errno), - // TODO: should we allow fds to be registered multiple times? - /// The fd is already associated with an existing subscriber. - FdAlreadyRegistered, - /// The Subscriber ID does not exist or is no longer associated with a Subscriber. - InvalidId, - /// The ready list capacity passed to `EventManager::new` is invalid. - InvalidCapacity, -} - -impl std::fmt::Display for Error { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match self { - #[cfg(feature = "remote_endpoint")] - Error::ChannelSend => write!( - f, - "event_manager: failed to send message to remote endpoint" - ), - #[cfg(feature = "remote_endpoint")] - Error::ChannelRecv => write!( - f, - "event_manager: failed to receive message from remote endpoint" - ), - #[cfg(feature = "remote_endpoint")] - Error::EventFd(e) => write!( - f, - "event_manager: failed to manage EventFd file descriptor: {}", - e - ), - Error::Epoll(e) => write!( - f, - "event_manager: failed to manage epoll file descriptor: {}", - e - ), - Error::FdAlreadyRegistered => write!( - f, - "event_manager: file descriptor has already been registered" - ), - Error::InvalidId => write!(f, "event_manager: invalid subscriber Id"), - Error::InvalidCapacity => write!(f, "event_manager: invalid ready_list capacity"), - } - } +fn errno() -> i32 { + // SAFETY: Always safe. + unsafe { *libc::__errno_location() } } -impl std::error::Error for Error { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - match self { - #[cfg(feature = "remote_endpoint")] - Error::ChannelSend => None, - #[cfg(feature = "remote_endpoint")] - Error::ChannelRecv => None, - #[cfg(feature = "remote_endpoint")] - Error::EventFd(e) => Some(e), - Error::Epoll(e) => Some(e), - Error::FdAlreadyRegistered => None, - Error::InvalidId => None, - Error::InvalidCapacity => None, - } - } +pub struct BufferedEventManager { + event_manager: EventManager, + // TODO The length is always unused, a custom type could thus save `size_of::()` bytes. + buffer: Vec, } -/// Generic result type that may return `EventManager` errors. -pub type Result = result::Result; - -/// Opaque object that uniquely represents a subscriber registered with an `EventManager`. -#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] -pub struct SubscriberId(u64); - -/// Allows the interaction between an `EventManager` and different event subscribers that do not -/// require a `&mut self` borrow to perform `init` and `process`. -/// -/// Any type implementing this also trivially implements `MutEventSubscriber`. The main role of -/// `EventSubscriber` is to allow wrappers such as `Arc` and `Rc` to implement `EventSubscriber` -/// themselves when the inner type is also an implementor. -pub trait EventSubscriber { - /// Process `events` triggered in the event manager loop. +impl BufferedEventManager { + /// Add an entry to the interest list of the epoll file descriptor. /// - /// Optionally, the subscriber can use `ops` to update the events it monitors. - fn process(&self, events: Events, ops: &mut EventOps); - - /// Initialization called by the [EventManager](struct.EventManager.html) when the subscriber - /// is registered. + /// # Errors /// - /// The subscriber is expected to use `ops` to register the events it wants to monitor. - fn init(&self, ops: &mut EventOps); -} + /// When [`libc::epoll_ctl`] returns `-1`. + pub fn add(&mut self, fd: T, events: EventSet, f: Action) -> Result<(), i32> { + let res = self.event_manager.add(fd, events, f); + self.buffer.reserve(self.event_manager.events.len()); + res + } -/// Allows the interaction between an `EventManager` and different event subscribers. Methods -/// are invoked with a mutable `self` borrow. -pub trait MutEventSubscriber { - /// Process `events` triggered in the event manager loop. + /// Remove (deregister) the target file descriptor fd from the interest list. /// - /// Optionally, the subscriber can use `ops` to update the events it monitors. - fn process(&mut self, events: Events, ops: &mut EventOps); - - /// Initialization called by the [EventManager](struct.EventManager.html) when the subscriber - /// is registered. + /// Returns `Ok(true)` when the given `fd` was present and `Ok(false)` when it wasn't. /// - /// The subscriber is expected to use `ops` to register the events it wants to monitor. - fn init(&mut self, ops: &mut EventOps); -} - -/// API that allows users to add, remove, and interact with registered subscribers. -pub trait SubscriberOps { - /// Subscriber type for which the operations apply. - type Subscriber: MutEventSubscriber; + /// # Errors + /// + /// When [`libc::epoll_ctl`] returns `-1`. + pub fn del(&mut self, fd: T) -> Result { + self.event_manager.del(fd) + } - /// Registers a new subscriber and returns the ID associated with it. + /// Waits until an event fires then triggers the respective action returning `Ok(x)`. If + /// timeout is `Some(_)` it may also return after the given number of milliseconds with + /// `Ok(0)`. /// - /// # Panics + /// # Errors /// - /// This function might panic if the subscriber is already registered. Whether a panic - /// is triggered depends on the implementation of - /// [Subscriber::init()](trait.EventSubscriber.html#tymethod.init). + /// When [`libc::epoll_wait`] returns `-1`. /// - /// Typically, in the `init` function, the subscriber adds fds to its interest list. The same - /// fd cannot be added twice and the `EventManager` will return - /// [Error::FdAlreadyRegistered](enum.Error.html). Using `unwrap` in init in this situation - /// triggers a panic. - fn add_subscriber(&mut self, subscriber: Self::Subscriber) -> SubscriberId; - - /// Removes the subscriber corresponding to `subscriber_id` from the watch list. - fn remove_subscriber(&mut self, subscriber_id: SubscriberId) -> Result; - - /// Returns a mutable reference to the subscriber corresponding to `subscriber_id`. - fn subscriber_mut(&mut self, subscriber_id: SubscriberId) -> Result<&mut Self::Subscriber>; - - /// Creates an event operations wrapper for the subscriber corresponding to `subscriber_id`. + /// # Panics /// - /// The event operations can be used to update the events monitored by the subscriber. - fn event_ops(&mut self, subscriber_id: SubscriberId) -> Result; -} - -impl EventSubscriber for Arc { - fn process(&self, events: Events, ops: &mut EventOps) { - self.deref().process(events, ops); - } - - fn init(&self, ops: &mut EventOps) { - self.deref().init(ops); - } -} - -impl MutEventSubscriber for Arc { - fn process(&mut self, events: Events, ops: &mut EventOps) { - self.deref().process(events, ops); - } - - fn init(&mut self, ops: &mut EventOps) { - self.deref().init(ops); + /// When the value given in timeout does not fit within an `i32` e.g. + /// `timeout.map(|u| i32::try_from(u).unwrap())`. + pub fn wait(&mut self, timeout: Option) -> Result { + unsafe { + self.buffer.set_len(self.buffer.capacity()); + } + self.event_manager.wait(timeout, &mut self.buffer) } -} -impl EventSubscriber for Rc { - fn process(&self, events: Events, ops: &mut EventOps) { - self.deref().process(events, ops); + /// Creates new event manager. + /// + /// # Errors + /// + /// When [`libc::epoll_create1`] returns `-1`. + pub fn new(close_exec: bool) -> Result { + Ok(BufferedEventManager { + event_manager: EventManager::new(close_exec)?, + buffer: Vec::new(), + }) } - - fn init(&self, ops: &mut EventOps) { - self.deref().init(ops); + pub fn with_capacity(close_exec: bool, capacity: usize) -> Result { + Ok(BufferedEventManager { + event_manager: EventManager::new(close_exec)?, + buffer: Vec::with_capacity(capacity), + }) } } -impl MutEventSubscriber for Rc { - fn process(&mut self, events: Events, ops: &mut EventOps) { - self.deref().process(events, ops); - } - - fn init(&mut self, ops: &mut EventOps) { - self.deref().init(ops); +impl Default for BufferedEventManager { + fn default() -> Self { + Self::new(false).unwrap() } } -impl EventSubscriber for RefCell { - fn process(&self, events: Events, ops: &mut EventOps) { - self.borrow_mut().process(events, ops); - } - - fn init(&self, ops: &mut EventOps) { - self.borrow_mut().init(ops); - } -} - -impl MutEventSubscriber for RefCell { - fn process(&mut self, events: Events, ops: &mut EventOps) { - self.borrow_mut().process(events, ops); - } - - fn init(&mut self, ops: &mut EventOps) { - self.borrow_mut().init(ops); - } +pub struct EventManager { + epfd: RawFd, + events: HashMap, } -impl EventSubscriber for Mutex { - fn process(&self, events: Events, ops: &mut EventOps) { - self.lock().unwrap().process(events, ops); +impl EventManager { + /// Add an entry to the interest list of the epoll file descriptor. + /// + /// # Errors + /// + /// When [`libc::epoll_ctl`] returns `-1`. + pub fn add(&mut self, fd: T, events: EventSet, f: Action) -> Result<(), i32> { + let mut event = libc::epoll_event { + events: events.bits(), + r#u64: u64::try_from(fd.as_raw_fd()).unwrap(), + }; + // SAFETY: Safe when `fd` is a valid file descriptor. + match unsafe { libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_ADD, fd.as_raw_fd(), &mut event) } + { + 0 => { + self.events.insert(fd.as_raw_fd(), f); + Ok(()) + } + -1 => Err(errno()), + _ => unreachable!(), + } } - fn init(&self, ops: &mut EventOps) { - self.lock().unwrap().init(ops); + /// Remove (deregister) the target file descriptor fd from the interest list. + /// + /// Returns `Ok(true)` when the given `fd` was present and `Ok(false)` when it wasn't. + /// + /// # Errors + /// + /// When [`libc::epoll_ctl`] returns `-1`. + pub fn del(&mut self, fd: T) -> Result { + match self.events.remove(&fd.as_raw_fd()) { + Some(_) => { + // SAFETY: Safe when `fd` is a valid file descriptor. + match unsafe { + libc::epoll_ctl( + self.epfd, + libc::EPOLL_CTL_DEL, + fd.as_raw_fd(), + std::ptr::null_mut(), + ) + } { + 0 => Ok(true), + -1 => Err(errno()), + _ => unreachable!(), + } + } + None => Ok(false), + } } -} -impl MutEventSubscriber for Mutex { - fn process(&mut self, events: Events, ops: &mut EventOps) { - // If another user of this mutex panicked while holding the mutex, then - // we terminate the process. - self.get_mut().unwrap().process(events, ops); + /// Waits until an event fires then triggers the respective action returning `Ok(x)`. If + /// timeout is `Some(_)` it may also return after the given number of milliseconds with + /// `Ok(0)`. + /// + /// # Errors + /// + /// When [`libc::epoll_wait`] returns `-1`. + /// + /// # Panics + /// + /// When the value given in timeout does not fit within an `i32` e.g. + /// `timeout.map(|u| i32::try_from(u).unwrap())`. + pub fn wait( + &mut self, + timeout: Option, + buffer: &mut [libc::epoll_event], + ) -> Result { + // SAFETY: Always safe. + match unsafe { + libc::epoll_wait( + self.epfd, + buffer.as_mut_ptr(), + buffer.len().try_into().unwrap(), + timeout.map_or(-1i32, |u| i32::try_from(u).unwrap()), + ) + } { + -1 => Err(errno()), + // SAFETY: `x` elements are initialized by `libc::epoll_wait`. + n @ 0.. => unsafe { + for i in 0..usize::try_from(n).unwrap_unchecked() { + let event = buffer[i]; + // For all events which can fire there exists an entry within `self.events` thus + // it is safe to unwrap here. + let f: *const dyn Fn(&mut EventManager, EventSet) = self + .events + .get(&i32::try_from(event.u64).unwrap_unchecked()) + .unwrap_unchecked(); + (*f)(self, EventSet::from_bits_unchecked(event.events)); + } + Ok(n) + }, + _ => unreachable!(), + } } - fn init(&mut self, ops: &mut EventOps) { - // If another user of this mutex panicked while holding the mutex, then - // we terminate the process. - self.get_mut().unwrap().init(ops); + /// Creates new event manager. + /// + /// # Errors + /// + /// When [`libc::epoll_create1`] returns `-1`. + pub fn new(close_exec: bool) -> Result { + // SAFETY: Always safe. + match unsafe { libc::epoll_create1(if close_exec { libc::EPOLL_CLOEXEC } else { 0 }) } { + -1 => Err(errno()), + epfd => Ok(Self { + epfd, + events: HashMap::new(), + }), + } } } -impl EventSubscriber for Box { - fn process(&self, events: Events, ops: &mut EventOps) { - self.deref().process(events, ops); - } - - fn init(&self, ops: &mut EventOps) { - self.deref().init(ops); +impl Default for EventManager { + fn default() -> Self { + Self::new(false).unwrap() } } -impl MutEventSubscriber for Box { - fn process(&mut self, events: Events, ops: &mut EventOps) { - self.deref_mut().process(events, ops); +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::{AtomicBool, Ordering}; + + #[test] + fn delete() { + static COUNT: AtomicBool = AtomicBool::new(false); + let mut manager = BufferedEventManager::default(); + // We set value to 1 so it will trigger on a read event. + // SAFETY: Always safe. + let event_fd = unsafe { + let fd = libc::eventfd(1, 0); + assert_ne!(fd, -1); + fd + }; + manager + .add( + event_fd, + EventSet::IN, + // A closure which will flip the atomic boolean then remove the event fd from the + // interest list. + Box::new(move |x: &mut EventManager, _: EventSet| { + // Flips the atomic. + let cur = COUNT.load(Ordering::SeqCst); + COUNT.store(!cur, Ordering::SeqCst); + // Calls `EventManager::del` which removes the target file descriptor fd from + // the interest list of the inner epoll. + x.del(event_fd).unwrap(); + }), + ) + .unwrap(); + + // Assert the initial state of the atomic boolean. + assert!(!COUNT.load(Ordering::SeqCst)); + + // The file descriptor has been pre-armed, this will immediately call the respective + // closure. + assert_eq!(manager.wait(Some(10)), Ok(1)); + // As the closure will flip the atomic boolean we assert it has flipped correctly. + assert!(COUNT.load(Ordering::SeqCst)); + + // At this point we have called the closure, since the closure removes the event fd from the + // interest list of the inner epoll, calling this again should timeout as there are no event + // fd in the inner epolls interest list which could trigger. + assert_eq!(manager.wait(Some(10)), Ok(0)); + // As the `EventManager::wait` should timeout the value of the atomic boolean should not be + // flipped. + assert!(COUNT.load(Ordering::SeqCst)); } - fn init(&mut self, ops: &mut EventOps) { - self.deref_mut().init(ops); + #[test] + fn flip() { + static COUNT: AtomicBool = AtomicBool::new(false); + let mut manager = BufferedEventManager::default(); + // We set value to 1 so it will trigger on a read event. + // SAFETY: Always safe. + let event_fd = unsafe { + let fd = libc::eventfd(1, 0); + assert_ne!(fd, -1); + fd + }; + manager + .add( + event_fd, + EventSet::IN, + Box::new(|_: &mut EventManager, _: EventSet| { + // Flips the atomic. + let cur = COUNT.load(Ordering::SeqCst); + COUNT.store(!cur, Ordering::SeqCst); + }), + ) + .unwrap(); + + // Assert the initial state of the atomic boolean. + assert!(!COUNT.load(Ordering::SeqCst)); + + // As the closure will flip the atomic boolean we assert it has flipped correctly. + assert_eq!(manager.wait(Some(10)), Ok(1)); + // As the closure will flip the atomic boolean we assert it has flipped correctly. + assert!(COUNT.load(Ordering::SeqCst)); + + // The file descriptor has been pre-armed, this will immediately call the respective + // closure. + assert_eq!(manager.wait(Some(10)), Ok(1)); + // As the closure will flip the atomic boolean we assert it has flipped correctly. + assert!(!COUNT.load(Ordering::SeqCst)); } } diff --git a/src/manager.rs b/src/manager.rs deleted file mode 100644 index 99bab03..0000000 --- a/src/manager.rs +++ /dev/null @@ -1,483 +0,0 @@ -// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 OR BSD-3-Clause - -use std::mem::size_of; - -use vmm_sys_util::epoll::EpollEvent; -#[cfg(feature = "remote_endpoint")] -use vmm_sys_util::epoll::{ControlOperation, EventSet}; - -#[cfg(feature = "remote_endpoint")] -use super::endpoint::{EventManagerChannel, RemoteEndpoint}; -use super::epoll::EpollWrapper; -use super::subscribers::Subscribers; -#[cfg(feature = "remote_endpoint")] -use super::Errno; -use super::{Error, EventOps, Events, MutEventSubscriber, Result, SubscriberId, SubscriberOps}; - -/// Allows event subscribers to be registered, connected to the event loop, and later removed. -pub struct EventManager { - subscribers: Subscribers, - epoll_context: EpollWrapper, - - #[cfg(feature = "remote_endpoint")] - channel: EventManagerChannel, -} - -/// Maximum capacity of ready events that can be passed when initializing the `EventManager`. -// This constant is not defined inside the `EventManager` implementation because it would -// make it really weird to use as the `EventManager` uses generics (S: MutEventSubscriber). -// That means that when using this const, you could not write -// EventManager::MAX_READY_EVENTS_CAPACITY because the type `S` could not be inferred. -// -// This value is taken from: https://elixir.bootlin.com/linux/latest/source/fs/eventpoll.c#L101 -pub const MAX_READY_EVENTS_CAPACITY: usize = i32::MAX as usize / size_of::(); - -impl SubscriberOps for EventManager { - type Subscriber = T; - - /// Register a subscriber with the event event_manager and returns the associated ID. - fn add_subscriber(&mut self, subscriber: T) -> SubscriberId { - let subscriber_id = self.subscribers.add(subscriber); - self.subscribers - .get_mut_unchecked(subscriber_id) - // The index is valid because we've just added the subscriber. - .init(&mut self.epoll_context.ops_unchecked(subscriber_id)); - subscriber_id - } - - /// Unregisters and returns the subscriber associated with the provided ID. - fn remove_subscriber(&mut self, subscriber_id: SubscriberId) -> Result { - let subscriber = self - .subscribers - .remove(subscriber_id) - .ok_or(Error::InvalidId)?; - self.epoll_context.remove(subscriber_id); - Ok(subscriber) - } - - /// Return a mutable reference to the subscriber associated with the provided id. - fn subscriber_mut(&mut self, subscriber_id: SubscriberId) -> Result<&mut T> { - if self.subscribers.contains(subscriber_id) { - return Ok(self.subscribers.get_mut_unchecked(subscriber_id)); - } - Err(Error::InvalidId) - } - - /// Returns a `EventOps` object for the subscriber associated with the provided ID. - fn event_ops(&mut self, subscriber_id: SubscriberId) -> Result { - // Check if the subscriber_id is valid. - if self.subscribers.contains(subscriber_id) { - // The index is valid because the result of `find` was not `None`. - return Ok(self.epoll_context.ops_unchecked(subscriber_id)); - } - Err(Error::InvalidId) - } -} - -impl EventManager { - const DEFAULT_READY_EVENTS_CAPACITY: usize = 256; - - /// Create a new `EventManger` object. - pub fn new() -> Result { - Self::new_with_capacity(Self::DEFAULT_READY_EVENTS_CAPACITY) - } - - /// Creates a new `EventManger` object with specified event capacity. - /// - /// # Arguments - /// - /// * `ready_events_capacity`: maximum number of ready events to be - /// processed a single `run`. The maximum value of this - /// parameter is `EventManager::MAX_READY_EVENTS_CAPACITY`. - pub fn new_with_capacity(ready_events_capacity: usize) -> Result { - if ready_events_capacity > MAX_READY_EVENTS_CAPACITY { - return Err(Error::InvalidCapacity); - } - - let manager = EventManager { - subscribers: Subscribers::new(), - epoll_context: EpollWrapper::new(ready_events_capacity)?, - #[cfg(feature = "remote_endpoint")] - channel: EventManagerChannel::new()?, - }; - - #[cfg(feature = "remote_endpoint")] - manager - .epoll_context - .epoll - .ctl( - ControlOperation::Add, - manager.channel.fd(), - EpollEvent::new(EventSet::IN, manager.channel.fd() as u64), - ) - .map_err(|e| Error::Epoll(Errno::from(e)))?; - Ok(manager) - } - - /// Run the event loop blocking until events are triggered or an error is returned. - /// Calls [subscriber.process()](trait.EventSubscriber.html#tymethod.process) for each event. - /// - /// On success, it returns number of dispatched events or 0 when interrupted by a signal. - pub fn run(&mut self) -> Result { - self.run_with_timeout(-1) - } - - /// Wait for events for a maximum timeout of `miliseconds` or until an error is returned. - /// Calls [subscriber.process()](trait.EventSubscriber.html#tymethod.process) for each event. - /// - /// On success, it returns number of dispatched events or 0 when interrupted by a signal. - pub fn run_with_timeout(&mut self, milliseconds: i32) -> Result { - let event_count = self.epoll_context.poll(milliseconds)?; - self.dispatch_events(event_count); - - Ok(event_count) - } - - fn dispatch_events(&mut self, event_count: usize) { - // EpollEvent doesn't implement Eq or PartialEq, so... - let default_event: EpollEvent = EpollEvent::default(); - - // Used to record whether there's an endpoint event that needs to be handled. - #[cfg(feature = "remote_endpoint")] - let mut endpoint_event = None; - - for ev_index in 0..event_count { - let event = self.epoll_context.ready_events[ev_index]; - let fd = event.fd(); - - // Check whether this event has been discarded. - // EpollWrapper::remove_event() discards an IO event by setting it to default value. - if event.events() == default_event.events() && fd == default_event.fd() { - continue; - } - - if let Some(subscriber_id) = self.epoll_context.subscriber_id(fd) { - self.subscribers.get_mut_unchecked(subscriber_id).process( - Events::with_inner(event), - // The `subscriber_id` is valid because we checked it before. - &mut self.epoll_context.ops_unchecked(subscriber_id), - ); - } else { - #[cfg(feature = "remote_endpoint")] - { - // If we got here, there's a chance the event was triggered by the remote - // endpoint fd. Only check for incoming endpoint events right now, and defer - // actually handling them until all subscriber events have been handled first. - // This prevents subscribers whose events are about to be handled from being - // removed by an endpoint request (or other similar situations). - if fd == self.channel.fd() { - endpoint_event = Some(event); - continue; - } - } - - // This should not occur during normal operation. - unreachable!("Received event on fd from subscriber that is not registered"); - } - } - - #[cfg(feature = "remote_endpoint")] - self.dispatch_endpoint_event(endpoint_event); - } -} - -#[cfg(feature = "remote_endpoint")] -impl EventManager { - /// Return a `RemoteEndpoint` object, that allows interacting with the `EventManager` from a - /// different thread. Using `RemoteEndpoint::call_blocking` on the same thread the event loop - /// runs on leads to a deadlock. - pub fn remote_endpoint(&self) -> RemoteEndpoint { - self.channel.remote_endpoint() - } - - fn dispatch_endpoint_event(&mut self, endpoint_event: Option) { - if let Some(event) = endpoint_event { - if event.event_set() != EventSet::IN { - // This situation is virtually impossible to occur. If it does we have - // a programming error in our code. - unreachable!(); - } - self.handle_endpoint_calls(); - } - } - - fn handle_endpoint_calls(&mut self) { - // Clear the inner event_fd. We don't do anything about an error here at this point. - let _ = self.channel.event_fd.read(); - - // Process messages. We consider only `Empty` errors can appear here; we don't check - // for `Disconnected` errors because we keep at least one clone of `channel.sender` alive - // at all times ourselves. - while let Ok(msg) = self.channel.receiver.try_recv() { - match msg.sender { - Some(sender) => { - // We call the inner closure and attempt to send back the result, but can't really do - // anything in case of error here. - let _ = sender.send((msg.fnbox)(self)); - } - None => { - // Just call the function and discard the result. - let _ = (msg.fnbox)(self); - } - } - } - } -} - -#[cfg(test)] -mod tests { - use super::super::Error; - use super::*; - - use std::os::unix::io::{AsRawFd, RawFd}; - use std::sync::{Arc, Mutex}; - - use vmm_sys_util::{epoll::EventSet, eventfd::EventFd}; - - struct DummySubscriber { - event_fd_1: EventFd, - event_fd_2: EventFd, - - // Flags used for checking that the event event_manager called the `process` - // function for ev1/ev2. - processed_ev1_out: bool, - processed_ev2_out: bool, - processed_ev1_in: bool, - - // Flags used for driving register/unregister/modify of events from - // outside of the `process` function. - register_ev2: bool, - unregister_ev1: bool, - modify_ev1: bool, - } - - impl DummySubscriber { - fn new() -> Self { - DummySubscriber { - event_fd_1: EventFd::new(0).unwrap(), - event_fd_2: EventFd::new(0).unwrap(), - processed_ev1_out: false, - processed_ev2_out: false, - processed_ev1_in: false, - register_ev2: false, - unregister_ev1: false, - modify_ev1: false, - } - } - } - - impl DummySubscriber { - fn register_ev2(&mut self) { - self.register_ev2 = true; - } - - fn unregister_ev1(&mut self) { - self.unregister_ev1 = true; - } - - fn modify_ev1(&mut self) { - self.modify_ev1 = true; - } - - fn processed_ev1_out(&self) -> bool { - self.processed_ev1_out - } - - fn processed_ev2_out(&self) -> bool { - self.processed_ev2_out - } - - fn processed_ev1_in(&self) -> bool { - self.processed_ev1_in - } - - fn reset_state(&mut self) { - self.processed_ev1_out = false; - self.processed_ev2_out = false; - self.processed_ev1_in = false; - } - - fn handle_updates(&mut self, event_manager: &mut EventOps) { - if self.register_ev2 { - event_manager - .add(Events::new(&self.event_fd_2, EventSet::OUT)) - .unwrap(); - self.register_ev2 = false; - } - - if self.unregister_ev1 { - event_manager - .remove(Events::new_raw( - self.event_fd_1.as_raw_fd(), - EventSet::empty(), - )) - .unwrap(); - self.unregister_ev1 = false; - } - - if self.modify_ev1 { - event_manager - .modify(Events::new(&self.event_fd_1, EventSet::IN)) - .unwrap(); - self.modify_ev1 = false; - } - } - - fn handle_in(&mut self, source: RawFd) { - if self.event_fd_1.as_raw_fd() == source { - self.processed_ev1_in = true; - } - } - - fn handle_out(&mut self, source: RawFd) { - match source { - _ if self.event_fd_1.as_raw_fd() == source => { - self.processed_ev1_out = true; - } - _ if self.event_fd_2.as_raw_fd() == source => { - self.processed_ev2_out = true; - } - _ => {} - } - } - } - - impl MutEventSubscriber for DummySubscriber { - fn process(&mut self, events: Events, ops: &mut EventOps) { - let source = events.fd(); - let event_set = events.event_set(); - - // We only know how to treat EPOLLOUT and EPOLLIN. - // If we received anything else just stop processing the event. - let all_but_in_out = EventSet::all() - EventSet::OUT - EventSet::IN; - if event_set.intersects(all_but_in_out) { - return; - } - - self.handle_updates(ops); - - match event_set { - EventSet::IN => self.handle_in(source), - EventSet::OUT => self.handle_out(source), - _ => {} - } - } - - fn init(&mut self, ops: &mut EventOps) { - let event = Events::new(&self.event_fd_1, EventSet::OUT); - ops.add(event).unwrap(); - } - } - - #[test] - fn test_register() { - use super::SubscriberOps; - - let mut event_manager = EventManager::>>::new().unwrap(); - let dummy_subscriber = Arc::new(Mutex::new(DummySubscriber::new())); - - event_manager.add_subscriber(dummy_subscriber.clone()); - - dummy_subscriber.lock().unwrap().register_ev2(); - - // When running the loop the first time, ev1 should be processed, but ev2 shouldn't - // because it was just added as part of processing ev1. - event_manager.run().unwrap(); - assert!(dummy_subscriber.lock().unwrap().processed_ev1_out()); - assert!(!dummy_subscriber.lock().unwrap().processed_ev2_out()); - - // Check that both ev1 and ev2 are processed. - dummy_subscriber.lock().unwrap().reset_state(); - event_manager.run().unwrap(); - assert!(dummy_subscriber.lock().unwrap().processed_ev1_out()); - assert!(dummy_subscriber.lock().unwrap().processed_ev2_out()); - } - - #[test] - #[should_panic(expected = "FdAlreadyRegistered")] - fn test_add_invalid_subscriber() { - let mut event_manager = EventManager::>>::new().unwrap(); - let subscriber = Arc::new(Mutex::new(DummySubscriber::new())); - - event_manager.add_subscriber(subscriber.clone()); - event_manager.add_subscriber(subscriber); - } - - // Test that unregistering an event while processing another one works. - #[test] - fn test_unregister() { - let mut event_manager = EventManager::>>::new().unwrap(); - let dummy_subscriber = Arc::new(Mutex::new(DummySubscriber::new())); - - event_manager.add_subscriber(dummy_subscriber.clone()); - - // Disable ev1. We should only receive this event once. - dummy_subscriber.lock().unwrap().unregister_ev1(); - - event_manager.run().unwrap(); - assert!(dummy_subscriber.lock().unwrap().processed_ev1_out()); - - dummy_subscriber.lock().unwrap().reset_state(); - - // We expect no events to be available. Let's run with timeout so that run exists. - event_manager.run_with_timeout(100).unwrap(); - assert!(!dummy_subscriber.lock().unwrap().processed_ev1_out()); - } - - #[test] - fn test_modify() { - let mut event_manager = EventManager::>>::new().unwrap(); - let dummy_subscriber = Arc::new(Mutex::new(DummySubscriber::new())); - - event_manager.add_subscriber(dummy_subscriber.clone()); - - // Modify ev1 so that it waits for EPOLL_IN. - dummy_subscriber.lock().unwrap().modify_ev1(); - event_manager.run().unwrap(); - assert!(dummy_subscriber.lock().unwrap().processed_ev1_out()); - assert!(!dummy_subscriber.lock().unwrap().processed_ev2_out()); - - dummy_subscriber.lock().unwrap().reset_state(); - - // Make sure ev1 is ready for IN so that we don't loop forever. - dummy_subscriber - .lock() - .unwrap() - .event_fd_1 - .write(1) - .unwrap(); - - event_manager.run().unwrap(); - assert!(!dummy_subscriber.lock().unwrap().processed_ev1_out()); - assert!(!dummy_subscriber.lock().unwrap().processed_ev2_out()); - assert!(dummy_subscriber.lock().unwrap().processed_ev1_in()); - } - - #[test] - fn test_remove_subscriber() { - let mut event_manager = EventManager::>>::new().unwrap(); - let dummy_subscriber = Arc::new(Mutex::new(DummySubscriber::new())); - - let subscriber_id = event_manager.add_subscriber(dummy_subscriber.clone()); - event_manager.run().unwrap(); - assert!(dummy_subscriber.lock().unwrap().processed_ev1_out()); - - dummy_subscriber.lock().unwrap().reset_state(); - - event_manager.remove_subscriber(subscriber_id).unwrap(); - - // We expect no events to be available. Let's run with timeout so that run exits. - event_manager.run_with_timeout(100).unwrap(); - assert!(!dummy_subscriber.lock().unwrap().processed_ev1_out()); - - // Removing the subscriber twice should return an error. - assert_eq!( - event_manager - .remove_subscriber(subscriber_id) - .err() - .unwrap(), - Error::InvalidId - ); - } -} diff --git a/src/subscribers.rs b/src/subscribers.rs deleted file mode 100644 index fb88aa1..0000000 --- a/src/subscribers.rs +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 OR BSD-3-Clause - -use super::SubscriberId; -use std::collections::HashMap; - -// Internal structure used to keep the set of subscribers registered with an EventManger. -// This structure is a thin wrapper over a `HashMap` in which the keys are uniquely -// generated when calling `add`. -pub(crate) struct Subscribers { - // The key is the unique id of the subscriber and the entry is the `Subscriber`. - subscribers: HashMap, - // We are generating the unique ids by incrementing this counter for each added subscriber, - // and rely on the large value range of u64 to ensure each value is effectively - // unique over the runtime of any VMM. - next_id: u64, -} - -impl Subscribers { - pub(crate) fn new() -> Self { - Subscribers { - subscribers: HashMap::new(), - next_id: 1, - } - } - - // Adds a subscriber and generates an unique id to represent it. - pub(crate) fn add(&mut self, subscriber: T) -> SubscriberId { - let id = SubscriberId(self.next_id); - self.next_id += 1; - - self.subscribers.insert(id, subscriber); - - id - } - - // Remove and return the subscriber associated with the given id, if it exists. - pub(crate) fn remove(&mut self, subscriber_id: SubscriberId) -> Option { - self.subscribers.remove(&subscriber_id) - } - - // Checks whether a subscriber with `subscriber_id` is registered. - pub(crate) fn contains(&mut self, subscriber_id: SubscriberId) -> bool { - self.subscribers.contains_key(&subscriber_id) - } - - // Return a mutable reference to the subriber represented by `subscriber_id`. - // - // This method should only be called for indices that are known to be valid, otherwise - // panics can occur. - pub(crate) fn get_mut_unchecked(&mut self, subscriber_id: SubscriberId) -> &mut T { - self.subscribers.get_mut(&subscriber_id).unwrap() - } -} diff --git a/src/utilities/mod.rs b/src/utilities/mod.rs deleted file mode 100644 index c2d53d1..0000000 --- a/src/utilities/mod.rs +++ /dev/null @@ -1,18 +0,0 @@ -// Helper module for tests utilities. -// -// This module is only compiled with `test_utilities` feature on purpose. -// For production code, we do not want to export this functionality. -// At the same time, we need the test utilities to be public such that they can -// be used by multiple categories of tests. Two examples that deem this module -// necessary are the benchmark tests and the integration tests where the implementations -// of subscribers are shared. -// -// Having this module as a feature comes with a disadvantage that needs to be kept in mind. -// `cargo test` will only work when ran with `--feature test-utilities` (or with --all-features). A -// much nicer way to implement this would've been with a utilities crate that is used as -// `dev-dependencies`. Unfortunately, this is not possible because it would introduce a cyclic -// dependency. The `utilities` module has a dependency on `event-manager` because it needs to -// implement the `EventSubscriber` traits, and `event-manager` has a dependency on utilities so -// that they can be used in tests. -#![doc(hidden)] -pub mod subscribers; diff --git a/src/utilities/subscribers.rs b/src/utilities/subscribers.rs deleted file mode 100644 index 95ad790..0000000 --- a/src/utilities/subscribers.rs +++ /dev/null @@ -1,328 +0,0 @@ -// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 OR BSD-3-Clause -// -// Cargo thinks that some of the methods in this module are not used because -// not all of them are used in all the separate integration test modules. -// Cargo bug: https://github.com/rust-lang/rust/issues/46379 -// Let's allow dead code so that we don't get warnings all the time. -/// This module defines common subscribers that showcase the usage of the event-manager. -/// -/// 1. CounterSubscriber: -/// - a dummy subscriber that increments a counter on event -/// - only uses one event -/// - it has to be explicitly mutated; for this reason it implements `MutEventSubscriber`. -/// -/// 2. CounterSubscriberWithData: -/// - a dummy subscriber that increments a counter on events -/// - this subscriber takes care of multiple events and makes use of `Events::with_data` so -/// that in the `process` function it identifies the trigger of an event using the data -/// instead of the file descriptor -/// - it has to be explicitly mutated; for this reason it implements `MutEventSubscriber`. -/// -/// 3. CounterInnerMutSubscriber: -/// - a dummy subscriber that increments a counter on events -/// - the subscriber makes use of inner mutability; multi-threaded applications might want to -/// use inner mutability instead of having something heavy weight (i.e. Arc). -/// - this subscriber implement `EventSubscriber`. -use std::fmt::{Display, Formatter, Result}; -use std::os::unix::io::AsRawFd; -use std::sync::atomic::AtomicU64; -use std::sync::atomic::Ordering; - -use vmm_sys_util::{epoll::EventSet, eventfd::EventFd}; - -use crate::{EventOps, EventSubscriber, Events, MutEventSubscriber}; - -/// A `Counter` is a helper structure for creating subscribers that increment a value -/// each time an event is triggered. -/// The `Counter` allows users to assert and de-assert an event, and to query the counter value. -pub struct Counter { - event_fd: EventFd, - counter: u64, -} - -impl Display for Counter { - fn fmt(&self, f: &mut Formatter<'_>) -> Result { - write!( - f, - "(event_fd = {}, counter = {})", - self.event_fd.as_raw_fd(), - self.counter - ) - } -} - -impl Counter { - pub fn new() -> Self { - Self { - event_fd: EventFd::new(0).unwrap(), - counter: 0, - } - } - - pub fn trigger_event(&mut self) { - let _ = self.event_fd.write(1); - } - - pub fn clear_event(&self) { - let _ = self.event_fd.read(); - } - - pub fn counter(&self) -> u64 { - self.counter - } -} - -impl Default for Counter { - fn default() -> Self { - Self::new() - } -} - -// A dummy subscriber that increments a counter whenever it processes -// a new request. -pub struct CounterSubscriber(Counter); - -impl std::ops::Deref for CounterSubscriber { - type Target = Counter; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl std::ops::DerefMut for CounterSubscriber { - fn deref_mut(&mut self) -> &mut Counter { - &mut self.0 - } -} - -impl Default for CounterSubscriber { - fn default() -> Self { - Self(Counter::new()) - } -} - -impl MutEventSubscriber for CounterSubscriber { - fn process(&mut self, events: Events, event_ops: &mut EventOps) { - match events.event_set() { - EventSet::IN => { - self.counter += 1; - } - EventSet::ERROR => { - eprintln!("Got error on the monitored event."); - } - EventSet::HANG_UP => { - event_ops - .remove(events) - .expect("Encountered error during cleanup."); - panic!("Cannot continue execution. Associated fd was closed."); - } - _ => { - eprintln!( - "Received spurious event from the event manager {:#?}.", - events.event_set() - ); - } - } - } - - fn init(&mut self, ops: &mut EventOps) { - ops.add(Events::new(&self.event_fd, EventSet::IN)) - .expect("Cannot register event."); - } -} - -// A dummy subscriber that makes use of the optional data in `Events` when -// registering & processing events. -// Using 3 counters each having associated event data to showcase the implementation of -// EventSubscriber trait with this scenario. -pub struct CounterSubscriberWithData { - counter_1: Counter, - counter_2: Counter, - counter_3: Counter, - - first_data: u32, - toggle_registry: bool, -} - -impl CounterSubscriberWithData { - // `first_data` represents the first event data that can be used by this subscriber. - pub fn new(first_data: u32) -> Self { - Self { - counter_1: Counter::new(), - counter_2: Counter::new(), - counter_3: Counter::new(), - // Using consecutive numbers for the event data helps the compiler to optimize - // match statements on counter_1_data, counter_2_data, counter_3_data using - // a jump table. - first_data, - toggle_registry: false, - } - } - - pub fn trigger_all_counters(&mut self) { - self.counter_1.trigger_event(); - self.counter_2.trigger_event(); - self.counter_3.trigger_event(); - } - - pub fn get_all_counter_values(&self) -> Vec { - vec![ - self.counter_1.counter(), - self.counter_2.counter(), - self.counter_3.counter(), - ] - } - - pub fn set_toggle_registry(&mut self, toggle: bool) { - self.toggle_registry = toggle; - } -} - -impl MutEventSubscriber for CounterSubscriberWithData { - fn process(&mut self, events: Events, ops: &mut EventOps) { - if self.toggle_registry { - self.toggle_registry = false; - - ops.remove(Events::with_data( - &self.counter_1.event_fd, - self.first_data, - EventSet::IN, - )) - .expect("Cannot remove event."); - ops.remove(Events::with_data( - &self.counter_2.event_fd, - self.first_data + 1, - EventSet::IN, - )) - .expect("Cannot remove event."); - ops.remove(Events::with_data( - &self.counter_3.event_fd, - self.first_data + 2, - EventSet::IN, - )) - .expect("Cannot remove event."); - - ops.add(Events::with_data( - &self.counter_1.event_fd, - self.first_data, - EventSet::IN, - )) - .expect("Cannot register event."); - ops.add(Events::with_data( - &self.counter_2.event_fd, - self.first_data + 1, - EventSet::IN, - )) - .expect("Cannot register event."); - ops.add(Events::with_data( - &self.counter_3.event_fd, - self.first_data + 2, - EventSet::IN, - )) - .expect("Cannot register event."); - } - match events.event_set() { - EventSet::IN => { - let event_id = events.data() - self.first_data; - match event_id { - 0 => { - self.counter_1.counter += 1; - } - 1 => { - self.counter_2.counter += 1; - } - 2 => { - self.counter_3.counter += 1; - } - _ => { - eprintln!("Received spurious event."); - } - }; - } - EventSet::ERROR => { - eprintln!("Got error on the monitored event."); - } - EventSet::HANG_UP => { - ops.remove(events) - .expect("Encountered error during cleanup."); - panic!("Cannot continue execution. Associated fd was closed."); - } - _ => {} - } - } - - fn init(&mut self, ops: &mut EventOps) { - ops.add(Events::with_data( - &self.counter_1.event_fd, - self.first_data, - EventSet::IN, - )) - .expect("Cannot register event."); - ops.add(Events::with_data( - &self.counter_2.event_fd, - self.first_data + 1, - EventSet::IN, - )) - .expect("Cannot register event."); - ops.add(Events::with_data( - &self.counter_3.event_fd, - self.first_data + 2, - EventSet::IN, - )) - .expect("Cannot register event."); - } -} - -pub struct CounterInnerMutSubscriber { - event_fd: EventFd, - counter: AtomicU64, -} - -impl Default for CounterInnerMutSubscriber { - fn default() -> Self { - Self { - event_fd: EventFd::new(0).unwrap(), - counter: AtomicU64::new(0), - } - } -} - -impl CounterInnerMutSubscriber { - pub fn trigger_event(&self) { - let _ = self.event_fd.write(1); - } - - pub fn clear_event(&self) { - let _ = self.event_fd.read(); - } - - pub fn counter(&self) -> u64 { - self.counter.load(Ordering::Relaxed) - } -} - -impl EventSubscriber for CounterInnerMutSubscriber { - fn process(&self, events: Events, ops: &mut EventOps) { - match events.event_set() { - EventSet::IN => { - self.counter.fetch_add(1, Ordering::Relaxed); - } - EventSet::ERROR => { - eprintln!("Got error on the monitored event."); - } - EventSet::HANG_UP => { - ops.remove(events) - .expect("Encountered error during cleanup."); - panic!("Cannot continue execution. Associated fd was closed."); - } - _ => {} - } - } - - fn init(&self, ops: &mut EventOps) { - ops.add(Events::new(&self.event_fd, EventSet::IN)) - .expect("Cannot register event."); - } -} diff --git a/tests/basic_event_manager.rs b/tests/basic_event_manager.rs deleted file mode 100644 index 8930ebf..0000000 --- a/tests/basic_event_manager.rs +++ /dev/null @@ -1,111 +0,0 @@ -// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 OR BSD-3-Clause -// -// A basic, single threaded application that only needs one type of -// subscriber: `CounterSubscriber`. -// -// The application has an `EventManager` and can register multiple subscribers -// of type `CounterSubscriber`. - -use std::ops::Drop; - -use event_manager::utilities::subscribers::CounterSubscriber; -use event_manager::{EventManager, SubscriberId, SubscriberOps}; - -struct App { - event_manager: EventManager, - subscribers_id: Vec, -} - -impl App { - fn new() -> Self { - Self { - event_manager: EventManager::::new().unwrap(), - subscribers_id: vec![], - } - } - - fn add_subscriber(&mut self) { - let counter_subscriber = CounterSubscriber::default(); - let id = self.event_manager.add_subscriber(counter_subscriber); - self.subscribers_id.push(id); - } - - fn run(&mut self) { - let _ = self.event_manager.run_with_timeout(100); - } - - fn inject_events_for(&mut self, subscriber_index: &[usize]) { - for i in subscriber_index { - let subscriber = self - .event_manager - .subscriber_mut(self.subscribers_id[*i]) - .unwrap(); - subscriber.trigger_event(); - } - } - - fn clear_events_for(&mut self, subscriber_indices: &[usize]) { - for i in subscriber_indices { - let subscriber = self - .event_manager - .subscriber_mut(self.subscribers_id[*i]) - .unwrap(); - subscriber.clear_event(); - } - } - - fn get_counters(&mut self) -> Vec { - let mut result = Vec::::new(); - for subscriber_id in &self.subscribers_id { - let subscriber = self.event_manager.subscriber_mut(*subscriber_id).unwrap(); - result.push(subscriber.counter()); - } - - result - } - - // Whenever the App does not need to monitor events anymore, it should explicitly call - // the `remove_subscriber` function. - fn cleanup(&mut self) { - for id in &self.subscribers_id { - let _ = self.event_manager.remove_subscriber(*id); - } - } -} - -impl Drop for App { - fn drop(&mut self) { - self.cleanup(); - } -} - -#[test] -fn test_single_threaded() { - let mut app = App::new(); - for _ in 0..100 { - app.add_subscriber(); - } - - // Random subscribers, in the sense that I randomly picked those numbers :) - let triggered_subscribers: Vec = vec![1, 3, 50, 97]; - app.inject_events_for(&triggered_subscribers); - app.run(); - - let counters = app.get_counters(); - for (i, &item) in counters.iter().enumerate() { - assert_eq!(item, 1 & (triggered_subscribers.contains(&i) as u64)); - } - - app.clear_events_for(&triggered_subscribers); - app.run(); - let counters = app.get_counters(); - for (i, &item) in counters.iter().enumerate() { - assert_eq!(item, 1 & (triggered_subscribers.contains(&i) as u64)); - } - - // Once the app does not need events anymore, the cleanup needs to be called. - // This is particularly important when the app continues the execution, but event monitoring - // is not necessary. - app.cleanup(); -} diff --git a/tests/endpoint.rs b/tests/endpoint.rs deleted file mode 100644 index 61a18a1..0000000 --- a/tests/endpoint.rs +++ /dev/null @@ -1,143 +0,0 @@ -// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 OR BSD-3-Clause - -#![cfg(feature = "remote_endpoint")] - -use std::any::Any; -use std::result; -use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; -use std::sync::Arc; -use std::thread; -use std::time::Duration; - -use event_manager::utilities::subscribers::CounterSubscriber; -use event_manager::{self, EventManager, MutEventSubscriber, SubscriberId}; - -trait GenericSubscriber: MutEventSubscriber { - fn as_mut_any(&mut self) -> &mut dyn Any; -} - -impl GenericSubscriber for CounterSubscriber { - fn as_mut_any(&mut self) -> &mut dyn Any { - self - } -} - -// This test showcases the use of the three `RemoteEndpoint` methods: `call_blocking`, `fire`, -// and `kick`. The setting is we're moving an `EventManager` to a separate thread, and we want -// to register subscribers that remain fully owned by the manager (so, for example, it's not -// necessary to wrap them in a `Mutex` if event handling needs mutable access to their inner -// state). We also use the `GenericSubscriber` trait defined above to show how we can still -// obtain and interact with references to the actual types when event subscribers are registered -// as trait objects. -#[test] -fn test_endpoint() { - let mut event_manager = EventManager::>::new().unwrap(); - let endpoint = event_manager.remote_endpoint(); - let sub = Box::::default(); - - let run_count = Arc::new(AtomicU64::new(0)); - let keep_running = Arc::new(AtomicBool::new(true)); - - // These get moved into the following closure. - let run_count_clone = run_count.clone(); - let keep_running_clone = keep_running.clone(); - - // We spawn the thread that runs the event loop. - let thread_handle = thread::spawn(move || { - loop { - // The manager gets moved to the new thread here. - event_manager.run().unwrap(); - // Increment this counter every time the `run` method call above returns. Ordering - // is not that important here because the other thread only reads the value. - run_count_clone.fetch_add(1, Ordering::Relaxed); - - // When `keep_running` is no longer `true`, we break the loop and the - // thread will exit. - if !keep_running_clone.load(Ordering::Acquire) { - break; - } - } - }); - - // We're back to the main program thread from now on. - - // `run_count` should not change for now as there's no possible activity for the manager. - thread::sleep(Duration::from_millis(100)); - assert_eq!(run_count.load(Ordering::Relaxed), 0); - - // Use `call_blocking` to register a subscriber (and move it to the event loop thread under - // the ownership of the manager). `call_block` also allows us to inspect the result of the - // operation, but it blocks waiting for it and cannot be invoked from the same thread as - // the event loop. - let sub_id = endpoint - .call_blocking( - |sub_ops| -> result::Result { - Ok(sub_ops.add_subscriber(sub)) - }, - ) - .unwrap(); - - // We've added a subscriber. No subscriber events are fired yet, but the manager run - // loop went through one iteration when the endpoint message was received, so `run_count` - // has benn incremented. - thread::sleep(Duration::from_millis(100)); - assert_eq!(run_count.load(Ordering::Relaxed), 1); - - // Now let's activate the subscriber event. It's going to generate continuous activity until - // we explicitly clear it. We use `endpoint` to interact with the subscriber, because the - // latter is fully owned by the manager. Also, we make use of the `as_mut_any` method - // from our `GenericSubscriber` trait to get a reference to the actual subscriber type - // (which has been erased as a trait object from the manager's perspective). We use `fire` - // here, so we don't get a result. - // - // `fire` can also be used from the same thread as the `event_manager` runs on without causing - // a deadlock, because it doesn't get a result from the closure. For example, we can pass an - // endpoint to a subscriber, and use `fire` as part of `process` if it's helpful for that - // particular use case. - // - // Not getting a result from the closure means we have to deal with error conditions within. - // We use `unwrap` here, but that's ok because if the subscriber associated with `sub_id` is - // not present, then we have a serious error in our program logic. - endpoint - .fire(move |sub_ops| { - let sub = sub_ops.subscriber_mut(sub_id).unwrap(); - // The following `unwrap` cannot fail because we know the type is `CounterSubscriber`. - sub.as_mut_any() - .downcast_mut::() - .unwrap() - .trigger_event() - }) - .unwrap(); - - // The event will start triggering at this point, so `run_count` will increase. - thread::sleep(Duration::from_millis(100)); - assert!(run_count.load(Ordering::Relaxed) > 1); - - // Let's clear the subscriber event. Using `fire` again. - endpoint - .fire(move |sub_ops| { - let sub = sub_ops.subscriber_mut(sub_id).unwrap(); - // The following `unwrap` cannot fail because we know the actual type - // is `CounterSubscriber`. - sub.as_mut_any() - .downcast_mut::() - .unwrap() - .clear_event() - }) - .unwrap(); - - // We wait a bit more. The manager will be once more become blocked waiting for events. - thread::sleep(Duration::from_millis(100)); - - keep_running.store(false, Ordering::Release); - - // Trying to `join` the manager here would lead to a deadlock, because it will never read - // the value of `keep_running` to break the loop while stuck waiting for events. We use the - // `kick` endpoint method to force `EventManager::run()` to return. - - endpoint.kick().unwrap(); - - // We can `join` the manager thread and finalize now. - thread_handle.join().unwrap(); -} diff --git a/tests/multi_threaded.rs b/tests/multi_threaded.rs deleted file mode 100644 index 8f1d653..0000000 --- a/tests/multi_threaded.rs +++ /dev/null @@ -1,150 +0,0 @@ -// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 OR BSD-3-Clause - -use std::sync::{Arc, Mutex}; -use std::thread; - -use event_manager::utilities::subscribers::{ - CounterInnerMutSubscriber, CounterSubscriber, CounterSubscriberWithData, -}; -use event_manager::{EventManager, EventSubscriber, MutEventSubscriber, SubscriberOps}; - -// Showcase how you can manage subscribers of different types using the same event manager -// in a multithreaded context. -#[test] -fn test_diff_type_subscribers() { - let mut event_manager = EventManager::>>::new() - .expect("Cannot create event manager."); - - // The `CounterSubscriberWithData` expects to receive a number as a parameter that represents - // the number it can use as its inner Events data. - let data_subscriber = Arc::new(Mutex::new(CounterSubscriberWithData::new(1))); - data_subscriber.lock().unwrap().trigger_all_counters(); - - let counter_subscriber = Arc::new(Mutex::new(CounterSubscriber::default())); - counter_subscriber.lock().unwrap().trigger_event(); - - // Saving the ids allows to modify the subscribers in the event manager context (i.e. removing - // the subscribers from the event manager loop). - let ds_id = event_manager.add_subscriber(data_subscriber); - let cs_id = event_manager.add_subscriber(counter_subscriber); - - let thread_handle = thread::spawn(move || { - // In a typical application this would be an infinite loop with some break condition. - // For tests, 100 iterations should suffice. - for _ in 0..100 { - // When the event manager loop exits, it will return the number of events it - // handled. - let ev_count = event_manager.run().unwrap(); - // We are expecting the ev_count to be: - // 4 = 3 (events triggered from data_subscriber) + 1 (event from counter_subscriber). - assert_eq!(ev_count, 4); - } - - // One really important thing is to always remove the subscribers once you don't need them - // any more. - let _ = event_manager.remove_subscriber(ds_id).unwrap(); - let _ = event_manager.remove_subscriber(cs_id).unwrap(); - }); - thread_handle.join().unwrap(); -} - -// Showcase how you can manage a single type of subscriber using the same event manager -// in a multithreaded context. -#[test] -fn test_one_type_subscriber() { - let mut event_manager = - EventManager::::new().expect("Cannot create event manager"); - - // The `CounterSubscriberWithData` expects to receive the number it can use as its inner - // `Events` data as a parameter. - // Let's make sure that the inner data of the two subscribers will not overlap by keeping some - // numbers between the first_data of each subscriber. - let data_subscriber_1 = CounterSubscriberWithData::new(1); - let data_subscriber_2 = CounterSubscriberWithData::new(1999); - - // Saving the ids allows to modify the subscribers in the event manager context (i.e. removing - // the subscribers from the event manager loop). - let ds_id_1 = event_manager.add_subscriber(data_subscriber_1); - let ds_id_2 = event_manager.add_subscriber(data_subscriber_2); - - // Since we moved the ownership of the subscriber to the event manager, now we need to get - // a mutable reference from it so that we can modify them. - // Enclosing this in a code block so that the references are dropped when they're no longer - // needed. - { - let data_subscriber_1 = event_manager.subscriber_mut(ds_id_1).unwrap(); - data_subscriber_1.trigger_all_counters(); - - let data_subscriber_2 = event_manager.subscriber_mut(ds_id_2).unwrap(); - data_subscriber_2.trigger_all_counters(); - } - - let thread_handle = thread::spawn(move || { - // In a typical application this would be an infinite loop with some break condition. - // For tests, 100 iterations should suffice. - for _ in 0..100 { - // When the event manager loop exits, it will return the number of events it - // handled. - let ev_count = event_manager.run().unwrap(); - // Since we triggered all counters, we're expecting the number of events to always be - // 6 (3 from one subscriber and 3 from the other). - assert_eq!(ev_count, 6); - } - - // One really important thing is to always remove the subscribers once you don't need them - // any more. - // When calling remove_subscriber, you receive ownership of the subscriber object. - let data_subscriber_1 = event_manager.remove_subscriber(ds_id_1).unwrap(); - let data_subscriber_2 = event_manager.remove_subscriber(ds_id_2).unwrap(); - - // Let's check how the subscribers look after 100 iterations. - // Each subscriber's counter start from 0. When an event is triggered, each counter - // is incremented. So after 100 iterations when they're all triggered, we expect them - // to be 100. - let expected_subscriber_counters = vec![100, 100, 100]; - assert_eq!( - data_subscriber_1.get_all_counter_values(), - expected_subscriber_counters - ); - assert_eq!( - data_subscriber_2.get_all_counter_values(), - expected_subscriber_counters - ); - }); - - thread_handle.join().unwrap(); -} - -// Showcase how you can manage subscribers that make use of inner mutability using the event manager -// in a multithreaded context. -#[test] -fn test_subscriber_inner_mut() { - // We are using just the `CounterInnerMutSubscriber` subscriber, so we could've initialize - // the event manager as: EventManager::::new(). - // The typical application will have more than one subscriber type, so let's just pretend - // this is the case in this example as well. - let mut event_manager = EventManager::>::new() - .expect("Cannot create event manager"); - - let subscriber = Arc::new(CounterInnerMutSubscriber::default()); - subscriber.trigger_event(); - - // Let's just clone the subscriber before adding it to the event manager. This will allow us - // to use it from this thread without the need to call into EventManager::subscriber_mut(). - let subscriber_id = event_manager.add_subscriber(subscriber.clone()); - - let thread_handle = thread::spawn(move || { - for _ in 0..100 { - // When the event manager loop exits, it will return the number of events it - // handled. - let ev_count = event_manager.run().unwrap(); - assert_eq!(ev_count, 1); - } - - assert!(event_manager.remove_subscriber(subscriber_id).is_ok()); - }); - thread_handle.join().unwrap(); - - assert_eq!(subscriber.counter(), 100); -} diff --git a/tests/negative_tests.rs b/tests/negative_tests.rs deleted file mode 100644 index ec53ae9..0000000 --- a/tests/negative_tests.rs +++ /dev/null @@ -1,105 +0,0 @@ -// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 OR BSD-3-Clause - -use std::os::unix::{io::AsRawFd, net::UnixStream}; -use std::sync::{ - atomic::{AtomicU64, Ordering}, - Arc, -}; - -use event_manager::{ - EventManager, EventOps, EventSubscriber, Events, SubscriberOps, MAX_READY_EVENTS_CAPACITY, -}; -use vmm_sys_util::epoll::EventSet; - -#[derive(Debug)] -struct UnixStreamSubscriber { - stream: UnixStream, - rhup_count: AtomicU64, - // When this flag is used, in the process function the subscriber will - // unregister the fd where an error was received. - // The errors that need to be handled: `EventSet::HANG_UP`, `EventSet::ERROR`. - with_unregister_on_err: bool, -} - -impl UnixStreamSubscriber { - fn new(stream: UnixStream) -> UnixStreamSubscriber { - Self { - stream, - rhup_count: AtomicU64::new(0), - with_unregister_on_err: false, - } - } - - fn new_with_unregister_on_err(stream: UnixStream) -> UnixStreamSubscriber { - Self { - stream, - rhup_count: AtomicU64::new(0), - with_unregister_on_err: true, - } - } -} - -impl EventSubscriber for UnixStreamSubscriber { - fn process(&self, events: Events, ops: &mut EventOps<'_>) { - if events.event_set().contains(EventSet::HANG_UP) { - let _ = self.rhup_count.fetch_add(1, Ordering::Relaxed); - if self.with_unregister_on_err { - ops.remove(Events::empty(&self.stream)).unwrap(); - } - } - } - - fn init(&self, ops: &mut EventOps<'_>) { - ops.add(Events::new(&self.stream, EventSet::IN)).unwrap(); - } -} - -#[test] -fn test_handling_errors_in_subscriber() { - let (sock1, sock2) = UnixStream::pair().unwrap(); - - let mut event_manager = EventManager::>::new().unwrap(); - let subscriber = Arc::new(UnixStreamSubscriber::new(sock1)); - event_manager.add_subscriber(subscriber.clone()); - - // SAFETY: safe because `sock2` is a valid Unix socket, as asserted by the `unwrap` above. - unsafe { libc::close(sock2.as_raw_fd()) }; - - event_manager.run_with_timeout(100).unwrap(); - event_manager.run_with_timeout(100).unwrap(); - event_manager.run_with_timeout(100).unwrap(); - - // Since the subscriber did not remove the event from its watch list, the - // `EPOLLRHUP` error will continuously be a ready event each time `run` is called. - // We called `run_with_timeout` 3 times, hence we expect `rhup_count` to be 3. - assert_eq!(subscriber.rhup_count.load(Ordering::Relaxed), 3); - - let (sock1, sock2) = UnixStream::pair().unwrap(); - let subscriber_with_unregister = - Arc::new(UnixStreamSubscriber::new_with_unregister_on_err(sock1)); - event_manager.add_subscriber(subscriber_with_unregister); - - // SAFETY: safe because `sock2` is a valid Unix socket, as asserted by the `unwrap` above. - unsafe { libc::close(sock2.as_raw_fd()) }; - - let ready_list_len = event_manager.run_with_timeout(100).unwrap(); - assert_eq!(ready_list_len, 2); - // At this point the `subscriber_with_unregister` should not yield events anymore. - // We expect the number of ready fds to be 1. - let ready_list_len = event_manager.run_with_timeout(100).unwrap(); - assert_eq!(ready_list_len, 1); -} - -#[test] -fn test_max_ready_list_size() { - assert!( - EventManager::>::new_with_capacity(MAX_READY_EVENTS_CAPACITY) - .is_ok() - ); - assert!(EventManager::>::new_with_capacity( - MAX_READY_EVENTS_CAPACITY + 1 - ) - .is_err()); - assert!(EventManager::>::new_with_capacity(usize::MAX).is_err()) -} diff --git a/tests/regressions.rs b/tests/regressions.rs deleted file mode 100644 index 92c83ec..0000000 --- a/tests/regressions.rs +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright (C) 2020 Alibaba Cloud. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 OR BSD-3-Clause - -use event_manager::utilities::subscribers::CounterSubscriberWithData; -use event_manager::{EventManager, SubscriberOps}; - -// Test for the race condition reported by: Karthik.n -// FYI: https://github.com/rust-vmm/event-manager/issues/41 -#[test] -fn test_reuse_file_descriptor() { - let mut event_manager = EventManager::::new().unwrap(); - let mut counter_subscriber = CounterSubscriberWithData::new(0); - - // Set flag to toggle the registration of all three fds on the first epoll event, so the final - // event counter should be 1. - counter_subscriber.set_toggle_registry(true); - counter_subscriber.trigger_all_counters(); - let id = event_manager.add_subscriber(counter_subscriber); - - event_manager.run().unwrap(); - let c_ref = event_manager.subscriber_mut(id).unwrap(); - let counters = c_ref.get_all_counter_values(); - assert_eq!(counters[0] + counters[1] + counters[2], 1); -}