Skip to content

Commit

Permalink
Merge pull request #892 from freenet/184207213-more-observability-tools
Browse files Browse the repository at this point in the history
184207213 - Improve observability tools
  • Loading branch information
iduartgomez authored Nov 13, 2023
2 parents 19c4322 + c28659e commit 3c51d0d
Show file tree
Hide file tree
Showing 44 changed files with 1,236 additions and 620 deletions.
86 changes: 50 additions & 36 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 8 additions & 5 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ libp2p = { version = "0.52.3", features = [
libp2p-identity = { version = "0.2.7", features = ["ed25519", "rand"]}
once_cell = "1"
parking_lot = "0.12.0"
rand = { workspace = true }
rand = { workspace = true, features = ["small_rng"] }
serde = { workspace = true, features = ["rc", "derive"] }
serde_json = { workspace = true }
serde_with = { workspace = true }
Expand All @@ -74,9 +74,9 @@ xz2 = { version = "0.1" }

# Tracing deps
tracing = { version = "0.1" }
opentelemetry = { version = "0.20.0", default-features = false, features = ["rt-tokio", "trace"], optional = true }
opentelemetry-jaeger = { version = "0.19.0", features = ["rt-tokio","collector_client", "isahc"], optional = true }
tracing-opentelemetry = { version = "0.21.0", optional = true }
opentelemetry = "0.21.0"
opentelemetry-jaeger = { version = "0.20.0", features = ["rt-tokio","collector_client", "isahc"], optional = true }
tracing-opentelemetry = { version = "0.22.0", optional = true }
tracing-subscriber = { version = "0.3.16", optional = true }

# internal deps
Expand All @@ -97,6 +97,9 @@ default = ["trace", "websocket", "sqlite"]
rocks_db = ["rocksdb"]
sqlite = ["sqlx"]
websocket = ["axum/ws"]
trace = ["opentelemetry", "opentelemetry-jaeger", "tracing-opentelemetry", "tracing-subscriber"]
local-mode = []
network-mode = []

# trace features
trace = ["tracing-subscriber"]
trace-ot = ["trace", "opentelemetry-jaeger", "tracing-opentelemetry"]
2 changes: 1 addition & 1 deletion crates/core/src/client_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub(crate) mod combinator;
#[cfg(feature = "websocket")]
pub(crate) mod websocket;

pub(crate) type BoxedClient = Box<dyn ClientEventsProxy + Send + Sync + 'static>;
pub(crate) type BoxedClient = Box<dyn ClientEventsProxy + Send + 'static>;
pub type HostResult = Result<HostResponse, ClientError>;

#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
Expand Down
108 changes: 80 additions & 28 deletions crates/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub struct Config {
pub bootstrap_ip: IpAddr,
pub bootstrap_port: u16,
pub bootstrap_id: Option<PeerId>,
pub local_peer_keypair: Option<identity::Keypair>,
pub local_peer_keypair: identity::Keypair,
pub log_level: tracing::log::LevelFilter,
config_paths: ConfigPaths,
local_mode: AtomicBool,
Expand Down Expand Up @@ -157,6 +157,17 @@ impl Config {
.store(local_mode, std::sync::atomic::Ordering::SeqCst);
}

fn node_mode() -> OperationMode {
if Self::conf()
.local_mode
.load(std::sync::atomic::Ordering::SeqCst)
{
OperationMode::Local
} else {
OperationMode::Network
}
}

pub fn db_dir(&self) -> PathBuf {
if self.local_mode.load(std::sync::atomic::Ordering::SeqCst) {
self.config_paths.db_dir.join("local")
Expand Down Expand Up @@ -242,13 +253,14 @@ impl Config {
let (bootstrap_ip, bootstrap_port, bootstrap_id) = Config::get_bootstrap_host(&settings)?;
let config_paths = ConfigPaths::new()?;

let local_mode = settings.get_string("local_mode").is_ok();
let local_mode = settings.get_string("network_mode").is_err();

Ok(Config {
bootstrap_ip,
bootstrap_port,
bootstrap_id,
local_peer_keypair,
local_peer_keypair: local_peer_keypair
.unwrap_or_else(identity::Keypair::generate_ed25519),
log_level,
config_paths,
local_mode: AtomicBool::new(local_mode),
Expand Down Expand Up @@ -342,41 +354,81 @@ pub fn set_logger() {
return;
}

let filter = if cfg!(any(test, debug_assertions)) {
tracing_subscriber::filter::LevelFilter::DEBUG.into()
} else {
tracing_subscriber::filter::LevelFilter::INFO.into()
};

let sub = tracing_subscriber::fmt().with_level(true).with_env_filter(
tracing_subscriber::EnvFilter::builder()
.with_default_directive(filter)
.from_env_lossy()
.add_directive("stretto=off".parse().unwrap())
.add_directive("sqlx=error".parse().unwrap()),
);

if cfg!(any(test, debug_assertions)) {
sub.with_file(true).with_line_number(true).init();
} else {
sub.init();
}
tracer::init_tracer().expect("failed tracing initialization")
}
}

#[cfg(feature = "trace")]
pub(super) mod tracer {
use tracing::Subscriber;
use tracing_subscriber::{fmt, layer::Layered, Layer, Registry};

use crate::DynError;

use super::*;

pub fn init_tracer() -> Result<(), opentelemetry::trace::TraceError> {
use opentelemetry::{global, sdk::propagation::TraceContextPropagator};
pub fn init_tracer() -> Result<(), DynError> {
let filter = if cfg!(any(test, debug_assertions)) {
tracing_subscriber::filter::LevelFilter::DEBUG
} else {
tracing_subscriber::filter::LevelFilter::INFO
};
let filter_layer = tracing_subscriber::EnvFilter::builder()
.with_default_directive(filter.into())
.from_env_lossy()
.add_directive("stretto=off".parse().unwrap())
.add_directive("sqlx=error".parse().unwrap());

// use opentelemetry_sdk::propagation::TraceContextPropagator;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::Registry;

let tracer = opentelemetry_jaeger::new_agent_pipeline().install_simple()?;
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
let subscriber = Registry::default().with(telemetry);
global::set_text_map_propagator(TraceContextPropagator::new());
let disabled_logs = std::env::var("FREENET_DISABLE_LOGS").is_ok();
let layers = {
let fmt_layer = tracing_subscriber::fmt::layer().with_level(true);
let fmt_layer = if cfg!(any(test, debug_assertions)) {
fmt_layer.with_file(true).with_line_number(true)
} else {
fmt_layer
};
#[cfg(feature = "trace-ot")]
{
let identifier = if matches!(Config::node_mode(), OperationMode::Local) {
"freenet-core".to_string()
} else {
format!(
"freenet-core-{peer}",
peer = Config::conf().local_peer_keypair.public().to_peer_id()
)
};
let tracing_ot_layer = {
// Connect the Jaeger OT tracer with the tracing middleware
let ot_jaeger_tracer =
opentelemetry_jaeger::config::agent::AgentPipeline::default()
.with_service_name(identifier)
.install_simple()?;
// Get a tracer which will route OT spans to a Jaeger agent
tracing_opentelemetry::layer().with_tracer(ot_jaeger_tracer)
};
if !disabled_logs {
fmt_layer.and_then(tracing_ot_layer).boxed()
} else {
tracing_ot_layer.boxed()
}
}
#[cfg(not(feature = "trace-ot"))]
{
if disabled_logs {
return Ok(());
}
fmt_layer.boxed()
}
};
let filtered = layers.with_filter(filter_layer);
// Create a subscriber which includes the tracing Jaeger OT layer and a fmt layer
let subscriber = Registry::default().with(filtered);

// Set the global subscriber
tracing::subscriber::set_global_default(subscriber).expect("Error setting subscriber");
Ok(())
}
Expand Down
Loading

0 comments on commit 3c51d0d

Please sign in to comment.