diff --git a/tinydancer/src/consensus.rs b/tinydancer/src/consensus.rs new file mode 100644 index 0000000..48ff428 --- /dev/null +++ b/tinydancer/src/consensus.rs @@ -0,0 +1,246 @@ +use crate::tinydancer::{endpoint, ClientService, ClientStatus, Cluster}; +use crate::sampler::{ArchiveConfig, SlotSubscribeResponse}; +use crate::{convert_to_websocket, send_rpc_call, try_coerce_shred}; +use anyhow::anyhow; +use async_trait::async_trait; +use crossbeam::channel::{Receiver, Sender}; +use futures::Sink; +use itertools::Itertools; +use rand::distributions::Uniform; +use rand::prelude::*; +use rayon::prelude::*; +use reqwest::Request; +use rocksdb::{ColumnFamily, Options as RocksOptions, DB}; +use serde::de::DeserializeOwned; +use solana_ledger::shred::{ShredId, ShredType}; +use solana_ledger::{ +ancestor_iterator::{AncestorIterator, AncestorIteratorWithHash}, +blockstore::Blockstore, +// blockstore_db::columns::ShredCode, +shred::{Nonce, Shred, ShredCode, ShredData, ShredFetchStats, SIZE_OF_NONCE}, +}; +use solana_sdk::hash::hashv; +use solana_sdk::{ +clock::Slot, +genesis_config::ClusterType, +hash::{Hash, HASH_BYTES}, +packet::PACKET_DATA_SIZE, +pubkey::{Pubkey, PUBKEY_BYTES}, +signature::{Signable, Signature, Signer, SIGNATURE_BYTES}, +signer::keypair::Keypair, +timing::{duration_as_ms, timestamp}, +}; +use std::str::FromStr; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::{Arc, Mutex, MutexGuard}; +use std::{error::Error, ops::Add}; +use std::{ +net::{SocketAddr, UdpSocket}, +thread::Builder, +}; +use tiny_logger::logs::{debug, error, info}; +use tokio::{ +sync::mpsc::UnboundedSender, +task::{JoinError, JoinHandle}, +sync::Mutex as TokioMutex, +}; +use tungstenite::{connect, Message}; +use url::Url; +use serde_derive::Deserialize; +use serde_derive::Serialize; + +pub struct ConsensusService { + consensus_indices: Vec, + consensus_handler: JoinHandle<()>, +} + +pub struct ConsensusServiceConfig { + pub cluster: Cluster, + pub archive_config: ArchiveConfig, + pub instance: Arc, + pub status_consensus: Arc>, + pub sample_qty: usize, +} + +#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] + pub struct RpcBlockCommitment { + pub commitment: Option, + pub total_stake: u64, +} + +#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] + pub struct GetCommittmentResponse { + pub jsonrpc: String, + pub result: RpcBlockCommitment, + pub id: i64, +} + +pub const MAX_LOCKOUT_HISTORY: usize = 31; +pub type BlockCommitmentArray = [u64; MAX_LOCKOUT_HISTORY + 1]; + +pub const VOTE_THRESHOLD_SIZE: f64 = 2f64 / 3f64; + +#[async_trait] +impl ClientService for ConsensusService { +type ServiceError = tokio::task::JoinError; + +fn new(config: ConsensusServiceConfig) -> Self { + let consensus_handler = tokio::spawn(async move { + let rpc_url = endpoint(config.cluster); + let pub_sub = convert_to_websocket!(rpc_url); + + let mut threads = Vec::default(); + + let (slot_update_tx, slot_update_rx) = crossbeam::channel::unbounded::(); + + let status_arc = config.status_consensus.clone(); + + // waits on new slots => triggers slot_verify_loop + threads.push(tokio::spawn(slot_update_loop( + slot_update_tx, + pub_sub, + config.status_consensus, + ))); + + // verify slot votes + threads.push(tokio::spawn(slot_verify_loop( + slot_update_rx, + rpc_url, + status_arc, + ))); + + + for thread in threads { + thread.await; + } + }); + + Self { + consensus_handler, + consensus_indices: Vec::default(), + } +} + +async fn join(self) -> std::result::Result<(), Self::ServiceError> { + self.consensus_handler.await +} +} + +pub async fn slot_update_loop( + slot_update_tx: Sender, + pub_sub: String, + status_sampler: Arc>, +) -> anyhow::Result<()> { +let result = match connect(Url::parse(pub_sub.as_str()).unwrap()) { + Ok((socket, _response)) => Some((socket, _response)), + Err(_) => { + let mut status = status_sampler.lock().await; + *status = ClientStatus::Crashed(String::from("Client can't connect to socket")); + None + } +}; + +if result.is_none() { + return Err(anyhow!("")); +} + +let (mut socket, _response) = result.unwrap(); + +socket.write_message(Message::Text( + r#"{ "jsonrpc": "2.0", "id": 1, "method": "slotSubscribe" }"#.into(), +))?; + +loop { + match socket.read_message() { + Ok(msg) => { + let res = serde_json::from_str::(msg.to_string().as_str()); + + // info!("res: {:?}", msg.to_string().as_str()); + if let Ok(res) = res { + match slot_update_tx.send(res.params.result.root as u64) { + Ok(_) => { + info!("slot updated: {:?}", res.params.result.root); + } + Err(e) => { + info!("error here: {:?} {:?}", e, res.params.result.root as u64); + continue; // @TODO: we should add retries here incase send fails for some reason + } + } + } + } + Err(e) => info!("err: {:?}", e), + } +} +} + +// verifies the total vote on the slot > 2/3 +fn verify_slot(slot_commitment: RpcBlockCommitment) -> bool { + let commitment_array = &slot_commitment.commitment; + let total_stake = &slot_commitment.total_stake; + let sum: u64 = commitment_array.iter().flatten().sum(); + + if (sum as f64 / *total_stake as f64) > VOTE_THRESHOLD_SIZE { + true + } else { + false + } +} + +pub async fn slot_verify_loop( + slot_update_rx: Receiver, + endpoint: String, + status_sampler: Arc>, +) -> anyhow::Result<()> { +loop { + let mut status = status_sampler.lock().await; + if let ClientStatus::Crashed(_) = &*status { + return Err(anyhow!("Client crashed")); + } else { + *status = ClientStatus::Active(String::from( + "Monitoring Tinydancer: Verifying consensus", + )); + } + if let Ok(slot) = slot_update_rx.recv() { + let slot_commitment_result = request_slot_voting(slot, &endpoint).await; + + if let Err(e) = slot_commitment_result { + println!("Error {}", e); + info!("{}", e); + continue; + } + + let slot_commitment = slot_commitment_result.unwrap(); + + let verified = verify_slot(slot_commitment.result); + + if verified { + info!("slot {:?} verified ", slot); + } else { + info!("slot {:?} failed to verified ", slot); + info!("sample INVALID for slot : {:?}", slot); + } + } + } +} + +pub async fn request_slot_voting( + slot: u64, + endpoint: &String, +) -> Result { + + let request = serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "getBlockCommitment", + "params": [ + slot + ] + }) + .to_string(); + + let res = send_rpc_call!(endpoint, request); + + serde_json::from_str::(&res) +} diff --git a/tinydancer/src/main.rs b/tinydancer/src/main.rs index 9d1c522..77a36ab 100644 --- a/tinydancer/src/main.rs +++ b/tinydancer/src/main.rs @@ -48,6 +48,7 @@ mod macros; use colored::Colorize; mod rpc_wrapper; mod sampler; +mod consensus; mod ui; use anyhow::{anyhow, Result}; @@ -85,6 +86,10 @@ pub enum Commands { /// Duration after which shreds will be purged #[clap(required = false, default_value_t = 10000000)] shred_archive_duration: u64, + + /// Run the node in consensus mode + #[clap(long, short)] + consensus_mode: bool, }, /// Verify the samples for a single slot Verify { @@ -144,6 +149,7 @@ async fn main() -> Result<()> { archive_path, shred_archive_duration, tui_monitor, + consensus_mode } => { let config_file = get_config_file().map_err(|_| anyhow!("tinydancer config not set"))?; @@ -152,6 +158,7 @@ async fn main() -> Result<()> { rpc_endpoint: get_cluster(config_file.cluster), sample_qty, tui_monitor, + consensus_mode, log_path: config_file.log_path, archive_config: { archive_path diff --git a/tinydancer/src/sampler.rs b/tinydancer/src/sampler.rs index 4d7448a..885ce17 100644 --- a/tinydancer/src/sampler.rs +++ b/tinydancer/src/sampler.rs @@ -155,7 +155,7 @@ pub async fn request_shreds( serde_json::from_str::(&res) } -async fn slot_update_loop( +pub async fn slot_update_loop( slot_update_tx: Sender, pub_sub: String, status_sampler: Arc>, @@ -535,6 +535,7 @@ pub struct GetShredResponse { pub result: GetShredResult, pub id: i64, } + #[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct GetShredResult { diff --git a/tinydancer/src/tinydancer.rs b/tinydancer/src/tinydancer.rs index 09cbcb3..8c228e0 100644 --- a/tinydancer/src/tinydancer.rs +++ b/tinydancer/src/tinydancer.rs @@ -12,6 +12,7 @@ use crate::{ block_on, rpc_wrapper::{TransactionService, TransactionServiceConfig}, sampler::{ArchiveConfig, SampleService, SampleServiceConfig, SHRED_CF}, + consensus::{ConsensusService, ConsensusServiceConfig}, ui::{UiConfig, UiService}, }; use anyhow::anyhow; @@ -23,7 +24,7 @@ use tiny_logger::logs::info; // use log::info; // use log4rs; use std::error::Error; -use tokio::{runtime::Runtime, task::JoinError, try_join}; +use tokio::{runtime::Runtime, task::JoinError, try_join, sync::Mutex as TokioMutex,}; // use std::{thread, thread::JoinHandle, time::Duration}; #[async_trait] @@ -49,6 +50,7 @@ pub struct TinyDancerConfig { pub enable_ui_service: bool, pub archive_config: ArchiveConfig, pub tui_monitor: bool, + pub consensus_mode: bool, pub log_path: String, } @@ -62,10 +64,13 @@ use std::path::PathBuf; impl TinyDancer { pub async fn start(config: TinyDancerConfig) -> Result<()> { let status = ClientStatus::Initializing(String::from("Starting Up Tinydancer")); - - let client_status = Arc::new(Mutex::new(status)); + let status_clone = status.clone(); + let client_status = Arc::new(Mutex::new(status_clone)); let status_sampler = client_status.clone(); + let consensus_client_status = Arc::new(TokioMutex::new(status.clone())); + let status_consensus = consensus_client_status.clone(); + let TinyDancerConfig { enable_ui_service, rpc_endpoint, @@ -73,9 +78,10 @@ impl TinyDancer { tui_monitor, log_path, archive_config, + consensus_mode, } = config.clone(); std::env::set_var("RUST_LOG", "info"); - tiny_logger::setup_file_with_default(&log_path, "RUST_LOG"); + // tiny_logger::setup_file_with_default(&log_path, "RUST_LOG"); let mut opts = rocksdb::Options::default(); opts.create_if_missing(true); @@ -87,14 +93,45 @@ impl TinyDancer { .unwrap(); let db = Arc::new(db); - let sample_service_config = SampleServiceConfig { - cluster: rpc_endpoint.clone(), - archive_config, - instance: db.clone(), - status_sampler, - sample_qty, - }; - let sample_service = SampleService::new(sample_service_config); + + if consensus_mode { + println!("Running in consensus_mode"); + + let consensus_service_config = ConsensusServiceConfig { + cluster: rpc_endpoint.clone(), + archive_config, + instance: db.clone(), + status_consensus: status_consensus.clone(), + sample_qty, + }; + + let consensus_service = ConsensusService::new(consensus_service_config); + + // run the sampling service + consensus_service + .join() + .await + .expect("error in consensus service thread"); + } + + else{ + + let sample_service_config = SampleServiceConfig { + cluster: rpc_endpoint.clone(), + archive_config, + instance: db.clone(), + status_sampler, + sample_qty, + }; + + let sample_service = SampleService::new(sample_service_config); + + // run the sampling service + sample_service + .join() + .await + .expect("error in sample service thread"); + } let transaction_service = TransactionService::new(TransactionServiceConfig { cluster: rpc_endpoint.clone(), @@ -111,12 +148,6 @@ impl TinyDancer { None }; - // run - sample_service - .join() - .await - .expect("error in sample service thread"); - transaction_service .join() .await @@ -147,6 +178,7 @@ pub fn endpoint(cluster: Cluster) -> String { Cluster::Custom(url) => url, } } +#[derive(Clone, PartialEq, Debug)] pub enum ClientStatus { Initializing(String), SearchingForRPCService(String),