diff --git a/.github/workflows/checks-all.yml b/.github/workflows/checks-all.yml index 6c7df551e..89e117195 100755 --- a/.github/workflows/checks-all.yml +++ b/.github/workflows/checks-all.yml @@ -100,8 +100,10 @@ jobs: env: CELESTIA_LOG_LEVEL: FATAL # adjust the log level while debugging run: | - nix develop --command bash -c "just suzuka-full-node native build.setup.eth-local.celestia-local.test -t=false" - nix develop --command bash -c "just suzuka-full-node native build.setup.eth-local.celestia-local.test -t=false" + nix develop --command bash \ + -c "just suzuka-full-node native build.setup.eth-local.celestia-local.test.telemetry.test-telemetry -t=false" + nix develop --command bash \ + -c "just suzuka-full-node native build.setup.eth-local.celestia-local.test -t=false" suzuka-full-node-malicious: if: github.event.label.name == 'cicd:suzuka-full-node-malicious' || github.ref == 'refs/heads/main' diff --git a/Cargo.lock b/Cargo.lock index b8deafc41..60f032440 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -420,7 +420,7 @@ dependencies = [ "serde_json", "tokio", "tokio-stream", - "tower", + "tower 0.4.13", "tracing", ] @@ -465,7 +465,7 @@ dependencies = [ "serde_json", "tokio", "tokio-stream", - "tower", + "tower 0.4.13", "tracing", "url", ] @@ -664,7 +664,7 @@ dependencies = [ "serde_json", "thiserror", "tokio", - "tower", + "tower 0.4.13", "tracing", "url", ] @@ -678,7 +678,7 @@ dependencies = [ "alloy-transport", "reqwest 0.12.8", "serde_json", - "tower", + "tower 0.4.13", "tracing", "url", ] @@ -3549,7 +3549,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.3.4", "bitflags 1.3.2", "bytes 1.8.0", "futures-util", @@ -3565,7 +3565,34 @@ dependencies = [ "rustversion", "serde", "sync_wrapper 0.1.2", - "tower", + "tower 0.4.13", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum" +version = "0.7.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "504e3947307ac8326a5437504c517c4b56716c9d98fac0028c2acc7ca47d70ae" +dependencies = [ + "async-trait", + "axum-core 0.4.5", + "bytes 1.8.0", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper 1.0.1", + "tower 0.5.1", "tower-layer", "tower-service", ] @@ -3587,6 +3614,26 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes 1.8.0", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper 1.0.1", + "tower-layer", + "tower-service", +] + [[package]] name = "az" version = "1.2.1" @@ -6789,7 +6836,7 @@ dependencies = [ "tokio", "tokio-retry", "tonic 0.9.2", - "tower", + "tower 0.4.13", "tracing", ] @@ -7337,6 +7384,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.1", "httparse", + "httpdate", "itoa", "pin-project-lite", "smallvec", @@ -7390,6 +7438,19 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper 1.5.0", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -7421,9 +7482,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.9" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41296eb09f183ac68eec06e03cdbea2e759633d4067b2f6552fc2e009bcad08b" +checksum = "df2dcfbe0677734ab2f3ffa7fa7bfd4706bfdc1ef393f2ee30184aed67e631b4" dependencies = [ "bytes 1.8.0", "futures-channel", @@ -7932,7 +7993,7 @@ dependencies = [ "serde_json", "thiserror", "tokio", - "tower", + "tower 0.4.13", "tracing", "url", ] @@ -8733,7 +8794,6 @@ dependencies = [ "tempfile", "thiserror", "tokio", - "tonic 0.11.0", "tracing", "tracing-test", ] @@ -9756,7 +9816,14 @@ dependencies = [ name = "movement-tracing" version = "0.0.2" dependencies = [ - "tracing-appender", + "anyhow", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk", + "tokio", + "tracing", + "tracing-opentelemetry", "tracing-subscriber 0.3.18", ] @@ -10248,6 +10315,77 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "570074cc999d1a58184080966e5bd3bf3a9a4af650c3b05047c2621e7405cd17" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29e1f9c8b032d4f635c730c0efcf731d5e2530ea13fa8bef7939ddc8420696bd" +dependencies = [ + "async-trait", + "futures-core", + "http 1.1.0", + "opentelemetry", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost 0.13.3", + "thiserror", + "tokio", + "tonic 0.12.3", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9d3968ce3aefdcca5c27e3c4ea4391b37547726a70893aab52d3de95d5f8b34" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost 0.13.3", + "tonic 0.12.3", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db945c1eaea8ac6a9677185357480d215bb6999faa9f691d0c4d4d641eab7a09" + +[[package]] +name = "opentelemetry_sdk" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2c627d9f4c9cdc1f21a29ee4bfbd6028fcb8bcf2a857b43f3abdf72c9c862f3" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "once_cell", + "opentelemetry", + "percent-encoding", + "rand 0.8.5", + "serde_json", + "thiserror", + "tokio", + "tokio-stream", +] + [[package]] name = "ordered-float" version = "2.10.1" @@ -11356,6 +11494,16 @@ dependencies = [ "prost-derive 0.12.6", ] +[[package]] +name = "prost" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b0487d90e047de87f984913713b85c601c05609aad5b0df4b4573fbf69aa13f" +dependencies = [ + "bytes 1.8.0", + "prost-derive 0.13.3", +] + [[package]] name = "prost-build" version = "0.12.6" @@ -11403,6 +11551,19 @@ dependencies = [ "syn 2.0.85", ] +[[package]] +name = "prost-derive" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5" +dependencies = [ + "anyhow", + "itertools 0.13.0", + "proc-macro2", + "quote", + "syn 2.0.85", +] + [[package]] name = "prost-reflect" version = "0.13.1" @@ -14213,7 +14374,7 @@ checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" dependencies = [ "async-stream", "async-trait", - "axum", + "axum 0.6.20", "base64 0.21.7", "bytes 1.8.0", "flate2", @@ -14223,7 +14384,7 @@ dependencies = [ "http 0.2.12", "http-body 0.4.6", "hyper 0.14.31", - "hyper-timeout", + "hyper-timeout 0.4.1", "percent-encoding", "pin-project 1.1.7", "prost 0.11.9", @@ -14231,7 +14392,7 @@ dependencies = [ "tokio", "tokio-rustls 0.24.1", "tokio-stream", - "tower", + "tower 0.4.13", "tower-layer", "tower-service", "tracing", @@ -14246,7 +14407,7 @@ checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13" dependencies = [ "async-stream", "async-trait", - "axum", + "axum 0.6.20", "base64 0.21.7", "bytes 1.8.0", "flate2", @@ -14254,7 +14415,7 @@ dependencies = [ "http 0.2.12", "http-body 0.4.6", "hyper 0.14.31", - "hyper-timeout", + "hyper-timeout 0.4.1", "percent-encoding", "pin-project 1.1.7", "prost 0.12.6", @@ -14264,13 +14425,43 @@ dependencies = [ "tokio", "tokio-rustls 0.25.0", "tokio-stream", - "tower", + "tower 0.4.13", "tower-layer", "tower-service", "tracing", "zstd 0.12.4", ] +[[package]] +name = "tonic" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +dependencies = [ + "async-stream", + "async-trait", + "axum 0.7.7", + "base64 0.22.1", + "bytes 1.8.0", + "h2 0.4.6", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.5.0", + "hyper-timeout 0.5.2", + "hyper-util", + "percent-encoding", + "pin-project 1.1.7", + "prost 0.13.3", + "socket2 0.5.7", + "tokio", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tonic-build" version = "0.11.0" @@ -14317,6 +14508,20 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper 0.1.2", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-layer" version = "0.3.3" @@ -14341,18 +14546,6 @@ dependencies = [ "tracing-core", ] -[[package]] -name = "tracing-appender" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf" -dependencies = [ - "crossbeam-channel", - "thiserror", - "time", - "tracing-subscriber 0.3.18", -] - [[package]] name = "tracing-attributes" version = "0.1.27" @@ -14385,6 +14578,24 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc58af5d3f6c5811462cabb3289aec0093f7338e367e5a33d28c0433b3c7360b" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber 0.3.18", + "web-time", +] + [[package]] name = "tracing-serde" version = "0.1.3" @@ -15065,6 +15276,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki-roots" version = "0.23.1" diff --git a/Cargo.toml b/Cargo.toml index 34ede7390..322e18fe0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -248,6 +248,10 @@ move-vm-ext = { path = "types/move-vm-ext" } num-derive = "0.4.2" num-traits = "0.2.14" once_cell = "1.8.0" +opentelemetry = { version = "0.26" } +opentelemetry-otlp = { version = "0.26" } +opentelemetry_sdk = { version = "0.26", features = ["rt-tokio"] } +opentelemetry-semantic-conventions = { version = "0.26" } parking_lot = { version = "0.12.1" } poem = { version = "=1.3.59", features = ["anyhow", "rustls"] } poem-openapi = { version = "=2.0.11", features = ["swagger-ui", "url"] } @@ -284,7 +288,7 @@ tonic-build = { version = "0.11", features = ["prost"] } tonic-reflection = "0.11" ### To try (experimental) std support, add `features = [ "std" ]` to risc0-zkvm tracing = "0.1.40" -tracing-appender = "0.2" +tracing-opentelemetry = { version = "0.27" } tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-test = "0.2.5" trie-db = "0.28.0" diff --git a/docker/compose/suzuka-full-node/docker-compose.telemetry.yml b/docker/compose/suzuka-full-node/docker-compose.telemetry.yml new file mode 100644 index 000000000..470d3c274 --- /dev/null +++ b/docker/compose/suzuka-full-node/docker-compose.telemetry.yml @@ -0,0 +1,26 @@ +services: + jaeger: + image: jaegertracing/all-in-one:1.62.0 + container_name: jaeger + environment: + - COLLECTOR_OTLP_ENABLED=true + ports: + - "4317:4317" + - "16686:16686" + healthcheck: + test: curl http://localhost:16686/api/traces + start_period: 3s + + suzuka-full-node: + environment: + - MOVEMENT_OTLP=http://jaeger:4317 + depends_on: + - jaeger: + condition: service_healthy + + m1-da-light-node: + environment: + - MOVEMENT_OTLP=http://jaeger:4317 + depends_on: + - jaeger: + condition: service_healthy diff --git a/networks/suzuka/suzuka-client/src/bin/e2e/followers_consistent.rs b/networks/suzuka/suzuka-client/src/bin/e2e/followers_consistent.rs index 3b9353e05..bad4faf94 100644 --- a/networks/suzuka/suzuka-client/src/bin/e2e/followers_consistent.rs +++ b/networks/suzuka/suzuka-client/src/bin/e2e/followers_consistent.rs @@ -12,8 +12,6 @@ use tokio::sync::RwLock; use tracing::info; use url::Url; -const TIMING_LOG_ENV: &str = "SUZUKA_TIMING_LOG"; - pub fn get_suzuka_config( dot_movement: &DotMovement, ) -> Result { @@ -247,10 +245,12 @@ pub async fn basic_coin_transfers( #[tokio::main] async fn main() -> Result<(), anyhow::Error> { - let tracing_config = movement_tracing::Config { - timing_log_path: std::env::var_os(TIMING_LOG_ENV).map(Into::into), - }; - let _guard = movement_tracing::init_tracing_subscriber(tracing_config); + let tracing_config = movement_tracing::Config::from_env()?; + let _guard = movement_tracing::init_tracing_subscriber( + env!("CARGO_BIN_NAME"), + env!("CARGO_PKG_VERSION"), + &tracing_config, + )?; // get the lead dot movement from the environment let dot_movement = DotMovement::try_from_env()?; diff --git a/networks/suzuka/suzuka-full-node/src/main.rs b/networks/suzuka/suzuka-full-node/src/main.rs index eea463a88..def29bb20 100644 --- a/networks/suzuka/suzuka-full-node/src/main.rs +++ b/networks/suzuka/suzuka-full-node/src/main.rs @@ -1,15 +1,15 @@ use suzuka_full_node::manager::Manager; -use std::env; use std::process::ExitCode; -const TIMING_LOG_ENV: &str = "SUZUKA_TIMING_LOG"; - #[tokio::main] async fn main() -> Result { - let tracing_config = - movement_tracing::Config { timing_log_path: env::var_os(TIMING_LOG_ENV).map(Into::into) }; - let _guard = movement_tracing::init_tracing_subscriber(tracing_config); + let tracing_config = movement_tracing::Config::from_env()?; + let _guard = movement_tracing::init_tracing_subscriber( + env!("CARGO_BIN_NAME"), + env!("CARGO_PKG_VERSION"), + &tracing_config, + )?; // get the config file let dot_movement = dot_movement::DotMovement::try_from_env()?; diff --git a/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs b/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs index db3437dd5..ba2d1575b 100644 --- a/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs +++ b/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs @@ -113,9 +113,10 @@ where } }; + let block_id_hex = hex::encode(&block_id); info!( - block_id = %hex::encode(block_id.clone()), - da_height = da_height, + block_id = block_id_hex, + da_height, time = block_timestamp, "Processing block from DA" ); @@ -133,9 +134,10 @@ where let block: Block = bcs::from_bytes(&block_bytes[..])?; - // get the transactions + // get the transactions count before the block is consumed let transactions_count = block.transactions().len(); - let span = info_span!(target: "movement_timing", "execute_block", id = ?block_id); + let span = + info_span!(target: "movement_telemetry", "execute_block", block_id = %block_id_hex); let commitment = self.execute_block_with_retries(block, block_timestamp).instrument(span).await?; @@ -159,7 +161,7 @@ where } } } else { - info!(block_id = ?block_id, "Skipping settlement"); + info!(block_id = block_id_hex, "Skipping settlement"); } Ok(()) @@ -178,18 +180,22 @@ where block: Block, mut block_timestamp: u64, ) -> anyhow::Result { - for _ in 0..self.execution_extension.block_retry_count { + let retry_count = self.execution_extension.block_retry_count; + for _ in 0..retry_count { // we have to clone here because the block is supposed to be consumed by the executor match self.execute_block(block.clone(), block_timestamp).await { - Ok(commitment) => return Ok(commitment), + Ok(commitment) => { + info!(target: "movement_telemetry", "execute_block_succeeded"); + return Ok(commitment); + } Err(e) => { - info!("Failed to execute block: {:?}. Retrying", e); + info!(target: "movement_telemetry", error = %e, "execute_block_failed"); block_timestamp += self.execution_extension.block_retry_increment_microseconds; // increase the timestamp by 5 ms (5000 microseconds) } } } - anyhow::bail!("Failed to execute block after 5 retries") + anyhow::bail!("Failed to execute block after {retry_count} retries") } async fn execute_block( @@ -221,6 +227,22 @@ where continue; } + // Instrumentation for aggregated metrics: + // Transactions per second: https://github.com/movementlabsxyz/movement/discussions/422 + // Transaction latency: https://github.com/movementlabsxyz/movement/discussions/423 + // Transaction failure rate: https://github.com/movementlabsxyz/movement/discussions/428 + // + // TODO: as the block can be attempted to be executed repeatedly, + // collect this data once and export in telemetry + // on the final success or failure. + info!( + target: "movement_telemetry", + tx_hash = %signed_transaction.committed_hash(), + sender = %signed_transaction.sender(), + sequence_number = signed_transaction.sequence_number(), + "executing_transaction" + ); + let signature_verified_transaction = SignatureVerifiedTransaction::Valid( Transaction::UserTransaction(signed_transaction), ); diff --git a/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs b/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs index a02108f53..35553fe92 100644 --- a/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs +++ b/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs @@ -5,14 +5,11 @@ use m1_da_light_node_util::config::Config as LightNodeConfig; use maptos_dof_execution::SignedTransaction; use tokio::sync::mpsc; -use tracing::{info, warn}; +use tracing::{info, info_span, warn, Instrument}; use std::ops::ControlFlow; -use std::sync::atomic::AtomicU64; use std::time::{Duration, Instant}; -const LOGGING_UID: AtomicU64 = AtomicU64::new(0); - pub struct Task { transaction_receiver: mpsc::Receiver, da_light_node_client: LightNodeServiceClient, @@ -29,14 +26,13 @@ impl Task { } pub async fn run(mut self) -> anyhow::Result<()> { - while let ControlFlow::Continue(()) = self.spawn_write_next_transaction_batch().await? {} + while let ControlFlow::Continue(()) = self.build_and_write_batch().await? {} Ok(()) } /// Constructs a batch of transactions then spawns the write request to the DA in the background. - async fn spawn_write_next_transaction_batch( - &mut self, - ) -> Result, anyhow::Error> { + #[tracing::instrument(target = "movement_telemetry", skip(self))] + async fn build_and_write_batch(&mut self) -> Result, anyhow::Error> { use ControlFlow::{Break, Continue}; // limit the total time batching transactions @@ -45,7 +41,6 @@ impl Task { let mut transactions = Vec::new(); - let batch_id = LOGGING_UID.fetch_add(1, std::sync::atomic::Ordering::SeqCst); loop { let remaining = match half_building_time.checked_sub(start.elapsed().as_millis() as u64) { @@ -64,13 +59,15 @@ impl Task { { Ok(transaction) => match transaction { Some(transaction) => { + // Instrumentation for aggregated metrics: + // Transactions per second: https://github.com/movementlabsxyz/movement/discussions/422 + // Transaction latency: https://github.com/movementlabsxyz/movement/discussions/423 info!( - target : "movement_timing", - batch_id = %batch_id, + target: "movement_telemetry", tx_hash = %transaction.committed_hash(), sender = %transaction.sender(), sequence_number = transaction.sequence_number(), - "received transaction", + "received_transaction", ); let serialized_aptos_transaction = serde_json::to_vec(&transaction)?; let movement_transaction = movement_types::transaction::Transaction::new( @@ -93,19 +90,22 @@ impl Task { if transactions.len() > 0 { info!( - target: "movement_timing", - batch_id = %batch_id, + target: "movement_telemetry", transaction_count = transactions.len(), "built_batch_write" ); let batch_write = BatchWriteRequest { blobs: transactions }; // spawn the actual batch write request in the background let mut da_light_node_client = self.da_light_node_client.clone(); - tokio::spawn(async move { - if let Err(e) = da_light_node_client.batch_write(batch_write).await { - warn!("failed to write batch to DA: {:?}", e); + let write_span = info_span!(target: "movement_telemetry", "batch_write"); + tokio::spawn( + async move { + if let Err(e) = da_light_node_client.batch_write(batch_write).await { + warn!("failed to write batch to DA: {:?}", e); + } } - }); + .instrument(write_span), + ); } Ok(Continue(())) diff --git a/process-compose/suzuka-full-node/process-compose.telemetry.yml b/process-compose/suzuka-full-node/process-compose.telemetry.yml new file mode 100644 index 000000000..3e119a705 --- /dev/null +++ b/process-compose/suzuka-full-node/process-compose.telemetry.yml @@ -0,0 +1,28 @@ +version: "3" + +environment: + +processes: + otlp-collector: + is_daemon: true + command: | + docker run -d --rm --name suzuka-otlp-collector -p16686:16686 -p4317:4317 -e COLLECTOR_OTLP_ENABLED=true jaegertracing/all-in-one:latest + shutdown: + command: | + docker stop suzuka-otlp-collector + readiness_probe: + initial_delay_seconds: 3 + exec: + command: curl http://localhost:16686/api/traces + suzuka-full-node: + depends_on: + otlp-collector: + condition: process_started + environment: + - MOVEMENT_OTLP=http://localhost:4317 + m1-da-light-node: + depends_on: + otlp-collector: + condition: process_started + environment: + - MOVEMENT_OTLP=http://localhost:4317 diff --git a/process-compose/suzuka-full-node/process-compose.test-telemetry.yml b/process-compose/suzuka-full-node/process-compose.test-telemetry.yml new file mode 100644 index 000000000..5aecc679e --- /dev/null +++ b/process-compose/suzuka-full-node/process-compose.test-telemetry.yml @@ -0,0 +1,35 @@ +version: "3" + +environment: + +processes: + + test-telemetry: + command: | + M1_DA_LIGHT_NODE_TRACES_JSON=$(curl http://localhost:16686/api/traces?service=m1-da-light-node) + # expect the total field to be greater than 0 + if [ $(echo $M1_DA_LIGHT_NODE_TRACES_JSON | jq '.total') -gt 0 ]; then + echo "M1-DA-Light-Node traces found" + else + echo "M1-DA-Light-Node traces not found + exit 1 + fi + + SUZUKA_TRACES_JSON=$(curl http://localhost:16686/api/traces?service=suzuka-full-node) + # expect the total field to be greater than 0 + if [ $(echo $SUZUKA_TRACES_JSON | jq '.total') -gt 0 ]; then + echo "Suzuka traces found" + else + echo "Suzuka traces not found + exit 1 + fi + + depends_on: + otlp-collector: + condition: process_healthy + suzuka-full-node: + condition: process_healthy + suzuka-faucet: + condition: process_healthy + availability: + exit_on_end: true diff --git a/protocol-units/da/m1/light-node/src/main.rs b/protocol-units/da/m1/light-node/src/main.rs index 4d8037ca8..345a2d3a7 100644 --- a/protocol-units/da/m1/light-node/src/main.rs +++ b/protocol-units/da/m1/light-node/src/main.rs @@ -1,15 +1,14 @@ use k256::Secp256k1; use m1_da_light_node::v1::{LightNodeV1, Manager}; -use std::env; - -const TIMING_LOG_ENV: &str = "M1_DA_LIGHT_NODE_TIMING_LOG"; - #[tokio::main] async fn main() -> Result<(), Box> { - let tracing_config = - movement_tracing::Config { timing_log_path: env::var_os(TIMING_LOG_ENV).map(Into::into) }; - let _guard = movement_tracing::init_tracing_subscriber(tracing_config); + let tracing_config = movement_tracing::Config::from_env()?; + let _guard = movement_tracing::init_tracing_subscriber( + env!("CARGO_BIN_NAME"), + env!("CARGO_PKG_VERSION"), + &tracing_config, + )?; let dot_movement = dot_movement::DotMovement::try_from_env()?; let config_path = dot_movement.get_config_json_path(); diff --git a/protocol-units/da/m1/light-node/src/v1/sequencer.rs b/protocol-units/da/m1/light-node/src/v1/sequencer.rs index 6b97b4e44..70c541a84 100644 --- a/protocol-units/da/m1/light-node/src/v1/sequencer.rs +++ b/protocol-units/da/m1/light-node/src/v1/sequencer.rs @@ -15,7 +15,7 @@ use std::boxed::Box; use std::fmt::Debug; use std::path::PathBuf; use std::pin::Pin; -use std::sync::{atomic::AtomicU64, Arc}; +use std::sync::Arc; use std::time::Duration; use tokio::{ @@ -39,8 +39,6 @@ use movement_types::block::Block; use crate::v1::{passthrough::LightNodeV1 as LightNodeV1PassThrough, LightNodeV1Operations}; -const LOGGING_UID: AtomicU64 = AtomicU64::new(0); - #[derive(Clone)] pub struct LightNodeV1 where @@ -114,23 +112,24 @@ where AffinePoint: FromEncodedPoint + ToEncodedPoint + VerifyPrimitive, FieldBytesSize: ModulusSize, { + #[tracing::instrument(target = "movement_telemetry", name = "build_block")] async fn tick_build_blocks(&self, sender: Sender) -> Result<(), anyhow::Error> { let memseq = self.memseq.clone(); - // this has an internal timeout based on its building time - // so in the worst case scenario we will roughly double the internal timeout - let uid = LOGGING_UID.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - debug!(target: "movement_timing", uid = %uid, "waiting_for_next_block",); let block = memseq.wait_for_next_block().await?; match block { Some(block) => { - info!(target: "movement_timing", block_id = %block.id(), uid = %uid, transaction_count = block.transactions().len(), "received_block"); + info!( + target: "movement_telemetry", + block_id = %block.id(), + transaction_count = block.transactions().len(), + "received_block", + ); sender.send(block).await?; Ok(()) } None => { - // no transactions to include - debug!(target: "movement_timing", uid = %uid, "no_transactions_to_include"); + debug!("no transactions to include"); Ok(()) } } @@ -138,7 +137,7 @@ where async fn submit_blocks(&self, blocks: &Vec) -> Result<(), anyhow::Error> { for block in blocks { - info!(target: "movement_timing", block_id = %block.block.id(), "inner_submitting_block"); + info!(target: "movement_telemetry", block_id = %block.block.id(), "inner_submitting_block"); } // get references to celestia blobs in the wrapped blocks let block_blobs = blocks @@ -149,16 +148,12 @@ where // use deref on the wrapped block to get the blob self.pass_through.submit_celestia_blobs(&block_blobs).await?; for block in blocks { - info!(target: "movement_timing", block_id = %block.block.id(), "inner_submitted_block"); + info!(target: "movement_telemetry", block_id = %block.block.id(), "inner_submitted_block"); } Ok(()) } - pub async fn submit_with_heuristic(&self, blocks: Vec) -> Result<(), anyhow::Error> { - for block in &blocks { - info!(target: "movement_timing", block_id = %block.id(), "submitting_block"); - } - + async fn submit_with_heuristic(&self, blocks: Vec) -> Result<(), anyhow::Error> { // wrap the blocks in a struct that can be split and compressed // spawn blocking because the compression is blocking and could be slow let pass_through = self.pass_through.clone(); @@ -211,9 +206,8 @@ where ) .await?; - info!("block group results: {:?}", block_group_results); for block_group_result in &block_group_results { - info!(target: "movement_timing", block_group_result = ?block_group_result, "block_group_result"); + info!(target: "movement_telemetry", block_group_result = ?block_group_result, "block_group_result"); } Ok(()) @@ -243,27 +237,24 @@ where } Ok(None) => { // The channel was closed - info!("sender dropped"); + debug!("sender dropped"); break; } Err(_) => { // The operation timed out - debug!( - target: "movement_timing", - batch_size = blocks.len(), - "timed_out_building_block" - ); + debug!(batch_size = blocks.len(), "timed_out_building_block"); break; } } } - info!(target: "movement_timing", block_count = blocks.len(), "read_blocks"); + info!(target: "movement_telemetry", block_count = blocks.len(), "read_blocks"); Ok(blocks) } /// Ticks the block proposer to build blocks and submit them + #[tracing::instrument(target = "movement_telemetry", name = "publish_blobs")] async fn tick_publish_blobs( &self, receiver: &mut Receiver, @@ -277,11 +268,11 @@ where // submit the blobs, resizing as needed for block_id in &ids { - info!(target: "movement_timing", %block_id, "submitting_block_batch"); + info!(target: "movement_telemetry", %block_id, "submitting_block_batch"); } self.submit_with_heuristic(blocks).await?; for block_id in &ids { - info!(target: "movement_timing", %block_id, "submitted_block_batch"); + info!(target: "movement_telemetry", %block_id, "submitted_block_batch"); } Ok(()) diff --git a/protocol-units/execution/opt-executor/Cargo.toml b/protocol-units/execution/opt-executor/Cargo.toml index e084c33a2..54c1bb0ab 100644 --- a/protocol-units/execution/opt-executor/Cargo.toml +++ b/protocol-units/execution/opt-executor/Cargo.toml @@ -59,15 +59,14 @@ aptos-mempool = { workspace = true } aptos-temppath = { workspace = true } aptos-faucet-core = { workspace = true } aptos-cached-packages = { workspace = true } -maptos-execution-util = { workspace = true } -movement-types = { workspace = true } aptos-indexer-grpc-fullnode = { workspace = true } aptos-indexer-grpc-table-info = { workspace = true } aptos-indexer = { workspace = true } aptos-protos = { workspace = true } aptos-logger = { workspace = true } -tonic = { workspace = true } +maptos-execution-util = { workspace = true } movement-rest = { workspace = true } +movement-types = { workspace = true } [dev-dependencies] dirs = { workspace = true } diff --git a/protocol-units/execution/opt-executor/src/executor/mod.rs b/protocol-units/execution/opt-executor/src/executor/mod.rs index 05dae3bfe..939e322fd 100644 --- a/protocol-units/execution/opt-executor/src/executor/mod.rs +++ b/protocol-units/execution/opt-executor/src/executor/mod.rs @@ -45,12 +45,6 @@ impl Executor { // a semaphore might be better here as this will rerun until the value does not change during the operation self.transactions_in_flight .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| { - info!( - target: "movement_timing", - count, - current, - "decrementing_transactions_in_flight", - ); Some(current.saturating_sub(count)) }) .unwrap_or_else(|_| 0); diff --git a/protocol-units/execution/opt-executor/src/transaction_pipe.rs b/protocol-units/execution/opt-executor/src/transaction_pipe.rs index 5f66137fc..71858e07f 100644 --- a/protocol-units/execution/opt-executor/src/transaction_pipe.rs +++ b/protocol-units/execution/opt-executor/src/transaction_pipe.rs @@ -17,7 +17,7 @@ use std::sync::{atomic::AtomicU64, Arc}; use std::time::{Duration, Instant}; use thiserror::Error; use tokio::sync::mpsc; -use tracing::{debug, info, info_span, warn, Instrument}; +use tracing::{debug, info, info_span, Instrument}; const GC_INTERVAL: Duration = Duration::from_secs(30); const TOO_NEW_TOLERANCE: u64 = 32; @@ -102,8 +102,11 @@ impl TransactionPipe { if let Some(request) = next { match request { MempoolClientRequest::SubmitTransaction(transaction, callback) => { + // Instrumentation for aggregated metrics: + // Transactions per second: https://github.com/movementlabsxyz/movement/discussions/422 + // Transaction latency: https://github.com/movementlabsxyz/movement/discussions/423 let span = info_span!( - target: "movement_timing", + target: "movement_telemetry", "submit_transaction", tx_hash = %transaction.committed_hash(), sender = %transaction.sender(), @@ -198,13 +201,17 @@ impl TransactionPipe { // For now, we are going to consider a transaction in flight until it exits the mempool and is sent to the DA as is indicated by WriteBatch. let in_flight = self.transactions_in_flight.load(std::sync::atomic::Ordering::Relaxed); info!( - target: "movement_timing", + target: "movement_telemetry", in_flight = %in_flight, "transactions_in_flight" ); if in_flight > self.in_flight_limit { + // Instrumentation for aggregated metrics: + // Transaction failure rate: https://github.com/movementlabsxyz/movement/discussions/428 + // The arguments for identifying the transaction are present on the current + // "submit_transaction" span. info!( - target: "movement_timing", + target: "movement_telemetry", "shedding_load" ); let status = MempoolStatus::new(MempoolStatusCode::MempoolIsFull); @@ -229,6 +236,16 @@ impl TransactionPipe { let sequence_number = match self.has_invalid_sequence_number(&transaction)? { SequenceNumberValidity::Valid(sequence_number) => sequence_number, SequenceNumberValidity::Invalid(status) => { + // Instrumentation for aggregated metrics: + // Transaction failure rate: https://github.com/movementlabsxyz/movement/discussions/428 + // The arguments for identifying the transaction are present on the current + // "submit_transaction" span. + info!( + target: "movement_telemetry", + status = %status.0, + code = ?status.1, + "sequence_number_invalid", + ); return Ok(status); } }; @@ -268,7 +285,11 @@ impl TransactionPipe { ); } _ => { - warn!("Transaction not accepted: {:?}", status); + // Instrumentation for aggregated metrics: + // Transaction failure rate: https://github.com/movementlabsxyz/movement/discussions/428 + // The arguments for identifying the transaction are present on the current + // "submit_transaction" span. + info!(target: "movement_telemetry", %status, "rejected_by_mempool"); } } diff --git a/util/tracing/Cargo.toml b/util/tracing/Cargo.toml index eaf030410..eceb5d5a7 100644 --- a/util/tracing/Cargo.toml +++ b/util/tracing/Cargo.toml @@ -11,8 +11,15 @@ publish.workspace = true rust-version.workspace = true [dependencies] -tracing-appender = { workspace = true } -tracing-subscriber = { workspace = true, features = ["json"] } +anyhow = { workspace = true } +opentelemetry = { workspace = true } +opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] } +opentelemetry-otlp = { workspace = true } +opentelemetry-semantic-conventions = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +tracing-opentelemetry = { workspace = true } +tracing-subscriber = { workspace = true } #console-subscriber = { workspace = true } [lints] diff --git a/util/tracing/src/config.rs b/util/tracing/src/config.rs new file mode 100644 index 000000000..561bcb991 --- /dev/null +++ b/util/tracing/src/config.rs @@ -0,0 +1,29 @@ +use anyhow::anyhow; +use std::env; + +const OTLP_TRACING_ENV: &str = "MOVEMENT_OTLP"; + +/// Options for tracing configuration. +#[derive(Debug, Default)] +pub struct Config { + /// URL of the OpenTelemetry collector endpoint using the OTLP gRPC protocol. + /// If the value is `None`, telemetry is not exported. + pub otlp_grpc_url: Option, +} + +impl Config { + /// Get the tracing configuration from well-known environment variables. + pub fn from_env() -> Result { + let otlp_grpc_url = match env::var(OTLP_TRACING_ENV) { + Ok(url) => Some(url), + Err(env::VarError::NotPresent) => None, + Err(env::VarError::NotUnicode(s)) => { + return Err(anyhow!( + "value of environment variable {OTLP_TRACING_ENV} is not valid UTF-8: {}", + s.to_string_lossy() + )); + } + }; + Ok(Self { otlp_grpc_url }) + } +} diff --git a/util/tracing/src/lib.rs b/util/tracing/src/lib.rs index f018f7e20..91e84b91b 100644 --- a/util/tracing/src/lib.rs +++ b/util/tracing/src/lib.rs @@ -1,78 +1,14 @@ -use tracing_appender::non_blocking::WorkerGuard as AppenderGuard; -use tracing_subscriber::filter::{self, EnvFilter, LevelFilter}; -use tracing_subscriber::fmt::format::FmtSpan; -use tracing_subscriber::prelude::*; - -use std::{env, fs::File, path::PathBuf}; - -const TIMING_ENV: &str = "MOVEMENT_TIMING"; - -/// The default path name for the timing log file. -/// If the path not specified in [`Config`] and the `MOVEMENT_TIMING` -/// environment variable is set, the log file with this name will be created. -pub const DEFAULT_TIMING_LOG_FILE: &str = "movement-timing.log"; - -/// A guard for background log appender(s) returned by `init_tracing_subscriber`. -pub struct WorkerGuard { - _drop_me: Option, -} - -/// Options for the tracing subscriber. -#[derive(Default)] -pub struct Config { - /// Custom name for the timing log file. - pub timing_log_path: Option, -} - -/// Sets up the tracing subscribers for a Movement process. This should be -/// called at the beginning of a process' `main` function. -/// Returns a guard object that should be dropped at the end of the process' -/// `main`` function scope. -/// -/// This function may output encounted errors to the standard error stream, -/// as this is the only facility -pub fn init_tracing_subscriber(config: Config) -> WorkerGuard { - // TODO: compose console_subscriber as a layer - let env_filter = EnvFilter::builder() - .with_default_directive(LevelFilter::INFO.into()) - .from_env_lossy(); - let log_layer = tracing_subscriber::fmt::layer().with_filter(env_filter); - - let (timing_layer, timing_writer_guard) = match env::var(TIMING_ENV) { - Err(env::VarError::NotPresent) => { - // Disable timing - (None, None) - } - Ok(timing_directives) => { - let env_filter = EnvFilter::new(timing_directives); - let timing_log_path = config - .timing_log_path - .as_deref() - .unwrap_or_else(|| DEFAULT_TIMING_LOG_FILE.as_ref()); - match File::create(timing_log_path) { - Ok(file) => { - let (writer, guard) = tracing_appender::non_blocking(file); - let layer = tracing_subscriber::fmt::layer() - .with_writer(writer) - .json() - .with_span_events(FmtSpan::CLOSE) - .with_filter(env_filter) - .with_filter(filter::filter_fn(|meta| meta.target() == "movement_timing")); - (Some(layer), Some(guard)) - } - Err(e) => { - eprintln!("can't create `{}`: {}", timing_log_path.display(), e); - (None, None) - } - } - } - Err(e) => { - eprintln!("invalid {TIMING_ENV}: {e}"); - (None, None) - } - }; - - tracing_subscriber::registry().with(log_layer).with(timing_layer).init(); - - WorkerGuard { _drop_me: timing_writer_guard } -} +//! Tracing setup for Movement services. +//! +//! Exporting of tracing data via [OpenTelemetry] is optionally supported +//! by setting "movement_telemetry" as the target in tracing spans and events. +//! +//! [OpenTelemetry]: https://opentelemetry.io/ + +mod config; +mod telemetry; +mod tracing; + +pub use config::Config; +pub use telemetry::ScopeGuard; +pub use tracing::init_tracing_subscriber; diff --git a/util/tracing/src/telemetry.rs b/util/tracing/src/telemetry.rs new file mode 100644 index 000000000..ee7cd9496 --- /dev/null +++ b/util/tracing/src/telemetry.rs @@ -0,0 +1,85 @@ +//! OpenTelemetry support for Movement services. +//! +//! Telemetry is currently being exported to components as an API distinct +//! from the tracing framework, due to [issues][tracing-opentelemetry#159] +//! with integrating OpenTelemetry as a tracing subscriber. +//! +//! [tracing-opentelemetry#159]: https://github.com/tokio-rs/tracing-opentelemetry/issues/159 + +use crate::Config; + +use opentelemetry::{trace::TracerProvider as _, KeyValue}; +use opentelemetry_otlp::WithExportConfig as _; +use opentelemetry_sdk::trace::{Config as TraceConfig, TracerProvider}; +use opentelemetry_sdk::{runtime::Tokio as TokioRuntimeSelector, Resource}; +use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION}; +use tokio::runtime; +use tracing::{error, Level, Subscriber}; +use tracing_opentelemetry::OpenTelemetryLayer; +use tracing_subscriber::filter; +use tracing_subscriber::prelude::*; +use tracing_subscriber::registry::LookupSpan; + +/// The scope guard object for the background tasks of the tracing subsystem. +/// +/// This object needs to be kept alive for the duration of the program. +#[must_use = "should be dropped at the end of the program scope"] +#[derive(Debug)] +pub struct ScopeGuard(Option); + +impl Drop for ScopeGuard { + fn drop(&mut self) { + fn shutdown_provider(tracer_provider: &TracerProvider) { + if let Err(e) = tracer_provider.shutdown() { + error!("OpenTelemetry tracer provider shutdown failed: {e}"); + } + } + + if let Some(tracer_provider) = self.0.take() { + // Make sure all batched traces are exported. + if let Ok(handle) = runtime::Handle::try_current() { + // Can't call shutdown in async context due to + // https://github.com/open-telemetry/opentelemetry-rust/issues/2047#issuecomment-2416480148 + handle.spawn_blocking(move || { + shutdown_provider(&tracer_provider); + }); + } else { + shutdown_provider(&tracer_provider); + } + } + } +} + +/// Adds an optional OpenTelemetry tracing layer to the provided subscriber. +/// +/// This function should be called at the start of the program before any +/// threads are able to use OpenTelemetry tracers. The function will panic +/// if not called within a Tokio runtime. +pub(crate) fn init_tracing_layer( + subscriber: S, + service_name: &'static str, + service_version: &'static str, + config: &Config, +) -> Result<(ScopeGuard, impl Subscriber), anyhow::Error> +where + S: Subscriber, + for<'span> S: LookupSpan<'span>, +{ + let (tracer_provider, layer) = if let Some(endpoint) = &config.otlp_grpc_url { + let exporter = opentelemetry_otlp::new_exporter().tonic().with_endpoint(endpoint); + let provider = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter(exporter) + .with_trace_config(TraceConfig::default().with_resource(Resource::new([ + KeyValue::new(SERVICE_NAME, service_name), + KeyValue::new(SERVICE_VERSION, service_version), + ]))) + .install_batch(TokioRuntimeSelector)?; + let layer = OpenTelemetryLayer::new(provider.tracer("movement")) + .with_filter(filter::Targets::new().with_target("movement_telemetry", Level::INFO)); + (Some(provider), Some(layer)) + } else { + (None, None) + }; + Ok((ScopeGuard(tracer_provider), subscriber.with(layer))) +} diff --git a/util/tracing/src/tracing.rs b/util/tracing/src/tracing.rs new file mode 100644 index 000000000..735f930dd --- /dev/null +++ b/util/tracing/src/tracing.rs @@ -0,0 +1,29 @@ +use crate::telemetry::{self, ScopeGuard}; +use crate::Config; +use tracing_subscriber::filter::{EnvFilter, LevelFilter}; +use tracing_subscriber::prelude::*; +use tracing_subscriber::{fmt, Registry}; + +/// Sets up the tracing subscribers for a Movement process. This should be +/// called at the beginning of a process' `main` function. +/// +/// If successful, returns a guard object that should be dropped at the end +/// of the process' `main` function scope. The guard keeps the background task +/// running to export telemetry (if configured) and makes sure the exporter is +/// properly shut down, flushing buffered export data, before the process exits. +pub fn init_tracing_subscriber( + service_name: &'static str, + service_version: &'static str, + config: &Config, +) -> Result { + // TODO: compose console_subscriber as a layer + let env_filter = EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .from_env_lossy(); + let fmt_layer = fmt::layer().with_filter(env_filter); + let subscriber = Registry::default().with(fmt_layer); + let (scope_guard, subscriber) = + telemetry::init_tracing_layer(subscriber, service_name, service_version, config)?; + subscriber.init(); + Ok(scope_guard) +}