Skip to content

Commit

Permalink
Fix rustfmt errors
Browse files Browse the repository at this point in the history
  • Loading branch information
caleb-leinz committed Jan 14, 2024
1 parent 4cf9726 commit 79a39f1
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@
//! ```
pub mod config;
pub mod strategies;
mod log;
pub mod strategies;

// in the future, there may be a mod for synchronous regular io too, which is why
// tokio is specifically chosen to place the async stuff
Expand Down
46 changes: 29 additions & 17 deletions src/tokio/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,16 +146,16 @@ where
pub async fn connect_with_options(ctor_arg: C, options: ReconnectOptions) -> io::Result<Self> {
let tcp = match T::establish(ctor_arg.clone()).await {
Ok(tcp) => {
logc!(Level::Info, "Initial connection succeeded.");
logc!(Level::Info, "Initial connection succeeded.");
(options.on_connect_callback)();
tcp
}
Err(e) => {
logc!(Level::Error, "Initial connection failed due to: {:?}.", e);
logc!(Level::Error, "Initial connection failed due to: {:?}.", e);
(options.on_connect_fail_callback)();

if options.exit_if_first_connect_fails {
logc!(Level::Error, "Bailing after initial connection failure.");
logc!(Level::Error, "Bailing after initial connection failure.");
return Err(e);
}

Expand All @@ -164,20 +164,22 @@ where
for (i, duration) in (options.retries_to_attempt_fn)().enumerate() {
let _reconnect_num = i + 1;

logc!(Level::Info,
logc!(
Level::Info,
"Will re-perform initial connect attempt #{} in {:?}.",
_reconnect_num, duration
);
_reconnect_num,
duration
);

sleep(duration).await;

logc!(Level::Info, "Attempting reconnect #{} now.", _reconnect_num);
logc!(Level::Info, "Attempting reconnect #{} now.", _reconnect_num);

match T::establish(ctor_arg.clone()).await {
Ok(tcp) => {
result = Ok(tcp);
(options.on_connect_callback)();
logc!(Level::Info, "Initial connection successfully established.");
logc!(Level::Info, "Initial connection successfully established.");
break;
}
Err(e) => {
Expand All @@ -190,7 +192,7 @@ where
match result {
Ok(tcp) => tcp,
Err(e) => {
logc!(Level::Error, "No more re-connect retries remaining. Never able to establish initial connection.");
logc!(Level::Error, "No more re-connect retries remaining. Never able to establish initial connection.");
return Err(e);
}
}
Expand All @@ -209,7 +211,7 @@ where
match &mut self.status {
// initial disconnect
Status::Connected => {
logc!(Level::Error, "Disconnect occurred");
logc!(Level::Error, "Disconnect occurred");
(self.options.on_disconnect_callback)();
self.status = Status::Disconnected(ReconnectStatus::new(&self.options));
}
Expand All @@ -228,7 +230,10 @@ where
let next_duration = match reconnect_status.attempts_tracker.retries_remaining.next() {
Some(duration) => duration,
None => {
logc!(Level::Error, "No more re-connect retries remaining. Giving up.");
logc!(
Level::Error,
"No more re-connect retries remaining. Giving up."
);
self.status = Status::FailedAndExhausted;
cx.waker().wake_by_ref();
return;
Expand All @@ -242,16 +247,18 @@ where

let reconnect_attempt = async move {
future_instant.await;
logc!(Level::Info, "Attempting reconnect #{} now.", _cur_num);
logc!(Level::Info, "Attempting reconnect #{} now.", _cur_num);
T::establish(ctor_arg).await
};

reconnect_status.reconnect_attempt = Box::pin(reconnect_attempt);

logc!(Level::Info,
logc!(
Level::Info,
"Will perform reconnect attempt #{} in {:?}.",
reconnect_status.attempts_tracker.attempt_num, next_duration
);
reconnect_status.attempts_tracker.attempt_num,
next_duration
);

cx.waker().wake_by_ref();
}
Expand All @@ -269,14 +276,19 @@ where

match attempt.poll(cx) {
Poll::Ready(Ok(underlying_io)) => {
logc!(Level::Info, "Connection re-established");
logc!(Level::Info, "Connection re-established");
cx.waker().wake_by_ref();
self.status = Status::Connected;
(self.options.on_connect_callback)();
self.underlying_io = underlying_io;
}
Poll::Ready(Err(_err)) => {
logc!(Level::Error, "Connection attempt #{} failed: {:?}", _attempt_num, _err);
logc!(
Level::Error,
"Connection attempt #{} failed: {:?}",
_attempt_num,
_err
);
self.on_disconnect(cx);
}
Poll::Pending => {}
Expand Down

0 comments on commit 79a39f1

Please sign in to comment.