Skip to content

Commit

Permalink
Tracing instrumentation on Hyper
Browse files Browse the repository at this point in the history
  • Loading branch information
rubdos committed Jan 10, 2024
1 parent 5f88d45 commit faa0728
Showing 1 changed file with 17 additions and 2 deletions.
19 changes: 17 additions & 2 deletions libsignal-service-hyper/src/push_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use libsignal_service::{
};
use serde::{Deserialize, Serialize};
use tokio_rustls::rustls;
use tracing_futures::Instrument;

use crate::websocket::TungsteniteWebSocket;

Expand Down Expand Up @@ -82,6 +83,7 @@ impl HyperPushService {
tls_config
}

#[tracing::instrument(skip(self, path, body), fields(path = %path.as_ref()))]
async fn request(
&self,
method: Method,
Expand All @@ -92,7 +94,6 @@ impl HyperPushService {
body: Option<RequestBody>,
) -> Result<Response<Body>, ServiceError> {
let url = self.cfg.base_url(endpoint).join(path.as_ref())?;
tracing::debug!("HTTP request {} {}", method, url);
let mut builder = Request::builder()
.method(method)
.uri(url.as_str())
Expand Down Expand Up @@ -212,6 +213,7 @@ impl HyperPushService {
}
}

#[tracing::instrument(skip(response), fields(status = %response.status()))]
async fn json<T>(response: &mut Response<Body>) -> Result<T, ServiceError>
where
for<'de> T: Deserialize<'de>,
Expand All @@ -235,6 +237,7 @@ impl HyperPushService {
})
}

#[tracing::instrument(skip(response), fields(status = %response.status()))]
async fn protobuf<M>(
response: &mut Response<Body>,
) -> Result<M, ServiceError>
Expand All @@ -253,6 +256,7 @@ impl HyperPushService {
M::decode(body).map_err(ServiceError::ProtobufDecodeError)
}

#[tracing::instrument(skip(response), fields(status = %response.status()))]
async fn text(
response: &mut Response<Body>,
) -> Result<String, ServiceError> {
Expand Down Expand Up @@ -280,6 +284,7 @@ impl PushService for HyperPushService {
// This is in principle known at compile time, but long to write out.
type ByteStream = Box<dyn futures::io::AsyncRead + Unpin>;

#[tracing::instrument(skip(self))]
async fn get_json<T>(
&mut self,
service: Endpoint,
Expand All @@ -304,6 +309,7 @@ impl PushService for HyperPushService {
Self::json(&mut response).await
}

#[tracing::instrument(skip(self))]
async fn delete_json<T>(
&mut self,
service: Endpoint,
Expand All @@ -327,6 +333,7 @@ impl PushService for HyperPushService {
Self::json(&mut response).await
}

#[tracing::instrument(skip(self, value))]
async fn put_json<D, S>(
&mut self,
service: Endpoint,
Expand Down Expand Up @@ -362,6 +369,7 @@ impl PushService for HyperPushService {
Self::json(&mut response).await
}

#[tracing::instrument(skip(self, value))]
async fn patch_json<D, S>(
&mut self,
service: Endpoint,
Expand Down Expand Up @@ -397,6 +405,7 @@ impl PushService for HyperPushService {
Self::json(&mut response).await
}

#[tracing::instrument(skip(self, value))]
async fn post_json<D, S>(
&mut self,
service: Endpoint,
Expand Down Expand Up @@ -432,6 +441,7 @@ impl PushService for HyperPushService {
Self::json(&mut response).await
}

#[tracing::instrument(skip(self))]
async fn get_protobuf<T>(
&mut self,
service: Endpoint,
Expand All @@ -456,6 +466,7 @@ impl PushService for HyperPushService {
Self::protobuf(&mut response).await
}

#[tracing::instrument(skip(self, value))]
async fn put_protobuf<D, S>(
&mut self,
service: Endpoint,
Expand Down Expand Up @@ -486,6 +497,7 @@ impl PushService for HyperPushService {
Self::protobuf(&mut response).await
}

#[tracing::instrument(skip(self))]
async fn get_from_cdn(
&mut self,
cdn_id: u32,
Expand All @@ -510,6 +522,7 @@ impl PushService for HyperPushService {
))
}

#[tracing::instrument(skip(self, value, file), fields(file = file.as_ref().map(|_| "")))]
async fn post_to_cdn0<'s, C: std::io::Read + Send + 's>(
&mut self,
path: &str,
Expand Down Expand Up @@ -586,17 +599,19 @@ impl PushService for HyperPushService {
additional_headers: &[(&str, &str)],
credentials: Option<ServiceCredentials>,
) -> Result<SignalWebSocket, ServiceError> {
let span = tracing::debug_span!("websocket");
let (ws, stream) = TungsteniteWebSocket::with_tls_config(
Self::tls_config(&self.cfg),
self.cfg.base_url(Endpoint::Service),
path,
additional_headers,
credentials.as_ref(),
)
.instrument(span.clone())
.await?;
let (ws, task) =
SignalWebSocket::from_socket(ws, stream, keepalive_path.to_owned());
tokio::task::spawn(task);
tokio::task::spawn(task.instrument(span));
Ok(ws)
}
}
Expand Down

0 comments on commit faa0728

Please sign in to comment.