From 3ab1afc9aa11cfc50236509a217b26d522fbb9b2 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Fri, 3 Nov 2023 14:30:57 +0000 Subject: [PATCH] feat: implement providers for `SnapshotJarProvider` (#5231) --- crates/interfaces/src/provider.rs | 3 + crates/primitives/src/transaction/mod.rs | 15 ++ crates/storage/db/src/snapshot/cursor.rs | 6 + crates/storage/nippy-jar/src/cursor.rs | 6 + .../provider/src/providers/snapshot/jar.rs | 181 ++++++++++++------ 5 files changed, 152 insertions(+), 59 deletions(-) diff --git a/crates/interfaces/src/provider.rs b/crates/interfaces/src/provider.rs index 71b1371f34c6..b9d8bd43b4a6 100644 --- a/crates/interfaces/src/provider.rs +++ b/crates/interfaces/src/provider.rs @@ -111,4 +111,7 @@ pub enum ProviderError { /// State is not available for the given block number because it is pruned. #[error("state at block #{0} is pruned")] StateAtBlockPruned(BlockNumber), + /// Provider does not support this particular request. + #[error("this provider does not support this request")] + UnsupportedProvider, } diff --git a/crates/primitives/src/transaction/mod.rs b/crates/primitives/src/transaction/mod.rs index 5cd7cfdf6b93..3c05d849c09a 100644 --- a/crates/primitives/src/transaction/mod.rs +++ b/crates/primitives/src/transaction/mod.rs @@ -716,6 +716,21 @@ impl TransactionSignedNoHash { pub fn with_hash(self) -> TransactionSigned { self.into() } + + /// Recovers a list of signers from a transaction list iterator + /// + /// Returns `None`, if some transaction's signature is invalid, see also + /// [Self::recover_signer]. + pub fn recover_signers<'a, T>(txes: T, num_txes: usize) -> Option> + where + T: IntoParallelIterator + IntoIterator + Send, + { + if num_txes < *PARALLEL_SENDER_RECOVERY_THRESHOLD { + txes.into_iter().map(|tx| tx.recover_signer()).collect() + } else { + txes.into_par_iter().map(|tx| tx.recover_signer()).collect() + } + } } impl Compact for TransactionSignedNoHash { diff --git a/crates/storage/db/src/snapshot/cursor.rs b/crates/storage/db/src/snapshot/cursor.rs index d4b446ab7cf9..403183f930a1 100644 --- a/crates/storage/db/src/snapshot/cursor.rs +++ b/crates/storage/db/src/snapshot/cursor.rs @@ -18,6 +18,12 @@ impl<'a> SnapshotCursor<'a> { Ok(Self(NippyJarCursor::with_handle(jar, mmap_handle)?)) } + /// Returns the current `BlockNumber` or `TxNumber` of the cursor depending on the kind of + /// snapshot segment. + pub fn number(&self) -> u64 { + self.row_index() + self.jar().user_header().start() + } + /// Gets a row of values. pub fn get( &mut self, diff --git a/crates/storage/nippy-jar/src/cursor.rs b/crates/storage/nippy-jar/src/cursor.rs index 879ff42b8c70..160beb5df5fb 100644 --- a/crates/storage/nippy-jar/src/cursor.rs +++ b/crates/storage/nippy-jar/src/cursor.rs @@ -58,10 +58,16 @@ where }) } + /// Returns a reference to the related [`NippyJar`] pub fn jar(&self) -> &NippyJar { self.jar } + /// Returns current row index of the cursor + pub fn row_index(&self) -> u64 { + self.row + } + /// Resets cursor to the beginning. pub fn reset(&mut self) { self.row = 0; diff --git a/crates/storage/provider/src/providers/snapshot/jar.rs b/crates/storage/provider/src/providers/snapshot/jar.rs index 6facfed3f58d..1e04c8003c2f 100644 --- a/crates/storage/provider/src/providers/snapshot/jar.rs +++ b/crates/storage/provider/src/providers/snapshot/jar.rs @@ -1,31 +1,41 @@ use super::LoadedJarRef; -use crate::{BlockHashReader, BlockNumReader, HeaderProvider, TransactionsProvider}; +use crate::{ + BlockHashReader, BlockNumReader, HeaderProvider, ReceiptProvider, TransactionsProvider, +}; use reth_db::{ codecs::CompactU256, - snapshot::{HeaderMask, SnapshotCursor}, - table::Decompress, + snapshot::{HeaderMask, ReceiptMask, SnapshotCursor, TransactionMask}, +}; +use reth_interfaces::{ + executor::{BlockExecutionError, BlockValidationError}, + provider::ProviderError, + RethResult, }; -use reth_interfaces::{provider::ProviderError, RethResult}; use reth_primitives::{ - Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, Header, SealedHeader, + Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, Header, Receipt, SealedHeader, TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber, B256, U256, }; use std::ops::{Deref, Range, RangeBounds}; /// Provider over a specific `NippyJar` and range. #[derive(Debug)] -pub struct SnapshotJarProvider<'a>(LoadedJarRef<'a>); +pub struct SnapshotJarProvider<'a> { + /// Main snapshot segment + jar: LoadedJarRef<'a>, + /// Another kind of snapshot segment to help query data from the main one. + auxiliar_jar: Option>, +} impl<'a> Deref for SnapshotJarProvider<'a> { type Target = LoadedJarRef<'a>; fn deref(&self) -> &Self::Target { - &self.0 + &self.jar } } impl<'a> From> for SnapshotJarProvider<'a> { fn from(value: LoadedJarRef<'a>) -> Self { - SnapshotJarProvider(value) + SnapshotJarProvider { jar: value, auxiliar_jar: None } } } @@ -37,6 +47,12 @@ impl<'a> SnapshotJarProvider<'a> { { SnapshotCursor::new(self.value(), self.mmap_handle()) } + + /// Adds a new auxiliar snapshot to help query data from the main one + pub fn with_auxiliar(mut self, auxiliar_jar: SnapshotJarProvider<'a>) -> Self { + self.auxiliar_jar = Some(Box::new(auxiliar_jar)); + self + } } impl<'a> HeaderProvider for SnapshotJarProvider<'a> { @@ -71,9 +87,8 @@ impl<'a> HeaderProvider for SnapshotJarProvider<'a> { let mut headers = Vec::with_capacity((range.end - range.start) as usize); for num in range.start..range.end { - match cursor.get_one::>(num.into())? { - Some(header) => headers.push(header), - None => return Ok(headers), + if let Some(header) = cursor.get_one::>(num.into())? { + headers.push(header); } } @@ -90,9 +105,10 @@ impl<'a> HeaderProvider for SnapshotJarProvider<'a> { let mut headers = Vec::with_capacity((range.end - range.start) as usize); for number in range.start..range.end { - match cursor.get_two::>(number.into())? { - Some((header, hash)) => headers.push(header.seal(hash)), - None => return Ok(headers), + if let Some((header, hash)) = + cursor.get_two::>(number.into())? + { + headers.push(header.seal(hash)) } } Ok(headers) @@ -113,110 +129,157 @@ impl<'a> BlockHashReader for SnapshotJarProvider<'a> { fn canonical_hashes_range( &self, - _start: BlockNumber, - _end: BlockNumber, + start: BlockNumber, + end: BlockNumber, ) -> RethResult> { - todo!() + let mut cursor = self.cursor()?; + let mut hashes = Vec::with_capacity((end - start) as usize); + + for number in start..end { + if let Some(hash) = cursor.get_one::>(number.into())? { + hashes.push(hash) + } + } + Ok(hashes) } } impl<'a> BlockNumReader for SnapshotJarProvider<'a> { fn chain_info(&self) -> RethResult { - todo!() + // Information on live database + Err(ProviderError::UnsupportedProvider.into()) } fn best_block_number(&self) -> RethResult { - todo!() + // Information on live database + Err(ProviderError::UnsupportedProvider.into()) } fn last_block_number(&self) -> RethResult { - todo!() + // Information on live database + Err(ProviderError::UnsupportedProvider.into()) } - fn block_number(&self, _hash: B256) -> RethResult> { - todo!() + fn block_number(&self, hash: B256) -> RethResult> { + let mut cursor = self.cursor()?; + + Ok(cursor + .get_one::>((&hash).into())? + .and_then(|res| (res == hash).then(|| cursor.number()))) } } impl<'a> TransactionsProvider for SnapshotJarProvider<'a> { - fn transaction_id(&self, _tx_hash: TxHash) -> RethResult> { - todo!() + fn transaction_id(&self, hash: TxHash) -> RethResult> { + let mut cursor = self.cursor()?; + + Ok(cursor + .get_one::>((&hash).into())? + .and_then(|res| (res.hash() == hash).then(|| cursor.number()))) } fn transaction_by_id(&self, num: TxNumber) -> RethResult> { - TransactionSignedNoHash::decompress( - self.cursor()? - .row_by_number_with_cols((num - self.user_header().tx_start()) as usize, 0b1)? - .ok_or_else(|| ProviderError::TransactionNotFound(num.into()))?[0], - ) - .map(Into::into) - .map(Some) - .map_err(Into::into) + Ok(self + .cursor()? + .get_one::>(num.into())? + .map(|tx| tx.with_hash())) } fn transaction_by_id_no_hash( &self, - _id: TxNumber, + num: TxNumber, ) -> RethResult> { - todo!() + self.cursor()?.get_one::>(num.into()) } fn transaction_by_hash(&self, hash: TxHash) -> RethResult> { - // WIP - let mut cursor = self.cursor()?; - - let tx = TransactionSignedNoHash::decompress( - cursor.row_by_key_with_cols(&hash.0, 0b1).unwrap().unwrap()[0], - ) - .unwrap() - .with_hash(); - - if tx.hash() == hash { - return Ok(Some(tx)) - } else { - // check next snapshot - } - Ok(None) + Ok(self + .cursor()? + .get_one::>((&hash).into())? + .map(|tx| tx.with_hash())) } fn transaction_by_hash_with_meta( &self, _hash: TxHash, ) -> RethResult> { - todo!() + // Information required on indexing table [`tables::TransactionBlock`] + Err(ProviderError::UnsupportedProvider.into()) } fn transaction_block(&self, _id: TxNumber) -> RethResult> { - todo!() + // Information on indexing table [`tables::TransactionBlock`] + Err(ProviderError::UnsupportedProvider.into()) } fn transactions_by_block( &self, _block_id: BlockHashOrNumber, ) -> RethResult>> { - todo!() + // Related to indexing tables. Live database should get the tx_range and call snapshot + // provider with `transactions_by_tx_range` instead. + Err(ProviderError::UnsupportedProvider.into()) } fn transactions_by_block_range( &self, _range: impl RangeBounds, ) -> RethResult>> { - todo!() + // Related to indexing tables. Live database should get the tx_range and call snapshot + // provider with `transactions_by_tx_range` instead. + Err(ProviderError::UnsupportedProvider.into()) } - fn senders_by_tx_range(&self, _range: impl RangeBounds) -> RethResult> { - todo!() + fn senders_by_tx_range(&self, range: impl RangeBounds) -> RethResult> { + let txs = self.transactions_by_tx_range(range)?; + Ok(TransactionSignedNoHash::recover_signers(&txs, txs.len()) + .ok_or(BlockExecutionError::Validation(BlockValidationError::SenderRecoveryError))?) } fn transactions_by_tx_range( &self, - _range: impl RangeBounds, + range: impl RangeBounds, ) -> RethResult> { - todo!() + let range = to_range(range); + let mut cursor = self.cursor()?; + let mut txes = Vec::with_capacity((range.end - range.start) as usize); + + for num in range { + if let Some(tx) = + cursor.get_one::>(num.into())? + { + txes.push(tx) + } + } + Ok(txes) + } + + fn transaction_sender(&self, num: TxNumber) -> RethResult> { + Ok(self + .cursor()? + .get_one::>(num.into())? + .and_then(|tx| tx.recover_signer())) + } +} + +impl<'a> ReceiptProvider for SnapshotJarProvider<'a> { + fn receipt(&self, num: TxNumber) -> RethResult> { + self.cursor()?.get_one::>(num.into()) + } + + fn receipt_by_hash(&self, hash: TxHash) -> RethResult> { + if let Some(tx_snapshot) = &self.auxiliar_jar { + if let Some(num) = tx_snapshot.transaction_id(hash)? { + return self.receipt(num) + } + } + Ok(None) } - fn transaction_sender(&self, _id: TxNumber) -> RethResult> { - todo!() + fn receipts_by_block(&self, _block: BlockHashOrNumber) -> RethResult>> { + // Related to indexing tables. Snapshot should get the tx_range and call snapshot + // provider with `receipt()` instead for each + Err(ProviderError::UnsupportedProvider.into()) } }