Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
rpc: Use the blocks pinning API for chainHead methods (#13233)
Browse files Browse the repository at this point in the history
* rpc/chain_head: Add backend to subscription management

Signed-off-by: Alexandru Vasile <[email protected]>

* rpc/chain_head: Pin blocks internally and adjust testing

Signed-off-by: Alexandru Vasile <[email protected]>

* client/in_mem: Reference for the number of pinned blocks

Signed-off-by: Alexandru Vasile <[email protected]>

* rpc/tests: Check in-memory references to pinned blocks

Signed-off-by: Alexandru Vasile <[email protected]>

* rpc/chain_head: Fix clippy

Signed-off-by: Alexandru Vasile <[email protected]>

* rpc/chain_head: Remove unused comment

Signed-off-by: Alexandru Vasile <[email protected]>

* rpc/chain_head: Place subscription handle under `Arc` and unpin blocks on drop

Signed-off-by: Alexandru Vasile <[email protected]>

* rpc/tests: Check all pinned blocks are unpinned on drop

Signed-off-by: Alexandru Vasile <[email protected]>

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <[email protected]>

* Update client/rpc-spec-v2/src/chain_head/subscription.rs

Co-authored-by: Bastian Köcher <[email protected]>

* rpc/tests: Retry fetching the pinned references for CI correctness

Signed-off-by: Alexandru Vasile <[email protected]>

* client/service: Use 512 as maximum number of pinned blocks

Signed-off-by: Alexandru Vasile <[email protected]>

* chain_head: Fix merging conflicts

Signed-off-by: Alexandru Vasile <[email protected]>

* rpc/chain_head: Adjust subscriptions to use pinning API

Signed-off-by: Alexandru Vasile <[email protected]>

* rpc/chain_head/tests: Test subscription management

Signed-off-by: Alexandru Vasile <[email protected]>

* rpc/chain_head: Adjust chain_head follow to the new API

Signed-off-by: Alexandru Vasile <[email protected]>

* rpc/chain_head: Adjust chain_head.rs to the new API

Signed-off-by: Alexandru Vasile <[email protected]>

* rpc/chain_head/tests: Adjust test.rs to the new API

Signed-off-by: Alexandru Vasile <[email protected]>

* client/builder: Use new chainHead API

Signed-off-by: Alexandru Vasile <[email protected]>

* rpc/chain_head: Fix documentation

Signed-off-by: Alexandru Vasile <[email protected]>

* rpc/chain_head: Fix clippy

Signed-off-by: Alexandru Vasile <[email protected]>

* client/in_mem: ChainHead no longer uses `in_mem::children`

Signed-off-by: Alexandru Vasile <[email protected]>

* Update client/rpc-spec-v2/src/chain_head/subscription.rs

Co-authored-by: Sebastian Kunert <[email protected]>

* Update client/rpc-spec-v2/src/chain_head/subscription.rs

Co-authored-by: Sebastian Kunert <[email protected]>

* Update client/rpc-spec-v2/src/chain_head/subscription.rs

Co-authored-by: Sebastian Kunert <[email protected]>

* Update client/rpc-spec-v2/src/chain_head/subscription.rs

Co-authored-by: Sebastian Kunert <[email protected]>

* chain_head: Add block state machine

Signed-off-by: Alexandru Vasile <[email protected]>

* Address feedback

Signed-off-by: Alexandru Vasile <[email protected]>

* Use new_native_or_wasm_executor

Signed-off-by: Alexandru Vasile <[email protected]>

* chain_head: Remove 'static on Backend

Signed-off-by: Alexandru Vasile <[email protected]>

* chain_head: Add documentation

Signed-off-by: Alexandru Vasile <[email protected]>

* chain_head: Lock blocks before async blocks

Signed-off-by: Alexandru Vasile <[email protected]>

* chain_head_follower: Remove static on backend

Signed-off-by: Alexandru Vasile <[email protected]>

* Update client/service/src/builder.rs

Co-authored-by: Davide Galassi <[email protected]>

* Update client/service/src/builder.rs

Co-authored-by: Davide Galassi <[email protected]>

* chain_head: Add BlockHeaderAbsent to the PartialEq impl

Signed-off-by: Alexandru Vasile <[email protected]>

* client: Add better documentation around pinning constants

Signed-off-by: Alexandru Vasile <[email protected]>

* chain_head: Move subscription to dedicated module

Signed-off-by: Alexandru Vasile <[email protected]>

* subscription: Rename global pin / unpin functions

Signed-off-by: Alexandru Vasile <[email protected]>

---------

Signed-off-by: Alexandru Vasile <[email protected]>
Co-authored-by: Bastian Köcher <[email protected]>
Co-authored-by: parity-processbot <>
Co-authored-by: Sebastian Kunert <[email protected]>
Co-authored-by: Davide Galassi <[email protected]>
  • Loading branch information
4 people authored May 2, 2023
1 parent 8904512 commit cb5be45
Show file tree
Hide file tree
Showing 11 changed files with 1,428 additions and 364 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 23 additions & 2 deletions client/api/src/in_mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -618,20 +618,36 @@ where
states: RwLock<HashMap<Block::Hash, InMemoryBackend<HashFor<Block>>>>,
blockchain: Blockchain<Block>,
import_lock: RwLock<()>,
pinned_blocks: RwLock<HashMap<Block::Hash, i64>>,
}

impl<Block: BlockT> Backend<Block>
where
Block::Hash: Ord,
{
/// Create a new instance of in-mem backend.
///
/// # Warning
///
/// For testing purposes only!
pub fn new() -> Self {
Backend {
states: RwLock::new(HashMap::new()),
blockchain: Blockchain::new(),
import_lock: Default::default(),
pinned_blocks: Default::default(),
}
}

/// Return the number of references active for a pinned block.
///
/// # Warning
///
/// For testing purposes only!
pub fn pin_refs(&self, hash: &<Block as BlockT>::Hash) -> Option<i64> {
let blocks = self.pinned_blocks.read();
blocks.get(hash).map(|value| *value)
}
}

impl<Block: BlockT> backend::AuxStore for Backend<Block>
Expand Down Expand Up @@ -781,11 +797,16 @@ where
false
}

fn pin_block(&self, _: <Block as BlockT>::Hash) -> blockchain::Result<()> {
fn pin_block(&self, hash: <Block as BlockT>::Hash) -> blockchain::Result<()> {
let mut blocks = self.pinned_blocks.write();
*blocks.entry(hash).or_default() += 1;
Ok(())
}

fn unpin_block(&self, _: <Block as BlockT>::Hash) {}
fn unpin_block(&self, hash: <Block as BlockT>::Hash) {
let mut blocks = self.pinned_blocks.write();
blocks.entry(hash).and_modify(|counter| *counter -= 1).or_insert(-1);
}
}

impl<Block: BlockT> backend::LocalBackend<Block> for Backend<Block> where Block::Hash: Ord {}
Expand Down
1 change: 1 addition & 0 deletions client/rpc-spec-v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,6 @@ substrate-test-runtime = { version = "2.0.0", path = "../../test-utils/runtime"
sp-consensus = { version = "0.10.0-dev", path = "../../primitives/consensus/common" }
sp-maybe-compressed-blob = { version = "4.1.0-dev", path = "../../primitives/maybe-compressed-blob" }
sc-block-builder = { version = "0.10.0-dev", path = "../block-builder" }
sc-service = { version = "0.10.0-dev", features = ["test-helpers"], path = "../service" }
sc-utils = { version = "4.0.0-dev", path = "../utils" }
assert_matches = "1.3.0"
152 changes: 92 additions & 60 deletions client/rpc-spec-v2/src/chain_head/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
chain_head_follow::ChainHeadFollower,
error::Error as ChainHeadRpcError,
event::{ChainHeadEvent, ChainHeadResult, ErrorEvent, FollowEvent, NetworkConfig},
subscription::SubscriptionManagement,
subscription::{SubscriptionManagement, SubscriptionManagementError},
},
SubscriptionTaskExecutor,
};
Expand All @@ -44,46 +44,48 @@ use sp_api::CallApiAt;
use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
use sp_core::{hexdisplay::HexDisplay, storage::well_known_keys, traits::CallContext, Bytes};
use sp_runtime::traits::Block as BlockT;
use std::{marker::PhantomData, sync::Arc};
use std::{marker::PhantomData, sync::Arc, time::Duration};

pub(crate) const LOG_TARGET: &str = "rpc-spec-v2";

/// An API for chain head RPC calls.
pub struct ChainHead<BE, Block: BlockT, Client> {
pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
/// Substrate client.
client: Arc<Client>,
/// Backend of the chain.
backend: Arc<BE>,
/// Executor to spawn subscriptions.
executor: SubscriptionTaskExecutor,
/// Keep track of the pinned blocks for each subscription.
subscriptions: Arc<SubscriptionManagement<Block>>,
subscriptions: Arc<SubscriptionManagement<Block, BE>>,
/// The hexadecimal encoded hash of the genesis block.
genesis_hash: String,
/// The maximum number of pinned blocks allowed per connection.
max_pinned_blocks: usize,
/// Phantom member to pin the block type.
_phantom: PhantomData<Block>,
}

impl<BE, Block: BlockT, Client> ChainHead<BE, Block, Client> {
impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
/// Create a new [`ChainHead`].
pub fn new<GenesisHash: AsRef<[u8]>>(
client: Arc<Client>,
backend: Arc<BE>,
executor: SubscriptionTaskExecutor,
genesis_hash: GenesisHash,
max_pinned_blocks: usize,
max_pinned_duration: Duration,
) -> Self {
let genesis_hash = format!("0x{:?}", HexDisplay::from(&genesis_hash.as_ref()));

Self {
client,
backend,
backend: backend.clone(),
executor,
subscriptions: Arc::new(SubscriptionManagement::new()),
subscriptions: Arc::new(SubscriptionManagement::new(
max_pinned_blocks,
max_pinned_duration,
backend,
)),
genesis_hash,
max_pinned_blocks,
_phantom: PhantomData,
}
}
Expand Down Expand Up @@ -159,9 +161,8 @@ where
return Err(err)
},
};

// Keep track of the subscription.
let Some((rx_stop, sub_handle)) = self.subscriptions.insert_subscription(sub_id.clone(), runtime_updates, self.max_pinned_blocks) else {
let Some(rx_stop) = self.subscriptions.insert_subscription(sub_id.clone(), runtime_updates) else {
// Inserting the subscription can only fail if the JsonRPSee
// generated a duplicate subscription ID.
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted", sub_id);
Expand All @@ -177,7 +178,7 @@ where
let mut chain_head_follow = ChainHeadFollower::new(
client,
backend,
sub_handle,
subscriptions.clone(),
runtime_updates,
sub_id.clone(),
);
Expand All @@ -202,19 +203,28 @@ where
let client = self.client.clone();
let subscriptions = self.subscriptions.clone();

let fut = async move {
let Some(handle) = subscriptions.get_subscription(&follow_subscription) else {
let block_guard = match subscriptions.lock_block(&follow_subscription, hash) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
let _ = sink.send(&ChainHeadEvent::<String>::Disjoint);
return
};

// Block is not part of the subscription.
if !handle.contains_block(&hash) {
return Ok(())
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
let _ = sink.reject(ChainHeadRpcError::InvalidBlock);
return
}
return Ok(())
},
Err(error) => {
let _ = sink.send(&ChainHeadEvent::<String>::Error(ErrorEvent {
error: error.to_string(),
}));
return Ok(())
},
};

let fut = async move {
let _block_guard = block_guard;
let event = match client.block(hash) {
Ok(Some(signed_block)) => {
let extrinsics = signed_block.block.extrinsics();
Expand All @@ -226,10 +236,10 @@ where
debug!(
target: LOG_TARGET,
"[body][id={:?}] Stopping subscription because hash={:?} was pruned",
follow_subscription,
&follow_subscription,
hash
);
handle.stop();
subscriptions.remove_subscription(&follow_subscription);
ChainHeadEvent::<String>::Disjoint
},
Err(error) => ChainHeadEvent::Error(ErrorEvent { error: error.to_string() }),
Expand All @@ -246,16 +256,19 @@ where
follow_subscription: String,
hash: Block::Hash,
) -> RpcResult<Option<String>> {
let Some(handle) = self.subscriptions.get_subscription(&follow_subscription) else {
// Invalid invalid subscription ID.
return Ok(None)
let _block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
return Ok(None)
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
return Err(ChainHeadRpcError::InvalidBlock.into())
},
Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
};

// Block is not part of the subscription.
if !handle.contains_block(&hash) {
return Err(ChainHeadRpcError::InvalidBlock.into())
}

self.client
.header(hash)
.map(|opt_header| opt_header.map(|h| format!("0x{:?}", HexDisplay::from(&h.encode()))))
Expand Down Expand Up @@ -286,19 +299,28 @@ where
let client = self.client.clone();
let subscriptions = self.subscriptions.clone();

let fut = async move {
let Some(handle) = subscriptions.get_subscription(&follow_subscription) else {
let block_guard = match subscriptions.lock_block(&follow_subscription, hash) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
let _ = sink.send(&ChainHeadEvent::<String>::Disjoint);
return
};

// Block is not part of the subscription.
if !handle.contains_block(&hash) {
return Ok(())
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
let _ = sink.reject(ChainHeadRpcError::InvalidBlock);
return
}
return Ok(())
},
Err(error) => {
let _ = sink.send(&ChainHeadEvent::<String>::Error(ErrorEvent {
error: error.to_string(),
}));
return Ok(())
},
};

let fut = async move {
let _block_guard = block_guard;
// The child key is provided, use the key to query the child trie.
if let Some(child_key) = child_key {
// The child key must not be prefixed with ":child_storage:" nor
Expand Down Expand Up @@ -367,21 +389,29 @@ where
let client = self.client.clone();
let subscriptions = self.subscriptions.clone();

let fut = async move {
let Some(handle) = subscriptions.get_subscription(&follow_subscription) else {
let block_guard = match subscriptions.lock_block(&follow_subscription, hash) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
let _ = sink.send(&ChainHeadEvent::<String>::Disjoint);
return
};

// Block is not part of the subscription.
if !handle.contains_block(&hash) {
return Ok(())
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
let _ = sink.reject(ChainHeadRpcError::InvalidBlock);
return
}
return Ok(())
},
Err(error) => {
let _ = sink.send(&ChainHeadEvent::<String>::Error(ErrorEvent {
error: error.to_string(),
}));
return Ok(())
},
};

let fut = async move {
// Reject subscription if runtime_updates is false.
if !handle.has_runtime_updates() {
if !block_guard.has_runtime_updates() {
let _ = sink.reject(ChainHeadRpcError::InvalidParam(
"The runtime updates flag must be set".into(),
));
Expand Down Expand Up @@ -417,15 +447,17 @@ where
follow_subscription: String,
hash: Block::Hash,
) -> RpcResult<()> {
let Some(handle) = self.subscriptions.get_subscription(&follow_subscription) else {
// Invalid invalid subscription ID.
return Ok(())
};

if !handle.unpin_block(&hash) {
return Err(ChainHeadRpcError::InvalidBlock.into())
match self.subscriptions.unpin_block(&follow_subscription, hash) {
Ok(()) => Ok(()),
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
Ok(())
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
Err(ChainHeadRpcError::InvalidBlock.into())
},
Err(_) => Err(ChainHeadRpcError::InvalidBlock.into()),
}

Ok(())
}
}
Loading

0 comments on commit cb5be45

Please sign in to comment.