Skip to content

Commit

Permalink
Add drop aware sender new type (fixes #13242) (#13495)
Browse files Browse the repository at this point in the history
Co-authored-by: Matthias Seitz <[email protected]>
  • Loading branch information
ABD-AZE and mattsse authored Dec 24, 2024
1 parent 934fd1f commit f67625f
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 11 deletions.
66 changes: 56 additions & 10 deletions crates/rpc/rpc-eth-types/src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,53 @@ type HeaderLruCache<H, L> = MultiConsumerLruCache<B256, H, L, HeaderResponseSend
pub struct EthStateCache<B: Block, R> {
to_service: UnboundedSender<CacheAction<B, R>>,
}
/// Drop aware sender struct
#[derive(Debug)]
struct ActionSender<B: Block, R: Send + Sync> {
blockhash: B256,
tx: Option<UnboundedSender<CacheAction<B, R>>>,
}

impl<R: Send + Sync, B: Block> ActionSender<B, R> {
const fn new(blockhash: B256, tx: Option<UnboundedSender<CacheAction<B, R>>>) -> Self {
Self { blockhash, tx }
}
fn send_block(
&mut self,
block_sender: Result<Option<Arc<SealedBlockWithSenders<B>>>, ProviderError>,
) {
if let Some(tx) = self.tx.take() {
let _ = tx.send(CacheAction::BlockWithSendersResult {
block_hash: self.blockhash,
res: block_sender,
});
}
}
fn send_receipts(&mut self, receipts: Result<Option<Arc<Vec<R>>>, ProviderError>) {
if let Some(tx) = self.tx.take() {
let _ =
tx.send(CacheAction::ReceiptsResult { block_hash: self.blockhash, res: receipts });
}
}
fn send_header(&mut self, header: Result<<B as Block>::Header, ProviderError>) {
if let Some(tx) = self.tx.take() {
let _ = tx.send(CacheAction::HeaderResult {
block_hash: self.blockhash,
res: Box::new(header),
});
}
}
}
impl<R: Send + Sync, B: Block> Drop for ActionSender<B, R> {
fn drop(&mut self) {
if let Some(tx) = self.tx.take() {
let _ = tx.send(CacheAction::BlockWithSendersResult {
block_hash: self.blockhash,
res: Err(ProviderError::CacheServiceUnavailable),
});
}
}
}

impl<B: Block, R> Clone for EthStateCache<B, R> {
fn clone(&self) -> Self {
Expand Down Expand Up @@ -359,6 +406,8 @@ where
let provider = this.provider.clone();
let action_tx = this.action_tx.clone();
let rate_limiter = this.rate_limiter.clone();
let mut action_sender =
ActionSender::new(block_hash, Some(action_tx));
this.action_task_spawner.spawn_blocking(Box::pin(async move {
// Acquire permit
let _permit = rate_limiter.acquire().await;
Expand All @@ -370,10 +419,7 @@ where
TransactionVariant::WithHash,
)
.map(|maybe_block| maybe_block.map(Arc::new));
let _ = action_tx.send(CacheAction::BlockWithSendersResult {
block_hash,
res: block_sender,
});
action_sender.send_block(block_sender);
}));
}
}
Expand All @@ -389,15 +435,16 @@ where
let provider = this.provider.clone();
let action_tx = this.action_tx.clone();
let rate_limiter = this.rate_limiter.clone();
let mut action_sender =
ActionSender::new(block_hash, Some(action_tx));
this.action_task_spawner.spawn_blocking(Box::pin(async move {
// Acquire permit
let _permit = rate_limiter.acquire().await;
let res = provider
.receipts_by_block(block_hash.into())
.map(|maybe_receipts| maybe_receipts.map(Arc::new));

let _ = action_tx
.send(CacheAction::ReceiptsResult { block_hash, res });
action_sender.send_receipts(res);
}));
}
}
Expand All @@ -414,6 +461,8 @@ where
let provider = this.provider.clone();
let action_tx = this.action_tx.clone();
let rate_limiter = this.rate_limiter.clone();
let mut action_sender =
ActionSender::new(block_hash, Some(action_tx));
this.action_task_spawner.spawn_blocking(Box::pin(async move {
// Acquire permit
let _permit = rate_limiter.acquire().await;
Expand All @@ -422,10 +471,7 @@ where
ProviderError::HeaderNotFound(block_hash.into())
})
});
let _ = action_tx.send(CacheAction::HeaderResult {
block_hash,
res: Box::new(header),
});
action_sender.send_header(header);
}));
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/rpc-eth-types/src/fee_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ pub async fn fee_history_cache_new_blocks_task<St, Provider, N>(
event = events.next() => {
let Some(event) = event else {
// the stream ended, we are done
break;
break
};

let committed = event.committed();
Expand Down

0 comments on commit f67625f

Please sign in to comment.