From a29c56c2b794ac1c672e3c411b2a3a28a381cd17 Mon Sep 17 00:00:00 2001 From: AH-dark Date: Fri, 8 Nov 2024 00:29:49 +0800 Subject: [PATCH] feat: added tracing layer & propagation for volo-grpc --- Cargo.lock | 68 +++++++++++++++++++ Cargo.toml | 9 ++- volo-grpc/Cargo.toml | 5 ++ volo-grpc/src/layer/mod.rs | 2 + volo-grpc/src/layer/tracing.rs | 116 +++++++++++++++++++++++++++++++++ 5 files changed, 197 insertions(+), 3 deletions(-) create mode 100644 volo-grpc/src/layer/tracing.rs diff --git a/Cargo.lock b/Cargo.lock index 18817b20..bc44c346 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -950,6 +950,12 @@ dependencies = [ "url", ] +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "governor" version = "0.7.0" @@ -2096,6 +2102,38 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "570074cc999d1a58184080966e5bd3bf3a9a4af650c3b05047c2621e7405cd17" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror 1.0.68", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2c627d9f4c9cdc1f21a29ee4bfbd6028fcb8bcf2a857b43f3abdf72c9c862f3" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "once_cell", + "opentelemetry", + "percent-encoding", + "rand", + "thiserror 1.0.68", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -3749,6 +3787,24 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc58af5d3f6c5811462cabb3289aec0093f7338e367e5a33d28c0433b3c7360b" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + [[package]] name = "tracing-subscriber" version = "0.3.18" @@ -4052,6 +4108,7 @@ dependencies = [ "matchit 0.8.4", "metainfo", "motore", + "opentelemetry", "percent-encoding", "pilota", "pin-project", @@ -4065,6 +4122,7 @@ dependencies = [ "tonic-web", "tower 0.5.1", "tracing", + "tracing-opentelemetry", "tracing-subscriber", "volo", ] @@ -4254,6 +4312,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki-roots" version = "0.25.4" diff --git a/Cargo.toml b/Cargo.toml index 30a7abbe..57120f4b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,9 +43,9 @@ async-stream = "0.3" base64 = "0.22" bytes = "1" chrono = { version = "0.4", default-features = false, features = [ - "std", - "alloc", - "clock", + "std", + "alloc", + "clock", ] } clap = "4" colored = "2" @@ -141,6 +141,9 @@ tokio-native-tls = "0.3" tungstenite = "0.24" tokio-tungstenite = "0.24" +opentelemetry = { version = "0.26.0", features = ["trace"] } +tracing-opentelemetry = "0.27.0" + [profile.release] opt-level = 3 debug = false diff --git a/volo-grpc/Cargo.toml b/volo-grpc/Cargo.toml index 0092b5e4..bc75e4d8 100644 --- a/volo-grpc/Cargo.toml +++ b/volo-grpc/Cargo.toml @@ -67,6 +67,9 @@ tower = { workspace = true, features = [ ] } tracing.workspace = true +opentelemetry = { workspace = true, optional = true } +tracing-opentelemetry = { workspace = true, optional = true } + tokio-rustls = { workspace = true, optional = true } tokio-native-tls = { workspace = true, optional = true } @@ -82,3 +85,5 @@ native-tls = ["__tls", "dep:tokio-native-tls", "volo/native-tls"] native-tls-vendored = ["native-tls", "volo/native-tls-vendored"] grpc-web = ["dep:tonic", "dep:tonic-web"] + +opentelemetry = ["dep:opentelemetry", "dep:tracing-opentelemetry"] diff --git a/volo-grpc/src/layer/mod.rs b/volo-grpc/src/layer/mod.rs index 39908cf0..22485e1b 100644 --- a/volo-grpc/src/layer/mod.rs +++ b/volo-grpc/src/layer/mod.rs @@ -3,4 +3,6 @@ pub mod grpc_timeout; #[cfg(feature = "grpc-web")] pub mod grpc_web; pub mod loadbalance; +#[cfg(feature = "opentelemetry")] +pub mod tracing; pub mod user_agent; diff --git a/volo-grpc/src/layer/tracing.rs b/volo-grpc/src/layer/tracing.rs new file mode 100644 index 00000000..b4a72257 --- /dev/null +++ b/volo-grpc/src/layer/tracing.rs @@ -0,0 +1,116 @@ +use opentelemetry::trace::{SpanContext, SpanId, TraceContextExt, TraceId}; +use std::fmt::Debug; +use std::str::FromStr; +use tracing::Instrument; +use tracing_opentelemetry::OpenTelemetrySpanExt; + +use volo::context::Context; +use volo::{Layer, Service}; + +use crate::metadata::{KeyRef, MetadataKey, MetadataValue}; +use crate::{Request, Response}; + +impl opentelemetry::propagation::Extractor for crate::metadata::MetadataMap { + fn get(&self, key: &str) -> Option<&str> { + self.get(key).and_then(|v| v.to_str().ok()) + } + + fn keys(&self) -> Vec<&str> { + self.keys() + .filter_map(|k| match k { + KeyRef::Ascii(k) => Some(k.as_str()), + KeyRef::Binary(_) => None, + }) + .collect::>() + } +} + +impl opentelemetry::propagation::Injector for crate::metadata::MetadataMap { + fn set(&mut self, key: &str, value: String) { + self.insert( + MetadataKey::from_str(key).unwrap(), + MetadataValue::from_str(value.as_str()).unwrap(), + ); + } +} + +#[derive(Clone, Copy, Debug, Default)] +pub struct ClientTracingLayer; + +impl Layer for ClientTracingLayer { + type Service = ClientTracingService; + + fn layer(self, inner: S) -> Self::Service { + ClientTracingService(inner) + } +} + +#[derive(Clone, Debug)] +pub struct ClientTracingService(S); + +#[volo::service] +impl Service> for ClientTracingService +where + S: Service> + Send + 'static + Sync, + Cx: Context + 'static + Send, + ReqBody: Send + 'static, +{ + async fn call(&self, cx: &mut Cx, mut req: Request) -> Result { + let span = tracing::span!( + tracing::Level::INFO, + "rpc_call", + method = cx.rpc_info().method().as_str() + ); + + let otel_cx = span.context(); + opentelemetry::global::get_text_map_propagator(|propagator| { + propagator.inject_context(&otel_cx, req.metadata_mut()); + }); + + self.0.call(cx, req).await + } +} + +pub struct ServerTracingLayer; + +impl Layer for ServerTracingLayer { + type Service = ServerTracingService; + + fn layer(self, inner: S) -> Self::Service { + ServerTracingService(inner) + } +} + +#[derive(Clone, Debug)] +pub struct ServerTracingService(S); + +#[volo::service] +impl Service> for ServerTracingService +where + S: Service, Response = Response, Error = ResErr> + + Send + + 'static + + Sync, + Cx: Context + 'static + Send, + ReqBody: Send + 'static, + ResBody: Send + 'static, + ResErr: Debug + Send + 'static, +{ + async fn call(&self, cx: &mut Cx, req: Request) -> Result { + let method = cx.rpc_info().method().as_str(); + let span = tracing::span!( + tracing::Level::INFO, + "rpc_call", + rpc.method = method, + otel.name = format!("RPC {}", method), + otel.kind = "server", + ); + + opentelemetry::global::get_text_map_propagator(|propagator| { + let cx = propagator.extract(req.metadata()); + span.set_parent(cx); + }); + + self.0.call(cx, req).instrument(span).await + } +}