From fa7981f68a95d4cddbb07843af62b41c730ee0a3 Mon Sep 17 00:00:00 2001
From: Austin Bonander
Date: Wed, 12 Aug 2020 23:55:32 -0700
Subject: [PATCH] fix(pool): ignore spurious wakeups when waiting for a connection

fixes #622
---
 sqlx-core/src/pool/inner.rs | 47 +++++++++++++++++++++++++-------
 tests/mysql/mysql.rs        | 54 +++++++++++++++++++++++++++++++++++++
 tests/postgres/postgres.rs  | 54 +++++++++++++++++++++++++++++++++++++
 3 files changed, 145 insertions(+), 10 deletions(-)

diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs
index e17903c4bb..aef5983e8a 100644
--- a/sqlx-core/src/pool/inner.rs
+++ b/sqlx-core/src/pool/inner.rs
@@ -13,12 +13,13 @@
 use std::mem;
 use std::ptr;
 use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
 use std::sync::Arc;
+use std::task::Context;
 use std::time::Instant;
 pub(crate) struct SharedPool<DB: Database> {
     pub(super) connect_options: <DB::Connection as Connection>::Options,
     pub(super) idle_conns: ArrayQueue<Idle<DB>>,
-    waiters: SegQueue<Waker>,
+    waiters: SegQueue<Arc<Waiter>>,
     pub(super) size: AtomicU32,
     is_closed: AtomicBool,
     pub(super) options: PoolOptions<DB>,
@@ -122,19 +123,22 @@ impl<DB: Database> SharedPool<DB> {
             return Err(Error::PoolClosed);
         }
 
-        let mut waker_pushed = false;
+        let mut waiter = None;
 
         timeout(
             deadline_as_timeout::<DB>(deadline)?,
             // `poll_fn` gets us easy access to a `Waker` that we can push to our queue
-            future::poll_fn(|ctx| -> Poll<()> {
-                if !waker_pushed {
-                    // only push the waker once
-                    self.waiters.push(ctx.waker().to_owned());
-                    waker_pushed = true;
-                    Poll::Pending
-                } else {
+            future::poll_fn(|cx| -> Poll<()> {
+                let waiter = waiter.get_or_insert_with(|| {
+                    let waiter = Waiter::new(cx);
+                    self.waiters.push(waiter.clone());
+                    waiter
+                });
+
+                if waiter.is_woken() {
                     Poll::Ready(())
+                } else {
+                    Poll::Pending
                 }
             }),
         )
@@ -346,7 +350,7 @@ fn spawn_reaper<DB: Database>(pool: &Arc<SharedPool<DB>>) {
 /// (where the pool thinks it has more connections than it does).
 pub(in crate::pool) struct DecrementSizeGuard<'a> {
     size: &'a AtomicU32,
-    waiters: &'a SegQueue<Waker>,
+    waiters: &'a SegQueue<Arc<Waiter>>,
     dropped: bool,
 }
 
@@ -379,3 +383,26 @@ impl Drop for DecrementSizeGuard<'_> {
         }
     }
 }
+
+struct Waiter {
+    woken: AtomicBool,
+    waker: Waker,
+}
+
+impl Waiter {
+    fn new(cx: &mut Context<'_>) -> Arc<Self> {
+        Arc::new(Self {
+            woken: AtomicBool::new(false),
+            waker: cx.waker().clone(),
+        })
+    }
+
+    fn wake(&self) {
+        self.woken.store(true, Ordering::Release);
+        self.waker.wake_by_ref();
+    }
+
+    fn is_woken(&self) -> bool {
+        self.woken.load(Ordering::Acquire)
+    }
+}
diff --git a/tests/mysql/mysql.rs b/tests/mysql/mysql.rs
index 1bcd1c1024..fa69ee3914 100644
--- a/tests/mysql/mysql.rs
+++ b/tests/mysql/mysql.rs
@@ -333,3 +333,57 @@ async fn it_can_prepare_then_execute() -> anyhow::Result<()> {
 
     Ok(())
 }
+
+// repro is more reliable with the basic scheduler used by `#[tokio::test]`
+#[cfg(feature = "runtime-tokio")]
+#[tokio::test]
+async fn test_issue_622() -> anyhow::Result<()> {
+    use std::time::Instant;
+
+    setup_if_needed();
+
+    let pool = MySqlPoolOptions::new()
+        .max_connections(1) // also fails with higher counts, e.g. 5
+        .connect(&std::env::var("DATABASE_URL").unwrap())
+        .await?;
+
+    println!("pool state: {:?}", pool);
+
+    let mut handles = vec![];
+
+    // given repro spawned 100 tasks but I found it reliably reproduced with 3
+    for i in 0..3 {
+        let pool = pool.clone();
+
+        handles.push(sqlx_rt::spawn(async move {
+            {
+                let mut conn = pool.acquire().await.unwrap();
+
+                let _ = sqlx::query("SELECT 1").fetch_one(&mut conn).await.unwrap();
+
+                // conn gets dropped here and should be returned to the pool
+            }
+
+            // (do some other work here without holding on to a connection)
+            // this actually fixes the issue, depending on the timeout used
+            // sqlx_rt::sleep(Duration::from_millis(500)).await;
+
+            {
+                let start = Instant::now();
+                match pool.acquire().await {
+                    Ok(conn) => {
+                        println!("{} acquire took {:?}", i, start.elapsed());
+                        drop(conn);
+                    }
+                    Err(e) => panic!("{} acquire returned error: {} pool state: {:?}", i, e, pool),
+                }
+            }
+
+            Result::<(), anyhow::Error>::Ok(())
+        }));
+    }
+
+    futures::future::try_join_all(handles).await?;
+
+    Ok(())
+}
diff --git a/tests/postgres/postgres.rs b/tests/postgres/postgres.rs
index bb7ba78fbe..f64a6b0a5a 100644
--- a/tests/postgres/postgres.rs
+++ b/tests/postgres/postgres.rs
@@ -709,3 +709,57 @@ async fn it_can_prepare_then_execute() -> anyhow::Result<()> {
 
     Ok(())
 }
+
+// repro is more reliable with the basic scheduler used by `#[tokio::test]`
+#[cfg(feature = "runtime-tokio")]
+#[tokio::test]
+async fn test_issue_622() -> anyhow::Result<()> {
+    use std::time::Instant;
+
+    setup_if_needed();
+
+    let pool = PgPoolOptions::new()
+        .max_connections(1) // also fails with higher counts, e.g. 5
+        .connect(&std::env::var("DATABASE_URL").unwrap())
+        .await?;
+
+    println!("pool state: {:?}", pool);
+
+    let mut handles = vec![];
+
+    // given repro spawned 100 tasks but I found it reliably reproduced with 3
+    for i in 0..3 {
+        let pool = pool.clone();
+
+        handles.push(sqlx_rt::spawn(async move {
+            {
+                let mut conn = pool.acquire().await.unwrap();
+
+                let _ = sqlx::query("SELECT 1").fetch_one(&mut conn).await.unwrap();
+
+                // conn gets dropped here and should be returned to the pool
+            }
+
+            // (do some other work here without holding on to a connection)
+            // this actually fixes the issue, depending on the timeout used
+            // sqlx_rt::sleep(Duration::from_millis(500)).await;
+
+            {
+                let start = Instant::now();
+                match pool.acquire().await {
+                    Ok(conn) => {
+                        println!("{} acquire took {:?}", i, start.elapsed());
+                        drop(conn);
+                    }
+                    Err(e) => panic!("{} acquire returned error: {} pool state: {:?}", i, e, pool),
+                }
+            }
+
+            Result::<(), anyhow::Error>::Ok(())
+        }));
+    }
+
+    futures::future::try_join_all(handles).await?;
+
+    Ok(())
+}
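
The crux of the fix is that a task waiting for a connection no longer treats "I was polled again" as "a connection was released for me". Below is a minimal, self-contained sketch of that guarded-wakeup pattern, separate from the patch itself: it assumes only std plus the `futures` crate, and the `Waiter` struct and releaser thread are illustrative stand-ins rather than the pool's actual types. A wakeup that arrives for any reason other than the releasing side setting the flag leaves the waiter in Poll::Pending instead of letting it race for a connection it was never handed.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Poll, Waker};
use std::thread;
use std::time::Duration;

use futures::executor::block_on;
use futures::future::poll_fn;

// Illustrative stand-in for the pool's waiter entry: a wake flag plus the
// waker of the task parked waiting for a connection.
struct Waiter {
    woken: AtomicBool,
    waker: Waker,
}

fn main() {
    block_on(async {
        // Registered lazily on the first poll, exactly once.
        let mut slot: Option<Arc<Waiter>> = None;

        poll_fn(|cx| {
            let waiter = slot.get_or_insert_with(|| {
                let w = Arc::new(Waiter {
                    woken: AtomicBool::new(false),
                    waker: cx.waker().clone(),
                });

                // Stand-in for the releasing side (DecrementSizeGuard / the
                // idle-return path): set the flag *before* waking.
                let releaser = w.clone();
                thread::spawn(move || {
                    thread::sleep(Duration::from_millis(50));
                    releaser.woken.store(true, Ordering::Release);
                    releaser.waker.wake_by_ref();
                });

                w
            });

            // Any poll where the flag is still unset -- including wakeups
            // caused by something other than the releaser -- stays Pending.
            // The old `waker_pushed` bool returned Ready here unconditionally.
            if waiter.woken.load(Ordering::Acquire) {
                Poll::Ready(())
            } else {
                Poll::Pending
            }
        })
        .await;

        println!("proceeded because the flag was set, not because of a spurious poll");
    });
}

The flag is stored with Release before the wake and read with Acquire in the poll, the same ordering the patch uses in `Waiter::wake()` / `Waiter::is_woken()`, so the task that is woken is guaranteed to observe the flag that justified waking it.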