Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow arbitrary logger types. #51

Merged
merged 10 commits into from
Aug 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ edition = "2021"

[dependencies]
bitcoin = "0.29"
lightning = { version = "0.0.116-alpha1" }
lightning-block-sync = { version = "0.0.116-alpha1", features=["rest-client"] }
lightning-net-tokio = { version = "0.0.116-alpha1" }
lightning = { version = "0.0.116" }
lightning-block-sync = { version = "0.0.116", features=["rest-client"] }
lightning-net-tokio = { version = "0.0.116" }
tokio = { version = "1.25", features = ["full"] }
tokio-postgres = { version="=0.7.5" }
futures = "0.3"
Expand Down
13 changes: 13 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,19 @@ pub(crate) fn network() -> Network {
}
}

pub(crate) fn log_level() -> lightning::util::logger::Level {
let level = env::var("RAPID_GOSSIP_SYNC_SERVER_LOG_LEVEL").unwrap_or("info".to_string()).to_lowercase();
match level.as_str() {
"gossip" => lightning::util::logger::Level::Gossip,
"trace" => lightning::util::logger::Level::Trace,
"debug" => lightning::util::logger::Level::Debug,
"info" => lightning::util::logger::Level::Info,
"warn" => lightning::util::logger::Level::Warn,
"error" => lightning::util::logger::Level::Error,
_ => panic!("Invalid log level"),
}
}

pub(crate) fn network_graph_cache_path() -> String {
format!("{}/network_graph.bin", cache_path())
}
Expand Down
29 changes: 15 additions & 14 deletions src/downloader.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use std::ops::Deref;
use std::sync::{Arc, RwLock};

use bitcoin::secp256k1::PublicKey;
use lightning::events::{MessageSendEvent, MessageSendEventsProvider};
use lightning::ln::features::{InitFeatures, NodeFeatures};
use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler};
use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
use lightning::util::logger::Logger;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;

use crate::TestLogger;
use crate::types::{GossipMessage, GossipChainAccess, GossipPeerManager};
use crate::verifier::ChainVerifier;

Expand All @@ -30,28 +31,28 @@ impl GossipCounter {
}
}

pub(crate) struct GossipRouter {
native_router: P2PGossipSync<Arc<NetworkGraph<TestLogger>>, GossipChainAccess, TestLogger>,
pub(crate) struct GossipRouter<L: Deref + Clone + Send + Sync + 'static> where L::Target: Logger {
native_router: P2PGossipSync<Arc<NetworkGraph<L>>, GossipChainAccess<L>, L>,
pub(crate) counter: RwLock<GossipCounter>,
sender: mpsc::Sender<GossipMessage>,
verifier: Arc<ChainVerifier>,
outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<TestLogger>>, GossipChainAccess, TestLogger>>,
verifier: Arc<ChainVerifier<L>>,
outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, GossipChainAccess<L>, L>>,
}

impl GossipRouter {
pub(crate) fn new(network_graph: Arc<NetworkGraph<TestLogger>>, sender: mpsc::Sender<GossipMessage>) -> Self {
let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, TestLogger::new()));
let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper)));
impl<L: Deref + Clone + Send + Sync> GossipRouter<L> where L::Target: Logger {
pub(crate) fn new(network_graph: Arc<NetworkGraph<L>>, sender: mpsc::Sender<GossipMessage>, logger: L) -> Self {
let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, logger.clone()));
let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper), logger.clone()));
Self {
native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), TestLogger::new()),
native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), logger.clone()),
outbound_gossiper,
counter: RwLock::new(GossipCounter::new()),
sender,
verifier,
verifier
}
}

pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager) {
pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager<L>) {
self.verifier.set_ph(peer_handler);
}

Expand Down Expand Up @@ -83,7 +84,7 @@ impl GossipRouter {
}
}

impl MessageSendEventsProvider for GossipRouter {
impl<L: Deref + Clone + Send + Sync> MessageSendEventsProvider for GossipRouter<L> where L::Target: Logger {
fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
let gossip_evs = self.outbound_gossiper.get_and_clear_pending_msg_events();
for ev in gossip_evs {
Expand All @@ -102,7 +103,7 @@ impl MessageSendEventsProvider for GossipRouter {
}
}

impl RoutingMessageHandler for GossipRouter {
impl<L: Deref + Clone + Send + Sync> RoutingMessageHandler for GossipRouter<L> where L::Target: Logger {
fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError> {
self.native_router.handle_node_announcement(msg)
}
Expand Down
63 changes: 34 additions & 29 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,22 @@ extern crate core;
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io::BufReader;
use std::ops::Deref;
use std::sync::Arc;
use lightning::log_info;

use lightning::routing::gossip::{NetworkGraph, NodeId};
use lightning::util::logger::Logger;
use lightning::util::ser::{ReadableArgs, Writeable};
use tokio::sync::mpsc;
use crate::lookup::DeltaSet;

use crate::persistence::GossipPersister;
use crate::serialization::UpdateSerialization;
use crate::snapshot::Snapshotter;
use crate::types::TestLogger;
use crate::types::RGSSLogger;

mod downloader;
mod types;
mod tracking;
mod lookup;
mod persistence;
Expand All @@ -35,14 +37,17 @@ mod config;
mod hex_utils;
mod verifier;

pub mod types;

/// The purpose of this prefix is to identify the serialization format, should other rapid gossip
/// sync formats arise in the future.
///
/// The fourth byte is the protocol version in case our format gets updated.
const GOSSIP_PREFIX: [u8; 4] = [76, 68, 75, 1];

pub struct RapidSyncProcessor {
network_graph: Arc<NetworkGraph<TestLogger>>,
pub struct RapidSyncProcessor<L: Deref> where L::Target: Logger {
network_graph: Arc<NetworkGraph<L>>,
logger: L
}

pub struct SerializedResponse {
Expand All @@ -54,27 +59,27 @@ pub struct SerializedResponse {
pub update_count_incremental: u32,
}

impl RapidSyncProcessor {
pub fn new() -> Self {
impl<L: Deref + Clone + Send + Sync + 'static> RapidSyncProcessor<L> where L::Target: Logger {
pub fn new(logger: L) -> Self {
let network = config::network();
let logger = TestLogger::new();
let network_graph = if let Ok(file) = File::open(&config::network_graph_cache_path()) {
println!("Initializing from cached network graph…");
log_info!(logger, "Initializing from cached network graph…");
let mut buffered_reader = BufReader::new(file);
let network_graph_result = NetworkGraph::read(&mut buffered_reader, logger);
let network_graph_result = NetworkGraph::read(&mut buffered_reader, logger.clone());
if let Ok(network_graph) = network_graph_result {
println!("Initialized from cached network graph!");
log_info!(logger, "Initialized from cached network graph!");
network_graph
} else {
println!("Initialization from cached network graph failed: {}", network_graph_result.err().unwrap());
NetworkGraph::new(network, logger)
log_info!(logger, "Initialization from cached network graph failed: {}", network_graph_result.err().unwrap());
NetworkGraph::new(network, logger.clone())
}
} else {
NetworkGraph::new(network, logger)
NetworkGraph::new(network, logger.clone())
};
let arc_network_graph = Arc::new(network_graph);
Self {
network_graph: arc_network_graph,
logger
}
}

Expand All @@ -83,12 +88,12 @@ impl RapidSyncProcessor {
let (sync_completion_sender, mut sync_completion_receiver) = mpsc::channel::<()>(1);

if config::DOWNLOAD_NEW_GOSSIP {
let (mut persister, persistence_sender) = GossipPersister::new(Arc::clone(&self.network_graph));
let (mut persister, persistence_sender) = GossipPersister::new(self.network_graph.clone(), self.logger.clone());

println!("Starting gossip download");
log_info!(self.logger, "Starting gossip download");
tokio::spawn(tracking::download_gossip(persistence_sender, sync_completion_sender,
Arc::clone(&self.network_graph)));
println!("Starting gossip db persistence listener");
Arc::clone(&self.network_graph), self.logger.clone()));
log_info!(self.logger, "Starting gossip db persistence listener");
tokio::spawn(async move { persister.persist_gossip().await; });
} else {
sync_completion_sender.send(()).await.unwrap();
Expand All @@ -98,10 +103,10 @@ impl RapidSyncProcessor {
if sync_completion.is_none() {
panic!("Sync failed!");
}
println!("Initial sync complete!");
log_info!(self.logger, "Initial sync complete!");

// start the gossip snapshotting service
Snapshotter::new(Arc::clone(&self.network_graph)).snapshot_gossip().await;
Snapshotter::new(Arc::clone(&self.network_graph), self.logger.clone()).snapshot_gossip().await;
}
}

Expand All @@ -126,7 +131,7 @@ fn serialize_empty_blob(current_timestamp: u64) -> Vec<u8> {
let chain_hash = genesis_block.block_hash();
chain_hash.write(&mut blob).unwrap();

let blob_timestamp = Snapshotter::round_down_to_nearest_multiple(current_timestamp, config::SNAPSHOT_CALCULATION_INTERVAL as u64) as u32;
let blob_timestamp = Snapshotter::<Arc<RGSSLogger>>::round_down_to_nearest_multiple(current_timestamp, config::SNAPSHOT_CALCULATION_INTERVAL as u64) as u32;
blob_timestamp.write(&mut blob).unwrap();

0u32.write(&mut blob).unwrap(); // node count
Expand All @@ -136,7 +141,7 @@ fn serialize_empty_blob(current_timestamp: u64) -> Vec<u8> {
blob
}

async fn serialize_delta(network_graph: Arc<NetworkGraph<TestLogger>>, last_sync_timestamp: u32) -> SerializedResponse {
async fn serialize_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>, last_sync_timestamp: u32, logger: L) -> SerializedResponse where L::Target: Logger {
let (client, connection) = lookup::connect_to_db().await;

network_graph.remove_stale_channels_and_tracking();
Expand Down Expand Up @@ -170,12 +175,12 @@ async fn serialize_delta(network_graph: Arc<NetworkGraph<TestLogger>>, last_sync
};

let mut delta_set = DeltaSet::new();
lookup::fetch_channel_announcements(&mut delta_set, network_graph, &client, last_sync_timestamp).await;
println!("announcement channel count: {}", delta_set.len());
lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp).await;
println!("update-fetched channel count: {}", delta_set.len());
lookup::filter_delta_set(&mut delta_set);
println!("update-filtered channel count: {}", delta_set.len());
lookup::fetch_channel_announcements(&mut delta_set, network_graph, &client, last_sync_timestamp, logger.clone()).await;
log_info!(logger, "announcement channel count: {}", delta_set.len());
lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp, logger.clone()).await;
log_info!(logger, "update-fetched channel count: {}", delta_set.len());
lookup::filter_delta_set(&mut delta_set, logger.clone());
log_info!(logger, "update-filtered channel count: {}", delta_set.len());
let serialization_details = serialization::serialize_delta_set(delta_set, last_sync_timestamp);

// process announcements
Expand Down Expand Up @@ -246,8 +251,8 @@ async fn serialize_delta(network_graph: Arc<NetworkGraph<TestLogger>>, last_sync

prefixed_output.append(&mut output);

println!("duplicated node ids: {}", duplicate_node_ids);
println!("latest seen timestamp: {:?}", serialization_details.latest_seen);
log_info!(logger, "duplicated node ids: {}", duplicate_node_ids);
log_info!(logger, "latest seen timestamp: {:?}", serialization_details.latest_seen);

SerializedResponse {
data: prefixed_output,
Expand Down
32 changes: 17 additions & 15 deletions src/lookup.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::{BTreeMap, HashSet};
use std::io::Cursor;
use std::ops::Add;
use std::ops::{Add, Deref};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

Expand All @@ -11,8 +11,10 @@ use tokio_postgres::{Client, Connection, NoTls, Socket};
use tokio_postgres::tls::NoTlsStream;

use futures::StreamExt;
use lightning::log_info;
use lightning::util::logger::Logger;

use crate::{config, TestLogger};
use crate::config;
use crate::serialization::MutatedProperties;

/// The delta set needs to be a BTreeMap so the keys are sorted.
Expand Down Expand Up @@ -75,20 +77,20 @@ pub(super) async fn connect_to_db() -> (Client, Connection<Socket, NoTlsStream>)
/// whether they had been seen before.
/// Also include all announcements for which the first update was announced
/// after `last_sync_timestamp`
pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<TestLogger>>, client: &Client, last_sync_timestamp: u32) {
println!("Obtaining channel ids from network graph");
pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<L>>, client: &Client, last_sync_timestamp: u32, logger: L) where L::Target: Logger {
log_info!(logger, "Obtaining channel ids from network graph");
let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64));
let channel_ids = {
let read_only_graph = network_graph.read_only();
println!("Retrieved read-only network graph copy");
log_info!(logger, "Retrieved read-only network graph copy");
let channel_iterator = read_only_graph.channels().unordered_iter();
channel_iterator
.filter(|c| c.1.announcement_message.is_some())
.map(|c| c.1.announcement_message.as_ref().unwrap().contents.short_channel_id as i64)
.collect::<Vec<_>>()
};

println!("Obtaining corresponding database entries");
log_info!(logger, "Obtaining corresponding database entries");
// get all the channel announcements that are currently in the network graph
let announcement_rows = client.query_raw("SELECT announcement_signed, seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC", [&channel_ids]).await.unwrap();
let mut pinned_rows = Box::pin(announcement_rows);
Expand All @@ -113,7 +115,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, networ
{
// THIS STEP IS USED TO DETERMINE IF A CHANNEL SHOULD BE OMITTED FROM THE DELTA

println!("Annotating channel announcements whose oldest channel update in a given direction occurred after the last sync");
log_info!(logger, "Annotating channel announcements whose oldest channel update in a given direction occurred after the last sync");
// Steps:
// — Obtain all updates, distinct by (scid, direction), ordered by seen DESC // to find the oldest update in a given direction
// — From those updates, select distinct by (scid), ordered by seen DESC (to obtain the newer one per direction)
Expand Down Expand Up @@ -155,7 +157,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, networ
{
// THIS STEP IS USED TO DETERMINE IF A REMINDER UPDATE SHOULD BE SENT

println!("Annotating channel announcements whose latest channel update in a given direction occurred more than six days ago");
log_info!(logger, "Annotating channel announcements whose latest channel update in a given direction occurred more than six days ago");
// Steps:
// — Obtain all updates, distinct by (scid, direction), ordered by seen DESC
// — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction)
Expand Down Expand Up @@ -213,7 +215,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, networ
}
}

pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32) {
pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32, logger: L) where L::Target: Logger {
let start = Instant::now();
let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64));

Expand All @@ -235,7 +237,7 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Cli
", [last_sync_timestamp_object]).await.unwrap();
let mut pinned_rows = Box::pin(reference_rows);

println!("Fetched reference rows in {:?}", start.elapsed());
log_info!(logger, "Fetched reference rows in {:?}", start.elapsed());

let mut last_seen_update_ids: Vec<i32> = Vec::new();
let mut non_intermediate_ids: HashSet<i32> = HashSet::new();
Expand Down Expand Up @@ -263,7 +265,7 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Cli
reference_row_count += 1;
}

println!("Processed {} reference rows (delta size: {}) in {:?}",
log_info!(logger, "Processed {} reference rows (delta size: {}) in {:?}",
reference_row_count, delta_set.len(), start.elapsed());

// get all the intermediate channel updates
Expand All @@ -276,7 +278,7 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Cli
WHERE seen >= $1
", [last_sync_timestamp_object]).await.unwrap();
let mut pinned_updates = Box::pin(intermediate_updates);
println!("Fetched intermediate rows in {:?}", start.elapsed());
log_info!(logger, "Fetched intermediate rows in {:?}", start.elapsed());

let mut previous_scid = u64::MAX;
let mut previously_seen_directions = (false, false);
Expand Down Expand Up @@ -351,10 +353,10 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Cli
}
}
}
println!("Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
}

pub(super) fn filter_delta_set(delta_set: &mut DeltaSet) {
pub(super) fn filter_delta_set<L: Deref>(delta_set: &mut DeltaSet, logger: L) where L::Target: Logger {
let original_length = delta_set.len();
let keys: Vec<u64> = delta_set.keys().cloned().collect();
for k in keys {
Expand Down Expand Up @@ -386,6 +388,6 @@ pub(super) fn filter_delta_set(delta_set: &mut DeltaSet) {

let new_length = delta_set.len();
if original_length != new_length {
println!("length modified!");
log_info!(logger, "length modified!");
}
}
5 changes: 4 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::sync::Arc;
use rapid_gossip_sync_server::RapidSyncProcessor;
use rapid_gossip_sync_server::types::RGSSLogger;

#[tokio::main]
async fn main() {
RapidSyncProcessor::new().start_sync().await;
let logger = Arc::new(RGSSLogger::new());
RapidSyncProcessor::new(logger).start_sync().await;
}
Loading
Loading