From 631171158a7566971151813e3bfb46df0f27b311 Mon Sep 17 00:00:00 2001 From: Alex Good Date: Tue, 30 Mar 2021 15:57:50 +0100 Subject: [PATCH] Track RPC events and emit them to interested parties It is useful when debugging to be able to track network events as they occur. This is tricky to do by just tailing the logs as it's quite noisy. This PR adds a `NetworkDiagnosticEvent` enum which is emitted to a channel in `TinCans::diagnostic_events`. Any time we receive or send an RPC message we send it to this channel. This allows interested parties to subscribe to the channel and display RPC events for diagnostic purposes. --- daemon/src/peer.rs | 11 +++- daemon/src/peer/run_state.rs | 11 +++- daemon/src/peer/run_state/input.rs | 8 ++- daemon/src/peer/subroutines.rs | 23 ++++++++ librad/Cargo.toml | 2 +- librad/src/net/peer.rs | 8 ++- librad/src/net/protocol.rs | 1 + librad/src/net/protocol/broadcast.rs | 4 +- librad/src/net/protocol/event.rs | 58 ++++++++++++++++++- librad/src/net/protocol/gossip.rs | 15 ++++- librad/src/net/protocol/info.rs | 7 ++- librad/src/net/protocol/io.rs | 12 ++-- librad/src/net/protocol/io/recv/gossip.rs | 10 +++- librad/src/net/protocol/io/recv/membership.rs | 7 +++ librad/src/net/protocol/io/send/rpc.rs | 2 +- librad/src/net/protocol/membership/rpc.rs | 4 +- librad/src/net/protocol/tick.rs | 25 ++++++-- librad/src/net/protocol/tincans.rs | 14 +++++ 18 files changed, 196 insertions(+), 26 deletions(-) diff --git a/daemon/src/peer.rs b/daemon/src/peer.rs index d2c49ec10..fef72e18e 100644 --- a/daemon/src/peer.rs +++ b/daemon/src/peer.rs @@ -31,7 +31,14 @@ pub mod gossip; pub mod include; mod run_state; -pub use run_state::{config as run_config, Config as RunConfig, Event, Status, WaitingRoomEvent}; +pub use run_state::{ + config as run_config, + Config as RunConfig, + Event, + NetworkDiagnosticEvent, + Status, + WaitingRoomEvent, +}; mod subroutines; use subroutines::Subroutines; @@ -161,6 +168,7 @@ where let (addrs_tx, addrs_rx) = watch::channel(vec![]); let protocol_events = peer.subscribe().boxed(); + let network_diagnostic_events = peer.subscribe_to_diagnostic_events().boxed(); let subroutines = Subroutines::new( peer.clone(), addrs_rx, @@ -169,6 +177,7 @@ where protocol_events, subscriber, control_receiver, + network_diagnostic_events, ) .run() .fuse() diff --git a/daemon/src/peer/run_state.rs b/daemon/src/peer/run_state.rs index 552260a30..46757468f 100644 --- a/daemon/src/peer/run_state.rs +++ b/daemon/src/peer/run_state.rs @@ -13,6 +13,7 @@ use std::{ use serde::Serialize; +pub use librad::net::protocol::event::NetworkDiagnosticEvent; use librad::{ git::Urn, net::{ @@ -88,6 +89,8 @@ pub enum Event { }, /// A state change occurred in the waiting room WaitingRoomTransition(WaitingRoomTransition), + /// Logs of sent and received RPC messages + NetworkDiagnostic(NetworkDiagnosticEvent), } impl From> for Event { @@ -96,6 +99,12 @@ impl From> for Event { } } +impl From for Event { + fn from(event: NetworkDiagnosticEvent) -> Self { + Self::NetworkDiagnostic(event) + } +} + impl MaybeFrom<&Input> for Event { fn maybe_from(input: &Input) -> Option { match input { @@ -209,6 +218,7 @@ impl RunState { Input::PeerSync(peer_sync_input) => self.handle_peer_sync(&peer_sync_input), Input::Request(request_input) => self.handle_request(request_input), Input::Stats(stats_input) => self.handle_stats(stats_input), + Input::NetworkDiagnostic(ev) => vec![Command::EmitEvent(ev.into())], }; log::trace!("TRANSITION END: {:?} {:?}", self.status, cmds); @@ -318,7 +328,6 @@ impl RunState { result, } => { cmds.extend(self.waiting_room.found(&urn, peer_id, SystemTime::now())); - if let PutResult::Applied(_) = result { cmds.push(Command::Include(urn)); } diff --git a/daemon/src/peer/run_state/input.rs b/daemon/src/peer/run_state/input.rs index a345c244f..94ece1006 100644 --- a/daemon/src/peer/run_state/input.rs +++ b/daemon/src/peer/run_state/input.rs @@ -7,7 +7,12 @@ use std::{net::SocketAddr, time::SystemTime}; use tokio::sync::oneshot; -use librad::{git::Urn, net, net::peer::ProtocolEvent, peer::PeerId}; +use librad::{ + git::Urn, + net, + net::{peer::ProtocolEvent, protocol::event::NetworkDiagnosticEvent}, + peer::PeerId, +}; use crate::{ peer::announcement, @@ -31,6 +36,7 @@ pub enum Input { /// the network. Request(Request), Stats(Stats), + NetworkDiagnostic(NetworkDiagnosticEvent), } /// Announcement subroutine lifecycle events. diff --git a/daemon/src/peer/subroutines.rs b/daemon/src/peer/subroutines.rs index c8607591f..e65c0d8d0 100644 --- a/daemon/src/peer/subroutines.rs +++ b/daemon/src/peer/subroutines.rs @@ -6,6 +6,7 @@ //! Management of peer subroutine tasks driven by advancing the core state //! machine with a stream of inputs, producing commands. +use librad::net::protocol::event::NetworkDiagnosticEvent; use std::{ net::SocketAddr, time::{Duration, SystemTime}, @@ -69,6 +70,7 @@ pub struct Subroutines { impl Subroutines { /// Constructs a new subroutines manager. + #[allow(clippy::too_many_arguments)] pub fn new( peer: net::peer::Peer, mut listen_addrs: watch::Receiver>, @@ -77,6 +79,10 @@ impl Subroutines { protocol_events: BoxStream<'static, Result>, subscriber: broadcast::Sender, mut control_receiver: mpsc::Receiver, + network_diagnostic_events: BoxStream< + 'static, + Result, + >, ) -> Self { let announce_timer = if run_config.announce.interval.is_zero() { None @@ -126,6 +132,23 @@ impl Subroutines { .boxed(), ); + coalesced.push( + network_diagnostic_events + .filter_map(|res| async move { + match res { + Ok(ev) => Some(Input::NetworkDiagnostic(ev)), + Err(err) => { + log::warn!( + "receive error for network diagnostic event log: {}", + err + ); + None + }, + } + }) + .boxed(), + ); + coalesced.push( stream! { while listen_addrs.changed().await.is_ok() { diff --git a/librad/Cargo.toml b/librad/Cargo.toml index 3f1fe9f85..23b1df4c9 100644 --- a/librad/Cargo.toml +++ b/librad/Cargo.toml @@ -97,7 +97,7 @@ features = ["tls-rustls"] [dependencies.radicle-data] path = "../data" -features = ["minicbor"] +features = ["minicbor", "serde"] [dependencies.radicle-git-ext] path = "../git-ext" diff --git a/librad/src/net/peer.rs b/librad/src/net/peer.rs index 549d3ec8e..116297fb4 100644 --- a/librad/src/net/peer.rs +++ b/librad/src/net/peer.rs @@ -10,7 +10,7 @@ use futures_timer::Delay; use thiserror::Error; use tokio::task::spawn_blocking; -use super::protocol::{self, gossip}; +use super::protocol::{self, event::NetworkDiagnosticEvent, gossip}; use crate::{ git::{self, storage::Fetchers, Urn}, signer::Signer, @@ -240,6 +240,12 @@ where self.phone.subscribe() } + pub fn subscribe_to_diagnostic_events( + &self, + ) -> impl futures::Stream> { + self.phone.subscribe_diagnostic_events() + } + /// Borrow a [`git::storage::Storage`] from the pool, and run a blocking /// computation on it. pub async fn using_storage(&self, blocking: F) -> Result diff --git a/librad/src/net/protocol.rs b/librad/src/net/protocol.rs index 53b6d03a6..3b81f26e0 100644 --- a/librad/src/net/protocol.rs +++ b/librad/src/net/protocol.rs @@ -197,6 +197,7 @@ pub fn accept( state, incoming, periodic, + .. }: Bound, disco: Disco, ) -> impl Future> diff --git a/librad/src/net/protocol/broadcast.rs b/librad/src/net/protocol/broadcast.rs index 6e9a9967e..2f4627c12 100644 --- a/librad/src/net/protocol/broadcast.rs +++ b/librad/src/net/protocol/broadcast.rs @@ -13,7 +13,9 @@ use crate::PeerId; mod storage; pub use storage::{LocalStorage, PutResult}; -#[derive(Clone, Debug, PartialEq, minicbor::Encode, minicbor::Decode)] +use serde::Serialize; + +#[derive(Clone, Debug, PartialEq, minicbor::Encode, minicbor::Decode, Serialize)] pub enum Message { #[n(0)] #[cbor(array)] diff --git a/librad/src/net/protocol/event.rs b/librad/src/net/protocol/event.rs index fae2e518d..e6a6fe155 100644 --- a/librad/src/net/protocol/event.rs +++ b/librad/src/net/protocol/event.rs @@ -5,9 +5,11 @@ use std::{collections::HashMap, net::SocketAddr}; -use super::{broadcast, error, gossip, interrogation, membership}; +use super::{broadcast, error, gossip, interrogation, io::Rpc, membership}; use crate::PeerId; +use serde::Serialize; + #[derive(Clone)] pub enum Downstream { Gossip(downstream::Gossip), @@ -169,3 +171,57 @@ pub mod upstream { } } } + +#[derive(Clone, Debug, Serialize)] +#[serde(tag = "type", rename_all = "camelCase")] +pub enum NetworkDiagnosticEvent { + GossipSent { + to: SocketAddr, + message: broadcast::Message, + }, + GossipReceived { + from: SocketAddr, + message: broadcast::Message, + }, + HpvSent { + to: SocketAddr, + message: membership::Message, + }, + HpvReceived { + from: SocketAddr, + message: membership::Message, + }, +} + +impl NetworkDiagnosticEvent { + pub(crate) fn hpv_sent( + to: SocketAddr, + message: membership::Message, + ) -> NetworkDiagnosticEvent { + NetworkDiagnosticEvent::HpvSent { to, message } + } + + pub(crate) fn hpv_received( + from: SocketAddr, + message: membership::Message, + ) -> NetworkDiagnosticEvent { + NetworkDiagnosticEvent::HpvReceived { from, message } + } + + pub(crate) fn gossip_received( + from: SocketAddr, + message: broadcast::Message, + ) -> NetworkDiagnosticEvent { + NetworkDiagnosticEvent::GossipReceived { from, message } + } + + pub(crate) fn rpc_sent( + to: SocketAddr, + rpc: Rpc, + ) -> NetworkDiagnosticEvent { + match rpc { + Rpc::Membership(message) => NetworkDiagnosticEvent::HpvSent { to, message }, + Rpc::Gossip(message) => NetworkDiagnosticEvent::GossipSent { to, message }, + } + } +} diff --git a/librad/src/net/protocol/gossip.rs b/librad/src/net/protocol/gossip.rs index 8e0ca26aa..b8c182f31 100644 --- a/librad/src/net/protocol/gossip.rs +++ b/librad/src/net/protocol/gossip.rs @@ -7,6 +7,8 @@ use minicbor::{Decode, Decoder, Encode, Encoder}; use crate::{identities::git::Urn, peer::PeerId}; +use serde::Serialize; + #[derive(Clone, Debug, PartialEq)] pub enum Rev { Git(git2::Oid), @@ -43,8 +45,19 @@ impl<'de> Decode<'de> for Rev { } } +impl Serialize for Rev { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + match self { + Self::Git(oid) => serializer.serialize_str(oid.to_string().as_str()), + } + } +} + /// The gossip payload type -#[derive(Clone, Debug, PartialEq, Encode, Decode)] +#[derive(Clone, Debug, PartialEq, Encode, Decode, Serialize)] #[cbor(array)] pub struct Payload { /// URN of an updated or wanted repo. diff --git a/librad/src/net/protocol/info.rs b/librad/src/net/protocol/info.rs index 5cbc7da61..ddce8e018 100644 --- a/librad/src/net/protocol/info.rs +++ b/librad/src/net/protocol/info.rs @@ -7,11 +7,12 @@ use std::{collections::BTreeSet, convert::TryFrom, option::NoneError}; use data::BoundedVec; use minicbor::{Decode, Encode}; +use serde::Serialize; use typenum::U16; use crate::peer::PeerId; -#[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd, Encode, Decode)] +#[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd, Encode, Decode, Serialize)] #[repr(u8)] pub enum Capability { #[n(0)] @@ -86,7 +87,7 @@ impl From> for (PeerId, Vec) { } } -#[derive(Debug, Clone, PartialEq, Encode)] +#[derive(Debug, Clone, PartialEq, Encode, Serialize)] #[cbor(array)] pub struct GenericPeerInfo { #[n(0)] @@ -159,7 +160,7 @@ impl<'__b777, Addr: minicbor::Decode<'__b777>, T: minicbor::Decode<'__b777>> }) } } -#[derive(Debug, Clone, PartialEq, Encode)] +#[derive(Debug, Clone, PartialEq, Encode, Serialize)] #[cbor(array)] pub struct PeerAdvertisement { #[n(0)] diff --git a/librad/src/net/protocol/io.rs b/librad/src/net/protocol/io.rs index df5584188..5070d9b8b 100644 --- a/librad/src/net/protocol/io.rs +++ b/librad/src/net/protocol/io.rs @@ -3,6 +3,7 @@ // This file is part of radicle-link, distributed under the GPLv3 with Radicle // Linking Exception. For full terms see the included LICENSE file. +use crate::net::protocol::event::NetworkDiagnosticEvent; use std::{iter, net::SocketAddr}; use data::BoundedVec; @@ -45,11 +46,12 @@ where } if let Some((conn, ingress)) = connect(&state.endpoint, peer, addrs).await { - let rpc_sent = send_rpc::<_, ()>( - &conn, - state.membership.hello(peer_advertisement(&state.endpoint)), - ) - .await; + let msg = state.membership.hello(peer_advertisement(&state.endpoint)); + let rpc: Rpc<_, ()> = msg.clone().into(); + let rpc_sent = send_rpc::<_, ()>(&conn, rpc).await; + state + .phone + .emit_diagnostic_event(NetworkDiagnosticEvent::hpv_sent(conn.remote_addr(), msg)); match rpc_sent { Err(e) => tracing::warn!(err = ?e, "failed to send membership hello"), diff --git a/librad/src/net/protocol/io/recv/gossip.rs b/librad/src/net/protocol/io/recv/gossip.rs index 6a737c0d3..f9afbbdf5 100644 --- a/librad/src/net/protocol/io/recv/gossip.rs +++ b/librad/src/net/protocol/io/recv/gossip.rs @@ -13,7 +13,7 @@ use futures_codec::FramedRead; use crate::{ net::{ - connection::RemotePeer, + connection::{RemoteAddr, RemotePeer}, protocol::{ broadcast, event, @@ -35,7 +35,7 @@ pub(in crate::net::protocol) async fn gossip( stream: Upgraded, ) where S: ProtocolStorage + Clone + 'static, - T: RemotePeer + AsyncRead + Unpin, + T: RemotePeer + RemoteAddr + AsyncRead + Unpin, { let mut recv = FramedRead::new(stream.into_stream(), codec::Gossip::new()); let remote_id = recv.remote_peer_id(); @@ -58,6 +58,12 @@ pub(in crate::net::protocol) async fn gossip( advertised_info: peer_advertisement(&state.endpoint), seen_addrs: iter::empty().into(), }; + state + .phone + .emit_diagnostic_event(event::NetworkDiagnosticEvent::gossip_received( + recv.remote_addr(), + msg.clone(), + )); match broadcast::apply( &state.membership, &state.storage, diff --git a/librad/src/net/protocol/io/recv/membership.rs b/librad/src/net/protocol/io/recv/membership.rs index 9ac7002e2..ceb098775 100644 --- a/librad/src/net/protocol/io/recv/membership.rs +++ b/librad/src/net/protocol/io/recv/membership.rs @@ -15,6 +15,7 @@ use crate::{ net::{ connection::RemoteInfo, protocol::{ + event::NetworkDiagnosticEvent, gossip, io::{codec, peer_advertisement}, membership, @@ -47,6 +48,12 @@ pub(in crate::net::protocol) async fn membership( }, Ok(msg) => { + state + .phone + .emit_diagnostic_event(NetworkDiagnosticEvent::hpv_received( + remote_addr, + msg.clone(), + )); let info = || peer_advertisement(&state.endpoint); match membership::apply(&state.membership, &info, remote_id, remote_addr, msg) { Err(e) => { diff --git a/librad/src/net/protocol/io/send/rpc.rs b/librad/src/net/protocol/io/send/rpc.rs index e79d09725..60b92ebb0 100644 --- a/librad/src/net/protocol/io/send/rpc.rs +++ b/librad/src/net/protocol/io/send/rpc.rs @@ -15,7 +15,7 @@ use crate::net::{ upgrade, }; -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum Rpc { Membership(membership::Message), Gossip(broadcast::Message), diff --git a/librad/src/net/protocol/membership/rpc.rs b/librad/src/net/protocol/membership/rpc.rs index 159f99a26..ec0b04e38 100644 --- a/librad/src/net/protocol/membership/rpc.rs +++ b/librad/src/net/protocol/membership/rpc.rs @@ -5,7 +5,9 @@ use crate::net::protocol::info::{PeerAdvertisement, PeerInfo}; -#[derive(Debug, Clone, PartialEq, minicbor::Encode, minicbor::Decode)] +use serde::Serialize; + +#[derive(Debug, Clone, PartialEq, minicbor::Encode, minicbor::Decode, Serialize)] pub enum Message { #[n(0)] #[cbor(array)] diff --git a/librad/src/net/protocol/tick.rs b/librad/src/net/protocol/tick.rs index 22f6d22ea..49deeb754 100644 --- a/librad/src/net/protocol/tick.rs +++ b/librad/src/net/protocol/tick.rs @@ -11,8 +11,8 @@ use futures::{ }; use tracing::Instrument as _; -use super::{error, gossip, io, membership, PeerInfo, ProtocolStorage, State}; -use crate::PeerId; +use super::{error, event, gossip, io, membership, PeerInfo, ProtocolStorage, State}; +use crate::{net::connection::RemoteAddr, PeerId}; #[derive(Debug)] pub(super) enum Tock { @@ -84,7 +84,7 @@ where }, Some(conn) => { - io::send_rpc(&conn, message) + io::send_rpc(&conn, message.clone()) .map_err(|e| { let membership::TnT { trans, ticks: cont } = state.membership.connection_lost(to); @@ -95,11 +95,19 @@ where source: e.into(), }) }) - .await + .await?; + state + .phone + .emit_diagnostic_event(event::NetworkDiagnosticEvent::rpc_sent( + conn.remote_addr(), + message, + )); + Ok(()) }, }, AttemptSend { to, message } => { + let phone = state.phone.clone(); let conn = match state.endpoint.get_connection(to.peer_id) { None => { let (conn, ingress) = io::connect_peer_info(&state.endpoint, to.clone()) @@ -111,9 +119,14 @@ where }, Some(conn) => Ok::<_, error::Tock>(conn), }?; - Ok(io::send_rpc(&conn, message) + io::send_rpc(&conn, message.clone()) .await - .map_err(error::BestEffortSend::SendGossip)?) + .map_err(error::BestEffortSend::SendGossip)?; + phone.emit_diagnostic_event(event::NetworkDiagnosticEvent::rpc_sent( + conn.remote_addr(), + message, + )); + Ok(()) }, Disconnect { peer } => { diff --git a/librad/src/net/protocol/tincans.rs b/librad/src/net/protocol/tincans.rs index 2d80aba1f..56fe1a7ba 100644 --- a/librad/src/net/protocol/tincans.rs +++ b/librad/src/net/protocol/tincans.rs @@ -22,6 +22,7 @@ use crate::PeerId; pub struct TinCans { pub(super) downstream: tincan::Sender, pub(super) upstream: tincan::Sender, + diagnostic_events: tincan::Sender, } impl TinCans { @@ -29,6 +30,7 @@ impl TinCans { Self { downstream: tincan::channel(16).0, upstream: tincan::channel(16).0, + diagnostic_events: tincan::channel(16).0, } } @@ -141,6 +143,18 @@ impl TinCans { pub(super) fn emit(&self, evt: impl Into) { self.upstream.send(evt.into()).ok(); } + + pub fn subscribe_diagnostic_events( + &self, + ) -> impl futures::Stream> { + let mut r = self.diagnostic_events.subscribe(); + async_stream::stream! { loop { yield r.recv().await } } + } + + pub(super) fn emit_diagnostic_event(&self, evt: impl Into) { + tracing::info!("Emitting log event"); + self.diagnostic_events.send(evt.into()).ok(); + } } impl Default for TinCans {