Skip to content

Commit

Permalink
Improve RemoteSequencer tracing and logging
Browse files Browse the repository at this point in the history
  • Loading branch information
muhamadazmy committed Nov 13, 2024
1 parent 0f579f6 commit 8ab3785
Showing 1 changed file with 30 additions and 7 deletions.
37 changes: 30 additions & 7 deletions crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::{
};

use tokio::sync::{mpsc, Mutex, OwnedSemaphorePermit, Semaphore};
use tracing::{debug, instrument, Span};

use restate_core::{
network::{
Expand All @@ -33,7 +34,6 @@ use restate_types::{
replicated_loglet::ReplicatedLogletParams,
GenerationalNodeId,
};
use tracing::instrument;

use super::rpc_routers::SequencersRpc;
use crate::loglet::{
Expand Down Expand Up @@ -107,7 +107,7 @@ where
}

#[instrument(
level="trace",
level="debug",
skip_all,
fields(
otel.name = "replicated_loglet::remote_sequencer: append",
Expand Down Expand Up @@ -148,16 +148,14 @@ where
Ok(token) => break token,
Err(err) => {
match err.source {
NetworkError::ConnectError(_)
| NetworkError::ConnectionClosed(_)
| NetworkError::Timeout(_) => {
// we retry to re-connect one time
err @ NetworkError::Full => return Err(err.into()),
_ => {
// we retry on any other network error
connection = self.renew_connection(connection).await?;

msg = err.original;
continue;
}
err => return Err(err.into()),
}
}
};
Expand All @@ -170,6 +168,7 @@ where
}

/// Gets or starts a new remote sequencer connection
#[instrument(level = "debug", skip_all)]
async fn get_connection(&self) -> Result<RemoteSequencerConnection, NetworkError> {
let mut guard = self.connection.lock().await;
if let Some(connection) = guard.deref() {
Expand All @@ -190,6 +189,7 @@ where

/// Renew a connection to a remote sequencer. This guarantees that only a single connection
/// to the sequencer is available.
#[instrument(level = "debug", skip_all, fields(renewed = false))]
async fn renew_connection(
&self,
old: RemoteSequencerConnection,
Expand All @@ -207,6 +207,8 @@ where
.node_connection(self.params.sequencer)
.await?;

Span::current().record("renewed", true);

let connection =
RemoteSequencerConnection::start(self.known_global_tail.clone(), connection)?;

Expand All @@ -232,6 +234,7 @@ struct RemoteSequencerConnection {
}

impl RemoteSequencerConnection {
#[instrument(level = "debug", name = "connection_start", skip_all)]
fn start(
known_global_tail: TailOffsetWatch,
connection: WeakConnection,
Expand Down Expand Up @@ -260,6 +263,16 @@ impl RemoteSequencerConnection {
sequencer: GenerationalNodeId,
msg: Append,
) -> Result<RpcToken<Appended>, NetworkSendError<Append>> {
// there are other reasons that can render this connection unusable
// even if the underlying connection is still valid.
// if the channel is closed, this connection cannot be used anymore
if self.tx.is_closed() {
return Err(NetworkSendError::new(
msg,
NetworkError::Unavailable("Inflight commits channel is closed".into()),
));
}

let outgoing = Outgoing::new(sequencer, msg).assign_connection(self.inner.clone());

rpc_router
Expand All @@ -283,6 +296,7 @@ impl RemoteSequencerConnection {
if let Err(err) = self.tx.send(inflight_append) {
// if we failed to push this to be processed by the connection reactor task
// then we need to notify the caller
debug!("Inflight channel closed. Resolve commit as connection closed");
err.0
.commit_resolver
.error(AppendError::retryable(NetworkError::ConnectionClosed(
Expand All @@ -296,6 +310,7 @@ impl RemoteSequencerConnection {
/// This task will run until the [`AppendStream`] is dropped. Once dropped
/// all pending commits will be resolved with an error. it's up to the enqueuer
/// to retry if needed.
#[instrument(level = "debug", skip_all)]
async fn handle_appended_responses(
known_global_tail: TailOffsetWatch,
connection: WeakConnection,
Expand All @@ -315,6 +330,7 @@ impl RemoteSequencerConnection {
inflight
}
_ = &mut closed => {
debug!("Sequencer Connection closed while waiting for next inflight append");
break AppendError::retryable(NetworkError::ConnectionClosed(connection.peer()));
}
};
Expand All @@ -335,6 +351,7 @@ impl RemoteSequencerConnection {
incoming.map_err(AppendError::Shutdown)
},
_ = &mut closed => {
debug!("Sequencer Connection closed while waiting for response");
Err(AppendError::retryable(NetworkError::ConnectionClosed(connection.peer())))
}
};
Expand Down Expand Up @@ -375,6 +392,7 @@ impl RemoteSequencerConnection {
// While the UnknownLoglet status is non-terminal for the connection
// (since only one request is bad),
// the AppendError for the caller is terminal
debug!(error=%err, "Resolve commit with error");
commit_resolver.error(AppendError::other(err));
}
}
Expand All @@ -394,10 +412,15 @@ impl RemoteSequencerConnection {
//
// For now this should not be a problem since they can (possibly) retry
// to do the write again later.
debug!(cause=%err, "Draining inflight channel");
let mut count = 0;
while let Some(inflight) = rx.recv().await {
inflight.commit_resolver.error(err.clone());
count += 1;
}

debug!("Drained/Cancelled {count} inflight commits");

Ok(())
}
}
Expand Down

0 comments on commit 8ab3785

Please sign in to comment.