Skip to content

Commit

Permalink
Introduce supported protocol ids and vers
Browse files Browse the repository at this point in the history
  • Loading branch information
kettlebell committed Nov 25, 2022
1 parent 45e1a15 commit 8c9ee32
Show file tree
Hide file tree
Showing 9 changed files with 407 additions and 235 deletions.
2 changes: 1 addition & 1 deletion spectrum-network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ pub mod network_controller;
pub mod peer_conn_handler;
pub mod peer_manager;
pub mod protocol;
pub mod protocol_api;
pub mod protocol_handler;
pub mod protocol_upgrade;
pub mod types;
pub mod protocol_api;
87 changes: 48 additions & 39 deletions spectrum-network/src/network_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::peer_conn_handler::{ConnHandlerIn, ConnHandlerOut, PartialPeerConnHan
use crate::peer_manager::{PeerEvents, PeerManagerOut, Peers};
use crate::protocol::ProtocolConfig;
use crate::protocol_api::ProtocolEvents;
use crate::protocol_upgrade::supported_protocol_vers::SupportedProtocolIdMap;
use crate::types::{ProtocolId, ProtocolVer};

use libp2p::core::connection::ConnectionId;
Expand Down Expand Up @@ -107,7 +108,7 @@ impl NetworkAPI for NetworkMailbox {
pub struct NetworkController<TPeers, TPeerManager, THandler> {
conn_handler_conf: PeerConnHandlerConf,
/// All supported protocols and their handlers
supported_protocols: HashMap<ProtocolId, (ProtocolConfig, THandler)>,
supported_protocols: SupportedProtocolIdMap<(ProtocolConfig, THandler)>,
/// PeerManager API
peers: TPeers,
/// PeerManager stream itself
Expand All @@ -132,7 +133,7 @@ where
) -> Self {
Self {
conn_handler_conf,
supported_protocols,
supported_protocols: supported_protocols.into(),
peers,
peer_manager,
enabled_peers: HashMap::new(),
Expand All @@ -146,8 +147,7 @@ where
self.conn_handler_conf.clone(),
self.supported_protocols
.iter()
.clone()
.map(|(prot_id, (conf, _))| (*prot_id, conf.clone()))
.map(|(prot_id, (conf, _))| (prot_id, conf.clone()))
.collect::<Vec<_>>(),
)
}
Expand Down Expand Up @@ -247,7 +247,7 @@ where
{
let protocol_id = protocol_tag.protocol_id();
let protocol_ver = protocol_tag.protocol_ver();
match enabled_protocols.entry(protocol_id) {
match enabled_protocols.entry(protocol_id.get_inner()) {
Entry::Occupied(mut entry) => {
trace!(
"Current state of protocol {:?} is {:?}",
Expand All @@ -257,12 +257,12 @@ where
if let (EnabledProtocol::PendingEnable, handler) = entry.get() {
handler.protocol_enabled(
peer_id,
protocol_ver,
protocol_ver.get_inner(),
out_channel.clone(),
handshake,
);
let enabled_protocol = EnabledProtocol::Enabled {
ver: protocol_ver,
ver: protocol_ver.get_inner(),
sink: out_channel,
};
entry.insert((enabled_protocol, handler.clone()));
Expand All @@ -283,42 +283,51 @@ where
}) = self.enabled_peers.get_mut(&peer_id)
{
let protocol_id = protocol_tag.protocol_id();
if let Some((_, prot_handler)) = self.supported_protocols.get(&protocol_id) {
match enabled_protocols.entry(protocol_id) {
Entry::Vacant(entry) => {
entry.insert((EnabledProtocol::PendingApprove, prot_handler.clone()));
prot_handler.protocol_requested(
let (_, prot_handler) = self.supported_protocols.get_supported(protocol_id);
match enabled_protocols.entry(protocol_id.get_inner()) {
Entry::Vacant(entry) => {
entry.insert((EnabledProtocol::PendingApprove, prot_handler.clone()));
prot_handler.protocol_requested(
peer_id,
protocol_tag.protocol_ver().get_inner(),
handshake,
);
}
Entry::Occupied(_) => {
warn!(
"Peer {:?} opened already enabled protocol {:?}",
peer_id, protocol_id
);
self.pending_actions
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id,
protocol_tag.protocol_ver(),
handshake,
);
}
Entry::Occupied(_) => {
warn!(
"Peer {:?} opened already enabled protocol {:?}",
peer_id, protocol_id
);
self.pending_actions
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection),
event: ConnHandlerIn::Close(protocol_id),
})
}
handler: NotifyHandler::One(connection),
event: ConnHandlerIn::Close(protocol_id),
})
}
} else {
self.pending_actions
.push_back(NetworkBehaviourAction::NotifyHandler {
}
}
}
ConnHandlerOut::ClosedByPeer(protocol_id) | ConnHandlerOut::RefusedToOpen(protocol_id) => {
if let Some(ConnectedPeer::Connected {
enabled_protocols, ..
}) = self.enabled_peers.get_mut(&peer_id)
{
match enabled_protocols.entry(protocol_id.get_inner()) {
Entry::Occupied(entry) => {
trace!(
"Peer {:?} closed the substream for protocol {:?}",
peer_id,
handler: NotifyHandler::One(connection),
event: ConnHandlerIn::Close(protocol_id),
})
protocol_id
);
entry.remove();
}
Entry::Vacant(_) => {}
}
}
}
ConnHandlerOut::ClosedByPeer(protocol_id)
| ConnHandlerOut::RefusedToOpen(protocol_id)
| ConnHandlerOut::Closed(protocol_id) => {

ConnHandlerOut::Closed(protocol_id) => {
if let Some(ConnectedPeer::Connected {
enabled_protocols, ..
}) = self.enabled_peers.get_mut(&peer_id)
Expand Down Expand Up @@ -445,7 +454,7 @@ where
ConnectedPeer::Connected {
enabled_protocols, ..
} => {
if let Some((_, prot_handler)) = self.supported_protocols.get(&protocol) {
if let Some((_, prot_handler)) = self.supported_protocols.get(protocol) {
match enabled_protocols.entry(protocol) {
Entry::Occupied(_) => warn!(
"PM requested already enabled protocol {:?} with peer {:?}",
Expand Down Expand Up @@ -494,7 +503,7 @@ where
enabled_protocols,
}) = self.enabled_peers.get_mut(&peer_id)
{
let (_, prot_handler) = self.supported_protocols.get(&protocol_id).unwrap();
let (_, prot_handler) = self.supported_protocols.get(protocol_id).unwrap();
match enabled_protocols.entry(protocol_id) {
Entry::Occupied(protocol_entry) => match protocol_entry.remove_entry().1 {
// Protocol handler approves either outbound or inbound protocol request.
Expand Down
Loading

0 comments on commit 8c9ee32

Please sign in to comment.