From 8c711540691eb005cace78f3446871a7decbeb1a Mon Sep 17 00:00:00 2001 From: anoushk1234 Date: Sun, 23 Apr 2023 18:58:13 +0530 Subject: [PATCH] fix: status indicator works now + fixed async/sync context --- .gitignore | 3 +- tinydancer/src/consensus.rs | 192 +++++++++++++++++------------------ tinydancer/src/macros.rs | 11 ++ tinydancer/src/main.rs | 6 +- tinydancer/src/sampler.rs | 19 ++-- tinydancer/src/tinydancer.rs | 98 ++++++++---------- tinydancer/src/ui/ui.rs | 48 ++++----- 7 files changed, 188 insertions(+), 189 deletions(-) diff --git a/.gitignore b/.gitignore index 4ad91fc..951054a 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ **.log tmp/ .vscode -db/ \ No newline at end of file +db/ +archives \ No newline at end of file diff --git a/tinydancer/src/consensus.rs b/tinydancer/src/consensus.rs index 48ff428..5767c4e 100644 --- a/tinydancer/src/consensus.rs +++ b/tinydancer/src/consensus.rs @@ -1,5 +1,5 @@ -use crate::tinydancer::{endpoint, ClientService, ClientStatus, Cluster}; use crate::sampler::{ArchiveConfig, SlotSubscribeResponse}; +use crate::tinydancer::{endpoint, ClientService, ClientStatus, Cluster}; use crate::{convert_to_websocket, send_rpc_call, try_coerce_shred}; use anyhow::anyhow; use async_trait::async_trait; @@ -12,42 +12,42 @@ use rayon::prelude::*; use reqwest::Request; use rocksdb::{ColumnFamily, Options as RocksOptions, DB}; use serde::de::DeserializeOwned; +use serde_derive::Deserialize; +use serde_derive::Serialize; use solana_ledger::shred::{ShredId, ShredType}; use solana_ledger::{ -ancestor_iterator::{AncestorIterator, AncestorIteratorWithHash}, -blockstore::Blockstore, -// blockstore_db::columns::ShredCode, -shred::{Nonce, Shred, ShredCode, ShredData, ShredFetchStats, SIZE_OF_NONCE}, + ancestor_iterator::{AncestorIterator, AncestorIteratorWithHash}, + blockstore::Blockstore, + // blockstore_db::columns::ShredCode, + shred::{Nonce, Shred, ShredCode, ShredData, ShredFetchStats, SIZE_OF_NONCE}, }; use solana_sdk::hash::hashv; use solana_sdk::{ -clock::Slot, -genesis_config::ClusterType, -hash::{Hash, HASH_BYTES}, -packet::PACKET_DATA_SIZE, -pubkey::{Pubkey, PUBKEY_BYTES}, -signature::{Signable, Signature, Signer, SIGNATURE_BYTES}, -signer::keypair::Keypair, -timing::{duration_as_ms, timestamp}, + clock::Slot, + genesis_config::ClusterType, + hash::{Hash, HASH_BYTES}, + packet::PACKET_DATA_SIZE, + pubkey::{Pubkey, PUBKEY_BYTES}, + signature::{Signable, Signature, Signer, SIGNATURE_BYTES}, + signer::keypair::Keypair, + timing::{duration_as_ms, timestamp}, }; use std::str::FromStr; use std::sync::atomic::{AtomicU32, Ordering}; -use std::sync::{Arc, Mutex, MutexGuard}; +use std::sync::Arc; use std::{error::Error, ops::Add}; use std::{ -net::{SocketAddr, UdpSocket}, -thread::Builder, + net::{SocketAddr, UdpSocket}, + thread::Builder, }; use tiny_logger::logs::{debug, error, info}; use tokio::{ -sync::mpsc::UnboundedSender, -task::{JoinError, JoinHandle}, -sync::Mutex as TokioMutex, + sync::mpsc::UnboundedSender, + sync::{Mutex, MutexGuard}, + task::{JoinError, JoinHandle}, }; use tungstenite::{connect, Message}; use url::Url; -use serde_derive::Deserialize; -use serde_derive::Serialize; pub struct ConsensusService { consensus_indices: Vec, @@ -58,20 +58,20 @@ pub struct ConsensusServiceConfig { pub cluster: Cluster, pub archive_config: ArchiveConfig, pub instance: Arc, - pub status_consensus: Arc>, + pub client_status: Arc>, pub sample_qty: usize, } #[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] - pub struct RpcBlockCommitment { +pub struct RpcBlockCommitment { pub commitment: Option, pub total_stake: u64, } #[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] - pub struct GetCommittmentResponse { +pub struct GetCommittmentResponse { pub jsonrpc: String, pub result: RpcBlockCommitment, pub id: i64, @@ -84,96 +84,95 @@ pub const VOTE_THRESHOLD_SIZE: f64 = 2f64 / 3f64; #[async_trait] impl ClientService for ConsensusService { -type ServiceError = tokio::task::JoinError; + type ServiceError = tokio::task::JoinError; -fn new(config: ConsensusServiceConfig) -> Self { - let consensus_handler = tokio::spawn(async move { - let rpc_url = endpoint(config.cluster); - let pub_sub = convert_to_websocket!(rpc_url); + fn new(config: ConsensusServiceConfig) -> Self { + let consensus_handler = tokio::spawn(async move { + let rpc_url = endpoint(config.cluster); + let pub_sub = convert_to_websocket!(rpc_url); - let mut threads = Vec::default(); + let mut threads = Vec::default(); - let (slot_update_tx, slot_update_rx) = crossbeam::channel::unbounded::(); + let (slot_update_tx, slot_update_rx) = crossbeam::channel::unbounded::(); - let status_arc = config.status_consensus.clone(); + let status_arc = config.client_status.clone(); - // waits on new slots => triggers slot_verify_loop - threads.push(tokio::spawn(slot_update_loop( - slot_update_tx, - pub_sub, - config.status_consensus, - ))); + // waits on new slots => triggers slot_verify_loop + threads.push(tokio::spawn(slot_update_loop( + slot_update_tx, + pub_sub, + config.client_status, + ))); - // verify slot votes - threads.push(tokio::spawn(slot_verify_loop( - slot_update_rx, - rpc_url, - status_arc, - ))); + // verify slot votes + threads.push(tokio::spawn(slot_verify_loop( + slot_update_rx, + rpc_url, + status_arc, + ))); + for thread in threads { + thread.await; + } + }); - for thread in threads { - thread.await; + Self { + consensus_handler, + consensus_indices: Vec::default(), } - }); - - Self { - consensus_handler, - consensus_indices: Vec::default(), } -} -async fn join(self) -> std::result::Result<(), Self::ServiceError> { - self.consensus_handler.await -} + async fn join(self) -> std::result::Result<(), Self::ServiceError> { + self.consensus_handler.await + } } pub async fn slot_update_loop( slot_update_tx: Sender, pub_sub: String, - status_sampler: Arc>, + client_status: Arc>, ) -> anyhow::Result<()> { -let result = match connect(Url::parse(pub_sub.as_str()).unwrap()) { - Ok((socket, _response)) => Some((socket, _response)), - Err(_) => { - let mut status = status_sampler.lock().await; - *status = ClientStatus::Crashed(String::from("Client can't connect to socket")); - None - } -}; - -if result.is_none() { - return Err(anyhow!("")); -} - -let (mut socket, _response) = result.unwrap(); - -socket.write_message(Message::Text( - r#"{ "jsonrpc": "2.0", "id": 1, "method": "slotSubscribe" }"#.into(), -))?; + let result = match connect(Url::parse(pub_sub.as_str()).unwrap()) { + Ok((socket, _response)) => Some((socket, _response)), + Err(_) => { + let mut status = client_status.lock().await; + *status = ClientStatus::Crashed(String::from("Client can't connect to socket")); + None + } + }; -loop { - match socket.read_message() { - Ok(msg) => { - let res = serde_json::from_str::(msg.to_string().as_str()); + if result.is_none() { + return Err(anyhow!("")); + } - // info!("res: {:?}", msg.to_string().as_str()); - if let Ok(res) = res { - match slot_update_tx.send(res.params.result.root as u64) { - Ok(_) => { - info!("slot updated: {:?}", res.params.result.root); - } - Err(e) => { - info!("error here: {:?} {:?}", e, res.params.result.root as u64); - continue; // @TODO: we should add retries here incase send fails for some reason + let (mut socket, _response) = result.unwrap(); + + socket.write_message(Message::Text( + r#"{ "jsonrpc": "2.0", "id": 1, "method": "slotSubscribe" }"#.into(), + ))?; + + loop { + match socket.read_message() { + Ok(msg) => { + let res = serde_json::from_str::(msg.to_string().as_str()); + + // info!("res: {:?}", msg.to_string().as_str()); + if let Ok(res) = res { + match slot_update_tx.send(res.params.result.root as u64) { + Ok(_) => { + info!("slot updated: {:?}", res.params.result.root); + } + Err(e) => { + info!("error here: {:?} {:?}", e, res.params.result.root as u64); + continue; // @TODO: we should add retries here incase send fails for some reason + } } } } + Err(e) => info!("err: {:?}", e), } - Err(e) => info!("err: {:?}", e), } } -} // verifies the total vote on the slot > 2/3 fn verify_slot(slot_commitment: RpcBlockCommitment) -> bool { @@ -191,20 +190,20 @@ fn verify_slot(slot_commitment: RpcBlockCommitment) -> boo pub async fn slot_verify_loop( slot_update_rx: Receiver, endpoint: String, - status_sampler: Arc>, + client_status: Arc>, ) -> anyhow::Result<()> { -loop { - let mut status = status_sampler.lock().await; + loop { + let mut status = client_status.lock().await; if let ClientStatus::Crashed(_) = &*status { return Err(anyhow!("Client crashed")); } else { - *status = ClientStatus::Active(String::from( - "Monitoring Tinydancer: Verifying consensus", - )); + *status = + ClientStatus::Active(String::from("Monitoring Tinydancer: Verifying consensus")); } + drop(status); if let Ok(slot) = slot_update_rx.recv() { let slot_commitment_result = request_slot_voting(slot, &endpoint).await; - + if let Err(e) = slot_commitment_result { println!("Error {}", e); info!("{}", e); @@ -229,7 +228,6 @@ pub async fn request_slot_voting( slot: u64, endpoint: &String, ) -> Result { - let request = serde_json::json!({ "jsonrpc": "2.0", "id": 1, diff --git a/tinydancer/src/macros.rs b/tinydancer/src/macros.rs index c747f67..7c0f646 100644 --- a/tinydancer/src/macros.rs +++ b/tinydancer/src/macros.rs @@ -17,6 +17,17 @@ macro_rules! block_on { rt.handle().block_on($func).expect($error); }; } +#[macro_export] +macro_rules! block_on_async { + ($func:expr) => {{ + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on($func) + }}; +} + #[macro_export] macro_rules! try_coerce_shred { ($response:expr) => {{ diff --git a/tinydancer/src/main.rs b/tinydancer/src/main.rs index 77a36ab..325c710 100644 --- a/tinydancer/src/main.rs +++ b/tinydancer/src/main.rs @@ -46,9 +46,9 @@ use std::{ use tinydancer::{endpoint, Cluster, TinyDancer, TinyDancerConfig}; mod macros; use colored::Colorize; +mod consensus; mod rpc_wrapper; mod sampler; -mod consensus; mod ui; use anyhow::{anyhow, Result}; @@ -149,7 +149,7 @@ async fn main() -> Result<()> { archive_path, shred_archive_duration, tui_monitor, - consensus_mode + consensus_mode, } => { let config_file = get_config_file().map_err(|_| anyhow!("tinydancer config not set"))?; @@ -234,8 +234,6 @@ async fn main() -> Result<()> { } } ConfigSubcommands::Set { log_path, cluster } => { - // println!("{:?}", fs::create_dir_all("~/.config/tinydancer")); - let home_path = std::env::var("HOME").unwrap(); let tinydancer_dir = home_path + "/.config/tinydancer"; diff --git a/tinydancer/src/sampler.rs b/tinydancer/src/sampler.rs index 885ce17..daa1b77 100644 --- a/tinydancer/src/sampler.rs +++ b/tinydancer/src/sampler.rs @@ -31,7 +31,7 @@ use solana_sdk::{ }; use std::str::FromStr; use std::sync::atomic::{AtomicU32, Ordering}; -use std::sync::{Arc, Mutex, MutexGuard}; +use std::sync::Arc; use std::{error::Error, ops::Add}; use std::{ net::{SocketAddr, UdpSocket}, @@ -40,6 +40,7 @@ use std::{ use tiny_logger::logs::{debug, error, info}; use tokio::{ sync::mpsc::UnboundedSender, + sync::{Mutex, MutexGuard}, task::{JoinError, JoinHandle}, }; use tungstenite::{connect, Message}; @@ -56,7 +57,7 @@ pub struct SampleServiceConfig { pub cluster: Cluster, pub archive_config: ArchiveConfig, pub instance: Arc, - pub status_sampler: Arc>, + pub client_status: Arc>, pub sample_qty: usize, } @@ -81,13 +82,13 @@ impl ClientService for SampleService { let (shred_tx, shred_rx) = crossbeam::channel::unbounded(); let (verified_shred_tx, verified_shred_rx) = crossbeam::channel::unbounded(); - let status_arc = config.status_sampler.clone(); + let status_arc = config.client_status.clone(); // waits on new slots => triggers shred_update_loop threads.push(tokio::spawn(slot_update_loop( slot_update_tx, pub_sub, - config.status_sampler, + config.client_status, ))); // sample shreds from new slot @@ -158,13 +159,14 @@ pub async fn request_shreds( pub async fn slot_update_loop( slot_update_tx: Sender, pub_sub: String, - status_sampler: Arc>, + client_status: Arc>, ) -> anyhow::Result<()> { let result = match connect(Url::parse(pub_sub.as_str()).unwrap()) { Ok((socket, _response)) => Some((socket, _response)), Err(_) => { - let mut status = status_sampler.lock().unwrap(); + let mut status = client_status.lock().await; *status = ClientStatus::Crashed(String::from("Client can't connect to socket")); + drop(status); None } }; @@ -318,18 +320,19 @@ async fn shred_update_loop( slot_update_rx: Receiver, endpoint: String, shred_tx: Sender<(Vec>, solana_ledger::shred::Pubkey)>, - status_sampler: Arc>, + client_status: Arc>, sample_qty: usize, ) -> anyhow::Result<()> { loop { { - let mut status = status_sampler.lock().unwrap(); + let mut status = client_status.lock().await; if let ClientStatus::Crashed(_) = &*status { return Err(anyhow!("Client crashed")); } else { *status = ClientStatus::Active(String::from( "Monitoring Tinydancer: Actively Sampling Shreds", )); + drop(status) } } diff --git a/tinydancer/src/tinydancer.rs b/tinydancer/src/tinydancer.rs index 8c228e0..e08f89b 100644 --- a/tinydancer/src/tinydancer.rs +++ b/tinydancer/src/tinydancer.rs @@ -1,18 +1,14 @@ //! Sampler struct - incharge of sampling shreds // use rayon::prelude::*; -use std::{ - env, - sync::{Arc, Mutex, MutexGuard}, - thread::Result, -}; +use std::{env, sync::Arc, thread::Result}; // use tokio::time::Duration; use crate::{ block_on, + consensus::{ConsensusService, ConsensusServiceConfig}, rpc_wrapper::{TransactionService, TransactionServiceConfig}, sampler::{ArchiveConfig, SampleService, SampleServiceConfig, SHRED_CF}, - consensus::{ConsensusService, ConsensusServiceConfig}, ui::{UiConfig, UiService}, }; use anyhow::anyhow; @@ -24,7 +20,12 @@ use tiny_logger::logs::info; // use log::info; // use log4rs; use std::error::Error; -use tokio::{runtime::Runtime, task::JoinError, try_join, sync::Mutex as TokioMutex,}; +use tokio::{ + runtime::Runtime, + sync::{Mutex, MutexGuard}, + task::JoinError, + try_join, +}; // use std::{thread, thread::JoinHandle, time::Duration}; #[async_trait] @@ -64,13 +65,8 @@ use std::path::PathBuf; impl TinyDancer { pub async fn start(config: TinyDancerConfig) -> Result<()> { let status = ClientStatus::Initializing(String::from("Starting Up Tinydancer")); - let status_clone = status.clone(); - let client_status = Arc::new(Mutex::new(status_clone)); - let status_sampler = client_status.clone(); - - let consensus_client_status = Arc::new(TokioMutex::new(status.clone())); - let status_consensus = consensus_client_status.clone(); - + let client_status = Arc::new(Mutex::new(status)); + let client_status_ui = client_status.clone(); let TinyDancerConfig { enable_ui_service, rpc_endpoint, @@ -93,46 +89,6 @@ impl TinyDancer { .unwrap(); let db = Arc::new(db); - - if consensus_mode { - println!("Running in consensus_mode"); - - let consensus_service_config = ConsensusServiceConfig { - cluster: rpc_endpoint.clone(), - archive_config, - instance: db.clone(), - status_consensus: status_consensus.clone(), - sample_qty, - }; - - let consensus_service = ConsensusService::new(consensus_service_config); - - // run the sampling service - consensus_service - .join() - .await - .expect("error in consensus service thread"); - } - - else{ - - let sample_service_config = SampleServiceConfig { - cluster: rpc_endpoint.clone(), - archive_config, - instance: db.clone(), - status_sampler, - sample_qty, - }; - - let sample_service = SampleService::new(sample_service_config); - - // run the sampling service - sample_service - .join() - .await - .expect("error in sample service thread"); - } - let transaction_service = TransactionService::new(TransactionServiceConfig { cluster: rpc_endpoint.clone(), db_instance: db.clone(), @@ -140,14 +96,46 @@ impl TinyDancer { let ui_service = if enable_ui_service || tui_monitor { Some(UiService::new(UiConfig { - client_status, + client_status: client_status_ui, enable_ui_service, tui_monitor, })) } else { None }; + // run the sampling service + if !consensus_mode { + let sample_service_config = SampleServiceConfig { + cluster: rpc_endpoint.clone(), + archive_config: archive_config.clone(), + instance: db.clone(), + client_status: client_status.clone(), + sample_qty, + }; + let sample_service = SampleService::new(sample_service_config); + sample_service + .join() + .await + .expect("error in sample service thread"); + } + if consensus_mode { + let consensus_service_config = ConsensusServiceConfig { + cluster: rpc_endpoint.clone(), + archive_config, + instance: db.clone(), + client_status, + sample_qty, + }; + + let consensus_service = ConsensusService::new(consensus_service_config); + + // run the consensus service + consensus_service + .join() + .await + .expect("error in consensus service thread"); + } transaction_service .join() .await diff --git a/tinydancer/src/ui/ui.rs b/tinydancer/src/ui/ui.rs index c4703f6..5bcf9fd 100644 --- a/tinydancer/src/ui/ui.rs +++ b/tinydancer/src/ui/ui.rs @@ -1,3 +1,4 @@ +use crate::block_on_async; use crate::sampler::GetShredResponse; use crate::tinydancer::{ClientService, ClientStatus, TinyDancer}; use async_trait::async_trait; @@ -8,13 +9,14 @@ use crossterm::{ terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen}, }; use spinoff::{spinners, Color as SpinColor, Spinner}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::thread::sleep; use std::time::Duration; use std::{any::Any, thread::Thread}; use std::{fmt, thread::JoinHandle}; use thiserror::Error; use tiny_logger::logs::info; +use tokio::sync::{Mutex, MutexGuard}; use tui::layout::Rect; use tui::style::{Color, Modifier, Style}; use tui::text::{Span, Spans}; @@ -225,12 +227,30 @@ impl ClientService for UiService { threads.push(std::thread::spawn(move || loop { sleep(Duration::from_millis(100)); + enable_raw_mode(); + if crossterm::event::poll(Duration::from_millis(100)).unwrap() { + let ev = crossterm::event::read().unwrap(); + if ev + == Event::Key(KeyEvent { + code: KeyCode::Char('c'), + modifiers: KeyModifiers::CONTROL, + kind: KeyEventKind::Press, + state: KeyEventState::NONE, + }) + { + let mut status = block_on_async!(client_status.lock()); + *status = ClientStatus::ShuttingDown(String::from( + "Shutting Down Gracefully...", + )); + drop(status); + disable_raw_mode(); + } + } + let status = block_on_async!(client_status.lock()); - let status = client_status.lock().unwrap(); match &*status { ClientStatus::Active(msg) => { spinner.update(spinners::Dots, msg.clone(), SpinColor::Green); - // sleep(Duration::from_secs(100)); } ClientStatus::Initializing(msg) => { spinner.update(spinners::Dots, msg.clone(), SpinColor::Yellow); @@ -245,27 +265,7 @@ impl ClientService for UiService { } _ => {} } - Mutex::unlock(status); - enable_raw_mode(); - if crossterm::event::poll(Duration::from_millis(100)).unwrap() { - let ev = crossterm::event::read().unwrap(); - - if ev - == Event::Key(KeyEvent { - code: KeyCode::Char('c'), - modifiers: KeyModifiers::CONTROL, - kind: KeyEventKind::Press, - state: KeyEventState::NONE, - }) - { - let mut status = client_status.lock().unwrap(); - *status = ClientStatus::ShuttingDown(String::from( - "Shutting Down Gracefully...", - )); - Mutex::unlock(status); - disable_raw_mode(); - } - } + drop(status); })); }