Skip to content

Commit

Permalink
Merge branch '1447-rebase-01' into issue-1447-stake-contract
Browse files Browse the repository at this point in the history
  • Loading branch information
miloszm committed Mar 8, 2024
2 parents 4ca2414 + 70a9ae2 commit f52d524
Show file tree
Hide file tree
Showing 319 changed files with 26,016 additions and 21,236 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/webwallet_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ jobs:
run: npm ci
working-directory: ./web-wallet

- name: Formatting check
run: npm run format
working-directory: ./web-wallet

- name: Linting
run: npm run lint
working-directory: ./web-wallet
Expand Down
4 changes: 4 additions & 0 deletions consensus/src/execution_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,10 @@ impl<'a, DB: Database, T: Operations + 'static> ExecutionCtx<'a, DB, T> {
Err(ConsensusError::FutureEvent) => {
trace!("future msg {:?}", msg);

self.outbound.send(msg.clone()).await.unwrap_or_else(|err| {
error!("unable to re-publish a handled msg {:?}", err)
});

self.future_msgs.lock().await.put_event(
msg.header.round,
msg.get_step(),
Expand Down
2 changes: 2 additions & 0 deletions consensus/src/quorum/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ impl<'p, D: Database> Executor<'p, D> {
0,
msg.clone(),
);

self.publish(msg.clone()).await;
}
Status::Present => {
if let Some(block) = self.collect_inbound_msg(msg).await
Expand Down
1 change: 1 addition & 0 deletions node-data/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ async-channel = "1.7"
chrono = "0.4"
bs58 = { version = "0.4" }
tracing = "0.1"
anyhow = "1"


[dev-dependencies]
Expand Down
59 changes: 34 additions & 25 deletions node-data/src/bls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,11 @@ impl Debug for PublicKeyBytes {
pub fn load_keys(
path: String,
pwd: String,
) -> (dusk_bls12_381_sign::SecretKey, PublicKey) {
) -> anyhow::Result<(dusk_bls12_381_sign::SecretKey, PublicKey)> {
let path_buf = PathBuf::from(path);
let (pk, sk) = read_from_file(path_buf, &pwd);
let (pk, sk) = read_from_file(path_buf, &pwd)?;

(sk, PublicKey::new(pk))
Ok((sk, PublicKey::new(pk)))
}

/// Fetches BLS public and secret keys from an encrypted consensus keys file.
Expand All @@ -151,10 +151,10 @@ pub fn load_keys(
fn read_from_file(
path: PathBuf,
pwd: &str,
) -> (
) -> anyhow::Result<(
dusk_bls12_381_sign::PublicKey,
dusk_bls12_381_sign::SecretKey,
) {
)> {
use serde::Deserialize;

/// Bls key pair helper structure
Expand All @@ -166,36 +166,45 @@ fn read_from_file(

// attempt to load and decode wallet
println!("{path:?}");
let ciphertext =
fs::read(path).expect("path should be valid consensus keys file");
let ciphertext = fs::read(&path).map_err(|e| {
anyhow::anyhow!(
"{} should be valid consensus keys file {e}",
path.display()
)
})?;

let mut hasher = Sha256::new();
hasher.update(pwd.as_bytes());
let hashed_pwd = hasher.finalize().to_vec();

let bytes = decrypt(&ciphertext[..], &hashed_pwd).unwrap_or_else(|_| {
let hashed_pwd = blake3::hash(pwd.as_bytes());
let bytes = decrypt(&ciphertext[..], hashed_pwd.as_bytes())
.expect("Invalid consensus keys password");
warn!("Your consensus keys are in the old format");
warn!("Consider to export them using a new version of the wallet");
bytes
});
let bytes = match decrypt(&ciphertext[..], &hashed_pwd) {
Ok(bytes) => bytes,
Err(_) => {
let bytes = decrypt(&ciphertext[..], &hashed_pwd).map_err(|e| {
anyhow::anyhow!("Invalid consensus keys password {e}")
})?;
warn!("Your consensus keys are in the old format");
warn!("Consider to export them using a new version of the wallet");
bytes
}
};

let keys: BlsKeyPair =
serde_json::from_slice(&bytes).expect("keys files should contain json");
let keys: BlsKeyPair = serde_json::from_slice(&bytes)
.map_err(|e| anyhow::anyhow!("keys files should contain json {e}"))?;

let sk = dusk_bls12_381_sign::SecretKey::from_slice(
&base64::decode(keys.secret_key_bls).expect("sk should be base64")[..],
)
.expect("sk should be valid");
let sk_bytes = base64::decode(keys.secret_key_bls)
.map_err(|e| anyhow::anyhow!("sk should be base64 {e}"))?;

let sk = dusk_bls12_381_sign::SecretKey::from_slice(&sk_bytes)
.map_err(|e| anyhow::anyhow!("sk should be valid {e:?}"))?;

let pk = dusk_bls12_381_sign::PublicKey::from_slice(
&base64::decode(keys.public_key_bls).expect("pk should be base64")[..],
&base64::decode(keys.public_key_bls)
.map_err(|e| anyhow::anyhow!("pk should be base64 {e}"))?[..],
)
.expect("pk should be valid");
.map_err(|e| anyhow::anyhow!("pk should be valid {e:?}"))?;

(pk, sk)
Ok((pk, sk))
}

fn decrypt(data: &[u8], pwd: &[u8]) -> Result<Vec<u8>, BlockModeError> {
Expand Down Expand Up @@ -224,7 +233,7 @@ pub fn load_provisioners_keys(
path.push_str(&format!("node_{i}.keys"));
let path_buf = PathBuf::from(path);

let (pk, sk) = read_from_file(path_buf, &pwd);
let (pk, sk) = read_from_file(path_buf, &pwd).unwrap();

keys.push((sk, PublicKey::new(pk)));
}
Expand Down
9 changes: 7 additions & 2 deletions node-data/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -972,8 +972,13 @@ pub struct AsyncQueue<M: Clone> {
sender: async_channel::Sender<M>,
}

impl<M: Clone> Default for AsyncQueue<M> {
fn default() -> Self {
impl<M: Clone> AsyncQueue<M> {
pub fn bounded(cap: usize) -> Self {
let (sender, receiver) = async_channel::bounded(cap);
Self { receiver, sender }
}

pub fn unbounded() -> Self {
let (sender, receiver) = async_channel::unbounded();
Self { receiver, sender }
}
Expand Down
54 changes: 34 additions & 20 deletions node/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,38 +44,29 @@ const TOPICS: &[u8] = &[
const ACCEPT_BLOCK_TIMEOUT_SEC: Duration = Duration::from_secs(20);
const HEARTBEAT_SEC: Duration = Duration::from_secs(1);

pub struct ChainSrv {
pub struct ChainSrv<N: Network, DB: database::DB, VM: vm::VMExecution> {
/// Inbound wire messages queue
inbound: AsyncQueue<Message>,
keys_path: String,
acceptor: Option<Arc<RwLock<Acceptor<N, DB, VM>>>>,
}

#[async_trait]
impl<N: Network, DB: database::DB, VM: vm::VMExecution>
LongLivedService<N, DB, VM> for ChainSrv
LongLivedService<N, DB, VM> for ChainSrv<N, DB, VM>
{
async fn execute(
async fn initialize(
&mut self,
network: Arc<RwLock<N>>,
db: Arc<RwLock<DB>>,
vm: Arc<RwLock<VM>>,
) -> anyhow::Result<usize> {
// Register routes
LongLivedService::<N, DB, VM>::add_routes(
self,
TOPICS,
self.inbound.clone(),
&network,
)
.await?;

// Restore/Load most recent block
) -> anyhow::Result<()> {
let mrb = Self::load_most_recent_block(db.clone(), vm.clone()).await?;

let state_hash = mrb.inner().header().state_hash;
let provisioners_list = vm.read().await.get_provisioners(state_hash)?;

// Initialize Acceptor and trigger consensus task
// Initialize Acceptor
let acc = Acceptor::init_consensus(
&self.keys_path,
mrb,
Expand All @@ -85,7 +76,29 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
vm.clone(),
)
.await?;
let acc = Arc::new(RwLock::new(acc));

self.acceptor = Some(Arc::new(RwLock::new(acc)));

Ok(())
}

async fn execute(
&mut self,
network: Arc<RwLock<N>>,
_db: Arc<RwLock<DB>>,
_vm: Arc<RwLock<VM>>,
) -> anyhow::Result<usize> {
// Register routes
LongLivedService::<N, DB, VM>::add_routes(
self,
TOPICS,
self.inbound.clone(),
&network,
)
.await?;

let acc = self.acceptor.as_mut().expect("initialize is called");
acc.write().await.spawn_task().await;

// Start-up FSM instance
let mut fsm = SimpleFSM::new(acc.clone(), network.clone());
Expand Down Expand Up @@ -164,7 +177,7 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
| Payload::Ratification(_)
| Payload::Quorum(_) => {
if let Err(e) = acc.read().await.reroute_msg(msg).await {
warn!("Unable to reroute_msg to the acceptor: {e}");
warn!("msg discarded: {e}");
}
}
_ => warn!("invalid inbound message"),
Expand Down Expand Up @@ -200,11 +213,12 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
}
}

impl ChainSrv {
impl<N: Network, DB: database::DB, VM: vm::VMExecution> ChainSrv<N, DB, VM> {
pub fn new(keys_path: String) -> Self {
Self {
inbound: Default::default(),
inbound: AsyncQueue::unbounded(),
keys_path,
acceptor: None,
}
}

Expand All @@ -213,7 +227,7 @@ impl ChainSrv {
/// Panics
///
/// If register entry is read but block is not found.
async fn load_most_recent_block<DB: database::DB, VM: vm::VMExecution>(
async fn load_most_recent_block(
db: Arc<RwLock<DB>>,
vm: Arc<RwLock<VM>>,
) -> Result<BlockWithLabel> {
Expand Down
42 changes: 33 additions & 9 deletions node/src/chain/acceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
db: db.clone(),
vm: vm.clone(),
network: network.clone(),
task: RwLock::new(Task::new_with_keys(keys_path.to_string())),
task: RwLock::new(Task::new_with_keys(keys_path.to_string())?),
};

// NB. After restart, state_root returned by VM is always the last
Expand All @@ -161,11 +161,10 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
acc.try_revert(RevertTarget::LastFinalizedState).await?;
}

acc.spawn_task().await;
Ok(acc)
}

async fn spawn_task(&self) {
pub async fn spawn_task(&self) {
let provisioners_list = self.provisioners_list.read().await.clone();
let base_timeouts = self.adjust_round_base_timeouts().await;

Expand All @@ -183,15 +182,36 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
pub(crate) async fn reroute_msg(
&self,
msg: Message,
) -> Result<(), async_channel::SendError<Message>> {
) -> Result<(), async_channel::TrySendError<Message>> {
let curr_tip = self.get_curr_height().await;

// Enqueue consensus msg only if local tip is close enough to the
// network tip.
let enable_enqueue =
msg.header.round >= curr_tip && msg.header.round < (curr_tip + 10);

match &msg.payload {
Payload::Candidate(_)
| Payload::Validation(_)
| Payload::Ratification(_) => {
self.task.read().await.main_inbound.send(msg).await?;
let task = self.task.read().await;
if !task.is_running() {
broadcast(&self.network, &msg).await;
}

if enable_enqueue {
task.main_inbound.try_send(msg)?;
}
}
Payload::Quorum(_) => {
self.task.read().await.quorum_inbound.send(msg).await?;
let task = self.task.read().await;
if !task.is_running() {
broadcast(&self.network, &msg).await;
}

if enable_enqueue {
task.quorum_inbound.try_send(msg)?;
}
}
_ => warn!("invalid inbound message"),
}
Expand Down Expand Up @@ -456,9 +476,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
// (Re)broadcast a fully valid block before any call to
// get_provisioners to speed up its propagation
if let Some(msg) = msg {
let _ = self.network.read().await.broadcast(msg).await.map_err(
|err| warn!("Unable to broadcast accepted block: {err}"),
);
broadcast(&self.network, msg).await;
}

self.log_missing_iterations(
Expand Down Expand Up @@ -763,6 +781,12 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
}
}

async fn broadcast<N: Network>(network: &Arc<RwLock<N>>, msg: &Message) {
let _ = network.read().await.broadcast(msg).await.map_err(|err| {
warn!("Unable to broadcast msg: {:?} {err} ", msg.topic())
});
}

/// Performs full verification of block header against prev_block header where
/// prev_block is usually the blockchain tip
///
Expand Down
Loading

0 comments on commit f52d524

Please sign in to comment.