diff --git a/libsignal-service-actix/Cargo.toml b/libsignal-service-actix/Cargo.toml index 803c27ee3..315b36536 100644 --- a/libsignal-service-actix/Cargo.toml +++ b/libsignal-service-actix/Cargo.toml @@ -18,12 +18,13 @@ actix-rt = "2.4" mpart-async = "0.6" serde_json = "1.0" futures = "0.3" +tracing = "0.1" +tracing-futures = "0.2" bytes = "1" rustls = "0.21" rustls-pemfile = "0.3" url = "2.1" serde = "1.0" -log = "0.4" rand = "0.8" thiserror = "1.0" @@ -33,7 +34,6 @@ base64 = "0.13" phonenumber = "0.3" [dev-dependencies] -env_logger = "0.9" image = { version = "0.23", default-features = false, features = ["png"] } opener = "0.5" qrcode = "0.12" diff --git a/libsignal-service-actix/examples/registering.rs b/libsignal-service-actix/examples/registering.rs index cf41ec7ce..fe3c8df2b 100644 --- a/libsignal-service-actix/examples/registering.rs +++ b/libsignal-service-actix/examples/registering.rs @@ -32,7 +32,6 @@ use structopt::StructOpt; #[actix_rt::main] async fn main() -> Result<(), Error> { - env_logger::init(); let client = "libsignal-service-hyper-example"; let use_voice = false; diff --git a/libsignal-service-actix/src/push_service.rs b/libsignal-service-actix/src/push_service.rs index 639a5a25f..608889a64 100644 --- a/libsignal-service-actix/src/push_service.rs +++ b/libsignal-service-actix/src/push_service.rs @@ -13,6 +13,7 @@ use libsignal_service::{ websocket::SignalWebSocket, }; use serde::{Deserialize, Serialize}; +use tracing_futures::Instrument; use crate::websocket::AwcWebSocket; @@ -47,7 +48,7 @@ impl AwcPushService { credentials_override: HttpAuthOverride, ) -> Result { let url = self.cfg.base_url(endpoint).join(path.as_ref())?; - log::debug!("HTTP request {} {}", method, url); + tracing::debug!(%url, %method, "HTTP request"); let mut builder = self.client.request(method, url.as_str()); for &header in additional_headers { builder = builder.insert_header(header); @@ -85,6 +86,7 @@ impl AwcPushService { }) } + #[tracing::instrument(name = "extracting error", skip(response))] async fn from_response( response: &mut ClientResponse, ) -> Result<(), ServiceError> @@ -108,7 +110,8 @@ impl AwcPushService { StatusCode::CONFLICT => { let mismatched_devices = response.json().await.map_err(|e| { - log::error!( + tracing::error!( + ?response, "Failed to decode HTTP 409 response: {}", e ); @@ -122,7 +125,11 @@ impl AwcPushService { }, StatusCode::GONE => { let stale_devices = response.json().await.map_err(|e| { - log::error!("Failed to decode HTTP 410 response: {}", e); + tracing::error!( + ?response, + "Failed to decode HTTP 410 response: {}", + e + ); ServiceError::UnhandledResponseCode { http_code: StatusCode::GONE.as_u16(), } @@ -131,7 +138,11 @@ impl AwcPushService { }, StatusCode::PRECONDITION_REQUIRED => { let proof_required = response.json().await.map_err(|e| { - log::error!("Failed to decode HTTP 428 response: {}", e); + tracing::error!( + ?response, + "Failed to decode HTTP 428 response: {}", + e + ); ServiceError::UnhandledResponseCode { http_code: StatusCode::PRECONDITION_REQUIRED.as_u16(), } @@ -141,7 +152,8 @@ impl AwcPushService { // XXX: fill in rest from PushServiceSocket code => { let contents = response.body().await; - log::trace!( + tracing::trace!( + ?response, "Unhandled response {} with body: {:?}", code.as_u16(), contents, @@ -191,7 +203,7 @@ impl PushService for AwcPushService { }, })?; - log::debug!("AwcPushService::get response: {:?}", response); + let _span = tracing::debug_span!("processing response", ?response); Self::from_response(&mut response).await?; @@ -203,7 +215,7 @@ impl PushService for AwcPushService { // actix already imports that anyway. let text = match response.body().await { Ok(text) => { - log::debug!( + tracing::debug!( "GET response: {:?}", String::from_utf8_lossy(&text) ); @@ -249,7 +261,8 @@ impl PushService for AwcPushService { }, })?; - log::debug!("AwcPushService::delete response: {:?}", response); + let _span = + tracing::debug_span!("processing response", ?response).entered(); Self::from_response(&mut response).await?; @@ -261,7 +274,7 @@ impl PushService for AwcPushService { // actix already imports that anyway. let text = match response.body().await { Ok(text) => { - log::debug!( + tracing::debug!( "GET response: {:?}", String::from_utf8_lossy(&text) ); @@ -303,7 +316,8 @@ impl PushService for AwcPushService { reason: e.to_string(), })?; - log::debug!("AwcPushService::put response: {:?}", response); + let _span = + tracing::debug_span!("processing response", ?response).entered(); Self::from_response(&mut response).await?; @@ -315,7 +329,7 @@ impl PushService for AwcPushService { // actix already imports that anyway. let text = match response.body().await { Ok(text) => { - log::debug!( + tracing::debug!( "GET response: {:?}", String::from_utf8_lossy(&text) ); @@ -356,7 +370,8 @@ impl PushService for AwcPushService { reason: e.to_string(), })?; - log::debug!("AwcPushService::patch response: {:?}", response); + let _span = + tracing::debug_span!("processing response", ?response).entered(); Self::from_response(&mut response).await?; @@ -368,7 +383,7 @@ impl PushService for AwcPushService { // actix already imports that anyway. let text = match response.body().await { Ok(text) => { - log::debug!( + tracing::debug!( "PATCH response: {:?}", String::from_utf8_lossy(&text) ); @@ -409,7 +424,8 @@ impl PushService for AwcPushService { reason: e.to_string(), })?; - log::debug!("AwcPushService::post response: {:?}", response); + let _span = + tracing::debug_span!("processing response", ?response).entered(); Self::from_response(&mut response).await?; @@ -421,7 +437,7 @@ impl PushService for AwcPushService { // actix already imports that anyway. let text = match response.body().await { Ok(text) => { - log::debug!( + tracing::debug!( "GET response: {:?}", String::from_utf8_lossy(&text) ); @@ -527,7 +543,8 @@ impl PushService for AwcPushService { reason: e.to_string(), })?; - log::debug!("AwcPushService::get_stream response: {:?}", response); + let _span = + tracing::debug_span!("processing response", ?response).entered(); Self::from_response(&mut response).await?; @@ -600,7 +617,7 @@ impl PushService for AwcPushService { // Unwrap, because no error type was used above body_contents.extend(b.unwrap()); } - log::trace!( + tracing::trace!( "Sending PUT with Content-Type={} and length {}", content_type, body_contents.len() @@ -615,7 +632,8 @@ impl PushService for AwcPushService { reason: e.to_string(), })?; - log::debug!("AwcPushService::put response: {:?}", response); + let _span = + tracing::debug_span!("processing response", ?response).entered(); Self::from_response(&mut response).await?; @@ -629,6 +647,7 @@ impl PushService for AwcPushService { additional_headers: &[(&str, &str)], credentials: Option, ) -> Result { + let span = tracing::debug_span!("websocket"); let (ws, stream) = AwcWebSocket::with_client( &mut self.client, self.cfg.base_url(Endpoint::Service), @@ -636,13 +655,14 @@ impl PushService for AwcPushService { additional_headers, credentials.as_ref(), ) + .instrument(span.clone()) .await?; let (ws, task) = SignalWebSocket::from_socket( ws, stream, keep_alive_path.to_owned(), ); - actix_rt::spawn(task); + actix_rt::spawn(task.instrument(span)); Ok(ws) } } diff --git a/libsignal-service-actix/src/websocket.rs b/libsignal-service-actix/src/websocket.rs index 5b224e720..3921cf79d 100644 --- a/libsignal-service-actix/src/websocket.rs +++ b/libsignal-service-actix/src/websocket.rs @@ -86,9 +86,9 @@ where futures::pin_mut!(tick); futures::select! { _ = tick => { - log::trace!("Triggering keep-alive"); + tracing::trace!("Triggering keep-alive"); if let Err(e) = incoming_sink.send(WebSocketStreamItem::KeepAliveRequest).await { - log::info!("Websocket sink has closed: {:?}.", e); + tracing::info!("Websocket sink has closed: {:?}.", e); break; }; }, @@ -96,7 +96,7 @@ where let frame = if let Some(frame) = frame { frame } else { - log::info!("process: Socket stream ended"); + tracing::info!("process: Socket stream ended"); break; }; @@ -105,24 +105,24 @@ where Frame::Continuation(_c) => todo!(), Frame::Ping(msg) => { - log::warn!("Received Ping({:?})", msg); + tracing::warn!(?msg, "received Ping"); continue; }, Frame::Pong(msg) => { - log::trace!("Received Pong({:?})", msg); + tracing::trace!(?msg, "received Pong"); continue; }, Frame::Text(frame) => { - log::warn!("Frame::Text {:?}", frame); + tracing::warn!(?frame, "frame::Text",); // this is a protocol violation, maybe break; is better? continue; }, Frame::Close(c) => { - log::warn!("Websocket closing: {:?}", c); + tracing::warn!(?c, "Websocket closing"); break; }, @@ -130,7 +130,7 @@ where // Match SendError if let Err(e) = incoming_sink.send(WebSocketStreamItem::Message(frame)).await { - log::info!("Websocket sink has closed: {:?}.", e); + tracing::info!("Websocket sink has closed: {:?}.", e); break; } }, @@ -160,14 +160,20 @@ impl AwcWebSocket { ); } - log::trace!("Will start websocket at {:?}", url); + tracing::trace!( + url.scheme = url.scheme(), + url.host = ?url.host(), + url.path = url.path(), + url.has_query = ?url.query().is_some(), + "starting websocket", + ); let mut ws = client.ws(url.as_str()); for (key, value) in additional_headers { ws = ws.header(*key, *value); } let (response, framed) = ws.connect().await?; - log::debug!("WebSocket connected: {:?}", response); + tracing::debug!(?response, "WebSocket connected"); let (incoming_sink, incoming_stream) = channel(5); @@ -179,7 +185,7 @@ impl AwcWebSocket { actix_rt::spawn(processing_task.map(|v| match v { Ok(()) => (), Err(e) => { - log::warn!("Processing task terminated with error: {:?}", e) + tracing::warn!("Processing task terminated with error: {:?}", e) }, })); diff --git a/libsignal-service-hyper/Cargo.toml b/libsignal-service-hyper/Cargo.toml index a62147b57..d874aac8d 100644 --- a/libsignal-service-hyper/Cargo.toml +++ b/libsignal-service-hyper/Cargo.toml @@ -12,7 +12,8 @@ libsignal-service = { path = "../libsignal-service" } async-trait = "0.1" bytes = "1.0" futures = "0.3" -log = "0.4" +tracing = "0.1" +tracing-futures = "0.2" mpart-async = "0.6" serde = "1.0" serde_json = "1.0" diff --git a/libsignal-service-hyper/src/push_service.rs b/libsignal-service-hyper/src/push_service.rs index f13648cb3..952cce4b3 100644 --- a/libsignal-service-hyper/src/push_service.rs +++ b/libsignal-service-hyper/src/push_service.rs @@ -92,7 +92,7 @@ impl HyperPushService { body: Option, ) -> Result, ServiceError> { let url = self.cfg.base_url(endpoint).join(path.as_ref())?; - log::debug!("HTTP request {} {}", method, url); + tracing::debug!("HTTP request {} {}", method, url); let mut builder = Request::builder() .method(method) .uri(url.as_str()) @@ -159,7 +159,7 @@ impl HyperPushService { StatusCode::CONFLICT => { let mismatched_devices = Self::json(&mut response).await.map_err(|e| { - log::error!( + tracing::error!( "Failed to decode HTTP 409 response: {}", e ); @@ -174,7 +174,7 @@ impl HyperPushService { StatusCode::GONE => { let stale_devices = Self::json(&mut response).await.map_err(|e| { - log::error!( + tracing::error!( "Failed to decode HTTP 410 response: {}", e ); @@ -187,7 +187,7 @@ impl HyperPushService { StatusCode::PRECONDITION_REQUIRED => { let proof_required = Self::json(&mut response).await.map_err(|e| { - log::error!( + tracing::error!( "Failed to decode HTTP 428 response: {}", e ); @@ -200,7 +200,7 @@ impl HyperPushService { }, // XXX: fill in rest from PushServiceSocket code => { - log::trace!( + tracing::trace!( "Unhandled response {} with body: {}", code.as_u16(), Self::text(&mut response).await?, @@ -554,7 +554,7 @@ impl PushService for HyperPushService { // Unwrap, because no error type was used above body_contents.extend(b.unwrap()); } - log::trace!( + tracing::trace!( "Sending PUT with Content-Type={} and length {}", content_type, body_contents.len() @@ -574,7 +574,7 @@ impl PushService for HyperPushService { ) .await?; - log::debug!("HyperPushService::PUT response: {:?}", response); + tracing::debug!("HyperPushService::PUT response: {:?}", response); Ok(()) } diff --git a/libsignal-service-hyper/src/websocket.rs b/libsignal-service-hyper/src/websocket.rs index 7747686ea..34624c0b3 100644 --- a/libsignal-service-hyper/src/websocket.rs +++ b/libsignal-service-hyper/src/websocket.rs @@ -103,9 +103,9 @@ where loop { tokio::select! { _ = ka_interval.tick() => { - log::trace!("Triggering keep-alive"); + tracing::trace!("Triggering keep-alive"); if let Err(e) = incoming_sink.send(WebSocketStreamItem::KeepAliveRequest).await { - log::info!("Websocket sink has closed: {:?}.", e); + tracing::info!("Websocket sink has closed: {:?}.", e); break; }; }, @@ -113,31 +113,31 @@ where let frame = if let Some(frame) = frame { frame } else { - log::info!("process: Socket stream ended"); + tracing::info!("process: Socket stream ended"); break; }; let frame = match frame? { Message::Binary(s) => s, Message::Ping(msg) => { - log::warn!("Received Ping({:?})", msg); + tracing::warn!("Received Ping({:?})", msg); continue; }, Message::Pong(msg) => { - log::trace!("Received Pong({:?})", msg); + tracing::trace!("Received Pong({:?})", msg); continue; }, Message::Text(frame) => { - log::warn!("Message::Text {:?}", frame); + tracing::warn!("Message::Text {:?}", frame); // this is a protocol violation, maybe break; is better? continue; }, Message::Close(c) => { - log::warn!("Websocket closing: {:?}", c); + tracing::warn!("Websocket closing: {:?}", c); break; }, @@ -146,7 +146,7 @@ where // Match SendError if let Err(e) = incoming_sink.send(WebSocketStreamItem::Message(Bytes::from(frame))).await { - log::info!("Websocket sink has closed: {:?}.", e); + tracing::info!("Websocket sink has closed: {:?}.", e); break; } }, @@ -181,7 +181,7 @@ impl TungsteniteWebSocket { ); } - log::trace!("Will start websocket at {:?}", url); + tracing::trace!("Will start websocket at {:?}", url); let mut request = url.into_client_request()?; @@ -198,7 +198,7 @@ impl TungsteniteWebSocket { connect_async_with_tls_connector(request, Some(tls_connector)) .await?; - log::debug!("WebSocket connected: {:?}", response); + tracing::debug!("WebSocket connected: {:?}", response); let (incoming_sink, incoming_stream) = channel(5); @@ -210,7 +210,7 @@ impl TungsteniteWebSocket { tokio::spawn(processing_task.map(|v| match v { Ok(()) => (), Err(e) => { - log::warn!("Processing task terminated with error: {:?}", e) + tracing::warn!("Processing task terminated with error: {:?}", e) }, })); diff --git a/libsignal-service/Cargo.toml b/libsignal-service/Cargo.toml index 6c6a4b6b6..f45f9009c 100644 --- a/libsignal-service/Cargo.toml +++ b/libsignal-service/Cargo.toml @@ -24,7 +24,6 @@ futures = "0.3" hex = "0.4" hkdf = "0.12" hmac = "0.12" -log = "0.4" phonenumber = "0.3" prost = "0.10" rand = "0.8" @@ -35,6 +34,9 @@ thiserror = "1.0" url = { version = "2.1", features = ["serde"] } uuid = { version = "1", features = ["serde"] } +tracing = { version = "0.1", features = ["log"] } +tracing-futures = "0.2" + [build-dependencies] prost-build = "0.10" diff --git a/libsignal-service/src/account_manager.rs b/libsignal-service/src/account_manager.rs index 5dac2b1b5..cc2e2d1a2 100644 --- a/libsignal-service/src/account_manager.rs +++ b/libsignal-service/src/account_manager.rs @@ -13,6 +13,7 @@ use libsignal_protocol::{ use prost::Message; use serde::{Deserialize, Serialize}; use sha2::Sha256; +use tracing_futures::Instrument; use zkgroup::profiles::ProfileKey; use crate::pre_keys::KyberPreKeyEntity; @@ -89,6 +90,7 @@ impl AccountManager { /// /// Returns the next pre-key offset, pq pre-key offset, and next signed pre-key offset as a tuple. #[allow(clippy::too_many_arguments)] + #[tracing::instrument(skip(self, protocol_store, csprng))] pub async fn update_pre_key_bundle< R: rand::Rng + rand::CryptoRng, P: ProtocolStore, @@ -104,11 +106,15 @@ impl AccountManager { let prekey_status = match self .service .get_pre_key_status(ServiceIdType::AccountIdentity) + .instrument(tracing::span!( + tracing::Level::DEBUG, + "Fetching pre key status" + )) .await { Ok(status) => status, Err(ServiceError::Unauthorized) => { - log::info!("Got Unauthorized when fetching pre-key status. Assuming first installment."); + tracing::info!("Got Unauthorized when fetching pre-key status. Assuming first installment."); // Additionally, the second PUT request will fail if this really comes down to an // authorization failure. crate::push_service::PreKeyStatus { @@ -118,12 +124,12 @@ impl AccountManager { }, Err(e) => return Err(e), }; - log::trace!("Remaining pre-keys on server: {:?}", prekey_status); + tracing::trace!("Remaining pre-keys on server: {:?}", prekey_status); if prekey_status.count >= PRE_KEY_MINIMUM && prekey_status.pq_count >= PRE_KEY_MINIMUM { - log::info!("Available keys sufficient"); + tracing::info!("Available keys sufficient"); return Ok(( pre_keys_offset_id, pq_pre_keys_offset_id, @@ -131,104 +137,115 @@ impl AccountManager { )); } - let identity_key_pair = protocol_store.get_identity_key_pair().await?; - - let mut pre_key_entities = vec![]; - let mut pq_pre_key_entities = vec![]; - - // EC keys - for i in 0..PRE_KEY_BATCH_SIZE { - let key_pair = KeyPair::generate(csprng); - let pre_key_id = (((pre_keys_offset_id + i) - % (PRE_KEY_MEDIUM_MAX_VALUE - 1)) - + 1) - .into(); - let pre_key_record = PreKeyRecord::new(pre_key_id, &key_pair); - protocol_store - .save_pre_key(pre_key_id, &pre_key_record) - .await?; - // TODO: Shouldn't this also remove the previous pre-keys from storage? - // I think we might want to update the storage, and then sync the storage to the - // server. - - pre_key_entities.push(PreKeyEntity::try_from(pre_key_record)?); - } - - // Kyber keys - for i in 0..PRE_KEY_BATCH_SIZE { - let pre_key_id = (((pq_pre_keys_offset_id + i) - % (PRE_KEY_MEDIUM_MAX_VALUE - 1)) - + 1) - .into(); - let pre_key_record = KyberPreKeyRecord::generate( - kem::KeyType::Kyber1024, - pre_key_id, - identity_key_pair.private_key(), - )?; - protocol_store - .save_kyber_pre_key(pre_key_id, &pre_key_record) - .await?; - // TODO: Shouldn't this also remove the previous pre-keys from storage? - // I think we might want to update the storage, and then sync the storage to the - // server. - - pq_pre_key_entities - .push(KyberPreKeyEntity::try_from(pre_key_record)?); - } - - // Generate and store the next signed prekey - let signed_pre_key_pair = KeyPair::generate(csprng); - let signed_pre_key_public = signed_pre_key_pair.public_key; - let signed_pre_key_signature = identity_key_pair - .private_key() - .calculate_signature(&signed_pre_key_public.serialize(), csprng)?; - - let unix_time = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap(); - - let signed_prekey_record = SignedPreKeyRecord::new( - next_signed_pre_key_id.into(), - unix_time.as_millis() as u64, - &signed_pre_key_pair, - &signed_pre_key_signature, - ); - - protocol_store - .save_signed_pre_key( + let pre_key_state = { + let span = + tracing::span!(tracing::Level::DEBUG, "Generating pre keys"); + + let identity_key_pair = + protocol_store.get_identity_key_pair().instrument(tracing::trace_span!(parent: &span, "get identity key pair")).await?; + + let mut pre_key_entities = vec![]; + let mut pq_pre_key_entities = vec![]; + + // EC keys + for i in 0..PRE_KEY_BATCH_SIZE { + let key_pair = KeyPair::generate(csprng); + let pre_key_id = (((pre_keys_offset_id + i) + % (PRE_KEY_MEDIUM_MAX_VALUE - 1)) + + 1) + .into(); + let pre_key_record = PreKeyRecord::new(pre_key_id, &key_pair); + protocol_store + .save_pre_key(pre_key_id, &pre_key_record) + .instrument(tracing::trace_span!(parent: &span, "save pre key", ?pre_key_id)).await?; + // TODO: Shouldn't this also remove the previous pre-keys from storage? + // I think we might want to update the storage, and then sync the storage to the + // server. + + pre_key_entities.push(PreKeyEntity::try_from(pre_key_record)?); + } + + // Kyber keys + for i in 0..PRE_KEY_BATCH_SIZE { + let pre_key_id = (((pq_pre_keys_offset_id + i) + % (PRE_KEY_MEDIUM_MAX_VALUE - 1)) + + 1) + .into(); + let pre_key_record = KyberPreKeyRecord::generate( + kem::KeyType::Kyber1024, + pre_key_id, + identity_key_pair.private_key(), + )?; + protocol_store + .save_kyber_pre_key(pre_key_id, &pre_key_record) + .instrument(tracing::trace_span!(parent: &span, "save kyber pre key", ?pre_key_id)).await?; + // TODO: Shouldn't this also remove the previous pre-keys from storage? + // I think we might want to update the storage, and then sync the storage to the + // server. + + pq_pre_key_entities + .push(KyberPreKeyEntity::try_from(pre_key_record)?); + } + + // Generate and store the next signed prekey + let signed_pre_key_pair = KeyPair::generate(csprng); + let signed_pre_key_public = signed_pre_key_pair.public_key; + let signed_pre_key_signature = + identity_key_pair.private_key().calculate_signature( + &signed_pre_key_public.serialize(), + csprng, + )?; + + let unix_time = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(); + + let signed_prekey_record = SignedPreKeyRecord::new( next_signed_pre_key_id.into(), - &signed_prekey_record, - ) - .await?; + unix_time.as_millis() as u64, + &signed_pre_key_pair, + &signed_pre_key_signature, + ); - let pre_key_state = PreKeyState { - pre_keys: pre_key_entities, - signed_pre_key: signed_prekey_record.try_into()?, - identity_key: *identity_key_pair.public_key(), - pq_pre_keys: pq_pre_key_entities, - pq_last_resort_key: if use_last_resort_key { - log::warn!("Last resort Kyber key unimplemented"); - // Note about the last-resort key: - // mark_kyber_pre_key_used() should retain the last-resort key, but can safely - // remove the ephemeral pre keys. This implies that generating the last-resort key - // should notify the pre-key store, when saving the key, that it concerns a - // last-resort key. I don't see how this can be communicated to the store, and I - // fear that we need to reengineer the whole prekeystore system as a whole. - None - // Some(KyberPreKeyEntity { - // key_id: 0x7fffffff, - // public_key: "NDI=".into(), - // }) - } else { - None - }, + protocol_store + .save_signed_pre_key( + next_signed_pre_key_id.into(), + &signed_prekey_record, + ) + .instrument(tracing::trace_span!(parent: &span, "save signed pre key", signed_pre_key_id = ?next_signed_pre_key_id)).await?; + + PreKeyState { + pre_keys: pre_key_entities, + signed_pre_key: signed_prekey_record.try_into()?, + identity_key: *identity_key_pair.public_key(), + pq_pre_keys: pq_pre_key_entities, + pq_last_resort_key: if use_last_resort_key { + tracing::warn!("Last resort Kyber key unimplemented"); + // Note about the last-resort key: + // mark_kyber_pre_key_used() should retain the last-resort key, but can safely + // remove the ephemeral pre keys. This implies that generating the last-resort key + // should notify the pre-key store, when saving the key, that it concerns a + // last-resort key. I don't see how this can be communicated to the store, and I + // fear that we need to reengineer the whole prekeystore system as a whole. + None + // Some(KyberPreKeyEntity { + // key_id: 0x7fffffff, + // public_key: "NDI=".into(), + // }) + } else { + None + }, + } }; self.service .register_pre_keys(ServiceIdType::AccountIdentity, pre_key_state) + .instrument(tracing::span!( + tracing::Level::DEBUG, + "Uploading pre keys" + )) .await?; - log::trace!("Successfully refreshed prekeys"); Ok(( pre_keys_offset_id + PRE_KEY_BATCH_SIZE, pq_pre_keys_offset_id + PRE_KEY_BATCH_SIZE, @@ -313,7 +330,7 @@ impl AccountManager { let identity_key_pair = identity_store.get_identity_key_pair().await?; if credentials.uuid.is_none() { - log::warn!("No local UUID set"); + tracing::warn!("No local UUID set"); } let provisioning_code = self.new_device_provisioning_code().await?; diff --git a/libsignal-service/src/attachment_cipher.rs b/libsignal-service/src/attachment_cipher.rs index 06eeb2c6e..ebe0ad23a 100644 --- a/libsignal-service/src/attachment_cipher.rs +++ b/libsignal-service/src/attachment_cipher.rs @@ -18,6 +18,7 @@ pub enum AttachmentCipherError { /// /// The Vec will be reused when it has enough space to house the MAC, /// otherwise reallocation might happen. +#[tracing::instrument(skip(iv, key, plaintext))] pub fn encrypt_in_place(iv: [u8; 16], key: [u8; 64], plaintext: &mut Vec) { let aes_half = &key[..32]; let mac_half = &key[32..]; @@ -53,6 +54,7 @@ pub fn encrypt_in_place(iv: [u8; 16], key: [u8; 64], plaintext: &mut Vec) { /// Decrypts an attachment in place, given the key material. /// /// On error, ciphertext is not changed. +#[tracing::instrument(skip(key, ciphertext))] pub fn decrypt_in_place( key: [u8; 64], ciphertext: &mut Vec, diff --git a/libsignal-service/src/cipher.rs b/libsignal-service/src/cipher.rs index b91cc5ee0..0922253de 100644 --- a/libsignal-service/src/cipher.rs +++ b/libsignal-service/src/cipher.rs @@ -1,4 +1,4 @@ -use std::{convert::TryFrom, time::SystemTime}; +use std::{convert::TryFrom, fmt, time::SystemTime}; use aes::cipher::block_padding::{Iso7816, RawPadding}; use libsignal_protocol::{ @@ -34,6 +34,43 @@ pub struct ServiceCipher { local_device_id: u32, } +impl fmt::Debug for ServiceCipher { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ServiceCipher") + .field("protocol_store", &"...") + .field("csprng", &"...") + .field("trust_root", &"...") + .field("local_uuid", &self.local_uuid) + .field("local_device_id", &self.local_device_id) + .finish() + } +} + +fn debug_envelope(envelope: &Envelope) -> String { + if envelope.content.is_none() { + "Envelope { empty }".to_string() + } else { + format!( + "Envelope {{ \ + source_address: {}, \ + source_device: {:?}, \ + server_guid: {:?}, \ + timestamp: {:?}, \ + content: {} bytes, \ + }}", + if envelope.source_service_id.is_some() { + format!("{:?}", envelope.source_address()) + } else { + "unknown".to_string() + }, + envelope.source_device(), + envelope.server_guid(), + envelope.timestamp(), + envelope.content().len(), + ) + } +} + impl ServiceCipher where S: ProtocolStore + KyberPreKeyStore + SenderKeyStore + Clone, @@ -58,6 +95,7 @@ where /// Opens ("decrypts") an envelope. /// /// Envelopes may be empty, in which case this method returns `Ok(None)` + #[tracing::instrument(skip(envelope), fields(envelope = debug_envelope(&envelope)))] pub async fn open_envelope( &mut self, envelope: Envelope, @@ -89,6 +127,7 @@ where /// Triage of legacy messages happens inside this method, as opposed to the /// Java implementation, because it makes the borrow checker and the /// author happier. + #[tracing::instrument(skip(envelope), fields(envelope = debug_envelope(envelope)))] async fn decrypt( &mut self, envelope: &Envelope, @@ -107,7 +146,11 @@ where envelope.server_guid.as_ref().and_then(|g| match g.parse() { Ok(uuid) => Some(uuid), Err(e) => { - log::error!("Unparseable server_guid ({})", e); + tracing::error!( + ?envelope, + "Unparseable server_guid ({})", + e + ); None }, }); @@ -157,7 +200,7 @@ where Plaintext { metadata, data } }, Type::PlaintextContent => { - log::warn!("Envelope with plaintext content. This usually indicates a decryption retry."); + tracing::warn!(?envelope, "Envelope with plaintext content. This usually indicates a decryption retry."); let metadata = Metadata { sender: envelope.source_address(), sender_device: envelope.source_device(), @@ -241,7 +284,7 @@ where }; let needs_receipt = if envelope.source_service_id.is_some() { - log::warn!("Received an unidentified delivery over an identified channel. Marking needs_receipt=false"); + tracing::warn!(?envelope, "Received an unidentified delivery over an identified channel. Marking needs_receipt=false"); false } else { true @@ -276,6 +319,7 @@ where Ok(plaintext) } + #[tracing::instrument] pub(crate) async fn encrypt( &mut self, address: &ProtocolAddress, @@ -426,6 +470,21 @@ pub async fn get_preferred_protocol_address( /// is then validated against the `trust_root` baked into the client to ensure that the sender's /// identity was not forged. #[allow(clippy::too_many_arguments)] +#[tracing::instrument( + skip( + ciphertext, + trust_root, + identity_store, + session_store, + pre_key_store, + signed_pre_key_store, + sender_key_store, + kyber_pre_key_store + ), + fields( + ciphertext = ciphertext.len(), + ) +)] async fn sealed_sender_decrypt( ciphertext: &[u8], trust_root: &PublicKey, diff --git a/libsignal-service/src/envelope.rs b/libsignal-service/src/envelope.rs index c90a36330..46efe94f7 100644 --- a/libsignal-service/src/envelope.rs +++ b/libsignal-service/src/envelope.rs @@ -27,16 +27,17 @@ impl TryFrom for Envelope { } impl Envelope { + #[tracing::instrument(skip(input, signaling_key), fields(input_size = input.len()))] pub fn decrypt( input: &[u8], signaling_key: &SignalingKey, is_signaling_key_encrypted: bool, ) -> Result { if !is_signaling_key_encrypted { - log::trace!("Envelope::decrypt: not encrypted"); + tracing::trace!("Envelope::decrypt: not encrypted"); Ok(Envelope::decode(input)?) } else { - log::trace!("Envelope::decrypt: decrypting"); + tracing::trace!("Envelope::decrypt: decrypting"); if input.len() < VERSION_LENGTH || input[VERSION_OFFSET] != SUPPORTED_VERSION { @@ -77,7 +78,7 @@ impl Envelope { .decrypt_padded_vec_mut::(input) .expect("decryption"); - log::trace!("Envelope::decrypt: decrypted, decoding"); + tracing::trace!("Envelope::decrypt: decrypted, decoding"); Ok(Envelope::decode(&input as &[u8])?) } @@ -135,8 +136,10 @@ impl Envelope { let uuid = self .source_service_id .as_deref() - .and_then(|u| Uuid::parse_str(u).ok()) - .expect("valid uuid checked in constructor"); + .map(Uuid::parse_str) + .transpose() + .expect("valid uuid checked in constructor") + .expect("source_service_id is set"); ServiceAddress { uuid } } diff --git a/libsignal-service/src/groups_v2/manager.rs b/libsignal-service/src/groups_v2/manager.rs index fdafcf614..c132483e9 100644 --- a/libsignal-service/src/groups_v2/manager.rs +++ b/libsignal-service/src/groups_v2/manager.rs @@ -203,7 +203,7 @@ impl GroupsManager { credential_response, ) .map_err(|e| { - log::error!( + tracing::error!( "failed to receive auth credentials with PNI: {:?}", e ); diff --git a/libsignal-service/src/groups_v2/operations.rs b/libsignal-service/src/groups_v2/operations.rs index 7f53b4b8e..a78e986f6 100644 --- a/libsignal-service/src/groups_v2/operations.rs +++ b/libsignal-service/src/groups_v2/operations.rs @@ -163,7 +163,7 @@ impl GroupOperations { if bytes.is_empty() { GroupAttributeBlob::default() } else if bytes.len() < 29 { - log::warn!("bad encrypted blob length"); + tracing::warn!("bad encrypted blob length"); GroupAttributeBlob::default() } else { self.group_secret_params @@ -174,7 +174,7 @@ impl GroupOperations { .map_err(GroupDecodingError::ProtobufDecodeError) }) .unwrap_or_else(|e| { - log::warn!("bad encrypted blob: {}", e); + tracing::warn!("bad encrypted blob: {}", e); GroupAttributeBlob::default() }) } diff --git a/libsignal-service/src/messagepipe.rs b/libsignal-service/src/messagepipe.rs index b33bb2d43..683afe909 100644 --- a/libsignal-service/src/messagepipe.rs +++ b/libsignal-service/src/messagepipe.rs @@ -72,7 +72,7 @@ impl MessagePipe { { sink.send(env).await?; } else { - log::trace!("got empty message in websocket"); + tracing::trace!("got empty message in websocket"); } } @@ -129,7 +129,7 @@ impl MessagePipe { let stream = stream.map(Some); let runner = self.run(sink).map(|e| { - log::info!("sink was closed: {:?}", e); + tracing::info!("sink was closed: {:?}", e); None }); diff --git a/libsignal-service/src/models.rs b/libsignal-service/src/models.rs index 57f0a362c..0ff81cf95 100644 --- a/libsignal-service/src/models.rs +++ b/libsignal-service/src/models.rs @@ -82,7 +82,7 @@ impl Contact { reader: avatar_data, }) } else { - log::warn!("missing avatar content-type, skipping."); + tracing::warn!("missing avatar content-type, skipping."); None } }), diff --git a/libsignal-service/src/proto.rs b/libsignal-service/src/proto.rs index cf7d27d93..f24d08037 100644 --- a/libsignal-service/src/proto.rs +++ b/libsignal-service/src/proto.rs @@ -27,7 +27,7 @@ impl WebSocketRequestMessage { for header in &self.headers { let parts: Vec<_> = header.split(':').collect(); if parts.len() != 2 { - log::warn!( + tracing::warn!( "Got a weird header: {:?}, split in {:?}", header, parts diff --git a/libsignal-service/src/provisioning/pipe.rs b/libsignal-service/src/provisioning/pipe.rs index b942f75ec..6b9c9422a 100644 --- a/libsignal-service/src/provisioning/pipe.rs +++ b/libsignal-service/src/provisioning/pipe.rs @@ -153,7 +153,7 @@ impl ProvisioningPipe { let stream = stream.map(Some); let runner = self.run(sink).map(|_| { - log::info!("Sink closed, provisioning is done!"); + tracing::info!("Sink closed, provisioning is done!"); None }); diff --git a/libsignal-service/src/push_service.rs b/libsignal-service/src/push_service.rs index 155eb2919..09938f5b3 100644 --- a/libsignal-service/src/push_service.rs +++ b/libsignal-service/src/push_service.rs @@ -1110,7 +1110,7 @@ pub trait PushService: MaybeSend { (Err(e), _) => Err(e), (Ok(_resp), AvatarWrite::RetainAvatar) | (Ok(_resp), AvatarWrite::NoAvatar) => { - log::warn!( + tracing::warn!( "No avatar supplied but got avatar upload URL. Ignoring" ); Ok(None) diff --git a/libsignal-service/src/receiver.rs b/libsignal-service/src/receiver.rs index cef614745..9c8d5920f 100644 --- a/libsignal-service/src/receiver.rs +++ b/libsignal-service/src/receiver.rs @@ -81,7 +81,7 @@ impl MessageReceiver { match r { Ok(stream) => break stream, Err(ServiceError::Timeout { .. }) => { - log::warn!("get_attachment timed out, retrying"); + tracing::warn!("get_attachment timed out, retrying"); retries += 1; if retries >= MAX_DOWNLOAD_RETRIES { return Err(ServiceError::Timeout { diff --git a/libsignal-service/src/sender.rs b/libsignal-service/src/sender.rs index cf9884001..09c56eb02 100644 --- a/libsignal-service/src/sender.rs +++ b/libsignal-service/src/sender.rs @@ -5,8 +5,9 @@ use libsignal_protocol::{ process_prekey_bundle, DeviceId, ProtocolStore, SenderCertificate, SenderKeyStore, SignalProtocolError, }; -use log::{debug, info, trace}; use rand::{CryptoRng, Rng}; +use tracing::{info, trace}; +use tracing_futures::Instrument; use uuid::Uuid; use crate::{ @@ -61,6 +62,7 @@ pub struct SentMessage { /// Attachment specification to be used for uploading. /// /// Loose equivalent of Java's `SignalServiceAttachmentStream`. +#[derive(Debug)] pub struct AttachmentSpec { pub content_type: String, pub length: usize, @@ -150,6 +152,7 @@ where /// Encrypts and uploads an attachment /// /// Contents are accepted as an owned, plain text Vec, because encryption happens in-place. + #[tracing::instrument(skip(self, contents), fields(size = contents.len()))] pub async fn upload_attachment( &mut self, spec: AttachmentSpec, @@ -179,7 +182,7 @@ where ) }; if padded_len < len { - log::error!( + tracing::error!( "Padded len {} < len {}. Continuing with a privacy risk.", padded_len, len @@ -188,19 +191,20 @@ where contents.resize(padded_len, 0); } - crate::attachment_cipher::encrypt_in_place(iv, key, &mut contents); + tracing::trace_span!("encrypting attachment").in_scope(|| { + crate::attachment_cipher::encrypt_in_place(iv, key, &mut contents) + }); // Request upload attributes - log::trace!("Requesting upload attributes"); let attrs = self .identified_ws .get_attachment_v2_upload_attributes() + .instrument(tracing::trace_span!("requesting upload attributes")) .await?; - - log::trace!("Uploading attachment"); let (id, digest) = self .service .upload_attachment(&attrs, &mut std::io::Cursor::new(&contents)) + .instrument(tracing::trace_span!("Uploading attachment")) .await?; Ok(AttachmentPointer { @@ -240,6 +244,7 @@ where /// Upload contact details to the CDN /// /// Returns attachment ID and the attachment digest + #[tracing::instrument(skip(self, contacts))] async fn upload_contact_details( &mut self, contacts: Contacts, @@ -287,6 +292,10 @@ where } /// Send a message `content` to a single `recipient`. + #[tracing::instrument( + skip(self, unidentified_access, message), + fields(unidentified_access = unidentified_access.is_some()), + )] pub async fn send_message( &mut self, recipient: &ServiceAddress, @@ -329,7 +338,7 @@ where ContentBody::DataMessage(message), Ok(SentMessage { needs_sync, .. }), ) if *needs_sync || self.is_multi_device().await => { - log::debug!("sending multi-device sync message"); + tracing::debug!("sending multi-device sync message"); let sync_message = self .create_multi_device_sent_transcript_content( Some(recipient), @@ -351,13 +360,17 @@ where if end_session { let n = self.protocol_store.delete_all_sessions(recipient).await?; - log::debug!("ended {} sessions with {}", n, recipient.uuid); + tracing::debug!("ended {} sessions with {}", n, recipient.uuid); } results.remove(0) } /// Send a message to the recipients in a group. + #[tracing::instrument( + skip(self, recipients, message), + fields(recipients = recipients.as_ref().len()), + )] pub async fn send_message_to_group( &mut self, recipients: impl AsRef<[(ServiceAddress, Option)]>, @@ -398,7 +411,9 @@ where if needs_sync_in_results && message.is_none() { // XXX: does this happen? - log::warn!("Server claims need sync, but not sending datamessage."); + tracing::warn!( + "Server claims need sync, but not sending datamessage." + ); } // we only need to send a synchronization message once @@ -430,6 +445,11 @@ where } /// Send a message (`content`) to an address (`recipient`). + #[tracing::instrument( + level = "trace", + skip(self, unidentified_access, content_body), + fields(unidentified_access = unidentified_access.is_some()), + )] async fn try_send_message( &mut self, recipient: ServiceAddress, @@ -461,18 +481,18 @@ where }; let send = if let Some(unidentified) = &unidentified_access { - log::debug!("sending via unidentified"); + tracing::debug!("sending via unidentified"); self.unidentified_ws .send_messages_unidentified(messages, unidentified) .await } else { - log::debug!("sending identified"); + tracing::debug!("sending identified"); self.identified_ws.send_messages(messages).await }; match send { Ok(SendMessageResponse { needs_sync }) => { - log::debug!("message sent!"); + tracing::debug!("message sent!"); return Ok(SentMessage { recipient, unidentified: unidentified_access.is_some(), @@ -482,13 +502,13 @@ where Err(ServiceError::Unauthorized) if unidentified_access.is_some() => { - log::trace!("unauthorized error using unidentified; retry over identified"); + tracing::trace!("unauthorized error using unidentified; retry over identified"); unidentified_access = None; }, Err(ServiceError::MismatchedDevicesException(ref m)) => { - log::debug!("{:?}", m); + tracing::debug!("{:?}", m); for extra_device_id in &m.extra_devices { - log::debug!( + tracing::debug!( "dropping session with device {}", extra_device_id ); @@ -501,7 +521,7 @@ where } for missing_device_id in &m.missing_devices { - log::debug!( + tracing::debug!( "creating session with missing device {}", missing_device_id ); @@ -522,7 +542,7 @@ where ) .await .map_err(|e| { - log::error!("failed to create session: {}", e); + tracing::error!("failed to create session: {}", e); MessageSenderError::UntrustedIdentity { address: recipient, } @@ -530,9 +550,9 @@ where } }, Err(ServiceError::StaleDevices(ref m)) => { - log::debug!("{:?}", m); + tracing::debug!("{:?}", m); for extra_device_id in &m.stale_devices { - log::debug!( + tracing::debug!( "dropping session with device {}", extra_device_id ); @@ -545,20 +565,20 @@ where } }, Err(ServiceError::ProofRequiredError(ref p)) => { - log::debug!("{:?}", p); + tracing::debug!("{:?}", p); return Err(MessageSenderError::ProofRequired { token: p.token.clone(), options: p.options.clone(), }); }, Err(ServiceError::NotFoundError) => { - log::debug!("Not found when sending a message"); + tracing::debug!("Not found when sending a message"); return Err(MessageSenderError::NotFound { uuid: recipient.uuid, }); }, Err(e) => { - log::debug!( + tracing::debug!( "Default error handler for ws.send_messages: {}", e ); @@ -571,6 +591,10 @@ where } /// Upload contact details to the CDN and send a sync message + #[tracing::instrument( + skip(self, unidentified_access, contacts), + fields(unidentified_access = unidentified_access.is_some()), + )] pub async fn send_contact_details( &mut self, recipient: &ServiceAddress, @@ -608,6 +632,11 @@ where } // Equivalent with `getEncryptedMessages` + #[tracing::instrument( + level = "trace", + skip(self, unidentified_access, content), + fields(unidentified_access = unidentified_access.is_some()), + )] async fn create_encrypted_messages( &mut self, recipient: &ServiceAddress, @@ -633,7 +662,7 @@ where } for device_id in devices { - debug!("sending message to device {}", device_id); + trace!("sending message to device {}", device_id); messages.push( self.create_encrypted_message( recipient, @@ -651,6 +680,11 @@ where /// Equivalent to `getEncryptedMessage` /// /// When no session with the recipient exists, we need to create one. + #[tracing::instrument( + level = "trace", + skip(self, unidentified_access, content), + fields(unidentified_access = unidentified_access.is_some()), + )] async fn create_encrypted_message( &mut self, recipient: &ServiceAddress, @@ -661,7 +695,10 @@ where let recipient_protocol_address = recipient.to_protocol_address(device_id); - log::debug!("encrypting message for {}", recipient_protocol_address); + tracing::trace!( + "encrypting message for {}", + recipient_protocol_address + ); // establish a session with the recipient/device if necessary // no need to establish a session with ourselves (and our own current device) @@ -681,7 +718,7 @@ where .await { Ok(ok) => { - log::trace!("Get prekeys OK"); + tracing::trace!("Get prekeys OK"); ok }, Err(ServiceError::NotFoundError) => { @@ -722,6 +759,7 @@ where let message = self .cipher .encrypt(&recipient_protocol_address, unidentified_access, content) + .instrument(tracing::trace_span!("encrypting message")) .await?; Ok(message) diff --git a/libsignal-service/src/websocket.rs b/libsignal-service/src/websocket.rs index 63aa1a17c..04729bbec 100644 --- a/libsignal-service/src/websocket.rs +++ b/libsignal-service/src/websocket.rs @@ -95,7 +95,29 @@ impl SignalWebSocketProcess { frame: Bytes, ) -> Result<(), ServiceError> { let msg = WebSocketMessage::decode(frame)?; - log::trace!("decoded {:?}", msg); + if let Some(request) = &msg.request { + tracing::trace!( + "decoded WebSocketMessage request {{ r#type: {:?}, verb: {:?}, path: {:?}, body: {} bytes, headers: {:?}, id: {:?} }}", + msg.r#type(), + request.verb, + request.path, + request.body.as_ref().map(|x| x.len()).unwrap_or(0), + request.headers, + request.id, + ); + } else if let Some(response) = &msg.response { + tracing::trace!( + "decoded WebSocketMessage response {{ r#type: {:?}, status: {:?}, message: {:?}, body: {} bytes, headers: {:?}, id: {:?} }}", + msg.r#type(), + response.status, + response.message, + response.body.as_ref().map(|x| x.len()).unwrap_or(0), + response.headers, + response.id, + ); + } else { + tracing::debug!("decoded {msg:?}"); + } use web_socket_message::Type; match (msg.r#type(), msg.request, msg.response) { @@ -104,7 +126,7 @@ impl SignalWebSocketProcess { }), (Type::Request, Some(request), _) => { let (sink, recv) = oneshot::channel(); - log::trace!("sending request with body"); + tracing::trace!("sending request with body"); self.request_sink.send((request, sink)).await.map_err( |_| ServiceError::WsError { reason: "request handler failed".into(), @@ -124,7 +146,7 @@ impl SignalWebSocketProcess { self.outgoing_request_map.remove(&id) { if let Err(e) = responder.send(Ok(response)) { - log::warn!( + tracing::warn!( "Could not deliver response for id {}: {:?}", id, e @@ -134,7 +156,7 @@ impl SignalWebSocketProcess { self.outgoing_keep_alive_set.take(&id) { if response.status() != 200 { - log::warn!( + tracing::warn!( "Response code for keep-alive is not 200: {:?}", response ); @@ -143,7 +165,7 @@ impl SignalWebSocketProcess { }); } } else { - log::warn!( + tracing::warn!( "Response for non existing request: {:?}", response ); @@ -184,7 +206,14 @@ impl SignalWebSocketProcess { .filter(|x| !self.outgoing_request_map.contains_key(x)) .unwrap_or_else(|| self.next_request_id()), ); - log::trace!("sending request {:?}", request); + tracing::trace!( + "sending WebSocketRequestMessage {{ verb: {:?}, path: {:?}, body (bytes): {:?}, headers: {:?}, id: {:?} }}", + request.verb, + request.path, + request.body.as_ref().map(|x| x.len()), + request.headers, + request.id, + ); self.outgoing_request_map.insert(request.id.unwrap(), responder); let msg = WebSocketMessage { @@ -210,7 +239,7 @@ impl SignalWebSocketProcess { Some(WebSocketStreamItem::KeepAliveRequest) => { // XXX: would be nicer if we could drop this request into the request // queue above. - log::debug!("Sending keep alive upon request"); + tracing::debug!("Sending keep alive upon request"); let request = WebSocketRequestMessage { id: Some(self.next_request_id()), path: Some(self.keep_alive_path.clone()), @@ -236,7 +265,7 @@ impl SignalWebSocketProcess { response = self.outgoing_responses.next() => { match response { Some(Ok(response)) => { - log::trace!("sending response {:?}", response); + tracing::trace!("sending response {:?}", response); let msg = WebSocketMessage { r#type: Some(web_socket_message::Type::Response.into()), @@ -247,7 +276,7 @@ impl SignalWebSocketProcess { self.ws.send_message(buffer.into()).await?; } Some(Err(e)) => { - log::error!("could not generate response to a Signal request; responder was canceled: {}. Continuing.", e); + tracing::error!("could not generate response to a Signal request; responder was canceled: {}. Continuing.", e); } None => { unreachable!("outgoing responses should never fuse") @@ -292,7 +321,7 @@ impl SignalWebSocket { let process = process.run().map(|x| match x { Ok(()) => (), Err(e) => { - log::error!("SignalWebSocket: {}", e); + tracing::error!("SignalWebSocket: {}", e); }, }); @@ -383,7 +412,7 @@ impl SignalWebSocket { { let response = self.request(r).await?; if response.status() != 200 { - log::debug!( + tracing::debug!( "request_json with non-200 status code. message: {}", response.message() ); @@ -408,7 +437,7 @@ impl SignalWebSocket { 409 /* CONFLICT */ => { let mismatched_devices: MismatchedDevices = json(response.body()).map_err(|e| { - log::error!( + tracing::error!( "Failed to decode HTTP 409 response: {}", e ); @@ -423,7 +452,7 @@ impl SignalWebSocket { 410 /* GONE */ => { let stale_devices = json(response.body()).map_err(|e| { - log::error!( + tracing::error!( "Failed to decode HTTP 410 response: {}", e ); @@ -435,7 +464,7 @@ impl SignalWebSocket { }, 428 /* PRECONDITION_REQUIRED */ => { let proof_required = json(response.body()).map_err(|e| { - log::error!("Failed to decode HTTP 428 response: {}", e); + tracing::error!("Failed to decode HTTP 428 response: {}", e); ServiceError::UnhandledResponseCode { http_code: 428, }