
Support !Send future #36

Closed
akiradeveloper opened this issue Nov 25, 2021 · 4 comments · Fixed by #39
@akiradeveloper
Owner

Discussed in https://github.com/akiradeveloper/norpc/discussions/35

Originally posted by akiradeveloper November 25, 2021
Adding (?Send) to the async_trait macro makes it generate async functions that yield non-Send futures.

There is demand for non-Send futures.

My understanding is that Send is currently required because a future may be moved between threads. So, if that never happens in a particular setup, we can remove the Send constraint from the futures.

Maybe we can use this to run all futures on the same thread? https://docs.rs/tokio/1.14.0/tokio/task/struct.LocalSet.html

@akiradeveloper akiradeveloper self-assigned this Nov 25, 2021
@akiradeveloper
Owner Author

Here is an experiment. My idea is to add a nosync module holding non-Send variants of ClientChannel and ServerChannel. norpc-macros also needs some changes to support the ?Send attribute.

diff --git a/Cargo.toml b/Cargo.toml
index c279d3e..c34a8f5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -2,10 +2,10 @@
 members = [
     "norpc",
     "norpc-macros",
-    "example/hello-world",
-    "example/kvstore",
-    "example/concurrent-message",
+    # "example/hello-world",
+    # "example/kvstore",
+    # "example/concurrent-message",
     "example/rate-limit",
-    "benchmark/noop",
+    # "benchmark/noop",
     "benchmark/noop-tarpc",
 ]
\ No newline at end of file
diff --git a/example/rate-limit/tests/rate_limit.rs b/example/rate-limit/tests/rate_limit.rs
index 4f15e9b..5556b75 100644
--- a/example/rate-limit/tests/rate_limit.rs
+++ b/example/rate-limit/tests/rate_limit.rs
@@ -11,21 +11,23 @@ trait RateLimit {

 #[derive(Clone)]
 struct RateLimitApp;
-#[norpc::async_trait]
+#[norpc::async_trait(?Send)]
 impl RateLimit for RateLimitApp {
     async fn noop(self) {}
 }
-#[tokio::test(flavor = "multi_thread")]
+// #[tokio::test(flavor = "multi_thread")]
 async fn test_rate_limit() {
     let (tx, rx) = mpsc::unbounded_channel();
-    tokio::spawn(async move {
-        let app = RateLimitApp;
-        let service = RateLimitService::new(app);
-        let service = ServiceBuilder::new()
-            .rate_limit(5000, std::time::Duration::from_secs(1))
-            .service(service);
-        let server = norpc::ServerChannel::new(rx, service);
-        server.serve().await
+    tokio::task::block_in_place(move || {
+        tokio::runtime::Handle::current().block_on(async move {
+            let app = RateLimitApp;
+            let service = RateLimitService::new(app);
+            let service = ServiceBuilder::new()
+                .rate_limit(5000, std::time::Duration::from_secs(1))
+                .service(service);
+            let server = norpc::ServerChannel::new(rx, service);
+            server.serve().await
+        });
     });
     let chan = norpc::ClientChannel::new(tx);
     let chan = ServiceBuilder::new()
diff --git a/norpc-macros/src/generator.rs b/norpc-macros/src/generator.rs
index 6a39677..c2b1f9e 100644
--- a/norpc-macros/src/generator.rs
+++ b/norpc-macros/src/generator.rs
@@ -72,7 +72,7 @@ fn generate_trait(svc: &Service) -> String {
     }
     format!(
         "
-               #[norpc::async_trait]
+               #[norpc::async_trait(?Send)]
                pub trait {svc_name}: Clone {{
                        {}
                }}
@@ -160,10 +160,10 @@ fn generate_server_impl(svc: &Service) -> String {
                        Self {{ app }}
                }}
        }}
-    impl<App: {svc_name} + 'static + Send> tower::Service<{svc_name}Request> for {svc_name}Service<App> {{
+    impl<App: {svc_name} + 'static> tower::Service<{svc_name}Request> for {svc_name}Service<App> {{
         type Response = {svc_name}Response;
         type Error = ();
-        type Future = std::pin::Pin<Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>>;
+        type Future = std::pin::Pin<Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>>>>;
         fn poll_ready(
             &mut self,
             _: &mut std::task::Context<'_>,
diff --git a/norpc/src/lib.rs b/norpc/src/lib.rs
index 654e4c3..77b22af 100644
--- a/norpc/src/lib.rs
+++ b/norpc/src/lib.rs
@@ -32,11 +32,11 @@ impl<X, Y> Clone for ClientChannel<X, Y> {
         }
     }
 }
-impl<X: 'static + Send, Y: 'static + Send> Service<X> for ClientChannel<X, Y> {
+impl<X: 'static, Y: 'static> Service<X> for ClientChannel<X, Y> {
     type Response = Y;
     type Error = Error;
     type Future =
-        std::pin::Pin<Box<dyn std::future::Future<Output = Result<Y, Self::Error>> + Send>>;
+        std::pin::Pin<Box<dyn std::future::Future<Output = Result<Y, Self::Error>>>>;
     fn poll_ready(
         &mut self,
@@ -72,27 +72,26 @@ pub struct ServerChannel<Req, Svc: Service<Req>> {
     service: Svc,
     rx: mpsc::UnboundedReceiver<Request<Req, Svc::Response>>,
 }
-impl<Req, Svc: Service<Req> + 'static + Send> ServerChannel<Req, Svc>
-where
-    Req: 'static + Send,
-    Svc::Future: Send,
-    Svc::Response: Send,
+impl<Req: 'static, Svc: Service<Req> + 'static> ServerChannel<Req, Svc>
 {
     pub fn new(rx: mpsc::UnboundedReceiver<Request<Req, Svc::Response>>, service: Svc) -> Self {
         Self { service, rx }
     }
     pub async fn serve(mut self) {
-        while let Some(Request { tx, inner }) = self.rx.recv().await {
-            // back-pressure
-            futures::future::poll_fn(|ctx| self.service.poll_ready(ctx))
-                .await
-                .ok();
-            let fut = self.service.call(inner);
-            tokio::spawn(async move {
-                if let Ok(rep) = fut.await {
-                    tx.send(rep).ok();
-                }
-            });
-        }
+        let local = tokio::task::LocalSet::new();
+        local.run_until(async move {
+            while let Some(Request { tx, inner }) = self.rx.recv().await {
+                // back-pressure
+                futures::future::poll_fn(|ctx| self.service.poll_ready(ctx))
+                    .await
+                    .ok();
+                let fut = self.service.call(inner);
+                tokio::task::spawn_local(async move {
+                    if let Ok(rep) = fut.await {
+                        tx.send(rep).ok();
+                    }
+                }).await;
+            }
+        }).await;

@akiradeveloper
Owner Author

akiradeveloper commented Nov 25, 2021

Can we use https://github.com/DataDog/glommio?

This library seems to build an ecosystem around thread-pinned execution.

Although pinned threads are not required for use of glommio, by creating N executors and binding each to a specific CPU one can use this crate to implement a thread-per-core system where context switches essentially never happen, allowing much higher efficiency.

+    tokio::task::block_in_place(move || {
+        tokio::runtime::Handle::current().block_on(async move {

looks like it could be replaced with a LocalExecutor.

use glommio::LocalExecutor;

let local_ex = LocalExecutor::default();

local_ex.run(async {
    println!("Hello world!");
});

Single-threaded executor.
The executor can only be run on the thread that created it.

@akiradeveloper
Owner Author

Hahaha, glommio requires kernel 5.8. The end.

---- test_concurrent_message stdout ----
thread 'test_concurrent_message' panicked at 'Failed to register a probe. The most likely reason is that your kernel witnessed Romulus killing Remus (too old!! kernel should be at least 5.8)', /home/akira.hayakawa/.cargo/registry/src/github.com-1ecc6299db9ec823/glommio-0.6.0/src/sys/uring.rs:213:13

@akiradeveloper akiradeveloper linked a pull request Nov 26, 2021 that will close this issue
@akiradeveloper
Owner Author

done.

@akiradeveloper akiradeveloper changed the title Support !Sync future Support !Send future Nov 27, 2021