Skip to content

Commit

Permalink
feat: implement providers for SnapshotJarProvider (#5231)
Browse files Browse the repository at this point in the history
  • Loading branch information
joshieDo authored Nov 3, 2023
1 parent 38eb1ee commit 3ab1afc
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 59 deletions.
3 changes: 3 additions & 0 deletions crates/interfaces/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,7 @@ pub enum ProviderError {
/// State is not available for the given block number because it is pruned.
#[error("state at block #{0} is pruned")]
StateAtBlockPruned(BlockNumber),
/// Provider does not support this particular request.
#[error("this provider does not support this request")]
UnsupportedProvider,
}
15 changes: 15 additions & 0 deletions crates/primitives/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,21 @@ impl TransactionSignedNoHash {
pub fn with_hash(self) -> TransactionSigned {
self.into()
}

/// Recovers a list of signers from a transaction list iterator
///
/// Returns `None`, if some transaction's signature is invalid, see also
/// [Self::recover_signer].
pub fn recover_signers<'a, T>(txes: T, num_txes: usize) -> Option<Vec<Address>>
where
T: IntoParallelIterator<Item = &'a Self> + IntoIterator<Item = &'a Self> + Send,
{
if num_txes < *PARALLEL_SENDER_RECOVERY_THRESHOLD {
txes.into_iter().map(|tx| tx.recover_signer()).collect()
} else {
txes.into_par_iter().map(|tx| tx.recover_signer()).collect()
}
}
}

impl Compact for TransactionSignedNoHash {
Expand Down
6 changes: 6 additions & 0 deletions crates/storage/db/src/snapshot/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ impl<'a> SnapshotCursor<'a> {
Ok(Self(NippyJarCursor::with_handle(jar, mmap_handle)?))
}

/// Returns the current `BlockNumber` or `TxNumber` of the cursor depending on the kind of
/// snapshot segment.
pub fn number(&self) -> u64 {
self.row_index() + self.jar().user_header().start()
}

/// Gets a row of values.
pub fn get(
&mut self,
Expand Down
6 changes: 6 additions & 0 deletions crates/storage/nippy-jar/src/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,16 @@ where
})
}

/// Returns a reference to the related [`NippyJar`]
pub fn jar(&self) -> &NippyJar<H> {
self.jar
}

/// Returns current row index of the cursor
pub fn row_index(&self) -> u64 {
self.row
}

/// Resets cursor to the beginning.
pub fn reset(&mut self) {
self.row = 0;
Expand Down
181 changes: 122 additions & 59 deletions crates/storage/provider/src/providers/snapshot/jar.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,41 @@
use super::LoadedJarRef;
use crate::{BlockHashReader, BlockNumReader, HeaderProvider, TransactionsProvider};
use crate::{
BlockHashReader, BlockNumReader, HeaderProvider, ReceiptProvider, TransactionsProvider,
};
use reth_db::{
codecs::CompactU256,
snapshot::{HeaderMask, SnapshotCursor},
table::Decompress,
snapshot::{HeaderMask, ReceiptMask, SnapshotCursor, TransactionMask},
};
use reth_interfaces::{
executor::{BlockExecutionError, BlockValidationError},
provider::ProviderError,
RethResult,
};
use reth_interfaces::{provider::ProviderError, RethResult};
use reth_primitives::{
Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, Header, SealedHeader,
Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, Header, Receipt, SealedHeader,
TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber, B256, U256,
};
use std::ops::{Deref, Range, RangeBounds};

/// Provider over a specific `NippyJar` and range.
#[derive(Debug)]
pub struct SnapshotJarProvider<'a>(LoadedJarRef<'a>);
pub struct SnapshotJarProvider<'a> {
/// Main snapshot segment
jar: LoadedJarRef<'a>,
/// Another kind of snapshot segment to help query data from the main one.
auxiliar_jar: Option<Box<Self>>,
}

impl<'a> Deref for SnapshotJarProvider<'a> {
type Target = LoadedJarRef<'a>;
fn deref(&self) -> &Self::Target {
&self.0
&self.jar
}
}

impl<'a> From<LoadedJarRef<'a>> for SnapshotJarProvider<'a> {
fn from(value: LoadedJarRef<'a>) -> Self {
SnapshotJarProvider(value)
SnapshotJarProvider { jar: value, auxiliar_jar: None }
}
}

Expand All @@ -37,6 +47,12 @@ impl<'a> SnapshotJarProvider<'a> {
{
SnapshotCursor::new(self.value(), self.mmap_handle())
}

/// Adds a new auxiliar snapshot to help query data from the main one
pub fn with_auxiliar(mut self, auxiliar_jar: SnapshotJarProvider<'a>) -> Self {
self.auxiliar_jar = Some(Box::new(auxiliar_jar));
self
}
}

impl<'a> HeaderProvider for SnapshotJarProvider<'a> {
Expand Down Expand Up @@ -71,9 +87,8 @@ impl<'a> HeaderProvider for SnapshotJarProvider<'a> {
let mut headers = Vec::with_capacity((range.end - range.start) as usize);

for num in range.start..range.end {
match cursor.get_one::<HeaderMask<Header>>(num.into())? {
Some(header) => headers.push(header),
None => return Ok(headers),
if let Some(header) = cursor.get_one::<HeaderMask<Header>>(num.into())? {
headers.push(header);
}
}

Expand All @@ -90,9 +105,10 @@ impl<'a> HeaderProvider for SnapshotJarProvider<'a> {
let mut headers = Vec::with_capacity((range.end - range.start) as usize);

for number in range.start..range.end {
match cursor.get_two::<HeaderMask<Header, BlockHash>>(number.into())? {
Some((header, hash)) => headers.push(header.seal(hash)),
None => return Ok(headers),
if let Some((header, hash)) =
cursor.get_two::<HeaderMask<Header, BlockHash>>(number.into())?
{
headers.push(header.seal(hash))
}
}
Ok(headers)
Expand All @@ -113,110 +129,157 @@ impl<'a> BlockHashReader for SnapshotJarProvider<'a> {

fn canonical_hashes_range(
&self,
_start: BlockNumber,
_end: BlockNumber,
start: BlockNumber,
end: BlockNumber,
) -> RethResult<Vec<B256>> {
todo!()
let mut cursor = self.cursor()?;
let mut hashes = Vec::with_capacity((end - start) as usize);

for number in start..end {
if let Some(hash) = cursor.get_one::<HeaderMask<BlockHash>>(number.into())? {
hashes.push(hash)
}
}
Ok(hashes)
}
}

impl<'a> BlockNumReader for SnapshotJarProvider<'a> {
fn chain_info(&self) -> RethResult<ChainInfo> {
todo!()
// Information on live database
Err(ProviderError::UnsupportedProvider.into())
}

fn best_block_number(&self) -> RethResult<BlockNumber> {
todo!()
// Information on live database
Err(ProviderError::UnsupportedProvider.into())
}

fn last_block_number(&self) -> RethResult<BlockNumber> {
todo!()
// Information on live database
Err(ProviderError::UnsupportedProvider.into())
}

fn block_number(&self, _hash: B256) -> RethResult<Option<BlockNumber>> {
todo!()
fn block_number(&self, hash: B256) -> RethResult<Option<BlockNumber>> {
let mut cursor = self.cursor()?;

Ok(cursor
.get_one::<HeaderMask<BlockHash>>((&hash).into())?
.and_then(|res| (res == hash).then(|| cursor.number())))
}
}

impl<'a> TransactionsProvider for SnapshotJarProvider<'a> {
fn transaction_id(&self, _tx_hash: TxHash) -> RethResult<Option<TxNumber>> {
todo!()
fn transaction_id(&self, hash: TxHash) -> RethResult<Option<TxNumber>> {
let mut cursor = self.cursor()?;

Ok(cursor
.get_one::<TransactionMask<TransactionSignedNoHash>>((&hash).into())?
.and_then(|res| (res.hash() == hash).then(|| cursor.number())))
}

fn transaction_by_id(&self, num: TxNumber) -> RethResult<Option<TransactionSigned>> {
TransactionSignedNoHash::decompress(
self.cursor()?
.row_by_number_with_cols((num - self.user_header().tx_start()) as usize, 0b1)?
.ok_or_else(|| ProviderError::TransactionNotFound(num.into()))?[0],
)
.map(Into::into)
.map(Some)
.map_err(Into::into)
Ok(self
.cursor()?
.get_one::<TransactionMask<TransactionSignedNoHash>>(num.into())?
.map(|tx| tx.with_hash()))
}

fn transaction_by_id_no_hash(
&self,
_id: TxNumber,
num: TxNumber,
) -> RethResult<Option<TransactionSignedNoHash>> {
todo!()
self.cursor()?.get_one::<TransactionMask<TransactionSignedNoHash>>(num.into())
}

fn transaction_by_hash(&self, hash: TxHash) -> RethResult<Option<TransactionSigned>> {
// WIP
let mut cursor = self.cursor()?;

let tx = TransactionSignedNoHash::decompress(
cursor.row_by_key_with_cols(&hash.0, 0b1).unwrap().unwrap()[0],
)
.unwrap()
.with_hash();

if tx.hash() == hash {
return Ok(Some(tx))
} else {
// check next snapshot
}
Ok(None)
Ok(self
.cursor()?
.get_one::<TransactionMask<TransactionSignedNoHash>>((&hash).into())?
.map(|tx| tx.with_hash()))
}

fn transaction_by_hash_with_meta(
&self,
_hash: TxHash,
) -> RethResult<Option<(TransactionSigned, TransactionMeta)>> {
todo!()
// Information required on indexing table [`tables::TransactionBlock`]
Err(ProviderError::UnsupportedProvider.into())
}

fn transaction_block(&self, _id: TxNumber) -> RethResult<Option<BlockNumber>> {
todo!()
// Information on indexing table [`tables::TransactionBlock`]
Err(ProviderError::UnsupportedProvider.into())
}

fn transactions_by_block(
&self,
_block_id: BlockHashOrNumber,
) -> RethResult<Option<Vec<TransactionSigned>>> {
todo!()
// Related to indexing tables. Live database should get the tx_range and call snapshot
// provider with `transactions_by_tx_range` instead.
Err(ProviderError::UnsupportedProvider.into())
}

fn transactions_by_block_range(
&self,
_range: impl RangeBounds<BlockNumber>,
) -> RethResult<Vec<Vec<TransactionSigned>>> {
todo!()
// Related to indexing tables. Live database should get the tx_range and call snapshot
// provider with `transactions_by_tx_range` instead.
Err(ProviderError::UnsupportedProvider.into())
}

fn senders_by_tx_range(&self, _range: impl RangeBounds<TxNumber>) -> RethResult<Vec<Address>> {
todo!()
fn senders_by_tx_range(&self, range: impl RangeBounds<TxNumber>) -> RethResult<Vec<Address>> {
let txs = self.transactions_by_tx_range(range)?;
Ok(TransactionSignedNoHash::recover_signers(&txs, txs.len())
.ok_or(BlockExecutionError::Validation(BlockValidationError::SenderRecoveryError))?)
}

fn transactions_by_tx_range(
&self,
_range: impl RangeBounds<TxNumber>,
range: impl RangeBounds<TxNumber>,
) -> RethResult<Vec<reth_primitives::TransactionSignedNoHash>> {
todo!()
let range = to_range(range);
let mut cursor = self.cursor()?;
let mut txes = Vec::with_capacity((range.end - range.start) as usize);

for num in range {
if let Some(tx) =
cursor.get_one::<TransactionMask<TransactionSignedNoHash>>(num.into())?
{
txes.push(tx)
}
}
Ok(txes)
}

fn transaction_sender(&self, num: TxNumber) -> RethResult<Option<Address>> {
Ok(self
.cursor()?
.get_one::<TransactionMask<TransactionSignedNoHash>>(num.into())?
.and_then(|tx| tx.recover_signer()))
}
}

impl<'a> ReceiptProvider for SnapshotJarProvider<'a> {
fn receipt(&self, num: TxNumber) -> RethResult<Option<Receipt>> {
self.cursor()?.get_one::<ReceiptMask<Receipt>>(num.into())
}

fn receipt_by_hash(&self, hash: TxHash) -> RethResult<Option<Receipt>> {
if let Some(tx_snapshot) = &self.auxiliar_jar {
if let Some(num) = tx_snapshot.transaction_id(hash)? {
return self.receipt(num)
}
}
Ok(None)
}

fn transaction_sender(&self, _id: TxNumber) -> RethResult<Option<Address>> {
todo!()
fn receipts_by_block(&self, _block: BlockHashOrNumber) -> RethResult<Option<Vec<Receipt>>> {
// Related to indexing tables. Snapshot should get the tx_range and call snapshot
// provider with `receipt()` instead for each
Err(ProviderError::UnsupportedProvider.into())
}
}

Expand Down

0 comments on commit 3ab1afc

Please sign in to comment.