Skip to content

Commit

Permalink
Transport and hanshake handler connection fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
netsirius authored and iduartgomez committed Sep 20, 2024
1 parent ffe372a commit 71cf889
Show file tree
Hide file tree
Showing 25 changed files with 928 additions and 1,194 deletions.
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ tracing = "0.1"
[features]
default = ["redb", "trace", "websocket"]
local-mode = []
local-simulation = []
network-mode = []
sqlite = ["sqlx"]
trace = ["tracing-subscriber"]
Expand Down
36 changes: 23 additions & 13 deletions crates/core/src/client_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,13 @@ pub(crate) mod test {
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};

use crate::node::{testing_impl::EventId, PeerId};
use crate::{node::testing_impl::EventId, transport::TransportPublicKey};

use super::*;

pub struct MemoryEventsGen<R = rand::rngs::SmallRng> {
id: PeerId,
signal: Receiver<(EventId, PeerId)>,
key: TransportPublicKey,
signal: Receiver<(EventId, TransportPublicKey)>,
events_to_gen: HashMap<EventId, ClientRequest<'static>>,
rng: Option<R>,
internal_state: Option<InternalGeneratorState>,
Expand All @@ -186,10 +186,14 @@ pub(crate) mod test {
where
R: RandomEventGenerator,
{
pub fn new_with_seed(signal: Receiver<(EventId, PeerId)>, id: PeerId, seed: u64) -> Self {
pub fn new_with_seed(
signal: Receiver<(EventId, TransportPublicKey)>,
key: TransportPublicKey,
seed: u64,
) -> Self {
Self {
signal,
id,
key,
events_to_gen: HashMap::new(),
rng: Some(R::seed_from_u64(seed)),
internal_state: None,
Expand Down Expand Up @@ -233,10 +237,13 @@ pub(crate) mod test {

impl MemoryEventsGen {
#[cfg(test)]
pub fn new(signal: Receiver<(EventId, PeerId)>, id: PeerId) -> Self {
pub fn new(
signal: Receiver<(EventId, TransportPublicKey)>,
key: TransportPublicKey,
) -> Self {
Self {
signal,
id,
key,
events_to_gen: HashMap::new(),
rng: None,
internal_state: None,
Expand Down Expand Up @@ -267,7 +274,7 @@ pub(crate) mod test {
loop {
if self.signal.changed().await.is_ok() {
let (ev_id, pk) = self.signal.borrow().clone();
if self.rng.is_some() && pk == self.id {
if self.rng.is_some() && pk == self.key {
let res = OpenRequest {
client_id: ClientId::FIRST,
request: self
Expand All @@ -279,7 +286,7 @@ pub(crate) mod test {
token: None,
};
return Ok(res.into_owned());
} else if pk == self.id {
} else if pk == self.key {
let res = OpenRequest {
client_id: ClientId::FIRST,
request: self
Expand Down Expand Up @@ -321,7 +328,7 @@ pub(crate) mod test {
}

pub struct NetworkEventGenerator<R = rand::rngs::SmallRng> {
id: PeerId,
id: TransportPublicKey,
memory_event_generator: MemoryEventsGen<R>,
ws_client: Arc<Mutex<WebSocketStream<MaybeTlsStream<TcpStream>>>>,
}
Expand All @@ -331,7 +338,7 @@ pub(crate) mod test {
R: RandomEventGenerator,
{
pub fn new(
id: PeerId,
id: TransportPublicKey,
memory_event_generator: MemoryEventsGen<R>,
ws_client: Arc<Mutex<WebSocketStream<MaybeTlsStream<TcpStream>>>>,
) -> Self {
Expand Down Expand Up @@ -359,8 +366,11 @@ pub(crate) mod test {

match message {
Some(Ok(Message::Binary(data))) => {
if let Ok((_, pk)) = bincode::deserialize::<(EventId, PeerId)>(&data) {
if pk == self.id {
if let Ok((id, pub_key)) =
bincode::deserialize::<(EventId, TransportPublicKey)>(&data)
{
tracing::debug!(peer = %self.id, %id, "Received event from the supervisor");
if &pub_key == &self.id {
let res = OpenRequest {
client_id: ClientId::FIRST,
request: self
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub mod dev_tool {
InitPeerNode, NodeConfig, PeerId,
};
pub use ring::Location;
pub use transport::TransportKeypair;
pub use transport::{TransportKeypair, TransportPublicKey};
pub use wasm_runtime::{ContractStore, DelegateStore, Runtime, SecretsStore, StateStore};
}

Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ pub(crate) enum NodeEvent {
ConnectPeer {
peer: PeerId,
tx: Transaction,
callback: tokio::sync::mpsc::Sender<Result<(), ()>>,
callback: tokio::sync::mpsc::Sender<Result<PeerId, ()>>,
is_gw: bool,
},
Disconnect {
Expand Down
20 changes: 16 additions & 4 deletions crates/core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::{
borrow::Cow,
fmt::Display,
fs::File,
hash::Hash,
io::Read,
net::{IpAddr, SocketAddr, ToSocketAddrs},
sync::Arc,
Expand Down Expand Up @@ -704,7 +705,6 @@ async fn process_message_v1<CB>(
let span = tracing::info_span!(
parent: parent_span,
"handle_connect_op_request",
peer = ?op_manager.ring.connection_manager.get_peer_key(),
transaction = %msg.id(),
tx_type = %msg.id().transaction_type()
);
Expand Down Expand Up @@ -919,12 +919,24 @@ where
///
/// A gateway will have its `PeerId` set when it is created since it will know its own address
/// from the start.
#[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Clone)]
#[derive(Serialize, Deserialize, Eq, Clone)]
pub struct PeerId {
pub addr: SocketAddr,
pub pub_key: TransportPublicKey,
}

impl Hash for PeerId {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.addr.hash(state);
}
}

impl PartialEq<PeerId> for PeerId {
fn eq(&self, other: &PeerId) -> bool {
self.addr == other.addr
}
}

impl Ord for PeerId {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.addr.cmp(&other.addr)
Expand Down Expand Up @@ -1010,7 +1022,7 @@ impl std::fmt::Debug for PeerId {

impl Display for PeerId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self.addr)
write!(f, "{:?}", self.pub_key)
}
}

Expand Down Expand Up @@ -1132,7 +1144,7 @@ pub async fn run_network_node(mut node: Node) -> anyhow::Result<()> {
node.0
.peer_id
.clone()
.map(|id| Location::from_address(&id.addr()))
.map(|id| Location::from_address(&id.addr))
})
.flatten();

Expand Down
5 changes: 4 additions & 1 deletion crates/core/src/node/network_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ pub(crate) enum ConnectionError {
Serialization(#[from] Option<Box<bincode::ErrorKind>>),
#[error("{0}")]
TransportError(String),
#[error("unwanted connection")]
#[error("failed connect")]
FailedConnectOp,
#[error("unwanted connection")]
UnwantedConnection,

// errors produced while handling the connection:
#[error("IO error: {0}")]
Expand Down Expand Up @@ -75,6 +77,7 @@ impl Clone for ConnectionError {
Self::UnexpectedReq => Self::UnexpectedReq,
Self::TransportError(err) => Self::TransportError(err.clone()),
Self::FailedConnectOp => Self::FailedConnectOp,
Self::UnwantedConnection => Self::UnwantedConnection,
}
}
}
Expand Down
Loading

0 comments on commit 71cf889

Please sign in to comment.