Skip to content

Commit

Permalink
Introducing changes to allow for sync to compile (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
craftytrickster authored May 8, 2024
1 parent f451cfe commit bda25e3
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 9 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "stubborn-io"
version = "0.3.4"
version = "0.3.5"
authors = ["David Raifaizen <[email protected]>"]
edition = "2021"
description = "io traits/structs that automatically recover from potential disconnections/interruptions."
Expand All @@ -9,7 +9,7 @@ keywords = ["reconnect", "retry", "stubborn", "io", "StubbornTcpStream"]
repository = "https://github.com/craftytrickster/stubborn-io"
documentation = "https://docs.rs/stubborn-io"
readme = "README.md"

[dependencies]
tokio = { version = "1", features = ["time", "net"] }
rand = "0.8"
Expand Down
20 changes: 13 additions & 7 deletions src/tokio/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::io::{self, ErrorKind, IoSlice};
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
Expand Down Expand Up @@ -53,12 +54,13 @@ where

struct AttemptsTracker {
attempt_num: usize,
retries_remaining: Box<dyn Iterator<Item = Duration> + Send>,
retries_remaining: Box<dyn Iterator<Item = Duration> + Send + Sync>,
}

struct ReconnectStatus<T, C> {
attempts_tracker: AttemptsTracker,
reconnect_attempt: Pin<Box<dyn Future<Output = io::Result<T>> + Send>>,
#[allow(clippy::type_complexity)]
reconnect_attempt: Arc<Mutex<Pin<Box<dyn Future<Output = io::Result<T>> + Send>>>>,
_phantom_data: PhantomData<C>,
}

Expand All @@ -73,7 +75,9 @@ where
attempt_num: 0,
retries_remaining: (options.retries_to_attempt_fn)(),
},
reconnect_attempt: Box::pin(async { unreachable!("Not going to happen") }),
reconnect_attempt: Arc::new(Mutex::new(Box::pin(async {
unreachable!("Not going to happen")
}))),
_phantom_data: PhantomData,
}
}
Expand Down Expand Up @@ -244,7 +248,7 @@ where
T::establish(ctor_arg).await
};

reconnect_status.reconnect_attempt = Box::pin(reconnect_attempt);
reconnect_status.reconnect_attempt = Arc::new(Mutex::new(Box::pin(reconnect_attempt)));

info!(
"Will perform reconnect attempt #{} in {:?}.",
Expand All @@ -256,16 +260,18 @@ where
}

fn poll_disconnect(mut self: Pin<&mut Self>, cx: &mut Context) {
let (attempt, attempt_num) = match &mut self.status {
let (attempt, attempt_num) = match self.status {
Status::Connected => unreachable!(),
Status::Disconnected(ref mut status) => (
Pin::new(&mut status.reconnect_attempt),
status.reconnect_attempt.clone(),
status.attempts_tracker.attempt_num,
),
Status::FailedAndExhausted => unreachable!(),
};

match attempt.poll(cx) {
let mut attempt = attempt.lock().unwrap();

match attempt.as_mut().poll(cx) {
Poll::Ready(Ok(underlying_io)) => {
info!("Connection re-established");
cx.waker().wake_by_ref();
Expand Down
22 changes: 22 additions & 0 deletions tests/dummy_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,25 @@ mod already_connected {
assert!(msg.unwrap().is_err());
}
}

#[tokio::test]
async fn test_that_works_with_sync() {
fn make_framed<T>(_stream: T)
where
T: AsyncRead + AsyncWrite + Send + Sync + 'static,
{
let _ = _stream;
}

let options = ReconnectOptions::new();
let connect_outcomes = Arc::new(Mutex::new(vec![true]));
let ctor = DummyCtor {
connect_outcomes,
..DummyCtor::default()
};
let dummy = StubbornDummy::connect_with_options(ctor, options)
.await
.unwrap();

make_framed(dummy);
}

0 comments on commit bda25e3

Please sign in to comment.