Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

network/sync: Use multiple supported protocol names #6602

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ where
) -> (Self, Network::RequestResponseProtocolConfig) {
let (request_receiver, config): (_, Network::RequestResponseProtocolConfig) =
on_demand_justifications_protocol_config::<_, _, Network>(genesis_hash, fork_id);
let justif_protocol_name = config.protocol_name().clone();
let justif_protocol_name = config.protocol_names().protocol_name().clone();
let metrics = register_metrics(prometheus_registry);
(
Self { request_receiver, justif_protocol_name, client, metrics, _block: PhantomData },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{
peer_store::PeerStoreProvider,
request_responses::{IncomingRequest, OutgoingResponse},
service::{metrics::Metrics, traits::RequestResponseConfig as RequestResponseConfigT},
types::ProtocolSupportedNames,
IfDisconnected, OutboundFailure, ProtocolName, RequestFailure,
};

Expand Down Expand Up @@ -150,8 +151,8 @@ impl RequestResponseConfig {
}

impl RequestResponseConfigT for RequestResponseConfig {
fn protocol_name(&self) -> &ProtocolName {
&self.protocol_name
fn protocol_names(&self) -> ProtocolSupportedNames {
ProtocolSupportedNames::new(self.protocol_name.clone(), self.fallback_names.clone())
}
}

Expand Down
6 changes: 3 additions & 3 deletions substrate/client/network/src/request_responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
use crate::{
peer_store::{PeerStoreProvider, BANNED_THRESHOLD},
service::traits::RequestResponseConfig as RequestResponseConfigT,
types::ProtocolName,
types::{ProtocolName, ProtocolSupportedNames},
ReputationChange,
};

Expand Down Expand Up @@ -194,8 +194,8 @@ pub struct ProtocolConfig {
}

impl RequestResponseConfigT for ProtocolConfig {
fn protocol_name(&self) -> &ProtocolName {
&self.name
fn protocol_names(&self) -> ProtocolSupportedNames {
ProtocolSupportedNames::new(self.name.clone(), self.fallback_names.clone())
}
}

Expand Down
6 changes: 3 additions & 3 deletions substrate/client/network/src/service/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
network_state::NetworkState,
request_responses::{IfDisconnected, RequestFailure},
service::{metrics::NotificationMetrics, signature::Signature, PeerStoreProvider},
types::ProtocolName,
types::{ProtocolName, ProtocolSupportedNames},
ReputationChange,
};

Expand Down Expand Up @@ -94,8 +94,8 @@ pub trait NotificationConfig: Debug {

/// Trait defining the required functionality from a request-response protocol configuration.
pub trait RequestResponseConfig: Debug {
/// Get protocol name.
fn protocol_name(&self) -> &ProtocolName;
/// Get protocol supported names.
fn protocol_names(&self) -> ProtocolSupportedNames;
}

/// Trait defining required functionality from `PeerStore`.
Expand Down
33 changes: 33 additions & 0 deletions substrate/client/network/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,39 @@ use std::{
sync::Arc,
};

/// The supported names for a request response protocol.
///
/// A protocol may support a main protocol name and a list of fallback protocol names.
#[derive(Debug, Clone)]
pub struct ProtocolSupportedNames {
/// The main protocol name.
protocol_name: ProtocolName,
/// The fallback protocol names.
fallback_protocol_names: Vec<ProtocolName>,
}

impl ProtocolSupportedNames {
/// Create a new instance of [`ProtocolSupportedNames`].
pub fn new(protocol_name: ProtocolName, fallback_protocol_names: Vec<ProtocolName>) -> Self {
Self { protocol_name, fallback_protocol_names }
}

/// Get the protocol name.
pub fn protocol_name(&self) -> &ProtocolName {
&self.protocol_name
}

/// Get the fallback protocol names.
pub fn fallback_protocol_names(&self) -> &Vec<ProtocolName> {
&self.fallback_protocol_names
}

/// Check if the protocol name is supported.
pub fn is_supported(&self, protocol_name: &ProtocolName) -> bool {
&self.protocol_name == protocol_name || self.fallback_protocol_names.contains(protocol_name)
}
}

/// The protocol name transmitted on the wire.
#[derive(Debug, Clone)]
pub enum ProtocolName {
Expand Down
8 changes: 5 additions & 3 deletions substrate/client/network/sync/src/block_relay_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
//! Block relay protocol related definitions.

use futures::channel::oneshot;
use sc_network::{request_responses::RequestFailure, NetworkBackend, ProtocolName};
use sc_network::{
request_responses::RequestFailure, types::ProtocolSupportedNames, NetworkBackend, ProtocolName,
};
use sc_network_common::sync::message::{BlockData, BlockRequest};
use sc_network_types::PeerId;
use sp_runtime::traits::Block as BlockT;
Expand All @@ -35,8 +37,8 @@ pub trait BlockServer<Block: BlockT>: Send {
/// that can be used to initiate concurrent downloads.
#[async_trait::async_trait]
pub trait BlockDownloader<Block: BlockT>: fmt::Debug + Send + Sync {
/// Protocol name used by block downloader.
fn protocol_name(&self) -> &ProtocolName;
/// Protocol names used by block downloader.
fn protocol_names(&self) -> ProtocolSupportedNames;

/// Performs the protocol specific sequence to fetch the blocks from the peer.
/// Output: if the download succeeds, the response is a `Vec<u8>` which is
Expand Down
16 changes: 8 additions & 8 deletions substrate/client/network/sync/src/block_request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use sc_network::{
config::ProtocolId,
request_responses::{IfDisconnected, IncomingRequest, OutgoingResponse, RequestFailure},
service::traits::RequestResponseConfig,
types::ProtocolName,
types::{ProtocolName, ProtocolSupportedNames},
NetworkBackend, MAX_RESPONSE_SIZE,
};
use sc_network_common::sync::message::{BlockAttributes, BlockData, BlockRequest, FromBlock};
Expand Down Expand Up @@ -190,7 +190,7 @@ where
BlockRelayParams {
server: Box::new(Self { client, request_receiver, seen_requests }),
downloader: Arc::new(FullBlockDownloader::new(
protocol_config.protocol_name().clone(),
protocol_config.protocol_names(),
network,
)),
request_response_config: protocol_config,
Expand Down Expand Up @@ -504,13 +504,13 @@ enum HandleRequestError {
/// The full block downloader implementation of [`BlockDownloader].
#[derive(Debug)]
pub struct FullBlockDownloader {
protocol_name: ProtocolName,
protocol_names: ProtocolSupportedNames,
network: NetworkServiceHandle,
}

impl FullBlockDownloader {
fn new(protocol_name: ProtocolName, network: NetworkServiceHandle) -> Self {
Self { protocol_name, network }
fn new(protocol_names: ProtocolSupportedNames, network: NetworkServiceHandle) -> Self {
Self { protocol_names, network }
}

/// Extracts the blocks from the response schema.
Expand Down Expand Up @@ -577,8 +577,8 @@ impl FullBlockDownloader {

#[async_trait::async_trait]
impl<B: BlockT> BlockDownloader<B> for FullBlockDownloader {
fn protocol_name(&self) -> &ProtocolName {
&self.protocol_name
fn protocol_names(&self) -> ProtocolSupportedNames {
self.protocol_names.clone()
}

async fn download_blocks(
Expand All @@ -602,7 +602,7 @@ impl<B: BlockT> BlockDownloader<B> for FullBlockDownloader {
let (tx, rx) = oneshot::channel();
self.network.start_request(
who,
self.protocol_name.clone(),
self.protocol_names.protocol_name().clone(),
bytes,
tx,
IfDisconnected::ImmediateError,
Expand Down
4 changes: 2 additions & 2 deletions substrate/client/network/sync/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
use crate::block_relay_protocol::{BlockDownloader as BlockDownloaderT, BlockResponseError};

use futures::channel::oneshot;
use sc_network::{ProtocolName, RequestFailure};
use sc_network::{types::ProtocolSupportedNames, ProtocolName, RequestFailure};
use sc_network_common::sync::message::{BlockData, BlockRequest};
use sc_network_types::PeerId;
use sp_runtime::traits::Block as BlockT;
Expand All @@ -32,7 +32,7 @@ mockall::mock! {

#[async_trait::async_trait]
impl<Block: BlockT> BlockDownloaderT<Block> for BlockDownloader<Block> {
fn protocol_name(&self) -> &ProtocolName;
fn protocol_names(&self) -> ProtocolSupportedNames;

async fn download_blocks(
&self,
Expand Down
36 changes: 21 additions & 15 deletions substrate/client/network/sync/src/strategy/chain_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use prometheus_endpoint::{register, Gauge, PrometheusError, Registry, U64};
use prost::Message;
use sc_client_api::{blockchain::BlockGap, BlockBackend, ProofProvider};
use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock};
use sc_network::{IfDisconnected, ProtocolName};
use sc_network::{types::ProtocolSupportedNames, IfDisconnected, ProtocolName};
use sc_network_common::sync::message::{
BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, Direction, FromBlock,
};
Expand Down Expand Up @@ -326,8 +326,8 @@ pub struct ChainSync<B: BlockT, Client> {
max_parallel_downloads: u32,
/// Maximum blocks per request.
max_blocks_per_request: u32,
/// Protocol name used to send out state requests
state_request_protocol_name: ProtocolName,
/// Protocol names used to send out state requests
state_request_protocol_names: ProtocolSupportedNames,
/// Total number of downloaded blocks.
downloaded_blocks: usize,
/// State sync in progress, if any.
Expand Down Expand Up @@ -589,7 +589,7 @@ where
return;
}

if protocol_name == self.state_request_protocol_name {
if self.state_request_protocol_names.is_supported(&protocol_name) {
let Ok(response) = response.downcast::<Vec<u8>>() else {
warn!(target: LOG_TARGET, "Failed to downcast state response");
debug_assert!(false);
Expand All @@ -599,7 +599,11 @@ where
if let Err(bad_peer) = self.on_state_data(&peer_id, &response) {
self.actions.push(SyncingAction::DropPeer(bad_peer));
}
} else if &protocol_name == self.block_downloader.protocol_name() {

return;
}

if self.block_downloader.protocol_names().is_supported(&protocol_name) {
let Ok(response) = response
.downcast::<(BlockRequest<B>, Result<Vec<BlockData<B>>, BlockResponseError>)>()
else {
Expand Down Expand Up @@ -636,14 +640,16 @@ where
if let Err(bad_peer) = self.on_block_response(peer_id, key, request, blocks) {
self.actions.push(SyncingAction::DropPeer(bad_peer));
}
} else {
warn!(
target: LOG_TARGET,
"Unexpected generic response protocol {protocol_name}, strategy key \
{key:?}",
);
debug_assert!(false);

return;
}

warn!(
target: LOG_TARGET,
"Unexpected generic response protocol {protocol_name}, strategy key \
{key:?}",
);
debug_assert!(false);
}

fn on_blocks_processed(
Expand Down Expand Up @@ -901,7 +907,7 @@ where

network_service.start_request(
peer_id,
self.state_request_protocol_name.clone(),
self.state_request_protocol_names.protocol_name().clone(),
request.encode_to_vec(),
tx,
IfDisconnected::ImmediateError,
Expand Down Expand Up @@ -945,7 +951,7 @@ where
client: Arc<Client>,
max_parallel_downloads: u32,
max_blocks_per_request: u32,
state_request_protocol_name: ProtocolName,
state_request_protocol_names: ProtocolSupportedNames,
block_downloader: Arc<dyn BlockDownloader<B>>,
metrics_registry: Option<&Registry>,
initial_peers: impl Iterator<Item = (PeerId, B::Hash, NumberFor<B>)>,
Expand All @@ -965,7 +971,7 @@ where
allowed_requests: Default::default(),
max_parallel_downloads,
max_blocks_per_request,
state_request_protocol_name,
state_request_protocol_names,
downloaded_blocks: 0,
state_sync: None,
import_existing: false,
Expand Down
19 changes: 10 additions & 9 deletions substrate/client/network/sync/src/strategy/polkadot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ use log::{debug, error, info, warn};
use prometheus_endpoint::Registry;
use sc_client_api::{BlockBackend, ProofProvider};
use sc_consensus::{BlockImportError, BlockImportStatus};
use sc_network::ProtocolName;
use sc_network::{types::ProtocolSupportedNames, ProtocolName};
use sc_network_common::sync::{message::BlockAnnounce, SyncMode};
use sc_network_types::PeerId;
use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
use sp_runtime::traits::{Block as BlockT, Header, NumberFor};

use std::{any::Any, collections::HashMap, sync::Arc};

/// Corresponding `ChainSync` mode.
Expand All @@ -67,8 +68,8 @@ where
pub max_blocks_per_request: u32,
/// Prometheus metrics registry.
pub metrics_registry: Option<Registry>,
/// Protocol name used to send out state requests
pub state_request_protocol_name: ProtocolName,
/// Protocol names used to send out state requests
pub state_request_protocol_names: ProtocolSupportedNames,
/// Block downloader
pub block_downloader: Arc<dyn BlockDownloader<Block>>,
}
Expand Down Expand Up @@ -342,7 +343,7 @@ where
mut config: PolkadotSyncingStrategyConfig<B>,
client: Arc<Client>,
warp_sync_config: Option<WarpSyncConfig<B>>,
warp_sync_protocol_name: Option<ProtocolName>,
warp_sync_protocol_names: Option<ProtocolSupportedNames>,
) -> Result<Self, ClientError> {
if config.max_blocks_per_request > MAX_BLOCKS_IN_RESPONSE as u32 {
info!(
Expand All @@ -358,7 +359,7 @@ where
let warp_sync = WarpSync::new(
client.clone(),
warp_sync_config,
warp_sync_protocol_name,
warp_sync_protocol_names,
config.block_downloader.clone(),
);
Ok(Self {
Expand All @@ -375,7 +376,7 @@ where
client.clone(),
config.max_parallel_downloads,
config.max_blocks_per_request,
config.state_request_protocol_name.clone(),
config.state_request_protocol_names.clone(),
config.block_downloader.clone(),
config.metrics_registry.as_ref(),
std::iter::empty(),
Expand Down Expand Up @@ -410,7 +411,7 @@ where
self.peer_best_blocks
.iter()
.map(|(peer_id, (_, best_number))| (*peer_id, *best_number)),
self.config.state_request_protocol_name.clone(),
self.config.state_request_protocol_names.protocol_name().clone(),
);

self.warp = None;
Expand All @@ -427,7 +428,7 @@ where
self.client.clone(),
self.config.max_parallel_downloads,
self.config.max_blocks_per_request,
self.config.state_request_protocol_name.clone(),
self.config.state_request_protocol_names.clone(),
self.config.block_downloader.clone(),
self.config.metrics_registry.as_ref(),
self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| {
Expand Down Expand Up @@ -457,7 +458,7 @@ where
self.client.clone(),
self.config.max_parallel_downloads,
self.config.max_blocks_per_request,
self.config.state_request_protocol_name.clone(),
self.config.state_request_protocol_names.clone(),
self.config.block_downloader.clone(),
self.config.metrics_registry.as_ref(),
self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| {
Expand Down
Loading
Loading