Adding support for verifying Tower BFT consensus #32
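
This PR adds a `ConsensusService` that subscribes to rooted slots over the RPC websocket (`slotSubscribe`) and checks each root's vote commitment via the `getBlockCommitment` RPC call, logging a slot as verified once more than 2/3 of the total stake has voted on it. The service runs behind a new `consensus_mode` CLI flag.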

Open · wants to merge 5 commits into base: master
3 changes: 2 additions & 1 deletion .gitignore
@@ -2,4 +2,5 @@
 **.log
 tmp/
 .vscode
-db/
+db/
+archives
244 changes: 244 additions & 0 deletions tinydancer/src/consensus.rs
@@ -0,0 +1,244 @@
use crate::sampler::{ArchiveConfig, SlotSubscribeResponse};
use crate::tinydancer::{endpoint, ClientService, ClientStatus, Cluster};
use crate::{convert_to_websocket, send_rpc_call, try_coerce_shred};
use anyhow::anyhow;
use async_trait::async_trait;
use crossbeam::channel::{Receiver, Sender};
use futures::Sink;
use itertools::Itertools;
use rand::distributions::Uniform;
use rand::prelude::*;
use rayon::prelude::*;
use reqwest::Request;
use rocksdb::{ColumnFamily, Options as RocksOptions, DB};
use serde::de::DeserializeOwned;
use serde_derive::Deserialize;
use serde_derive::Serialize;
use solana_ledger::shred::{ShredId, ShredType};
use solana_ledger::{
ancestor_iterator::{AncestorIterator, AncestorIteratorWithHash},
blockstore::Blockstore,
// blockstore_db::columns::ShredCode,
shred::{Nonce, Shred, ShredCode, ShredData, ShredFetchStats, SIZE_OF_NONCE},
};
use solana_sdk::hash::hashv;
use solana_sdk::{
clock::Slot,
genesis_config::ClusterType,
hash::{Hash, HASH_BYTES},
packet::PACKET_DATA_SIZE,
pubkey::{Pubkey, PUBKEY_BYTES},
signature::{Signable, Signature, Signer, SIGNATURE_BYTES},
signer::keypair::Keypair,
timing::{duration_as_ms, timestamp},
};
use std::str::FromStr;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::{error::Error, ops::Add};
use std::{
net::{SocketAddr, UdpSocket},
thread::Builder,
};
use tiny_logger::logs::{debug, error, info};
use tokio::{
sync::mpsc::UnboundedSender,
sync::{Mutex, MutexGuard},
task::{JoinError, JoinHandle},
};
use tungstenite::{connect, Message};
use url::Url;

pub struct ConsensusService {
consensus_indices: Vec<u64>,
consensus_handler: JoinHandle<()>,
}

pub struct ConsensusServiceConfig {
pub cluster: Cluster,
pub archive_config: ArchiveConfig,
pub instance: Arc<rocksdb::DB>,
pub client_status: Arc<Mutex<ClientStatus>>,
pub sample_qty: usize,
}

#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RpcBlockCommitment<T> {
pub commitment: Option<T>,
pub total_stake: u64,
}

#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GetCommittmentResponse {
pub jsonrpc: String,
pub result: RpcBlockCommitment<BlockCommitmentArray>,
pub id: i64,
}

// Depth of the Tower BFT lockout history tracked per vote account.
pub const MAX_LOCKOUT_HISTORY: usize = 31;
// Entry `i` holds the amount of cluster stake (in lamports) that has voted
// on the block at lockout depth `i + 1`, mirroring the `commitment` array
// returned by the getBlockCommitment RPC call.
pub type BlockCommitmentArray = [u64; MAX_LOCKOUT_HISTORY + 1];

// Supermajority threshold: a slot is considered confirmed once more than
// 2/3 of the total stake has voted on it.
pub const VOTE_THRESHOLD_SIZE: f64 = 2f64 / 3f64;

#[async_trait]
impl ClientService<ConsensusServiceConfig> for ConsensusService {
type ServiceError = tokio::task::JoinError;

fn new(config: ConsensusServiceConfig) -> Self {
let consensus_handler = tokio::spawn(async move {
let rpc_url = endpoint(config.cluster);
let pub_sub = convert_to_websocket!(rpc_url);

let mut threads = Vec::default();

let (slot_update_tx, slot_update_rx) = crossbeam::channel::unbounded::<u64>();

let status_arc = config.client_status.clone();

// waits on new slots => triggers slot_verify_loop
threads.push(tokio::spawn(slot_update_loop(
slot_update_tx,
pub_sub,
config.client_status,
)));

// verify slot votes
threads.push(tokio::spawn(slot_verify_loop(
slot_update_rx,
rpc_url,
status_arc,
)));

for thread in threads {
    // Surface panics or cancellations from the background tasks in the logs.
    if let Err(e) = thread.await {
        error!("consensus task failed: {:?}", e);
    }
}
});

Self {
consensus_handler,
consensus_indices: Vec::default(),
}
}

async fn join(self) -> std::result::Result<(), Self::ServiceError> {
self.consensus_handler.await
}
}
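
// Usage sketch (hypothetical wiring, not part of this diff): the client
// entrypoint would construct the service from its config and await the
// background task:
//
//     let service = ConsensusService::new(consensus_config);
//     service.join().await?;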

pub async fn slot_update_loop(
slot_update_tx: Sender<u64>,
pub_sub: String,
client_status: Arc<Mutex<ClientStatus>>,
) -> anyhow::Result<()> {
// Connect to the websocket endpoint, marking the client as crashed (and
// bailing out) if the connection cannot be established.
let (mut socket, _response) = match connect(Url::parse(pub_sub.as_str())?) {
    Ok(conn) => conn,
    Err(_) => {
        let mut status = client_status.lock().await;
        *status = ClientStatus::Crashed(String::from("Client can't connect to socket"));
        return Err(anyhow!("client could not connect to the websocket"));
    }
};

socket.write_message(Message::Text(
r#"{ "jsonrpc": "2.0", "id": 1, "method": "slotSubscribe" }"#.into(),
))?;

loop {
match socket.read_message() {
Ok(msg) => {
let res = serde_json::from_str::<SlotSubscribeResponse>(msg.to_string().as_str());

// info!("res: {:?}", msg.to_string().as_str());
if let Ok(res) = res {
match slot_update_tx.send(res.params.result.root as u64) {
Ok(_) => {
info!("slot updated: {:?}", res.params.result.root);
}
Err(e) => {
    error!("failed to send slot update {:?}: {:?}", res.params.result.root as u64, e);
    continue; // @TODO: add retries here in case the send fails
}
}
}
}
Err(e) => error!("websocket read error: {:?}", e),
}
}
}
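
// Illustrative slotSubscribe notification that the loop above parses into
// SlotSubscribeResponse (field values made up; `root` is what gets sent
// down the channel):
//
// {"jsonrpc":"2.0","method":"slotNotification",
//  "params":{"result":{"parent":75,"root":44,"slot":76},"subscription":0}}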

// Verifies that the stake voted on the slot exceeds the 2/3 supermajority threshold.
fn verify_slot(slot_commitment: RpcBlockCommitment<BlockCommitmentArray>) -> bool {
    let commitment_array = &slot_commitment.commitment;
    let total_stake = &slot_commitment.total_stake;
    // Sum the voted stake across all lockout depths; a `None` commitment
    // contributes nothing to the sum.
    let sum: u64 = commitment_array.iter().flatten().sum();

    (sum as f64 / *total_stake as f64) > VOTE_THRESHOLD_SIZE
}
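
// A minimal sanity check of the threshold logic above (illustrative; not part
// of the original change). With 75 of 100 staked lamports voting, 0.75 > 2/3,
// so the slot verifies.
#[cfg(test)]
mod verify_slot_tests {
    use super::*;

    #[test]
    fn passes_above_two_thirds_stake() {
        let mut commitment = [0u64; MAX_LOCKOUT_HISTORY + 1];
        // Park all voting stake at the maximum lockout depth.
        commitment[MAX_LOCKOUT_HISTORY] = 75;
        let slot_commitment = RpcBlockCommitment {
            commitment: Some(commitment),
            total_stake: 100,
        };
        assert!(verify_slot(slot_commitment));
    }
}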

pub async fn slot_verify_loop(
slot_update_rx: Receiver<u64>,
endpoint: String,
client_status: Arc<Mutex<ClientStatus>>,
) -> anyhow::Result<()> {
loop {
let mut status = client_status.lock().await;
if let ClientStatus::Crashed(_) = &*status {
return Err(anyhow!("Client crashed"));
} else {
*status =
ClientStatus::Active(String::from("Monitoring Tinydancer: Verifying consensus"));
}
drop(status);
if let Ok(slot) = slot_update_rx.recv() {
// Fetch the block commitment for the new root; skip the slot on RPC errors.
let slot_commitment = match request_slot_voting(slot, &endpoint).await {
    Ok(commitment) => commitment,
    Err(e) => {
        error!("failed to fetch slot commitment for slot {}: {}", slot, e);
        continue;
    }
};

let verified = verify_slot(slot_commitment.result);

if verified {
    info!("slot {:?} verified", slot);
} else {
    info!("slot {:?} failed verification", slot);
    info!("sample INVALID for slot: {:?}", slot);
}
}
}
}

pub async fn request_slot_voting(
slot: u64,
endpoint: &String,
) -> Result<GetCommittmentResponse, serde_json::Error> {
let request = serde_json::json!({
"jsonrpc": "2.0",
"id": 1,
"method": "getBlockCommitment",
"params": [
slot
]
})
.to_string();

let res = send_rpc_call!(endpoint, request);

serde_json::from_str::<GetCommittmentResponse>(&res)
}
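
// Illustrative shape of the getBlockCommitment response that
// GetCommittmentResponse deserializes (values made up; field names are
// camelCase per the serde rename on the structs above):
#[cfg(test)]
mod response_shape_tests {
    use super::*;

    #[test]
    fn parses_get_block_commitment_response() {
        let raw = r#"{"jsonrpc":"2.0","result":{"commitment":null,"totalStake":42},"id":1}"#;
        let parsed: GetCommittmentResponse =
            serde_json::from_str(raw).expect("response should deserialize");
        assert_eq!(parsed.result.total_stake, 42);
        assert!(parsed.result.commitment.is_none());
    }
}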
11 changes: 11 additions & 0 deletions tinydancer/src/macros.rs
@@ -17,6 +17,17 @@ macro_rules! block_on {
rt.handle().block_on($func).expect($error);
};
}
#[macro_export]
macro_rules! block_on_async {
($func:expr) => {{
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on($func)
}};
}
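
// Usage sketch (illustrative): drive a future to completion from
// synchronous code on a fresh current-thread runtime:
//
//     let sum = block_on_async!(async { 1 + 1 });
//     assert_eq!(sum, 2);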

#[macro_export]
macro_rules! try_coerce_shred {
($response:expr) => {{
9 changes: 7 additions & 2 deletions tinydancer/src/main.rs
@@ -46,6 +46,7 @@ use std::{
use tinydancer::{endpoint, Cluster, TinyDancer, TinyDancerConfig};
mod macros;
use colored::Colorize;
mod consensus;
mod rpc_wrapper;
mod sampler;
mod ui;
@@ -85,6 +86,10 @@ pub enum Commands {
/// Duration after which shreds will be purged
#[clap(required = false, default_value_t = 10000000)]
shred_archive_duration: u64,

/// Run the node in consensus mode
#[clap(long, short)]
consensus_mode: bool,
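// With clap's derive, this should surface as `--consensus-mode` (plus a
// short `-c`, assuming no conflicting short flag).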
},
/// Verify the samples for a single slot
Verify {
@@ -144,6 +149,7 @@ async fn main() -> Result<()> {
archive_path,
shred_archive_duration,
tui_monitor,
consensus_mode,
} => {
let config_file =
get_config_file().map_err(|_| anyhow!("tinydancer config not set"))?;
@@ -152,6 +158,7 @@
rpc_endpoint: get_cluster(config_file.cluster),
sample_qty,
tui_monitor,
consensus_mode,
log_path: config_file.log_path,
archive_config: {
archive_path
Expand Down Expand Up @@ -227,8 +234,6 @@ async fn main() -> Result<()> {
}
}
ConfigSubcommands::Set { log_path, cluster } => {
// println!("{:?}", fs::create_dir_all("~/.config/tinydancer"));

let home_path = std::env::var("HOME").unwrap();
let tinydancer_dir = home_path + "/.config/tinydancer";
