Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(provider): Receipts by block range in ReceiptProvider #5281

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion crates/storage/provider/src/providers/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl<DB: Database> ProviderFactory<DB> {
if block_number == provider.best_block_number().unwrap_or_default() &&
block_number == provider.last_block_number().unwrap_or_default()
{
return Ok(Box::new(LatestStateProvider::new(provider.into_tx())))
return Ok(Box::new(LatestStateProvider::new(provider.into_tx())));
}

// +1 as the changeset that we want is the one that was applied after this block.
Expand Down Expand Up @@ -332,6 +332,14 @@ impl<DB: Database> ReceiptProvider for ProviderFactory<DB> {
fn receipts_by_block(&self, block: BlockHashOrNumber) -> RethResult<Option<Vec<Receipt>>> {
self.provider()?.receipts_by_block(block)
}

fn receipts_by_block_range(
&self,
start: BlockHashOrNumber,
end: BlockHashOrNumber,
) -> RethResult<Option<Vec<(u64, Vec<Receipt>)>>> {
self.provider()?.receipts_by_block_range(start, end)
}
}

impl<DB: Database> WithdrawalsProvider for ProviderFactory<DB> {
Expand Down
101 changes: 77 additions & 24 deletions crates/storage/provider/src/providers/database/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ where
while let Some((sharded_key, list)) = item {
// If the shard does not belong to the key, break.
if !shard_belongs_to_key(&sharded_key) {
break
break;
}
cursor.delete_current()?;

Expand All @@ -141,12 +141,12 @@ where
let first = list.iter(0).next().expect("List can't be empty");
if first >= block_number as usize {
item = cursor.prev()?;
continue
continue;
} else if block_number <= sharded_key.as_ref().highest_block_number {
// Filter out all elements greater than block number.
return Ok(list.iter(0).take_while(|i| *i < block_number as usize).collect::<Vec<_>>())
return Ok(list.iter(0).take_while(|i| *i < block_number as usize).collect::<Vec<_>>());
} else {
return Ok(list.iter(0).collect::<Vec<_>>())
return Ok(list.iter(0).collect::<Vec<_>>());
}
}

Expand Down Expand Up @@ -223,7 +223,7 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
range: RangeInclusive<BlockNumber>,
) -> RethResult<BundleStateWithReceipts> {
if range.is_empty() {
return Ok(BundleStateWithReceipts::default())
return Ok(BundleStateWithReceipts::default());
}
let start_block_number = *range.start();

Expand Down Expand Up @@ -401,7 +401,7 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
let block_bodies = self.get_or_take::<tables::BlockBodyIndices, false>(range)?;

if block_bodies.is_empty() {
return Ok(Vec::new())
return Ok(Vec::new());
}

// Compute the first and last tx ID in the range
Expand All @@ -410,7 +410,7 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {

// If this is the case then all of the blocks in the range are empty
if last_transaction < first_transaction {
return Ok(block_bodies.into_iter().map(|(n, _)| (n, Vec::new())).collect())
return Ok(block_bodies.into_iter().map(|(n, _)| (n, Vec::new())).collect());
}

// Get transactions and senders
Expand Down Expand Up @@ -539,7 +539,7 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {

let block_headers = self.get_or_take::<tables::Headers, TAKE>(range.clone())?;
if block_headers.is_empty() {
return Ok(Vec::new())
return Ok(Vec::new());
}

let block_header_hashes =
Expand Down Expand Up @@ -645,7 +645,7 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {

while let Some(Ok((entry_key, _))) = reverse_walker.next() {
if selector(entry_key.clone()) <= key {
break
break;
}
reverse_walker.delete_current()?;
deleted += 1;
Expand Down Expand Up @@ -692,7 +692,7 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
}

if deleted == limit {
break
break;
}
}
}
Expand Down Expand Up @@ -723,7 +723,7 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
}

if deleted == limit {
break
break;
}
}
}
Expand All @@ -743,7 +743,7 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
// delete old shard so new one can be inserted.
self.tx.delete::<T>(shard_key, None)?;
let list = list.iter(0).map(|i| i as u64).collect::<Vec<_>>();
return Ok(list)
return Ok(list);
}
Ok(Vec::new())
}
Expand Down Expand Up @@ -884,7 +884,7 @@ impl<TX: DbTx> HeaderProvider for DatabaseProvider<TX> {
if let Some(td) = self.chain_spec.final_paris_total_difficulty(number) {
// if this block is higher than the final paris(merge) block, return the final paris
// difficulty
return Ok(Some(td))
return Ok(Some(td));
}

Ok(self.tx.get::<tables::HeaderTD>(number)?.map(|td| td.0))
Expand Down Expand Up @@ -995,7 +995,7 @@ impl<TX: DbTx> BlockReader for DatabaseProvider<TX> {
None => return Ok(None),
};

return Ok(Some(Block { header, body: transactions, ommers, withdrawals }))
return Ok(Some(Block { header, body: transactions, ommers, withdrawals }));
}
}

Expand All @@ -1015,11 +1015,11 @@ impl<TX: DbTx> BlockReader for DatabaseProvider<TX> {
// If the Paris (Merge) hardfork block is known and block is after it, return empty
// ommers.
if self.chain_spec.final_paris_total_difficulty(number).is_some() {
return Ok(Some(Vec::new()))
return Ok(Some(Vec::new()));
}

let ommers = self.tx.get::<tables::BlockOmmers>(number)?.map(|o| o.ommers);
return Ok(ommers)
return Ok(ommers);
}

Ok(None)
Expand Down Expand Up @@ -1084,7 +1084,7 @@ impl<TX: DbTx> BlockReader for DatabaseProvider<TX> {

fn block_range(&self, range: RangeInclusive<BlockNumber>) -> RethResult<Vec<Block>> {
if range.is_empty() {
return Ok(Vec::new())
return Ok(Vec::new());
}

let len = range.end().saturating_sub(*range.start()) as usize;
Expand Down Expand Up @@ -1262,7 +1262,7 @@ impl<TX: DbTx> TransactionsProvider for DatabaseProvider<TX> {
excess_blob_gas: header.excess_blob_gas,
};

return Ok(Some((transaction, meta)))
return Ok(Some((transaction, meta)));
}
}
}
Expand Down Expand Up @@ -1293,7 +1293,7 @@ impl<TX: DbTx> TransactionsProvider for DatabaseProvider<TX> {
.map(|result| result.map(|(_, tx)| tx.into()))
.collect::<Result<Vec<_>, _>>()?;
Ok(Some(transactions))
}
};
}
}
Ok(None)
Expand Down Expand Up @@ -1375,11 +1375,64 @@ impl<TX: DbTx> ReceiptProvider for DatabaseProvider<TX> {
.map(|result| result.map(|(_, receipt)| receipt))
.collect::<Result<Vec<_>, _>>()?;
Ok(Some(receipts))
}
};
}
}
Ok(None)
}

fn receipts_by_block_range(
&self,
start: BlockHashOrNumber,
end: BlockHashOrNumber,
) -> RethResult<Option<Vec<(u64, Vec<Receipt>)>>> {
let block_numbers =
(self.convert_hash_or_number(start)?, self.convert_hash_or_number(end)?);
if let (Some(start_number), Some(end_number)) = block_numbers {
if start_number > end_number {
return Err(RethError::Custom(String::from("Invalid inclusive range: `start` corresponds to a greater blockheight than `end`")));
}

// Gather all block body indices for blocks in the inclusive block range of
// [start, end].
let block_body_tx_indices = (start_number..=end_number)
.map(|block_no| Ok((block_no, self.block_body_indices(block_no)?)))
.collect::<RethResult<Vec<_>>>()?;

let tx_num_boundaries = (block_body_tx_indices.first(), block_body_tx_indices.last());
if let (Some((_, Some(start_body))), Some((_, Some(end_body)))) = tx_num_boundaries {
let (start_idx, end_idx) =
(start_body.tx_num_range().start, end_body.tx_num_range().end);

// Gather all receipts in the block range in one db transaction by walking over the
// receipts table in the range of the freceipts table in the range of the first
let mut receipts_cursor = self.tx.cursor_read::<tables::Receipts>()?;
let mut receipts = receipts_cursor
.walk_range(start_idx..end_idx)?
.map(|result| result.map(|(_, receipt)| receipt))
.collect::<Result<Vec<_>, _>>()?;

// Walk through the tx indices of each block in the requested range. For each
// block, drain elements from the front of the `receipts` vector equal to the
// number of transactions in the block.
let res = block_body_tx_indices
.into_iter()
.map(|(block_number, indices)| {
let indices = indices.ok_or(RethError::Custom(String::from(
"Failed to fetch block indices",
)))?;
Ok((
block_number,
receipts.drain(0..indices.tx_count as usize).collect_vec(),
))
})
.collect::<RethResult<Vec<(u64, Vec<Receipt>)>>>()?;

return Ok(Some(res));
};
}
Ok(None)
}
}

impl<TX: DbTx> WithdrawalsProvider for DatabaseProvider<TX> {
Expand All @@ -1397,7 +1450,7 @@ impl<TX: DbTx> WithdrawalsProvider for DatabaseProvider<TX> {
.get::<tables::BlockWithdrawals>(number)
.map(|w| w.map(|w| w.withdrawals))?
.unwrap_or_default();
return Ok(Some(withdrawals))
return Ok(Some(withdrawals));
}
}
Ok(None)
Expand Down Expand Up @@ -1648,7 +1701,7 @@ impl<TX: DbTxMut + DbTx> HashingWriter for DatabaseProvider<TX> {
block_number: *range.end(),
block_hash: end_block_hash,
}
.into())
.into());
}
trie_updates.flush(&self.tx)?;
}
Expand Down Expand Up @@ -2028,7 +2081,7 @@ impl<TX: DbTxMut + DbTx> BlockExecutionWriter for DatabaseProvider<TX> {
block_number: parent_number,
block_hash: parent_hash,
}
.into())
.into());
}
trie_updates.flush(&self.tx)?;
}
Expand Down Expand Up @@ -2156,7 +2209,7 @@ impl<TX: DbTxMut + DbTx> BlockWriter for DatabaseProvider<TX> {
prune_modes: Option<&PruneModes>,
) -> RethResult<()> {
if blocks.is_empty() {
return Ok(())
return Ok(());
}
let new_tip = blocks.last().unwrap();
let new_tip_number = new_tip.number;
Expand Down
12 changes: 10 additions & 2 deletions crates/storage/provider/src/providers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,14 @@ where
fn receipts_by_block(&self, block: BlockHashOrNumber) -> RethResult<Option<Vec<Receipt>>> {
self.database.provider()?.receipts_by_block(block)
}

fn receipts_by_block_range(
&self,
start: BlockHashOrNumber,
end: BlockHashOrNumber,
) -> RethResult<Option<Vec<(u64, Vec<Receipt>)>>> {
self.database.provider()?.receipts_by_block_range(start, end)
}
}
impl<DB, Tree> ReceiptProviderIdExt for BlockchainProvider<DB, Tree>
where
Expand Down Expand Up @@ -534,7 +542,7 @@ where

if let Some(block) = self.tree.pending_block_num_hash() {
if let Ok(pending) = self.tree.pending_state_provider(block.hash) {
return self.pending_with_provider(pending)
return self.pending_with_provider(pending);
}
}

Expand All @@ -544,7 +552,7 @@ where

fn pending_state_by_hash(&self, block_hash: B256) -> RethResult<Option<StateProviderBox<'_>>> {
if let Some(state) = self.tree.find_pending_state_provider(block_hash) {
return Ok(Some(self.pending_with_provider(state)?))
return Ok(Some(self.pending_with_provider(state)?));
}
Ok(None)
}
Expand Down
12 changes: 10 additions & 2 deletions crates/storage/provider/src/test_utils/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ impl TransactionsProvider for MockEthProvider {
base_fee: block.header.base_fee_per_gas,
excess_blob_gas: block.header.excess_blob_gas,
};
return Ok(Some((tx.clone(), meta)))
return Ok(Some((tx.clone(), meta)));
}
}
}
Expand All @@ -249,7 +249,7 @@ impl TransactionsProvider for MockEthProvider {
let mut current_tx_number: TxNumber = 0;
for block in lock.values() {
if current_tx_number + (block.body.len() as TxNumber) > id {
return Ok(Some(block.header.number))
return Ok(Some(block.header.number));
}
current_tx_number += block.body.len() as TxNumber;
}
Expand Down Expand Up @@ -334,6 +334,14 @@ impl ReceiptProvider for MockEthProvider {
fn receipts_by_block(&self, _block: BlockHashOrNumber) -> RethResult<Option<Vec<Receipt>>> {
Ok(None)
}

fn receipts_by_block_range(
&self,
_start: BlockHashOrNumber,
_end: BlockHashOrNumber,
) -> RethResult<Option<Vec<(u64, Vec<Receipt>)>>> {
Ok(None)
}
}

impl ReceiptProviderIdExt for MockEthProvider {}
Expand Down
8 changes: 8 additions & 0 deletions crates/storage/provider/src/test_utils/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,14 @@ impl ReceiptProvider for NoopProvider {
fn receipts_by_block(&self, _block: BlockHashOrNumber) -> RethResult<Option<Vec<Receipt>>> {
Ok(None)
}

fn receipts_by_block_range(
&self,
_start: BlockHashOrNumber,
_end: BlockHashOrNumber,
) -> RethResult<Option<Vec<(u64, Vec<Receipt>)>>> {
Ok(None)
}
}

impl ReceiptProviderIdExt for NoopProvider {}
Expand Down
13 changes: 12 additions & 1 deletion crates/storage/provider/src/traits/receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@ pub trait ReceiptProvider: Send + Sync {
///
/// Returns `None` if the block is not found.
fn receipts_by_block(&self, block: BlockHashOrNumber) -> RethResult<Option<Vec<Receipt>>>;

/// Get receipts by contiguous block range, with inclusive bounds. The `start`
/// [BlockHashOrNumber] must be less than or equal to the `end` [BlockHashOrNumber].
///
/// Returns `None` if either the start or end block is not found.
#[allow(clippy::type_complexity)]
fn receipts_by_block_range(
&self,
start: BlockHashOrNumber,
end: BlockHashOrNumber,
) -> RethResult<Option<Vec<(u64, Vec<Receipt>)>>>;
}

/// Trait extension for `ReceiptProvider`, for types that implement `BlockId` conversion.
Expand All @@ -41,7 +52,7 @@ pub trait ReceiptProviderIdExt: ReceiptProvider + BlockIdReader {
if let Some(num) = self.convert_block_number(num_tag)? {
BlockHashOrNumber::Number(num)
} else {
return Ok(None)
return Ok(None);
}
}
};
Expand Down
Loading