Skip to content

Commit

Permalink
feat: introduce metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
zsluedem committed Jan 24, 2024
1 parent e1e8eac commit 456e5b3
Show file tree
Hide file tree
Showing 27 changed files with 1,724 additions and 555 deletions.
1,123 changes: 651 additions & 472 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ silius-bundler = { version = "0.3.0-alpha", path = "crates/bundler", default-fea
silius-contracts = { version = "0.3.0-alpha", path = "crates/contracts", default-features = false }
silius-grpc = { version = "0.3.0-alpha", path = "crates/grpc", default-features = false }
silius-mempool = { version = "0.3.0-alpha", path = "crates/mempool", default-features = false }
silius-metrics = { version = "0.3.0-alpha", path = "crates/metrics", default-features = false }
silius-p2p = { version = "0.3.0-alpha", path = "crates/p2p", default-features = false }
silius-primitives = { version = "0.3.0-alpha", path = "crates/primitives", default-features = false }
silius-rpc = { version = "0.3.0-alpha", path = "crates/rpc", default-features = false }
Expand All @@ -68,6 +69,8 @@ tokio = { version = "1.35", features = ["full"] }
# misc
expanded-pathbuf = "0.1.2"
eyre = "0.6.11"
jsonrpsee = { version = "0.21.0", features = ["server", "macros", "client"] }
metrics = "0.22.0"
lazy_static = "1.4.0"
parking_lot = "0.12.1"
serde = "1.0.193"
Expand Down
7 changes: 7 additions & 0 deletions bin/silius/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ silius-bundler = { workspace = true }
silius-contracts = { workspace = true }
silius-grpc = { workspace = true }
silius-mempool = { workspace = true, features = ["mdbx"] }
silius-metrics = { workspace = true }
silius-p2p = { workspace = true }
silius-primitives = { workspace = true }
silius-rpc = { workspace = true }
Expand All @@ -37,10 +38,16 @@ pin-utils = "0.1.0"
tokio = { workspace = true }

# misc
async-trait = { workspace = true }
dirs = "5.0.1"
expanded-pathbuf = { workspace = true }
eyre = { workspace = true }
log = "0.4.20"
metrics = { workspace = true }
metrics-exporter-prometheus = "0.13.0"
metrics-util = "0.16.0"
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
56 changes: 47 additions & 9 deletions bin/silius/src/bundler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{
cli::args::{
BundlerAndUoPoolArgs, BundlerArgs, CreateWalletArgs, RpcArgs, StorageType, UoPoolArgs,
BundlerAndUoPoolArgs, BundlerArgs, CreateWalletArgs, MetricsArgs, RpcArgs, StorageType,
UoPoolArgs,
},
utils::unwrap_path_or_home,
};
Expand All @@ -19,6 +20,7 @@ use silius_mempool::{
CodeHashes, DatabaseTable, EntitiesReputation, Mempool, Reputation, UserOperations,
UserOperationsByEntity, UserOperationsBySender, WriteMap,
};
use silius_metrics::{launch_metrics_exporter, mempool::MetricsHandler};
use silius_primitives::{
bundler::SendStrategy,
constants::{
Expand Down Expand Up @@ -55,6 +57,7 @@ pub async fn launch_bundler<M>(
uopool_args: UoPoolArgs,
common_args: BundlerAndUoPoolArgs,
rpc_args: RpcArgs,
metrics_args: MetricsArgs,
eth_client: Arc<M>,
block_streams: Vec<BlockStream>,
) -> eyre::Result<()>
Expand All @@ -67,6 +70,7 @@ where
block_streams,
common_args.chain,
common_args.entry_points.clone(),
metrics_args.clone(),
)
.await?;

Expand All @@ -76,16 +80,22 @@ where
common_args.chain,
common_args.entry_points,
format!("http://{:?}:{:?}", uopool_args.uopool_addr, uopool_args.uopool_port),
metrics_args.clone(),
)
.await?;

launch_rpc(
rpc_args,
format!("http://{:?}:{:?}", uopool_args.uopool_addr, uopool_args.uopool_port),
format!("http://{:?}:{:?}", bundler_args.bundler_addr, bundler_args.bundler_port),
metrics_args.clone(),
)
.await?;

if metrics_args.enable_metrics {
launch_metrics_exporter(metrics_args.listen_addr(), metrics_args.custom_label_value);
}

Ok(())
}

Expand All @@ -95,6 +105,7 @@ pub async fn launch_bundling<M>(
chain: Option<NamedChain>,
entry_points: Vec<Address>,
uopool_grpc_listen_address: String,
metrics_args: MetricsArgs,
) -> eyre::Result<()>
where
M: Middleware + Clone + 'static,
Expand Down Expand Up @@ -140,6 +151,7 @@ where
eth_client,
client,
uopool_grpc_client,
metrics_args.enable_metrics,
);
}
SendStrategy::Flashbots => {
Expand Down Expand Up @@ -167,6 +179,7 @@ where
eth_client,
client,
uopool_grpc_client,
metrics_args.enable_metrics,
);
}
}
Expand All @@ -182,6 +195,7 @@ pub async fn launch_uopool<M>(
block_streams: Vec<BlockStream>,
chain: Option<NamedChain>,
entry_points: Vec<Address>,
metrics_args: MetricsArgs,
) -> eyre::Result<()>
where
M: Middleware + Clone + 'static,
Expand Down Expand Up @@ -220,7 +234,10 @@ where
args.min_priority_fee_per_gas,
);
let mempool = Mempool::new(
Arc::new(RwLock::new(HashMap::<UserOperationHash, UserOperationSigned>::default())),
Arc::new(RwLock::new(MetricsHandler::new(HashMap::<
UserOperationHash,
UserOperationSigned,
>::default()))),
Arc::new(RwLock::new(HashMap::<Address, HashSet<UserOperationHash>>::default())),
Arc::new(RwLock::new(HashMap::<Address, HashSet<UserOperationHash>>::default())),
Arc::new(RwLock::new(HashMap::<UserOperationHash, Vec<CodeHash>>::default())),
Expand All @@ -233,7 +250,9 @@ where
MIN_UNSTAKE_DELAY.into(),
Arc::new(RwLock::new(HashSet::<Address>::default())),
Arc::new(RwLock::new(HashSet::<Address>::default())),
Arc::new(RwLock::new(HashMap::<Address, ReputationEntry>::default())),
Arc::new(RwLock::new(MetricsHandler::new(
HashMap::<Address, ReputationEntry>::default(),
))),
);
for whiteaddr in args.whitelist.iter() {
reputation.add_whitelist(whiteaddr);
Expand All @@ -253,6 +272,7 @@ where
node_enr_file,
args.p2p_opts.to_config(),
args.p2p_opts.bootnodes,
metrics_args.enable_metrics,
)
.await?;
info!("Started uopool gRPC service at {:?}:{:?}", args.uopool_addr, args.uopool_port);
Expand All @@ -269,7 +289,7 @@ where
);
env.create_tables().expect("Create mdbx database tables failed");
let mempool = Mempool::new(
DatabaseTable::<WriteMap, UserOperations>::new(env.clone()),
MetricsHandler::new(DatabaseTable::<WriteMap, UserOperations>::new(env.clone())),
DatabaseTable::<WriteMap, UserOperationsBySender>::new(env.clone()),
DatabaseTable::<WriteMap, UserOperationsByEntity>::new(env.clone()),
DatabaseTable::<WriteMap, CodeHashes>::new(env.clone()),
Expand All @@ -282,7 +302,9 @@ where
MIN_UNSTAKE_DELAY.into(),
Arc::new(RwLock::new(HashSet::<Address>::default())),
Arc::new(RwLock::new(HashSet::<Address>::default())),
DatabaseTable::<WriteMap, EntitiesReputation>::new(env.clone()),
MetricsHandler::new(DatabaseTable::<WriteMap, EntitiesReputation>::new(
env.clone(),
)),
);
for whiteaddr in args.whitelist.iter() {
reputation.add_whitelist(whiteaddr);
Expand All @@ -302,6 +324,7 @@ where
node_enr_file,
args.p2p_opts.to_config(),
args.p2p_opts.bootnodes,
metrics_args.enable_metrics,
)
.await?;
info!("Started uopool gRPC service at {:?}:{:?}", args.uopool_addr, args.uopool_port);
Expand All @@ -314,7 +337,10 @@ where
args.min_priority_fee_per_gas,
);
let mempool = Mempool::new(
Arc::new(RwLock::new(HashMap::<UserOperationHash, UserOperationSigned>::default())),
Arc::new(RwLock::new(MetricsHandler::new(HashMap::<
UserOperationHash,
UserOperationSigned,
>::default()))),
Arc::new(RwLock::new(HashMap::<Address, HashSet<UserOperationHash>>::default())),
Arc::new(RwLock::new(HashMap::<Address, HashSet<UserOperationHash>>::default())),
Arc::new(RwLock::new(HashMap::<UserOperationHash, Vec<CodeHash>>::default())),
Expand All @@ -327,7 +353,9 @@ where
MIN_UNSTAKE_DELAY.into(),
Arc::new(RwLock::new(HashSet::<Address>::default())),
Arc::new(RwLock::new(HashSet::<Address>::default())),
Arc::new(RwLock::new(HashMap::<Address, ReputationEntry>::default())),
Arc::new(RwLock::new(MetricsHandler::new(
HashMap::<Address, ReputationEntry>::default(),
))),
);
for whiteaddr in args.whitelist.iter() {
reputation.add_whitelist(whiteaddr);
Expand All @@ -347,6 +375,7 @@ where
node_enr_file,
args.p2p_opts.to_config(),
args.p2p_opts.bootnodes,
metrics_args.enable_metrics,
)
.await?;
info!("Started uopool gRPC service at {:?}:{:?}", args.uopool_addr, args.uopool_port);
Expand All @@ -363,7 +392,7 @@ where
);
env.create_tables().expect("Create mdbx database tables failed");
let mempool = Mempool::new(
DatabaseTable::<WriteMap, UserOperations>::new(env.clone()),
MetricsHandler::new(DatabaseTable::<WriteMap, UserOperations>::new(env.clone())),
DatabaseTable::<WriteMap, UserOperationsBySender>::new(env.clone()),
DatabaseTable::<WriteMap, UserOperationsByEntity>::new(env.clone()),
DatabaseTable::<WriteMap, CodeHashes>::new(env.clone()),
Expand All @@ -376,7 +405,9 @@ where
MIN_UNSTAKE_DELAY.into(),
Arc::new(RwLock::new(HashSet::<Address>::default())),
Arc::new(RwLock::new(HashSet::<Address>::default())),
DatabaseTable::<WriteMap, EntitiesReputation>::new(env.clone()),
MetricsHandler::new(DatabaseTable::<WriteMap, EntitiesReputation>::new(
env.clone(),
)),
);
for whiteaddr in args.whitelist.iter() {
reputation.add_whitelist(whiteaddr);
Expand All @@ -396,6 +427,7 @@ where
node_enr_file,
args.p2p_opts.to_config(),
args.p2p_opts.bootnodes,
metrics_args.enable_metrics,
)
.await?;
info!("Started uopool gRPC service at {:?}:{:?}", args.uopool_addr, args.uopool_port);
Expand All @@ -409,6 +441,7 @@ pub async fn launch_rpc(
args: RpcArgs,
uopool_grpc_listen_address: String,
bundler_grpc_listen_address: String,
metrics_args: MetricsArgs,
) -> eyre::Result<()> {
if !args.is_enabled() {
return Err(eyre::eyre!("No RPC protocol is enabled"));
Expand All @@ -431,6 +464,11 @@ pub async fn launch_rpc(
server = server.with_proxy(eth_client_proxy_address);
}

if metrics_args.enable_metrics {
info!("Enabling json rpc server metrics.");
server = server.with_metrics()
}

let http_api: HashSet<String> = HashSet::from_iter(args.http_api.iter().cloned());
let ws_api: HashSet<String> = HashSet::from_iter(args.ws_api.iter().cloned());

Expand Down
60 changes: 58 additions & 2 deletions bin/silius/src/cli/args.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use crate::utils::{
parse_address, parse_duration, parse_enr, parse_send_bundle_mode, parse_u256, parse_uopool_mode,
parse_address, parse_duration, parse_enr, parse_label_value, parse_send_bundle_mode,
parse_u256, parse_uopool_mode,
};
use alloy_chains::NamedChain;
use clap::{Parser, ValueEnum};
use discv5::Enr;
use ethers::types::{Address, U256};
use expanded_pathbuf::ExpandedPathBuf;
use silius_metrics::label::LabelValue;
use silius_p2p::config::{Config, ListenAddr};
use silius_primitives::{
bundler::SendStrategy,
Expand All @@ -17,7 +19,7 @@ use silius_primitives::{
UoPoolMode,
};
use std::{
net::{IpAddr, Ipv4Addr},
net::{IpAddr, Ipv4Addr, SocketAddr},
path::PathBuf,
time::Duration,
};
Expand Down Expand Up @@ -129,6 +131,9 @@ pub struct BundlerAndUoPoolArgs {
/// Poll interval event filters and pending transactions in milliseconds.
#[clap(long, default_value = "500", value_parser= parse_duration)]
pub poll_interval: Duration,

#[clap(flatten)]
pub metrics: MetricsArgs,
}

/// RPC CLI args
Expand Down Expand Up @@ -290,6 +295,25 @@ impl P2PArgs {
}
}
}

#[derive(Clone, Debug, Parser, PartialEq)]
pub struct MetricsArgs {
#[clap(long)]
pub enable_metrics: bool,
#[clap(long, value_delimiter = ',', value_parser=parse_label_value)]
pub custom_label_value: Option<Vec<LabelValue>>,
#[clap(long = "metrics.addr", default_value = "127.0.0.1")]
pub listen_address: Ipv4Addr,
#[clap(long = "metrics.port", default_value = "3030")]
pub port: u16,
}

impl MetricsArgs {
pub fn listen_addr(&self) -> SocketAddr {
SocketAddr::new(IpAddr::V4(self.listen_address), self.port)
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -354,6 +378,12 @@ mod tests {
Address::from_str("0x690B9A9E9aa1C9dB991C7721a92d351Db4FaC990").unwrap()
],
poll_interval: Duration::from_millis(5000),
metrics: MetricsArgs {
enable_metrics: false,
custom_label_value: None,
listen_address: Ipv4Addr::new(127, 0, 0, 1),
port: 3030
}
},
BundlerAndUoPoolArgs::try_parse_from(args).unwrap()
);
Expand Down Expand Up @@ -621,4 +651,30 @@ mod tests {
P2PArgs::try_parse_from(args).unwrap()
)
}

#[test]
fn metrics_args() {
let args = vec![
"metricsargs",
"--enable-metrics",
"--metrics.addr",
"127.0.0.1",
"--metrics.port",
"9090",
"--custom-label-value",
"custom=value",
];
assert_eq!(
MetricsArgs {
enable_metrics: true,
listen_address: Ipv4Addr::new(127, 0, 0, 1),
port: 9090,
custom_label_value: Some(vec![LabelValue::new(
String::from("custom"),
String::from("value")
)])
},
MetricsArgs::try_parse_from(args).unwrap()
)
}
}
Loading

0 comments on commit 456e5b3

Please sign in to comment.