Skip to content

Commit

Permalink
mempool implemented with some todos still pending
Browse files Browse the repository at this point in the history
  • Loading branch information
Oscar-Pepper committed Dec 3, 2024
1 parent 895b5dc commit 39a9ef1
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 28 deletions.
4 changes: 2 additions & 2 deletions zingo-sync/src/primitives.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ pub struct WalletNote<N, Nf: Copy> {
nullifier: Option<Nf>, //TODO: syncing without nullfiier deriving key
/// Commitment tree leaf position
#[getset(get_copy = "pub")]
position: Position,
position: Option<Position>,
/// Memo
#[getset(get = "pub")]
memo: Memo,
Expand All @@ -298,7 +298,7 @@ impl<N, Nf: Copy> WalletNote<N, Nf> {
key_id: KeyId,
note: N,
nullifier: Option<Nf>,
position: Position,
position: Option<Position>,
memo: Memo,
spending_transaction: Option<TxId>,
) -> Self {
Expand Down
88 changes: 72 additions & 16 deletions zingo-sync/src/scan/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ use crate::{
client::{self, FetchRequest},
keys::{self, transparent::TransparentAddressId, KeyId},
primitives::{
OrchardNote, OutPointMap, OutgoingNote, OutgoingOrchardNote, OutgoingSaplingNote, OutputId,
SaplingNote, SyncOutgoingNotes, TransparentCoin, WalletBlock, WalletNote,
WalletTransaction,
NullifierMap, OrchardNote, OutPointMap, OutgoingNote, OutgoingOrchardNote,
OutgoingSaplingNote, OutputId, SaplingNote, SyncOutgoingNotes, TransparentCoin,
WalletBlock, WalletNote, WalletTransaction,
},
utils,
};
Expand Down Expand Up @@ -67,7 +67,7 @@ impl<Proof> ShieldedOutputExt<SaplingDomain> for OutputDescription<Proof> {
#[allow(clippy::too_many_arguments)]
pub(crate) async fn scan_transactions<P: consensus::Parameters>(
fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
parameters: &P,
consensus_parameters: &P,
ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
relevant_txids: HashSet<TxId>,
decrypted_note_data: DecryptedNoteData,
Expand Down Expand Up @@ -97,13 +97,15 @@ pub(crate) async fn scan_transactions<P: consensus::Parameters>(
}

let wallet_transaction = scan_transaction(
parameters,
consensus_parameters,
ufvks,
transaction,
block_height,
&decrypted_note_data,
&mut NullifierMap::new(),
outpoint_map,
&transparent_addresses,
false,
)
.unwrap();
wallet_transactions.insert(txid, wallet_transaction);
Expand All @@ -112,18 +114,21 @@ pub(crate) async fn scan_transactions<P: consensus::Parameters>(
Ok(wallet_transactions)
}

fn scan_transaction<P: consensus::Parameters>(
parameters: &P,
pub(crate) fn scan_transaction<P: consensus::Parameters>(
consensus_parameters: &P,
ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
transaction: Transaction,
block_height: BlockHeight,
decrypted_note_data: &DecryptedNoteData,
nullifier_map: &mut NullifierMap,
outpoint_map: &mut OutPointMap,
transparent_addresses: &HashMap<String, TransparentAddressId>,
pending: bool, // TODO: change for confirmation status
) -> Result<WalletTransaction, ()> {
// TODO: price?
// TODO: condsider splitting into seperate fns for pending and confirmed etc.
// TODO: price? save in wallet block as its relative to time mined?
let zip212_enforcement = zcash_primitives::transaction::components::sapling::zip212_enforcement(
parameters,
consensus_parameters,
block_height,
);
let mut transparent_coins: Vec<TransparentCoin> = Vec::new();
Expand Down Expand Up @@ -166,7 +171,7 @@ fn scan_transaction<P: consensus::Parameters>(
if let Some(bundle) = transaction.transparent_bundle() {
let transparent_outputs = &bundle.vout;
scan_incoming_coins(
parameters,
consensus_parameters,
&mut transparent_coins,
transaction.txid(),
transparent_addresses,
Expand Down Expand Up @@ -240,18 +245,39 @@ fn scan_transaction<P: consensus::Parameters>(
encoded_memos.append(&mut parse_encoded_memos(&orchard_notes).unwrap());
}

// for confirmed transactions nullifiers are collected during compact block scanning
if pending {
collect_nullifiers(nullifier_map, block_height, &transaction);
}

for encoded_memo in encoded_memos {
match encoded_memo {
ParsedMemo::Version0 { uas } => {
add_recipient_unified_address(parameters, uas.clone(), &mut outgoing_sapling_notes);
add_recipient_unified_address(parameters, uas, &mut outgoing_orchard_notes);
add_recipient_unified_address(
consensus_parameters,
uas.clone(),
&mut outgoing_sapling_notes,
);
add_recipient_unified_address(
consensus_parameters,
uas,
&mut outgoing_orchard_notes,
);
}
ParsedMemo::Version1 {
uas,
rejection_address_indexes: _,
} => {
add_recipient_unified_address(parameters, uas.clone(), &mut outgoing_sapling_notes);
add_recipient_unified_address(parameters, uas, &mut outgoing_orchard_notes);
add_recipient_unified_address(
consensus_parameters,
uas.clone(),
&mut outgoing_sapling_notes,
);
add_recipient_unified_address(
consensus_parameters,
uas,
&mut outgoing_orchard_notes,
);

// TODO: handle rejection addresses from encoded memos
}
Expand All @@ -261,7 +287,7 @@ fn scan_transaction<P: consensus::Parameters>(
}
}

// TODO: consider adding nullifiers and transparent txin data for efficiency
// TODO: consider adding nullifiers and transparent outpoint data for efficiency

Ok(WalletTransaction::from_parts(
transaction,
Expand Down Expand Up @@ -327,7 +353,7 @@ where
key_ids[key_index],
note,
Some(*nullifier),
*position,
Some(*position),
Memo::from_bytes(memo_bytes.as_ref()).unwrap(),
None,
));
Expand Down Expand Up @@ -438,6 +464,36 @@ fn add_recipient_unified_address<P, Nz>(
}
}

/// Converts and adds the nullifiers from a transaction to the nullifier map
fn collect_nullifiers(
nullifier_map: &mut NullifierMap,
block_height: BlockHeight,
transaction: &Transaction,
) {
if let Some(bundle) = transaction.sapling_bundle() {
bundle
.shielded_spends()
.iter()
.map(|spend| spend.nullifier())
.for_each(|nullifier| {
nullifier_map
.sapling_mut()
.insert(*nullifier, (block_height, transaction.txid()));
});
}
if let Some(bundle) = transaction.orchard_bundle() {
bundle
.actions()
.iter()
.map(|action| action.nullifier())
.for_each(|nullifier| {
nullifier_map
.orchard_mut()
.insert(*nullifier, (block_height, transaction.txid()));
});
}
}

/// Adds the outpoints from a transparent bundle to the outpoint map.
fn collect_outpoints<A: zcash_primitives::transaction::components::transparent::Authorization>(
outpoint_map: &mut OutPointMap,
Expand Down
49 changes: 39 additions & 10 deletions zingo-sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ use std::time::Duration;

use crate::client::{self, FetchRequest};
use crate::error::SyncError;
use crate::primitives::{Locator, OutPointMap, OutputId};
use crate::keys::transparent::TransparentAddressId;
use crate::primitives::{Locator, NullifierMap, OutPointMap, OutputId};
use crate::scan::error::{ContinuityError, ScanError};
use crate::scan::task::{Scanner, ScannerState};
use crate::scan::transactions::scan_transactions;
use crate::scan::transactions::{scan_transaction, scan_transactions};
use crate::scan::{DecryptedNoteData, ScanResults};
use crate::traits::{
SyncBlocks, SyncNullifiers, SyncOutPoints, SyncShardTrees, SyncTransactions, SyncWallet,
Expand Down Expand Up @@ -91,7 +92,7 @@ where
);
scanner.spawn_workers();

// Setup the initial mempool stream
// setup the initial mempool stream
let mut mempool_stream = client::get_mempool_transaction_stream(fetch_request_sender.clone())
.await
.unwrap();
Expand Down Expand Up @@ -119,6 +120,8 @@ where
process_mempool_stream_response(
consensus_parameters,
fetch_request_sender.clone(),
&ufvks,
wallet,
mempool_stream_response,
&mut mempool_stream)
.await;
Expand Down Expand Up @@ -338,7 +341,7 @@ where
wallet_transactions
.values_mut()
.flat_map(|tx| tx.sapling_notes_mut())
.filter(|note| note.spending_transaction().is_none())
.filter(|note| note.spending_transaction().is_none()) // TODO: add logic for spending tx pending
.for_each(|note| {
if let Some((_, txid)) = note
.nullifier()
Expand All @@ -350,7 +353,7 @@ where
wallet_transactions
.values_mut()
.flat_map(|tx| tx.orchard_notes_mut())
.filter(|note| note.spending_transaction().is_none())
.filter(|note| note.spending_transaction().is_none()) // TODO: add logic for spending tx pending
.for_each(|note| {
if let Some((_, txid)) = note
.nullifier()
Expand Down Expand Up @@ -385,7 +388,7 @@ where
wallet_transactions
.values_mut()
.flat_map(|tx| tx.transparent_coins_mut())
.filter(|coin| coin.spending_transaction().is_none())
.filter(|coin| coin.spending_transaction().is_none()) // TODO: add logic for spending tx pending
.for_each(|coin| {
if let Some((_, txid)) = transparent_spend_locators.get(&coin.output_id()) {
coin.set_spending_transaction(Some(*txid));
Expand Down Expand Up @@ -438,12 +441,16 @@ where
Ok(())
}

async fn process_mempool_stream_response(
async fn process_mempool_stream_response<W>(
consensus_parameters: &impl consensus::Parameters,
fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
wallet: &mut W,
mempool_stream_response: Result<Option<RawTransaction>, tonic::Status>,
mempool_stream: &mut tonic::Streaming<RawTransaction>,
) {
) where
W: SyncWallet + SyncNullifiers + SyncOutPoints,
{
match mempool_stream_response.unwrap() {
Some(raw_transaction) => {
let block_height =
Expand All @@ -460,11 +467,33 @@ async fn process_mempool_stream_response(
block_height
);

// TODO: scan tx
// TODO: create confirmation status
let mut nullifiers = NullifierMap::new();
let mut outpoints = OutPointMap::new();
let transparent_addresses: HashMap<String, TransparentAddressId> = wallet
.get_transparent_addresses()
.unwrap()
.iter()
.map(|(id, address)| (address.clone(), *id))
.collect();
let wallet_transaction = scan_transaction(
consensus_parameters,
ufvks,
transaction,
block_height,
&DecryptedNoteData::new(),
&mut nullifiers,
&mut outpoints,
&transparent_addresses,
true,
)
.unwrap();
wallet.append_nullifiers(nullifiers).unwrap();
wallet.append_outpoints(outpoints).unwrap();
// TODO: add wallet transactions to wallet
}
None => {
// A block was mined, setup a new mempool stream until the next block is mined.
tracing::info!("mempool response is None");
*mempool_stream = client::get_mempool_transaction_stream(fetch_request_sender.clone())
.await
.unwrap();
Expand Down

0 comments on commit 39a9ef1

Please sign in to comment.