Skip to content

Commit

Permalink
rusk-wallet: Add method to store last block height in cache
Browse files Browse the repository at this point in the history
Add sync status enum to display correct sync status to user and current synced block height from the cache
Handle channel TryRecv Error
  • Loading branch information
Daksh14 committed Oct 11, 2024
1 parent cbf7094 commit b03f31b
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 32 deletions.
46 changes: 37 additions & 9 deletions rusk-wallet/src/bin/interactive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
// Copyright (c) DUSK NETWORK. All rights reserved.

use bip39::{Language, Mnemonic, MnemonicType};
use flume::TryRecvError;
use requestty::Question;
use rusk_wallet::{
currency::Dusk,
dat::{DatFileVersion, LATEST_VERSION},
gas, Address, Error, Wallet, WalletPath, MAX_ADDRESSES,
gas, Address, Error, SyncStatus, Wallet, WalletPath, MAX_ADDRESSES,
};

use crate::command::DEFAULT_STAKE_GAS_LIMIT;
Expand Down Expand Up @@ -200,14 +201,41 @@ fn menu_addr(wallet: &Wallet<WalletFile>) -> anyhow::Result<AddrSelect> {
};

if let Some(rx) = &wallet.state()?.sync_rx {
if let Ok(status) = rx.try_recv() {
action_menu = action_menu
.separator()
.separator_msg(format!("Sync Status: {}", status));
} else {
action_menu = action_menu
.separator()
.separator_msg("Waiting for Sync to complete..".to_string());
match rx.try_recv() {
Ok(status) => {
let last_height = wallet.last_block_height()?;

action_menu = action_menu.separator().separator_msg(format!(
"Synced at last block height: {}",
last_height
));

match status {
SyncStatus::Synced => (),
SyncStatus::NotSynced => {
action_menu = action_menu
.separator()
.separator_msg("Syncing in progress..".to_string());
}
SyncStatus::Err(e) => {
action_menu = action_menu.separator().separator_msg(
format!("Sync failed with err: {:?}", e),
)
}
}
}
Err(e) => match e {
TryRecvError::Empty => {
action_menu = action_menu
.separator()
.separator_msg("Syncing in progress..".to_string());
}
TryRecvError::Disconnected => {
action_menu = action_menu.separator().separator_msg(
"Channel disconnected restart the wallet for sync status".to_string(),
);
}
},
}
}

Expand Down
44 changes: 27 additions & 17 deletions rusk-wallet/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,16 @@ impl Cache {
pk_bs58: &str,
block_height: u64,
note_data: (Note, BlsScalar),
cf_name: Option<String>,
) -> Result<(), Error> {
let cf_name = pk_bs58;
let cf_name = match cf_name {
Some(x) => x,
None => pk_bs58.to_string(),
};

let cf = self
.db
.cf_handle(cf_name)
.cf_handle(&cf_name)
.ok_or(Error::CacheDatabaseCorrupted)?;

let (note, nullifier) = note_data;
Expand Down Expand Up @@ -86,21 +90,7 @@ impl Cache {
) -> Result<(), Error> {
let cf_name = format!("spent_{pk_bs58}");

let cf = self
.db
.cf_handle(&cf_name)
.ok_or(Error::CacheDatabaseCorrupted)?;

let (note, nullifier) = note_data;

let leaf = NoteLeaf { block_height, note };
let data = rkyv::to_bytes::<NoteLeaf, TREE_LEAF>(&leaf)
.map_err(|_| Error::Rkyv)?;
let key = nullifier.to_bytes();

self.db.put_cf(&cf, key, data)?;

Ok(())
self.insert(pk_bs58, block_height, note_data, Some(cf_name))
}

pub(crate) fn spend_notes(
Expand Down Expand Up @@ -155,6 +145,26 @@ impl Cache {
}))
}

pub(crate) fn update_last_block_height(
&self,
block_height: u64,
) -> Result<(), Error> {
let last_block_height = self.last_block_height()?.unwrap_or(0);
let max = std::cmp::max(last_block_height, block_height);

self.db.put(b"last_block_height", max.to_be_bytes())?;

Ok(())
}

pub(crate) fn last_block_height(&self) -> Result<Option<u64>, Error> {
Ok(self.db.get(b"last_block_height")?.map(|x| {
let buff: [u8; 8] = x.try_into().expect("Invalid u64 in cache db");

u64::from_be_bytes(buff)
}))
}

/// Returns an iterator over all unspent notes nullifier for the given pk.
pub(crate) fn unspent_notes_id(
&self,
Expand Down
10 changes: 5 additions & 5 deletions rusk-wallet/src/clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ pub struct State {
client: RuesHttpClient,
prover: RuskHttpClient,
store: LocalStore,
pub sync_rx: Option<Receiver<String>>,
pub sync_rx: Option<Receiver<SyncStatus>>,
sync_join_handle: Option<JoinHandle<()>>,
}

Expand Down Expand Up @@ -121,7 +121,7 @@ impl State {
}

pub async fn register_sync(&mut self) -> Result<(), Error> {
let (sync_tx, sync_rx) = flume::unbounded::<String>();
let (sync_tx, sync_rx) = flume::unbounded::<SyncStatus>();

self.sync_rx = Some(sync_rx);

Expand All @@ -134,11 +134,11 @@ impl State {

let handle = tokio::spawn(async move {
loop {
let _ = sync_tx.send("Syncing..".to_string());
let _ = sync_tx.send(SyncStatus::NotSynced);

let _ = match sync_db(&client, &cache, &store, status).await {
Ok(_) => sync_tx.send("Syncing Complete".to_string()),
Err(e) => sync_tx.send(format!("Error during sync:.. {e}")),
Ok(_) => sync_tx.send(SyncStatus::Synced),
Err(e) => sync_tx.send(SyncStatus::Err(e)),
};

sleep(Duration::from_secs(SYNC_INTERVAL_SECONDS)).await;
Expand Down
5 changes: 4 additions & 1 deletion rusk-wallet/src/clients/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ pub(crate) async fn sync_db(
last_pos = std::cmp::max(last_pos, *note.pos());

note_data.push((block_height, note));

// keep track of max block height while inserting notes
cache.update_last_block_height(block_height)?;
}

cache.insert_last_pos(last_pos)?;
Expand All @@ -97,7 +100,7 @@ pub(crate) async fn sync_db(

match spent {
true => cache.insert_spent(&pk_bs58, *block_height, note),
false => cache.insert(&pk_bs58, *block_height, note),
false => cache.insert(&pk_bs58, *block_height, note, None),
}?;
}
}
Expand Down
12 changes: 12 additions & 0 deletions rusk-wallet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ use execution_core::{
BlsScalar,
};

/// The enum that the sync status reciever emits so the binary can report
/// on the sync status
#[derive(Debug)]
pub enum SyncStatus {
/// Syncing is complete
Synced,
/// Syncing is in progress
NotSynced,
/// An error occurred while syncing
Err(Error),
}

use currency::Dusk;

/// The largest amount of Dusk that is possible to convert
Expand Down
7 changes: 7 additions & 0 deletions rusk-wallet/src/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1112,6 +1112,13 @@ impl<F: SecureWalletFile + Debug> Wallet<F> {
Ok(network_last_pos == db_pos)
}

/// Fetch the last block height from the database
pub fn last_block_height(&self) -> Result<u64, Error> {
let state = self.state()?;

Ok(state.cache().last_block_height()?.unwrap_or(0))
}

/// Close the wallet and zeroize the seed
pub fn close(&mut self) {
self.store.inner_mut().zeroize();
Expand Down

0 comments on commit b03f31b

Please sign in to comment.