From d6dc82500217424e6778f451dcc08c1407dbb572 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Fri, 13 Oct 2023 17:21:39 +0200 Subject: [PATCH 01/18] Add tracing --- libsignal-service-actix/Cargo.toml | 1 + libsignal-service-hyper/Cargo.toml | 1 + libsignal-service/Cargo.toml | 2 ++ libsignal-service/src/cipher.rs | 4 ++++ libsignal-service/src/envelope.rs | 1 + 5 files changed, 9 insertions(+) diff --git a/libsignal-service-actix/Cargo.toml b/libsignal-service-actix/Cargo.toml index 803c27ee3..4535a799d 100644 --- a/libsignal-service-actix/Cargo.toml +++ b/libsignal-service-actix/Cargo.toml @@ -18,6 +18,7 @@ actix-rt = "2.4" mpart-async = "0.6" serde_json = "1.0" futures = "0.3" +tracing = "0.1" bytes = "1" rustls = "0.21" rustls-pemfile = "0.3" diff --git a/libsignal-service-hyper/Cargo.toml b/libsignal-service-hyper/Cargo.toml index a62147b57..982daecc0 100644 --- a/libsignal-service-hyper/Cargo.toml +++ b/libsignal-service-hyper/Cargo.toml @@ -12,6 +12,7 @@ libsignal-service = { path = "../libsignal-service" } async-trait = "0.1" bytes = "1.0" futures = "0.3" +tracing = "0.1" log = "0.4" mpart-async = "0.6" serde = "1.0" diff --git a/libsignal-service/Cargo.toml b/libsignal-service/Cargo.toml index 6c6a4b6b6..7ccd4c1bc 100644 --- a/libsignal-service/Cargo.toml +++ b/libsignal-service/Cargo.toml @@ -35,6 +35,8 @@ thiserror = "1.0" url = { version = "2.1", features = ["serde"] } uuid = { version = "1", features = ["serde"] } +tracing = "0.1" + [build-dependencies] prost-build = "0.10" diff --git a/libsignal-service/src/cipher.rs b/libsignal-service/src/cipher.rs index b91cc5ee0..926406770 100644 --- a/libsignal-service/src/cipher.rs +++ b/libsignal-service/src/cipher.rs @@ -58,6 +58,7 @@ where /// Opens ("decrypts") an envelope. /// /// Envelopes may be empty, in which case this method returns `Ok(None)` + #[tracing::instrument] pub async fn open_envelope( &mut self, envelope: Envelope, @@ -89,6 +90,7 @@ where /// Triage of legacy messages happens inside this method, as opposed to the /// Java implementation, because it makes the borrow checker and the /// author happier. + #[tracing::instrument] async fn decrypt( &mut self, envelope: &Envelope, @@ -276,6 +278,7 @@ where Ok(plaintext) } + #[tracing::instrument] pub(crate) async fn encrypt( &mut self, address: &ProtocolAddress, @@ -426,6 +429,7 @@ pub async fn get_preferred_protocol_address( /// is then validated against the `trust_root` baked into the client to ensure that the sender's /// identity was not forged. #[allow(clippy::too_many_arguments)] +#[tracing::instrument] async fn sealed_sender_decrypt( ciphertext: &[u8], trust_root: &PublicKey, diff --git a/libsignal-service/src/envelope.rs b/libsignal-service/src/envelope.rs index c90a36330..a5071a961 100644 --- a/libsignal-service/src/envelope.rs +++ b/libsignal-service/src/envelope.rs @@ -27,6 +27,7 @@ impl TryFrom for Envelope { } impl Envelope { + #[tracing::instrument] pub fn decrypt( input: &[u8], signaling_key: &SignalingKey, From c58663c6675319ed38779809134a2108a991f711 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Sat, 6 Jan 2024 14:29:06 +0100 Subject: [PATCH 02/18] s/log::/tracing:: --- libsignal-service-actix/Cargo.toml | 1 - .../examples/registering.rs | 1 - libsignal-service-actix/src/push_service.rs | 42 +++++++++------- libsignal-service-actix/src/websocket.rs | 22 ++++----- libsignal-service-hyper/src/push_service.rs | 14 +++--- libsignal-service-hyper/src/websocket.rs | 22 ++++----- libsignal-service/src/account_manager.rs | 12 ++--- libsignal-service/src/cipher.rs | 6 +-- libsignal-service/src/envelope.rs | 6 +-- libsignal-service/src/groups_v2/manager.rs | 2 +- libsignal-service/src/groups_v2/operations.rs | 4 +- libsignal-service/src/messagepipe.rs | 4 +- libsignal-service/src/models.rs | 2 +- libsignal-service/src/proto.rs | 2 +- libsignal-service/src/provisioning/pipe.rs | 2 +- libsignal-service/src/push_service.rs | 2 +- libsignal-service/src/receiver.rs | 2 +- libsignal-service/src/sender.rs | 49 ++++++++++--------- libsignal-service/src/websocket.rs | 28 +++++------ 19 files changed, 116 insertions(+), 107 deletions(-) diff --git a/libsignal-service-actix/Cargo.toml b/libsignal-service-actix/Cargo.toml index 4535a799d..52e97b7eb 100644 --- a/libsignal-service-actix/Cargo.toml +++ b/libsignal-service-actix/Cargo.toml @@ -34,7 +34,6 @@ base64 = "0.13" phonenumber = "0.3" [dev-dependencies] -env_logger = "0.9" image = { version = "0.23", default-features = false, features = ["png"] } opener = "0.5" qrcode = "0.12" diff --git a/libsignal-service-actix/examples/registering.rs b/libsignal-service-actix/examples/registering.rs index cf41ec7ce..fe3c8df2b 100644 --- a/libsignal-service-actix/examples/registering.rs +++ b/libsignal-service-actix/examples/registering.rs @@ -32,7 +32,6 @@ use structopt::StructOpt; #[actix_rt::main] async fn main() -> Result<(), Error> { - env_logger::init(); let client = "libsignal-service-hyper-example"; let use_voice = false; diff --git a/libsignal-service-actix/src/push_service.rs b/libsignal-service-actix/src/push_service.rs index 639a5a25f..5a011da3a 100644 --- a/libsignal-service-actix/src/push_service.rs +++ b/libsignal-service-actix/src/push_service.rs @@ -47,7 +47,7 @@ impl AwcPushService { credentials_override: HttpAuthOverride, ) -> Result { let url = self.cfg.base_url(endpoint).join(path.as_ref())?; - log::debug!("HTTP request {} {}", method, url); + tracing::debug!("HTTP request {} {}", method, url); let mut builder = self.client.request(method, url.as_str()); for &header in additional_headers { builder = builder.insert_header(header); @@ -108,7 +108,7 @@ impl AwcPushService { StatusCode::CONFLICT => { let mismatched_devices = response.json().await.map_err(|e| { - log::error!( + tracing::error!( "Failed to decode HTTP 409 response: {}", e ); @@ -122,7 +122,10 @@ impl AwcPushService { }, StatusCode::GONE => { let stale_devices = response.json().await.map_err(|e| { - log::error!("Failed to decode HTTP 410 response: {}", e); + tracing::error!( + "Failed to decode HTTP 410 response: {}", + e + ); ServiceError::UnhandledResponseCode { http_code: StatusCode::GONE.as_u16(), } @@ -131,7 +134,10 @@ impl AwcPushService { }, StatusCode::PRECONDITION_REQUIRED => { let proof_required = response.json().await.map_err(|e| { - log::error!("Failed to decode HTTP 428 response: {}", e); + tracing::error!( + "Failed to decode HTTP 428 response: {}", + e + ); ServiceError::UnhandledResponseCode { http_code: StatusCode::PRECONDITION_REQUIRED.as_u16(), } @@ -141,7 +147,7 @@ impl AwcPushService { // XXX: fill in rest from PushServiceSocket code => { let contents = response.body().await; - log::trace!( + tracing::trace!( "Unhandled response {} with body: {:?}", code.as_u16(), contents, @@ -191,7 +197,7 @@ impl PushService for AwcPushService { }, })?; - log::debug!("AwcPushService::get response: {:?}", response); + tracing::debug!("AwcPushService::get response: {:?}", response); Self::from_response(&mut response).await?; @@ -203,7 +209,7 @@ impl PushService for AwcPushService { // actix already imports that anyway. let text = match response.body().await { Ok(text) => { - log::debug!( + tracing::debug!( "GET response: {:?}", String::from_utf8_lossy(&text) ); @@ -249,7 +255,7 @@ impl PushService for AwcPushService { }, })?; - log::debug!("AwcPushService::delete response: {:?}", response); + tracing::debug!("AwcPushService::delete response: {:?}", response); Self::from_response(&mut response).await?; @@ -261,7 +267,7 @@ impl PushService for AwcPushService { // actix already imports that anyway. let text = match response.body().await { Ok(text) => { - log::debug!( + tracing::debug!( "GET response: {:?}", String::from_utf8_lossy(&text) ); @@ -303,7 +309,7 @@ impl PushService for AwcPushService { reason: e.to_string(), })?; - log::debug!("AwcPushService::put response: {:?}", response); + tracing::debug!("AwcPushService::put response: {:?}", response); Self::from_response(&mut response).await?; @@ -315,7 +321,7 @@ impl PushService for AwcPushService { // actix already imports that anyway. let text = match response.body().await { Ok(text) => { - log::debug!( + tracing::debug!( "GET response: {:?}", String::from_utf8_lossy(&text) ); @@ -356,7 +362,7 @@ impl PushService for AwcPushService { reason: e.to_string(), })?; - log::debug!("AwcPushService::patch response: {:?}", response); + tracing::debug!("AwcPushService::patch response: {:?}", response); Self::from_response(&mut response).await?; @@ -368,7 +374,7 @@ impl PushService for AwcPushService { // actix already imports that anyway. let text = match response.body().await { Ok(text) => { - log::debug!( + tracing::debug!( "PATCH response: {:?}", String::from_utf8_lossy(&text) ); @@ -409,7 +415,7 @@ impl PushService for AwcPushService { reason: e.to_string(), })?; - log::debug!("AwcPushService::post response: {:?}", response); + tracing::debug!("AwcPushService::post response: {:?}", response); Self::from_response(&mut response).await?; @@ -421,7 +427,7 @@ impl PushService for AwcPushService { // actix already imports that anyway. let text = match response.body().await { Ok(text) => { - log::debug!( + tracing::debug!( "GET response: {:?}", String::from_utf8_lossy(&text) ); @@ -527,7 +533,7 @@ impl PushService for AwcPushService { reason: e.to_string(), })?; - log::debug!("AwcPushService::get_stream response: {:?}", response); + tracing::debug!("AwcPushService::get_stream response: {:?}", response); Self::from_response(&mut response).await?; @@ -600,7 +606,7 @@ impl PushService for AwcPushService { // Unwrap, because no error type was used above body_contents.extend(b.unwrap()); } - log::trace!( + tracing::trace!( "Sending PUT with Content-Type={} and length {}", content_type, body_contents.len() @@ -615,7 +621,7 @@ impl PushService for AwcPushService { reason: e.to_string(), })?; - log::debug!("AwcPushService::put response: {:?}", response); + tracing::debug!("AwcPushService::put response: {:?}", response); Self::from_response(&mut response).await?; diff --git a/libsignal-service-actix/src/websocket.rs b/libsignal-service-actix/src/websocket.rs index 5b224e720..53c35a041 100644 --- a/libsignal-service-actix/src/websocket.rs +++ b/libsignal-service-actix/src/websocket.rs @@ -86,9 +86,9 @@ where futures::pin_mut!(tick); futures::select! { _ = tick => { - log::trace!("Triggering keep-alive"); + tracing::trace!("Triggering keep-alive"); if let Err(e) = incoming_sink.send(WebSocketStreamItem::KeepAliveRequest).await { - log::info!("Websocket sink has closed: {:?}.", e); + tracing::info!("Websocket sink has closed: {:?}.", e); break; }; }, @@ -96,7 +96,7 @@ where let frame = if let Some(frame) = frame { frame } else { - log::info!("process: Socket stream ended"); + tracing::info!("process: Socket stream ended"); break; }; @@ -105,24 +105,24 @@ where Frame::Continuation(_c) => todo!(), Frame::Ping(msg) => { - log::warn!("Received Ping({:?})", msg); + tracing::warn!("Received Ping({:?})", msg); continue; }, Frame::Pong(msg) => { - log::trace!("Received Pong({:?})", msg); + tracing::trace!("Received Pong({:?})", msg); continue; }, Frame::Text(frame) => { - log::warn!("Frame::Text {:?}", frame); + tracing::warn!("Frame::Text {:?}", frame); // this is a protocol violation, maybe break; is better? continue; }, Frame::Close(c) => { - log::warn!("Websocket closing: {:?}", c); + tracing::warn!("Websocket closing: {:?}", c); break; }, @@ -130,7 +130,7 @@ where // Match SendError if let Err(e) = incoming_sink.send(WebSocketStreamItem::Message(frame)).await { - log::info!("Websocket sink has closed: {:?}.", e); + tracing::info!("Websocket sink has closed: {:?}.", e); break; } }, @@ -160,14 +160,14 @@ impl AwcWebSocket { ); } - log::trace!("Will start websocket at {:?}", url); + tracing::trace!("Will start websocket at {:?}", url); let mut ws = client.ws(url.as_str()); for (key, value) in additional_headers { ws = ws.header(*key, *value); } let (response, framed) = ws.connect().await?; - log::debug!("WebSocket connected: {:?}", response); + tracing::debug!("WebSocket connected: {:?}", response); let (incoming_sink, incoming_stream) = channel(5); @@ -179,7 +179,7 @@ impl AwcWebSocket { actix_rt::spawn(processing_task.map(|v| match v { Ok(()) => (), Err(e) => { - log::warn!("Processing task terminated with error: {:?}", e) + tracing::warn!("Processing task terminated with error: {:?}", e) }, })); diff --git a/libsignal-service-hyper/src/push_service.rs b/libsignal-service-hyper/src/push_service.rs index f13648cb3..952cce4b3 100644 --- a/libsignal-service-hyper/src/push_service.rs +++ b/libsignal-service-hyper/src/push_service.rs @@ -92,7 +92,7 @@ impl HyperPushService { body: Option, ) -> Result, ServiceError> { let url = self.cfg.base_url(endpoint).join(path.as_ref())?; - log::debug!("HTTP request {} {}", method, url); + tracing::debug!("HTTP request {} {}", method, url); let mut builder = Request::builder() .method(method) .uri(url.as_str()) @@ -159,7 +159,7 @@ impl HyperPushService { StatusCode::CONFLICT => { let mismatched_devices = Self::json(&mut response).await.map_err(|e| { - log::error!( + tracing::error!( "Failed to decode HTTP 409 response: {}", e ); @@ -174,7 +174,7 @@ impl HyperPushService { StatusCode::GONE => { let stale_devices = Self::json(&mut response).await.map_err(|e| { - log::error!( + tracing::error!( "Failed to decode HTTP 410 response: {}", e ); @@ -187,7 +187,7 @@ impl HyperPushService { StatusCode::PRECONDITION_REQUIRED => { let proof_required = Self::json(&mut response).await.map_err(|e| { - log::error!( + tracing::error!( "Failed to decode HTTP 428 response: {}", e ); @@ -200,7 +200,7 @@ impl HyperPushService { }, // XXX: fill in rest from PushServiceSocket code => { - log::trace!( + tracing::trace!( "Unhandled response {} with body: {}", code.as_u16(), Self::text(&mut response).await?, @@ -554,7 +554,7 @@ impl PushService for HyperPushService { // Unwrap, because no error type was used above body_contents.extend(b.unwrap()); } - log::trace!( + tracing::trace!( "Sending PUT with Content-Type={} and length {}", content_type, body_contents.len() @@ -574,7 +574,7 @@ impl PushService for HyperPushService { ) .await?; - log::debug!("HyperPushService::PUT response: {:?}", response); + tracing::debug!("HyperPushService::PUT response: {:?}", response); Ok(()) } diff --git a/libsignal-service-hyper/src/websocket.rs b/libsignal-service-hyper/src/websocket.rs index 7747686ea..34624c0b3 100644 --- a/libsignal-service-hyper/src/websocket.rs +++ b/libsignal-service-hyper/src/websocket.rs @@ -103,9 +103,9 @@ where loop { tokio::select! { _ = ka_interval.tick() => { - log::trace!("Triggering keep-alive"); + tracing::trace!("Triggering keep-alive"); if let Err(e) = incoming_sink.send(WebSocketStreamItem::KeepAliveRequest).await { - log::info!("Websocket sink has closed: {:?}.", e); + tracing::info!("Websocket sink has closed: {:?}.", e); break; }; }, @@ -113,31 +113,31 @@ where let frame = if let Some(frame) = frame { frame } else { - log::info!("process: Socket stream ended"); + tracing::info!("process: Socket stream ended"); break; }; let frame = match frame? { Message::Binary(s) => s, Message::Ping(msg) => { - log::warn!("Received Ping({:?})", msg); + tracing::warn!("Received Ping({:?})", msg); continue; }, Message::Pong(msg) => { - log::trace!("Received Pong({:?})", msg); + tracing::trace!("Received Pong({:?})", msg); continue; }, Message::Text(frame) => { - log::warn!("Message::Text {:?}", frame); + tracing::warn!("Message::Text {:?}", frame); // this is a protocol violation, maybe break; is better? continue; }, Message::Close(c) => { - log::warn!("Websocket closing: {:?}", c); + tracing::warn!("Websocket closing: {:?}", c); break; }, @@ -146,7 +146,7 @@ where // Match SendError if let Err(e) = incoming_sink.send(WebSocketStreamItem::Message(Bytes::from(frame))).await { - log::info!("Websocket sink has closed: {:?}.", e); + tracing::info!("Websocket sink has closed: {:?}.", e); break; } }, @@ -181,7 +181,7 @@ impl TungsteniteWebSocket { ); } - log::trace!("Will start websocket at {:?}", url); + tracing::trace!("Will start websocket at {:?}", url); let mut request = url.into_client_request()?; @@ -198,7 +198,7 @@ impl TungsteniteWebSocket { connect_async_with_tls_connector(request, Some(tls_connector)) .await?; - log::debug!("WebSocket connected: {:?}", response); + tracing::debug!("WebSocket connected: {:?}", response); let (incoming_sink, incoming_stream) = channel(5); @@ -210,7 +210,7 @@ impl TungsteniteWebSocket { tokio::spawn(processing_task.map(|v| match v { Ok(()) => (), Err(e) => { - log::warn!("Processing task terminated with error: {:?}", e) + tracing::warn!("Processing task terminated with error: {:?}", e) }, })); diff --git a/libsignal-service/src/account_manager.rs b/libsignal-service/src/account_manager.rs index 5dac2b1b5..c1d542816 100644 --- a/libsignal-service/src/account_manager.rs +++ b/libsignal-service/src/account_manager.rs @@ -108,7 +108,7 @@ impl AccountManager { { Ok(status) => status, Err(ServiceError::Unauthorized) => { - log::info!("Got Unauthorized when fetching pre-key status. Assuming first installment."); + tracing::info!("Got Unauthorized when fetching pre-key status. Assuming first installment."); // Additionally, the second PUT request will fail if this really comes down to an // authorization failure. crate::push_service::PreKeyStatus { @@ -118,12 +118,12 @@ impl AccountManager { }, Err(e) => return Err(e), }; - log::trace!("Remaining pre-keys on server: {:?}", prekey_status); + tracing::trace!("Remaining pre-keys on server: {:?}", prekey_status); if prekey_status.count >= PRE_KEY_MINIMUM && prekey_status.pq_count >= PRE_KEY_MINIMUM { - log::info!("Available keys sufficient"); + tracing::info!("Available keys sufficient"); return Ok(( pre_keys_offset_id, pq_pre_keys_offset_id, @@ -207,7 +207,7 @@ impl AccountManager { identity_key: *identity_key_pair.public_key(), pq_pre_keys: pq_pre_key_entities, pq_last_resort_key: if use_last_resort_key { - log::warn!("Last resort Kyber key unimplemented"); + tracing::warn!("Last resort Kyber key unimplemented"); // Note about the last-resort key: // mark_kyber_pre_key_used() should retain the last-resort key, but can safely // remove the ephemeral pre keys. This implies that generating the last-resort key @@ -228,7 +228,7 @@ impl AccountManager { .register_pre_keys(ServiceIdType::AccountIdentity, pre_key_state) .await?; - log::trace!("Successfully refreshed prekeys"); + tracing::trace!("Successfully refreshed prekeys"); Ok(( pre_keys_offset_id + PRE_KEY_BATCH_SIZE, pq_pre_keys_offset_id + PRE_KEY_BATCH_SIZE, @@ -313,7 +313,7 @@ impl AccountManager { let identity_key_pair = identity_store.get_identity_key_pair().await?; if credentials.uuid.is_none() { - log::warn!("No local UUID set"); + tracing::warn!("No local UUID set"); } let provisioning_code = self.new_device_provisioning_code().await?; diff --git a/libsignal-service/src/cipher.rs b/libsignal-service/src/cipher.rs index 926406770..9b6a71283 100644 --- a/libsignal-service/src/cipher.rs +++ b/libsignal-service/src/cipher.rs @@ -109,7 +109,7 @@ where envelope.server_guid.as_ref().and_then(|g| match g.parse() { Ok(uuid) => Some(uuid), Err(e) => { - log::error!("Unparseable server_guid ({})", e); + tracing::error!("Unparseable server_guid ({})", e); None }, }); @@ -159,7 +159,7 @@ where Plaintext { metadata, data } }, Type::PlaintextContent => { - log::warn!("Envelope with plaintext content. This usually indicates a decryption retry."); + tracing::warn!("Envelope with plaintext content. This usually indicates a decryption retry."); let metadata = Metadata { sender: envelope.source_address(), sender_device: envelope.source_device(), @@ -243,7 +243,7 @@ where }; let needs_receipt = if envelope.source_service_id.is_some() { - log::warn!("Received an unidentified delivery over an identified channel. Marking needs_receipt=false"); + tracing::warn!("Received an unidentified delivery over an identified channel. Marking needs_receipt=false"); false } else { true diff --git a/libsignal-service/src/envelope.rs b/libsignal-service/src/envelope.rs index a5071a961..868ba9137 100644 --- a/libsignal-service/src/envelope.rs +++ b/libsignal-service/src/envelope.rs @@ -34,10 +34,10 @@ impl Envelope { is_signaling_key_encrypted: bool, ) -> Result { if !is_signaling_key_encrypted { - log::trace!("Envelope::decrypt: not encrypted"); + tracing::trace!("Envelope::decrypt: not encrypted"); Ok(Envelope::decode(input)?) } else { - log::trace!("Envelope::decrypt: decrypting"); + tracing::trace!("Envelope::decrypt: decrypting"); if input.len() < VERSION_LENGTH || input[VERSION_OFFSET] != SUPPORTED_VERSION { @@ -78,7 +78,7 @@ impl Envelope { .decrypt_padded_vec_mut::(input) .expect("decryption"); - log::trace!("Envelope::decrypt: decrypted, decoding"); + tracing::trace!("Envelope::decrypt: decrypted, decoding"); Ok(Envelope::decode(&input as &[u8])?) } diff --git a/libsignal-service/src/groups_v2/manager.rs b/libsignal-service/src/groups_v2/manager.rs index fdafcf614..c132483e9 100644 --- a/libsignal-service/src/groups_v2/manager.rs +++ b/libsignal-service/src/groups_v2/manager.rs @@ -203,7 +203,7 @@ impl GroupsManager { credential_response, ) .map_err(|e| { - log::error!( + tracing::error!( "failed to receive auth credentials with PNI: {:?}", e ); diff --git a/libsignal-service/src/groups_v2/operations.rs b/libsignal-service/src/groups_v2/operations.rs index 7f53b4b8e..a78e986f6 100644 --- a/libsignal-service/src/groups_v2/operations.rs +++ b/libsignal-service/src/groups_v2/operations.rs @@ -163,7 +163,7 @@ impl GroupOperations { if bytes.is_empty() { GroupAttributeBlob::default() } else if bytes.len() < 29 { - log::warn!("bad encrypted blob length"); + tracing::warn!("bad encrypted blob length"); GroupAttributeBlob::default() } else { self.group_secret_params @@ -174,7 +174,7 @@ impl GroupOperations { .map_err(GroupDecodingError::ProtobufDecodeError) }) .unwrap_or_else(|e| { - log::warn!("bad encrypted blob: {}", e); + tracing::warn!("bad encrypted blob: {}", e); GroupAttributeBlob::default() }) } diff --git a/libsignal-service/src/messagepipe.rs b/libsignal-service/src/messagepipe.rs index b33bb2d43..683afe909 100644 --- a/libsignal-service/src/messagepipe.rs +++ b/libsignal-service/src/messagepipe.rs @@ -72,7 +72,7 @@ impl MessagePipe { { sink.send(env).await?; } else { - log::trace!("got empty message in websocket"); + tracing::trace!("got empty message in websocket"); } } @@ -129,7 +129,7 @@ impl MessagePipe { let stream = stream.map(Some); let runner = self.run(sink).map(|e| { - log::info!("sink was closed: {:?}", e); + tracing::info!("sink was closed: {:?}", e); None }); diff --git a/libsignal-service/src/models.rs b/libsignal-service/src/models.rs index 57f0a362c..0ff81cf95 100644 --- a/libsignal-service/src/models.rs +++ b/libsignal-service/src/models.rs @@ -82,7 +82,7 @@ impl Contact { reader: avatar_data, }) } else { - log::warn!("missing avatar content-type, skipping."); + tracing::warn!("missing avatar content-type, skipping."); None } }), diff --git a/libsignal-service/src/proto.rs b/libsignal-service/src/proto.rs index cf7d27d93..f24d08037 100644 --- a/libsignal-service/src/proto.rs +++ b/libsignal-service/src/proto.rs @@ -27,7 +27,7 @@ impl WebSocketRequestMessage { for header in &self.headers { let parts: Vec<_> = header.split(':').collect(); if parts.len() != 2 { - log::warn!( + tracing::warn!( "Got a weird header: {:?}, split in {:?}", header, parts diff --git a/libsignal-service/src/provisioning/pipe.rs b/libsignal-service/src/provisioning/pipe.rs index b942f75ec..6b9c9422a 100644 --- a/libsignal-service/src/provisioning/pipe.rs +++ b/libsignal-service/src/provisioning/pipe.rs @@ -153,7 +153,7 @@ impl ProvisioningPipe { let stream = stream.map(Some); let runner = self.run(sink).map(|_| { - log::info!("Sink closed, provisioning is done!"); + tracing::info!("Sink closed, provisioning is done!"); None }); diff --git a/libsignal-service/src/push_service.rs b/libsignal-service/src/push_service.rs index 155eb2919..09938f5b3 100644 --- a/libsignal-service/src/push_service.rs +++ b/libsignal-service/src/push_service.rs @@ -1110,7 +1110,7 @@ pub trait PushService: MaybeSend { (Err(e), _) => Err(e), (Ok(_resp), AvatarWrite::RetainAvatar) | (Ok(_resp), AvatarWrite::NoAvatar) => { - log::warn!( + tracing::warn!( "No avatar supplied but got avatar upload URL. Ignoring" ); Ok(None) diff --git a/libsignal-service/src/receiver.rs b/libsignal-service/src/receiver.rs index cef614745..9c8d5920f 100644 --- a/libsignal-service/src/receiver.rs +++ b/libsignal-service/src/receiver.rs @@ -81,7 +81,7 @@ impl MessageReceiver { match r { Ok(stream) => break stream, Err(ServiceError::Timeout { .. }) => { - log::warn!("get_attachment timed out, retrying"); + tracing::warn!("get_attachment timed out, retrying"); retries += 1; if retries >= MAX_DOWNLOAD_RETRIES { return Err(ServiceError::Timeout { diff --git a/libsignal-service/src/sender.rs b/libsignal-service/src/sender.rs index cf9884001..232caafaf 100644 --- a/libsignal-service/src/sender.rs +++ b/libsignal-service/src/sender.rs @@ -5,8 +5,8 @@ use libsignal_protocol::{ process_prekey_bundle, DeviceId, ProtocolStore, SenderCertificate, SenderKeyStore, SignalProtocolError, }; -use log::{debug, info, trace}; use rand::{CryptoRng, Rng}; +use tracing::{debug, info, trace}; use uuid::Uuid; use crate::{ @@ -179,7 +179,7 @@ where ) }; if padded_len < len { - log::error!( + tracing::error!( "Padded len {} < len {}. Continuing with a privacy risk.", padded_len, len @@ -191,13 +191,13 @@ where crate::attachment_cipher::encrypt_in_place(iv, key, &mut contents); // Request upload attributes - log::trace!("Requesting upload attributes"); + tracing::trace!("Requesting upload attributes"); let attrs = self .identified_ws .get_attachment_v2_upload_attributes() .await?; - log::trace!("Uploading attachment"); + tracing::trace!("Uploading attachment"); let (id, digest) = self .service .upload_attachment(&attrs, &mut std::io::Cursor::new(&contents)) @@ -329,7 +329,7 @@ where ContentBody::DataMessage(message), Ok(SentMessage { needs_sync, .. }), ) if *needs_sync || self.is_multi_device().await => { - log::debug!("sending multi-device sync message"); + tracing::debug!("sending multi-device sync message"); let sync_message = self .create_multi_device_sent_transcript_content( Some(recipient), @@ -351,7 +351,7 @@ where if end_session { let n = self.protocol_store.delete_all_sessions(recipient).await?; - log::debug!("ended {} sessions with {}", n, recipient.uuid); + tracing::debug!("ended {} sessions with {}", n, recipient.uuid); } results.remove(0) @@ -398,7 +398,9 @@ where if needs_sync_in_results && message.is_none() { // XXX: does this happen? - log::warn!("Server claims need sync, but not sending datamessage."); + tracing::warn!( + "Server claims need sync, but not sending datamessage." + ); } // we only need to send a synchronization message once @@ -461,18 +463,18 @@ where }; let send = if let Some(unidentified) = &unidentified_access { - log::debug!("sending via unidentified"); + tracing::debug!("sending via unidentified"); self.unidentified_ws .send_messages_unidentified(messages, unidentified) .await } else { - log::debug!("sending identified"); + tracing::debug!("sending identified"); self.identified_ws.send_messages(messages).await }; match send { Ok(SendMessageResponse { needs_sync }) => { - log::debug!("message sent!"); + tracing::debug!("message sent!"); return Ok(SentMessage { recipient, unidentified: unidentified_access.is_some(), @@ -482,13 +484,13 @@ where Err(ServiceError::Unauthorized) if unidentified_access.is_some() => { - log::trace!("unauthorized error using unidentified; retry over identified"); + tracing::trace!("unauthorized error using unidentified; retry over identified"); unidentified_access = None; }, Err(ServiceError::MismatchedDevicesException(ref m)) => { - log::debug!("{:?}", m); + tracing::debug!("{:?}", m); for extra_device_id in &m.extra_devices { - log::debug!( + tracing::debug!( "dropping session with device {}", extra_device_id ); @@ -501,7 +503,7 @@ where } for missing_device_id in &m.missing_devices { - log::debug!( + tracing::debug!( "creating session with missing device {}", missing_device_id ); @@ -522,7 +524,7 @@ where ) .await .map_err(|e| { - log::error!("failed to create session: {}", e); + tracing::error!("failed to create session: {}", e); MessageSenderError::UntrustedIdentity { address: recipient, } @@ -530,9 +532,9 @@ where } }, Err(ServiceError::StaleDevices(ref m)) => { - log::debug!("{:?}", m); + tracing::debug!("{:?}", m); for extra_device_id in &m.stale_devices { - log::debug!( + tracing::debug!( "dropping session with device {}", extra_device_id ); @@ -545,20 +547,20 @@ where } }, Err(ServiceError::ProofRequiredError(ref p)) => { - log::debug!("{:?}", p); + tracing::debug!("{:?}", p); return Err(MessageSenderError::ProofRequired { token: p.token.clone(), options: p.options.clone(), }); }, Err(ServiceError::NotFoundError) => { - log::debug!("Not found when sending a message"); + tracing::debug!("Not found when sending a message"); return Err(MessageSenderError::NotFound { uuid: recipient.uuid, }); }, Err(e) => { - log::debug!( + tracing::debug!( "Default error handler for ws.send_messages: {}", e ); @@ -661,7 +663,10 @@ where let recipient_protocol_address = recipient.to_protocol_address(device_id); - log::debug!("encrypting message for {}", recipient_protocol_address); + tracing::debug!( + "encrypting message for {}", + recipient_protocol_address + ); // establish a session with the recipient/device if necessary // no need to establish a session with ourselves (and our own current device) @@ -681,7 +686,7 @@ where .await { Ok(ok) => { - log::trace!("Get prekeys OK"); + tracing::trace!("Get prekeys OK"); ok }, Err(ServiceError::NotFoundError) => { diff --git a/libsignal-service/src/websocket.rs b/libsignal-service/src/websocket.rs index 63aa1a17c..f02a88355 100644 --- a/libsignal-service/src/websocket.rs +++ b/libsignal-service/src/websocket.rs @@ -95,7 +95,7 @@ impl SignalWebSocketProcess { frame: Bytes, ) -> Result<(), ServiceError> { let msg = WebSocketMessage::decode(frame)?; - log::trace!("decoded {:?}", msg); + tracing::trace!("decoded {:?}", msg); use web_socket_message::Type; match (msg.r#type(), msg.request, msg.response) { @@ -104,7 +104,7 @@ impl SignalWebSocketProcess { }), (Type::Request, Some(request), _) => { let (sink, recv) = oneshot::channel(); - log::trace!("sending request with body"); + tracing::trace!("sending request with body"); self.request_sink.send((request, sink)).await.map_err( |_| ServiceError::WsError { reason: "request handler failed".into(), @@ -124,7 +124,7 @@ impl SignalWebSocketProcess { self.outgoing_request_map.remove(&id) { if let Err(e) = responder.send(Ok(response)) { - log::warn!( + tracing::warn!( "Could not deliver response for id {}: {:?}", id, e @@ -134,7 +134,7 @@ impl SignalWebSocketProcess { self.outgoing_keep_alive_set.take(&id) { if response.status() != 200 { - log::warn!( + tracing::warn!( "Response code for keep-alive is not 200: {:?}", response ); @@ -143,7 +143,7 @@ impl SignalWebSocketProcess { }); } } else { - log::warn!( + tracing::warn!( "Response for non existing request: {:?}", response ); @@ -184,7 +184,7 @@ impl SignalWebSocketProcess { .filter(|x| !self.outgoing_request_map.contains_key(x)) .unwrap_or_else(|| self.next_request_id()), ); - log::trace!("sending request {:?}", request); + tracing::trace!("sending request {:?}", request); self.outgoing_request_map.insert(request.id.unwrap(), responder); let msg = WebSocketMessage { @@ -210,7 +210,7 @@ impl SignalWebSocketProcess { Some(WebSocketStreamItem::KeepAliveRequest) => { // XXX: would be nicer if we could drop this request into the request // queue above. - log::debug!("Sending keep alive upon request"); + tracing::debug!("Sending keep alive upon request"); let request = WebSocketRequestMessage { id: Some(self.next_request_id()), path: Some(self.keep_alive_path.clone()), @@ -236,7 +236,7 @@ impl SignalWebSocketProcess { response = self.outgoing_responses.next() => { match response { Some(Ok(response)) => { - log::trace!("sending response {:?}", response); + tracing::trace!("sending response {:?}", response); let msg = WebSocketMessage { r#type: Some(web_socket_message::Type::Response.into()), @@ -247,7 +247,7 @@ impl SignalWebSocketProcess { self.ws.send_message(buffer.into()).await?; } Some(Err(e)) => { - log::error!("could not generate response to a Signal request; responder was canceled: {}. Continuing.", e); + tracing::error!("could not generate response to a Signal request; responder was canceled: {}. Continuing.", e); } None => { unreachable!("outgoing responses should never fuse") @@ -292,7 +292,7 @@ impl SignalWebSocket { let process = process.run().map(|x| match x { Ok(()) => (), Err(e) => { - log::error!("SignalWebSocket: {}", e); + tracing::error!("SignalWebSocket: {}", e); }, }); @@ -383,7 +383,7 @@ impl SignalWebSocket { { let response = self.request(r).await?; if response.status() != 200 { - log::debug!( + tracing::debug!( "request_json with non-200 status code. message: {}", response.message() ); @@ -408,7 +408,7 @@ impl SignalWebSocket { 409 /* CONFLICT */ => { let mismatched_devices: MismatchedDevices = json(response.body()).map_err(|e| { - log::error!( + tracing::error!( "Failed to decode HTTP 409 response: {}", e ); @@ -423,7 +423,7 @@ impl SignalWebSocket { 410 /* GONE */ => { let stale_devices = json(response.body()).map_err(|e| { - log::error!( + tracing::error!( "Failed to decode HTTP 410 response: {}", e ); @@ -435,7 +435,7 @@ impl SignalWebSocket { }, 428 /* PRECONDITION_REQUIRED */ => { let proof_required = json(response.body()).map_err(|e| { - log::error!("Failed to decode HTTP 428 response: {}", e); + tracing::error!("Failed to decode HTTP 428 response: {}", e); ServiceError::UnhandledResponseCode { http_code: 428, } From fc1c695870c22cdf66c1a21d30a87e1cc005e3cb Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Sat, 6 Jan 2024 14:29:55 +0100 Subject: [PATCH 03/18] Remove log dependency --- libsignal-service-actix/Cargo.toml | 1 - libsignal-service-hyper/Cargo.toml | 1 - libsignal-service/Cargo.toml | 1 - 3 files changed, 3 deletions(-) diff --git a/libsignal-service-actix/Cargo.toml b/libsignal-service-actix/Cargo.toml index 52e97b7eb..782816358 100644 --- a/libsignal-service-actix/Cargo.toml +++ b/libsignal-service-actix/Cargo.toml @@ -24,7 +24,6 @@ rustls = "0.21" rustls-pemfile = "0.3" url = "2.1" serde = "1.0" -log = "0.4" rand = "0.8" thiserror = "1.0" diff --git a/libsignal-service-hyper/Cargo.toml b/libsignal-service-hyper/Cargo.toml index 982daecc0..547d4359e 100644 --- a/libsignal-service-hyper/Cargo.toml +++ b/libsignal-service-hyper/Cargo.toml @@ -13,7 +13,6 @@ async-trait = "0.1" bytes = "1.0" futures = "0.3" tracing = "0.1" -log = "0.4" mpart-async = "0.6" serde = "1.0" serde_json = "1.0" diff --git a/libsignal-service/Cargo.toml b/libsignal-service/Cargo.toml index 7ccd4c1bc..03bc31a1e 100644 --- a/libsignal-service/Cargo.toml +++ b/libsignal-service/Cargo.toml @@ -24,7 +24,6 @@ futures = "0.3" hex = "0.4" hkdf = "0.12" hmac = "0.12" -log = "0.4" phonenumber = "0.3" prost = "0.10" rand = "0.8" From abb957d05f6a4aa3612f797b89e8dd98f5eaf7ae Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Sat, 6 Jan 2024 14:30:02 +0100 Subject: [PATCH 04/18] Skip undebuggable fields --- libsignal-service/src/cipher.rs | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/libsignal-service/src/cipher.rs b/libsignal-service/src/cipher.rs index 9b6a71283..a592fffaa 100644 --- a/libsignal-service/src/cipher.rs +++ b/libsignal-service/src/cipher.rs @@ -1,4 +1,4 @@ -use std::{convert::TryFrom, time::SystemTime}; +use std::{convert::TryFrom, fmt, time::SystemTime}; use aes::cipher::block_padding::{Iso7816, RawPadding}; use libsignal_protocol::{ @@ -34,6 +34,18 @@ pub struct ServiceCipher { local_device_id: u32, } +impl fmt::Debug for ServiceCipher { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ServiceCipher") + .field("protocol_store", &"...") + .field("csprng", &"...") + .field("trust_root", &"...") + .field("local_uuid", &self.local_uuid) + .field("local_device_id", &self.local_device_id) + .finish() + } +} + impl ServiceCipher where S: ProtocolStore + KyberPreKeyStore + SenderKeyStore + Clone, @@ -429,7 +441,16 @@ pub async fn get_preferred_protocol_address( /// is then validated against the `trust_root` baked into the client to ensure that the sender's /// identity was not forged. #[allow(clippy::too_many_arguments)] -#[tracing::instrument] +#[tracing::instrument(skip( + ciphertext, + trust_root, + identity_store, + session_store, + pre_key_store, + signed_pre_key_store, + sender_key_store, + kyber_pre_key_store +))] async fn sealed_sender_decrypt( ciphertext: &[u8], trust_root: &PublicKey, From d47bd55cd4ae8a5b3c32afde0204d268b1c2c6c2 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Sat, 6 Jan 2024 14:40:07 +0100 Subject: [PATCH 05/18] Instrument update_pre_key_bundle --- libsignal-service/src/account_manager.rs | 235 +++++++++++++---------- 1 file changed, 129 insertions(+), 106 deletions(-) diff --git a/libsignal-service/src/account_manager.rs b/libsignal-service/src/account_manager.rs index c1d542816..de18bde59 100644 --- a/libsignal-service/src/account_manager.rs +++ b/libsignal-service/src/account_manager.rs @@ -89,6 +89,7 @@ impl AccountManager { /// /// Returns the next pre-key offset, pq pre-key offset, and next signed pre-key offset as a tuple. #[allow(clippy::too_many_arguments)] + #[tracing::instrument(skip(self, protocol_store, csprng))] pub async fn update_pre_key_bundle< R: rand::Rng + rand::CryptoRng, P: ProtocolStore, @@ -101,22 +102,29 @@ impl AccountManager { pq_pre_keys_offset_id: u32, use_last_resort_key: bool, ) -> Result<(u32, u32, u32), ServiceError> { - let prekey_status = match self - .service - .get_pre_key_status(ServiceIdType::AccountIdentity) - .await - { - Ok(status) => status, - Err(ServiceError::Unauthorized) => { - tracing::info!("Got Unauthorized when fetching pre-key status. Assuming first installment."); - // Additionally, the second PUT request will fail if this really comes down to an - // authorization failure. - crate::push_service::PreKeyStatus { - count: 0, - pq_count: 0, - } - }, - Err(e) => return Err(e), + let prekey_status = { + let _span = tracing::span!( + tracing::Level::DEBUG, + "Fetching pre key status" + ) + .entered(); + match self + .service + .get_pre_key_status(ServiceIdType::AccountIdentity) + .await + { + Ok(status) => status, + Err(ServiceError::Unauthorized) => { + tracing::info!("Got Unauthorized when fetching pre-key status. Assuming first installment."); + // Additionally, the second PUT request will fail if this really comes down to an + // authorization failure. + crate::push_service::PreKeyStatus { + count: 0, + pq_count: 0, + } + }, + Err(e) => return Err(e), + } }; tracing::trace!("Remaining pre-keys on server: {:?}", prekey_status); @@ -131,104 +139,119 @@ impl AccountManager { )); } - let identity_key_pair = protocol_store.get_identity_key_pair().await?; - - let mut pre_key_entities = vec![]; - let mut pq_pre_key_entities = vec![]; + let pre_key_state = { + let _span = + tracing::span!(tracing::Level::DEBUG, "Generating pre keys") + .entered(); + let identity_key_pair = + protocol_store.get_identity_key_pair().await?; + + let mut pre_key_entities = vec![]; + let mut pq_pre_key_entities = vec![]; + + // EC keys + for i in 0..PRE_KEY_BATCH_SIZE { + let key_pair = KeyPair::generate(csprng); + let pre_key_id = (((pre_keys_offset_id + i) + % (PRE_KEY_MEDIUM_MAX_VALUE - 1)) + + 1) + .into(); + let pre_key_record = PreKeyRecord::new(pre_key_id, &key_pair); + protocol_store + .save_pre_key(pre_key_id, &pre_key_record) + .await?; + // TODO: Shouldn't this also remove the previous pre-keys from storage? + // I think we might want to update the storage, and then sync the storage to the + // server. + + pre_key_entities.push(PreKeyEntity::try_from(pre_key_record)?); + } + + // Kyber keys + for i in 0..PRE_KEY_BATCH_SIZE { + let pre_key_id = (((pq_pre_keys_offset_id + i) + % (PRE_KEY_MEDIUM_MAX_VALUE - 1)) + + 1) + .into(); + let pre_key_record = KyberPreKeyRecord::generate( + kem::KeyType::Kyber1024, + pre_key_id, + identity_key_pair.private_key(), + )?; + protocol_store + .save_kyber_pre_key(pre_key_id, &pre_key_record) + .await?; + // TODO: Shouldn't this also remove the previous pre-keys from storage? + // I think we might want to update the storage, and then sync the storage to the + // server. + + pq_pre_key_entities + .push(KyberPreKeyEntity::try_from(pre_key_record)?); + } + + // Generate and store the next signed prekey + let signed_pre_key_pair = KeyPair::generate(csprng); + let signed_pre_key_public = signed_pre_key_pair.public_key; + let signed_pre_key_signature = + identity_key_pair.private_key().calculate_signature( + &signed_pre_key_public.serialize(), + csprng, + )?; + + let unix_time = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(); + + let signed_prekey_record = SignedPreKeyRecord::new( + next_signed_pre_key_id.into(), + unix_time.as_millis() as u64, + &signed_pre_key_pair, + &signed_pre_key_signature, + ); - // EC keys - for i in 0..PRE_KEY_BATCH_SIZE { - let key_pair = KeyPair::generate(csprng); - let pre_key_id = (((pre_keys_offset_id + i) - % (PRE_KEY_MEDIUM_MAX_VALUE - 1)) - + 1) - .into(); - let pre_key_record = PreKeyRecord::new(pre_key_id, &key_pair); protocol_store - .save_pre_key(pre_key_id, &pre_key_record) + .save_signed_pre_key( + next_signed_pre_key_id.into(), + &signed_prekey_record, + ) .await?; - // TODO: Shouldn't this also remove the previous pre-keys from storage? - // I think we might want to update the storage, and then sync the storage to the - // server. - pre_key_entities.push(PreKeyEntity::try_from(pre_key_record)?); - } + PreKeyState { + pre_keys: pre_key_entities, + signed_pre_key: signed_prekey_record.try_into()?, + identity_key: *identity_key_pair.public_key(), + pq_pre_keys: pq_pre_key_entities, + pq_last_resort_key: if use_last_resort_key { + tracing::warn!("Last resort Kyber key unimplemented"); + // Note about the last-resort key: + // mark_kyber_pre_key_used() should retain the last-resort key, but can safely + // remove the ephemeral pre keys. This implies that generating the last-resort key + // should notify the pre-key store, when saving the key, that it concerns a + // last-resort key. I don't see how this can be communicated to the store, and I + // fear that we need to reengineer the whole prekeystore system as a whole. + None + // Some(KyberPreKeyEntity { + // key_id: 0x7fffffff, + // public_key: "NDI=".into(), + // }) + } else { + None + }, + } + }; - // Kyber keys - for i in 0..PRE_KEY_BATCH_SIZE { - let pre_key_id = (((pq_pre_keys_offset_id + i) - % (PRE_KEY_MEDIUM_MAX_VALUE - 1)) - + 1) - .into(); - let pre_key_record = KyberPreKeyRecord::generate( - kem::KeyType::Kyber1024, - pre_key_id, - identity_key_pair.private_key(), - )?; - protocol_store - .save_kyber_pre_key(pre_key_id, &pre_key_record) + { + let _span = + tracing::span!(tracing::Level::DEBUG, "Uploading pre keys") + .entered(); + self.service + .register_pre_keys( + ServiceIdType::AccountIdentity, + pre_key_state, + ) .await?; - // TODO: Shouldn't this also remove the previous pre-keys from storage? - // I think we might want to update the storage, and then sync the storage to the - // server. - - pq_pre_key_entities - .push(KyberPreKeyEntity::try_from(pre_key_record)?); } - // Generate and store the next signed prekey - let signed_pre_key_pair = KeyPair::generate(csprng); - let signed_pre_key_public = signed_pre_key_pair.public_key; - let signed_pre_key_signature = identity_key_pair - .private_key() - .calculate_signature(&signed_pre_key_public.serialize(), csprng)?; - - let unix_time = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap(); - - let signed_prekey_record = SignedPreKeyRecord::new( - next_signed_pre_key_id.into(), - unix_time.as_millis() as u64, - &signed_pre_key_pair, - &signed_pre_key_signature, - ); - - protocol_store - .save_signed_pre_key( - next_signed_pre_key_id.into(), - &signed_prekey_record, - ) - .await?; - - let pre_key_state = PreKeyState { - pre_keys: pre_key_entities, - signed_pre_key: signed_prekey_record.try_into()?, - identity_key: *identity_key_pair.public_key(), - pq_pre_keys: pq_pre_key_entities, - pq_last_resort_key: if use_last_resort_key { - tracing::warn!("Last resort Kyber key unimplemented"); - // Note about the last-resort key: - // mark_kyber_pre_key_used() should retain the last-resort key, but can safely - // remove the ephemeral pre keys. This implies that generating the last-resort key - // should notify the pre-key store, when saving the key, that it concerns a - // last-resort key. I don't see how this can be communicated to the store, and I - // fear that we need to reengineer the whole prekeystore system as a whole. - None - // Some(KyberPreKeyEntity { - // key_id: 0x7fffffff, - // public_key: "NDI=".into(), - // }) - } else { - None - }, - }; - - self.service - .register_pre_keys(ServiceIdType::AccountIdentity, pre_key_state) - .await?; - - tracing::trace!("Successfully refreshed prekeys"); Ok(( pre_keys_offset_id + PRE_KEY_BATCH_SIZE, pq_pre_keys_offset_id + PRE_KEY_BATCH_SIZE, From 731e2d2d147c54ab7b03f1acc1913d0ae54c5cbe Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Sat, 6 Jan 2024 14:54:44 +0100 Subject: [PATCH 06/18] Instrument sender --- libsignal-service/src/attachment_cipher.rs | 2 + libsignal-service/src/sender.rs | 65 +++++++++++++++++----- 2 files changed, 52 insertions(+), 15 deletions(-) diff --git a/libsignal-service/src/attachment_cipher.rs b/libsignal-service/src/attachment_cipher.rs index 06eeb2c6e..ebe0ad23a 100644 --- a/libsignal-service/src/attachment_cipher.rs +++ b/libsignal-service/src/attachment_cipher.rs @@ -18,6 +18,7 @@ pub enum AttachmentCipherError { /// /// The Vec will be reused when it has enough space to house the MAC, /// otherwise reallocation might happen. +#[tracing::instrument(skip(iv, key, plaintext))] pub fn encrypt_in_place(iv: [u8; 16], key: [u8; 64], plaintext: &mut Vec) { let aes_half = &key[..32]; let mac_half = &key[32..]; @@ -53,6 +54,7 @@ pub fn encrypt_in_place(iv: [u8; 16], key: [u8; 64], plaintext: &mut Vec) { /// Decrypts an attachment in place, given the key material. /// /// On error, ciphertext is not changed. +#[tracing::instrument(skip(key, ciphertext))] pub fn decrypt_in_place( key: [u8; 64], ciphertext: &mut Vec, diff --git a/libsignal-service/src/sender.rs b/libsignal-service/src/sender.rs index 232caafaf..9abff77e9 100644 --- a/libsignal-service/src/sender.rs +++ b/libsignal-service/src/sender.rs @@ -6,7 +6,7 @@ use libsignal_protocol::{ SenderKeyStore, SignalProtocolError, }; use rand::{CryptoRng, Rng}; -use tracing::{debug, info, trace}; +use tracing::{info, trace}; use uuid::Uuid; use crate::{ @@ -61,6 +61,7 @@ pub struct SentMessage { /// Attachment specification to be used for uploading. /// /// Loose equivalent of Java's `SignalServiceAttachmentStream`. +#[derive(Debug)] pub struct AttachmentSpec { pub content_type: String, pub length: usize, @@ -150,6 +151,7 @@ where /// Encrypts and uploads an attachment /// /// Contents are accepted as an owned, plain text Vec, because encryption happens in-place. + #[tracing::instrument(skip(self, contents))] pub async fn upload_attachment( &mut self, spec: AttachmentSpec, @@ -188,20 +190,24 @@ where contents.resize(padded_len, 0); } - crate::attachment_cipher::encrypt_in_place(iv, key, &mut contents); + tracing::trace_span!("encrypting attachment").in_scope(|| { + crate::attachment_cipher::encrypt_in_place(iv, key, &mut contents) + }); // Request upload attributes - tracing::trace!("Requesting upload attributes"); - let attrs = self - .identified_ws - .get_attachment_v2_upload_attributes() - .await?; - - tracing::trace!("Uploading attachment"); - let (id, digest) = self - .service - .upload_attachment(&attrs, &mut std::io::Cursor::new(&contents)) - .await?; + let attrs = { + let _span = + tracing::trace_span!("requesting upload attributes").entered(); + self.identified_ws + .get_attachment_v2_upload_attributes() + .await? + }; + let (id, digest) = { + let _span = tracing::trace_span!("Uploading attachment").entered(); + self.service + .upload_attachment(&attrs, &mut std::io::Cursor::new(&contents)) + .await? + }; Ok(AttachmentPointer { content_type: Some(spec.content_type), @@ -240,6 +246,7 @@ where /// Upload contact details to the CDN /// /// Returns attachment ID and the attachment digest + #[tracing::instrument(skip(self, contacts))] async fn upload_contact_details( &mut self, contacts: Contacts, @@ -287,6 +294,10 @@ where } /// Send a message `content` to a single `recipient`. + #[tracing::instrument( + skip(self, unidentified_access, message), + fields(unidentified_access = unidentified_access.is_some()), + )] pub async fn send_message( &mut self, recipient: &ServiceAddress, @@ -358,6 +369,10 @@ where } /// Send a message to the recipients in a group. + #[tracing::instrument( + skip(self, recipients, message), + fields(recipients = recipients.as_ref().len()), + )] pub async fn send_message_to_group( &mut self, recipients: impl AsRef<[(ServiceAddress, Option)]>, @@ -432,6 +447,11 @@ where } /// Send a message (`content`) to an address (`recipient`). + #[tracing::instrument( + level = "trace", + skip(self, unidentified_access, content_body), + fields(unidentified_access = unidentified_access.is_some()), + )] async fn try_send_message( &mut self, recipient: ServiceAddress, @@ -573,6 +593,10 @@ where } /// Upload contact details to the CDN and send a sync message + #[tracing::instrument( + skip(self, unidentified_access, contacts), + fields(unidentified_access = unidentified_access.is_some()), + )] pub async fn send_contact_details( &mut self, recipient: &ServiceAddress, @@ -610,6 +634,11 @@ where } // Equivalent with `getEncryptedMessages` + #[tracing::instrument( + level = "trace", + skip(self, unidentified_access, content), + fields(unidentified_access = unidentified_access.is_some()), + )] async fn create_encrypted_messages( &mut self, recipient: &ServiceAddress, @@ -635,7 +664,7 @@ where } for device_id in devices { - debug!("sending message to device {}", device_id); + trace!("sending message to device {}", device_id); messages.push( self.create_encrypted_message( recipient, @@ -653,6 +682,11 @@ where /// Equivalent to `getEncryptedMessage` /// /// When no session with the recipient exists, we need to create one. + #[tracing::instrument( + level = "trace", + skip(self, unidentified_access, content), + fields(unidentified_access = unidentified_access.is_some()), + )] async fn create_encrypted_message( &mut self, recipient: &ServiceAddress, @@ -663,7 +697,7 @@ where let recipient_protocol_address = recipient.to_protocol_address(device_id); - tracing::debug!( + tracing::trace!( "encrypting message for {}", recipient_protocol_address ); @@ -724,6 +758,7 @@ where } } + let _span = tracing::trace_span!("encrypting message").entered(); let message = self .cipher .encrypt(&recipient_protocol_address, unidentified_access, content) From 3fb20a0bbfc7412ebddbdaab43dcce20cd546211 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Sun, 7 Jan 2024 09:23:36 +0100 Subject: [PATCH 07/18] Smaller instrumentation log on Envelope --- libsignal-service/src/cipher.rs | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/libsignal-service/src/cipher.rs b/libsignal-service/src/cipher.rs index a592fffaa..34477875c 100644 --- a/libsignal-service/src/cipher.rs +++ b/libsignal-service/src/cipher.rs @@ -46,6 +46,23 @@ impl fmt::Debug for ServiceCipher { } } +fn debug_envelope(envelope: &Envelope) -> String { + format!( + "Envelope {{ \ + source_address: {:?}, \ + source_device: {:?}, \ + server_guid: {:?}, \ + timestamp: {:?}, \ + content: {} bytes, \ + }}", + envelope.source_address(), + envelope.source_device(), + envelope.server_guid(), + envelope.timestamp(), + envelope.content().len(), + ) +} + impl ServiceCipher where S: ProtocolStore + KyberPreKeyStore + SenderKeyStore + Clone, @@ -70,7 +87,7 @@ where /// Opens ("decrypts") an envelope. /// /// Envelopes may be empty, in which case this method returns `Ok(None)` - #[tracing::instrument] + #[tracing::instrument(skip(envelope), fields(envelope = debug_envelope(&envelope)))] pub async fn open_envelope( &mut self, envelope: Envelope, @@ -102,7 +119,7 @@ where /// Triage of legacy messages happens inside this method, as opposed to the /// Java implementation, because it makes the borrow checker and the /// author happier. - #[tracing::instrument] + #[tracing::instrument(skip(envelope), fields(envelope = debug_envelope(envelope)))] async fn decrypt( &mut self, envelope: &Envelope, From 38a322f7cbd213e2324b85eb9eb14cdd2e466e70 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Sun, 7 Jan 2024 14:17:08 +0100 Subject: [PATCH 08/18] Cleaner panic message for Envelope::source_address --- libsignal-service/src/envelope.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/libsignal-service/src/envelope.rs b/libsignal-service/src/envelope.rs index 868ba9137..8a44a69f8 100644 --- a/libsignal-service/src/envelope.rs +++ b/libsignal-service/src/envelope.rs @@ -136,8 +136,10 @@ impl Envelope { let uuid = self .source_service_id .as_deref() - .and_then(|u| Uuid::parse_str(u).ok()) - .expect("valid uuid checked in constructor"); + .map(Uuid::parse_str) + .transpose() + .expect("valid uuid checked in constructor") + .expect("source_service_id is set"); ServiceAddress { uuid } } From 3e3287abe63182c9c9ff1ecdb685031ed6d71820 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Sun, 7 Jan 2024 14:19:05 +0100 Subject: [PATCH 09/18] Short debug format for empty Envelopes --- libsignal-service/src/cipher.rs | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/libsignal-service/src/cipher.rs b/libsignal-service/src/cipher.rs index 34477875c..fec99acfa 100644 --- a/libsignal-service/src/cipher.rs +++ b/libsignal-service/src/cipher.rs @@ -47,20 +47,24 @@ impl fmt::Debug for ServiceCipher { } fn debug_envelope(envelope: &Envelope) -> String { - format!( - "Envelope {{ \ + if envelope.content.is_none() { + "Envelope { empty }".to_string() + } else { + format!( + "Envelope {{ \ source_address: {:?}, \ source_device: {:?}, \ server_guid: {:?}, \ timestamp: {:?}, \ content: {} bytes, \ }}", - envelope.source_address(), - envelope.source_device(), - envelope.server_guid(), - envelope.timestamp(), - envelope.content().len(), - ) + envelope.source_address(), + envelope.source_device(), + envelope.server_guid(), + envelope.timestamp(), + envelope.content().len(), + ) + } } impl ServiceCipher From 776de62f31452eb9ee87b68d08b36f11d4a22714 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Sun, 7 Jan 2024 15:29:49 +0100 Subject: [PATCH 10/18] debug_envelope logic when source_uuid is unset --- libsignal-service/src/cipher.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/libsignal-service/src/cipher.rs b/libsignal-service/src/cipher.rs index fec99acfa..4cff65b43 100644 --- a/libsignal-service/src/cipher.rs +++ b/libsignal-service/src/cipher.rs @@ -52,13 +52,17 @@ fn debug_envelope(envelope: &Envelope) -> String { } else { format!( "Envelope {{ \ - source_address: {:?}, \ - source_device: {:?}, \ - server_guid: {:?}, \ - timestamp: {:?}, \ - content: {} bytes, \ - }}", - envelope.source_address(), + source_address: {}, \ + source_device: {:?}, \ + server_guid: {:?}, \ + timestamp: {:?}, \ + content: {} bytes, \ + }}", + if envelope.source_service_id.is_some() { + format!("{:?}", envelope.source_address()) + } else { + "unknown".to_string() + }, envelope.source_device(), envelope.server_guid(), envelope.timestamp(), From 88609264866b1cf9dc1a6ab28bb95146cac1e688 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Mon, 8 Jan 2024 11:46:34 +0100 Subject: [PATCH 11/18] Less verbose logging of keys and contents --- libsignal-service/src/cipher.rs | 25 +++++++++++++++---------- libsignal-service/src/envelope.rs | 2 +- libsignal-service/src/websocket.rs | 24 +++++++++++++++++++++++- 3 files changed, 39 insertions(+), 12 deletions(-) diff --git a/libsignal-service/src/cipher.rs b/libsignal-service/src/cipher.rs index 4cff65b43..cafbab7c0 100644 --- a/libsignal-service/src/cipher.rs +++ b/libsignal-service/src/cipher.rs @@ -466,16 +466,21 @@ pub async fn get_preferred_protocol_address( /// is then validated against the `trust_root` baked into the client to ensure that the sender's /// identity was not forged. #[allow(clippy::too_many_arguments)] -#[tracing::instrument(skip( - ciphertext, - trust_root, - identity_store, - session_store, - pre_key_store, - signed_pre_key_store, - sender_key_store, - kyber_pre_key_store -))] +#[tracing::instrument( + skip( + ciphertext, + trust_root, + identity_store, + session_store, + pre_key_store, + signed_pre_key_store, + sender_key_store, + kyber_pre_key_store + ), + fields( + ciphertext = ciphertext.len(), + ) +)] async fn sealed_sender_decrypt( ciphertext: &[u8], trust_root: &PublicKey, diff --git a/libsignal-service/src/envelope.rs b/libsignal-service/src/envelope.rs index 8a44a69f8..46efe94f7 100644 --- a/libsignal-service/src/envelope.rs +++ b/libsignal-service/src/envelope.rs @@ -27,7 +27,7 @@ impl TryFrom for Envelope { } impl Envelope { - #[tracing::instrument] + #[tracing::instrument(skip(input, signaling_key), fields(input_size = input.len()))] pub fn decrypt( input: &[u8], signaling_key: &SignalingKey, diff --git a/libsignal-service/src/websocket.rs b/libsignal-service/src/websocket.rs index f02a88355..694263bb6 100644 --- a/libsignal-service/src/websocket.rs +++ b/libsignal-service/src/websocket.rs @@ -95,7 +95,29 @@ impl SignalWebSocketProcess { frame: Bytes, ) -> Result<(), ServiceError> { let msg = WebSocketMessage::decode(frame)?; - tracing::trace!("decoded {:?}", msg); + if let Some(request) = &msg.request { + tracing::trace!( + "decoded WebSocketMessage request {{ r#type: {:?}, verb: {:?}, path: {:?}, body: {} bytes, headers: {:?}, id: {:?} }}", + msg.r#type(), + request.verb, + request.path, + request.body.as_ref().map(|x| x.len()).unwrap_or(0), + request.headers, + request.id, + ); + } else if let Some(response) = &msg.response { + tracing::trace!( + "decoded WebSocketMessage response {{ r#type: {:?}, status: {:?}, message: {:?}, body: {} bytes, headers: {:?}, id: {:?} }}", + msg.r#type(), + response.status, + response.message, + response.body.as_ref().map(|x| x.len()).unwrap_or(0), + response.headers, + response.id, + ); + } else { + tracing::debug!("decoded {msg:?}"); + } use web_socket_message::Type; match (msg.r#type(), msg.request, msg.response) { From 49ac04a6f40269ceb0ccf88e4a4aee0a1b3694e9 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Mon, 8 Jan 2024 14:42:41 +0100 Subject: [PATCH 12/18] More compact request log --- libsignal-service/src/websocket.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/libsignal-service/src/websocket.rs b/libsignal-service/src/websocket.rs index 694263bb6..04729bbec 100644 --- a/libsignal-service/src/websocket.rs +++ b/libsignal-service/src/websocket.rs @@ -206,7 +206,14 @@ impl SignalWebSocketProcess { .filter(|x| !self.outgoing_request_map.contains_key(x)) .unwrap_or_else(|| self.next_request_id()), ); - tracing::trace!("sending request {:?}", request); + tracing::trace!( + "sending WebSocketRequestMessage {{ verb: {:?}, path: {:?}, body (bytes): {:?}, headers: {:?}, id: {:?} }}", + request.verb, + request.path, + request.body.as_ref().map(|x| x.len()), + request.headers, + request.id, + ); self.outgoing_request_map.insert(request.id.unwrap(), responder); let msg = WebSocketMessage { From c0319e250c98c37290bcda051d8bb38e50c2f325 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Mon, 8 Jan 2024 15:49:27 +0100 Subject: [PATCH 13/18] Refactor some events into spans and variables --- libsignal-service-actix/src/push_service.rs | 27 +++++++++++++++------ libsignal-service/src/cipher.rs | 10 +++++--- 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/libsignal-service-actix/src/push_service.rs b/libsignal-service-actix/src/push_service.rs index 5a011da3a..0ef21ae4f 100644 --- a/libsignal-service-actix/src/push_service.rs +++ b/libsignal-service-actix/src/push_service.rs @@ -47,7 +47,7 @@ impl AwcPushService { credentials_override: HttpAuthOverride, ) -> Result { let url = self.cfg.base_url(endpoint).join(path.as_ref())?; - tracing::debug!("HTTP request {} {}", method, url); + tracing::debug!(%url, %method, "HTTP request"); let mut builder = self.client.request(method, url.as_str()); for &header in additional_headers { builder = builder.insert_header(header); @@ -109,6 +109,7 @@ impl AwcPushService { let mismatched_devices = response.json().await.map_err(|e| { tracing::error!( + ?response, "Failed to decode HTTP 409 response: {}", e ); @@ -123,6 +124,7 @@ impl AwcPushService { StatusCode::GONE => { let stale_devices = response.json().await.map_err(|e| { tracing::error!( + ?response, "Failed to decode HTTP 410 response: {}", e ); @@ -135,6 +137,7 @@ impl AwcPushService { StatusCode::PRECONDITION_REQUIRED => { let proof_required = response.json().await.map_err(|e| { tracing::error!( + ?response, "Failed to decode HTTP 428 response: {}", e ); @@ -148,6 +151,7 @@ impl AwcPushService { code => { let contents = response.body().await; tracing::trace!( + ?response, "Unhandled response {} with body: {:?}", code.as_u16(), contents, @@ -197,7 +201,8 @@ impl PushService for AwcPushService { }, })?; - tracing::debug!("AwcPushService::get response: {:?}", response); + let _span = + tracing::debug_span!("processing response", ?response).entered(); Self::from_response(&mut response).await?; @@ -255,7 +260,8 @@ impl PushService for AwcPushService { }, })?; - tracing::debug!("AwcPushService::delete response: {:?}", response); + let _span = + tracing::debug_span!("processing response", ?response).entered(); Self::from_response(&mut response).await?; @@ -309,7 +315,8 @@ impl PushService for AwcPushService { reason: e.to_string(), })?; - tracing::debug!("AwcPushService::put response: {:?}", response); + let _span = + tracing::debug_span!("processing response", ?response).entered(); Self::from_response(&mut response).await?; @@ -362,7 +369,8 @@ impl PushService for AwcPushService { reason: e.to_string(), })?; - tracing::debug!("AwcPushService::patch response: {:?}", response); + let _span = + tracing::debug_span!("processing response", ?response).entered(); Self::from_response(&mut response).await?; @@ -415,7 +423,8 @@ impl PushService for AwcPushService { reason: e.to_string(), })?; - tracing::debug!("AwcPushService::post response: {:?}", response); + let _span = + tracing::debug_span!("processing response", ?response).entered(); Self::from_response(&mut response).await?; @@ -533,7 +542,8 @@ impl PushService for AwcPushService { reason: e.to_string(), })?; - tracing::debug!("AwcPushService::get_stream response: {:?}", response); + let _span = + tracing::debug_span!("processing response", ?response).entered(); Self::from_response(&mut response).await?; @@ -621,7 +631,8 @@ impl PushService for AwcPushService { reason: e.to_string(), })?; - tracing::debug!("AwcPushService::put response: {:?}", response); + let _span = + tracing::debug_span!("processing response", ?response).entered(); Self::from_response(&mut response).await?; diff --git a/libsignal-service/src/cipher.rs b/libsignal-service/src/cipher.rs index cafbab7c0..0922253de 100644 --- a/libsignal-service/src/cipher.rs +++ b/libsignal-service/src/cipher.rs @@ -146,7 +146,11 @@ where envelope.server_guid.as_ref().and_then(|g| match g.parse() { Ok(uuid) => Some(uuid), Err(e) => { - tracing::error!("Unparseable server_guid ({})", e); + tracing::error!( + ?envelope, + "Unparseable server_guid ({})", + e + ); None }, }); @@ -196,7 +200,7 @@ where Plaintext { metadata, data } }, Type::PlaintextContent => { - tracing::warn!("Envelope with plaintext content. This usually indicates a decryption retry."); + tracing::warn!(?envelope, "Envelope with plaintext content. This usually indicates a decryption retry."); let metadata = Metadata { sender: envelope.source_address(), sender_device: envelope.source_device(), @@ -280,7 +284,7 @@ where }; let needs_receipt = if envelope.source_service_id.is_some() { - tracing::warn!("Received an unidentified delivery over an identified channel. Marking needs_receipt=false"); + tracing::warn!(?envelope, "Received an unidentified delivery over an identified channel. Marking needs_receipt=false"); false } else { true From 313484c8e58a4975f884320b97c138a6797db067 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Mon, 8 Jan 2024 15:51:49 +0100 Subject: [PATCH 14/18] Some more variables --- libsignal-service-actix/src/websocket.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/libsignal-service-actix/src/websocket.rs b/libsignal-service-actix/src/websocket.rs index 53c35a041..c5b741313 100644 --- a/libsignal-service-actix/src/websocket.rs +++ b/libsignal-service-actix/src/websocket.rs @@ -105,24 +105,24 @@ where Frame::Continuation(_c) => todo!(), Frame::Ping(msg) => { - tracing::warn!("Received Ping({:?})", msg); + tracing::warn!(?msg, "received Ping"); continue; }, Frame::Pong(msg) => { - tracing::trace!("Received Pong({:?})", msg); + tracing::trace!(?msg, "received Pong"); continue; }, Frame::Text(frame) => { - tracing::warn!("Frame::Text {:?}", frame); + tracing::warn!(?frame, "frame::Text",); // this is a protocol violation, maybe break; is better? continue; }, Frame::Close(c) => { - tracing::warn!("Websocket closing: {:?}", c); + tracing::warn!(?c, "Websocket closing"); break; }, From 20e4416b1591da9d286919d30d44ea14ce67158c Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Mon, 8 Jan 2024 19:17:17 +0100 Subject: [PATCH 15/18] Add attachment length to tracing upload --- libsignal-service/src/sender.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libsignal-service/src/sender.rs b/libsignal-service/src/sender.rs index 9abff77e9..658c30c2b 100644 --- a/libsignal-service/src/sender.rs +++ b/libsignal-service/src/sender.rs @@ -151,7 +151,7 @@ where /// Encrypts and uploads an attachment /// /// Contents are accepted as an owned, plain text Vec, because encryption happens in-place. - #[tracing::instrument(skip(self, contents))] + #[tracing::instrument(skip(self, contents), fields(size = contents.len()))] pub async fn upload_attachment( &mut self, spec: AttachmentSpec, From 21ae233947421efe3bd801b4204c23bfb27d4931 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Mon, 8 Jan 2024 20:21:49 +0100 Subject: [PATCH 16/18] Mask websocket query --- libsignal-service-actix/src/websocket.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/libsignal-service-actix/src/websocket.rs b/libsignal-service-actix/src/websocket.rs index c5b741313..3921cf79d 100644 --- a/libsignal-service-actix/src/websocket.rs +++ b/libsignal-service-actix/src/websocket.rs @@ -160,14 +160,20 @@ impl AwcWebSocket { ); } - tracing::trace!("Will start websocket at {:?}", url); + tracing::trace!( + url.scheme = url.scheme(), + url.host = ?url.host(), + url.path = url.path(), + url.has_query = ?url.query().is_some(), + "starting websocket", + ); let mut ws = client.ws(url.as_str()); for (key, value) in additional_headers { ws = ws.header(*key, *value); } let (response, framed) = ws.connect().await?; - tracing::debug!("WebSocket connected: {:?}", response); + tracing::debug!(?response, "WebSocket connected"); let (incoming_sink, incoming_stream) = channel(5); From ce4ba9707d83b7626357828ee08be12ea34173f6 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Mon, 8 Jan 2024 20:59:07 +0100 Subject: [PATCH 17/18] instrument futures instead of threads --- libsignal-service-actix/Cargo.toml | 1 + libsignal-service-actix/src/push_service.rs | 9 ++- libsignal-service-hyper/Cargo.toml | 1 + libsignal-service/Cargo.toml | 1 + libsignal-service/src/account_manager.rs | 72 ++++++++++----------- libsignal-service/src/sender.rs | 26 ++++---- 6 files changed, 54 insertions(+), 56 deletions(-) diff --git a/libsignal-service-actix/Cargo.toml b/libsignal-service-actix/Cargo.toml index 782816358..315b36536 100644 --- a/libsignal-service-actix/Cargo.toml +++ b/libsignal-service-actix/Cargo.toml @@ -19,6 +19,7 @@ mpart-async = "0.6" serde_json = "1.0" futures = "0.3" tracing = "0.1" +tracing-futures = "0.2" bytes = "1" rustls = "0.21" rustls-pemfile = "0.3" diff --git a/libsignal-service-actix/src/push_service.rs b/libsignal-service-actix/src/push_service.rs index 0ef21ae4f..608889a64 100644 --- a/libsignal-service-actix/src/push_service.rs +++ b/libsignal-service-actix/src/push_service.rs @@ -13,6 +13,7 @@ use libsignal_service::{ websocket::SignalWebSocket, }; use serde::{Deserialize, Serialize}; +use tracing_futures::Instrument; use crate::websocket::AwcWebSocket; @@ -85,6 +86,7 @@ impl AwcPushService { }) } + #[tracing::instrument(name = "extracting error", skip(response))] async fn from_response( response: &mut ClientResponse, ) -> Result<(), ServiceError> @@ -201,8 +203,7 @@ impl PushService for AwcPushService { }, })?; - let _span = - tracing::debug_span!("processing response", ?response).entered(); + let _span = tracing::debug_span!("processing response", ?response); Self::from_response(&mut response).await?; @@ -646,6 +647,7 @@ impl PushService for AwcPushService { additional_headers: &[(&str, &str)], credentials: Option, ) -> Result { + let span = tracing::debug_span!("websocket"); let (ws, stream) = AwcWebSocket::with_client( &mut self.client, self.cfg.base_url(Endpoint::Service), @@ -653,13 +655,14 @@ impl PushService for AwcPushService { additional_headers, credentials.as_ref(), ) + .instrument(span.clone()) .await?; let (ws, task) = SignalWebSocket::from_socket( ws, stream, keep_alive_path.to_owned(), ); - actix_rt::spawn(task); + actix_rt::spawn(task.instrument(span)); Ok(ws) } } diff --git a/libsignal-service-hyper/Cargo.toml b/libsignal-service-hyper/Cargo.toml index 547d4359e..d874aac8d 100644 --- a/libsignal-service-hyper/Cargo.toml +++ b/libsignal-service-hyper/Cargo.toml @@ -13,6 +13,7 @@ async-trait = "0.1" bytes = "1.0" futures = "0.3" tracing = "0.1" +tracing-futures = "0.2" mpart-async = "0.6" serde = "1.0" serde_json = "1.0" diff --git a/libsignal-service/Cargo.toml b/libsignal-service/Cargo.toml index 03bc31a1e..45285513e 100644 --- a/libsignal-service/Cargo.toml +++ b/libsignal-service/Cargo.toml @@ -35,6 +35,7 @@ url = { version = "2.1", features = ["serde"] } uuid = { version = "1", features = ["serde"] } tracing = "0.1" +tracing-futures = "0.2" [build-dependencies] prost-build = "0.10" diff --git a/libsignal-service/src/account_manager.rs b/libsignal-service/src/account_manager.rs index de18bde59..cc2e2d1a2 100644 --- a/libsignal-service/src/account_manager.rs +++ b/libsignal-service/src/account_manager.rs @@ -13,6 +13,7 @@ use libsignal_protocol::{ use prost::Message; use serde::{Deserialize, Serialize}; use sha2::Sha256; +use tracing_futures::Instrument; use zkgroup::profiles::ProfileKey; use crate::pre_keys::KyberPreKeyEntity; @@ -102,29 +103,26 @@ impl AccountManager { pq_pre_keys_offset_id: u32, use_last_resort_key: bool, ) -> Result<(u32, u32, u32), ServiceError> { - let prekey_status = { - let _span = tracing::span!( + let prekey_status = match self + .service + .get_pre_key_status(ServiceIdType::AccountIdentity) + .instrument(tracing::span!( tracing::Level::DEBUG, "Fetching pre key status" - ) - .entered(); - match self - .service - .get_pre_key_status(ServiceIdType::AccountIdentity) - .await - { - Ok(status) => status, - Err(ServiceError::Unauthorized) => { - tracing::info!("Got Unauthorized when fetching pre-key status. Assuming first installment."); - // Additionally, the second PUT request will fail if this really comes down to an - // authorization failure. - crate::push_service::PreKeyStatus { - count: 0, - pq_count: 0, - } - }, - Err(e) => return Err(e), - } + )) + .await + { + Ok(status) => status, + Err(ServiceError::Unauthorized) => { + tracing::info!("Got Unauthorized when fetching pre-key status. Assuming first installment."); + // Additionally, the second PUT request will fail if this really comes down to an + // authorization failure. + crate::push_service::PreKeyStatus { + count: 0, + pq_count: 0, + } + }, + Err(e) => return Err(e), }; tracing::trace!("Remaining pre-keys on server: {:?}", prekey_status); @@ -140,11 +138,11 @@ impl AccountManager { } let pre_key_state = { - let _span = - tracing::span!(tracing::Level::DEBUG, "Generating pre keys") - .entered(); + let span = + tracing::span!(tracing::Level::DEBUG, "Generating pre keys"); + let identity_key_pair = - protocol_store.get_identity_key_pair().await?; + protocol_store.get_identity_key_pair().instrument(tracing::trace_span!(parent: &span, "get identity key pair")).await?; let mut pre_key_entities = vec![]; let mut pq_pre_key_entities = vec![]; @@ -159,7 +157,7 @@ impl AccountManager { let pre_key_record = PreKeyRecord::new(pre_key_id, &key_pair); protocol_store .save_pre_key(pre_key_id, &pre_key_record) - .await?; + .instrument(tracing::trace_span!(parent: &span, "save pre key", ?pre_key_id)).await?; // TODO: Shouldn't this also remove the previous pre-keys from storage? // I think we might want to update the storage, and then sync the storage to the // server. @@ -180,7 +178,7 @@ impl AccountManager { )?; protocol_store .save_kyber_pre_key(pre_key_id, &pre_key_record) - .await?; + .instrument(tracing::trace_span!(parent: &span, "save kyber pre key", ?pre_key_id)).await?; // TODO: Shouldn't this also remove the previous pre-keys from storage? // I think we might want to update the storage, and then sync the storage to the // server. @@ -214,7 +212,7 @@ impl AccountManager { next_signed_pre_key_id.into(), &signed_prekey_record, ) - .await?; + .instrument(tracing::trace_span!(parent: &span, "save signed pre key", signed_pre_key_id = ?next_signed_pre_key_id)).await?; PreKeyState { pre_keys: pre_key_entities, @@ -240,17 +238,13 @@ impl AccountManager { } }; - { - let _span = - tracing::span!(tracing::Level::DEBUG, "Uploading pre keys") - .entered(); - self.service - .register_pre_keys( - ServiceIdType::AccountIdentity, - pre_key_state, - ) - .await?; - } + self.service + .register_pre_keys(ServiceIdType::AccountIdentity, pre_key_state) + .instrument(tracing::span!( + tracing::Level::DEBUG, + "Uploading pre keys" + )) + .await?; Ok(( pre_keys_offset_id + PRE_KEY_BATCH_SIZE, diff --git a/libsignal-service/src/sender.rs b/libsignal-service/src/sender.rs index 658c30c2b..09c56eb02 100644 --- a/libsignal-service/src/sender.rs +++ b/libsignal-service/src/sender.rs @@ -7,6 +7,7 @@ use libsignal_protocol::{ }; use rand::{CryptoRng, Rng}; use tracing::{info, trace}; +use tracing_futures::Instrument; use uuid::Uuid; use crate::{ @@ -195,19 +196,16 @@ where }); // Request upload attributes - let attrs = { - let _span = - tracing::trace_span!("requesting upload attributes").entered(); - self.identified_ws - .get_attachment_v2_upload_attributes() - .await? - }; - let (id, digest) = { - let _span = tracing::trace_span!("Uploading attachment").entered(); - self.service - .upload_attachment(&attrs, &mut std::io::Cursor::new(&contents)) - .await? - }; + let attrs = self + .identified_ws + .get_attachment_v2_upload_attributes() + .instrument(tracing::trace_span!("requesting upload attributes")) + .await?; + let (id, digest) = self + .service + .upload_attachment(&attrs, &mut std::io::Cursor::new(&contents)) + .instrument(tracing::trace_span!("Uploading attachment")) + .await?; Ok(AttachmentPointer { content_type: Some(spec.content_type), @@ -758,10 +756,10 @@ where } } - let _span = tracing::trace_span!("encrypting message").entered(); let message = self .cipher .encrypt(&recipient_protocol_address, unidentified_access, content) + .instrument(tracing::trace_span!("encrypting message")) .await?; Ok(message) From bac79848028eef31d578f91dbaa3a32b1caf904f Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Tue, 9 Jan 2024 11:31:13 +0100 Subject: [PATCH 18/18] Enable log compatibility for dependencies --- libsignal-service/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libsignal-service/Cargo.toml b/libsignal-service/Cargo.toml index 45285513e..f45f9009c 100644 --- a/libsignal-service/Cargo.toml +++ b/libsignal-service/Cargo.toml @@ -34,7 +34,7 @@ thiserror = "1.0" url = { version = "2.1", features = ["serde"] } uuid = { version = "1", features = ["serde"] } -tracing = "0.1" +tracing = { version = "0.1", features = ["log"] } tracing-futures = "0.2" [build-dependencies]