diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 0780ca657db..24a32de4cc7 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -61,7 +61,7 @@ use crate::types::{ ControlAction, Message, MessageAcceptance, MessageId, PeerInfo, RawMessage, Subscription, SubscriptionAction, }; -use crate::types::{PeerConnections, PeerKind, Rpc}; +use crate::types::{PeerConnections, PeerKind, RpcOut}; use crate::{rpc_proto::proto, TopicScoreParams}; use crate::{PublishError, SubscriptionError, ValidationError}; use instant::SystemTime; @@ -534,23 +534,10 @@ where } // send subscription request to all peers - let peer_list = self.peer_topics.keys().cloned().collect::>(); - if !peer_list.is_empty() { - let event = Rpc { - messages: Vec::new(), - subscriptions: vec![Subscription { - topic_hash: topic_hash.clone(), - action: SubscriptionAction::Subscribe, - }], - control_msgs: Vec::new(), - } - .into_protobuf(); - - for peer in peer_list { - tracing::debug!(%peer, "Sending SUBSCRIBE to peer"); - self.send_message(peer, event.clone()) - .map_err(SubscriptionError::PublishError)?; - } + for peer in self.peer_topics.keys().copied().collect::>() { + tracing::debug!(%peer, "Sending SUBSCRIBE to peer"); + let event = RpcOut::Subscribe(topic_hash.clone()); + self.send_message(peer, event); } // call JOIN(topic) @@ -574,22 +561,10 @@ where } // announce to all peers - let peer_list = self.peer_topics.keys().cloned().collect::>(); - if !peer_list.is_empty() { - let event = Rpc { - messages: Vec::new(), - subscriptions: vec![Subscription { - topic_hash: topic_hash.clone(), - action: SubscriptionAction::Unsubscribe, - }], - control_msgs: Vec::new(), - } - .into_protobuf(); - - for peer in peer_list { - tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer"); - self.send_message(peer, event.clone())?; - } + for peer in self.peer_topics.keys().copied().collect::>() { + tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer"); + let event = RpcOut::Unsubscribe(topic_hash.clone()); + self.send_message(peer, event); } // call LEAVE(topic) @@ -624,15 +599,8 @@ where topic: raw_message.topic.clone(), }); - let event = Rpc { - subscriptions: Vec::new(), - messages: vec![raw_message.clone()], - control_msgs: Vec::new(), - } - .into_protobuf(); - // check that the size doesn't exceed the max transmission size - if event.get_size() > self.config.max_transmit_size() { + if raw_message.raw_protobuf_len() > self.config.max_transmit_size() { return Err(PublishError::MessageTooLarge); } @@ -651,23 +619,60 @@ where let topic_hash = raw_message.topic.clone(); - // If we are not flood publishing forward the message to mesh peers. 
- let mesh_peers_sent = !self.config.flood_publish() - && self.forward_msg(&msg_id, raw_message.clone(), None, HashSet::new())?; - let mut recipient_peers = HashSet::new(); if let Some(set) = self.topic_peers.get(&topic_hash) { if self.config.flood_publish() { // Forward to all peers above score and all explicit peers - recipient_peers.extend( - set.iter() - .filter(|p| { - self.explicit_peers.contains(*p) - || !self.score_below_threshold(p, |ts| ts.publish_threshold).0 - }) - .cloned(), - ); + recipient_peers.extend(set.iter().filter(|p| { + self.explicit_peers.contains(*p) + || !self.score_below_threshold(p, |ts| ts.publish_threshold).0 + })); } else { + match self.mesh.get(&raw_message.topic) { + // Mesh peers + Some(mesh_peers) => { + recipient_peers.extend(mesh_peers); + } + // Gossipsub peers + None => { + tracing::debug!(topic=%topic_hash, "Topic not in the mesh"); + // If we have fanout peers add them to the map. + if self.fanout.contains_key(&topic_hash) { + for peer in self.fanout.get(&topic_hash).expect("Topic must exist") { + recipient_peers.insert(*peer); + } + } else { + // We have no fanout peers, select mesh_n of them and add them to the fanout + let mesh_n = self.config.mesh_n(); + let new_peers = get_random_peers( + &self.topic_peers, + &self.connected_peers, + &topic_hash, + mesh_n, + { + |p| { + !self.explicit_peers.contains(p) + && !self + .score_below_threshold(p, |pst| { + pst.publish_threshold + }) + .0 + } + }, + ); + // Add the new peers to the fanout and recipient peers + self.fanout.insert(topic_hash.clone(), new_peers.clone()); + for peer in new_peers { + tracing::debug!(%peer, "Peer added to fanout"); + recipient_peers.insert(peer); + } + } + // We are publishing to fanout peers - update the time we published + self.fanout_last_pub + .insert(topic_hash.clone(), Instant::now()); + } + } + // Explicit peers for peer in &self.explicit_peers { if set.contains(peer) { @@ -685,54 +690,17 @@ where recipient_peers.insert(*peer); } } - - // Gossipsub peers - if self.mesh.get(&topic_hash).is_none() { - tracing::debug!(topic=%topic_hash, "Topic not in the mesh"); - // If we have fanout peers add them to the map. - if self.fanout.contains_key(&topic_hash) { - for peer in self.fanout.get(&topic_hash).expect("Topic must exist") { - recipient_peers.insert(*peer); - } - } else { - // We have no fanout peers, select mesh_n of them and add them to the fanout - let mesh_n = self.config.mesh_n(); - let new_peers = get_random_peers( - &self.topic_peers, - &self.connected_peers, - &topic_hash, - mesh_n, - { - |p| { - !self.explicit_peers.contains(p) - && !self - .score_below_threshold(p, |pst| pst.publish_threshold) - .0 - } - }, - ); - // Add the new peers to the fanout and recipient peers - self.fanout.insert(topic_hash.clone(), new_peers.clone()); - for peer in new_peers { - tracing::debug!(%peer, "Peer added to fanout"); - recipient_peers.insert(peer); - } - } - // We are publishing to fanout peers - update the time we published - self.fanout_last_pub - .insert(topic_hash.clone(), Instant::now()); - } } } - if recipient_peers.is_empty() && !mesh_peers_sent { + if recipient_peers.is_empty() { return Err(PublishError::InsufficientPeers); } // If the message isn't a duplicate and we have sent it to some peers add it to the // duplicate cache and memcache. 
self.duplicate_cache.insert(msg_id.clone()); - self.mcache.put(&msg_id, raw_message); + self.mcache.put(&msg_id, raw_message.clone()); // If the message is anonymous or has a random author add it to the published message ids // cache. @@ -743,14 +711,9 @@ where } // Send to peers we know are subscribed to the topic. - let msg_bytes = event.get_size(); for peer_id in recipient_peers.iter() { tracing::trace!(peer=%peer_id, "Sending message to peer"); - self.send_message(*peer_id, event.clone())?; - - if let Some(m) = self.metrics.as_mut() { - m.msg_sent(&topic_hash, msg_bytes); - } + self.send_message(*peer_id, RpcOut::Publish(raw_message.clone())); } tracing::debug!(message=%msg_id, "Published message"); @@ -979,7 +942,7 @@ where "JOIN: Adding {:?} peers from the fanout for topic", add_peers ); - added_peers.extend(peers.iter().cloned().take(add_peers)); + added_peers.extend(peers.iter().take(add_peers)); self.mesh.insert( topic_hash.clone(), @@ -1331,13 +1294,15 @@ where } tracing::debug!(peer=%peer_id, "Handling IWANT for peer"); - // build a hashmap of available messages - let mut cached_messages = HashMap::new(); for id in iwant_msgs { - // If we have it and the IHAVE count is not above the threshold, add it do the - // cached_messages mapping - if let Some((msg, count)) = self.mcache.get_with_iwant_counts(&id, peer_id) { + // If we have it and the IHAVE count is not above the threshold, + // foward the message. + if let Some((msg, count)) = self + .mcache + .get_with_iwant_counts(&id, peer_id) + .map(|(msg, count)| (msg.clone(), count)) + { if count > self.config.gossip_retransimission() { tracing::debug!( peer=%peer_id, @@ -1345,36 +1310,8 @@ where "IWANT: Peer has asked for message too many times; ignoring request" ); } else { - cached_messages.insert(id.clone(), msg.clone()); - } - } - } - - if !cached_messages.is_empty() { - tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer"); - // Send the messages to the peer - let message_list: Vec<_> = cached_messages.into_iter().map(|entry| entry.1).collect(); - - let topics = message_list - .iter() - .map(|message| message.topic.clone()) - .collect::>(); - - let message = Rpc { - subscriptions: Vec::new(), - messages: message_list, - control_msgs: Vec::new(), - } - .into_protobuf(); - - let msg_bytes = message.get_size(); - - if self.send_message(*peer_id, message).is_err() { - tracing::error!("Failed to send cached messages. Messages too large"); - } else if let Some(m) = self.metrics.as_mut() { - // Sending of messages succeeded, register them on the internal metrics. 
- for topic in topics.iter() { - m.msg_sent(topic, msg_bytes); + tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer"); + self.send_message(*peer_id, RpcOut::Forward(msg)); } } } @@ -1527,27 +1464,18 @@ where if !to_prune_topics.is_empty() { // build the prune messages to send let on_unsubscribe = false; - let prune_messages = to_prune_topics + for action in to_prune_topics .iter() .map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe)) - .collect(); + .collect::>() + { + self.send_message(*peer_id, RpcOut::Control(action)); + } // Send the prune messages to the peer tracing::debug!( peer=%peer_id, "GRAFT: Not subscribed to topics - Sending PRUNE to peer" ); - - if let Err(e) = self.send_message( - *peer_id, - Rpc { - subscriptions: Vec::new(), - messages: Vec::new(), - control_msgs: prune_messages, - } - .into_protobuf(), - ) { - tracing::error!("Failed to send PRUNE: {:?}", e); - } } tracing::debug!(peer=%peer_id, "Completed GRAFT handling for peer"); } @@ -2036,23 +1964,12 @@ where // If we need to send grafts to peer, do so immediately, rather than waiting for the // heartbeat. - if !topics_to_graft.is_empty() - && self - .send_message( - *propagation_source, - Rpc { - subscriptions: Vec::new(), - messages: Vec::new(), - control_msgs: topics_to_graft - .into_iter() - .map(|topic_hash| ControlAction::Graft { topic_hash }) - .collect(), - } - .into_protobuf(), - ) - .is_err() + for action in topics_to_graft + .into_iter() + .map(|topic_hash| ControlAction::Graft { topic_hash }) + .collect::>() { - tracing::error!("Failed sending grafts. Message too large"); + self.send_message(*propagation_source, RpcOut::Control(action)) } // Notify the application of the subscriptions @@ -2204,7 +2121,7 @@ where // shuffle the peers and then sort by score ascending beginning with the worst let mut rng = thread_rng(); - let mut shuffled = peers.iter().cloned().collect::>(); + let mut shuffled = peers.iter().copied().collect::>(); shuffled.shuffle(&mut rng); shuffled.sort_by(|p1, p2| { let score_p1 = *scores.get(p1).unwrap_or(&0.0); @@ -2578,12 +2495,9 @@ where &self.connected_peers, ); } - let mut control_msgs: Vec = topics - .iter() - .map(|topic_hash| ControlAction::Graft { - topic_hash: topic_hash.clone(), - }) - .collect(); + let control_msgs = topics.iter().map(|topic_hash| ControlAction::Graft { + topic_hash: topic_hash.clone(), + }); // If there are prunes associated with the same peer add them. // NOTE: In this case a peer has been added to a topic mesh, and removed from another. @@ -2591,52 +2505,37 @@ where // of its removal from another. // The following prunes are not due to unsubscribing. - let on_unsubscribe = false; - if let Some(topics) = to_prune.remove(&peer) { - let mut prunes = topics - .iter() - .map(|topic_hash| { - self.make_prune( - topic_hash, - &peer, - self.config.do_px() && !no_px.contains(&peer), - on_unsubscribe, - ) - }) - .collect::>(); - control_msgs.append(&mut prunes); - } + let prunes = to_prune + .remove(&peer) + .into_iter() + .flatten() + .map(|topic_hash| { + self.make_prune( + &topic_hash, + &peer, + self.config.do_px() && !no_px.contains(&peer), + false, + ) + }); // send the control messages - if self - .send_message( - peer, - Rpc { - subscriptions: Vec::new(), - messages: Vec::new(), - control_msgs, - } - .into_protobuf(), - ) - .is_err() - { - tracing::error!("Failed to send control messages. 
Message too large"); + for msg in control_msgs.chain(prunes).collect::>() { + self.send_message(peer, RpcOut::Control(msg)); } } // handle the remaining prunes // The following prunes are not due to unsubscribing. - let on_unsubscribe = false; for (peer, topics) in to_prune.iter() { - let mut remaining_prunes = Vec::new(); for topic_hash in topics { let prune = self.make_prune( topic_hash, peer, self.config.do_px() && !no_px.contains(peer), - on_unsubscribe, + false, ); - remaining_prunes.push(prune); + self.send_message(*peer, RpcOut::Control(prune)); + // inform the handler peer_removed_from_mesh( *peer, @@ -2647,21 +2546,6 @@ where &self.connected_peers, ); } - - if self - .send_message( - *peer, - Rpc { - subscriptions: Vec::new(), - messages: Vec::new(), - control_msgs: remaining_prunes, - } - .into_protobuf(), - ) - .is_err() - { - tracing::error!("Failed to send prune messages. Message too large"); - } } } @@ -2718,20 +2602,11 @@ where // forward the message to peers if !recipient_peers.is_empty() { - let event = Rpc { - subscriptions: Vec::new(), - messages: vec![message.clone()], - control_msgs: Vec::new(), - } - .into_protobuf(); + let event = RpcOut::Forward(message.clone()); - let msg_bytes = event.get_size(); for peer in recipient_peers.iter() { tracing::debug!(%peer, message=%msg_id, "Sending message to peer"); - self.send_message(*peer, event.clone())?; - if let Some(m) = self.metrics.as_mut() { - m.msg_sent(&message.topic, msg_bytes); - } + self.send_message(*peer, event.clone()); } tracing::debug!("Completed forwarding message"); Ok(true) @@ -2844,19 +2719,8 @@ where /// Takes each control action mapping and turns it into a message fn flush_control_pool(&mut self) { for (peer, controls) in self.control_pool.drain().collect::>() { - if self - .send_message( - peer, - Rpc { - subscriptions: Vec::new(), - messages: Vec::new(), - control_msgs: controls, - } - .into_protobuf(), - ) - .is_err() - { - tracing::error!("Failed to flush control pool. Message too large"); + for msg in controls { + self.send_message(peer, RpcOut::Control(msg)); } } @@ -2864,144 +2728,21 @@ where self.pending_iwant_msgs.clear(); } - /// Send a [`Rpc`] message to a peer. This will wrap the message in an arc if it + /// Send a [`RpcOut`] message to a peer. This will wrap the message in an arc if it /// is not already an arc. - fn send_message(&mut self, peer_id: PeerId, message: proto::RPC) -> Result<(), PublishError> { - // If the message is oversized, try and fragment it. If it cannot be fragmented, log an - // error and drop the message (all individual messages should be small enough to fit in the - // max_transmit_size) - - let messages = self.fragment_message(message)?; - - for message in messages { - self.events.push_back(ToSwarm::NotifyHandler { - peer_id, - event: HandlerIn::Message(message), - handler: NotifyHandler::Any, - }) - } - Ok(()) - } - - // If a message is too large to be sent as-is, this attempts to fragment it into smaller RPC - // messages to be sent. - fn fragment_message(&self, rpc: proto::RPC) -> Result, PublishError> { - if rpc.get_size() < self.config.max_transmit_size() { - return Ok(vec![rpc]); - } - - let new_rpc = proto::RPC { - subscriptions: Vec::new(), - publish: Vec::new(), - control: None, - }; - - let mut rpc_list = vec![new_rpc.clone()]; - - // Gets an RPC if the object size will fit, otherwise create a new RPC. The last element - // will be the RPC to add an object. - macro_rules! 
create_or_add_rpc { - ($object_size: ident ) => { - let list_index = rpc_list.len() - 1; // the list is never empty - - // create a new RPC if the new object plus 5% of its size (for length prefix - // buffers) exceeds the max transmit size. - if rpc_list[list_index].get_size() + (($object_size as f64) * 1.05) as usize - > self.config.max_transmit_size() - && rpc_list[list_index] != new_rpc - { - // create a new rpc and use this as the current - rpc_list.push(new_rpc.clone()); - } - }; - } - - macro_rules! add_item { - ($object: ident, $type: ident ) => { - let object_size = $object.get_size(); - - if object_size + 2 > self.config.max_transmit_size() { - // This should not be possible. All received and published messages have already - // been vetted to fit within the size. - tracing::error!("Individual message too large to fragment"); - return Err(PublishError::MessageTooLarge); - } - - create_or_add_rpc!(object_size); - rpc_list - .last_mut() - .expect("Must have at least one element") - .$type - .push($object.clone()); - }; - } - - // Add messages until the limit - for message in &rpc.publish { - add_item!(message, publish); - } - for subscription in &rpc.subscriptions { - add_item!(subscription, subscriptions); - } - - // handle the control messages. If all are within the max_transmit_size, send them without - // fragmenting, otherwise, fragment the control messages - let empty_control = proto::ControlMessage::default(); - if let Some(control) = rpc.control.as_ref() { - if control.get_size() + 2 > self.config.max_transmit_size() { - // fragment the RPC - for ihave in &control.ihave { - let len = ihave.get_size(); - create_or_add_rpc!(len); - rpc_list - .last_mut() - .expect("Always an element") - .control - .get_or_insert_with(|| empty_control.clone()) - .ihave - .push(ihave.clone()); - } - for iwant in &control.iwant { - let len = iwant.get_size(); - create_or_add_rpc!(len); - rpc_list - .last_mut() - .expect("Always an element") - .control - .get_or_insert_with(|| empty_control.clone()) - .iwant - .push(iwant.clone()); - } - for graft in &control.graft { - let len = graft.get_size(); - create_or_add_rpc!(len); - rpc_list - .last_mut() - .expect("Always an element") - .control - .get_or_insert_with(|| empty_control.clone()) - .graft - .push(graft.clone()); - } - for prune in &control.prune { - let len = prune.get_size(); - create_or_add_rpc!(len); - rpc_list - .last_mut() - .expect("Always an element") - .control - .get_or_insert_with(|| empty_control.clone()) - .prune - .push(prune.clone()); - } - } else { - let len = control.get_size(); - create_or_add_rpc!(len); - rpc_list.last_mut().expect("Always an element").control = Some(control.clone()); + fn send_message(&mut self, peer_id: PeerId, rpc: RpcOut) { + if let Some(m) = self.metrics.as_mut() { + if let RpcOut::Publish(ref message) | RpcOut::Forward(ref message) = rpc { + // register bytes sent on the internal metrics. + m.msg_sent(&message.topic, message.raw_protobuf_len()); } } - Ok(rpc_list) + self.events.push_back(ToSwarm::NotifyHandler { + peer_id, + event: HandlerIn::Message(rpc), + handler: NotifyHandler::Any, + }); } fn on_connection_established( @@ -3050,46 +2791,27 @@ where .connections .push(connection_id); - if other_established == 0 { - // Ignore connections from blacklisted peers. 
- if self.blacklisted_peers.contains(&peer_id) { - tracing::debug!(peer=%peer_id, "Ignoring connection from blacklisted peer"); - } else { - tracing::debug!(peer=%peer_id, "New peer connected"); - // We need to send our subscriptions to the newly-connected node. - let mut subscriptions = vec![]; - for topic_hash in self.mesh.keys() { - subscriptions.push(Subscription { - topic_hash: topic_hash.clone(), - action: SubscriptionAction::Subscribe, - }); - } + if other_established > 0 { + return; // Not our first connection to this peer, hence nothing to do. + } - if !subscriptions.is_empty() { - // send our subscriptions to the peer - if self - .send_message( - peer_id, - Rpc { - messages: Vec::new(), - subscriptions, - control_msgs: Vec::new(), - } - .into_protobuf(), - ) - .is_err() - { - tracing::error!("Failed to send subscriptions, message too large"); - } - } - } + // Insert an empty set of the topics of this peer until known. + self.peer_topics.insert(peer_id, Default::default()); - // Insert an empty set of the topics of this peer until known. - self.peer_topics.insert(peer_id, Default::default()); + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.add_peer(peer_id); + } - if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.add_peer(peer_id); - } + // Ignore connections from blacklisted peers. + if self.blacklisted_peers.contains(&peer_id) { + tracing::debug!(peer=%peer_id, "Ignoring connection from blacklisted peer"); + return; + } + + tracing::debug!(peer=%peer_id, "New peer connected"); + // We need to send our subscriptions to the newly-connected node. + for topic_hash in self.mesh.clone().into_keys() { + self.send_message(peer_id, RpcOut::Subscribe(topic_hash)); } } @@ -3552,7 +3274,7 @@ fn get_random_peers_dynamic( // if they exist, filter the peers by `f` Some(peer_list) => peer_list .iter() - .cloned() + .copied() .filter(|p| { f(p) && match connected_peers.get(p) { Some(connections) if connections.kind == PeerKind::Gossipsub => true, @@ -3659,17 +3381,8 @@ impl fmt::Debug for PublishConfig { mod local_test { use super::*; use crate::IdentTopic; - use asynchronous_codec::Encoder; use quickcheck::*; - fn empty_rpc() -> Rpc { - Rpc { - subscriptions: Vec::new(), - messages: Vec::new(), - control_msgs: Vec::new(), - } - } - fn test_message() -> RawMessage { RawMessage { source: Some(PeerId::random()), @@ -3682,13 +3395,6 @@ mod local_test { } } - fn test_subscription() -> Subscription { - Subscription { - action: SubscriptionAction::Subscribe, - topic_hash: IdentTopic::new("TestTopic").hash(), - } - } - fn test_control() -> ControlAction { ControlAction::IHave { topic_hash: IdentTopic::new("TestTopic").hash(), @@ -3696,117 +3402,16 @@ mod local_test { } } - impl Arbitrary for Rpc { + impl Arbitrary for RpcOut { fn arbitrary(g: &mut Gen) -> Self { - let mut rpc = empty_rpc(); - - for _ in 0..g.gen_range(0..10u8) { - rpc.subscriptions.push(test_subscription()); - } - for _ in 0..g.gen_range(0..10u8) { - rpc.messages.push(test_message()); - } - for _ in 0..g.gen_range(0..10u8) { - rpc.control_msgs.push(test_control()); - } - rpc - } - } - - #[test] - /// Tests RPC message fragmentation - fn test_message_fragmentation_deterministic() { - let max_transmit_size = 500; - let config = crate::config::ConfigBuilder::default() - .max_transmit_size(max_transmit_size) - .validation_mode(ValidationMode::Permissive) - .build() - .unwrap(); - let gs: Behaviour = Behaviour::new(MessageAuthenticity::RandomAuthor, config).unwrap(); - - // Message under the limit 
should be fine. - let mut rpc = empty_rpc(); - rpc.messages.push(test_message()); - - let mut rpc_proto = rpc.clone().into_protobuf(); - let fragmented_messages = gs.fragment_message(rpc_proto.clone()).unwrap(); - assert_eq!( - fragmented_messages, - vec![rpc_proto.clone()], - "Messages under the limit shouldn't be fragmented" - ); - - // Messages over the limit should be split - - while rpc_proto.get_size() < max_transmit_size { - rpc.messages.push(test_message()); - rpc_proto = rpc.clone().into_protobuf(); - } - - let fragmented_messages = gs - .fragment_message(rpc_proto) - .expect("Should be able to fragment the messages"); - - assert!( - fragmented_messages.len() > 1, - "the message should be fragmented" - ); - - // all fragmented messages should be under the limit - for message in fragmented_messages { - assert!( - message.get_size() < max_transmit_size, - "all messages should be less than the transmission size" - ); - } - } - - #[test] - fn test_message_fragmentation() { - fn prop(rpc: Rpc) { - let max_transmit_size = 500; - let config = crate::config::ConfigBuilder::default() - .max_transmit_size(max_transmit_size) - .validation_mode(ValidationMode::Permissive) - .build() - .unwrap(); - let gs: Behaviour = Behaviour::new(MessageAuthenticity::RandomAuthor, config).unwrap(); - - let mut codec = - crate::protocol::GossipsubCodec::new(max_transmit_size, ValidationMode::Permissive); - - let rpc_proto = rpc.into_protobuf(); - let fragmented_messages = gs - .fragment_message(rpc_proto.clone()) - .expect("Messages must be valid"); - - if rpc_proto.get_size() < max_transmit_size { - assert_eq!( - fragmented_messages.len(), - 1, - "the message should not be fragmented" - ); - } else { - assert!( - fragmented_messages.len() > 1, - "the message should be fragmented" - ); - } - - // all fragmented messages should be under the limit - for message in fragmented_messages { - assert!( - message.get_size() < max_transmit_size, - "all messages should be less than the transmission size: list size {} max size{}", message.get_size(), max_transmit_size - ); - - // ensure they can all be encoded - let mut buf = bytes::BytesMut::with_capacity(message.get_size()); - codec.encode(message, &mut buf).unwrap() + match u8::arbitrary(g) % 5 { + 0 => RpcOut::Subscribe(IdentTopic::new("TestTopic").hash()), + 1 => RpcOut::Unsubscribe(IdentTopic::new("TestTopic").hash()), + 2 => RpcOut::Publish(test_message()), + 3 => RpcOut::Forward(test_message()), + 4 => RpcOut::Control(test_control()), + _ => panic!("outside range"), } } - QuickCheck::new() - .max_tests(100) - .quickcheck(prop as fn(_) -> _) } } diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index cf24ed8d8dc..570cdf43f90 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -24,7 +24,9 @@ use super::*; use crate::subscription_filter::WhitelistSubscriptionFilter; use crate::transform::{DataTransform, IdentityTransform}; use crate::ValidationError; -use crate::{config::Config, config::ConfigBuilder, IdentTopic as Topic, TopicScoreParams}; +use crate::{ + config::Config, config::ConfigBuilder, types::Rpc, IdentTopic as Topic, TopicScoreParams, +}; use async_std::net::Ipv4Addr; use byteorder::{BigEndian, ByteOrder}; use libp2p_core::{ConnectedPoint, Endpoint}; @@ -402,26 +404,19 @@ fn test_subscribe() { let subscriptions = gs .events .iter() - .fold(vec![], |mut collected_subscriptions, e| match e { - ToSwarm::NotifyHandler { - event: HandlerIn::Message(ref 
message), - .. - } => { - for s in &message.subscriptions { - if let Some(true) = s.subscribe { - collected_subscriptions.push(s.clone()) - }; + .filter(|e| { + matches!( + e, + ToSwarm::NotifyHandler { + event: HandlerIn::Message(RpcOut::Subscribe(_)), + .. } - collected_subscriptions - } - _ => collected_subscriptions, - }); + ) + }) + .count(); // we sent a subscribe to all known peers - assert!( - subscriptions.len() == 20, - "Should send a subscription to all known peers" - ); + assert_eq!(subscriptions, 20); } #[test] @@ -470,26 +465,16 @@ fn test_unsubscribe() { let subscriptions = gs .events .iter() - .fold(vec![], |mut collected_subscriptions, e| match e { + .fold(0, |collected_subscriptions, e| match e { ToSwarm::NotifyHandler { - event: HandlerIn::Message(ref message), + event: HandlerIn::Message(RpcOut::Subscribe(_)), .. - } => { - for s in &message.subscriptions { - if let Some(true) = s.subscribe { - collected_subscriptions.push(s.clone()) - }; - } - collected_subscriptions - } + } => collected_subscriptions + 1, _ => collected_subscriptions, }); // we sent a unsubscribe to all known peers, for two topics - assert!( - subscriptions.len() == 40, - "Should send an unsubscribe event to all known peers" - ); + assert_eq!(subscriptions, 40); // check we clean up internal structures for topic_hash in &topic_hashes { @@ -657,16 +642,13 @@ fn test_publish_without_flood_publishing() { // Collect all publish messages let publishes = gs .events - .iter() + .into_iter() .fold(vec![], |mut collected_publish, e| match e { ToSwarm::NotifyHandler { - event: HandlerIn::Message(ref message), + event: HandlerIn::Message(RpcOut::Publish(message)), .. } => { - let event = proto_to_message(message); - for s in &event.messages { - collected_publish.push(s.clone()); - } + collected_publish.push(message); collected_publish } _ => collected_publish, @@ -747,16 +729,13 @@ fn test_fanout() { // Collect all publish messages let publishes = gs .events - .iter() + .into_iter() .fold(vec![], |mut collected_publish, e| match e { ToSwarm::NotifyHandler { - event: HandlerIn::Message(ref message), + event: HandlerIn::Message(RpcOut::Publish(message)), .. } => { - let event = proto_to_message(message); - for s in &event.messages { - collected_publish.push(s.clone()); - } + collected_publish.push(message); collected_publish } _ => collected_publish, @@ -798,37 +777,36 @@ fn test_inject_connected() { // check that our subscriptions are sent to each of the peers // collect all the SendEvents - let send_events: Vec<_> = gs + let subscriptions = gs .events - .iter() - .filter(|e| match e { + .into_iter() + .filter_map(|e| match e { ToSwarm::NotifyHandler { - event: HandlerIn::Message(ref m), + event: HandlerIn::Message(RpcOut::Subscribe(topic)), + peer_id, .. - } => !m.subscriptions.is_empty(), - _ => false, + } => Some((peer_id, topic)), + _ => None, }) - .collect(); + .fold( + HashMap::>::new(), + |mut subs, (peer, sub)| { + let mut peer_subs = subs.remove(&peer).unwrap_or_default(); + peer_subs.push(sub.into_string()); + subs.insert(peer, peer_subs); + subs + }, + ); // check that there are two subscriptions sent to each peer - for sevent in send_events.clone() { - if let ToSwarm::NotifyHandler { - event: HandlerIn::Message(ref m), - .. - } = sevent - { - assert!( - m.subscriptions.len() == 2, - "There should be two subscriptions sent to each peer (1 for each topic)." 
- ); - }; + for peer_subs in subscriptions.values() { + assert!(peer_subs.contains(&String::from("topic1"))); + assert!(peer_subs.contains(&String::from("topic2"))); + assert_eq!(peer_subs.len(), 2); } // check that there are 20 send events created - assert!( - send_events.len() == 20, - "There should be a subscription event sent to each peer." - ); + assert_eq!(subscriptions.len(), 20); // should add the new peers to `peer_topics` with an empty vec as a gossipsub node for peer in peers { @@ -1041,21 +1019,18 @@ fn test_handle_iwant_msg_cached() { gs.handle_iwant(&peers[7], vec![msg_id.clone()]); // the messages we are sending - let sent_messages = gs - .events - .iter() - .fold(vec![], |mut collected_messages, e| match e { + let sent_messages = gs.events.into_iter().fold( + Vec::::new(), + |mut collected_messages, e| match e { ToSwarm::NotifyHandler { event, .. } => { - if let HandlerIn::Message(ref m) = event { - let event = proto_to_message(m); - for c in &event.messages { - collected_messages.push(c.clone()) - } + if let HandlerIn::Message(RpcOut::Forward(message)) = event { + collected_messages.push(message); } collected_messages } _ => collected_messages, - }); + }, + ); assert!( sent_messages @@ -1104,15 +1079,14 @@ fn test_handle_iwant_msg_cached_shifted() { // is the message is being sent? let message_exists = gs.events.iter().any(|e| match e { ToSwarm::NotifyHandler { - event: HandlerIn::Message(ref m), + event: HandlerIn::Message(RpcOut::Forward(message)), .. } => { - let event = proto_to_message(m); - event - .messages - .iter() - .map(|msg| gs.data_transform.inbound_transform(msg.clone()).unwrap()) - .any(|msg| gs.config.message_id(&msg) == msg_id) + gs.config.message_id( + &gs.data_transform + .inbound_transform(message.clone()) + .unwrap(), + ) == msg_id } _ => false, }); @@ -1343,22 +1317,15 @@ fn count_control_msgs( .sum::() + gs.events .iter() - .map(|e| match e { + .filter(|e| match e { ToSwarm::NotifyHandler { peer_id, - event: HandlerIn::Message(ref m), + event: HandlerIn::Message(RpcOut::Control(action)), .. - } => { - let event = proto_to_message(m); - event - .control_msgs - .iter() - .filter(|m| filter(peer_id, m)) - .count() - } - _ => 0, + } => filter(peer_id, action), + _ => false, }) - .sum::() + .count() } fn flush_events(gs: &mut Behaviour) { @@ -1567,17 +1534,10 @@ fn do_forward_messages_to_explicit_peers() { .filter(|e| match e { ToSwarm::NotifyHandler { peer_id, - event: HandlerIn::Message(ref m), + event: HandlerIn::Message(RpcOut::Forward(m)), .. } => { - let event = proto_to_message(m); - peer_id == &peers[0] - && event - .messages - .iter() - .filter(|m| m.data == message.data) - .count() - > 0 + peer_id == &peers[0] && m.data == message.data } _ => false, }) @@ -2111,14 +2071,11 @@ fn test_flood_publish() { // Collect all publish messages let publishes = gs .events - .iter() + .into_iter() .fold(vec![], |mut collected_publish, e| match e { ToSwarm::NotifyHandler { event, .. } => { - if let HandlerIn::Message(ref m) = event { - let event = proto_to_message(m); - for s in &event.messages { - collected_publish.push(s.clone()); - } + if let HandlerIn::Message(RpcOut::Publish(message)) = event { + collected_publish.push(message); } collected_publish } @@ -2672,14 +2629,11 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() { // the messages we are sending let sent_messages = gs .events - .iter() + .into_iter() .fold(vec![], |mut collected_messages, e| match e { ToSwarm::NotifyHandler { event, peer_id, .. 
} => { - if let HandlerIn::Message(ref m) = event { - let event = proto_to_message(m); - for c in &event.messages { - collected_messages.push((*peer_id, c.clone())) - } + if let HandlerIn::Message(RpcOut::Forward(message)) = event { + collected_messages.push((peer_id, message)); } collected_messages } @@ -2820,14 +2774,11 @@ fn test_do_not_publish_to_peer_below_publish_threshold() { // Collect all publish messages let publishes = gs .events - .iter() + .into_iter() .fold(vec![], |mut collected_publish, e| match e { ToSwarm::NotifyHandler { event, peer_id, .. } => { - if let HandlerIn::Message(ref m) = event { - let event = proto_to_message(m); - for s in &event.messages { - collected_publish.push((*peer_id, s.clone())); - } + if let HandlerIn::Message(RpcOut::Publish(message)) = event { + collected_publish.push((peer_id, message)); } collected_publish } @@ -2877,14 +2828,11 @@ fn test_do_not_flood_publish_to_peer_below_publish_threshold() { // Collect all publish messages let publishes = gs .events - .iter() + .into_iter() .fold(vec![], |mut collected_publish, e| match e { ToSwarm::NotifyHandler { event, peer_id, .. } => { - if let HandlerIn::Message(ref m) = event { - let event = proto_to_message(m); - for s in &event.messages { - collected_publish.push((*peer_id, s.clone())); - } + if let HandlerIn::Message(RpcOut::Publish(message)) = event { + collected_publish.push((peer_id, message)); } collected_publish } @@ -4403,17 +4351,14 @@ fn test_ignore_too_many_iwants_from_same_peer_for_same_message() { assert_eq!( gs.events .iter() - .map(|e| match e { + .filter(|e| matches!( + e, ToSwarm::NotifyHandler { - event: HandlerIn::Message(ref m), + event: HandlerIn::Message(RpcOut::Forward(_)), .. - } => { - let event = proto_to_message(m); - event.messages.len() } - _ => 0, - }) - .sum::(), + )) + .count(), config.gossip_retransimission() as usize, "not more then gossip_retransmission many messages get sent back" ); @@ -4815,11 +4760,8 @@ fn test_publish_to_floodsub_peers_without_flood_publish() { .fold(vec![], |mut collected_publish, e| match e { ToSwarm::NotifyHandler { peer_id, event, .. } => { if peer_id == &p1 || peer_id == &p2 { - if let HandlerIn::Message(ref m) = event { - let event = proto_to_message(m); - for s in &event.messages { - collected_publish.push(s.clone()); - } + if let HandlerIn::Message(RpcOut::Publish(message)) = event { + collected_publish.push(message); } } collected_publish @@ -4872,11 +4814,8 @@ fn test_do_not_use_floodsub_in_fanout() { .fold(vec![], |mut collected_publish, e| match e { ToSwarm::NotifyHandler { peer_id, event, .. 
} => { if peer_id == &p1 || peer_id == &p2 { - if let HandlerIn::Message(ref m) = event { - let event = proto_to_message(m); - for s in &event.messages { - collected_publish.push(s.clone()); - } + if let HandlerIn::Message(RpcOut::Publish(message)) = event { + collected_publish.push(message); } } collected_publish @@ -5122,7 +5061,7 @@ fn test_subscribe_and_graft_with_negative_score() { p2, connection_id, HandlerEvent::Message { - rpc: proto_to_message(&message), + rpc: proto_to_message(&message.into_protobuf()), invalid_messages: vec![], }, ); diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index e2ec681321c..e91f81776e7 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -20,7 +20,7 @@ use crate::protocol::{GossipsubCodec, ProtocolConfig}; use crate::rpc_proto::proto; -use crate::types::{PeerKind, RawMessage, Rpc}; +use crate::types::{PeerKind, RawMessage, Rpc, RpcOut}; use crate::ValidationError; use asynchronous_codec::Framed; use futures::future::Either; @@ -58,10 +58,11 @@ pub enum HandlerEvent { } /// A message sent from the behaviour to the handler. +#[allow(clippy::large_enum_variant)] #[derive(Debug)] pub enum HandlerIn { /// A gossipsub message to send. - Message(proto::RPC), + Message(RpcOut), /// The peer has joined the mesh. JoinedMesh, /// The peer has left the mesh. @@ -408,7 +409,7 @@ impl ConnectionHandler for Handler { fn on_behaviour_event(&mut self, message: HandlerIn) { match self { Handler::Enabled(handler) => match message { - HandlerIn::Message(m) => handler.send_queue.push(m), + HandlerIn::Message(m) => handler.send_queue.push(m.into_protobuf()), HandlerIn::JoinedMesh => { handler.in_mesh = true; } diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index 42d43c97510..e9600a4d8d8 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -577,7 +577,7 @@ mod tests { let message = message.0; let rpc = Rpc { - messages: vec![message], + messages: vec![message.clone()], subscriptions: vec![], control_msgs: vec![], }; @@ -591,7 +591,7 @@ mod tests { HandlerEvent::Message { mut rpc, .. } => { rpc.messages[0].validated = true; - assert_eq!(rpc, rpc); + assert_eq!(vec![message], rpc.messages); } _ => panic!("Must decode a message"), } diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index 196468b8d32..d1b92ff0ba8 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -132,6 +132,19 @@ impl RawMessage { } } +impl From for proto::Message { + fn from(raw: RawMessage) -> Self { + proto::Message { + from: raw.source.map(|m| m.to_bytes()), + data: Some(raw.data), + seqno: raw.sequence_number.map(|s| s.to_be_bytes().to_vec()), + topic: TopicHash::into_string(raw.topic), + signature: raw.signature, + key: raw.key, + } + } +} + /// The message sent to the user after a [`RawMessage`] has been transformed by a /// [`crate::DataTransform`]. #[derive(Clone, PartialEq, Eq, Hash)] @@ -220,6 +233,130 @@ pub enum ControlAction { }, } +/// A Gossipsub RPC message sent. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum RpcOut { + /// Publish a Gossipsub message on network. + Publish(RawMessage), + /// Forward a Gossipsub message to the network. + Forward(RawMessage), + /// Subscribe a topic. + Subscribe(TopicHash), + /// Unsubscribe a topic. + Unsubscribe(TopicHash), + /// List of Gossipsub control messages. 
+    Control(ControlAction),
+}
+
+impl RpcOut {
+    /// Converts the GossipsubRPC into its protobuf format.
+    // A convenience function to avoid explicitly specifying types.
+    pub fn into_protobuf(self) -> proto::RPC {
+        self.into()
+    }
+}
+
+impl From<RpcOut> for proto::RPC {
+    /// Converts the RPC into protobuf format.
+    fn from(rpc: RpcOut) -> Self {
+        match rpc {
+            RpcOut::Publish(message) => proto::RPC {
+                subscriptions: Vec::new(),
+                publish: vec![message.into()],
+                control: None,
+            },
+            RpcOut::Forward(message) => proto::RPC {
+                publish: vec![message.into()],
+                subscriptions: Vec::new(),
+                control: None,
+            },
+            RpcOut::Subscribe(topic) => proto::RPC {
+                publish: Vec::new(),
+                subscriptions: vec![proto::SubOpts {
+                    subscribe: Some(true),
+                    topic_id: Some(topic.into_string()),
+                }],
+                control: None,
+            },
+            RpcOut::Unsubscribe(topic) => proto::RPC {
+                publish: Vec::new(),
+                subscriptions: vec![proto::SubOpts {
+                    subscribe: Some(false),
+                    topic_id: Some(topic.into_string()),
+                }],
+                control: None,
+            },
+            RpcOut::Control(ControlAction::IHave {
+                topic_hash,
+                message_ids,
+            }) => proto::RPC {
+                publish: Vec::new(),
+                subscriptions: Vec::new(),
+                control: Some(proto::ControlMessage {
+                    ihave: vec![proto::ControlIHave {
+                        topic_id: Some(topic_hash.into_string()),
+                        message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
+                    }],
+                    iwant: vec![],
+                    graft: vec![],
+                    prune: vec![],
+                }),
+            },
+            RpcOut::Control(ControlAction::IWant { message_ids }) => proto::RPC {
+                publish: Vec::new(),
+                subscriptions: Vec::new(),
+                control: Some(proto::ControlMessage {
+                    ihave: vec![],
+                    iwant: vec![proto::ControlIWant {
+                        message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
+                    }],
+                    graft: vec![],
+                    prune: vec![],
+                }),
+            },
+            RpcOut::Control(ControlAction::Graft { topic_hash }) => proto::RPC {
+                publish: Vec::new(),
+                subscriptions: vec![],
+                control: Some(proto::ControlMessage {
+                    ihave: vec![],
+                    iwant: vec![],
+                    graft: vec![proto::ControlGraft {
+                        topic_id: Some(topic_hash.into_string()),
+                    }],
+                    prune: vec![],
+                }),
+            },
+            RpcOut::Control(ControlAction::Prune {
+                topic_hash,
+                peers,
+                backoff,
+            }) => {
+                proto::RPC {
+                    publish: Vec::new(),
+                    subscriptions: vec![],
+                    control: Some(proto::ControlMessage {
+                        ihave: vec![],
+                        iwant: vec![],
+                        graft: vec![],
+                        prune: vec![proto::ControlPrune {
+                            topic_id: Some(topic_hash.into_string()),
+                            peers: peers
+                                .into_iter()
+                                .map(|info| proto::PeerInfo {
+                                    peer_id: info.peer_id.map(|id| id.to_bytes()),
+                                    // TODO, see https://github.com/libp2p/specs/pull/217
+                                    signed_peer_record: None,
+                                })
+                                .collect(),
+                            backoff,
+                        }],
+                    }),
+                }
+            }
+        }
+    }
+}
+
 /// An RPC received/sent.
 #[derive(Clone, PartialEq, Eq, Hash)]
 pub struct Rpc {
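
The sketches below are illustrative only and not part of the patch; they use simplified stand-in types (plain structs and enums named after the crate's `RpcOut`, `RawMessage` and `TopicHash`, not the real ones). This first one shows the overall shape of the refactor: rather than batching subscriptions, messages and control actions into one `Rpc` protobuf and fragmenting it when it exceeds `max_transmit_size`, the behaviour now emits one `RpcOut` per item, so each handler event is a single small RPC.

#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct TopicHash(String);

#[allow(dead_code)]
#[derive(Debug, Clone)]
struct RawMessage {
    topic: TopicHash,
    data: Vec<u8>,
}

#[allow(dead_code)]
#[derive(Debug, Clone)]
enum RpcOut {
    Publish(RawMessage),
    Forward(RawMessage),
    Subscribe(TopicHash),
    Unsubscribe(TopicHash),
}

#[derive(Default)]
struct Behaviour {
    // Stand-in for the behaviour's `events: VecDeque<ToSwarm<...>>` queue.
    events: Vec<(usize, RpcOut)>,
}

impl Behaviour {
    // Stand-in for `Behaviour::send_message`: one handler event per peer per RPC.
    fn send_message(&mut self, peer: usize, rpc: RpcOut) {
        self.events.push((peer, rpc));
    }

    // Subscribing now sends a dedicated `RpcOut::Subscribe` to every known peer.
    fn subscribe(&mut self, peers: &[usize], topic: TopicHash) {
        for &peer in peers {
            self.send_message(peer, RpcOut::Subscribe(topic.clone()));
        }
    }
}

fn main() {
    let mut behaviour = Behaviour::default();
    behaviour.subscribe(&[1, 2, 3], TopicHash("example-topic".into()));
    assert_eq!(behaviour.events.len(), 3);
}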
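
A stand-in sketch of the per-variant conversion added in `types.rs` (the real code targets the generated `proto::RPC` type): every `RpcOut` maps to a wire RPC with exactly one of its sections populated, which is why the old fragmentation logic is no longer needed.

// Stand-ins for the generated protobuf types; only the fields needed for the
// illustration are modelled.
#[derive(Debug, Default, PartialEq, Eq)]
struct WireRpc {
    publish: Vec<Vec<u8>>,              // stands in for `proto::RPC::publish`
    subscriptions: Vec<(bool, String)>, // stands in for `proto::SubOpts`
}

struct TopicHash(String);

#[allow(dead_code)]
enum RpcOut {
    Publish(Vec<u8>),
    Subscribe(TopicHash),
    Unsubscribe(TopicHash),
}

impl From<RpcOut> for WireRpc {
    // Each variant fills exactly one section of the wire RPC.
    fn from(rpc: RpcOut) -> Self {
        match rpc {
            RpcOut::Publish(data) => WireRpc {
                publish: vec![data],
                ..Default::default()
            },
            RpcOut::Subscribe(topic) => WireRpc {
                subscriptions: vec![(true, topic.0)],
                ..Default::default()
            },
            RpcOut::Unsubscribe(topic) => WireRpc {
                subscriptions: vec![(false, topic.0)],
                ..Default::default()
            },
        }
    }
}

fn main() {
    let rpc: WireRpc = RpcOut::Subscribe(TopicHash("example-topic".into())).into();
    assert_eq!(rpc.subscriptions, vec![(true, "example-topic".to_string())]);
    assert!(rpc.publish.is_empty());
}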
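
A standalone illustration (again with simplified types) of the `if let ... | ...` or-pattern used by the new `send_message`: both message-carrying variants are matched by one arm, the payload is bound by reference so the sent bytes can be registered for metrics, and `rpc` can still be moved into the handler event afterwards.

struct RawMessage {
    topic: String,
    data: Vec<u8>,
}

impl RawMessage {
    // Stand-in for `RawMessage::raw_protobuf_len()`.
    fn raw_protobuf_len(&self) -> usize {
        self.data.len()
    }
}

#[allow(dead_code)]
enum RpcOut {
    Publish(RawMessage),
    Forward(RawMessage),
    Subscribe(String),
}

// Stand-in for `Behaviour::send_message`: record bytes sent for the two
// message-carrying variants, then hand the RPC on (in the real code it is
// moved into a `ToSwarm::NotifyHandler` event).
fn send_message(sent_bytes_per_topic: &mut Vec<(String, usize)>, rpc: RpcOut) -> RpcOut {
    // `ref` keeps the borrow local to this `if let`, so `rpc` is still
    // available to move afterwards.
    if let RpcOut::Publish(ref message) | RpcOut::Forward(ref message) = rpc {
        sent_bytes_per_topic.push((message.topic.clone(), message.raw_protobuf_len()));
    }
    rpc
}

fn main() {
    let mut metrics = Vec::new();
    send_message(
        &mut metrics,
        RpcOut::Forward(RawMessage {
            topic: "example-topic".into(),
            data: vec![0; 42],
        }),
    );
    send_message(&mut metrics, RpcOut::Subscribe("example-topic".into()));
    assert_eq!(metrics, vec![("example-topic".to_string(), 42)]);
}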
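
A small example of the `Option::into_iter().flatten()` idiom the heartbeat now uses for prunes: removing a peer's entry yields an `Option<Vec<_>>`, and flattening it iterates the topics when the entry exists and nothing otherwise, replacing the previous `if let Some(topics)` branch.

use std::collections::HashMap;

fn main() {
    let mut to_prune: HashMap<&str, Vec<&str>> = HashMap::new();
    to_prune.insert("peer-a", vec!["topic-1", "topic-2"]);

    // Present entry: the flattened iterator yields its topics.
    let prunes: Vec<&str> = to_prune.remove("peer-a").into_iter().flatten().collect();
    assert_eq!(prunes, vec!["topic-1", "topic-2"]);

    // Missing entry: the flattened iterator is simply empty.
    let none: Vec<&str> = to_prune.remove("peer-b").into_iter().flatten().collect();
    assert!(none.is_empty());
}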
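
A simplified version of how the updated tests count events: with one `RpcOut` per handler event, subscriptions are counted with a `matches!` filter instead of unpacking a batched `Rpc`.

#[allow(dead_code)]
enum RpcOut {
    Subscribe(String),
    Publish(Vec<u8>),
}

fn main() {
    // Stand-in for the behaviour's queued `ToSwarm::NotifyHandler` events.
    let events = vec![
        RpcOut::Subscribe("topic1".into()),
        RpcOut::Publish(vec![1, 2, 3]),
        RpcOut::Subscribe("topic2".into()),
    ];

    // One event per RPC means a plain `matches!` filter is enough to count
    // the subscriptions that were sent.
    let subscriptions = events
        .iter()
        .filter(|e| matches!(e, RpcOut::Subscribe(_)))
        .count();

    assert_eq!(subscriptions, 2);
}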