Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Forward only verified and new HeaderSub messages #89

Merged
merged 2 commits into from
Oct 3, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 87 additions & 25 deletions node/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use libp2p::{
Multiaddr, PeerId, TransportError,
};
use tokio::select;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::{mpsc, oneshot, watch};
use tracing::{debug, info, instrument, trace, warn};

use crate::exchange::{ExchangeBehaviour, ExchangeConfig};
Expand Down Expand Up @@ -76,6 +76,7 @@ impl From<oneshot::error::RecvError> for P2pError {
#[derive(Debug)]
pub struct P2p<S> {
cmd_tx: mpsc::Sender<P2pCmd>,
header_sub_watcher: watch::Receiver<Option<ExtendedHeader>>,
_store: PhantomData<S>,
}

Expand Down Expand Up @@ -120,14 +121,16 @@ where

async fn start(args: P2pArgs<S>) -> Result<Self, P2pError> {
let (cmd_tx, cmd_rx) = mpsc::channel(16);
let mut worker = Worker::new(args, cmd_rx)?;
let (header_sub_tx, header_sub_rx) = watch::channel(None);
let mut worker = Worker::new(args, cmd_rx, header_sub_tx)?;

spawn(async move {
worker.run().await;
});

Ok(P2p {
cmd_tx,
header_sub_watcher: header_sub_rx,
_store: PhantomData,
})
}
Expand All @@ -151,6 +154,8 @@ pub trait P2pService:
{
type Store: Store;

fn new_header_sub_watcher(&self) -> watch::Receiver<Option<ExtendedHeader>>;

async fn wait_connected(&self) -> Result<()> {
let (tx, rx) = oneshot::channel();

Expand Down Expand Up @@ -254,6 +259,10 @@ where
S: Store + 'static,
{
type Store = S;

fn new_header_sub_watcher(&self) -> watch::Receiver<Option<ExtendedHeader>> {
self.header_sub_watcher.clone()
}
}

/// Our network behaviour.
Expand All @@ -280,13 +289,19 @@ where
cmd_rx: mpsc::Receiver<P2pCmd>,
peer_tracker: Arc<PeerTracker>,
wait_connected_tx: Option<Vec<oneshot::Sender<()>>>,
header_sub_watcher: watch::Sender<Option<ExtendedHeader>>,
store: Arc<S>,
}

impl<S> Worker<S>
where
S: Store + 'static,
{
fn new(args: P2pArgs<S>, cmd_rx: mpsc::Receiver<P2pCmd>) -> Result<Self, P2pError> {
fn new(
args: P2pArgs<S>,
cmd_rx: mpsc::Receiver<P2pCmd>,
header_sub_watcher: watch::Sender<Option<ExtendedHeader>>,
) -> Result<Self, P2pError> {
let peer_tracker = Arc::new(PeerTracker::new());
let local_peer_id = PeerId::from(args.local_keypair.public());

Expand All @@ -307,7 +322,7 @@ where
let header_ex = ExchangeBehaviour::new(ExchangeConfig {
network_id: &args.network_id,
peer_tracker: peer_tracker.clone(),
header_store: args.store,
header_store: args.store.clone(),
});

let behaviour = Behaviour {
Expand Down Expand Up @@ -337,6 +352,8 @@ where
header_sub_topic_hash: header_sub_topic.hash(),
peer_tracker,
wait_connected_tx: None,
header_sub_watcher,
store: args.store.clone(),
})
}

Expand Down Expand Up @@ -364,7 +381,7 @@ where
match ev {
SwarmEvent::Behaviour(ev) => match ev {
BehaviourEvent::Identify(ev) => self.on_identify_event(ev).await?,
BehaviourEvent::Gossipsub(ev) => self.on_gossip_sub_event(ev).await?,
BehaviourEvent::Gossipsub(ev) => self.on_gossip_sub_event(ev).await,
BehaviourEvent::Kademlia(ev) => self.on_kademlia_event(ev).await?,
BehaviourEvent::Autonat(_)
| BehaviourEvent::KeepAlive(_)
Expand Down Expand Up @@ -450,24 +467,36 @@ where
}

#[instrument(level = "trace", skip(self))]
async fn on_gossip_sub_event(&mut self, ev: gossipsub::Event) -> Result<()> {
async fn on_gossip_sub_event(&mut self, ev: gossipsub::Event) {
match ev {
gossipsub::Event::Message { message, .. } => {
gossipsub::Event::Message {
message,
message_id,
..
} => {
let Some(peer) = message.source else {
// Validation mode is `strict` so this will never happen
return;
};

// We may discovered a new peer
if let Some(peer) = message.source {
self.on_peer_discovered(peer);
}
self.peer_maybe_discovered(peer);

if message.topic == self.header_sub_topic_hash {
self.on_header_sub_message(&message.data[..]);
let acceptance = if message.topic == self.header_sub_topic_hash {
self.on_header_sub_message(&message.data[..]).await
} else {
trace!("Unhandled gossipsub message");
}
gossipsub::MessageAcceptance::Ignore
};

let _ = self
.swarm
.behaviour_mut()
.gossipsub
.report_message_validation_result(&message_id, &peer, acceptance);
}
_ => trace!("Unhandled gossipsub event"),
}

Ok(())
}

#[instrument(level = "trace", skip(self))]
Expand All @@ -492,7 +521,7 @@ where
}

#[instrument(skip_all, fields(peer_id = %peer_id))]
fn on_peer_discovered(&mut self, peer_id: PeerId) {
fn peer_maybe_discovered(&mut self, peer_id: PeerId) {
if !self.peer_tracker.maybe_discovered(peer_id) {
return;
}
Expand Down Expand Up @@ -547,17 +576,45 @@ where
}

#[instrument(skip_all)]
fn on_header_sub_message(&mut self, data: &[u8]) {
async fn on_header_sub_message(&mut self, data: &[u8]) -> gossipsub::MessageAcceptance {
let Ok(header) = ExtendedHeader::decode_and_validate(data) else {
trace!("Malformed or invalid header from header-sub");
return;
return gossipsub::MessageAcceptance::Reject;
};

debug!("New header from header-sub ({header})");
// TODO: inform syncer about it
//
// TODO: Verify header against store. If it's verified propagate
// it to other peers. If not, discard it.
trace!("Received header from header-sub ({header})");

// We care only for headers newer than head
let Ok(head) = self.store.get_head().await else {
trace!("Store not initialized yet. Ignore header from header-sub ({header}).");
return gossipsub::MessageAcceptance::Ignore;
};

// Verify header
if head.verify(&header).is_err() {
trace!("Verification failed. Ignore header from header-sub ({header}).");
return gossipsub::MessageAcceptance::Ignore;
}

// Update it only if needed
let updated = self.header_sub_watcher.send_if_modified(|state| {
if let Some(known_header) = state {
if known_header.height() >= header.height() {
trace!("Header-sub already up-to-date. Ignore header ({header}).");
return false;
}
}

debug!("New header from header-sub ({header})");
*state = Some(header);
true
});

if updated {
gossipsub::MessageAcceptance::Accept
} else {
gossipsub::MessageAcceptance::Ignore
zvolin marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

Expand All @@ -569,10 +626,15 @@ fn init_gossipsub<'a, S>(
// Here we expect the publisher to sign the message with their key.
let message_authenticity = gossipsub::MessageAuthenticity::Signed(args.local_keypair.clone());

let config = gossipsub::ConfigBuilder::default()
.validation_mode(gossipsub::ValidationMode::Strict)
.validate_messages()
.build()
.map_err(P2pError::GossipsubInit)?;

// build a gossipsub network behaviour
let mut gossipsub: gossipsub::Behaviour =
gossipsub::Behaviour::new(message_authenticity, gossipsub::Config::default())
.map_err(P2pError::GossipsubInit)?;
gossipsub::Behaviour::new(message_authenticity, config).map_err(P2pError::GossipsubInit)?;

for topic in topics {
gossipsub.subscribe(topic)?;
Expand Down
Loading