Skip to content

Commit

Permalink
Merge pull request #902 from iduartgomez/186464436-expose-metrics
Browse files Browse the repository at this point in the history
186464436 - Create monitoring UI tool for observing network behaviour
  • Loading branch information
iduartgomez authored Dec 2, 2023
2 parents cf8a433 + 3596cd2 commit e6f3ad6
Show file tree
Hide file tree
Showing 99 changed files with 9,470 additions and 1,622 deletions.
818 changes: 469 additions & 349 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ members = ["crates/*"]

[workspace.dependencies]
arrayvec = { version = "0.7", features = ["serde"] }
axum = { version = "0.7", default-features = false }
blake3 = { version = "1", features = ["std", "traits-preview"] }
bs58 = "0.5"
chacha20poly1305 = "0.10"
chrono = { version = "0.4", default-features = true }
clap = "4"
crossbeam = "0.8.2"
ctrlc = { version = "3.4" }
dashmap = "^5.5"
either = "1.8"
fastrand = "2"
futures = "0.3"
Expand Down
13 changes: 8 additions & 5 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ path = "src/bin/freenet.rs"
anyhow = "1"
asynchronous-codec = "0.6"
async-trait = "0.1"
axum = { default-features = false, features = ["headers", "http1", "matched-path", "query", "tower-log", "ws"], version = "0.6" }
axum = { default-features = false, features = ["http1", "matched-path", "query", "tower-log", "ws"], workspace = true }
bincode = "1"
blake3 = { workspace = true }
bs58 = "0.5"
Expand All @@ -28,14 +28,16 @@ chrono = { workspace = true }
clap = { features = ["derive", "env"], workspace = true }
config = { features = ["toml"], version = "0.13.0" }
cookie = "0.17"
crossbeam = "0.8.2"
crossbeam = { workspace = true }
ctrlc = { features = ["termination"], workspace = true }
dashmap = "^5.5"
dashmap = { workspace = true }
delegate = "0.10"
directories = "5"
either = { features = ["serde"], workspace = true }
fastrand = { workspace = true }
flatbuffers = "23.5.26"
futures = "0.3.21"
headers = "0.4"
itertools = "0.11"
libp2p = { default-features = false, features = ["autonat", "dns", "ed25519", "identify", "macros", "noise", "ping", "tcp", "tokio", "yamux"], version = "0.52.3" }
libp2p-identity = { features = ["ed25519", "rand"], version = "0.2.7" }
Expand All @@ -54,8 +56,9 @@ stretto = { features = ["async", "sync"], version = "0.8" }
tar = { version = "0.4.38" }
thiserror = "1"
tokio = { features = ["fs", "macros", "rt-multi-thread", "sync", "process"], version = "1" }
tower-http = { features = ["fs", "trace"], version = "0.4" }
ulid = { features = ["serde"], version = "0.4" }
tokio-tungstenite = "0.20"
tower-http = { features = ["fs", "trace"], version = "0.5" }
ulid = { features = ["serde"], version = "1.1" }
unsigned-varint = "0.7"
wasmer = { features = ["sys"], workspace = true }
xz2 = { version = "0.1" }
Expand Down
16 changes: 16 additions & 0 deletions crates/core/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use std::process::Command;

fn main() {
let status = Command::new("flatc")
.arg("--rust")
.arg("-o")
.arg("src/generated")
.arg("../../schemas/flatbuffers/topology.fbs")
.status();
if let Err(err) = status {
println!("failed compiling flatbuffers schema: {err}");
println!("refer to https://github.com/google/flatbuffers to install the flatc compiler");
} else {
let _ = Command::new("cargo").arg("fmt").status();
}
}
81 changes: 42 additions & 39 deletions crates/core/src/client_events/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use axum::{
ws::{Message, WebSocket},
Query, WebSocketUpgrade,
},
http::StatusCode,
response::{IntoResponse, Response},
routing::get,
Extension, Router,
Expand All @@ -18,6 +19,7 @@ use freenet_stdlib::{
prelude::*,
};
use futures::{future::BoxFuture, stream::SplitSink, FutureExt, SinkExt, StreamExt};
use headers::Header;
use serde::Deserialize;
use tokio::sync::{mpsc, Mutex};

Expand Down Expand Up @@ -116,13 +118,13 @@ impl WebSocketProxy {

struct EncodingProtocolExt(EncodingProtocol);

impl axum::headers::Header for EncodingProtocolExt {
impl headers::Header for EncodingProtocolExt {
fn name() -> &'static axum::http::HeaderName {
static HEADER: OnceLock<axum::http::HeaderName> = OnceLock::new();
HEADER.get_or_init(|| axum::http::HeaderName::from_static("encoding-protocol"))
}

fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error>
where
Self: Sized,
I: Iterator<Item = &'i axum::http::HeaderValue>,
Expand All @@ -134,7 +136,7 @@ impl axum::headers::Header for EncodingProtocolExt {
"flatbuffers" => Some(EncodingProtocolExt(EncodingProtocol::Flatbuffers)),
_ => None,
})
.ok_or_else(axum::headers::Error::invalid)
.ok_or_else(headers::Error::invalid)
}

fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
Expand All @@ -153,53 +155,54 @@ struct ConnectionInfo {
encoding_protocol: Option<EncodingProtocol>,
}

async fn connection_info<B>(
encoding_protoc: Result<
axum::TypedHeader<EncodingProtocolExt>,
axum::extract::rejection::TypedHeaderRejection,
>,
auth_token: Result<
axum::TypedHeader<axum::headers::Authorization<axum::headers::authorization::Bearer>>,
axum::extract::rejection::TypedHeaderRejection,
>,
async fn connection_info(
Query(ConnectionInfo {
auth_token: auth_token_q,
encoding_protocol,
}): Query<ConnectionInfo>,
mut req: axum::http::Request<B>,
next: axum::middleware::Next<B>,
mut req: axum::extract::Request,
next: axum::middleware::Next,
) -> Response {
use headers::{
authorization::{Authorization, Bearer},
HeaderMapExt,
};
// tracing::info!(
// "headers: {:?}",
// req.headers()
// .iter()
// .flat_map(|(k, v)| v.to_str().ok().map(|v| format!("{k}: {v}")))
// .collect::<Vec<_>>()
// );
let encoding_protoc = match encoding_protoc {
Ok(protoc) => protoc.0 .0,
Err(err)
if matches!(
err.reason(),
axum::extract::rejection::TypedHeaderRejectionReason::Missing
) =>
{
encoding_protocol.unwrap_or(EncodingProtocol::Flatbuffers)

let encoding_protoc = match req.headers().typed_try_get::<EncodingProtocolExt>() {
Ok(Some(protoc)) => protoc.0,
Ok(None) => encoding_protocol.unwrap_or(EncodingProtocol::Flatbuffers),
Err(_error) => {
return (
StatusCode::BAD_REQUEST,
format!(
"Incorrect `{header}` header specification",
header = EncodingProtocolExt::name()
),
)
.into_response()
}
Err(other) => return other.into_response(),
};

let auth_token = match auth_token {
Ok(auth_token) => Some(AuthToken::from(auth_token.token().to_owned())),
Err(err)
if matches!(
err.reason(),
axum::extract::rejection::TypedHeaderRejectionReason::Missing
) =>
{
auth_token_q
let auth_token = match req.headers().typed_try_get::<Authorization<Bearer>>() {
Ok(Some(value)) => Some(AuthToken::from(value.token().to_owned())),
Ok(None) => auth_token_q,
Err(_error) => {
return (
StatusCode::BAD_REQUEST,
format!(
"Incorrect Bearer `{header}` header specification",
header = Authorization::<Bearer>::name()
),
)
.into_response()
}
Err(other) => return other.into_response(),
};

tracing::debug!(
Expand Down Expand Up @@ -234,13 +237,13 @@ async fn websocket_interface(
) -> Result<(), DynError> {
let (mut response_rx, client_id) = new_client_connection(&request_sender).await?;
let (mut tx, mut rx) = ws.split();
let listeners: Arc<Mutex<VecDeque<(_, mpsc::UnboundedReceiver<HostResult>)>>> =
let contract_updates: Arc<Mutex<VecDeque<(_, mpsc::UnboundedReceiver<HostResult>)>>> =
Arc::new(Mutex::new(VecDeque::new()));
loop {
let active_listeners = listeners.clone();
let contract_updates_cp = contract_updates.clone();
let listeners_task = async move {
loop {
let mut lock = active_listeners.lock().await;
let mut lock = contract_updates_cp.lock().await;
let active_listeners = &mut *lock;
for _ in 0..active_listeners.len() {
if let Some((key, mut listener)) = active_listeners.pop_front() {
Expand Down Expand Up @@ -287,7 +290,7 @@ async fn websocket_interface(

tokio::select! { biased;
msg = async { process_host_response(response_rx.recv().await, client_id, encoding_protoc, &mut tx).await } => {
let active_listeners = listeners.clone();
let active_listeners = contract_updates.clone();
if let Some(NewSubscription { key, callback }) = msg? {
tracing::debug!(cli_id = %client_id, contract = %key, "added new notification listener");
let active_listeners = &mut *active_listeners.lock().await;
Expand Down Expand Up @@ -357,7 +360,7 @@ async fn process_client_request(
Ok(Message::Binary(data)) => data,
Ok(Message::Text(data)) => data.into_bytes(),
Ok(Message::Close(_)) => return Err(None),
Ok(Message::Ping(_)) => return Ok(Some(Message::Pong(vec![0, 3, 2]))),
Ok(Message::Ping(ping)) => return Ok(Some(Message::Pong(ping))),
Ok(m) => {
tracing::debug!(msg = ?m, "received random message");
return Ok(None);
Expand Down
85 changes: 3 additions & 82 deletions crates/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ impl Config {
}

fn load_conf() -> std::io::Result<Config> {
let settings = config::Config::builder()
let settings: config::Config = config::Config::builder()
.add_source(config::Environment::with_prefix("FREENET"))
.build()
.unwrap();
Expand Down Expand Up @@ -359,93 +359,14 @@ pub fn set_logger() {
.compare_exchange(
false,
true,
std::sync::atomic::Ordering::Acquire,
std::sync::atomic::Ordering::Release,
std::sync::atomic::Ordering::SeqCst,
)
.is_err()
{
return;
}

tracer::init_tracer().expect("failed tracing initialization")
}
}

#[cfg(feature = "trace")]
mod tracer {
use tracing_subscriber::{Layer, Registry};

use crate::DynError;

pub fn init_tracer() -> Result<(), DynError> {
let default_filter = if cfg!(any(test, debug_assertions)) {
tracing_subscriber::filter::LevelFilter::DEBUG
} else {
tracing_subscriber::filter::LevelFilter::INFO
};
let filter_layer = tracing_subscriber::EnvFilter::builder()
.with_default_directive(default_filter.into())
.from_env_lossy()
.add_directive("stretto=off".parse().expect("infallible"))
.add_directive("sqlx=error".parse().expect("infallible"));

// use opentelemetry_sdk::propagation::TraceContextPropagator;
use tracing_subscriber::layer::SubscriberExt;

let disabled_logs = std::env::var("FREENET_DISABLE_LOGS").is_ok();
let to_stderr = std::env::var("FREENET_LOG_TO_STDERR").is_ok();
let layers = {
let fmt_layer = tracing_subscriber::fmt::layer().with_level(true);
let fmt_layer = if cfg!(any(test, debug_assertions)) {
fmt_layer.with_file(true).with_line_number(true)
} else {
fmt_layer
};
let fmt_layer = if to_stderr {
fmt_layer.with_writer(std::io::stderr).boxed()
} else {
fmt_layer.boxed()
};

#[cfg(feature = "trace-ot")]
{
let disabled_ot_traces = std::env::var("FREENET_DISABLE_TRACES").is_ok();
let identifier = if let Ok(peer) = std::env::var("FREENET_PEER_ID") {
format!("freenet-core-{peer}")
} else {
"freenet-core".to_string()
};
let tracing_ot_layer = {
// Connect the Jaeger OT tracer with the tracing middleware
let ot_jaeger_tracer =
opentelemetry_jaeger::config::agent::AgentPipeline::default()
.with_service_name(identifier)
.install_simple()?;
// Get a tracer which will route OT spans to a Jaeger agent
tracing_opentelemetry::layer().with_tracer(ot_jaeger_tracer)
};
if !disabled_logs && !disabled_ot_traces {
fmt_layer.and_then(tracing_ot_layer).boxed()
} else if !disabled_ot_traces {
tracing_ot_layer.boxed()
} else {
return Ok(());
}
}
#[cfg(not(feature = "trace-ot"))]
{
if disabled_logs {
return Ok(());
}
fmt_layer.boxed()
}
};
let filtered = layers.with_filter(filter_layer);
// Create a subscriber which includes the tracing Jaeger OT layer and a fmt layer
let subscriber = Registry::default().with(filtered);

// Set the global subscriber
tracing::subscriber::set_global_default(subscriber).expect("Error setting subscriber");
Ok(())
crate::tracing::tracer::init_tracer().expect("failed tracing initialization")
}
}
Loading

0 comments on commit e6f3ad6

Please sign in to comment.