Skip to content

Commit

Permalink
instrument futures instead of threads
Browse files Browse the repository at this point in the history
  • Loading branch information
rubdos committed Jan 8, 2024
1 parent 21ae233 commit ce4ba97
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 56 deletions.
1 change: 1 addition & 0 deletions libsignal-service-actix/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mpart-async = "0.6"
serde_json = "1.0"
futures = "0.3"
tracing = "0.1"
tracing-futures = "0.2"
bytes = "1"
rustls = "0.21"
rustls-pemfile = "0.3"
Expand Down
9 changes: 6 additions & 3 deletions libsignal-service-actix/src/push_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use libsignal_service::{
websocket::SignalWebSocket,
};
use serde::{Deserialize, Serialize};
use tracing_futures::Instrument;

use crate::websocket::AwcWebSocket;

Expand Down Expand Up @@ -85,6 +86,7 @@ impl AwcPushService {
})
}

#[tracing::instrument(name = "extracting error", skip(response))]
async fn from_response<S>(
response: &mut ClientResponse<S>,
) -> Result<(), ServiceError>
Expand Down Expand Up @@ -201,8 +203,7 @@ impl PushService for AwcPushService {
},
})?;

let _span =
tracing::debug_span!("processing response", ?response).entered();
let _span = tracing::debug_span!("processing response", ?response);

Self::from_response(&mut response).await?;

Expand Down Expand Up @@ -646,20 +647,22 @@ impl PushService for AwcPushService {
additional_headers: &[(&str, &str)],
credentials: Option<ServiceCredentials>,
) -> Result<SignalWebSocket, ServiceError> {
let span = tracing::debug_span!("websocket");
let (ws, stream) = AwcWebSocket::with_client(
&mut self.client,
self.cfg.base_url(Endpoint::Service),
path,
additional_headers,
credentials.as_ref(),
)
.instrument(span.clone())
.await?;
let (ws, task) = SignalWebSocket::from_socket(
ws,
stream,
keep_alive_path.to_owned(),
);
actix_rt::spawn(task);
actix_rt::spawn(task.instrument(span));
Ok(ws)
}
}
Expand Down
1 change: 1 addition & 0 deletions libsignal-service-hyper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ async-trait = "0.1"
bytes = "1.0"
futures = "0.3"
tracing = "0.1"
tracing-futures = "0.2"
mpart-async = "0.6"
serde = "1.0"
serde_json = "1.0"
Expand Down
1 change: 1 addition & 0 deletions libsignal-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ url = { version = "2.1", features = ["serde"] }
uuid = { version = "1", features = ["serde"] }

tracing = "0.1"
tracing-futures = "0.2"

[build-dependencies]
prost-build = "0.10"
Expand Down
72 changes: 33 additions & 39 deletions libsignal-service/src/account_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use libsignal_protocol::{
use prost::Message;
use serde::{Deserialize, Serialize};
use sha2::Sha256;
use tracing_futures::Instrument;
use zkgroup::profiles::ProfileKey;

use crate::pre_keys::KyberPreKeyEntity;
Expand Down Expand Up @@ -102,29 +103,26 @@ impl<Service: PushService> AccountManager<Service> {
pq_pre_keys_offset_id: u32,
use_last_resort_key: bool,
) -> Result<(u32, u32, u32), ServiceError> {
let prekey_status = {
let _span = tracing::span!(
let prekey_status = match self
.service
.get_pre_key_status(ServiceIdType::AccountIdentity)
.instrument(tracing::span!(
tracing::Level::DEBUG,
"Fetching pre key status"
)
.entered();
match self
.service
.get_pre_key_status(ServiceIdType::AccountIdentity)
.await
{
Ok(status) => status,
Err(ServiceError::Unauthorized) => {
tracing::info!("Got Unauthorized when fetching pre-key status. Assuming first installment.");
// Additionally, the second PUT request will fail if this really comes down to an
// authorization failure.
crate::push_service::PreKeyStatus {
count: 0,
pq_count: 0,
}
},
Err(e) => return Err(e),
}
))
.await
{
Ok(status) => status,
Err(ServiceError::Unauthorized) => {
tracing::info!("Got Unauthorized when fetching pre-key status. Assuming first installment.");
// Additionally, the second PUT request will fail if this really comes down to an
// authorization failure.
crate::push_service::PreKeyStatus {
count: 0,
pq_count: 0,
}
},
Err(e) => return Err(e),
};
tracing::trace!("Remaining pre-keys on server: {:?}", prekey_status);

Expand All @@ -140,11 +138,11 @@ impl<Service: PushService> AccountManager<Service> {
}

let pre_key_state = {
let _span =
tracing::span!(tracing::Level::DEBUG, "Generating pre keys")
.entered();
let span =
tracing::span!(tracing::Level::DEBUG, "Generating pre keys");

let identity_key_pair =
protocol_store.get_identity_key_pair().await?;
protocol_store.get_identity_key_pair().instrument(tracing::trace_span!(parent: &span, "get identity key pair")).await?;

let mut pre_key_entities = vec![];
let mut pq_pre_key_entities = vec![];
Expand All @@ -159,7 +157,7 @@ impl<Service: PushService> AccountManager<Service> {
let pre_key_record = PreKeyRecord::new(pre_key_id, &key_pair);
protocol_store
.save_pre_key(pre_key_id, &pre_key_record)
.await?;
.instrument(tracing::trace_span!(parent: &span, "save pre key", ?pre_key_id)).await?;
// TODO: Shouldn't this also remove the previous pre-keys from storage?
// I think we might want to update the storage, and then sync the storage to the
// server.
Expand All @@ -180,7 +178,7 @@ impl<Service: PushService> AccountManager<Service> {
)?;
protocol_store
.save_kyber_pre_key(pre_key_id, &pre_key_record)
.await?;
.instrument(tracing::trace_span!(parent: &span, "save kyber pre key", ?pre_key_id)).await?;
// TODO: Shouldn't this also remove the previous pre-keys from storage?
// I think we might want to update the storage, and then sync the storage to the
// server.
Expand Down Expand Up @@ -214,7 +212,7 @@ impl<Service: PushService> AccountManager<Service> {
next_signed_pre_key_id.into(),
&signed_prekey_record,
)
.await?;
.instrument(tracing::trace_span!(parent: &span, "save signed pre key", signed_pre_key_id = ?next_signed_pre_key_id)).await?;

PreKeyState {
pre_keys: pre_key_entities,
Expand All @@ -240,17 +238,13 @@ impl<Service: PushService> AccountManager<Service> {
}
};

{
let _span =
tracing::span!(tracing::Level::DEBUG, "Uploading pre keys")
.entered();
self.service
.register_pre_keys(
ServiceIdType::AccountIdentity,
pre_key_state,
)
.await?;
}
self.service
.register_pre_keys(ServiceIdType::AccountIdentity, pre_key_state)
.instrument(tracing::span!(
tracing::Level::DEBUG,
"Uploading pre keys"
))
.await?;

Ok((
pre_keys_offset_id + PRE_KEY_BATCH_SIZE,
Expand Down
26 changes: 12 additions & 14 deletions libsignal-service/src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use libsignal_protocol::{
};
use rand::{CryptoRng, Rng};
use tracing::{info, trace};
use tracing_futures::Instrument;
use uuid::Uuid;

use crate::{
Expand Down Expand Up @@ -195,19 +196,16 @@ where
});

// Request upload attributes
let attrs = {
let _span =
tracing::trace_span!("requesting upload attributes").entered();
self.identified_ws
.get_attachment_v2_upload_attributes()
.await?
};
let (id, digest) = {
let _span = tracing::trace_span!("Uploading attachment").entered();
self.service
.upload_attachment(&attrs, &mut std::io::Cursor::new(&contents))
.await?
};
let attrs = self
.identified_ws
.get_attachment_v2_upload_attributes()
.instrument(tracing::trace_span!("requesting upload attributes"))
.await?;
let (id, digest) = self
.service
.upload_attachment(&attrs, &mut std::io::Cursor::new(&contents))
.instrument(tracing::trace_span!("Uploading attachment"))
.await?;

Ok(AttachmentPointer {
content_type: Some(spec.content_type),
Expand Down Expand Up @@ -758,10 +756,10 @@ where
}
}

let _span = tracing::trace_span!("encrypting message").entered();
let message = self
.cipher
.encrypt(&recipient_protocol_address, unidentified_access, content)
.instrument(tracing::trace_span!("encrypting message"))
.await?;

Ok(message)
Expand Down

0 comments on commit ce4ba97

Please sign in to comment.