Skip to content

Commit

Permalink
Backport lru deadlock fix (#826)
Browse files Browse the repository at this point in the history
* Esad/fix deadlock block cache (#823)

* fix deadlock of acquiring orders

* use fee history mutex better

* update Cargo.lock (should have been done in file descriptor pr :) )

* Apply suggestions from code review

Co-authored-by: Rakan Al-Huneiti <[email protected]>

---------

Co-authored-by: Rakan Al-Huneiti <[email protected]>

* Fix fuzz Cargo.lock

* Remove mutexes from gas_price fee/block caches

---------

Co-authored-by: Esad Yusuf Atik <[email protected]>
Co-authored-by: Rakan Al-Huneiti <[email protected]>
  • Loading branch information
3 people authored Jul 4, 2024
1 parent a9eb99f commit 8f99def
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 58 deletions.
37 changes: 15 additions & 22 deletions crates/ethereum-rpc/src/gas_price/cache.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::sync::Mutex;

use reth_primitives::{BlockNumberOrTag, B256};
use reth_rpc::eth::error::EthResult;
use reth_rpc_types::{AnyTransactionReceipt, Block, BlockTransactions, Rich};
Expand All @@ -8,34 +6,31 @@ use sov_modules_api::WorkingSet;

/// Cache for gas oracle
pub struct BlockCache<C: sov_modules_api::Context> {
// Assuming number_to_hash and cache are always in sync
number_to_hash: Mutex<LruMap<u64, B256, ByLength>>, // Number -> hash mapping
cache: Mutex<LruMap<B256, Rich<Block>, ByLength>>,
number_to_hash: LruMap<u64, B256, ByLength>, // Number -> hash mapping
cache: LruMap<B256, Rich<Block>, ByLength>,
provider: citrea_evm::Evm<C>,
}

impl<C: sov_modules_api::Context> BlockCache<C> {
pub fn new(max_size: u32, provider: citrea_evm::Evm<C>) -> Self {
Self {
number_to_hash: Mutex::new(LruMap::new(ByLength::new(max_size))),
cache: Mutex::new(LruMap::new(ByLength::new(max_size))),
number_to_hash: LruMap::new(ByLength::new(max_size)),
cache: LruMap::new(ByLength::new(max_size)),
provider,
}
}

/// Gets block from cache or from provider
pub fn get_block(
&self,
&mut self,
block_hash: B256,
working_set: &mut WorkingSet<C>,
) -> EthResult<Option<Rich<Block>>> {
// Check if block is in cache
let mut cache = self.cache.lock().unwrap();
let mut number_to_hash = self.number_to_hash.lock().unwrap();
if let Some(block) = cache.get(&block_hash) {
if let Some(block) = self.cache.get(&block_hash) {
// Even though block is in cache, ask number_to_hash to keep it in sync
let number: u64 = block.header.number.unwrap_or_default();
number_to_hash.get(&number);
self.number_to_hash.get(&number);
return Ok(Some(block.clone()));
}

Expand All @@ -49,24 +44,22 @@ impl<C: sov_modules_api::Context> BlockCache<C> {
if let Some(block) = &block {
let number: u64 = block.header.number.unwrap_or_default();

number_to_hash.insert(number, block_hash);
cache.insert(block_hash, block.clone());
self.number_to_hash.insert(number, block_hash);
self.cache.insert(block_hash, block.clone());
}

Ok(block)
}

/// Gets block from cache or from provider by block number
pub fn get_block_by_number(
&self,
&mut self,
block_number: u64,
working_set: &mut WorkingSet<C>,
) -> EthResult<Option<Rich<Block>>> {
let mut number_to_hash = self.number_to_hash.lock().unwrap();
let mut cache = self.cache.lock().unwrap();
// Check if block is in cache
if let Some(block_hash) = number_to_hash.get(&block_number) {
return Ok(Some(cache.get(block_hash).unwrap().clone()));
if let Some(block_hash) = self.number_to_hash.get(&block_number) {
return Ok(Some(self.cache.get(block_hash).unwrap().clone()));
}

// Get block from provider
Expand All @@ -84,15 +77,15 @@ impl<C: sov_modules_api::Context> BlockCache<C> {
let number: u64 = block.header.number.unwrap_or_default();
let hash = block.header.hash.unwrap_or_default();

number_to_hash.insert(number, hash);
cache.insert(hash, block.clone());
self.number_to_hash.insert(number, hash);
self.cache.insert(hash, block.clone());
}

Ok(block)
}

pub fn get_block_with_receipts(
&self,
&mut self,
block_number: u64,
working_set: &mut WorkingSet<C>,
) -> EthResult<Option<(Rich<Block>, Vec<AnyTransactionReceipt>)>> {
Expand Down
40 changes: 19 additions & 21 deletions crates/ethereum-rpc/src/gas_price/fee_history.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! Consist of types adjacent to the fee history cache and its configs
use std::fmt::Debug;
use std::sync::{Arc, Mutex};

use reth_primitives::B256;
use reth_rpc::eth::error::EthApiError;
Expand Down Expand Up @@ -44,18 +43,18 @@ pub struct FeeHistoryCache<C: sov_modules_api::Context> {
/// and max number of blocks
config: FeeHistoryCacheConfig,
/// Stores the entries of the cache
entries: Mutex<LruMap<u64, FeeHistoryEntry, ByLength>>,
entries: LruMap<u64, FeeHistoryEntry, ByLength>,
/// Block cache
block_cache: Arc<BlockCache<C>>,
pub(crate) block_cache: BlockCache<C>,
}

impl<C: sov_modules_api::Context> FeeHistoryCache<C> {
/// Creates new FeeHistoryCache instance, initialize it with the mose recent data, set bounds
pub fn new(config: FeeHistoryCacheConfig, block_cache: Arc<BlockCache<C>>) -> Self {
pub fn new(config: FeeHistoryCacheConfig, block_cache: BlockCache<C>) -> Self {
let max_blocks = config.max_blocks;
Self {
config,
entries: Mutex::new(LruMap::new(ByLength::new(max_blocks as u32))),
entries: LruMap::new(ByLength::new(max_blocks as u32)),
block_cache,
}
}
Expand All @@ -73,10 +72,7 @@ impl<C: sov_modules_api::Context> FeeHistoryCache<C> {
}

/// Processing of the arriving blocks
pub fn insert_blocks<I>(&self, entries: &mut LruMap<u64, FeeHistoryEntry, ByLength>, blocks: I)
where
I: Iterator<Item = (Rich<Block>, Vec<AnyTransactionReceipt>)>,
{
pub fn insert_blocks(&mut self, blocks: Vec<(Rich<Block>, Vec<AnyTransactionReceipt>)>) {
let percentiles = self.predefined_percentiles();
// Insert all new blocks and calculate approximated rewards
for (block, receipts) in blocks {
Expand All @@ -94,7 +90,7 @@ impl<C: sov_modules_api::Context> FeeHistoryCache<C> {
)
.unwrap_or_default();
let block_number: u64 = block.header.number.unwrap_or_default();
entries.insert(block_number, fee_history_entry);
self.entries.insert(block_number, fee_history_entry);
}
}

Expand All @@ -105,17 +101,15 @@ impl<C: sov_modules_api::Context> FeeHistoryCache<C> {
/// it returns the corresponding entries.
/// Otherwise it returns None.
pub fn get_history(
&self,
&mut self,
start_block: u64,
end_block: u64,
working_set: &mut WorkingSet<C>,
) -> Vec<FeeHistoryEntry> {
let mut entries = self.entries.lock().unwrap();

let mut result = Vec::new();
let mut empty_blocks = Vec::new();
for block_number in start_block..=end_block {
let entry = entries.get(&block_number);
let entry = self.entries.get(&block_number);

// if entry, push to result
if let Some(entry) = entry {
Expand All @@ -128,18 +122,22 @@ impl<C: sov_modules_api::Context> FeeHistoryCache<C> {
}

// Get blocks from cache (fallback rpc) and receipts from rpc
let blocks_with_receipts = empty_blocks.clone().into_iter().filter_map(|block_number| {
self.block_cache
.get_block_with_receipts(block_number, working_set)
.unwrap_or(None)
});
let blocks_with_receipts = empty_blocks
.clone()
.into_iter()
.filter_map(|block_number| {
self.block_cache
.get_block_with_receipts(block_number, working_set)
.unwrap_or(None)
})
.collect();

// Insert blocks with receipts into cache
self.insert_blocks(&mut entries, blocks_with_receipts);
self.insert_blocks(blocks_with_receipts);

// Get entries from cache for empty blocks
for block_number in empty_blocks {
let entry = entries.get(&block_number);
let entry = self.entries.get(&block_number);
if let Some(entry) = entry {
result[block_number as usize - start_block as usize] = entry.clone();
}
Expand Down
29 changes: 14 additions & 15 deletions crates/ethereum-rpc/src/gas_price/gas_oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
// Adopted from: https://github.com/paradigmxyz/reth/blob/main/crates/rpc/rpc/src/eth/gas_oracle.rs

use std::sync::Arc;

use citrea_evm::{Evm, SYSTEM_SIGNER};
use reth_primitives::basefee::calc_next_block_base_fee;
use reth_primitives::constants::GWEI_TO_WEI;
Expand Down Expand Up @@ -105,8 +103,6 @@ pub struct GasPriceOracle<C: sov_modules_api::Context> {
last_price: Mutex<GasPriceOracleResult>,
/// Fee history cache with lifetime
fee_history_cache: Mutex<FeeHistoryCache<C>>,
/// Block cache
cache: Arc<BlockCache<C>>,
}

impl<C: sov_modules_api::Context> GasPriceOracle<C> {
Expand All @@ -124,18 +120,14 @@ impl<C: sov_modules_api::Context> GasPriceOracle<C> {

let max_header_history = oracle_config.max_header_history as u32;

let cache = BlockCache::new(max_header_history, provider.clone());

let arc_cache = Arc::new(cache);

let fee_history_cache = FeeHistoryCache::new(fee_history_config, arc_cache.clone());
let block_cache = BlockCache::new(max_header_history, provider.clone());
let fee_history_cache = FeeHistoryCache::new(fee_history_config, block_cache);

Self {
provider: provider.clone(),
oracle_config,
last_price: Default::default(),
fee_history_cache: Mutex::new(fee_history_cache),
cache: arc_cache,
}
}

Expand Down Expand Up @@ -200,11 +192,14 @@ impl<C: sov_modules_api::Context> GasPriceOracle<C> {
let mut gas_used_ratio: Vec<f64> = Vec::new();
let mut rewards: Vec<Vec<u128>> = Vec::new();

let fee_history_cache = self.fee_history_cache.lock().await;
let (fee_entries, resolution) = {
let mut fee_history_cache = self.fee_history_cache.lock().await;

// Check if the requested range is within the cache bounds
let fee_entries = fee_history_cache.get_history(start_block, end_block, working_set);
let resolution = fee_history_cache.resolution();
(
fee_history_cache.get_history(start_block, end_block, working_set),
fee_history_cache.resolution(),
)
};

if fee_entries.len() != block_count as usize {
return Err(EthApiError::InvalidBlockRange);
Expand Down Expand Up @@ -333,7 +328,11 @@ impl<C: sov_modules_api::Context> GasPriceOracle<C> {
working_set: &mut WorkingSet<C>,
) -> EthResult<Option<(B256, Vec<u128>)>> {
// check the cache (this will hit the disk if the block is not cached)
let block = match self.cache.get_block(block_hash, working_set)? {
let block_hit = {
let mut cache = self.fee_history_cache.lock().await;
cache.block_cache.get_block(block_hash, working_set)?
};
let block = match block_hit {
Some(block) => block,
None => return Ok(None),
};
Expand Down

0 comments on commit 8f99def

Please sign in to comment.