diff --git a/luisa_compute/src/lib.rs b/luisa_compute/src/lib.rs index 549ece8..061cf46 100644 --- a/luisa_compute/src/lib.rs +++ b/luisa_compute/src/lib.rs @@ -148,7 +148,6 @@ impl Context { handle: api::Stream(default_stream.handle), native_handle: default_stream.native_handle, device: weak.clone(), - mutex: RawMutex::INIT, })), ctx: self.inner.clone(), }), diff --git a/luisa_compute/src/runtime.rs b/luisa_compute/src/runtime.rs index c550134..182a085 100644 --- a/luisa_compute/src/runtime.rs +++ b/luisa_compute/src/runtime.rs @@ -339,7 +339,6 @@ impl Device { device: self.inner.clone(), handle: api::Stream(stream.handle), native_handle: stream.native_handle, - mutex: RawMutex::INIT, }), } } @@ -506,13 +505,11 @@ pub(crate) enum StreamHandle { device: Weak, handle: api::Stream, native_handle: *mut std::ffi::c_void, - mutex: RawMutex, }, NonDefault { device: Arc, handle: api::Stream, native_handle: *mut std::ffi::c_void, - mutex: RawMutex, }, } unsafe impl Send for StreamHandle {} @@ -556,17 +553,13 @@ impl Swapchain { } } +/// Synchronization primitives between streams, resemble timeline semaphores. +/// `scope.signal(event, ticket)` signals the event with a ticket. +/// `scope.wait(event, ticket)` waits until the event is signaled with a ticket +#[derive(Clone)] pub struct Event { pub(crate) handle: Arc, } -pub struct EventWait<'a> { - event: &'a Event, - ticket: u64, -} -pub struct EventSignal<'a> { - event: &'a Event, - ticket: u64, -} impl Event { #[inline] @@ -591,20 +584,6 @@ impl Event { .inner .is_event_completed(self.handle.handle, ticket) } - #[inline] - pub fn wait(&self, ticket: u64) -> EventWait { - EventWait { - event: self, - ticket, - } - } - #[inline] - pub fn signal(&self, ticket: u64) -> EventSignal { - EventSignal { - event: self, - ticket, - } - } } pub(crate) struct EventHandle { @@ -612,6 +591,8 @@ pub(crate) struct EventHandle { handle: api::Event, native_handle: *mut std::ffi::c_void, } +unsafe impl Send for EventHandle {} +unsafe impl Sync for EventHandle {} impl Drop for EventHandle { fn drop(&mut self) { @@ -619,6 +600,15 @@ impl Drop for EventHandle { } } +/// A [`Stream`] is a sequence of commands that are executed in order. +/// Commands are submitted to a stream via [`Scope`]. +/// There is a default stream attached to each [`Device`], which can be accessed +/// via [`Device::default_stream`]. +/// All synchronous api implicitly use the default stream. +/// To create a new stream, use [`Device::create_stream`]. +/// Multiple stream executes in parallel, but commands within a stream are +/// executed in order. +/// To synchronize between streams, use [`Event`]. pub struct Stream { #[allow(dead_code)] pub(crate) device: Device, @@ -650,22 +640,6 @@ impl StreamHandle { StreamHandle::NonDefault { native_handle, .. } => *native_handle, } } - #[inline] - pub(crate) fn lock(&self) { - match self { - StreamHandle::Default { mutex, .. } => mutex.lock(), - StreamHandle::NonDefault { mutex, .. } => mutex.lock(), - } - } - #[inline] - pub(crate) fn unlock(&self) { - unsafe { - match self { - StreamHandle::Default { mutex, .. } => mutex.unlock(), - StreamHandle::NonDefault { mutex, .. } => mutex.unlock(), - } - } - } } impl Drop for StreamHandle { @@ -679,6 +653,19 @@ impl Drop for StreamHandle { } } +/// Scope<'a> represents the lifetime of command execution. +/// Commands can only be submitted within a scope. +/// When scope drops, the underlying [`Stream`] is autoatically synchronized. +/// +/// To use a scope, the easiest way is to use [`Stream::with_scope`]. +/// +/// To create a scope with manually assigned lifetime, use [`Stream::scope`]. +/// For example, `let scope:Scope<'static> = stream.scope();` creates a scope that +/// accepts commands with `'static` lifetime. +/// +/// To prevent a scope from synchronizing on drop, use [`Scope::detach`].Note this is only +/// available for `Scope<'static>` as certain commands such as `copy_to_async<'a>` requires +/// synchronization. pub struct Scope<'a> { handle: Arc, marker: PhantomData<&'a ()>, @@ -808,12 +795,16 @@ impl<'a> Scope<'a> { self } } +impl Scope<'static> { + pub fn detach(self) { + self.synchronized.set(true); + } +} impl<'a> Drop for Scope<'a> { fn drop(&mut self) { if !self.synchronized.get() { self.synchronize(); } - self.handle.unlock(); } } @@ -824,9 +815,10 @@ impl Stream { f(&s) } + /// Creates a new scope. + /// See [`Scope`] for more details. #[inline] pub fn scope<'a>(&self) -> Scope<'a> { - self.handle.lock(); Scope { handle: self.handle.clone(), marker: PhantomData {},