From 583d1249a6dffb13423ded719ef2b58a56cf4190 Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Tue, 8 Oct 2024 16:52:56 +0300 Subject: [PATCH 01/17] feat: tracing with OpenTelemetry OTLP Replace the "movement_timing" tracing target and the logging layer it targeted with an optionally installed OpenTelemetry OTLP exporter. The name of the tracing target matched to send OpenTelemetry events is "movement_telemetry". --- Cargo.lock | 298 +++++++++++++++--- Cargo.toml | 5 +- .../src/bin/e2e/followers_consistent.rs | 8 +- networks/suzuka/suzuka-full-node/src/main.rs | 8 +- .../src/tasks/execute_settle.rs | 2 +- .../src/tasks/transaction_ingress.rs | 4 +- protocol-units/da/m1/light-node/src/main.rs | 9 +- .../da/m1/light-node/src/v1/passthrough.rs | 2 +- .../da/m1/light-node/src/v1/sequencer.rs | 22 +- .../opt-executor/src/executor/mod.rs | 2 +- .../opt-executor/src/transaction_pipe.rs | 6 +- util/tracing/Cargo.toml | 8 +- util/tracing/src/lib.rs | 97 +++--- 13 files changed, 326 insertions(+), 145 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 49ad9cbc1..be94a3370 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -402,7 +402,7 @@ dependencies = [ "serde_json", "tokio", "tokio-stream", - "tower", + "tower 0.4.13", "tracing", ] @@ -447,7 +447,7 @@ dependencies = [ "serde_json", "tokio", "tokio-stream", - "tower", + "tower 0.4.13", "tracing", "url", ] @@ -646,7 +646,7 @@ dependencies = [ "serde_json", "thiserror", "tokio", - "tower", + "tower 0.4.13", "tracing", "url", ] @@ -660,7 +660,7 @@ dependencies = [ "alloy-transport", "reqwest 0.12.8", "serde_json", - "tower", + "tower 0.4.13", "tracing", "url", ] @@ -3532,7 +3532,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.3.4", "bitflags 1.3.2", "bytes 1.7.2", "futures-util", @@ -3548,7 +3548,34 @@ dependencies = [ "rustversion", "serde", "sync_wrapper 0.1.2", 
- "tower", + "tower 0.4.13", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum" +version = "0.7.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "504e3947307ac8326a5437504c517c4b56716c9d98fac0028c2acc7ca47d70ae" +dependencies = [ + "async-trait", + "axum-core 0.4.5", + "bytes 1.7.2", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper 1.0.1", + "tower 0.5.1", "tower-layer", "tower-service", ] @@ -3570,6 +3597,26 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes 1.7.2", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper 1.0.1", + "tower-layer", + "tower-service", +] + [[package]] name = "az" version = "1.2.1" @@ -5194,7 +5241,7 @@ dependencies = [ "bitflags 1.3.2", "crossterm_winapi", "libc", - "mio 0.8.11", + "mio", "parking_lot", "signal-hook", "signal-hook-mio", @@ -5210,7 +5257,7 @@ dependencies = [ "bitflags 1.3.2", "crossterm_winapi", "libc", - "mio 0.8.11", + "mio", "parking_lot", "signal-hook", "signal-hook-mio", @@ -6723,7 +6770,7 @@ dependencies = [ "tokio", "tokio-retry", "tonic 0.9.2", - "tower", + "tower 0.4.13", "tracing", ] @@ -7272,6 +7319,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.1", "httparse", + "httpdate", "itoa", "pin-project-lite", "smallvec", @@ -7325,6 +7373,19 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-timeout" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793" +dependencies = [ + 
"hyper 1.4.1", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -7867,7 +7928,7 @@ dependencies = [ "serde_json", "thiserror", "tokio", - "tower", + "tower 0.4.13", "tracing", "url", ] @@ -8909,18 +8970,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "mio" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" -dependencies = [ - "hermit-abi 0.3.9", - "libc", - "wasi 0.11.0+wasi-snapshot-preview1", - "windows-sys 0.52.0", -] - [[package]] name = "mirai-annotations" version = "1.12.0" @@ -9627,7 +9676,11 @@ dependencies = [ name = "movement-tracing" version = "0.0.2" dependencies = [ - "tracing-appender", + "anyhow", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry_sdk", + "tracing-opentelemetry", "tracing-subscriber 0.3.18", ] @@ -10122,6 +10175,71 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "803801d3d3b71cd026851a53f974ea03df3d179cb758b260136a6c9e22e196af" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "596b1719b3cab83addb20bcbffdf21575279d9436d9ccccfe651a3bf0ab5ab06" +dependencies = [ + "async-trait", + "futures-core", + "http 1.1.0", + "opentelemetry", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost 0.13.3", + "thiserror", + "tokio", + "tonic 0.12.3", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c43620e8f93359eb7e627a3b16ee92d8585774986f24f2ab010817426c5ce61" +dependencies = [ + "opentelemetry", + 
"opentelemetry_sdk", + "prost 0.13.3", + "tonic 0.12.3", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0da0d6b47a3dbc6e9c9e36a0520e25cf943e046843818faaa3f87365a548c82" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "once_cell", + "opentelemetry", + "percent-encoding", + "rand 0.8.5", + "serde_json", + "thiserror", + "tokio", + "tokio-stream", +] + [[package]] name = "ordered-float" version = "2.10.1" @@ -11236,6 +11354,16 @@ dependencies = [ "prost-derive 0.12.6", ] +[[package]] +name = "prost" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b0487d90e047de87f984913713b85c601c05609aad5b0df4b4573fbf69aa13f" +dependencies = [ + "bytes 1.7.2", + "prost-derive 0.13.3", +] + [[package]] name = "prost-build" version = "0.12.6" @@ -11283,6 +11411,19 @@ dependencies = [ "syn 2.0.79", ] +[[package]] +name = "prost-derive" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5" +dependencies = [ + "anyhow", + "itertools 0.13.0", + "proc-macro2", + "quote", + "syn 2.0.79", +] + [[package]] name = "prost-types" version = "0.11.9" @@ -12776,7 +12917,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34db1a06d485c9142248b7a054f034b349b212551f3dfd19c94d45a754a217cd" dependencies = [ "libc", - "mio 0.8.11", + "mio", "signal-hook", ] @@ -13792,21 +13933,22 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.40.0" +version = "1.38.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998" +checksum = "eb2caba9f80616f438e09748d5acda951967e1ea58508ef53d9c6402485a46df" dependencies = [ 
"backtrace", "bytes 1.7.2", "libc", - "mio 1.0.2", + "mio", + "num_cpus", "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2 0.5.7", "tokio-macros", "tracing", - "windows-sys 0.52.0", + "windows-sys 0.48.0", ] [[package]] @@ -13821,9 +13963,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.4.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" +checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" dependencies = [ "proc-macro2", "quote", @@ -14050,7 +14192,7 @@ checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" dependencies = [ "async-stream", "async-trait", - "axum", + "axum 0.6.20", "base64 0.21.7", "bytes 1.7.2", "flate2", @@ -14060,7 +14202,7 @@ dependencies = [ "http 0.2.12", "http-body 0.4.6", "hyper 0.14.30", - "hyper-timeout", + "hyper-timeout 0.4.1", "percent-encoding", "pin-project 1.1.5", "prost 0.11.9", @@ -14068,7 +14210,7 @@ dependencies = [ "tokio", "tokio-rustls 0.24.1", "tokio-stream", - "tower", + "tower 0.4.13", "tower-layer", "tower-service", "tracing", @@ -14083,7 +14225,7 @@ checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13" dependencies = [ "async-stream", "async-trait", - "axum", + "axum 0.6.20", "base64 0.21.7", "bytes 1.7.2", "flate2", @@ -14091,7 +14233,7 @@ dependencies = [ "http 0.2.12", "http-body 0.4.6", "hyper 0.14.30", - "hyper-timeout", + "hyper-timeout 0.4.1", "percent-encoding", "pin-project 1.1.5", "prost 0.12.6", @@ -14101,13 +14243,43 @@ dependencies = [ "tokio", "tokio-rustls 0.25.0", "tokio-stream", - "tower", + "tower 0.4.13", "tower-layer", "tower-service", "tracing", "zstd 0.12.4", ] +[[package]] +name = "tonic" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +dependencies = [ + 
"async-stream", + "async-trait", + "axum 0.7.7", + "base64 0.22.1", + "bytes 1.7.2", + "h2 0.4.6", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.4.1", + "hyper-timeout 0.5.1", + "hyper-util", + "percent-encoding", + "pin-project 1.1.5", + "prost 0.13.3", + "socket2 0.5.7", + "tokio", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tonic-build" version = "0.11.0" @@ -14174,6 +14346,20 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper 0.1.2", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-http" version = "0.4.4" @@ -14216,18 +14402,6 @@ dependencies = [ "tracing-core", ] -[[package]] -name = "tracing-appender" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf" -dependencies = [ - "crossbeam-channel", - "thiserror", - "time", - "tracing-subscriber 0.3.18", -] - [[package]] name = "tracing-attributes" version = "0.1.27" @@ -14260,6 +14434,24 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eabc56d23707ad55ba2a0750fc24767125d5a0f51993ba41ad2c441cc7b8dea" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber 0.3.18", + "web-time", +] + [[package]] name = "tracing-serde" version = "0.1.3" @@ -14949,6 +15141,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki-roots" version = "0.23.1" diff --git a/Cargo.toml b/Cargo.toml index 04db262d7..6454c4095 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -246,6 +246,9 @@ move-vm-ext = { path = "types/move-vm-ext" } num-derive = "0.4.2" num-traits = "0.2.14" once_cell = "1.8.0" +opentelemetry = { version = "0.25" } +opentelemetry_sdk = { version = "0.25", features = ["rt-tokio"] } +opentelemetry-otlp = { version = "0.25" } parking_lot = { version = "0.12.1" } poem = { version = "=1.3.59", features = ["anyhow", "rustls"] } poem-openapi = { version = "=2.0.11", features = ["swagger-ui", "url"] } @@ -283,7 +286,7 @@ tonic-reflection = "0.11" tonic-web = "0.11" ### To try (experimental) std support, add `features = [ "std" ]` to risc0-zkvm tracing = "0.1.40" -tracing-appender = "0.2" +tracing-opentelemetry = { version = "0.26" } tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-test = "0.2.5" trie-db = "0.28.0" diff --git a/networks/suzuka/suzuka-client/src/bin/e2e/followers_consistent.rs b/networks/suzuka/suzuka-client/src/bin/e2e/followers_consistent.rs index 3b9353e05..b16d9c170 100644 --- a/networks/suzuka/suzuka-client/src/bin/e2e/followers_consistent.rs +++ b/networks/suzuka/suzuka-client/src/bin/e2e/followers_consistent.rs @@ -12,8 +12,6 @@ use tokio::sync::RwLock; use tracing::info; use url::Url; -const TIMING_LOG_ENV: &str = "SUZUKA_TIMING_LOG"; - pub fn get_suzuka_config( dot_movement: &DotMovement, ) -> Result { @@ -247,10 +245,8 @@ pub async fn basic_coin_transfers( #[tokio::main] async fn main() -> Result<(), anyhow::Error> { - let tracing_config = movement_tracing::Config { - timing_log_path: std::env::var_os(TIMING_LOG_ENV).map(Into::into), - }; - let _guard = movement_tracing::init_tracing_subscriber(tracing_config); + let 
tracing_config = movement_tracing::Config::from_env()?; + movement_tracing::init_tracing_subscriber(tracing_config)?; // get the lead dot movement from the environment let dot_movement = DotMovement::try_from_env()?; diff --git a/networks/suzuka/suzuka-full-node/src/main.rs b/networks/suzuka/suzuka-full-node/src/main.rs index eea463a88..29f71faa9 100644 --- a/networks/suzuka/suzuka-full-node/src/main.rs +++ b/networks/suzuka/suzuka-full-node/src/main.rs @@ -1,15 +1,11 @@ use suzuka_full_node::manager::Manager; -use std::env; use std::process::ExitCode; -const TIMING_LOG_ENV: &str = "SUZUKA_TIMING_LOG"; - #[tokio::main] async fn main() -> Result { - let tracing_config = - movement_tracing::Config { timing_log_path: env::var_os(TIMING_LOG_ENV).map(Into::into) }; - let _guard = movement_tracing::init_tracing_subscriber(tracing_config); + let tracing_config = movement_tracing::Config::from_env()?; + movement_tracing::init_tracing_subscriber(tracing_config)?; // get the config file let dot_movement = dot_movement::DotMovement::try_from_env()?; diff --git a/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs b/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs index e8a00da48..c278f6641 100644 --- a/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs +++ b/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs @@ -140,7 +140,7 @@ where // get the transactions let transactions_count = block.transactions().len(); - let span = info_span!(target: "movement_timing", "execute_block", id = %block_id); + let span = info_span!(target: "movement_telemetry", "execute_block", id = %block_id); let commitment = self.execute_block_with_retries(block, block_timestamp).instrument(span).await?; diff --git a/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs b/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs index a02108f53..b47d20a6f 100644 --- a/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs +++ 
b/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs @@ -65,7 +65,7 @@ impl Task { Ok(transaction) => match transaction { Some(transaction) => { info!( - target : "movement_timing", + target : "movement_telemetry", batch_id = %batch_id, tx_hash = %transaction.committed_hash(), sender = %transaction.sender(), @@ -93,7 +93,7 @@ impl Task { if transactions.len() > 0 { info!( - target: "movement_timing", + target: "movement_telemetry", batch_id = %batch_id, transaction_count = transactions.len(), "built_batch_write" diff --git a/protocol-units/da/m1/light-node/src/main.rs b/protocol-units/da/m1/light-node/src/main.rs index e9a8cb7f5..f72cf65b1 100644 --- a/protocol-units/da/m1/light-node/src/main.rs +++ b/protocol-units/da/m1/light-node/src/main.rs @@ -1,14 +1,9 @@ use m1_da_light_node::v1::{LightNodeV1, Manager}; -use std::env; - -const TIMING_LOG_ENV: &str = "M1_DA_LIGHT_NODE_TIMING_LOG"; - #[tokio::main] async fn main() -> Result<(), Box> { - let tracing_config = - movement_tracing::Config { timing_log_path: env::var_os(TIMING_LOG_ENV).map(Into::into) }; - let _guard = movement_tracing::init_tracing_subscriber(tracing_config); + let tracing_config = movement_tracing::Config::from_env()?; + movement_tracing::init_tracing_subscriber(tracing_config)?; let dot_movement = dot_movement::DotMovement::try_from_env()?; let config_path = dot_movement.get_config_json_path(); diff --git a/protocol-units/da/m1/light-node/src/v1/passthrough.rs b/protocol-units/da/m1/light-node/src/v1/passthrough.rs index c8d1cc3e9..1db727988 100644 --- a/protocol-units/da/m1/light-node/src/v1/passthrough.rs +++ b/protocol-units/da/m1/light-node/src/v1/passthrough.rs @@ -146,7 +146,7 @@ impl LightNodeV1 { Ok(verified_blobs) } - #[tracing::instrument(target = "movement_timing", level = "debug")] + #[tracing::instrument(target = "movement_telemetry", level = "debug")] async fn get_blobs_at_height(&self, height: u64) -> Result, anyhow::Error> { let celestia_blobs = 
self.get_celestia_blobs_at_height(height).await?; let mut blobs = Vec::new(); diff --git a/protocol-units/da/m1/light-node/src/v1/sequencer.rs b/protocol-units/da/m1/light-node/src/v1/sequencer.rs index 55f03bd7c..294b6a9e6 100644 --- a/protocol-units/da/m1/light-node/src/v1/sequencer.rs +++ b/protocol-units/da/m1/light-node/src/v1/sequencer.rs @@ -79,17 +79,17 @@ impl LightNodeV1 { // this has an internal timeout based on its building time // so in the worst case scenario we will roughly double the internal timeout let uid = LOGGING_UID.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - debug!(target: "movement_timing", uid = %uid, "waiting_for_next_block",); + debug!(target: "movement_telemetry", uid = %uid, "waiting_for_next_block",); let block = memseq.wait_for_next_block().await?; match block { Some(block) => { - info!(target: "movement_timing", block_id = %block.id(), uid = %uid, transaction_count = block.transactions().len(), "received_block"); + info!(target: "movement_telemetry", block_id = %block.id(), uid = %uid, transaction_count = block.transactions().len(), "received_block"); sender.send(block).await?; Ok(()) } None => { // no transactions to include - debug!(target: "movement_timing", uid = %uid, "no_transactions_to_include"); + debug!(target: "movement_telemetry", uid = %uid, "no_transactions_to_include"); Ok(()) } } @@ -97,7 +97,7 @@ impl LightNodeV1 { async fn submit_blocks(&self, blocks: &Vec) -> Result<(), anyhow::Error> { for block in blocks { - info!(target: "movement_timing", block_id = %block.block.id(), "inner_submitting_block"); + info!(target: "movement_telemetry", block_id = %block.block.id(), "inner_submitting_block"); } // get references to celestia blobs in the wrapped blocks let block_blobs = blocks @@ -108,14 +108,14 @@ impl LightNodeV1 { // use deref on the wrapped block to get the blob self.pass_through.submit_celestia_blobs(&block_blobs).await?; for block in blocks { - info!(target: "movement_timing", block_id = 
%block.block.id(), "inner_submitted_block"); + info!(target: "movement_telemetry", block_id = %block.block.id(), "inner_submitted_block"); } Ok(()) } pub async fn submit_with_heuristic(&self, blocks: Vec) -> Result<(), anyhow::Error> { for block in &blocks { - info!(target: "movement_timing", block_id = %block.id(), "submitting_block"); + info!(target: "movement_telemetry", block_id = %block.id(), "submitting_block"); } // wrap the blocks in a struct that can be split and compressed @@ -168,7 +168,7 @@ impl LightNodeV1 { info!("block group results: {:?}", block_group_results); for block_group_result in &block_group_results { - info!(target: "movement_timing", block_group_result = ?block_group_result, "block_group_result"); + info!(target: "movement_telemetry", block_group_result = ?block_group_result, "block_group_result"); } Ok(()) @@ -204,7 +204,7 @@ impl LightNodeV1 { Err(_) => { // The operation timed out debug!( - target: "movement_timing", + target: "movement_telemetry", batch_size = blocks.len(), "timed_out_building_block" ); @@ -213,7 +213,7 @@ impl LightNodeV1 { } } - info!(target: "movement_timing", block_count = blocks.len(), "read_blocks"); + info!(target: "movement_telemetry", block_count = blocks.len(), "read_blocks"); Ok(blocks) } @@ -232,11 +232,11 @@ impl LightNodeV1 { // submit the blobs, resizing as needed for block_id in &ids { - info!(target: "movement_timing", %block_id, "submitting_block_batch"); + info!(target: "movement_telemetry", %block_id, "submitting_block_batch"); } self.submit_with_heuristic(blocks).await?; for block_id in &ids { - info!(target: "movement_timing", %block_id, "submitted_block_batch"); + info!(target: "movement_telemetry", %block_id, "submitted_block_batch"); } Ok(()) diff --git a/protocol-units/execution/opt-executor/src/executor/mod.rs b/protocol-units/execution/opt-executor/src/executor/mod.rs index 05dae3bfe..ab14a262e 100644 --- a/protocol-units/execution/opt-executor/src/executor/mod.rs +++ 
b/protocol-units/execution/opt-executor/src/executor/mod.rs @@ -46,7 +46,7 @@ impl Executor { self.transactions_in_flight .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| { info!( - target: "movement_timing", + target: "movement_telemetry", count, current, "decrementing_transactions_in_flight", diff --git a/protocol-units/execution/opt-executor/src/transaction_pipe.rs b/protocol-units/execution/opt-executor/src/transaction_pipe.rs index 163157697..03abf4473 100644 --- a/protocol-units/execution/opt-executor/src/transaction_pipe.rs +++ b/protocol-units/execution/opt-executor/src/transaction_pipe.rs @@ -89,7 +89,7 @@ impl TransactionPipe { match request { MempoolClientRequest::SubmitTransaction(transaction, callback) => { let span = info_span!( - target: "movement_timing", + target: "movement_telemetry", "submit_transaction", tx_hash = %transaction.committed_hash(), sender = %transaction.sender(), @@ -124,13 +124,13 @@ impl TransactionPipe { // For now, we are going to consider a transaction in flight until it exits the mempool and is sent to the DA as is indicated by WriteBatch. 
let in_flight = self.transactions_in_flight.load(std::sync::atomic::Ordering::Relaxed); info!( - target: "movement_timing", + target: "movement_telemetry", in_flight = %in_flight, "transactions_in_flight" ); if in_flight > self.in_flight_limit { info!( - target: "movement_timing", + target: "movement_telemetry", "shedding_load" ); let status = MempoolStatus::new(MempoolStatusCode::MempoolIsFull); diff --git a/util/tracing/Cargo.toml b/util/tracing/Cargo.toml index eaf030410..dfb008c69 100644 --- a/util/tracing/Cargo.toml +++ b/util/tracing/Cargo.toml @@ -11,8 +11,12 @@ publish.workspace = true rust-version.workspace = true [dependencies] -tracing-appender = { workspace = true } -tracing-subscriber = { workspace = true, features = ["json"] } +anyhow = { workspace = true } +tracing-subscriber = { workspace = true } +opentelemetry = { workspace = true } +opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] } +opentelemetry-otlp = { workspace = true } +tracing-opentelemetry = { workspace = true } #console-subscriber = { workspace = true } [lints] diff --git a/util/tracing/src/lib.rs b/util/tracing/src/lib.rs index f018f7e20..b920012d8 100644 --- a/util/tracing/src/lib.rs +++ b/util/tracing/src/lib.rs @@ -1,78 +1,63 @@ -use tracing_appender::non_blocking::WorkerGuard as AppenderGuard; +use anyhow::anyhow; +use opentelemetry::trace::TracerProvider as _; +use opentelemetry_otlp::WithExportConfig as _; +use opentelemetry_sdk::runtime; use tracing_subscriber::filter::{self, EnvFilter, LevelFilter}; -use tracing_subscriber::fmt::format::FmtSpan; use tracing_subscriber::prelude::*; -use std::{env, fs::File, path::PathBuf}; +use std::env; -const TIMING_ENV: &str = "MOVEMENT_TIMING"; - -/// The default path name for the timing log file. -/// If the path not specified in [`Config`] and the `MOVEMENT_TIMING` -/// environment variable is set, the log file with this name will be created. 
-pub const DEFAULT_TIMING_LOG_FILE: &str = "movement-timing.log"; - -/// A guard for background log appender(s) returned by `init_tracing_subscriber`. -pub struct WorkerGuard { - _drop_me: Option, -} +const OTLP_TRACING_ENV: &str = "MOVEMENT_OTLP"; /// Options for the tracing subscriber. #[derive(Default)] pub struct Config { - /// Custom name for the timing log file. - pub timing_log_path: Option, + /// URL of the collector endpoint using the OTLP gRPC protocol. + pub otlp_grpc_url: Option, +} + +impl Config { + /// Get the tracing configuration from well-known environment variables. + pub fn from_env() -> Result { + let otlp_grpc_url = match env::var(OTLP_TRACING_ENV) { + Ok(url) => Some(url), + Err(env::VarError::NotPresent) => None, + Err(env::VarError::NotUnicode(s)) => { + return Err(anyhow!( + "value of environment variable {OTLP_TRACING_ENV} is not valid UTF-8: {}", + s.to_string_lossy() + )); + } + }; + Ok(Self { otlp_grpc_url }) + } } /// Sets up the tracing subscribers for a Movement process. This should be /// called at the beginning of a process' `main` function. -/// Returns a guard object that should be dropped at the end of the process' -/// `main`` function scope. 
-/// -/// This function may output encounted errors to the standard error stream, -/// as this is the only facility -pub fn init_tracing_subscriber(config: Config) -> WorkerGuard { +pub fn init_tracing_subscriber(config: Config) -> Result<(), anyhow::Error> { // TODO: compose console_subscriber as a layer let env_filter = EnvFilter::builder() .with_default_directive(LevelFilter::INFO.into()) .from_env_lossy(); let log_layer = tracing_subscriber::fmt::layer().with_filter(env_filter); - let (timing_layer, timing_writer_guard) = match env::var(TIMING_ENV) { - Err(env::VarError::NotPresent) => { - // Disable timing - (None, None) - } - Ok(timing_directives) => { - let env_filter = EnvFilter::new(timing_directives); - let timing_log_path = config - .timing_log_path - .as_deref() - .unwrap_or_else(|| DEFAULT_TIMING_LOG_FILE.as_ref()); - match File::create(timing_log_path) { - Ok(file) => { - let (writer, guard) = tracing_appender::non_blocking(file); - let layer = tracing_subscriber::fmt::layer() - .with_writer(writer) - .json() - .with_span_events(FmtSpan::CLOSE) - .with_filter(env_filter) - .with_filter(filter::filter_fn(|meta| meta.target() == "movement_timing")); - (Some(layer), Some(guard)) - } - Err(e) => { - eprintln!("can't create `{}`: {}", timing_log_path.display(), e); - (None, None) - } - } - } - Err(e) => { - eprintln!("invalid {TIMING_ENV}: {e}"); - (None, None) - } + let telemetry_layer = if let Some(endpoint) = config.otlp_grpc_url { + let exporter = opentelemetry_otlp::new_exporter().tonic().with_endpoint(endpoint); + let tracer = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter(exporter) + .install_batch(runtime::Tokio)? 
+ .tracer("movement_tracing"); + let layer = tracing_opentelemetry::layer() + .with_tracer(tracer) + .with_filter(filter::filter_fn(|meta| meta.target() == "movement_telemetry")); + Some(layer) + } else { + None }; - tracing_subscriber::registry().with(log_layer).with(timing_layer).init(); + tracing_subscriber::registry().with(log_layer).with(telemetry_layer).init(); - WorkerGuard { _drop_me: timing_writer_guard } + Ok(()) } From 57845ba9047f389ac528e776b4725930e00b44c4 Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Tue, 8 Oct 2024 17:01:22 +0300 Subject: [PATCH 02/17] feat(process-compose):telemetry overlay Add the telemetry overlay enabling OTLP telemetry export in suzuka-full-node and m1-da-light-node. In the telemetry overlay for suzuka-full-node, add an OTLP collector start job running a docker container. --- .../process-compose.telemetry.yml | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 process-compose/suzuka-full-node/process-compose.telemetry.yml diff --git a/process-compose/suzuka-full-node/process-compose.telemetry.yml b/process-compose/suzuka-full-node/process-compose.telemetry.yml new file mode 100644 index 000000000..1bcefdc69 --- /dev/null +++ b/process-compose/suzuka-full-node/process-compose.telemetry.yml @@ -0,0 +1,24 @@ +version: "3" + +environment: + +processes: + otlp-collector: + is_daemon: true + command: | + docker run -d --rm --name suzuka-otlp-collector -p16686:16686 -p4317:4317 -e COLLECTOR_OTLP_ENABLED=true jaegertracing/all-in-one:latest + shutdown: + command: | + docker stop suzuka-otlp-collector + suzuka-full-node: + depends_on: + otlp-collector: + condition: process_started + env: + MOVEMENT_OTLP: http://localhost:4317 + m1-da-light-node: + depends_on: + otlp-collector: + condition: process_started + env: + MOVEMENT_OTLP: http://localhost:4317 From bece93e055a62d4db2425ee489d4120a4e651d43 Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Thu, 10 Oct 2024 13:59:32 +0300 Subject: [PATCH 03/17] 
feat(tracing)!: break out telemetry Provide telemetry API as separate from tracing rather than a globally installed layer. Installing an OpenTelemetry layer into the global tracing subscriber raises nasty reentrancy issues because the OTLP exporter stack also uses tracing under the hood. --- Cargo.lock | 27 ++++--- Cargo.toml | 9 ++- .../src/bin/e2e/followers_consistent.rs | 3 +- networks/suzuka/suzuka-full-node/src/main.rs | 9 ++- protocol-units/da/m1/light-node/src/main.rs | 9 ++- util/tracing/Cargo.toml | 1 + util/tracing/src/lib.rs | 65 +--------------- util/tracing/src/telemetry.rs | 77 +++++++++++++++++++ util/tracing/src/tracing.rs | 11 +++ 9 files changed, 129 insertions(+), 82 deletions(-) create mode 100644 util/tracing/src/telemetry.rs create mode 100644 util/tracing/src/tracing.rs diff --git a/Cargo.lock b/Cargo.lock index be94a3370..d7af69e4d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9679,6 +9679,7 @@ dependencies = [ "anyhow", "opentelemetry", "opentelemetry-otlp", + "opentelemetry-semantic-conventions", "opentelemetry_sdk", "tracing-opentelemetry", "tracing-subscriber 0.3.18", @@ -10177,9 +10178,9 @@ dependencies = [ [[package]] name = "opentelemetry" -version = "0.25.0" +version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "803801d3d3b71cd026851a53f974ea03df3d179cb758b260136a6c9e22e196af" +checksum = "570074cc999d1a58184080966e5bd3bf3a9a4af650c3b05047c2621e7405cd17" dependencies = [ "futures-core", "futures-sink", @@ -10191,9 +10192,9 @@ dependencies = [ [[package]] name = "opentelemetry-otlp" -version = "0.25.0" +version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "596b1719b3cab83addb20bcbffdf21575279d9436d9ccccfe651a3bf0ab5ab06" +checksum = "29e1f9c8b032d4f635c730c0efcf731d5e2530ea13fa8bef7939ddc8420696bd" dependencies = [ "async-trait", "futures-core", @@ -10209,9 +10210,9 @@ dependencies = [ [[package]] name = "opentelemetry-proto" -version = "0.25.0" 
+version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c43620e8f93359eb7e627a3b16ee92d8585774986f24f2ab010817426c5ce61" +checksum = "c9d3968ce3aefdcca5c27e3c4ea4391b37547726a70893aab52d3de95d5f8b34" dependencies = [ "opentelemetry", "opentelemetry_sdk", @@ -10219,11 +10220,17 @@ dependencies = [ "tonic 0.12.3", ] +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db945c1eaea8ac6a9677185357480d215bb6999faa9f691d0c4d4d641eab7a09" + [[package]] name = "opentelemetry_sdk" -version = "0.25.0" +version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0da0d6b47a3dbc6e9c9e36a0520e25cf943e046843818faaa3f87365a548c82" +checksum = "d2c627d9f4c9cdc1f21a29ee4bfbd6028fcb8bcf2a857b43f3abdf72c9c862f3" dependencies = [ "async-trait", "futures-channel", @@ -14436,9 +14443,9 @@ dependencies = [ [[package]] name = "tracing-opentelemetry" -version = "0.26.0" +version = "0.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5eabc56d23707ad55ba2a0750fc24767125d5a0f51993ba41ad2c441cc7b8dea" +checksum = "dc58af5d3f6c5811462cabb3289aec0093f7338e367e5a33d28c0433b3c7360b" dependencies = [ "js-sys", "once_cell", diff --git a/Cargo.toml b/Cargo.toml index 6454c4095..4fb71d505 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -246,9 +246,10 @@ move-vm-ext = { path = "types/move-vm-ext" } num-derive = "0.4.2" num-traits = "0.2.14" once_cell = "1.8.0" -opentelemetry = { version = "0.25" } -opentelemetry_sdk = { version = "0.25", features = ["rt-tokio"] } -opentelemetry-otlp = { version = "0.25" } +opentelemetry = { version = "0.26" } +opentelemetry-otlp = { version = "0.26" } +opentelemetry_sdk = { version = "0.26", features = ["rt-tokio"] } +opentelemetry-semantic-conventions = { version = "0.26" } parking_lot = { version = "0.12.1" } poem = { version = "=1.3.59", features = 
["anyhow", "rustls"] } poem-openapi = { version = "=2.0.11", features = ["swagger-ui", "url"] } @@ -286,7 +287,7 @@ tonic-reflection = "0.11" tonic-web = "0.11" ### To try (experimental) std support, add `features = [ "std" ]` to risc0-zkvm tracing = "0.1.40" -tracing-opentelemetry = { version = "0.26" } +tracing-opentelemetry = { version = "0.27" } tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-test = "0.2.5" trie-db = "0.28.0" diff --git a/networks/suzuka/suzuka-client/src/bin/e2e/followers_consistent.rs b/networks/suzuka/suzuka-client/src/bin/e2e/followers_consistent.rs index b16d9c170..b532120ab 100644 --- a/networks/suzuka/suzuka-client/src/bin/e2e/followers_consistent.rs +++ b/networks/suzuka/suzuka-client/src/bin/e2e/followers_consistent.rs @@ -245,8 +245,7 @@ pub async fn basic_coin_transfers( #[tokio::main] async fn main() -> Result<(), anyhow::Error> { - let tracing_config = movement_tracing::Config::from_env()?; - movement_tracing::init_tracing_subscriber(tracing_config)?; + movement_tracing::init_tracing_subscriber(); // get the lead dot movement from the environment let dot_movement = DotMovement::try_from_env()?; diff --git a/networks/suzuka/suzuka-full-node/src/main.rs b/networks/suzuka/suzuka-full-node/src/main.rs index 29f71faa9..f7d639da2 100644 --- a/networks/suzuka/suzuka-full-node/src/main.rs +++ b/networks/suzuka/suzuka-full-node/src/main.rs @@ -4,8 +4,13 @@ use std::process::ExitCode; #[tokio::main] async fn main() -> Result { - let tracing_config = movement_tracing::Config::from_env()?; - movement_tracing::init_tracing_subscriber(tracing_config)?; + movement_tracing::init_tracing_subscriber(); + let tracing_config = movement_tracing::telemetry::Config::from_env()?; + movement_tracing::telemetry::init_tracer_provider( + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_VERSION"), + tracing_config, + )?; // get the config file let dot_movement = dot_movement::DotMovement::try_from_env()?; diff --git 
a/protocol-units/da/m1/light-node/src/main.rs b/protocol-units/da/m1/light-node/src/main.rs index f72cf65b1..7a57a4081 100644 --- a/protocol-units/da/m1/light-node/src/main.rs +++ b/protocol-units/da/m1/light-node/src/main.rs @@ -2,8 +2,13 @@ use m1_da_light_node::v1::{LightNodeV1, Manager}; #[tokio::main] async fn main() -> Result<(), Box> { - let tracing_config = movement_tracing::Config::from_env()?; - movement_tracing::init_tracing_subscriber(tracing_config)?; + movement_tracing::init_tracing_subscriber(); + let tracing_config = movement_tracing::telemetry::Config::from_env()?; + movement_tracing::telemetry::init_tracer_provider( + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_VERSION"), + tracing_config, + )?; let dot_movement = dot_movement::DotMovement::try_from_env()?; let config_path = dot_movement.get_config_json_path(); diff --git a/util/tracing/Cargo.toml b/util/tracing/Cargo.toml index dfb008c69..42b38fb7e 100644 --- a/util/tracing/Cargo.toml +++ b/util/tracing/Cargo.toml @@ -16,6 +16,7 @@ tracing-subscriber = { workspace = true } opentelemetry = { workspace = true } opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] } opentelemetry-otlp = { workspace = true } +opentelemetry-semantic-conventions = { workspace = true } tracing-opentelemetry = { workspace = true } #console-subscriber = { workspace = true } diff --git a/util/tracing/src/lib.rs b/util/tracing/src/lib.rs index b920012d8..2e2ea0613 100644 --- a/util/tracing/src/lib.rs +++ b/util/tracing/src/lib.rs @@ -1,63 +1,4 @@ -use anyhow::anyhow; -use opentelemetry::trace::TracerProvider as _; -use opentelemetry_otlp::WithExportConfig as _; -use opentelemetry_sdk::runtime; -use tracing_subscriber::filter::{self, EnvFilter, LevelFilter}; -use tracing_subscriber::prelude::*; +pub mod telemetry; +mod tracing; -use std::env; - -const OTLP_TRACING_ENV: &str = "MOVEMENT_OTLP"; - -/// Options for the tracing subscriber. 
-#[derive(Default)] -pub struct Config { - /// URL of the collector endpoint using the OTLP gRPC protocol. - pub otlp_grpc_url: Option, -} - -impl Config { - /// Get the tracing configuration from well-known environment variables. - pub fn from_env() -> Result { - let otlp_grpc_url = match env::var(OTLP_TRACING_ENV) { - Ok(url) => Some(url), - Err(env::VarError::NotPresent) => None, - Err(env::VarError::NotUnicode(s)) => { - return Err(anyhow!( - "value of environment variable {OTLP_TRACING_ENV} is not valid UTF-8: {}", - s.to_string_lossy() - )); - } - }; - Ok(Self { otlp_grpc_url }) - } -} - -/// Sets up the tracing subscribers for a Movement process. This should be -/// called at the beginning of a process' `main` function. -pub fn init_tracing_subscriber(config: Config) -> Result<(), anyhow::Error> { - // TODO: compose console_subscriber as a layer - let env_filter = EnvFilter::builder() - .with_default_directive(LevelFilter::INFO.into()) - .from_env_lossy(); - let log_layer = tracing_subscriber::fmt::layer().with_filter(env_filter); - - let telemetry_layer = if let Some(endpoint) = config.otlp_grpc_url { - let exporter = opentelemetry_otlp::new_exporter().tonic().with_endpoint(endpoint); - let tracer = opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter(exporter) - .install_batch(runtime::Tokio)? - .tracer("movement_tracing"); - let layer = tracing_opentelemetry::layer() - .with_tracer(tracer) - .with_filter(filter::filter_fn(|meta| meta.target() == "movement_telemetry")); - Some(layer) - } else { - None - }; - - tracing_subscriber::registry().with(log_layer).with(telemetry_layer).init(); - - Ok(()) -} +pub use tracing::init_tracing_subscriber; diff --git a/util/tracing/src/telemetry.rs b/util/tracing/src/telemetry.rs new file mode 100644 index 000000000..921fd3382 --- /dev/null +++ b/util/tracing/src/telemetry.rs @@ -0,0 +1,77 @@ +//! OpenTelemetry support for Movement services. +//! +//! 
Telemetry is currently being exported to components as an API distinct +//! from the tracing framework, due to [issues][tracing-opentelemetry#159] +//! with integrating OpenTelemetry as a tracing subscriber. +//! +//! [tracing-opentelemetry#159]: https://github.com/tokio-rs/tracing-opentelemetry/issues/159 + +use anyhow::anyhow; +use opentelemetry::global::{self, BoxedTracer}; +use opentelemetry::trace::noop::NoopTracerProvider; +use opentelemetry::KeyValue; +use opentelemetry_otlp::WithExportConfig as _; +use opentelemetry_sdk::{runtime, trace::Config as TraceConfig, Resource}; +use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION}; + +use std::env; + +const OTLP_TRACING_ENV: &str = "MOVEMENT_OTLP"; + +/// Options for telemetry configuration. +#[derive(Debug, Default)] +pub struct Config { + /// URL of the collector endpoint using the OTLP gRPC protocol. + pub otlp_grpc_url: Option, +} + +impl Config { + /// Get the tracing configuration from well-known environment variables. + pub fn from_env() -> Result { + let otlp_grpc_url = match env::var(OTLP_TRACING_ENV) { + Ok(url) => Some(url), + Err(env::VarError::NotPresent) => None, + Err(env::VarError::NotUnicode(s)) => { + return Err(anyhow!( + "value of environment variable {OTLP_TRACING_ENV} is not valid UTF-8: {}", + s.to_string_lossy() + )); + } + }; + Ok(Self { otlp_grpc_url }) + } +} + +/// Global initialization of the OpenTelemetry tracer provider. +/// +/// This function should be called at the start of the program before any +/// threads are able to use OpenTelemetry tracers. The function will panic +/// if not called within a Tokio runtime. 
+pub fn init_tracer_provider( + service_name: &'static str, + service_version: &'static str, + config: Config, +) -> Result<(), anyhow::Error> { + if let Some(endpoint) = config.otlp_grpc_url { + dbg!(&endpoint); + let exporter = opentelemetry_otlp::new_exporter().tonic().with_endpoint(endpoint); + let provider = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter(exporter) + .with_trace_config(TraceConfig::default().with_resource(Resource::new([ + KeyValue::new(SERVICE_NAME, service_name), + KeyValue::new(SERVICE_VERSION, service_version), + ]))) + .install_batch(runtime::Tokio)?; + dbg!(&provider); + global::set_tracer_provider(provider); + } else { + global::set_tracer_provider(NoopTracerProvider::new()); + } + Ok(()) +} + +/// Get the tracer configured for the process. +pub fn tracer() -> BoxedTracer { + global::tracer("movement") +} diff --git a/util/tracing/src/tracing.rs b/util/tracing/src/tracing.rs new file mode 100644 index 000000000..0c6514e30 --- /dev/null +++ b/util/tracing/src/tracing.rs @@ -0,0 +1,11 @@ +use tracing_subscriber::filter::{EnvFilter, LevelFilter}; + +/// Sets up the tracing subscribers for a Movement process. This should be +/// called at the beginning of a process' `main` function. 
+pub fn init_tracing_subscriber() { + // TODO: compose console_subscriber as a layer + let env_filter = EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .from_env_lossy(); + tracing_subscriber::fmt().with_env_filter(env_filter).init(); +} From 607ba0b84ba02025795ce2c1d74787989ec18f26 Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Thu, 10 Oct 2024 17:00:17 +0300 Subject: [PATCH 04/17] fix(process-compose): correctly specify environment --- .../suzuka-full-node/process-compose.telemetry.yml | 8 ++++---- process-compose/suzuka-full-node/process-compose.yml | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/process-compose/suzuka-full-node/process-compose.telemetry.yml b/process-compose/suzuka-full-node/process-compose.telemetry.yml index 1bcefdc69..88180844b 100644 --- a/process-compose/suzuka-full-node/process-compose.telemetry.yml +++ b/process-compose/suzuka-full-node/process-compose.telemetry.yml @@ -14,11 +14,11 @@ processes: depends_on: otlp-collector: condition: process_started - env: - MOVEMENT_OTLP: http://localhost:4317 + environment: + - MOVEMENT_OTLP=http://localhost:4317 m1-da-light-node: depends_on: otlp-collector: condition: process_started - env: - MOVEMENT_OTLP: http://localhost:4317 + environment: + - MOVEMENT_OTLP=http://localhost:4317 diff --git a/process-compose/suzuka-full-node/process-compose.yml b/process-compose/suzuka-full-node/process-compose.yml index cfb8a1365..52381e599 100644 --- a/process-compose/suzuka-full-node/process-compose.yml +++ b/process-compose/suzuka-full-node/process-compose.yml @@ -44,8 +44,8 @@ processes: suzuka-full-node: command: | suzuka-full-node - env: - RUST_LOG: info,aptos-indexer=debug + environment: + - RUST_LOG=info depends_on: m1-da-light-node: condition: process_healthy From 56899ca050c61e2edc84378402633f3a7a5babb0 Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Thu, 10 Oct 2024 15:07:53 +0300 Subject: [PATCH 05/17] feat: produce some telemetry spans and
events Install an OpenTelemetryLayer configured with the OTLP exporter. The tracing spans and events to export are selected by target "movement_telemetry". --- Cargo.lock | 2 +- .../src/bin/e2e/followers_consistent.rs | 7 +- networks/suzuka/suzuka-full-node/src/main.rs | 9 +- .../src/tasks/execute_settle.rs | 4 +- .../src/tasks/transaction_ingress.rs | 22 +++-- protocol-units/da/m1/light-node/src/main.rs | 9 +- .../execution/opt-executor/Cargo.toml | 5 +- util/tracing/Cargo.toml | 3 +- util/tracing/src/config.rs | 29 +++++++ util/tracing/src/lib.rs | 12 ++- util/tracing/src/telemetry.rs | 83 +++++++++---------- util/tracing/src/tracing.rs | 20 ++++- 12 files changed, 132 insertions(+), 73 deletions(-) create mode 100644 util/tracing/src/config.rs diff --git a/Cargo.lock b/Cargo.lock index d7af69e4d..2cf8a1a69 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8688,7 +8688,6 @@ dependencies = [ "tempfile", "thiserror", "tokio", - "tonic 0.11.0", "tracing", "tracing-test", ] @@ -9681,6 +9680,7 @@ dependencies = [ "opentelemetry-otlp", "opentelemetry-semantic-conventions", "opentelemetry_sdk", + "tracing", "tracing-opentelemetry", "tracing-subscriber 0.3.18", ] diff --git a/networks/suzuka/suzuka-client/src/bin/e2e/followers_consistent.rs b/networks/suzuka/suzuka-client/src/bin/e2e/followers_consistent.rs index b532120ab..bad4faf94 100644 --- a/networks/suzuka/suzuka-client/src/bin/e2e/followers_consistent.rs +++ b/networks/suzuka/suzuka-client/src/bin/e2e/followers_consistent.rs @@ -245,7 +245,12 @@ pub async fn basic_coin_transfers( #[tokio::main] async fn main() -> Result<(), anyhow::Error> { - movement_tracing::init_tracing_subscriber(); + let tracing_config = movement_tracing::Config::from_env()?; + let _guard = movement_tracing::init_tracing_subscriber( + env!("CARGO_BIN_NAME"), + env!("CARGO_PKG_VERSION"), + &tracing_config, + )?; // get the lead dot movement from the environment let dot_movement = DotMovement::try_from_env()?; diff --git 
a/networks/suzuka/suzuka-full-node/src/main.rs b/networks/suzuka/suzuka-full-node/src/main.rs index f7d639da2..def29bb20 100644 --- a/networks/suzuka/suzuka-full-node/src/main.rs +++ b/networks/suzuka/suzuka-full-node/src/main.rs @@ -4,12 +4,11 @@ use std::process::ExitCode; #[tokio::main] async fn main() -> Result { - movement_tracing::init_tracing_subscriber(); - let tracing_config = movement_tracing::telemetry::Config::from_env()?; - movement_tracing::telemetry::init_tracer_provider( - env!("CARGO_PKG_NAME"), + let tracing_config = movement_tracing::Config::from_env()?; + let _guard = movement_tracing::init_tracing_subscriber( + env!("CARGO_BIN_NAME"), env!("CARGO_PKG_VERSION"), - tracing_config, + &tracing_config, )?; // get the config file diff --git a/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs b/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs index c278f6641..e47297b1f 100644 --- a/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs +++ b/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs @@ -138,7 +138,7 @@ where }) .await??; - // get the transactions + // get the transactions count before the block is consumed let transactions_count = block.transactions().len(); let span = info_span!(target: "movement_telemetry", "execute_block", id = %block_id); let commitment = @@ -152,7 +152,7 @@ where self.da_db.set_synced_height(da_height - 1).await?; // set the block as executed - self.da_db.add_executed_block(block_id.to_string()).await?; + self.da_db.add_executed_block(block_id.clone()).await?; // todo: this needs defaults if self.settlement_enabled() { diff --git a/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs b/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs index b47d20a6f..140511995 100644 --- a/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs +++ b/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs @@ -5,10 +5,10 @@ use 
m1_da_light_node_util::config::Config as LightNodeConfig; use maptos_dof_execution::SignedTransaction; use tokio::sync::mpsc; -use tracing::{info, warn}; +use tracing::{info, info_span, warn, Instrument}; use std::ops::ControlFlow; -use std::sync::atomic::AtomicU64; +use std::sync::atomic::{self, AtomicU64}; use std::time::{Duration, Instant}; const LOGGING_UID: AtomicU64 = AtomicU64::new(0); @@ -29,7 +29,16 @@ impl Task { } pub async fn run(mut self) -> anyhow::Result<()> { - while let ControlFlow::Continue(()) = self.spawn_write_next_transaction_batch().await? {} + loop { + let batch_id = LOGGING_UID.fetch_add(1, atomic::Ordering::Relaxed); + let span = + info_span!(target: "movement_telemetry", "write_batch", batch_id = %batch_id); + if let ControlFlow::Break(()) = + self.spawn_write_next_transaction_batch().instrument(span).await? + { + break; + } + } Ok(()) } @@ -45,7 +54,6 @@ impl Task { let mut transactions = Vec::new(); - let batch_id = LOGGING_UID.fetch_add(1, std::sync::atomic::Ordering::SeqCst); loop { let remaining = match half_building_time.checked_sub(start.elapsed().as_millis() as u64) { @@ -65,12 +73,11 @@ impl Task { Ok(transaction) => match transaction { Some(transaction) => { info!( - target : "movement_telemetry", - batch_id = %batch_id, + target: "movement_telemetry", tx_hash = %transaction.committed_hash(), sender = %transaction.sender(), sequence_number = transaction.sequence_number(), - "received transaction", + "received_transaction", ); let serialized_aptos_transaction = serde_json::to_vec(&transaction)?; let movement_transaction = movement_types::transaction::Transaction::new( @@ -94,7 +101,6 @@ impl Task { if transactions.len() > 0 { info!( target: "movement_telemetry", - batch_id = %batch_id, transaction_count = transactions.len(), "built_batch_write" ); diff --git a/protocol-units/da/m1/light-node/src/main.rs b/protocol-units/da/m1/light-node/src/main.rs index 7a57a4081..86af2fe67 100644 --- 
a/protocol-units/da/m1/light-node/src/main.rs +++ b/protocol-units/da/m1/light-node/src/main.rs @@ -2,12 +2,11 @@ use m1_da_light_node::v1::{LightNodeV1, Manager}; #[tokio::main] async fn main() -> Result<(), Box> { - movement_tracing::init_tracing_subscriber(); - let tracing_config = movement_tracing::telemetry::Config::from_env()?; - movement_tracing::telemetry::init_tracer_provider( - env!("CARGO_PKG_NAME"), + let tracing_config = movement_tracing::Config::from_env()?; + let _guard = movement_tracing::init_tracing_subscriber( + env!("CARGO_BIN_NAME"), env!("CARGO_PKG_VERSION"), - tracing_config, + &tracing_config, )?; let dot_movement = dot_movement::DotMovement::try_from_env()?; diff --git a/protocol-units/execution/opt-executor/Cargo.toml b/protocol-units/execution/opt-executor/Cargo.toml index e084c33a2..54c1bb0ab 100644 --- a/protocol-units/execution/opt-executor/Cargo.toml +++ b/protocol-units/execution/opt-executor/Cargo.toml @@ -59,15 +59,14 @@ aptos-mempool = { workspace = true } aptos-temppath = { workspace = true } aptos-faucet-core = { workspace = true } aptos-cached-packages = { workspace = true } -maptos-execution-util = { workspace = true } -movement-types = { workspace = true } aptos-indexer-grpc-fullnode = { workspace = true } aptos-indexer-grpc-table-info = { workspace = true } aptos-indexer = { workspace = true } aptos-protos = { workspace = true } aptos-logger = { workspace = true } -tonic = { workspace = true } +maptos-execution-util = { workspace = true } movement-rest = { workspace = true } +movement-types = { workspace = true } [dev-dependencies] dirs = { workspace = true } diff --git a/util/tracing/Cargo.toml b/util/tracing/Cargo.toml index 42b38fb7e..6c9fad7f8 100644 --- a/util/tracing/Cargo.toml +++ b/util/tracing/Cargo.toml @@ -12,12 +12,13 @@ rust-version.workspace = true [dependencies] anyhow = { workspace = true } -tracing-subscriber = { workspace = true } opentelemetry = { workspace = true } opentelemetry_sdk = { workspace = true, 
features = ["rt-tokio"] } opentelemetry-otlp = { workspace = true } opentelemetry-semantic-conventions = { workspace = true } +tracing = { workspace = true } tracing-opentelemetry = { workspace = true } +tracing-subscriber = { workspace = true } #console-subscriber = { workspace = true } [lints] diff --git a/util/tracing/src/config.rs b/util/tracing/src/config.rs new file mode 100644 index 000000000..561bcb991 --- /dev/null +++ b/util/tracing/src/config.rs @@ -0,0 +1,29 @@ +use anyhow::anyhow; +use std::env; + +const OTLP_TRACING_ENV: &str = "MOVEMENT_OTLP"; + +/// Options for tracing configuration. +#[derive(Debug, Default)] +pub struct Config { + /// URL of the OpenTelemetry collector endpoint using the OTLP gRPC protocol. + /// If the value is `None`, telemetry is not exported. + pub otlp_grpc_url: Option, +} + +impl Config { + /// Get the tracing configuration from well-known environment variables. + pub fn from_env() -> Result { + let otlp_grpc_url = match env::var(OTLP_TRACING_ENV) { + Ok(url) => Some(url), + Err(env::VarError::NotPresent) => None, + Err(env::VarError::NotUnicode(s)) => { + return Err(anyhow!( + "value of environment variable {OTLP_TRACING_ENV} is not valid UTF-8: {}", + s.to_string_lossy() + )); + } + }; + Ok(Self { otlp_grpc_url }) + } +} diff --git a/util/tracing/src/lib.rs b/util/tracing/src/lib.rs index 2e2ea0613..91e84b91b 100644 --- a/util/tracing/src/lib.rs +++ b/util/tracing/src/lib.rs @@ -1,4 +1,14 @@ -pub mod telemetry; +//! Tracing setup for Movement services. +//! +//! Exporting of tracing data via [OpenTelemetry] is optionally supported +//! by setting "movement_telemetry" as the target in tracing spans and events. +//! +//! 
[OpenTelemetry]: https://opentelemetry.io/ + +mod config; +mod telemetry; mod tracing; +pub use config::Config; +pub use telemetry::ScopeGuard; pub use tracing::init_tracing_subscriber; diff --git a/util/tracing/src/telemetry.rs b/util/tracing/src/telemetry.rs index 921fd3382..a34e64a44 100644 --- a/util/tracing/src/telemetry.rs +++ b/util/tracing/src/telemetry.rs @@ -6,54 +6,53 @@ //! //! [tracing-opentelemetry#159]: https://github.com/tokio-rs/tracing-opentelemetry/issues/159 -use anyhow::anyhow; -use opentelemetry::global::{self, BoxedTracer}; -use opentelemetry::trace::noop::NoopTracerProvider; -use opentelemetry::KeyValue; +use crate::Config; + +use opentelemetry::{trace::TracerProvider as _, KeyValue}; use opentelemetry_otlp::WithExportConfig as _; -use opentelemetry_sdk::{runtime, trace::Config as TraceConfig, Resource}; +use opentelemetry_sdk::trace::{Config as TraceConfig, TracerProvider}; +use opentelemetry_sdk::{runtime, Resource}; use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION}; +use tracing::{error, Level, Subscriber}; +use tracing_opentelemetry::OpenTelemetryLayer; +use tracing_subscriber::filter; +use tracing_subscriber::prelude::*; +use tracing_subscriber::registry::LookupSpan; -use std::env; - -const OTLP_TRACING_ENV: &str = "MOVEMENT_OTLP"; - -/// Options for telemetry configuration. -#[derive(Debug, Default)] -pub struct Config { - /// URL of the collector endpoint using the OTLP gRPC protocol. - pub otlp_grpc_url: Option, -} +/// The scope guard object for the background tasks of the tracing subsystem. +/// +/// This object needs to be kept alive for the duration of the program. +#[must_use = "should be dropped at the end of the program scope"] +#[derive(Debug)] +pub struct ScopeGuard(Option); -impl Config { - /// Get the tracing configuration from well-known environment variables. 
- pub fn from_env() -> Result { - let otlp_grpc_url = match env::var(OTLP_TRACING_ENV) { - Ok(url) => Some(url), - Err(env::VarError::NotPresent) => None, - Err(env::VarError::NotUnicode(s)) => { - return Err(anyhow!( - "value of environment variable {OTLP_TRACING_ENV} is not valid UTF-8: {}", - s.to_string_lossy() - )); +impl Drop for ScopeGuard { + fn drop(&mut self) { + if let Some(tracer_provider) = &self.0 { + // Make sure all batched traces are exported. + if let Err(e) = tracer_provider.shutdown() { + error!("OpenTelemetry tracer provider shutdown failed: {e}"); } - }; - Ok(Self { otlp_grpc_url }) + } } } -/// Global initialization of the OpenTelemetry tracer provider. +/// Adds an optional OpenTelemetry tracing layer to the provided subscriber. /// /// This function should be called at the start of the program before any /// threads are able to use OpenTelemetry tracers. The function will panic /// if not called within a Tokio runtime. -pub fn init_tracer_provider( +pub(crate) fn init_tracing_layer( + subscriber: S, service_name: &'static str, service_version: &'static str, - config: Config, -) -> Result<(), anyhow::Error> { - if let Some(endpoint) = config.otlp_grpc_url { - dbg!(&endpoint); + config: &Config, +) -> Result<(ScopeGuard, impl Subscriber), anyhow::Error> +where + S: Subscriber, + for<'span> S: LookupSpan<'span>, +{ + let (tracer_provider, layer) = if let Some(endpoint) = &config.otlp_grpc_url { let exporter = opentelemetry_otlp::new_exporter().tonic().with_endpoint(endpoint); let provider = opentelemetry_otlp::new_pipeline() .tracing() @@ -63,15 +62,11 @@ pub fn init_tracer_provider( KeyValue::new(SERVICE_VERSION, service_version), ]))) .install_batch(runtime::Tokio)?; - dbg!(&provider); - global::set_tracer_provider(provider); + let layer = OpenTelemetryLayer::new(provider.tracer("movement")) + .with_filter(filter::Targets::new().with_target("movement_telemetry", Level::INFO)); + (Some(provider), Some(layer)) } else { - 
global::set_tracer_provider(NoopTracerProvider::new()); - } - Ok(()) -} - -/// Get the tracer configured for the process. -pub fn tracer() -> BoxedTracer { - global::tracer("movement") + (None, None) + }; + Ok((ScopeGuard(tracer_provider), subscriber.with(layer))) } diff --git a/util/tracing/src/tracing.rs b/util/tracing/src/tracing.rs index 0c6514e30..e9875d7eb 100644 --- a/util/tracing/src/tracing.rs +++ b/util/tracing/src/tracing.rs @@ -1,11 +1,27 @@ +use crate::telemetry::{self, ScopeGuard}; +use crate::Config; use tracing_subscriber::filter::{EnvFilter, LevelFilter}; +use tracing_subscriber::prelude::*; +use tracing_subscriber::{fmt, Registry}; /// Sets up the tracing subscribers for a Movement process. This should be /// called at the beginning of a process' `main` function. -pub fn init_tracing_subscriber() { +/// +/// If successful, returns a guard object that should be dropped at the end +/// of the process' `main` function scope. +pub fn init_tracing_subscriber( + service_name: &'static str, + service_version: &'static str, + config: &Config, +) -> Result { // TODO: compose console_subscriber as a layer let env_filter = EnvFilter::builder() .with_default_directive(LevelFilter::INFO.into()) .from_env_lossy(); - tracing_subscriber::fmt().with_env_filter(env_filter).init(); + let fmt_layer = fmt::layer().with_filter(env_filter); + let subscriber = Registry::default().with(fmt_layer); + let (scope_guard, subscriber) = + telemetry::init_tracing_layer(subscriber, service_name, service_version, config)?; + subscriber.init(); + Ok(scope_guard) } From f713d7116ba53afa566169f73b00387077175ff4 Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Wed, 16 Oct 2024 15:52:09 +0300 Subject: [PATCH 06/17] fix(tracing): work around another shutdown problem The implementation of shutdown in the opentelemetry_sdk exporter calls futures_executor::block_on, which does not play well with the multithreaded Tokio runtime. 
--- Cargo.lock | 1 + util/tracing/Cargo.toml | 1 + util/tracing/src/telemetry.rs | 21 +++++++++++++++++---- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2cf8a1a69..b047f80a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9680,6 +9680,7 @@ dependencies = [ "opentelemetry-otlp", "opentelemetry-semantic-conventions", "opentelemetry_sdk", + "tokio", "tracing", "tracing-opentelemetry", "tracing-subscriber 0.3.18", diff --git a/util/tracing/Cargo.toml b/util/tracing/Cargo.toml index 6c9fad7f8..eceb5d5a7 100644 --- a/util/tracing/Cargo.toml +++ b/util/tracing/Cargo.toml @@ -16,6 +16,7 @@ opentelemetry = { workspace = true } opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] } opentelemetry-otlp = { workspace = true } opentelemetry-semantic-conventions = { workspace = true } +tokio = { workspace = true } tracing = { workspace = true } tracing-opentelemetry = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/util/tracing/src/telemetry.rs b/util/tracing/src/telemetry.rs index a34e64a44..ee7cd9496 100644 --- a/util/tracing/src/telemetry.rs +++ b/util/tracing/src/telemetry.rs @@ -11,8 +11,9 @@ use crate::Config; use opentelemetry::{trace::TracerProvider as _, KeyValue}; use opentelemetry_otlp::WithExportConfig as _; use opentelemetry_sdk::trace::{Config as TraceConfig, TracerProvider}; -use opentelemetry_sdk::{runtime, Resource}; +use opentelemetry_sdk::{runtime::Tokio as TokioRuntimeSelector, Resource}; use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION}; +use tokio::runtime; use tracing::{error, Level, Subscriber}; use tracing_opentelemetry::OpenTelemetryLayer; use tracing_subscriber::filter; @@ -28,12 +29,24 @@ pub struct ScopeGuard(Option); impl Drop for ScopeGuard { fn drop(&mut self) { - if let Some(tracer_provider) = &self.0 { - // Make sure all batched traces are exported. 
+ fn shutdown_provider(tracer_provider: &TracerProvider) { if let Err(e) = tracer_provider.shutdown() { error!("OpenTelemetry tracer provider shutdown failed: {e}"); } } + + if let Some(tracer_provider) = self.0.take() { + // Make sure all batched traces are exported. + if let Ok(handle) = runtime::Handle::try_current() { + // Can't call shutdown in async context due to + // https://github.com/open-telemetry/opentelemetry-rust/issues/2047#issuecomment-2416480148 + handle.spawn_blocking(move || { + shutdown_provider(&tracer_provider); + }); + } else { + shutdown_provider(&tracer_provider); + } + } } } @@ -61,7 +74,7 @@ where KeyValue::new(SERVICE_NAME, service_name), KeyValue::new(SERVICE_VERSION, service_version), ]))) - .install_batch(runtime::Tokio)?; + .install_batch(TokioRuntimeSelector)?; let layer = OpenTelemetryLayer::new(provider.tracer("movement")) .with_filter(filter::Targets::new().with_target("movement_telemetry", Level::INFO)); (Some(provider), Some(layer)) From 8ce2e07e054b6966c946504f655f38bacf519e56 Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Wed, 16 Oct 2024 18:19:53 +0300 Subject: [PATCH 07/17] feat: spans for each telemetry event OpenTelemetry needs spans at the top level of its log event model at least. 
--- .../src/tasks/transaction_ingress.rs | 31 ++++++---------- .../da/m1/light-node/src/v1/passthrough.rs | 1 - .../da/m1/light-node/src/v1/sequencer.rs | 35 +++++++------------ .../opt-executor/src/executor/mod.rs | 6 ---- 4 files changed, 24 insertions(+), 49 deletions(-) diff --git a/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs b/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs index 140511995..eac4da334 100644 --- a/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs +++ b/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs @@ -8,11 +8,8 @@ use tokio::sync::mpsc; use tracing::{info, info_span, warn, Instrument}; use std::ops::ControlFlow; -use std::sync::atomic::{self, AtomicU64}; use std::time::{Duration, Instant}; -const LOGGING_UID: AtomicU64 = AtomicU64::new(0); - pub struct Task { transaction_receiver: mpsc::Receiver, da_light_node_client: LightNodeServiceClient, @@ -29,23 +26,13 @@ impl Task { } pub async fn run(mut self) -> anyhow::Result<()> { - loop { - let batch_id = LOGGING_UID.fetch_add(1, atomic::Ordering::Relaxed); - let span = - info_span!(target: "movement_telemetry", "write_batch", batch_id = %batch_id); - if let ControlFlow::Break(()) = - self.spawn_write_next_transaction_batch().instrument(span).await? - { - break; - } - } + while let ControlFlow::Continue(()) = self.build_and_write_batch().await? {} Ok(()) } /// Constructs a batch of transactions then spawns the write request to the DA in the background. 
- async fn spawn_write_next_transaction_batch( - &mut self, - ) -> Result, anyhow::Error> { + #[tracing::instrument(target = "movement_telemetry", skip(self))] + async fn build_and_write_batch(&mut self) -> Result, anyhow::Error> { use ControlFlow::{Break, Continue}; // limit the total time batching transactions @@ -107,11 +94,15 @@ impl Task { let batch_write = BatchWriteRequest { blobs: transactions }; // spawn the actual batch write request in the background let mut da_light_node_client = self.da_light_node_client.clone(); - tokio::spawn(async move { - if let Err(e) = da_light_node_client.batch_write(batch_write).await { - warn!("failed to write batch to DA: {:?}", e); + let write_span = info_span!(target: "movement_telemetry", "batch_write"); + tokio::spawn( + async move { + if let Err(e) = da_light_node_client.batch_write(batch_write).await { + warn!("failed to write batch to DA: {:?}", e); + } } - }); + .instrument(write_span), + ); } Ok(Continue(())) diff --git a/protocol-units/da/m1/light-node/src/v1/passthrough.rs b/protocol-units/da/m1/light-node/src/v1/passthrough.rs index 1db727988..b16977f10 100644 --- a/protocol-units/da/m1/light-node/src/v1/passthrough.rs +++ b/protocol-units/da/m1/light-node/src/v1/passthrough.rs @@ -146,7 +146,6 @@ impl LightNodeV1 { Ok(verified_blobs) } - #[tracing::instrument(target = "movement_telemetry", level = "debug")] async fn get_blobs_at_height(&self, height: u64) -> Result, anyhow::Error> { let celestia_blobs = self.get_celestia_blobs_at_height(height).await?; let mut blobs = Vec::new(); diff --git a/protocol-units/da/m1/light-node/src/v1/sequencer.rs b/protocol-units/da/m1/light-node/src/v1/sequencer.rs index 294b6a9e6..8fe60a3bb 100644 --- a/protocol-units/da/m1/light-node/src/v1/sequencer.rs +++ b/protocol-units/da/m1/light-node/src/v1/sequencer.rs @@ -2,7 +2,7 @@ use std::boxed::Box; use std::fmt; use std::path::PathBuf; use std::pin::Pin; -use std::sync::{atomic::AtomicU64, Arc}; +use std::sync::Arc; use 
std::time::Duration; use tokio::{ @@ -26,8 +26,6 @@ use movement_types::block::Block; use crate::v1::{passthrough::LightNodeV1 as LightNodeV1PassThrough, LightNodeV1Operations}; -const LOGGING_UID: AtomicU64 = AtomicU64::new(0); - #[derive(Clone)] pub struct LightNodeV1 { pub pass_through: LightNodeV1PassThrough, @@ -73,23 +71,24 @@ impl LightNodeV1Operations for LightNodeV1 { } impl LightNodeV1 { + #[tracing::instrument(target = "movement_telemetry", name = "build_block")] async fn tick_build_blocks(&self, sender: Sender) -> Result<(), anyhow::Error> { let memseq = self.memseq.clone(); - // this has an internal timeout based on its building time - // so in the worst case scenario we will roughly double the internal timeout - let uid = LOGGING_UID.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - debug!(target: "movement_telemetry", uid = %uid, "waiting_for_next_block",); let block = memseq.wait_for_next_block().await?; match block { Some(block) => { - info!(target: "movement_telemetry", block_id = %block.id(), uid = %uid, transaction_count = block.transactions().len(), "received_block"); + info!( + target: "movement_telemetry", + block_id = %block.id(), + transaction_count = block.transactions().len(), + "received_block", + ); sender.send(block).await?; Ok(()) } None => { - // no transactions to include - debug!(target: "movement_telemetry", uid = %uid, "no_transactions_to_include"); + debug!("no transactions to include"); Ok(()) } } @@ -113,11 +112,7 @@ impl LightNodeV1 { Ok(()) } - pub async fn submit_with_heuristic(&self, blocks: Vec) -> Result<(), anyhow::Error> { - for block in &blocks { - info!(target: "movement_telemetry", block_id = %block.id(), "submitting_block"); - } - + async fn submit_with_heuristic(&self, blocks: Vec) -> Result<(), anyhow::Error> { // wrap the blocks in a struct that can be split and compressed // spawn blocking because the compression is blocking and could be slow let namespace = self.pass_through.celestia_namespace.clone(); @@ 
-166,7 +161,6 @@ impl LightNodeV1 { ) .await?; - info!("block group results: {:?}", block_group_results); for block_group_result in &block_group_results { info!(target: "movement_telemetry", block_group_result = ?block_group_result, "block_group_result"); } @@ -198,16 +192,12 @@ impl LightNodeV1 { } Ok(None) => { // The channel was closed - info!("sender dropped"); + debug!("sender dropped"); break; } Err(_) => { // The operation timed out - debug!( - target: "movement_telemetry", - batch_size = blocks.len(), - "timed_out_building_block" - ); + debug!(batch_size = blocks.len(), "timed_out_building_block"); break; } } @@ -219,6 +209,7 @@ impl LightNodeV1 { } /// Ticks the block proposer to build blocks and submit them + #[tracing::instrument(target = "movement_telemetry", name = "publish_blobs")] async fn tick_publish_blobs( &self, receiver: &mut Receiver, diff --git a/protocol-units/execution/opt-executor/src/executor/mod.rs b/protocol-units/execution/opt-executor/src/executor/mod.rs index ab14a262e..939e322fd 100644 --- a/protocol-units/execution/opt-executor/src/executor/mod.rs +++ b/protocol-units/execution/opt-executor/src/executor/mod.rs @@ -45,12 +45,6 @@ impl Executor { // a semaphore might be better here as this will rerun until the value does not change during the operation self.transactions_in_flight .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| { - info!( - target: "movement_telemetry", - count, - current, - "decrementing_transactions_in_flight", - ); Some(current.saturating_sub(count)) }) .unwrap_or_else(|_| 0); From c915948146f583fef8248a559a978380a3f55d9f Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Thu, 17 Oct 2024 16:12:41 +0300 Subject: [PATCH 08/17] feat(suzuka-full-node): execute_block telemetry Emit telemetry events detailing the success or failure --- .../suzuka-full-node/src/tasks/execute_settle.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git 
a/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs b/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs index e47297b1f..2320453f7 100644 --- a/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs +++ b/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs @@ -183,18 +183,22 @@ where block: Block, mut block_timestamp: u64, ) -> anyhow::Result { - for _ in 0..self.execution_extension.block_retry_count { + let retry_count = self.execution_extension.block_retry_count; + for _ in 0..retry_count { // we have to clone here because the block is supposed to be consumed by the executor match self.execute_block(block.clone(), block_timestamp).await { - Ok(commitment) => return Ok(commitment), + Ok(commitment) => { + info!(target: "movement_telemetry", "execute_block_succeeded"); + return Ok(commitment); + } Err(e) => { - info!("Failed to execute block: {:?}. Retrying", e); + info!(target: "movement_telemetry", error = %e, "execute_block_failed"); block_timestamp += self.execution_extension.block_retry_increment_microseconds; // increase the timestamp by 5 ms (5000 microseconds) } } } - anyhow::bail!("Failed to execute block after 5 retries") + anyhow::bail!("Failed to execute block after {retry_count} retries") } async fn execute_block( From c1ba7fc8fb28c31545786827f825c01569301f7b Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Thu, 17 Oct 2024 16:16:31 +0300 Subject: [PATCH 09/17] chore: comments on some metrics In the transaction_ingress task of suzuka-full-node and executor's transaction_pipe, add comments detailing which metrics the telemetry events are contributing to. 
--- .../suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs | 3 +++ protocol-units/execution/opt-executor/src/transaction_pipe.rs | 3 +++ 2 files changed, 6 insertions(+) diff --git a/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs b/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs index eac4da334..35553fe92 100644 --- a/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs +++ b/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs @@ -59,6 +59,9 @@ impl Task { { Ok(transaction) => match transaction { Some(transaction) => { + // Instrumentation for aggregated metrics: + // Transactions per second: https://github.com/movementlabsxyz/movement/discussions/422 + // Transaction latency: https://github.com/movementlabsxyz/movement/discussions/423 info!( target: "movement_telemetry", tx_hash = %transaction.committed_hash(), diff --git a/protocol-units/execution/opt-executor/src/transaction_pipe.rs b/protocol-units/execution/opt-executor/src/transaction_pipe.rs index 03abf4473..c5cc535d8 100644 --- a/protocol-units/execution/opt-executor/src/transaction_pipe.rs +++ b/protocol-units/execution/opt-executor/src/transaction_pipe.rs @@ -88,6 +88,9 @@ impl TransactionPipe { if let Some(request) = next { match request { MempoolClientRequest::SubmitTransaction(transaction, callback) => { + // Instrumentation for aggregated metrics: + // Transactions per second: https://github.com/movementlabsxyz/movement/discussions/422 + // Transaction latency: https://github.com/movementlabsxyz/movement/discussions/423 let span = info_span!( target: "movement_telemetry", "submit_transaction", From 372f9b49bad010f5c37ca278edcef37e6ac660b3 Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Thu, 17 Oct 2024 16:29:10 +0300 Subject: [PATCH 10/17] feat(opt-executor): emit telemetry on tx failure At the points where transaction is dropped in the submit_transaction method, emit telemetry events. 
These will help compute the transaction failure rate. --- .../opt-executor/src/transaction_pipe.rs | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/protocol-units/execution/opt-executor/src/transaction_pipe.rs b/protocol-units/execution/opt-executor/src/transaction_pipe.rs index c5cc535d8..3f9f80b71 100644 --- a/protocol-units/execution/opt-executor/src/transaction_pipe.rs +++ b/protocol-units/execution/opt-executor/src/transaction_pipe.rs @@ -132,6 +132,10 @@ impl TransactionPipe { "transactions_in_flight" ); if in_flight > self.in_flight_limit { + // Instrumentation for aggregated metrics: + // Transaction failure rate: https://github.com/movementlabsxyz/movement/discussions/428 + // The arguments for identifying the transaction are present on the current + // "submit_transaction" span. info!( target: "movement_telemetry", "shedding_load" @@ -160,6 +164,14 @@ impl TransactionPipe { let sequence_number = vm_validator::get_account_sequence_number(&state_view, transaction.sender())?; if transaction.sequence_number() < sequence_number { + // Instrumentation for aggregated metrics: + // Transaction failure rate: https://github.com/movementlabsxyz/movement/discussions/428 + // The arguments for identifying the transaction are present on the current + // "submit_transaction" span. + info!( + target: "movement_telemetry", + "sequence_number_too_old" + ); let status = MempoolStatus::new(MempoolStatusCode::VmError); return Ok((status, Some(DiscardedVMStatus::SEQUENCE_NUMBER_TOO_OLD))); } @@ -186,7 +198,11 @@ impl TransactionPipe { self.core_mempool.commit_transaction(&sender, sequence_number); } _ => { - warn!("Transaction not accepted: {:?}", status); + // Instrumentation for aggregated metrics: + // Transaction failure rate: https://github.com/movementlabsxyz/movement/discussions/428 + // The arguments for identifying the transaction are present on the current + // "submit_transaction" span. 
+ info!(target: "movement_telemetry", %status, "rejected_by_mempool"); } } From acc202bb939270bcc6e0544a728f7b693506faa0 Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Thu, 17 Oct 2024 16:48:59 +0300 Subject: [PATCH 11/17] feat(suzuka-full-node): telemetry on executed tx --- .../suzuka-full-node/src/tasks/execute_settle.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs b/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs index 2320453f7..6a419bb3a 100644 --- a/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs +++ b/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs @@ -230,6 +230,22 @@ where continue; } + // Instrumentation for aggregated metrics: + // Transactions per second: https://github.com/movementlabsxyz/movement/discussions/422 + // Transaction latency: https://github.com/movementlabsxyz/movement/discussions/423 + // Transaction failure rate: https://github.com/movementlabsxyz/movement/discussions/428 + // + // TODO: as the block can be attempted to be executed repeatedly, + // collect this data once and export in telemetry + // on the final success or failure. + info!( + target: "movement_telemetry", + tx_hash = %signed_transaction.committed_hash(), + sender = %signed_transaction.sender(), + sequence_number = signed_transaction.sequence_number(), + "executing_transaction" + ); + let signature_verified_transaction = SignatureVerifiedTransaction::Valid( Transaction::UserTransaction(signed_transaction), ); From 954c8e46d00963030eb74c74a06f4d2c79aeb8b2 Mon Sep 17 00:00:00 2001 From: Liam Monninger Date: Fri, 18 Oct 2024 10:51:47 -0700 Subject: [PATCH 12/17] fix: add feed. 
--- .../suzuka-full-node/process-compose.feed.yml | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 process-compose/suzuka-full-node/process-compose.feed.yml diff --git a/process-compose/suzuka-full-node/process-compose.feed.yml b/process-compose/suzuka-full-node/process-compose.feed.yml new file mode 100644 index 000000000..8d40c9b45 --- /dev/null +++ b/process-compose/suzuka-full-node/process-compose.feed.yml @@ -0,0 +1,17 @@ +version: "3" + +environment: + +processes: + + client-test-feed: + command: | + # loop the test examples + while true; do + cargo test -p suzuka-client test_example_ || break + done + depends_on: + suzuka-full-node: + condition: process_healthy + suzuka-faucet: + condition: process_healthy \ No newline at end of file From e6144f81e38ee5f0c525480d66b859f1ceb6360d Mon Sep 17 00:00:00 2001 From: Liam Monninger Date: Fri, 18 Oct 2024 11:26:26 -0700 Subject: [PATCH 13/17] fix: add otlp test. --- .../process-compose.telemetry.yml | 4 +++ .../process-compose.test-telemetry.yml | 35 +++++++++++++++++++ 2 files changed, 39 insertions(+) create mode 100644 process-compose/suzuka-full-node/process-compose.test-telemetry.yml diff --git a/process-compose/suzuka-full-node/process-compose.telemetry.yml b/process-compose/suzuka-full-node/process-compose.telemetry.yml index 88180844b..3e119a705 100644 --- a/process-compose/suzuka-full-node/process-compose.telemetry.yml +++ b/process-compose/suzuka-full-node/process-compose.telemetry.yml @@ -10,6 +10,10 @@ processes: shutdown: command: | docker stop suzuka-otlp-collector + readiness_probe: + initial_delay_seconds: 3 + exec: + command: curl http://localhost:16686/api/traces suzuka-full-node: depends_on: otlp-collector: diff --git a/process-compose/suzuka-full-node/process-compose.test-telemetry.yml b/process-compose/suzuka-full-node/process-compose.test-telemetry.yml new file mode 100644 index 000000000..5aecc679e --- /dev/null +++ 
b/process-compose/suzuka-full-node/process-compose.test-telemetry.yml @@ -0,0 +1,35 @@ +version: "3" + +environment: + +processes: + + test-telemetry: + command: | + M1_DA_LIGHT_NODE_TRACES_JSON=$(curl http://localhost:16686/api/traces?service=m1-da-light-node) + # expect the total field to be greater than 0 + if [ $(echo $M1_DA_LIGHT_NODE_TRACES_JSON | jq '.total') -gt 0 ]; then + echo "M1-DA-Light-Node traces found" + else + echo "M1-DA-Light-Node traces not found" + exit 1 + fi + + SUZUKA_TRACES_JSON=$(curl http://localhost:16686/api/traces?service=suzuka-full-node) + # expect the total field to be greater than 0 + if [ $(echo $SUZUKA_TRACES_JSON | jq '.total') -gt 0 ]; then + echo "Suzuka traces found" + else + echo "Suzuka traces not found" + exit 1 + fi + + depends_on: + otlp-collector: + condition: process_healthy + suzuka-full-node: + condition: process_healthy + suzuka-faucet: + condition: process_healthy + availability: + exit_on_end: true From f59cb2f5ca3ce07117b5cb9bfc63b34df56fd7ec Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Mon, 21 Oct 2024 11:34:35 +0300 Subject: [PATCH 14/17] chore(tracing): comment on the guard --- util/tracing/src/tracing.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/util/tracing/src/tracing.rs b/util/tracing/src/tracing.rs index e9875d7eb..735f930dd 100644 --- a/util/tracing/src/tracing.rs +++ b/util/tracing/src/tracing.rs @@ -8,7 +8,9 @@ use tracing_subscriber::{fmt, Registry}; /// called at the beginning of a process' `main` function. /// /// If successful, returns a guard object that should be dropped at the end -/// of the process' `main` function scope. +/// of the process' `main` function scope. The guard keeps the background task +/// running to export telemetry (if configured) and makes sure the exporter is +/// properly shut down, flushing buffered export data, before the process exits.
pub fn init_tracing_subscriber( service_name: &'static str, service_version: &'static str, From 2fab8d707256b17d6accb90f87d2520e380c3ecd Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Mon, 21 Oct 2024 11:44:18 +0300 Subject: [PATCH 15/17] feat(ci): add telemetry to local tests Apply the telemetry overlay when running tests in the local setup. --- .github/workflows/checks-all.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/checks-all.yml b/.github/workflows/checks-all.yml index 9ce4e3810..c57b546c6 100755 --- a/.github/workflows/checks-all.yml +++ b/.github/workflows/checks-all.yml @@ -100,7 +100,8 @@ jobs: env: CELESTIA_LOG_LEVEL: FATAL # adjust the log level while debugging run: | - nix develop --command bash -c "just suzuka-full-node native build.setup.eth-local.celestia-local.test -t=false" + nix develop --command bash \ + -c "just suzuka-full-node native build.setup.eth-local.celestia-local.test.telemetry.test-telemetry -t=false" suzuka-multi-node-local: if: github.event.label.name == 'cicd:suzuka-multi-node-local' || github.ref == 'refs/heads/main' From f06cf53bf3e3e5d22963ca4291b472234d3c2789 Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Mon, 21 Oct 2024 23:32:23 +0300 Subject: [PATCH 16/17] feat(docker-compose): add telemetry overlay Port the simple jaeger setup from process-compose. 
--- .../docker-compose.telemetry.yml | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 docker/compose/suzuka-full-node/docker-compose.telemetry.yml diff --git a/docker/compose/suzuka-full-node/docker-compose.telemetry.yml b/docker/compose/suzuka-full-node/docker-compose.telemetry.yml new file mode 100644 index 000000000..470d3c274 --- /dev/null +++ b/docker/compose/suzuka-full-node/docker-compose.telemetry.yml @@ -0,0 +1,26 @@ +services: + jaeger: + image: jaegertracing/all-in-one:1.62.0 + container_name: jaeger + environment: + - COLLECTOR_OTLP_ENABLED=true + ports: + - "4317:4317" + - "16686:16686" + healthcheck: + test: curl http://localhost:16686/api/traces + start_period: 3s + + suzuka-full-node: + environment: + - MOVEMENT_OTLP=http://jaeger:4317 + depends_on: + jaeger: + condition: service_healthy + + m1-da-light-node: + environment: + - MOVEMENT_OTLP=http://jaeger:4317 + depends_on: + jaeger: + condition: service_healthy From a2e6653d2b7d958e2bf7a6027ff371386bb1a74d Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Fri, 1 Nov 2024 17:38:55 +0200 Subject: [PATCH 17/17] chore: commit Cargo.lock updates --- Cargo.lock | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f3073fbf2..931229b7b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3578,7 +3578,7 @@ checksum = "504e3947307ac8326a5437504c517c4b56716c9d98fac0028c2acc7ca47d70ae" dependencies = [ "async-trait", "axum-core 0.4.5", - "bytes 1.7.2", + "bytes 1.8.0", "futures-util", "http 1.1.0", "http-body 1.0.1", @@ -3621,7 +3621,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" dependencies = [ "async-trait", - "bytes 1.7.2", + "bytes 1.8.0", "futures-util", "http 1.1.0", "http-body 1.0.1", @@ -11442,7 +11442,7 @@ version = "0.13.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum =
"7b0487d90e047de87f984913713b85c601c05609aad5b0df4b4573fbf69aa13f" dependencies = [ - "bytes 1.7.2", + "bytes 1.8.0", "prost-derive 0.13.3", ] @@ -11503,7 +11503,7 @@ dependencies = [ "itertools 0.13.0", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.85", ] [[package]] @@ -14344,7 +14344,7 @@ dependencies = [ "async-trait", "axum 0.7.7", "base64 0.22.1", - "bytes 1.7.2", + "bytes 1.8.0", "h2 0.4.6", "http 1.1.0", "http-body 1.0.1", @@ -14353,7 +14353,7 @@ dependencies = [ "hyper-timeout 0.5.1", "hyper-util", "percent-encoding", - "pin-project 1.1.6", + "pin-project 1.1.7", "prost 0.13.3", "socket2 0.5.7", "tokio",