Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track RPC events and emit them to interested parties #660

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion daemon/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,14 @@ pub mod gossip;
pub mod include;

mod run_state;
pub use run_state::{config as run_config, Config as RunConfig, Event, Status, WaitingRoomEvent};
pub use run_state::{
config as run_config,
Config as RunConfig,
Event,
NetworkDiagnosticEvent,
Status,
WaitingRoomEvent,
};

mod subroutines;
use subroutines::Subroutines;
Expand Down Expand Up @@ -161,6 +168,7 @@ where
let (addrs_tx, addrs_rx) = watch::channel(vec![]);

let protocol_events = peer.subscribe().boxed();
let network_diagnostic_events = peer.subscribe_to_diagnostic_events().boxed();
let subroutines = Subroutines::new(
peer.clone(),
addrs_rx,
Expand All @@ -169,6 +177,7 @@ where
protocol_events,
subscriber,
control_receiver,
network_diagnostic_events,
)
.run()
.fuse()
Expand Down
11 changes: 10 additions & 1 deletion daemon/src/peer/run_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::{

use serde::Serialize;

pub use librad::net::protocol::event::NetworkDiagnosticEvent;
use librad::{
git::Urn,
net::{
Expand Down Expand Up @@ -88,6 +89,8 @@ pub enum Event {
},
/// A state change occurred in the waiting room
WaitingRoomTransition(WaitingRoomTransition<SystemTime>),
/// Logs of sent and received RPC messages
NetworkDiagnostic(NetworkDiagnosticEvent),
}

impl From<WaitingRoomTransition<SystemTime>> for Event {
Expand All @@ -96,6 +99,12 @@ impl From<WaitingRoomTransition<SystemTime>> for Event {
}
}

impl From<NetworkDiagnosticEvent> for Event {
fn from(event: NetworkDiagnosticEvent) -> Self {
Self::NetworkDiagnostic(event)
}
}

impl MaybeFrom<&Input> for Event {
fn maybe_from(input: &Input) -> Option<Self> {
match input {
Expand Down Expand Up @@ -209,6 +218,7 @@ impl RunState {
Input::PeerSync(peer_sync_input) => self.handle_peer_sync(&peer_sync_input),
Input::Request(request_input) => self.handle_request(request_input),
Input::Stats(stats_input) => self.handle_stats(stats_input),
Input::NetworkDiagnostic(ev) => vec![Command::EmitEvent(ev.into())],
};

log::trace!("TRANSITION END: {:?} {:?}", self.status, cmds);
Expand Down Expand Up @@ -318,7 +328,6 @@ impl RunState {
result,
} => {
cmds.extend(self.waiting_room.found(&urn, peer_id, SystemTime::now()));

if let PutResult::Applied(_) = result {
cmds.push(Command::Include(urn));
}
Expand Down
8 changes: 7 additions & 1 deletion daemon/src/peer/run_state/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ use std::{net::SocketAddr, time::SystemTime};

use tokio::sync::oneshot;

use librad::{git::Urn, net, net::peer::ProtocolEvent, peer::PeerId};
use librad::{
git::Urn,
net,
net::{peer::ProtocolEvent, protocol::event::NetworkDiagnosticEvent},
peer::PeerId,
};

use crate::{
peer::announcement,
Expand All @@ -31,6 +36,7 @@ pub enum Input {
/// the network.
Request(Request),
Stats(Stats),
NetworkDiagnostic(NetworkDiagnosticEvent),
}

/// Announcement subroutine lifecycle events.
Expand Down
23 changes: 23 additions & 0 deletions daemon/src/peer/subroutines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//! Management of peer subroutine tasks driven by advancing the core state
//! machine with a stream of inputs, producing commands.

use librad::net::protocol::event::NetworkDiagnosticEvent;
use std::{
net::SocketAddr,
time::{Duration, SystemTime},
Expand Down Expand Up @@ -69,6 +70,7 @@ pub struct Subroutines {

impl Subroutines {
/// Constructs a new subroutines manager.
#[allow(clippy::too_many_arguments)]
pub fn new(
peer: net::peer::Peer<BoxedSigner>,
mut listen_addrs: watch::Receiver<Vec<SocketAddr>>,
Expand All @@ -77,6 +79,10 @@ impl Subroutines {
protocol_events: BoxStream<'static, Result<ProtocolEvent, net::protocol::RecvError>>,
subscriber: broadcast::Sender<Event>,
mut control_receiver: mpsc::Receiver<control::Request>,
network_diagnostic_events: BoxStream<
'static,
Result<NetworkDiagnosticEvent, net::protocol::RecvError>,
>,
) -> Self {
let announce_timer = if run_config.announce.interval.is_zero() {
None
Expand Down Expand Up @@ -126,6 +132,23 @@ impl Subroutines {
.boxed(),
);

coalesced.push(
network_diagnostic_events
.filter_map(|res| async move {
match res {
Ok(ev) => Some(Input::NetworkDiagnostic(ev)),
Err(err) => {
log::warn!(
"receive error for network diagnostic event log: {}",
err
);
None
},
}
})
.boxed(),
);

coalesced.push(
stream! {
while listen_addrs.changed().await.is_ok() {
Expand Down
2 changes: 1 addition & 1 deletion librad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ features = ["tls-rustls"]

[dependencies.radicle-data]
path = "../data"
features = ["minicbor"]
features = ["minicbor", "serde"]

[dependencies.radicle-git-ext]
path = "../git-ext"
Expand Down
8 changes: 7 additions & 1 deletion librad/src/net/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use futures_timer::Delay;
use thiserror::Error;
use tokio::task::spawn_blocking;

use super::protocol::{self, gossip};
use super::protocol::{self, event::NetworkDiagnosticEvent, gossip};
use crate::{
git::{self, storage::Fetchers, Urn},
signer::Signer,
Expand Down Expand Up @@ -240,6 +240,12 @@ where
self.phone.subscribe()
}

pub fn subscribe_to_diagnostic_events(
&self,
) -> impl futures::Stream<Item = Result<NetworkDiagnosticEvent, protocol::RecvError>> {
self.phone.subscribe_diagnostic_events()
}

/// Borrow a [`git::storage::Storage`] from the pool, and run a blocking
/// computation on it.
pub async fn using_storage<F, A>(&self, blocking: F) -> Result<A, StorageError>
Expand Down
1 change: 1 addition & 0 deletions librad/src/net/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ pub fn accept<Store, Disco>(
state,
incoming,
periodic,
..
}: Bound<Store>,
disco: Disco,
) -> impl Future<Output = Result<!, quic::Error>>
Expand Down
4 changes: 3 additions & 1 deletion librad/src/net/protocol/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ use crate::PeerId;
mod storage;
pub use storage::{LocalStorage, PutResult};

#[derive(Clone, Debug, PartialEq, minicbor::Encode, minicbor::Decode)]
use serde::Serialize;

#[derive(Clone, Debug, PartialEq, minicbor::Encode, minicbor::Decode, Serialize)]
pub enum Message<Addr, Payload> {
#[n(0)]
#[cbor(array)]
Expand Down
58 changes: 57 additions & 1 deletion librad/src/net/protocol/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@

use std::{collections::HashMap, net::SocketAddr};

use super::{broadcast, error, gossip, interrogation, membership};
use super::{broadcast, error, gossip, interrogation, io::Rpc, membership};
use crate::PeerId;

use serde::Serialize;

#[derive(Clone)]
pub enum Downstream {
Gossip(downstream::Gossip),
Expand Down Expand Up @@ -169,3 +171,57 @@ pub mod upstream {
}
}
}

#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum NetworkDiagnosticEvent {
GossipSent {
to: SocketAddr,
message: broadcast::Message<SocketAddr, gossip::Payload>,
},
GossipReceived {
from: SocketAddr,
message: broadcast::Message<SocketAddr, gossip::Payload>,
},
HpvSent {
to: SocketAddr,
message: membership::Message<SocketAddr>,
},
HpvReceived {
from: SocketAddr,
message: membership::Message<SocketAddr>,
},
}

impl NetworkDiagnosticEvent {
pub(crate) fn hpv_sent(
to: SocketAddr,
message: membership::Message<SocketAddr>,
) -> NetworkDiagnosticEvent {
NetworkDiagnosticEvent::HpvSent { to, message }
}

pub(crate) fn hpv_received(
from: SocketAddr,
message: membership::Message<SocketAddr>,
) -> NetworkDiagnosticEvent {
NetworkDiagnosticEvent::HpvReceived { from, message }
}

pub(crate) fn gossip_received(
from: SocketAddr,
message: broadcast::Message<SocketAddr, gossip::Payload>,
) -> NetworkDiagnosticEvent {
NetworkDiagnosticEvent::GossipReceived { from, message }
}

pub(crate) fn rpc_sent(
to: SocketAddr,
rpc: Rpc<SocketAddr, gossip::Payload>,
) -> NetworkDiagnosticEvent {
match rpc {
Rpc::Membership(message) => NetworkDiagnosticEvent::HpvSent { to, message },
Rpc::Gossip(message) => NetworkDiagnosticEvent::GossipSent { to, message },
}
}
}
15 changes: 14 additions & 1 deletion librad/src/net/protocol/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use minicbor::{Decode, Decoder, Encode, Encoder};

use crate::{identities::git::Urn, peer::PeerId};

use serde::Serialize;

#[derive(Clone, Debug, PartialEq)]
pub enum Rev {
Git(git2::Oid),
Expand Down Expand Up @@ -43,8 +45,19 @@ impl<'de> Decode<'de> for Rev {
}
}

impl Serialize for Rev {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
match self {
Self::Git(oid) => serializer.serialize_str(oid.to_string().as_str()),
}
}
}

/// The gossip payload type
#[derive(Clone, Debug, PartialEq, Encode, Decode)]
#[derive(Clone, Debug, PartialEq, Encode, Decode, Serialize)]
#[cbor(array)]
pub struct Payload {
/// URN of an updated or wanted repo.
Expand Down
7 changes: 4 additions & 3 deletions librad/src/net/protocol/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ use std::{collections::BTreeSet, convert::TryFrom, option::NoneError};

use data::BoundedVec;
use minicbor::{Decode, Encode};
use serde::Serialize;
use typenum::U16;

use crate::peer::PeerId;

#[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd, Encode, Decode)]
#[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd, Encode, Decode, Serialize)]
#[repr(u8)]
pub enum Capability {
#[n(0)]
Expand Down Expand Up @@ -86,7 +87,7 @@ impl<Addr> From<PeerInfo<Addr>> for (PeerId, Vec<Addr>) {
}
}

#[derive(Debug, Clone, PartialEq, Encode)]
#[derive(Debug, Clone, PartialEq, Encode, Serialize)]
#[cbor(array)]
pub struct GenericPeerInfo<Addr, T> {
#[n(0)]
Expand Down Expand Up @@ -159,7 +160,7 @@ impl<'__b777, Addr: minicbor::Decode<'__b777>, T: minicbor::Decode<'__b777>>
})
}
}
#[derive(Debug, Clone, PartialEq, Encode)]
#[derive(Debug, Clone, PartialEq, Encode, Serialize)]
#[cbor(array)]
pub struct PeerAdvertisement<Addr> {
#[n(0)]
Expand Down
12 changes: 7 additions & 5 deletions librad/src/net/protocol/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// This file is part of radicle-link, distributed under the GPLv3 with Radicle
// Linking Exception. For full terms see the included LICENSE file.

use crate::net::protocol::event::NetworkDiagnosticEvent;
use std::{iter, net::SocketAddr};

use data::BoundedVec;
Expand Down Expand Up @@ -45,11 +46,12 @@ where
}

if let Some((conn, ingress)) = connect(&state.endpoint, peer, addrs).await {
let rpc_sent = send_rpc::<_, ()>(
&conn,
state.membership.hello(peer_advertisement(&state.endpoint)),
)
.await;
let msg = state.membership.hello(peer_advertisement(&state.endpoint));
let rpc: Rpc<_, ()> = msg.clone().into();
let rpc_sent = send_rpc::<_, ()>(&conn, rpc).await;
state
.phone
.emit_diagnostic_event(NetworkDiagnosticEvent::hpv_sent(conn.remote_addr(), msg));

match rpc_sent {
Err(e) => tracing::warn!(err = ?e, "failed to send membership hello"),
Expand Down
10 changes: 8 additions & 2 deletions librad/src/net/protocol/io/recv/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use futures_codec::FramedRead;

use crate::{
net::{
connection::RemotePeer,
connection::{RemoteAddr, RemotePeer},
protocol::{
broadcast,
event,
Expand All @@ -35,7 +35,7 @@ pub(in crate::net::protocol) async fn gossip<S, T>(
stream: Upgraded<upgrade::Gossip, T>,
) where
S: ProtocolStorage<SocketAddr, Update = gossip::Payload> + Clone + 'static,
T: RemotePeer + AsyncRead + Unpin,
T: RemotePeer + RemoteAddr<Addr = SocketAddr> + AsyncRead + Unpin,
{
let mut recv = FramedRead::new(stream.into_stream(), codec::Gossip::new());
let remote_id = recv.remote_peer_id();
Expand All @@ -58,6 +58,12 @@ pub(in crate::net::protocol) async fn gossip<S, T>(
advertised_info: peer_advertisement(&state.endpoint),
seen_addrs: iter::empty().into(),
};
state
.phone
.emit_diagnostic_event(event::NetworkDiagnosticEvent::gossip_received(
recv.remote_addr(),
msg.clone(),
));
match broadcast::apply(
&state.membership,
&state.storage,
Expand Down
Loading