Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rusk-wallet: Add method to store last block height in cache #2623

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 37 additions & 10 deletions rusk-wallet/src/bin/interactive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
// Copyright (c) DUSK NETWORK. All rights reserved.

use bip39::{Language, Mnemonic, MnemonicType};
use flume::TryRecvError;
use requestty::Question;
use rusk_wallet::{
currency::Dusk,
dat::{DatFileVersion, LATEST_VERSION},
gas::{self},
Address, Error, Wallet, WalletPath, MAX_ADDRESSES,
gas, Address, Error, SyncStatus, Wallet, WalletPath, MAX_ADDRESSES,
};

use crate::io;
Expand Down Expand Up @@ -199,14 +199,41 @@ fn menu_addr(wallet: &Wallet<WalletFile>) -> anyhow::Result<AddrSelect> {
};

if let Some(rx) = &wallet.state()?.sync_rx {
if let Ok(status) = rx.try_recv() {
action_menu = action_menu
.separator()
.separator_msg(format!("Sync Status: {}", status));
} else {
action_menu = action_menu
.separator()
.separator_msg("Waiting for Sync to complete..".to_string());
match rx.try_recv() {
Ok(status) => {
let last_height = wallet.last_block_height()?;

action_menu = action_menu.separator().separator_msg(format!(
"Synced at last block height: {}",
last_height
));

match status {
SyncStatus::Synced => (),
SyncStatus::NotSynced => {
action_menu = action_menu
.separator()
.separator_msg("Syncing in progress..".to_string());
}
SyncStatus::Err(e) => {
action_menu = action_menu.separator().separator_msg(
format!("Sync failed with err: {:?}", e),
)
}
}
}
Err(e) => match e {
TryRecvError::Empty => {
action_menu = action_menu
.separator()
.separator_msg("Syncing in progress..".to_string());
}
TryRecvError::Disconnected => {
action_menu = action_menu.separator().separator_msg(
"Channel disconnected restart the wallet for sync status".to_string(),
);
}
},
}
}

Expand Down
44 changes: 27 additions & 17 deletions rusk-wallet/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,16 @@ impl Cache {
pk_bs58: &str,
block_height: u64,
note_data: (Note, BlsScalar),
cf_name: Option<String>,
) -> Result<(), Error> {
let cf_name = pk_bs58;
let cf_name = match cf_name {
Some(x) => x,
None => pk_bs58.to_string(),
};

let cf = self
.db
.cf_handle(cf_name)
.cf_handle(&cf_name)
.ok_or(Error::CacheDatabaseCorrupted)?;

let (note, nullifier) = note_data;
Expand Down Expand Up @@ -86,21 +90,7 @@ impl Cache {
) -> Result<(), Error> {
let cf_name = format!("spent_{pk_bs58}");

let cf = self
.db
.cf_handle(&cf_name)
.ok_or(Error::CacheDatabaseCorrupted)?;

let (note, nullifier) = note_data;

let leaf = NoteLeaf { block_height, note };
let data = rkyv::to_bytes::<NoteLeaf, TREE_LEAF>(&leaf)
.map_err(|_| Error::Rkyv)?;
let key = nullifier.to_bytes();

self.db.put_cf(&cf, key, data)?;

Ok(())
self.insert(pk_bs58, block_height, note_data, Some(cf_name))
}

pub(crate) fn spend_notes(
Expand Down Expand Up @@ -155,6 +145,26 @@ impl Cache {
}))
}

pub(crate) fn update_last_block_height(
&self,
block_height: u64,
) -> Result<(), Error> {
let last_block_height = self.last_block_height()?.unwrap_or(0);
let max = std::cmp::max(last_block_height, block_height);

self.db.put(b"last_block_height", max.to_be_bytes())?;
Daksh14 marked this conversation as resolved.
Show resolved Hide resolved

Ok(())
}

pub(crate) fn last_block_height(&self) -> Result<Option<u64>, Error> {
Ok(self.db.get(b"last_block_height")?.map(|x| {
let buff: [u8; 8] = x.try_into().expect("Invalid u64 in cache db");

u64::from_be_bytes(buff)
}))
}

/// Returns an iterator over all unspent notes nullifier for the given pk.
pub(crate) fn unspent_notes_id(
&self,
Expand Down
10 changes: 5 additions & 5 deletions rusk-wallet/src/clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ pub struct State {
client: RuesHttpClient,
prover: RuskHttpClient,
store: LocalStore,
pub sync_rx: Option<Receiver<String>>,
pub sync_rx: Option<Receiver<SyncStatus>>,
sync_join_handle: Option<JoinHandle<()>>,
}

Expand Down Expand Up @@ -121,7 +121,7 @@ impl State {
}

pub async fn register_sync(&mut self) -> Result<(), Error> {
let (sync_tx, sync_rx) = flume::unbounded::<String>();
let (sync_tx, sync_rx) = flume::unbounded::<SyncStatus>();

self.sync_rx = Some(sync_rx);

Expand All @@ -134,11 +134,11 @@ impl State {

let handle = tokio::spawn(async move {
loop {
let _ = sync_tx.send("Syncing..".to_string());
let _ = sync_tx.send(SyncStatus::NotSynced);

let _ = match sync_db(&client, &cache, &store, status).await {
Ok(_) => sync_tx.send("Syncing Complete".to_string()),
Err(e) => sync_tx.send(format!("Error during sync:.. {e}")),
Ok(_) => sync_tx.send(SyncStatus::Synced),
Err(e) => sync_tx.send(SyncStatus::Err(e)),
};

sleep(Duration::from_secs(SYNC_INTERVAL_SECONDS)).await;
Expand Down
5 changes: 4 additions & 1 deletion rusk-wallet/src/clients/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ pub(crate) async fn sync_db(
last_pos = std::cmp::max(last_pos, *note.pos());

note_data.push((block_height, note));

// keep track of max block height while inserting notes
cache.update_last_block_height(block_height)?;
}

cache.insert_last_pos(last_pos)?;
Expand All @@ -97,7 +100,7 @@ pub(crate) async fn sync_db(

match spent {
true => cache.insert_spent(&pk_bs58, *block_height, note),
false => cache.insert(&pk_bs58, *block_height, note),
false => cache.insert(&pk_bs58, *block_height, note, None),
}?;
}
}
Expand Down
12 changes: 12 additions & 0 deletions rusk-wallet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ use execution_core::{
BlsScalar,
};

/// The enum that the sync status reciever emits so the binary can report
/// on the sync status
#[derive(Debug)]
pub enum SyncStatus {
/// Syncing is complete
Synced,
/// Syncing is in progress
NotSynced,
/// An error occurred while syncing
Err(Error),
}

use currency::Dusk;

/// The largest amount of Dusk that is possible to convert
Expand Down
7 changes: 7 additions & 0 deletions rusk-wallet/src/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1154,6 +1154,13 @@ impl<F: SecureWalletFile + Debug> Wallet<F> {
Ok(network_last_pos == db_pos)
}

/// Fetch the last block height from the database
pub fn last_block_height(&self) -> Result<u64, Error> {
let state = self.state()?;

Ok(state.cache().last_block_height()?.unwrap_or(0))
}

/// Close the wallet and zeroize the seed
pub fn close(&mut self) {
self.store.inner_mut().zeroize();
Expand Down
Loading