diff --git a/archives/block_listenser.rs b/archives/block_listenser.rs new file mode 100644 index 0000000..b271319 --- /dev/null +++ b/archives/block_listenser.rs @@ -0,0 +1,385 @@ +use std::{ + collections::VecDeque, + sync::{atomic::AtomicU64, Arc}, + time::Duration, +}; + +use dashmap::DashMap; +use jsonrpsee::SubscriptionSink; + +use tiny_logger::logs::{info, warn}; + +use solana_rpc_client::nonblocking::rpc_client::RpcClient; +use solana_rpc_client_api::{ + config::RpcBlockConfig, + response::{Response as RpcResponse, RpcResponseContext}, +}; + +use solana_sdk::{ + commitment_config::{CommitmentConfig, CommitmentLevel}, + slot_history::Slot, +}; + +use solana_transaction_status::{ + option_serializer::OptionSerializer, RewardType, TransactionConfirmationStatus, + TransactionDetails, TransactionStatus, UiConfirmedBlock, UiTransactionEncoding, + UiTransactionStatusMeta, +}; +use tokio::{ + sync::{mpsc::Sender, Mutex}, + task::JoinHandle, + time::Instant, +}; + +use crate::rpc_wrapper::block_store::{BlockInformation, BlockStore}; + +use super::{TxProps, TxSender}; + +/// Background worker which listens to new blocks +/// and keeps track of confirmed txs +#[derive(Clone)] +pub struct BlockListener { + tx_sender: TxSender, + block_store: BlockStore, + rpc_client: Arc<RpcClient>, + signature_subscribers: Arc<DashMap<(String, CommitmentConfig), (SubscriptionSink, Instant)>>, +} + +pub struct BlockListnerNotificatons { + pub block: Sender, + pub tx: Sender, +} + +impl BlockListener { + pub fn new(rpc_client: Arc<RpcClient>, tx_sender: TxSender, block_store: BlockStore) -> Self { + Self { + rpc_client, + tx_sender, + block_store, + signature_subscribers: Default::default(), + } + } + + pub async fn num_of_sigs_commited(&self, sigs: &[String]) -> usize { + let mut num_of_sigs_commited = 0; + for sig in sigs { + if self.tx_sender.txs_sent_store.contains_key(sig) { + num_of_sigs_commited += 1; + } + } + num_of_sigs_commited + } + + #[allow(deprecated)] + fn get_supported_commitment_config(commitment_config: CommitmentConfig) -> CommitmentConfig { + match commitment_config.commitment { + CommitmentLevel::Finalized | CommitmentLevel::Root | CommitmentLevel::Max => { + CommitmentConfig { + commitment: CommitmentLevel::Finalized, + } + } + _ => CommitmentConfig { + commitment: CommitmentLevel::Confirmed, + }, + } + } + + pub fn signature_subscribe( + &self, + signature: String, + commitment_config: CommitmentConfig, + sink: SubscriptionSink, + ) { + let commitment_config = Self::get_supported_commitment_config(commitment_config); + self.signature_subscribers + .insert((signature, commitment_config), (sink, Instant::now())); + } + + pub fn signature_un_subscribe(&self, signature: String, commitment_config: CommitmentConfig) { + let commitment_config = Self::get_supported_commitment_config(commitment_config); + self.signature_subscribers + .remove(&(signature, commitment_config)); + NUMBER_OF_SIGNATURE_SUBSCRIBERS.dec(); + } + + // fn increment_invalid_block_metric(commitment_config: CommitmentConfig) { + // if commitment_config.is_finalized() { + // INCOMPLETE_FIN_BLOCKS_RECV.inc(); + // } else { + // INCOMPLETE_CON_BLOCKS_RECV.inc(); + // } + // } + + pub async fn index_slot( + &self, + slot: Slot, + commitment_config: CommitmentConfig, + ) -> anyhow::Result<()> { + //info!("indexing slot {} commitment {}", slot, commitment_config.commitment); + let comfirmation_status = match commitment_config.commitment { + CommitmentLevel::Finalized => TransactionConfirmationStatus::Finalized, + _ => TransactionConfirmationStatus::Confirmed, + }; + + // let timer = if
commitment_config.is_finalized() { + // TT_RECV_FIN_BLOCK.start_timer() + // } else { + // TT_RECV_CON_BLOCK.start_timer() + // }; + + let start = Instant::now(); + + let block = self + .rpc_client + .get_block_with_config( + slot, + RpcBlockConfig { + transaction_details: Some(TransactionDetails::Full), + commitment: Some(commitment_config), + max_supported_transaction_version: Some(0), + encoding: Some(UiTransactionEncoding::Base64), + rewards: Some(true), + }, + ) + .await?; + timer.observe_duration(); + + if commitment_config.is_finalized() { + FIN_BLOCKS_RECV.inc(); + } else { + CON_BLOCKS_RECV.inc(); + }; + + // let Some(block_height) = block.block_height else { + // Self::increment_invalid_block_metric(commitment_config); + // return Ok(()); + // }; + + // let Some(transactions) = block.transactions else { + // Self::increment_invalid_block_metric(commitment_config); + // return Ok(()); + // }; + + let blockhash = block.blockhash; + //let parent_slot = block.parent_slot; + + self.block_store + .add_block( + blockhash.clone(), + BlockInformation { + slot, + block_height, + instant: Instant::now(), + }, + commitment_config, + ) + .await; + + let mut transactions_processed = 0; + for tx in transactions { + let Some(UiTransactionStatusMeta { err, status, compute_units_consumed: _ ,.. }) = tx.meta else { + info!("tx with no meta"); + continue; + }; + + let tx = match tx.transaction.decode() { + Some(tx) => tx, + None => { + warn!("transaction could not be decoded"); + continue; + } + }; + transactions_processed += 1; + let sig = tx.signatures[0].to_string(); + + if let Some(mut tx_status) = self.tx_sender.txs_sent_store.get_mut(&sig) { + // + // Metrics + // + if status.is_ok() { + if commitment_config.is_finalized() { + TXS_FINALIZED.inc(); + } else { + TXS_CONFIRMED.inc(); + } + } + + tx_status.value_mut().status = Some(TransactionStatus { + slot, + confirmations: None, + status, + err: err.clone(), + confirmation_status: Some(comfirmation_status.clone()), + }); + }; + + // subscribers + if let Some((_sig, (mut sink, _))) = + self.signature_subscribers.remove(&(sig, commitment_config)) + { + // none if transaction succeeded + sink.send(&RpcResponse { + context: RpcResponseContext { + slot, + api_version: None, + }, + value: serde_json::json!({ "err": err }), + })?; + NUMBER_OF_SIGNATURE_SUBSCRIBERS.dec(); + } + } + + info!( + "Number of transactions processed {} for slot {} for commitment {} time taken {} ms", + transactions_processed, + slot, + commitment_config.commitment, + start.elapsed().as_millis() + ); + + Ok(()) + } + pub fn listen(self, commitment_config: CommitmentConfig) -> JoinHandle> { + let slots_task_queue = Arc::new(Mutex::new(VecDeque::<(u64, u8)>::new())); + let (slot_retry_queue_sx, mut slot_retry_queue_rx) = tokio::sync::mpsc::unbounded_channel(); + + // task to fetch blocks + for _i in 0..6 { + let this = self.clone(); + let slots_task_queue = slots_task_queue.clone(); + let slot_retry_queue_sx = slot_retry_queue_sx.clone(); + + tokio::spawn(async move { + let slots_task_queue = slots_task_queue.clone(); + loop { + let (slot, error_count) = { + let mut queue = slots_task_queue.lock().await; + match queue.pop_front() { + Some(t) => t, + None => { + // no task + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + continue; + } + } + }; + + if let Err(_) = this.index_slot(slot, commitment_config).await { + // usually as we index all the slots even if they are not been processed we get some errors for slot + // as they are not in long term storage of 
the rpc // we retry up to 5 times before ignoring the slot + if error_count > 5 { + // retried more than 5 times / there should be no block for this slot + warn!( + "unable to get block at slot {} and commitment {}", + slot, commitment_config.commitment + ); + continue; + } else { + // add a task to be queued after a delay + let retry_at = tokio::time::Instant::now() + .checked_add(Duration::from_millis(100)) + .unwrap(); + let _ = slot_retry_queue_sx.send((slot, error_count, retry_at)); + BLOCKS_IN_RETRY_QUEUE.inc(); + } + }; + } + }); + } + + // a task that will queue back the slots to be retried after a certain delay + let recent_slot = Arc::new(AtomicU64::new(0)); + { + let slots_task_queue = slots_task_queue.clone(); + let recent_slot = recent_slot.clone(); + tokio::spawn(async move { + loop { + match slot_retry_queue_rx.recv().await { + Some((slot, error_count, instant)) => { + BLOCKS_IN_RETRY_QUEUE.dec(); + let recent_slot = + recent_slot.load(std::sync::atomic::Ordering::Relaxed); + // if slot is too old ignore + if recent_slot.saturating_sub(slot) > 256 { + // slot too old to retry + // most probably it's an empty slot + continue; + } + + let now = tokio::time::Instant::now(); + if now < instant { + tokio::time::sleep_until(instant).await; + } + let mut queue = slots_task_queue.lock().await; + queue.push_back((slot, error_count + 1)); + } + None => { + break; + } + } + } + }); + } + + let rpc_client = self.rpc_client.clone(); + tokio::spawn(async move { + let slots_task_queue = slots_task_queue.clone(); + let last_latest_slot = self + .block_store + .get_latest_block_info(commitment_config) + .await + .slot; + // -5 for warmup + let mut last_latest_slot = last_latest_slot - 5; + recent_slot.store(last_latest_slot, std::sync::atomic::Ordering::Relaxed); + + // storage for recent slots processed + let rpc_client = rpc_client.clone(); + loop { + let new_slot = match rpc_client.get_slot_with_commitment(commitment_config).await { + Ok(new_slot) => new_slot, + Err(err) => { + warn!("Error while fetching slot {err:?}"); + ERRORS_WHILE_FETCHING_SLOTS.inc(); + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + continue; + } + }; + + if last_latest_slot == new_slot { + warn!("No new slots"); + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + continue; + } + + // filter already processed slots + let new_block_slots: Vec<Slot> = (last_latest_slot..new_slot).collect(); + // scoped block so the queue lock is dropped promptly + { + let mut lock = slots_task_queue.lock().await; + for slot in new_block_slots { + lock.push_back((slot, 0)); + } + BLOCKS_IN_QUEUE.set(lock.len() as i64); + } + + last_latest_slot = new_slot; + recent_slot.store(last_latest_slot, std::sync::atomic::Ordering::Relaxed); + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + } + }) + } + + pub fn clean(&self, ttl_duration: Duration) { + let length_before = self.signature_subscribers.len(); + self.signature_subscribers + .retain(|_k, (sink, instant)| !sink.is_closed() && instant.elapsed() < ttl_duration); + + NUMBER_OF_SIGNATURE_SUBSCRIBERS.set(self.signature_subscribers.len() as i64); + info!( + "Cleaned {} Signature Subscribers", + length_before - self.signature_subscribers.len() + ); + } +} diff --git a/tinydancer/src/consensus.rs b/tinydancer/src/consensus.rs index 48ff428..5767c4e 100644 --- a/tinydancer/src/consensus.rs +++ b/tinydancer/src/consensus.rs @@ -1,5 +1,5 @@ -use crate::tinydancer::{endpoint, ClientService, ClientStatus, Cluster}; use crate::sampler::{ArchiveConfig, SlotSubscribeResponse}; +use
crate::tinydancer::{endpoint, ClientService, ClientStatus, Cluster}; use crate::{convert_to_websocket, send_rpc_call, try_coerce_shred}; use anyhow::anyhow; use async_trait::async_trait; @@ -12,42 +12,42 @@ use rayon::prelude::*; use reqwest::Request; use rocksdb::{ColumnFamily, Options as RocksOptions, DB}; use serde::de::DeserializeOwned; +use serde_derive::Deserialize; +use serde_derive::Serialize; use solana_ledger::shred::{ShredId, ShredType}; use solana_ledger::{ -ancestor_iterator::{AncestorIterator, AncestorIteratorWithHash}, -blockstore::Blockstore, -// blockstore_db::columns::ShredCode, -shred::{Nonce, Shred, ShredCode, ShredData, ShredFetchStats, SIZE_OF_NONCE}, + ancestor_iterator::{AncestorIterator, AncestorIteratorWithHash}, + blockstore::Blockstore, + // blockstore_db::columns::ShredCode, + shred::{Nonce, Shred, ShredCode, ShredData, ShredFetchStats, SIZE_OF_NONCE}, }; use solana_sdk::hash::hashv; use solana_sdk::{ -clock::Slot, -genesis_config::ClusterType, -hash::{Hash, HASH_BYTES}, -packet::PACKET_DATA_SIZE, -pubkey::{Pubkey, PUBKEY_BYTES}, -signature::{Signable, Signature, Signer, SIGNATURE_BYTES}, -signer::keypair::Keypair, -timing::{duration_as_ms, timestamp}, + clock::Slot, + genesis_config::ClusterType, + hash::{Hash, HASH_BYTES}, + packet::PACKET_DATA_SIZE, + pubkey::{Pubkey, PUBKEY_BYTES}, + signature::{Signable, Signature, Signer, SIGNATURE_BYTES}, + signer::keypair::Keypair, + timing::{duration_as_ms, timestamp}, }; use std::str::FromStr; use std::sync::atomic::{AtomicU32, Ordering}; -use std::sync::{Arc, Mutex, MutexGuard}; +use std::sync::Arc; use std::{error::Error, ops::Add}; use std::{ -net::{SocketAddr, UdpSocket}, -thread::Builder, + net::{SocketAddr, UdpSocket}, + thread::Builder, }; use tiny_logger::logs::{debug, error, info}; use tokio::{ -sync::mpsc::UnboundedSender, -task::{JoinError, JoinHandle}, -sync::Mutex as TokioMutex, + sync::mpsc::UnboundedSender, + sync::{Mutex, MutexGuard}, + task::{JoinError, JoinHandle}, }; use tungstenite::{connect, Message}; use url::Url; -use serde_derive::Deserialize; -use serde_derive::Serialize; pub struct ConsensusService { consensus_indices: Vec, @@ -58,20 +58,20 @@ pub struct ConsensusServiceConfig { pub cluster: Cluster, pub archive_config: ArchiveConfig, pub instance: Arc, - pub status_consensus: Arc>, + pub client_status: Arc>, pub sample_qty: usize, } #[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] - pub struct RpcBlockCommitment { +pub struct RpcBlockCommitment { pub commitment: Option, pub total_stake: u64, } #[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] - pub struct GetCommittmentResponse { +pub struct GetCommittmentResponse { pub jsonrpc: String, pub result: RpcBlockCommitment, pub id: i64, @@ -84,96 +84,95 @@ pub const VOTE_THRESHOLD_SIZE: f64 = 2f64 / 3f64; #[async_trait] impl ClientService for ConsensusService { -type ServiceError = tokio::task::JoinError; + type ServiceError = tokio::task::JoinError; -fn new(config: ConsensusServiceConfig) -> Self { - let consensus_handler = tokio::spawn(async move { - let rpc_url = endpoint(config.cluster); - let pub_sub = convert_to_websocket!(rpc_url); + fn new(config: ConsensusServiceConfig) -> Self { + let consensus_handler = tokio::spawn(async move { + let rpc_url = endpoint(config.cluster); + let pub_sub = convert_to_websocket!(rpc_url); - let mut threads = Vec::default(); + let mut threads = Vec::default(); - let (slot_update_tx, slot_update_rx) = 
crossbeam::channel::unbounded::<u64>(); + let (slot_update_tx, slot_update_rx) = crossbeam::channel::unbounded::<u64>(); - let status_arc = config.status_consensus.clone(); + let status_arc = config.client_status.clone(); - // waits on new slots => triggers slot_verify_loop - threads.push(tokio::spawn(slot_update_loop( - slot_update_tx, - pub_sub, - config.status_consensus, - ))); + // waits on new slots => triggers slot_verify_loop + threads.push(tokio::spawn(slot_update_loop( + slot_update_tx, + pub_sub, + config.client_status, + ))); - // verify slot votes - threads.push(tokio::spawn(slot_verify_loop( - slot_update_rx, - rpc_url, - status_arc, - ))); + // verify slot votes + threads.push(tokio::spawn(slot_verify_loop( + slot_update_rx, + rpc_url, + status_arc, + ))); + for thread in threads { + thread.await; + } + }); - for thread in threads { - thread.await; + Self { + consensus_handler, + consensus_indices: Vec::default(), } - }); - - Self { - consensus_handler, - consensus_indices: Vec::default(), } -} -async fn join(self) -> std::result::Result<(), Self::ServiceError> { - self.consensus_handler.await -} + async fn join(self) -> std::result::Result<(), Self::ServiceError> { + self.consensus_handler.await + } } pub async fn slot_update_loop( slot_update_tx: Sender<u64>, pub_sub: String, - status_sampler: Arc<Mutex<ClientStatus>>, + client_status: Arc<Mutex<ClientStatus>>, ) -> anyhow::Result<()> { -let result = match connect(Url::parse(pub_sub.as_str()).unwrap()) { - Ok((socket, _response)) => Some((socket, _response)), - Err(_) => { - let mut status = status_sampler.lock().await; - *status = ClientStatus::Crashed(String::from("Client can't connect to socket")); - None - } -}; - -if result.is_none() { - return Err(anyhow!("")); -} - -let (mut socket, _response) = result.unwrap(); - -socket.write_message(Message::Text( - r#"{ "jsonrpc": "2.0", "id": 1, "method": "slotSubscribe" }"#.into(), -))?; + let result = match connect(Url::parse(pub_sub.as_str()).unwrap()) { + Ok((socket, _response)) => Some((socket, _response)), + Err(_) => { + let mut status = client_status.lock().await; + *status = ClientStatus::Crashed(String::from("Client can't connect to socket")); + None + } + }; -loop { - match socket.read_message() { - Ok(msg) => { - let res = serde_json::from_str::<SlotSubscribeResponse>(msg.to_string().as_str()); + if result.is_none() { + return Err(anyhow!("")); + } - // info!("res: {:?}", msg.to_string().as_str()); - if let Ok(res) = res { - match slot_update_tx.send(res.params.result.root as u64) { - Ok(_) => { - info!("slot updated: {:?}", res.params.result.root); - } - Err(e) => { - info!("error here: {:?} {:?}", e, res.params.result.root as u64); - continue; // @TODO: we should add retries here incase send fails for some reason + let (mut socket, _response) = result.unwrap(); + + socket.write_message(Message::Text( + r#"{ "jsonrpc": "2.0", "id": 1, "method": "slotSubscribe" }"#.into(), + ))?; + + loop { + match socket.read_message() { + Ok(msg) => { + let res = serde_json::from_str::<SlotSubscribeResponse>(msg.to_string().as_str()); + + // info!("res: {:?}", msg.to_string().as_str()); + if let Ok(res) = res { + match slot_update_tx.send(res.params.result.root as u64) { + Ok(_) => { + info!("slot updated: {:?}", res.params.result.root); + } + Err(e) => { + info!("error here: {:?} {:?}", e, res.params.result.root as u64); + continue; // @TODO: we should add retries here in case send fails for some reason + } } } } + Err(e) => info!("err: {:?}", e), } - Err(e) => info!("err: {:?}", e), } } -} // verifies the total vote on the slot > 2/3 fn verify_slot(slot_commitment:
RpcBlockCommitment) -> bool { @@ -191,20 +190,20 @@ fn verify_slot(slot_commitment: RpcBlockCommitment) -> boo pub async fn slot_verify_loop( slot_update_rx: Receiver, endpoint: String, - status_sampler: Arc>, + client_status: Arc>, ) -> anyhow::Result<()> { -loop { - let mut status = status_sampler.lock().await; + loop { + let mut status = client_status.lock().await; if let ClientStatus::Crashed(_) = &*status { return Err(anyhow!("Client crashed")); } else { - *status = ClientStatus::Active(String::from( - "Monitoring Tinydancer: Verifying consensus", - )); + *status = + ClientStatus::Active(String::from("Monitoring Tinydancer: Verifying consensus")); } + drop(status); if let Ok(slot) = slot_update_rx.recv() { let slot_commitment_result = request_slot_voting(slot, &endpoint).await; - + if let Err(e) = slot_commitment_result { println!("Error {}", e); info!("{}", e); @@ -229,7 +228,6 @@ pub async fn request_slot_voting( slot: u64, endpoint: &String, ) -> Result { - let request = serde_json::json!({ "jsonrpc": "2.0", "id": 1, diff --git a/tinydancer/src/macros.rs b/tinydancer/src/macros.rs index c747f67..7c0f646 100644 --- a/tinydancer/src/macros.rs +++ b/tinydancer/src/macros.rs @@ -17,6 +17,17 @@ macro_rules! block_on { rt.handle().block_on($func).expect($error); }; } +#[macro_export] +macro_rules! block_on_async { + ($func:expr) => {{ + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on($func) + }}; +} + #[macro_export] macro_rules! try_coerce_shred { ($response:expr) => {{ diff --git a/tinydancer/src/main.rs b/tinydancer/src/main.rs index 77a36ab..325c710 100644 --- a/tinydancer/src/main.rs +++ b/tinydancer/src/main.rs @@ -46,9 +46,9 @@ use std::{ use tinydancer::{endpoint, Cluster, TinyDancer, TinyDancerConfig}; mod macros; use colored::Colorize; +mod consensus; mod rpc_wrapper; mod sampler; -mod consensus; mod ui; use anyhow::{anyhow, Result}; @@ -149,7 +149,7 @@ async fn main() -> Result<()> { archive_path, shred_archive_duration, tui_monitor, - consensus_mode + consensus_mode, } => { let config_file = get_config_file().map_err(|_| anyhow!("tinydancer config not set"))?; @@ -234,8 +234,6 @@ async fn main() -> Result<()> { } } ConfigSubcommands::Set { log_path, cluster } => { - // println!("{:?}", fs::create_dir_all("~/.config/tinydancer")); - let home_path = std::env::var("HOME").unwrap(); let tinydancer_dir = home_path + "/.config/tinydancer"; diff --git a/tinydancer/src/sampler.rs b/tinydancer/src/sampler.rs index 885ce17..daa1b77 100644 --- a/tinydancer/src/sampler.rs +++ b/tinydancer/src/sampler.rs @@ -31,7 +31,7 @@ use solana_sdk::{ }; use std::str::FromStr; use std::sync::atomic::{AtomicU32, Ordering}; -use std::sync::{Arc, Mutex, MutexGuard}; +use std::sync::Arc; use std::{error::Error, ops::Add}; use std::{ net::{SocketAddr, UdpSocket}, @@ -40,6 +40,7 @@ use std::{ use tiny_logger::logs::{debug, error, info}; use tokio::{ sync::mpsc::UnboundedSender, + sync::{Mutex, MutexGuard}, task::{JoinError, JoinHandle}, }; use tungstenite::{connect, Message}; @@ -56,7 +57,7 @@ pub struct SampleServiceConfig { pub cluster: Cluster, pub archive_config: ArchiveConfig, pub instance: Arc, - pub status_sampler: Arc>, + pub client_status: Arc>, pub sample_qty: usize, } @@ -81,13 +82,13 @@ impl ClientService for SampleService { let (shred_tx, shred_rx) = crossbeam::channel::unbounded(); let (verified_shred_tx, verified_shred_rx) = crossbeam::channel::unbounded(); - let status_arc = config.status_sampler.clone(); + let status_arc = 
config.client_status.clone(); // waits on new slots => triggers shred_update_loop threads.push(tokio::spawn(slot_update_loop( slot_update_tx, pub_sub, - config.status_sampler, + config.client_status, ))); // sample shreds from new slot @@ -158,13 +159,14 @@ pub async fn request_shreds( pub async fn slot_update_loop( slot_update_tx: Sender, pub_sub: String, - status_sampler: Arc>, + client_status: Arc>, ) -> anyhow::Result<()> { let result = match connect(Url::parse(pub_sub.as_str()).unwrap()) { Ok((socket, _response)) => Some((socket, _response)), Err(_) => { - let mut status = status_sampler.lock().unwrap(); + let mut status = client_status.lock().await; *status = ClientStatus::Crashed(String::from("Client can't connect to socket")); + drop(status); None } }; @@ -318,18 +320,19 @@ async fn shred_update_loop( slot_update_rx: Receiver, endpoint: String, shred_tx: Sender<(Vec>, solana_ledger::shred::Pubkey)>, - status_sampler: Arc>, + client_status: Arc>, sample_qty: usize, ) -> anyhow::Result<()> { loop { { - let mut status = status_sampler.lock().unwrap(); + let mut status = client_status.lock().await; if let ClientStatus::Crashed(_) = &*status { return Err(anyhow!("Client crashed")); } else { *status = ClientStatus::Active(String::from( "Monitoring Tinydancer: Actively Sampling Shreds", )); + drop(status) } } diff --git a/tinydancer/src/tinydancer.rs b/tinydancer/src/tinydancer.rs index 8c228e0..e08f89b 100644 --- a/tinydancer/src/tinydancer.rs +++ b/tinydancer/src/tinydancer.rs @@ -1,18 +1,14 @@ //! Sampler struct - incharge of sampling shreds // use rayon::prelude::*; -use std::{ - env, - sync::{Arc, Mutex, MutexGuard}, - thread::Result, -}; +use std::{env, sync::Arc, thread::Result}; // use tokio::time::Duration; use crate::{ block_on, + consensus::{ConsensusService, ConsensusServiceConfig}, rpc_wrapper::{TransactionService, TransactionServiceConfig}, sampler::{ArchiveConfig, SampleService, SampleServiceConfig, SHRED_CF}, - consensus::{ConsensusService, ConsensusServiceConfig}, ui::{UiConfig, UiService}, }; use anyhow::anyhow; @@ -24,7 +20,12 @@ use tiny_logger::logs::info; // use log::info; // use log4rs; use std::error::Error; -use tokio::{runtime::Runtime, task::JoinError, try_join, sync::Mutex as TokioMutex,}; +use tokio::{ + runtime::Runtime, + sync::{Mutex, MutexGuard}, + task::JoinError, + try_join, +}; // use std::{thread, thread::JoinHandle, time::Duration}; #[async_trait] @@ -64,13 +65,8 @@ use std::path::PathBuf; impl TinyDancer { pub async fn start(config: TinyDancerConfig) -> Result<()> { let status = ClientStatus::Initializing(String::from("Starting Up Tinydancer")); - let status_clone = status.clone(); - let client_status = Arc::new(Mutex::new(status_clone)); - let status_sampler = client_status.clone(); - - let consensus_client_status = Arc::new(TokioMutex::new(status.clone())); - let status_consensus = consensus_client_status.clone(); - + let client_status = Arc::new(Mutex::new(status)); + let client_status_ui = client_status.clone(); let TinyDancerConfig { enable_ui_service, rpc_endpoint, @@ -93,46 +89,6 @@ impl TinyDancer { .unwrap(); let db = Arc::new(db); - - if consensus_mode { - println!("Running in consensus_mode"); - - let consensus_service_config = ConsensusServiceConfig { - cluster: rpc_endpoint.clone(), - archive_config, - instance: db.clone(), - status_consensus: status_consensus.clone(), - sample_qty, - }; - - let consensus_service = ConsensusService::new(consensus_service_config); - - // run the sampling service - consensus_service - .join() - .await - 
.expect("error in consensus service thread"); - } - - else{ - - let sample_service_config = SampleServiceConfig { - cluster: rpc_endpoint.clone(), - archive_config, - instance: db.clone(), - status_sampler, - sample_qty, - }; - - let sample_service = SampleService::new(sample_service_config); - - // run the sampling service - sample_service - .join() - .await - .expect("error in sample service thread"); - } - let transaction_service = TransactionService::new(TransactionServiceConfig { cluster: rpc_endpoint.clone(), db_instance: db.clone(), @@ -140,14 +96,46 @@ impl TinyDancer { let ui_service = if enable_ui_service || tui_monitor { Some(UiService::new(UiConfig { - client_status, + client_status: client_status_ui, enable_ui_service, tui_monitor, })) } else { None }; + // run the sampling service + if !consensus_mode { + let sample_service_config = SampleServiceConfig { + cluster: rpc_endpoint.clone(), + archive_config: archive_config.clone(), + instance: db.clone(), + client_status: client_status.clone(), + sample_qty, + }; + let sample_service = SampleService::new(sample_service_config); + sample_service + .join() + .await + .expect("error in sample service thread"); + } + if consensus_mode { + let consensus_service_config = ConsensusServiceConfig { + cluster: rpc_endpoint.clone(), + archive_config, + instance: db.clone(), + client_status, + sample_qty, + }; + + let consensus_service = ConsensusService::new(consensus_service_config); + + // run the consensus service + consensus_service + .join() + .await + .expect("error in consensus service thread"); + } transaction_service .join() .await diff --git a/tinydancer/src/ui/ui.rs b/tinydancer/src/ui/ui.rs index c4703f6..5bcf9fd 100644 --- a/tinydancer/src/ui/ui.rs +++ b/tinydancer/src/ui/ui.rs @@ -1,3 +1,4 @@ +use crate::block_on_async; use crate::sampler::GetShredResponse; use crate::tinydancer::{ClientService, ClientStatus, TinyDancer}; use async_trait::async_trait; @@ -8,13 +9,14 @@ use crossterm::{ terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen}, }; use spinoff::{spinners, Color as SpinColor, Spinner}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::thread::sleep; use std::time::Duration; use std::{any::Any, thread::Thread}; use std::{fmt, thread::JoinHandle}; use thiserror::Error; use tiny_logger::logs::info; +use tokio::sync::{Mutex, MutexGuard}; use tui::layout::Rect; use tui::style::{Color, Modifier, Style}; use tui::text::{Span, Spans}; @@ -225,12 +227,30 @@ impl ClientService for UiService { threads.push(std::thread::spawn(move || loop { sleep(Duration::from_millis(100)); + enable_raw_mode(); + if crossterm::event::poll(Duration::from_millis(100)).unwrap() { + let ev = crossterm::event::read().unwrap(); + if ev + == Event::Key(KeyEvent { + code: KeyCode::Char('c'), + modifiers: KeyModifiers::CONTROL, + kind: KeyEventKind::Press, + state: KeyEventState::NONE, + }) + { + let mut status = block_on_async!(client_status.lock()); + *status = ClientStatus::ShuttingDown(String::from( + "Shutting Down Gracefully...", + )); + drop(status); + disable_raw_mode(); + } + } + let status = block_on_async!(client_status.lock()); - let status = client_status.lock().unwrap(); match &*status { ClientStatus::Active(msg) => { spinner.update(spinners::Dots, msg.clone(), SpinColor::Green); - // sleep(Duration::from_secs(100)); } ClientStatus::Initializing(msg) => { spinner.update(spinners::Dots, msg.clone(), SpinColor::Yellow); @@ -245,27 +265,7 @@ impl ClientService for UiService { } _ => {} } - 
Mutex::unlock(status); - enable_raw_mode(); - if crossterm::event::poll(Duration::from_millis(100)).unwrap() { - let ev = crossterm::event::read().unwrap(); - - if ev - == Event::Key(KeyEvent { - code: KeyCode::Char('c'), - modifiers: KeyModifiers::CONTROL, - kind: KeyEventKind::Press, - state: KeyEventState::NONE, - }) - { - let mut status = client_status.lock().unwrap(); - *status = ClientStatus::ShuttingDown(String::from( - "Shutting Down Gracefully...", - )); - Mutex::unlock(status); - disable_raw_mode(); - } - } + drop(status); })); }
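
Note on the recurring pattern in this patch: the shared ClientStatus moves from std::sync::Mutex to tokio::sync::Mutex, so async tasks lock it with .lock().await and explicitly drop() the guard before the next await point, while blocking threads (the UI loop) bridge into async through the new block_on_async! macro, which spins up a current-thread runtime. The snippet below is a minimal, self-contained sketch of that pattern only — it is not code from this repository; the Status enum and variable names are simplified stand-ins, and it assumes the tokio crate with its full feature set.

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

// simplified stand-in for the project's ClientStatus enum
#[derive(Debug)]
enum Status {
    Active(String),
    ShuttingDown(String),
}

#[tokio::main]
async fn main() {
    // shared status, now behind tokio's async-aware Mutex
    let client_status = Arc::new(Mutex::new(Status::Active("sampling shreds".into())));

    // async task: lock with `.await`, mutate, and release the guard before the next await point
    let updater = {
        let client_status = client_status.clone();
        tokio::spawn(async move {
            let mut status = client_status.lock().await;
            *status = Status::ShuttingDown("Shutting Down Gracefully...".into());
            drop(status); // drop the guard before sleeping, as the patch does
            tokio::time::sleep(Duration::from_millis(50)).await;
        })
    };

    // blocking UI-style thread: bridge into async with a small current-thread runtime,
    // which is roughly what block_on_async! wraps
    let ui_thread = {
        let client_status = client_status.clone();
        std::thread::spawn(move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            let status = rt.block_on(client_status.lock());
            println!("ui sees status: {:?}", *status);
        })
    };

    updater.await.unwrap();
    ui_thread.join().unwrap();
}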