From 3467a85dc2829038df5e49d62c48c3cabe189519 Mon Sep 17 00:00:00 2001 From: dippi Date: Sat, 3 Jul 2021 19:31:06 +0200 Subject: [PATCH 1/5] Resolve pubsub future only when nothing can happen anymore --- src/client/pubsub.rs | 57 +++++++++++++++++--------------------------- 1 file changed, 22 insertions(+), 35 deletions(-) diff --git a/src/client/pubsub.rs b/src/client/pubsub.rs index 4a91574..94b4b4b 100644 --- a/src/client/pubsub.rs +++ b/src/client/pubsub.rs @@ -131,7 +131,7 @@ impl PubsubConnectionInner { } } - fn handle_message(&mut self, msg: resp::RespValue) -> Result { + fn handle_message(&mut self, msg: resp::RespValue) -> Result<(), error::Error> { let (message_type, topic, msg) = match msg { resp::RespValue::Array(mut messages) => match ( messages.pop(), @@ -205,9 +205,6 @@ impl PubsubConnectionInner { ))); } } - if self.subscriptions.is_empty() { - return Ok(false); - } } b"punsubscribe" => { match self.psubscriptions.entry(topic) { @@ -221,9 +218,6 @@ impl PubsubConnectionInner { ))); } } - if self.psubscriptions.is_empty() { - return Ok(false); - } } b"message" => match self.subscriptions.get(&topic) { Some(sender) => { @@ -263,39 +257,32 @@ impl PubsubConnectionInner { } } - Ok(true) + Ok(()) } /// Returns true, if there are still valid subscriptions at the end, or false if not, i.e. the whole thing can be dropped. - fn handle_messages(&mut self, cx: &mut Context) -> Result { + fn handle_messages(&mut self, cx: &mut Context) -> Result<(), error::Error> { loop { match self.connection.poll_next_unpin(cx) { - Poll::Pending => return Ok(true), + Poll::Pending => return Ok(()), Poll::Ready(None) => { - if self.subscriptions.is_empty() { - return Ok(false); - } else { - // This can only happen if the connection is closed server-side - for sub in self.subscriptions.values() { - sub.unbounded_send(Err(error::Error::Connection( - ConnectionReason::NotConnected, - ))) - .unwrap(); - } - for psub in self.psubscriptions.values() { - psub.unbounded_send(Err(error::Error::Connection( - ConnectionReason::NotConnected, - ))) - .unwrap(); - } - return Err(error::Error::Connection(ConnectionReason::NotConnected)); + // This can only happen if the connection is closed server-side + for sub in self.subscriptions.values() { + sub.unbounded_send(Err(error::Error::Connection( + ConnectionReason::NotConnected, + ))) + .unwrap(); } + for psub in self.psubscriptions.values() { + psub.unbounded_send(Err(error::Error::Connection( + ConnectionReason::NotConnected, + ))) + .unwrap(); + } + return Err(error::Error::Connection(ConnectionReason::NotConnected)); } Poll::Ready(Some(Ok(message))) => { - let message_result = self.handle_message(message)?; - if !message_result { - return Ok(false); - } + self.handle_message(message)?; } Poll::Ready(Some(Err(e))) => { for sub in self.subscriptions.values() { @@ -326,11 +313,11 @@ impl Future for PubsubConnectionInner { let this_self = self.get_mut(); this_self.handle_new_subs(cx)?; this_self.do_flush(cx)?; - let cont = this_self.handle_messages(cx)?; - if cont { - Poll::Pending - } else { + this_self.handle_messages(cx)?; + if this_self.out_rx.is_done() { Poll::Ready(Ok(())) + } else { + Poll::Pending } } } From 36bf77259e7ffb71dd1055dd7668d90aef5431a7 Mon Sep 17 00:00:00 2001 From: dippi Date: Sun, 4 Jul 2021 12:27:33 +0200 Subject: [PATCH 2/5] Add tests to cover new pubsub completion policy --- src/client/pubsub.rs | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/src/client/pubsub.rs b/src/client/pubsub.rs index 94b4b4b..139a8ad 100644 --- a/src/client/pubsub.rs +++ b/src/client/pubsub.rs @@ -527,4 +527,40 @@ mod test { assert_eq!(result[1], "test-message-2".into()); assert_eq!(result[2], "test-message-3".into()); } + + #[tokio::test] + async fn test_connection_remains_open_after_unsubscription() { + let addr = "127.0.0.1:6379".parse().unwrap(); + let pubsub = super::pubsub_connect(addr) + .await + .expect("Cannot connect to Redis"); + + let topic_messages = pubsub + .subscribe("test-topic") + .await + .expect("Cannot subscribe to topic"); + drop(topic_messages); + + pubsub + .subscribe("test-topic") + .await + .expect("Cannot subscribe to topic"); + } +} + +#[tokio::test] +async fn test_connection_is_closed_after_channel_is_dropped() { + let addr = "127.0.0.1:6379".parse().unwrap(); + let connection = connect_with_auth(&addr, None, None) + .await + .expect("Cannot connect to Redis"); + let (out_tx, out_rx) = mpsc::unbounded(); + let handle = tokio::spawn(async { + match PubsubConnectionInner::new(connection, out_rx).await { + Ok(_) => (), + Err(e) => log::error!("Pub/Sub error: {:?}", e), + } + }); + drop(out_tx); + handle.await.expect("Complete"); } From b51f99f9eb178149d623b827e1c2ac2fffd60c06 Mon Sep 17 00:00:00 2001 From: dippi Date: Sun, 4 Jul 2021 12:51:31 +0200 Subject: [PATCH 3/5] Add a note on PubsubStream about the assumptions made --- src/client/pubsub.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/client/pubsub.rs b/src/client/pubsub.rs index 139a8ad..3bd94ba 100644 --- a/src/client/pubsub.rs +++ b/src/client/pubsub.rs @@ -443,6 +443,8 @@ impl PubsubConnection { pub struct PubsubStream { topic: String, underlying: PubsubStreamInner, + // Note that, to keep the Future running, PubsubConnectionInner relies on PubsubStream to hold + // a reference to the connection. If that's ever changed remember to adapt the readiness check. con: PubsubConnection, } From 26e1362dc1455541fe142c976e36cfb641c183e4 Mon Sep 17 00:00:00 2001 From: dippi Date: Sun, 4 Jul 2021 13:04:55 +0200 Subject: [PATCH 4/5] Reword expect in tests --- src/client/pubsub.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/pubsub.rs b/src/client/pubsub.rs index 3bd94ba..cf24a0c 100644 --- a/src/client/pubsub.rs +++ b/src/client/pubsub.rs @@ -564,5 +564,5 @@ async fn test_connection_is_closed_after_channel_is_dropped() { } }); drop(out_tx); - handle.await.expect("Complete"); + handle.await.expect("Error waiting on the JoinHandle"); } From 23a85ec51830f1c286c7edc7ef6cfd950fb087bd Mon Sep 17 00:00:00 2001 From: dippi Date: Sun, 4 Jul 2021 13:06:18 +0200 Subject: [PATCH 5/5] Add comment referencing the issue #50 --- src/client/pubsub.rs | 45 +++++++++++++++++++++----------------------- 1 file changed, 21 insertions(+), 24 deletions(-) diff --git a/src/client/pubsub.rs b/src/client/pubsub.rs index cf24a0c..dd263a1 100644 --- a/src/client/pubsub.rs +++ b/src/client/pubsub.rs @@ -193,32 +193,28 @@ impl PubsubConnectionInner { ))); } }, - b"unsubscribe" => { - match self.subscriptions.entry(topic) { - Entry::Occupied(entry) => { - entry.remove_entry(); - } - Entry::Vacant(vacant) => { - return Err(error::internal(format!( - "Unexpected unsubscribe message: {}", - vacant.key() - ))); - } + b"unsubscribe" => match self.subscriptions.entry(topic) { + Entry::Occupied(entry) => { + entry.remove_entry(); } - } - b"punsubscribe" => { - match self.psubscriptions.entry(topic) { - Entry::Occupied(entry) => { - entry.remove_entry(); - } - Entry::Vacant(vacant) => { - return Err(error::internal(format!( - "Unexpected unsubscribe message: {}", - vacant.key() - ))); - } + Entry::Vacant(vacant) => { + return Err(error::internal(format!( + "Unexpected unsubscribe message: {}", + vacant.key() + ))); } - } + }, + b"punsubscribe" => match self.psubscriptions.entry(topic) { + Entry::Occupied(entry) => { + entry.remove_entry(); + } + Entry::Vacant(vacant) => { + return Err(error::internal(format!( + "Unexpected unsubscribe message: {}", + vacant.key() + ))); + } + }, b"message" => match self.subscriptions.get(&topic) { Some(sender) => { if let Err(error) = sender.unbounded_send(Ok(msg)) { @@ -531,6 +527,7 @@ mod test { } #[tokio::test] + /// Regression test for https://github.com/benashford/redis-async-rs/issues/50 async fn test_connection_remains_open_after_unsubscription() { let addr = "127.0.0.1:6379".parse().unwrap(); let pubsub = super::pubsub_connect(addr)