From 295887034f580872278c311ba03e34d7bcfdd04f Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 5 Dec 2024 21:55:23 -0500 Subject: [PATCH] chore: Hide underlying channel in the Dispatcher (#1585) Hide the underlying dependency so we can have a stable API while making changes to the underlying building blocks and implementation --- vortex-io/src/dispatcher/compio.rs | 6 +++--- vortex-io/src/dispatcher/mod.rs | 25 ++++++++++++++++++++++--- vortex-io/src/dispatcher/tokio.rs | 6 +++--- vortex-io/src/dispatcher/wasm.rs | 6 +++--- 4 files changed, 31 insertions(+), 12 deletions(-) diff --git a/vortex-io/src/dispatcher/compio.rs b/vortex-io/src/dispatcher/compio.rs index bdc118285b..5d62a86842 100644 --- a/vortex-io/src/dispatcher/compio.rs +++ b/vortex-io/src/dispatcher/compio.rs @@ -6,7 +6,7 @@ use compio::runtime::{JoinHandle as CompioJoinHandle, Runtime, RuntimeBuilder}; use futures::channel::oneshot; use vortex_error::{vortex_bail, vortex_panic, VortexResult}; -use super::Dispatch; +use super::{Dispatch, JoinHandle as VortexJoinHandle}; trait CompioSpawn { fn spawn(self: Box) -> CompioJoinHandle<()>; @@ -71,7 +71,7 @@ impl CompioDispatcher { } impl Dispatch for CompioDispatcher { - fn dispatch(&self, task: F) -> VortexResult> + fn dispatch(&self, task: F) -> VortexResult> where F: (FnOnce() -> Fut) + Send + 'static, Fut: Future + 'static, @@ -80,7 +80,7 @@ impl Dispatch for CompioDispatcher { let (tx, rx) = oneshot::channel(); let compio_task = Box::new(CompioTask { task, result: tx }); match self.submitter.send(compio_task) { - Ok(()) => Ok(rx), + Ok(()) => Ok(VortexJoinHandle(rx)), Err(err) => vortex_bail!("Dispatcher error spawning task: {err}"), } } diff --git a/vortex-io/src/dispatcher/mod.rs b/vortex-io/src/dispatcher/mod.rs index fb518c157e..bc8c9401a4 100644 --- a/vortex-io/src/dispatcher/mod.rs +++ b/vortex-io/src/dispatcher/mod.rs @@ -6,11 +6,13 @@ mod tokio; mod wasm; use std::future::Future; +use std::task::Poll; use futures::channel::oneshot; +use futures::FutureExt; #[cfg(not(any(feature = "compio", feature = "tokio")))] use vortex_error::vortex_panic; -use vortex_error::VortexResult; +use vortex_error::{vortex_err, VortexResult}; #[cfg(feature = "compio")] use self::compio::*; @@ -43,7 +45,7 @@ pub trait Dispatch: sealed::Sealed { /// /// The returned `Future` will be executed to completion on a single thread, /// thus it may be `!Send`. - fn dispatch(&self, task: F) -> VortexResult> + fn dispatch(&self, task: F) -> VortexResult> where F: (FnOnce() -> Fut) + Send + 'static, Fut: Future + 'static, @@ -68,6 +70,23 @@ pub trait Dispatch: sealed::Sealed { #[derive(Debug)] pub struct IoDispatcher(Inner); +pub struct JoinHandle(oneshot::Receiver); + +impl Future for JoinHandle { + type Output = VortexResult; + + fn poll( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll { + match self.0.poll_unpin(cx) { + Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)), + Poll::Ready(Err(_)) => Poll::Ready(Err(vortex_err!("Task was canceled"))), + Poll::Pending => Poll::Pending, + } + } +} + #[derive(Debug)] enum Inner { #[cfg(feature = "tokio")] @@ -97,7 +116,7 @@ impl Default for IoDispatcher { impl Dispatch for IoDispatcher { #[allow(unused_variables)] // If no features are enabled `task` ends up being unused - fn dispatch(&self, task: F) -> VortexResult> + fn dispatch(&self, task: F) -> VortexResult> where F: (FnOnce() -> Fut) + Send + 'static, Fut: Future + 'static, diff --git a/vortex-io/src/dispatcher/tokio.rs b/vortex-io/src/dispatcher/tokio.rs index dde9987773..d6c0bd8e66 100644 --- a/vortex-io/src/dispatcher/tokio.rs +++ b/vortex-io/src/dispatcher/tokio.rs @@ -6,7 +6,7 @@ use futures::channel::oneshot; use tokio::task::{JoinHandle as TokioJoinHandle, LocalSet}; use vortex_error::{vortex_bail, vortex_panic, VortexResult}; -use super::Dispatch; +use super::{Dispatch, JoinHandle as VortexJoinHandle}; trait TokioSpawn { fn spawn(self: Box) -> TokioJoinHandle<()>; @@ -84,7 +84,7 @@ where } impl Dispatch for TokioDispatcher { - fn dispatch(&self, task: F) -> VortexResult> + fn dispatch(&self, task: F) -> VortexResult> where F: (FnOnce() -> Fut) + Send + 'static, Fut: Future + 'static, @@ -95,7 +95,7 @@ impl Dispatch for TokioDispatcher { let task = TokioTask { result: tx, task }; match self.submitter.send(Box::new(task)) { - Ok(()) => Ok(rx), + Ok(()) => Ok(VortexJoinHandle(rx)), Err(err) => vortex_bail!("Dispatcher error spawning task: {err}"), } } diff --git a/vortex-io/src/dispatcher/wasm.rs b/vortex-io/src/dispatcher/wasm.rs index f39737ecdd..f9a076316c 100644 --- a/vortex-io/src/dispatcher/wasm.rs +++ b/vortex-io/src/dispatcher/wasm.rs @@ -7,7 +7,7 @@ use futures::channel::oneshot::Receiver; use vortex_error::{vortex_panic, VortexResult}; use wasm_bindgen_futures::wasm_bindgen::__rt::Start; -use crate::Dispatch; +use crate::{Dispatch, JoinHandle as VortexJoinHandle}; /// `Dispatch`able type that is available when running Vortex in the browser or other WASM env. #[derive(Debug, Clone)] @@ -20,7 +20,7 @@ impl WasmDispatcher { } impl Dispatch for WasmDispatcher { - fn dispatch(&self, task: F) -> VortexResult> + fn dispatch(&self, task: F) -> VortexResult> where F: FnOnce() -> Fut + Send + 'static, Fut: Future + 'static, @@ -35,7 +35,7 @@ impl Dispatch for WasmDispatcher { }) .start(); - Ok(rx) + Ok(VortexJoinHandle(rx)) } fn shutdown(self) -> VortexResult<()> {