Skip to content

Commit

Permalink
Revert "Added uuid to connection instance logging"
Browse files Browse the repository at this point in the history
This reverts commit a342d8c.
  • Loading branch information
criminosis committed Sep 27, 2024
1 parent 19c9a40 commit cebe99a
Showing 1 changed file with 16 additions and 38 deletions.
54 changes: 16 additions & 38 deletions gremlin-client/src/aio/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ pub enum Cmd {
pub(crate) struct Conn {
sender: Sender<Cmd>,
valid: bool,
connection_uuid: Uuid,
}

impl std::fmt::Debug for Conn {
Expand Down Expand Up @@ -139,8 +138,7 @@ impl Conn {
let url = url::Url::parse(&opts.websocket_url()).expect("failed to parse url");

let websocket_config = opts.websocket_options.as_ref().map(WebSocketConfig::from);
let connection_uuid = Uuid::new_v4();
info!("{connection_uuid} Openning websocket connection");
info!("Openning websocket connection");

#[cfg(feature = "async-std-runtime")]
let (client, _) = {
Expand All @@ -163,24 +161,18 @@ impl Conn {
.await?
};

info!("{connection_uuid} Opened websocket connection");
info!("Opened websocket connection");
let (sink, stream) = client.split();
let (sender, receiver) = channel(20);
let requests = Arc::new(Mutex::new(HashMap::new()));

sender_loop(connection_uuid.clone(), sink, requests.clone(), receiver);
sender_loop(sink, requests.clone(), receiver);

receiver_loop(
connection_uuid.clone(),
stream,
requests.clone(),
sender.clone(),
);
receiver_loop(stream, requests.clone(), sender.clone());

Ok(Conn {
sender,
valid: true,
connection_uuid,
})
}

Expand All @@ -195,10 +187,7 @@ impl Conn {
.send(Cmd::Msg((sender, id, payload)))
.await
.map_err(|e| {
error!(
"{} Marking websocket connection invalid on send error",
self.connection_uuid
);
error!("Marking websocket connection invalid on send error");
self.valid = false;
e
})?;
Expand All @@ -214,10 +203,7 @@ impl Conn {
GremlinError::WebSocket(_)
| GremlinError::WebSocketAsync(_)
| GremlinError::WebSocketPoolAsync(_) => {
error!(
"{} Marking websocket connection invalid on received error",
self.connection_uuid
);
error!("Marking websocket connection invalid on received error");
self.valid = false;
}
_ => {}
Expand All @@ -233,24 +219,17 @@ impl Conn {

impl Drop for Conn {
fn drop(&mut self) {
warn!(
"{} Websocket connection instance dropped",
self.connection_uuid
);
warn!("Websocket connection instance dropped");
send_shutdown(self);
}
}

fn send_shutdown(conn: &mut Conn) {
warn!(
"{} Websocket connection instance shutting down channel",
conn.connection_uuid
);
warn!("Websocket connection instance shutting down channel");
conn.sender.close_channel();
}

fn sender_loop(
connection_uuid: Uuid,
mut sink: SplitSink<WSStream, Message>,
requests: Arc<Mutex<HashMap<Uuid, Sender<GremlinResult<Response>>>>>,
mut receiver: Receiver<Cmd>,
Expand All @@ -263,7 +242,7 @@ fn sender_loop(
let mut guard = requests.lock().await;
guard.insert(msg.1, msg.0);
if let Err(e) = sink.send(Message::Binary(msg.2)).await {
error!("{connection_uuid} Sink sending error occured");
error!("Sink sending error occured");
let mut sender = guard.remove(&msg.1).unwrap();
sender
.send(Err(GremlinError::from(Arc::new(e))))
Expand All @@ -273,30 +252,29 @@ fn sender_loop(
drop(guard);
}
Cmd::Pong(data) => {
info!("{connection_uuid} Sending Pong",);
info!("Sending Pong");
sink.send(Message::Pong(data))
.await
.expect("Failed to send pong message.");
}
Cmd::Shutdown => {
warn!("{connection_uuid} Shuting down connection");
warn!("Shuting down connection");
let mut guard = requests.lock().await;
guard.clear();
}
},
None => {
warn!("{connection_uuid} Sending loop breaking");
warn!("Sending loop breaking");
break;
}
}
}
warn!("{connection_uuid} Sending loop closing sink");
warn!("Sending loop closing sink");
let _ = sink.close().await;
});
}

fn receiver_loop(
connection_uuid: Uuid,
mut stream: SplitStream<WSStream>,
requests: Arc<Mutex<HashMap<Uuid, Sender<GremlinResult<Response>>>>>,
mut sender: Sender<Cmd>,
Expand All @@ -307,7 +285,7 @@ fn receiver_loop(
Some(Err(error)) => {
let mut guard = requests.lock().await;
let error = Arc::new(error);
error!("{connection_uuid} Receiver loop error");
error!("Receiver loop error");
for s in guard.values_mut() {
match s.send(Err(error.clone().into())).await {
Ok(_r) => {}
Expand Down Expand Up @@ -341,13 +319,13 @@ fn receiver_loop(
}
}
Message::Ping(data) => {
info!("{connection_uuid} Received Ping");
info!("Received Ping");
let _ = sender.send(Cmd::Pong(data)).await;
}
_ => {}
},
None => {
warn!("{connection_uuid} Receiver loop breaking");
warn!("Receiver loop breaking");
break;
}
}
Expand Down

0 comments on commit cebe99a

Please sign in to comment.