Missing Sync somewhere along the chain of Pool::get #333

Closed
samchouse opened this issue Jun 15, 2024 · 7 comments
Labels
A-core (Area: Core / deadpool), A-postgres (Area: PostgreSQL support / deadpool-postgres), invalid (The issue is invalid and has been raised in error.)

Comments

@samchouse

Calling db.get().await; inside a boxed future that is declared Send + Sync fails to compile with the errors below. I'm not sure exactly where the issue lies.

   Compiling deadpool v0.12.1 (/home/sam/deadpool)
   Compiling deadpool-postgres v0.14.0 (/home/sam/deadpool/postgres)
   Compiling adrastos_core v0.1.0 (/home/sam/Documents/projects/personal/adrastos/crates/core)
warning: unused import: `db::postgres::Database`
 --> crates/core/src/task_queue.rs:5:13
  |
5 | use crate::{db::postgres::Database, id::Id};
  |             ^^^^^^^^^^^^^^^^^^^^^^
  |
  = note: `#[warn(unused_imports)]` on by default

warning: this function depends on never type fallback being `()`
   --> crates/core/src/auth/oauth2/mod.rs:129:5
    |
129 | /     pub async fn initialize_login(
130 | |         &self,
131 | |         provider: OAuth2Provider,
132 | |         redis_pool: &RedisPool,
133 | |     ) -> Result<(Url, CsrfToken), String> {
    | |_________________________________________^
    |
    = warning: this was previously accepted by the compiler but is being phased out; it will become a hard error in a future release!
    = note: for more information, see issue #123748 <https://github.com/rust-lang/rust/issues/123748>
    = help: specify the types explicitly
    = note: `#[warn(dependency_on_unit_never_type_fallback)]` on by default

warning: unused variable: `id`
  --> crates/core/src/task_queue.rs:30:22
   |
30 |                 for (id, task) in queue.write().await.0.iter() {
   |                      ^^ help: if this is intentional, prefix it with an underscore: `_id`
   |
   = note: `#[warn(unused_variables)]` on by default

warning: unused variable: `task`
  --> crates/core/src/task_queue.rs:30:26
   |
30 |                 for (id, task) in queue.write().await.0.iter() {
   |                          ^^^^ help: if this is intentional, prefix it with an underscore: `_task`

error[E0277]: `dyn Future<Output = Result<(), deadpool::managed::hooks::HookError<tokio_postgres::Error>>> + Send` cannot be shared between threads safely
    --> crates/core/src/task_queue.rs:49:9
     |
49   | /         Box::pin(async move {
50   | |             db.get().await;
51   | |         })
     | |__________^ `dyn Future<Output = Result<(), deadpool::managed::hooks::HookError<tokio_postgres::Error>>> + Send` cannot be shared between threads safely
     |
     = help: the trait `std::marker::Sync` is not implemented for `dyn Future<Output = Result<(), deadpool::managed::hooks::HookError<tokio_postgres::Error>>> + Send`, which is required by `{async block@crates/core/src/task_queue.rs:49:18: 51:10}: std::marker::Sync`
     = note: required for `Unique<dyn Future<Output = Result<(), deadpool::managed::hooks::HookError<tokio_postgres::Error>>> + Send>` to implement `std::marker::Sync`
note: required because it appears within the type `Box<dyn Future<Output = Result<(), deadpool::managed::hooks::HookError<tokio_postgres::Error>>> + Send>`
    --> /home/sam/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/boxed.rs:236:12
     |
236  | pub struct Box<
     |            ^^^
note: required because it appears within the type `Pin<Box<dyn Future<Output = Result<(), deadpool::managed::hooks::HookError<tokio_postgres::Error>>> + Send>>`
    --> /home/sam/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/pin.rs:1090:12
     |
1090 | pub struct Pin<Ptr> {
     |            ^^^
note: required because it's used within this `async` fn body
    --> /home/sam/deadpool/src/managed/hooks.rs:124:42
     |
124  |       ) -> Result<(), HookError<M::Error>> {
     |  __________________________________________^
125  | |         for hook in &self.vec {
126  | |             match hook {
127  | |                 Hook::Fn(f) => f(&mut inner.obj, &inner.metrics)?,
...    |
131  | |         Ok(())
132  | |     }
     | |_____^
note: required because it's used within this `async` fn body
    --> /home/sam/deadpool/src/managed/mod.rs:383:62
     |
383  |       ) -> Result<Option<ObjectInner<M>>, PoolError<M::Error>> {
     |  ______________________________________________________________^
384  | |         let mut unready_obj = UnreadyObject {
385  | |             inner: Some(inner_obj),
386  | |             pool: &self.inner,
...    |
420  | |         Ok(Some(unready_obj.ready()))
421  | |     }
     | |_____^
note: required because it's used within this `async` fn body
    --> /home/sam/deadpool/src/managed/mod.rs:321:92
     |
321  |       pub async fn timeout_get(&self, timeouts: &Timeouts) -> Result<W, PoolError<M::Error>> {
     |  ____________________________________________________________________________________________^
322  | |         let _ = self.inner.users.fetch_add(1, Ordering::Relaxed);
323  | |         let users_guard = DropGuard(|| {
324  | |             let _ = self.inner.users.fetch_sub(1, Ordering::Relaxed);
...    |
375  | |         .into())
376  | |     }
     | |_____^
note: required because it's used within this `async` fn body
    --> /home/sam/deadpool/src/managed/mod.rs:311:63
     |
311  |       pub async fn get(&self) -> Result<W, PoolError<M::Error>> {
     |  _______________________________________________________________^
312  | |         self.timeout_get(&self.timeouts()).await
313  | |     }
     | |_____^
note: required because it's used within this `async` block
    --> crates/core/src/task_queue.rs:49:18
     |
49   |           Box::pin(async move {
     |  __________________^
50   | |             db.get().await;
51   | |         })
     | |_________^
     = note: required for the cast from `Pin<Box<{async block@crates/core/src/task_queue.rs:49:18: 51:10}>>` to `Pin<Box<dyn Future<Output = ()> + Send + std::marker::Sync>>`
     = note: the full name for the type has been written to '/home/sam/Documents/projects/personal/adrastos/target/debug/deps/adrastos_core-43f5e88ae6228d23.long-type-18325601849322697777.txt'
     = note: consider using `--verbose` to print the full type name to the console
     = note: the full name for the type has been written to '/home/sam/Documents/projects/personal/adrastos/target/debug/deps/adrastos_core-43f5e88ae6228d23.long-type-13642881050076820353.txt'
     = note: consider using `--verbose` to print the full type name to the console
     = note: the full name for the type has been written to '/home/sam/Documents/projects/personal/adrastos/target/debug/deps/adrastos_core-43f5e88ae6228d23.long-type-6699085670344616249.txt'
     = note: consider using `--verbose` to print the full type name to the console

error[E0277]: `dyn Future<Output = Result<(tokio_postgres::Client, tokio::task::JoinHandle<()>), tokio_postgres::Error>> + Send` cannot be shared between threads safely
    --> crates/core/src/task_queue.rs:49:9
     |
49   | /         Box::pin(async move {
50   | |             db.get().await;
51   | |         })
     | |__________^ `dyn Future<Output = Result<(tokio_postgres::Client, tokio::task::JoinHandle<()>), tokio_postgres::Error>> + Send` cannot be shared between threads safely
     |
     = help: the trait `std::marker::Sync` is not implemented for `dyn Future<Output = Result<(tokio_postgres::Client, tokio::task::JoinHandle<()>), tokio_postgres::Error>> + Send`, which is required by `{async block@crates/core/src/task_queue.rs:49:18: 51:10}: std::marker::Sync`
     = note: required for `Unique<dyn Future<Output = Result<(tokio_postgres::Client, tokio::task::JoinHandle<()>), tokio_postgres::Error>> + Send>` to implement `std::marker::Sync`
note: required because it appears within the type `Box<dyn Future<Output = Result<(tokio_postgres::Client, tokio::task::JoinHandle<()>), tokio_postgres::Error>> + Send>`
    --> /home/sam/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/boxed.rs:236:12
     |
236  | pub struct Box<
     |            ^^^
note: required because it appears within the type `Pin<Box<dyn Future<Output = Result<(tokio_postgres::Client, tokio::task::JoinHandle<()>), tokio_postgres::Error>> + Send>>`
    --> /home/sam/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/pin.rs:1090:12
     |
1090 | pub struct Pin<Ptr> {
     |            ^^^
note: required because it's used within this `async` fn body
    --> /home/sam/deadpool/postgres/src/lib.rs:149:60
     |
149  |       async fn create(&self) -> Result<ClientWrapper, Error> {
     |  ____________________________________________________________^
150  | |         let (client, conn_task) = self.connect.connect(&self.pg_config).await?;
151  | |         let client_wrapper = ClientWrapper::new(client, conn_task);
152  | |         self.statement_caches
153  | |             .attach(&client_wrapper.statement_cache);
154  | |         Ok(client_wrapper)
155  | |     }
     | |_____^
note: required because it's used within this `async` fn body
    --> /home/sam/deadpool/src/managed/mod.rs:653:30
     |
653  |   ) -> Result<O, PoolError<E>> {
     |  ______________________________^
654  | |     match (runtime, duration) {
655  | |         (_, None) => future.await.map_err(Into::into),
656  | |         (Some(runtime), Some(duration)) => runtime
...    |
662  | |     }
663  | | }
     | |_^
note: required because it's used within this `async` fn body
    --> /home/sam/deadpool/src/managed/mod.rs:427:62
     |
427  |       ) -> Result<Option<ObjectInner<M>>, PoolError<M::Error>> {
     |  ______________________________________________________________^
428  | |         let mut unready_obj = UnreadyObject {
429  | |             inner: Some(ObjectInner {
430  | |                 obj: apply_timeout(
...    |
455  | |         Ok(Some(unready_obj.ready()))
456  | |     }
     | |_____^
note: required because it's used within this `async` fn body
    --> /home/sam/deadpool/src/managed/mod.rs:321:92
     |
321  |       pub async fn timeout_get(&self, timeouts: &Timeouts) -> Result<W, PoolError<M::Error>> {
     |  ____________________________________________________________________________________________^
322  | |         let _ = self.inner.users.fetch_add(1, Ordering::Relaxed);
323  | |         let users_guard = DropGuard(|| {
324  | |             let _ = self.inner.users.fetch_sub(1, Ordering::Relaxed);
...    |
375  | |         .into())
376  | |     }
     | |_____^
note: required because it's used within this `async` fn body
    --> /home/sam/deadpool/src/managed/mod.rs:311:63
     |
311  |       pub async fn get(&self) -> Result<W, PoolError<M::Error>> {
     |  _______________________________________________________________^
312  | |         self.timeout_get(&self.timeouts()).await
313  | |     }
     | |_____^
note: required because it's used within this `async` block
    --> crates/core/src/task_queue.rs:49:18
     |
49   |           Box::pin(async move {
     |  __________________^
50   | |             db.get().await;
51   | |         })
     | |_________^
     = note: required for the cast from `Pin<Box<{async block@crates/core/src/task_queue.rs:49:18: 51:10}>>` to `Pin<Box<dyn Future<Output = ()> + Send + std::marker::Sync>>`
     = note: the full name for the type has been written to '/home/sam/Documents/projects/personal/adrastos/target/debug/deps/adrastos_core-43f5e88ae6228d23.long-type-18325601849322697777.txt'
     = note: consider using `--verbose` to print the full type name to the console
     = note: the full name for the type has been written to '/home/sam/Documents/projects/personal/adrastos/target/debug/deps/adrastos_core-43f5e88ae6228d23.long-type-13642881050076820353.txt'
     = note: consider using `--verbose` to print the full type name to the console
     = note: the full name for the type has been written to '/home/sam/Documents/projects/personal/adrastos/target/debug/deps/adrastos_core-43f5e88ae6228d23.long-type-11138257683375059716.txt'
     = note: consider using `--verbose` to print the full type name to the console
     = note: the full name for the type has been written to '/home/sam/Documents/projects/personal/adrastos/target/debug/deps/adrastos_core-43f5e88ae6228d23.long-type-18437699527042099929.txt'
     = note: consider using `--verbose` to print the full type name to the console

For more information about this error, try `rustc --explain E0277`.
warning: `adrastos_core` (lib) generated 4 warnings
error: could not compile `adrastos_core` (lib) due to 2 previous errors; 4 warnings emitted

use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc};

use tokio::{sync::RwLock, time};

use crate::{db::postgres::Database, id::Id};

type TaskFuture = Pin<Box<dyn Future<Output = ()> + Send + Sync>>;

pub struct TaskQueue(HashMap<String, Box<dyn Fn(String) -> TaskFuture + Send + Sync>>);

impl TaskQueue {
    pub fn new() -> Self {
        Self(HashMap::new())
    }

    pub fn add_task<T: Fn(String) -> TaskFuture + Send + Sync + 'static>(&mut self, task: T) {
        self.0.insert(Id::new().to_string(), Box::new(task));
    }

    // pub fn clear_task(&mut self, id: Id) {
    //     self.0.remove(&id);
    // }

    pub fn run(queue: Arc<RwLock<Self>>) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            let mut interval = time::interval(tokio::time::Duration::from_millis(500));

            loop {
                interval.tick().await;
                for (id, task) in queue.write().await.0.iter() {
                    // task(id.clone());
                }
            }
        })
    }
}

impl Default for TaskQueue {
    fn default() -> Self {
        Self::new()
    }
}

fn a(db: deadpool_postgres::Pool) {
    let mut a = TaskQueue::new();
    a.add_task(move |id| {
        println!("{id}");
        let db = db.clone();
        Box::pin(async move {
            db.get().await;
        })
    });
}

@bikeshedder
Owner

Could you provide a minimal reproduction example?
A PR for that in form of a unit test would be highly appreciated!

@bikeshedder added the bug (Category: This is a bug.), A-core (Area: Core / deadpool), and A-postgres (Area: PostgreSQL support / deadpool-postgres) labels on Jun 17, 2024
@samchouse
Author

Done: #334

@samchouse
Author

Actually, I realized that I had something like this set up in Actix, and the only difference between our types is this:

- Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
+ Pin<Box<dyn Future<Output = ()>>>;

I guess Send + Sync is unnecessary in this case. However, I'm not sure whether this is still an issue with the library or whether the bounds are unnecessary in every case.

@bikeshedder
Owner

The future returned by Pool::get() is Send but not Sync.

Why do you need it to be Sync in the first place? Polling a future requires exclusive (&mut) access to it, so a future can never be polled from multiple threads at the same time.
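To illustrate the point about Send being enough, here is a minimal std-only sketch. The names count_up, block_on, NoopWaker, and run_on_other_thread are made up for this example and are not part of deadpool or tokio; block_on is a toy executor that busy-polls, which is only acceptable here because the future completes immediately.

```rust
use std::cell::Cell;
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

// A future that is Send but not Sync: it keeps a Cell alive across an await.
// (Cell<u32> is Send, but never Sync.)
async fn count_up() -> u32 {
    let counter = Cell::new(0u32);
    counter.set(counter.get() + 1);
    std::future::ready(()).await; // `counter` is still live after this point
    counter.set(counter.get() + 1);
    counter.get()
}

// Waker that does nothing; enough for futures that never return Pending.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Toy executor for illustration only: polls in a loop until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        // `poll` takes Pin<&mut F>, i.e. exclusive access to the future.
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Moving the future to another thread only needs Send + 'static.
fn run_on_other_thread() -> u32 {
    let fut = count_up();
    thread::spawn(move || block_on(fut))
        .join()
        .expect("worker thread panicked")
}
```

Nothing in this sketch ever needs a shared reference to the future from two threads, which is the only situation in which Sync would matter.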

After removing the + Sync bounds, the code compiles fine:

-    type ClosureFuture = Pin<Box<dyn std::future::Future<Output = ()> + Send + Sync>>;
-    async fn test_closure<T: Fn(String) -> ClosureFuture + Send + Sync + 'static>(_: T) {
+    type ClosureFuture = Pin<Box<dyn std::future::Future<Output = ()> + Send>>;
+    async fn test_closure<T: Fn(String) -> ClosureFuture + Send + 'static>(_: T) {
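For reference, a rough std-only sketch of the task queue from the original report with only the future's Sync bound dropped. It assumes the closure itself keeps its Send + Sync bounds, as in the original report. fake_get is a hypothetical stand-in for Pool::get() (a boxed future that is Send but not Sync), add_task takes an explicit id instead of the project's Id type, and block_on is a toy executor, so none of this is deadpool's or tokio's actual API.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Send only: enough to hand the task to whichever thread ends up polling it.
type TaskFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
type Task = Box<dyn Fn(String) -> TaskFuture + Send + Sync>;

pub struct TaskQueue(HashMap<String, Task>);

impl TaskQueue {
    pub fn new() -> Self {
        Self(HashMap::new())
    }

    pub fn add_task<T>(&mut self, id: &str, task: T)
    where
        T: Fn(String) -> TaskFuture + Send + Sync + 'static,
    {
        self.0.insert(id.to_string(), Box::new(task));
    }
}

// Hypothetical stand-in for Pool::get(): a future that is Send but not Sync.
fn fake_get() -> Pin<Box<dyn Future<Output = u32> + Send>> {
    Box::pin(async { 42 })
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Toy executor for illustration only: polls in a loop until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Registers one task, runs every queued task once, and returns the hit count.
fn demo() -> usize {
    let hits = Arc::new(AtomicUsize::new(0));
    let mut queue = TaskQueue::new();

    let task_hits = Arc::clone(&hits);
    queue.add_task("task-1", move |_id| {
        let hits = Arc::clone(&task_hits);
        Box::pin(async move {
            // Awaiting a non-Sync future is fine once TaskFuture is Send only.
            let _conn = fake_get().await;
            hits.fetch_add(1, Ordering::SeqCst);
        })
    });

    for (id, task) in queue.0.iter() {
        block_on(task(id.clone()));
    }
    hits.load(Ordering::SeqCst)
}
```

Adding + Sync back to TaskFuture should reproduce the same kind of E0277 error as in the report, because the async block holds the non-Sync future returned by fake_get across an await.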

@samchouse
Author

I'm not very good with async in Rust, so I was just slapping Send + Sync everywhere. Since that's the case, everything seems to be fine with the lib, and I'll close this issue. Sorry for the clutter!

@samchouse closed this as not planned on Jun 17, 2024
@bikeshedder added the invalid label (The issue is invalid and has been raised in error.) and removed the bug label (Category: This is a bug.) on Jun 17, 2024
@bikeshedder
Owner

Adding Sync to a Future doesn't add anything. Quoting @Darksonn from the #tokio-users channel:

"Generally it's nice for libraries to provide Sync futures as it avoids useless errors, but it's also a mistake for any library to require Sync."

Source: https://discord.com/channels/500028886025895936/500336333500448798/1252281055881334784

If you end up with some code that requires the future to be Sync, you can put it inside a sync wrapper: https://github.com/tokio-rs/tokio/blob/master/tokio/src/util/sync_wrapper.rs
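The idea behind such a wrapper can be sketched roughly as follows. This is not tokio's implementation (the linked file is internal to tokio); SyncFuture, assert_send_sync, block_on, and demo are hypothetical names for this example. The wrapper is declared Sync on the assumption that it never hands out access to the inner future through a shared reference.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Wraps a Send future so the wrapper itself is both Send and Sync.
pub struct SyncFuture<T>(Pin<Box<dyn Future<Output = T> + Send>>);

// SAFETY (assumption of this sketch): no method reaches the inner future
// through `&self`; it is only touched via `&mut` in `poll`, so sharing
// `&SyncFuture` between threads exposes nothing.
unsafe impl<T> Sync for SyncFuture<T> {}

impl<T> SyncFuture<T> {
    pub fn new(fut: impl Future<Output = T> + Send + 'static) -> Self {
        Self(Box::pin(fut))
    }
}

impl<T> Future for SyncFuture<T> {
    type Output = T;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        // The wrapper is Unpin (it only holds a Pin<Box<..>>), so the field
        // can be reached directly and the inner future polled in place.
        self.0.as_mut().poll(cx)
    }
}

fn assert_send_sync<T: Send + Sync>(_: &T) {}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Toy executor for illustration only: polls in a loop until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn demo() -> u32 {
    // A boxed future that is Send but not Sync.
    let inner: Pin<Box<dyn Future<Output = u32> + Send>> = Box::pin(async { 7 });
    let wrapped = SyncFuture::new(inner);
    assert_send_sync(&wrapped);

    // The wrapped future now satisfies an API that (needlessly) demands Sync.
    let task: Pin<Box<dyn Future<Output = u32> + Send + Sync>> = Box::pin(wrapped);
    block_on(task)
}
```

This only makes sense when some other API insists on Sync; in your own code, dropping the bound is the simpler fix.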

I'd be willing to change the deadpool crate so that it returns Sync futures, to satisfy code that uses futures incorrectly. A change like that must not change the hook and manager API, though, as I don't want to require those futures to be Sync.

Unless this becomes a major showstopper somewhere, I won't be looking into that. PRs welcome!

@samchouse
Author

Thanks for all the references, they're super helpful! This issue was caused by my own code, not by another library requiring the trait, so I was able to just remove the Sync bound from that future.
