diff --git a/gremlin-client/src/aio/connection.rs b/gremlin-client/src/aio/connection.rs index 2e103520..b560d28f 100644 --- a/gremlin-client/src/aio/connection.rs +++ b/gremlin-client/src/aio/connection.rs @@ -65,6 +65,7 @@ pub enum Cmd { pub(crate) struct Conn { sender: Sender, valid: bool, + connection_uuid: Uuid, } impl std::fmt::Debug for Conn { @@ -138,7 +139,8 @@ impl Conn { let url = url::Url::parse(&opts.websocket_url()).expect("failed to parse url"); let websocket_config = opts.websocket_options.as_ref().map(WebSocketConfig::from); - info!("Openning websocket connection"); + let connection_uuid = Uuid::new_v4(); + info!("{connection_uuid} Openning websocket connection"); #[cfg(feature = "async-std-runtime")] let (client, _) = { @@ -161,18 +163,24 @@ impl Conn { .await? }; - info!("Opened websocket connection"); + info!("{connection_uuid} Opened websocket connection"); let (sink, stream) = client.split(); let (sender, receiver) = channel(20); let requests = Arc::new(Mutex::new(HashMap::new())); - sender_loop(sink, requests.clone(), receiver); + sender_loop(connection_uuid.clone(), sink, requests.clone(), receiver); - receiver_loop(stream, requests.clone(), sender.clone()); + receiver_loop( + connection_uuid.clone(), + stream, + requests.clone(), + sender.clone(), + ); Ok(Conn { sender, valid: true, + connection_uuid, }) } @@ -187,7 +195,10 @@ impl Conn { .send(Cmd::Msg((sender, id, payload))) .await .map_err(|e| { - error!("Marking websocket connection invalid on send error"); + error!( + "{} Marking websocket connection invalid on send error", + self.connection_uuid + ); self.valid = false; e })?; @@ -203,7 +214,10 @@ impl Conn { GremlinError::WebSocket(_) | GremlinError::WebSocketAsync(_) | GremlinError::WebSocketPoolAsync(_) => { - error!("Marking websocket connection invalid on received error"); + error!( + "{} Marking websocket connection invalid on received error", + self.connection_uuid + ); self.valid = false; } _ => {} @@ -219,17 +233,24 @@ impl Conn { impl Drop for Conn { fn drop(&mut self) { - warn!("Websocket connection instance dropped"); + warn!( + "{} Websocket connection instance dropped", + self.connection_uuid + ); send_shutdown(self); } } fn send_shutdown(conn: &mut Conn) { - warn!("Websocket connection instance shutting down channel"); + warn!( + "{} Websocket connection instance shutting down channel", + conn.connection_uuid + ); conn.sender.close_channel(); } fn sender_loop( + connection_uuid: Uuid, mut sink: SplitSink, requests: Arc>>>>, mut receiver: Receiver, @@ -242,7 +263,7 @@ fn sender_loop( let mut guard = requests.lock().await; guard.insert(msg.1, msg.0); if let Err(e) = sink.send(Message::Binary(msg.2)).await { - error!("Sink sending error occured"); + error!("{connection_uuid} Sink sending error occured"); let mut sender = guard.remove(&msg.1).unwrap(); sender .send(Err(GremlinError::from(Arc::new(e)))) @@ -252,29 +273,30 @@ fn sender_loop( drop(guard); } Cmd::Pong(data) => { - info!("Sending Pong"); + info!("{connection_uuid} Sending Pong",); sink.send(Message::Pong(data)) .await .expect("Failed to send pong message."); } Cmd::Shutdown => { - warn!("Shuting down connection"); + warn!("{connection_uuid} Shuting down connection"); let mut guard = requests.lock().await; guard.clear(); } }, None => { - warn!("Sending loop breaking"); + warn!("{connection_uuid} Sending loop breaking"); break; } } } - warn!("Sending loop closing sink"); + warn!("{connection_uuid} Sending loop closing sink"); let _ = sink.close().await; }); } fn receiver_loop( + connection_uuid: Uuid, mut stream: SplitStream, requests: Arc>>>>, mut sender: Sender, @@ -285,7 +307,7 @@ fn receiver_loop( Some(Err(error)) => { let mut guard = requests.lock().await; let error = Arc::new(error); - error!("Receiver loop error"); + error!("{connection_uuid} Receiver loop error"); for s in guard.values_mut() { match s.send(Err(error.clone().into())).await { Ok(_r) => {} @@ -319,13 +341,13 @@ fn receiver_loop( } } Message::Ping(data) => { - info!("Received Ping"); + info!("{connection_uuid} Received Ping"); let _ = sender.send(Cmd::Pong(data)).await; } _ => {} }, None => { - warn!("Receiver loop breaking"); + warn!("{connection_uuid} Receiver loop breaking"); break; } }