Skip to content

Commit

Permalink
Enabling tracing by usage of opentelemtry
Browse files Browse the repository at this point in the history
  • Loading branch information
mariusz-reichert authored and m-reichert committed Aug 15, 2024
1 parent e11cd56 commit db37cef
Show file tree
Hide file tree
Showing 18 changed files with 1,368 additions and 250 deletions.
1,316 changes: 1,075 additions & 241 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,13 @@ url = "2.2.0"
hyper = "0.14"
hyperlocal = "0.8"
# close to same tokio version as dependent by hyper v0.14 and hyperlocal 0.8 -- things can go awry if they mismatch
tokio = { version = "1", features = ["sync", "macros"] }
tokio = { version = "1", features = ["sync", "macros", "rt-multi-thread", "rt"] }
opentelemetry = { version = "0.20.0", features = ["rt-tokio"] }
tracing-opentelemetry = "0.21.0"
opentelemetry-otlp = { version = "0.13.0", default-features = false, features = ["http-proto", "reqwest-client"] }
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
opentelemetry-semantic-conventions = "0.12.0"
tracing = { version = "0.1.40", features = ["async-await", "log"] }

# optional dependencies for electrum-discovery
electrum-client = { version = "0.8", optional = true }
Expand Down
7 changes: 5 additions & 2 deletions src/bin/electrs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use electrs::{
errors::*,
metrics::Metrics,
new_index::{precache, ChainQuery, FetchFrom, Indexer, Mempool, Query, Store},
rest,
otlp_trace, rest,
signal::Waiter,
};

Expand Down Expand Up @@ -147,7 +147,10 @@ fn run_server(config: Arc<Config>) -> Result<()> {
Ok(())
}

fn main() {
#[tokio::main]
async fn main() {
let _tracing_guard = otlp_trace::init_tracing("electrs");

let config = Arc::new(Config::from_args());
if let Err(e) = run_server(config) {
error!("server failed: {}", e.display_chain());
Expand Down
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ impl Config {
} else {
stderrlog::Timestamp::Off
});
log.init().expect("logging initialization failed");

let config = Config {
log,
network_type,
Expand Down
34 changes: 34 additions & 0 deletions src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use serde_json::{from_str, from_value, Value};
use bitcoin::consensus::encode::{deserialize, serialize_hex};
#[cfg(feature = "liquid")]
use elements::encode::{deserialize, serialize_hex};
use tracing::instrument;

use crate::chain::{Block, BlockHash, BlockHeader, Network, Transaction, Txid};
use crate::metrics::{HistogramOpts, HistogramVec, Metrics};
Expand All @@ -36,6 +37,7 @@ lazy_static! {
);
}

#[instrument(skip_all, name="Daemon::parse_hash<T>")]
fn parse_hash<T>(value: &Value) -> Result<T>
where
T: FromStr,
Expand All @@ -49,6 +51,7 @@ where
.chain_err(|| format!("non-hex value: {}", value))?)
}

#[instrument(skip_all, name="Daemon::header_from_value")]
fn header_from_value(value: Value) -> Result<BlockHeader> {
let header_hex = value
.as_str()
Expand Down Expand Up @@ -140,6 +143,7 @@ struct Connection {
signal: Waiter,
}

#[instrument(skip_all, name="Daemon::tcp_connect")]
fn tcp_connect(addr: SocketAddr, signal: &Waiter) -> Result<TcpStream> {
loop {
match TcpStream::connect_timeout(&addr, *DAEMON_CONNECTION_TIMEOUT) {
Expand All @@ -162,6 +166,7 @@ fn tcp_connect(addr: SocketAddr, signal: &Waiter) -> Result<TcpStream> {
}

impl Connection {
#[instrument(skip_all, name="Daemon::Connection::new")]
fn new(
addr: SocketAddr,
cookie_getter: Arc<dyn CookieGetter>,
Expand All @@ -181,10 +186,12 @@ impl Connection {
})
}

#[instrument(skip(self))]
fn reconnect(&self) -> Result<Connection> {
Connection::new(self.addr, self.cookie_getter.clone(), self.signal.clone())
}

#[instrument(skip_all, name="Daemon::Connection::send")]
fn send(&mut self, request: &str) -> Result<()> {
let cookie = &self.cookie_getter.get()?;
let msg = format!(
Expand All @@ -198,6 +205,8 @@ impl Connection {
})
}


#[instrument(skip_all, name="Daemon::Connection::recv")]
fn recv(&mut self) -> Result<String> {
// TODO: use proper HTTP parser.
let mut in_header = true;
Expand Down Expand Up @@ -353,6 +362,7 @@ impl Daemon {
Ok(daemon)
}

#[instrument(skip(self))]
pub fn reconnect(&self) -> Result<Daemon> {
Ok(Daemon {
daemon_dir: self.daemon_dir.clone(),
Expand All @@ -366,6 +376,7 @@ impl Daemon {
})
}

#[instrument(skip_all, name="Daemon::list_blk_files")]
pub fn list_blk_files(&self) -> Result<Vec<PathBuf>> {
let path = self.blocks_dir.join("blk*.dat");
debug!("listing block files at {:?}", path);
Expand All @@ -381,6 +392,7 @@ impl Daemon {
self.network.magic()
}

#[instrument(skip_all, name="Daemon::call_jsonrpc")]
fn call_jsonrpc(&self, method: &str, request: &Value) -> Result<Value> {
let mut conn = self.conn.lock().unwrap();
let timer = self.latency.with_label_values(&[method]).start_timer();
Expand All @@ -398,6 +410,7 @@ impl Daemon {
Ok(result)
}

#[instrument(skip_all, name="Daemon::handle_request_batch")]
fn handle_request_batch(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
let id = self.message_id.next();
let chunks = params_list
Expand All @@ -420,6 +433,7 @@ impl Daemon {
Ok(results)
}

#[instrument(skip_all, name="Daemon::retry_request_batch")]
fn retry_request_batch(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
loop {
match self.handle_request_batch(method, params_list) {
Expand All @@ -435,36 +449,43 @@ impl Daemon {
}
}

#[instrument(skip_all, name="Daemon::request")]
fn request(&self, method: &str, params: Value) -> Result<Value> {
let mut values = self.retry_request_batch(method, &[params])?;
assert_eq!(values.len(), 1);
Ok(values.remove(0))
}

#[instrument(skip_all, name="Daemon::requests")]
fn requests(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
self.retry_request_batch(method, params_list)
}

// bitcoind JSONRPC API:

#[instrument(skip_all, name="Daemon::getblockchaininfo")]
pub fn getblockchaininfo(&self) -> Result<BlockchainInfo> {
let info: Value = self.request("getblockchaininfo", json!([]))?;
Ok(from_value(info).chain_err(|| "invalid blockchain info")?)
}

#[instrument(skip_all, name="Daemon::getnetworkinfo")]
fn getnetworkinfo(&self) -> Result<NetworkInfo> {
let info: Value = self.request("getnetworkinfo", json!([]))?;
Ok(from_value(info).chain_err(|| "invalid network info")?)
}

#[instrument(skip_all, name="Daemon::getbestblockhash")]
pub fn getbestblockhash(&self) -> Result<BlockHash> {
parse_hash(&self.request("getbestblockhash", json!([]))?)
}

#[instrument(skip_all, name="Daemon::getblockheader")]
pub fn getblockheader(&self, blockhash: &BlockHash) -> Result<BlockHeader> {
header_from_value(self.request("getblockheader", json!([blockhash, /*verbose=*/ false]))?)
}

#[instrument(skip_all, name="Daemon::getblockheaders")]
pub fn getblockheaders(&self, heights: &[usize]) -> Result<Vec<BlockHeader>> {
let heights: Vec<Value> = heights.iter().map(|height| json!([height])).collect();
let params_list: Vec<Value> = self
Expand All @@ -479,17 +500,20 @@ impl Daemon {
Ok(result)
}

#[instrument(skip_all, name="Daemon::getblock")]
pub fn getblock(&self, blockhash: &BlockHash) -> Result<Block> {
let block =
block_from_value(self.request("getblock", json!([blockhash, /*verbose=*/ false]))?)?;
assert_eq!(block.block_hash(), *blockhash);
Ok(block)
}

#[instrument(skip_all, name="Daemon::getblock_raw")]
pub fn getblock_raw(&self, blockhash: &BlockHash, verbose: u32) -> Result<Value> {
self.request("getblock", json!([blockhash, verbose]))
}

#[instrument(skip_all, name="Daemon::getblocks")]
pub fn getblocks(&self, blockhashes: &[BlockHash]) -> Result<Vec<Block>> {
let params_list: Vec<Value> = blockhashes
.iter()
Expand All @@ -503,6 +527,7 @@ impl Daemon {
Ok(blocks)
}

#[instrument(skip_all, name="Daemon::gettransactions")]
pub fn gettransactions(&self, txhashes: &[&Txid]) -> Result<Vec<Transaction>> {
let params_list: Vec<Value> = txhashes
.iter()
Expand All @@ -518,6 +543,7 @@ impl Daemon {
Ok(txs)
}

#[instrument(skip_all, name="Daemon::gettransaction_raw")]
pub fn gettransaction_raw(
&self,
txid: &Txid,
Expand All @@ -527,20 +553,24 @@ impl Daemon {
self.request("getrawtransaction", json!([txid, verbose, blockhash]))
}

#[instrument(skip_all, name="getmempooltx")]
pub fn getmempooltx(&self, txhash: &Txid) -> Result<Transaction> {
let value = self.request("getrawtransaction", json!([txhash, /*verbose=*/ false]))?;
tx_from_value(value)
}

#[instrument(skip_all, name="getmempooltxids")]
pub fn getmempooltxids(&self) -> Result<HashSet<Txid>> {
let res = self.request("getrawmempool", json!([/*verbose=*/ false]))?;
Ok(serde_json::from_value(res).chain_err(|| "invalid getrawmempool reply")?)
}

#[instrument(skip_all, name="broadcast")]
pub fn broadcast(&self, tx: &Transaction) -> Result<Txid> {
self.broadcast_raw(&serialize_hex(tx))
}

#[instrument(skip_all, name="broadcast_raw")]
pub fn broadcast_raw(&self, txhex: &str) -> Result<Txid> {
let txid = self.request("sendrawtransaction", json!([txhex]))?;
Ok(
Expand All @@ -552,6 +582,7 @@ impl Daemon {
// Get estimated feerates for the provided confirmation targets using a batch RPC request
// Missing estimates are logged but do not cause a failure, whatever is available is returned
#[allow(clippy::float_cmp)]
#[instrument(skip_all, name="Daemon::estimatesmartfee_batch")]
pub fn estimatesmartfee_batch(&self, conf_targets: &[u16]) -> Result<HashMap<u16, f64>> {
let params_list: Vec<Value> = conf_targets.iter().map(|t| json!([t, "ECONOMICAL"])).collect();

Expand Down Expand Up @@ -583,6 +614,7 @@ impl Daemon {
.collect())
}

#[instrument(skip_all, name="Daemon::get_all_headers")]
fn get_all_headers(&self, tip: &BlockHash) -> Result<Vec<BlockHeader>> {
let info: Value = self.request("getblockheader", json!([tip]))?;
let tip_height = info
Expand Down Expand Up @@ -610,6 +642,7 @@ impl Daemon {
}

// Returns a list of BlockHeaders in ascending height (i.e. the tip is last).
#[instrument(skip_all, name="Daemon::get_new_headers")]
pub fn get_new_headers(
&self,
indexed_headers: &HeaderList,
Expand Down Expand Up @@ -642,6 +675,7 @@ impl Daemon {
Ok(new_headers)
}

#[instrument(skip_all, name="Daemon::get_relayfee")]
pub fn get_relayfee(&self) -> Result<f64> {
let relayfee = self.getnetworkinfo()?.relayfee;

Expand Down
Loading

0 comments on commit db37cef

Please sign in to comment.