Skip to content

Commit

Permalink
Merge pull request #14 from xgroleau/feat/drop-bomb-cancellation
Browse files Browse the repository at this point in the history
feat: Added drop bomb to panic in case of request cancellation
  • Loading branch information
xgroleau authored Sep 21, 2023
2 parents ce7bf14 + 1aa1a4f commit 0c9472e
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 2 deletions.
52 changes: 52 additions & 0 deletions ector/examples/cancel_panic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//! Example where cancellation will cause a panic

#![macro_use]
#![feature(type_alias_impl_trait)]
#![feature(async_fn_in_trait)]
#![allow(incomplete_features)]

use {
ector::*,
embassy_time::{Duration, Timer},
futures::future::{join, select},
};

async fn test(addr: DynamicAddress<Request<&'static str, &'static str>>) {
let req = core::pin::pin!(addr.request("Hello"));
let timeout = Timer::after(Duration::from_secs(1));
select(req, timeout).await;
println!("Timeout");
}

#[embassy_executor::main]
async fn main(_s: embassy_executor::Spawner) {
// Example of request response
static SERVER: ActorContext<Server> = ActorContext::new();

let address = SERVER.dyn_address();
let server = SERVER.mount(Server);
let test = test(address);
join(server, test).await;
}

pub struct Server;

impl Actor for Server {
type Message = Request<&'static str, &'static str>;

async fn on_mount<M>(
&mut self,
_: DynamicAddress<Request<&'static str, &'static str>>,
mut inbox: M,
) -> !
where
M: Inbox<Self::Message>,
{
println!("Server started!");

loop {
// We don't reply, since we cancel
inbox.next().await;
}
}
}
20 changes: 18 additions & 2 deletions ector/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use embassy_sync::{
channel::{Channel, DynamicSender, Receiver, TrySendError},
};

use crate::drop::DropBomb;

/// Trait that each actor must implement. An actor defines a message type
/// that it acts on, and an implementation of `on_mount` which is invoked
/// when the actor is started.
Expand Down Expand Up @@ -78,13 +80,17 @@ impl<'a, M, R> ActorRequest<M, R> for DynamicSender<'a, Request<M, R>> {
async fn request(&self, message: M) -> R {
let channel: Channel<NoopRawMutex, R, 1> = Channel::new();
let sender: DynamicSender<'_, R> = channel.sender().into();
let bomb = DropBomb::new();

// We guarantee that channel lives until we've been notified on it, at which
// point its out of reach for the replier.
let reply_to = unsafe { core::mem::transmute(&sender) };
let message = Request::new(message, reply_to);
self.notify(message).await;
channel.receive().await
let res = channel.receive().await;

bomb.defuse();
res
}
}

Expand All @@ -111,13 +117,17 @@ where
async fn request(&self, message: M) -> R {
let channel: Channel<MUT, R, 1> = Channel::new();
let sender: DynamicSender<'_, R> = channel.sender().into();
let bomb = DropBomb::new();

// We guarantee that channel lives until we've been notified on it, at which
// point its out of reach for the replier.
let reply_to = unsafe { core::mem::transmute(&sender) };
let message = Request::new(message, reply_to);
self.notify(message).await;
channel.receive().await
let res = channel.receive().await;

bomb.defuse();
res
}
}

Expand All @@ -128,6 +138,9 @@ where
pub type DynamicAddress<M> = DynamicSender<'static, M>;

/// Type alias over a [DynamicAddress] using a [Request] as message
///
/// Safety: You should not cancel a request, it will cause the requesting thread to panic,
/// and cause UB if the Actor is still alive after the panic (i.e. in a different thread)
pub type DynamicRequestAddress<M, R> = DynamicSender<'static, Request<M, R>>;

/// A handle to another actor for dispatching messages.
Expand All @@ -137,6 +150,9 @@ pub type DynamicRequestAddress<M, R> = DynamicSender<'static, Request<M, R>>;
pub type Address<M, MUT, const N: usize = 1> = Sender<'static, MUT, M, N>;

/// Type alias over a [Address] using a [Request] as message
///
/// Safety: You should not cancel a request, it will cause the requesting thread to panic,
/// and cause UB if the Actor is still alive after the panic (i.e. in a different thread)
pub type RequestAddress<M, R, MUT, const N: usize = 1> = Sender<'static, MUT, Request<M, R>, N>;

pub struct Request<M, R>
Expand Down
25 changes: 25 additions & 0 deletions ector/src/drop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
//! Original reference https://github.com/embassy-rs/embassy/blob/e1ed492577d2151b4cc8ef995536dd045d2db087/embassy-hal-internal/src/drop.rs#L32

/// Panics if it is improperly disposed of.
///
/// This is to forbid cancelling a future/request.
///
/// To properly dispose, call the [defuse](Self::defuse) method before this object is dropped.
#[must_use = "to delay the drop bomb invokation to the end of the scope"]
pub struct DropBomb(());
impl DropBomb {
pub fn new() -> Self {
Self(())
}

/// Defuses the bomb, rendering it safe to drop.
pub fn defuse(self) {
core::mem::forget(self)
}
}

impl Drop for DropBomb {
fn drop(&mut self) {
panic!("Dropped before the request completed. You cannot cancel an ongoing request")
}
}
1 change: 1 addition & 0 deletions ector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
pub(crate) mod fmt;

mod actor;
mod drop;
pub use {actor::*, ector_macros::*};

// Reexport mutex types
Expand Down

0 comments on commit 0c9472e

Please sign in to comment.