diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 33d5239..e3ad979 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -30,6 +30,12 @@ jobs:
           toolchain: ${{ matrix.toolchain }}
           override: true
           profile: minimal
+      - name: Pin dependencies
+        if: ${{ matrix.toolchain == '1.63.0' }}
+        run: |
+          cargo update -p tokio --precise "1.37.0" --verbose
+          cargo update -p tokio-macros --precise "2.2.0" --verbose
+          cargo update -p postgres-types --precise "0.2.6" --verbose
       - name: Build on Rust ${{ matrix.toolchain }}
         run: |
           cargo build --verbose --color always
diff --git a/Cargo.toml b/Cargo.toml
index 1fee6cd..671d41a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -4,18 +4,18 @@ version = "0.1.0"
 edition = "2021"
 
 [dependencies]
-bitcoin = "0.30"
-hex-conservative = "0.2"
-lightning = { version = "0.0.123" }
-lightning-block-sync = { version = "0.0.123", features=["rest-client"] }
-lightning-net-tokio = { version = "0.0.123" }
+bitcoin = "0.32.2"
+hex-conservative = "0.2.1"
+lightning = { version = "0.0.124" }
+lightning-block-sync = { version = "0.0.124", features=["rest-client"] }
+lightning-net-tokio = { version = "0.0.124" }
 tokio = { version = "1.25", features = ["full"] }
 tokio-postgres = { version = "=0.7.5" }
 futures = "0.3"
 
 [dev-dependencies]
-lightning = { version = "0.0.123", features = ["_test_utils"] }
-lightning-rapid-gossip-sync = { version = "0.0.123" }
+lightning = { version = "0.0.124", features = ["_test_utils"] }
+lightning-rapid-gossip-sync = { version = "0.0.124" }
 
 [profile.dev]
 panic = "abort"
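The bumps to bitcoin 0.32 and LDK 0.0.124 drive most of the mechanical churn in the file diffs that follow: in this release line, LDK's `Readable` trait reads via `bitcoin::io` rather than `std::io`, so every cursor over a database blob switches to `bitcoin::io::Cursor`. A minimal sketch of the new read path (the helper name is hypothetical and not part of this patch):

```rust
use bitcoin::io::Cursor;
use lightning::ln::msgs::{ChannelAnnouncement, DecodeError};
use lightning::util::ser::Readable;

// Hypothetical helper: deserialize a signed channel announcement from a blob.
fn parse_announcement(blob: &[u8]) -> Result<ChannelAnnouncement, DecodeError> {
	// bitcoin::io::Cursor::new accepts any AsRef<[u8]>, so borrowing the blob
	// (as the patch does with Cursor::new(&blob)) avoids moving the Vec
	let mut readable = Cursor::new(blob);
	ChannelAnnouncement::read(&mut readable)
}
```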
diff --git a/src/config.rs b/src/config.rs
index 4950ec0..6220fc1 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -1,10 +1,10 @@
 use crate::hex_utils;
 
 use std::env;
-use std::io::Cursor;
 use std::net::{SocketAddr, ToSocketAddrs};
 use std::time::Duration;
 
+use bitcoin::io::Cursor;
 use bitcoin::Network;
 use bitcoin::hashes::hex::FromHex;
 use bitcoin::secp256k1::PublicKey;
@@ -23,6 +23,10 @@ pub(crate) const MAX_SNAPSHOT_SCOPE: u32 = 3600 * 24 * 21; // three weeks
 /// updates in both directions.
 pub(crate) const CHANNEL_REMINDER_AGE: Duration = Duration::from_secs(6 * 24 * 60 * 60);
 
+/// The interval after which graph data gets pruned after it was first seen
+/// This should match the LDK default pruning interval, which is 14 days
+pub(crate) const PRUNE_INTERVAL: Duration = Duration::from_secs(14 * 24 * 60 * 60);
+
 /// Maximum number of default features to calculate for node announcements
 pub(crate) const NODE_DEFAULT_FEATURE_COUNT: u8 = 6;
 
@@ -219,7 +223,7 @@ pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client)
 		let announcement: Vec<u8> = row.get("announcement_signed");
 		let tx_ref = &tx;
 		updates.push(async move {
-			let scid = ChannelAnnouncement::read(&mut Cursor::new(announcement)).unwrap().contents.short_channel_id as i64;
+			let scid = ChannelAnnouncement::read(&mut Cursor::new(&announcement)).unwrap().contents.short_channel_id as i64;
 			assert!(scid > 0); // Will roll over in some 150 years or so
 			tx_ref.execute("UPDATE channel_announcements SET short_channel_id = $1 WHERE id = $2", &[&scid, &id]).await.unwrap();
 		});
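The new constant makes the reminder/pruning relationship explicit: reminders go out at six days, safely inside the fourteen-day window after which LDK clients prune unrefreshed gossip. An illustrative check of that invariant (not part of the patch):

```rust
use std::time::Duration;

// Names mirror src/config.rs above.
const CHANNEL_REMINDER_AGE: Duration = Duration::from_secs(6 * 24 * 60 * 60);
const PRUNE_INTERVAL: Duration = Duration::from_secs(14 * 24 * 60 * 60);

fn main() {
	// a reminder must fire well before LDK's default pruning horizon,
	// otherwise clients would drop gossip that is still valid
	assert!(CHANNEL_REMINDER_AGE < PRUNE_INTERVAL);
}
```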
diff --git a/src/lib.rs b/src/lib.rs
index 4da8f38..00a5ed5 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -26,7 +26,7 @@ use crate::config::SYMLINK_GRANULARITY_INTERVAL;
 use crate::lookup::DeltaSet;
 
 use crate::persistence::GossipPersister;
-use crate::serialization::{SerializationSet, UpdateSerialization};
+use crate::serialization::{MutatedNodeProperties, NodeSerializationStrategy, SerializationSet, UpdateSerialization};
 use crate::snapshot::Snapshotter;
 use crate::types::RGSSLogger;
 
@@ -187,11 +187,11 @@ async fn calculate_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>,
 	// for announcement-free incremental-only updates, chain hash can be skipped
 
 	let mut delta_set = DeltaSet::new();
-	lookup::fetch_channel_announcements(&mut delta_set, network_graph, &client, last_sync_timestamp, snapshot_reference_timestamp, logger.clone()).await;
+	lookup::fetch_channel_announcements(&mut delta_set, Arc::clone(&network_graph), &client, last_sync_timestamp, snapshot_reference_timestamp, logger.clone()).await;
 	log_info!(logger, "announcement channel count: {}", delta_set.len());
 	lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp, logger.clone()).await;
 	log_info!(logger, "update-fetched channel count: {}", delta_set.len());
-	let node_delta_set = lookup::fetch_node_updates(&client, last_sync_timestamp, logger.clone()).await;
+	let node_delta_set = lookup::fetch_node_updates(network_graph, &client, last_sync_timestamp, snapshot_reference_timestamp, logger.clone()).await;
 	log_info!(logger, "update-fetched node count: {}", node_delta_set.len());
 	lookup::filter_delta_set(&mut delta_set, logger.clone());
 	log_info!(logger, "update-filtered channel count: {}", delta_set.len());
@@ -306,6 +306,9 @@
 
 	if serialization_version >= 2 {
 		if let Some(node_delta) = serialization_details.node_mutations.get(&current_node_id) {
+			let strategy = node_delta.strategy.as_ref().unwrap();
+			let mut node_has_update = false;
+
 			/*
 			Bitmap:
 			7: expect extra data after the pubkey (a u16 for the count, and then that number of bytes)
@@ -317,51 +320,60 @@ fn serialize_delta<L: Deref + Clone>(serialization_details: &SerializationSet, s
 			0: used for odd keys
 			*/
 
-			if node_delta.has_address_set_changed {
-				node_address_update_count += 1;
-
-				let address_set = &node_delta.latest_details_after_seen.as_ref().unwrap().addresses;
-				let mut address_serialization = Vec::new();
-
-				// we don't know a priori how many are <= 255 bytes
-				let mut total_address_count = 0u8;
-
-				for address in address_set.iter() {
-					if total_address_count == u8::MAX {
-						// don't serialize more than 255 addresses
-						break;
+			match strategy {
+				NodeSerializationStrategy::Mutated(MutatedNodeProperties { addresses: true, .. }) | NodeSerializationStrategy::Full => {
+					let address_set = &node_delta.latest_details.as_ref().unwrap().addresses;
+					let mut address_serialization = Vec::new();
+
+					// we don't know a priori how many are <= 255 bytes
+					let mut total_address_count = 0u8;
+
+					for address in address_set.iter() {
+						if total_address_count == u8::MAX {
+							// don't serialize more than 255 addresses
+							break;
+						}
+						if let Ok(serialized_length) = u8::try_from(address.serialized_length()) {
+							total_address_count += 1;
+							serialized_length.write(&mut address_serialization).unwrap();
+							address.write(&mut address_serialization).unwrap();
+						};
 					}
-					if let Ok(serialized_length) = u8::try_from(address.serialized_length()) {
-						total_address_count += 1;
-						serialized_length.write(&mut address_serialization).unwrap();
-						address.write(&mut address_serialization).unwrap();
-					};
-				}
-
-				// signal the presence of node addresses
-				current_node_delta_serialization[0] |= 1 << 2;
-				// serialize the actual addresses and count
-				total_address_count.write(&mut current_node_delta_serialization).unwrap();
-				current_node_delta_serialization.append(&mut address_serialization);
-			}
-			if node_delta.has_feature_set_changed {
-				node_feature_update_count += 1;
+					node_address_update_count += 1;
+					node_has_update = true;
 
-				let latest_features = &node_delta.latest_details_after_seen.as_ref().unwrap().features;
+					// signal the presence of node addresses
+					current_node_delta_serialization[0] |= 1 << 2;
+					// serialize the actual addresses and count
+					total_address_count.write(&mut current_node_delta_serialization).unwrap();
+					current_node_delta_serialization.append(&mut address_serialization);
+				},
+				_ => {}
+			}
 
-				// are these features among the most common ones?
-				if let Some(index) = serialization_details.node_announcement_feature_defaults.iter().position(|f| f == latest_features) {
-					// this feature set is among the 6 defaults
-					current_node_delta_serialization[0] |= ((index + 1) as u8) << 3;
-				} else {
-					current_node_delta_serialization[0] |= 0b_0011_1000; // 7 << 3
-					latest_features.write(&mut current_node_delta_serialization).unwrap();
-				}
+			match strategy {
+				NodeSerializationStrategy::Mutated(MutatedNodeProperties { features: true, .. }) | NodeSerializationStrategy::Full => {
+					let latest_features = &node_delta.latest_details.as_ref().unwrap().features;
+					node_feature_update_count += 1;
+					node_has_update = true;
+
+					// are these features among the most common ones?
+					if let Some(index) = serialization_details.node_announcement_feature_defaults.iter().position(|f| f == latest_features) {
+						// this feature set is among the 6 defaults
+						current_node_delta_serialization[0] |= ((index + 1) as u8) << 3;
+					} else {
+						current_node_delta_serialization[0] |= 0b_0011_1000; // 7 << 3
+						latest_features.write(&mut current_node_delta_serialization).unwrap();
+					}
+				},
+				_ => {}
 			}
 
-			if node_delta.has_address_set_changed || node_delta.has_feature_set_changed {
+			if node_has_update {
 				node_update_count += 1;
+			} else if let NodeSerializationStrategy::Reminder = strategy {
+				current_node_delta_serialization[0] |= 1 << 6;
 			}
 		}
 	}
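For orientation, the strategy-driven flag byte assembled above can be condensed into one helper. This is an illustration only, not code from the patch; bit positions follow the bitmap comment in serialize_delta, and in the patch the reminder bit is only set when no mutation was serialized:

```rust
// Hypothetical condensation of the node-delta flag byte.
fn node_flag_byte(has_addresses: bool, feature_default_index: Option<usize>,
	has_custom_features: bool, is_reminder: bool) -> u8 {
	let mut flags = 0u8;
	if has_addresses {
		flags |= 1 << 2; // bit 2: an address count and list follow
	}
	if let Some(index) = feature_default_index {
		flags |= ((index + 1) as u8) << 3; // bits 5-3: 1-based index into the defaults
	} else if has_custom_features {
		flags |= 0b_0011_1000; // 7 << 3: a custom feature set is serialized inline
	}
	if is_reminder {
		flags |= 1 << 6; // bit 6: reminder without any mutations
	}
	flags
}
```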
diff --git a/src/lookup.rs b/src/lookup.rs
index 4337508..87a35ae 100644
--- a/src/lookup.rs
+++ b/src/lookup.rs
@@ -1,21 +1,23 @@
 use std::collections::{BTreeMap, HashMap, HashSet};
-use std::io::Cursor;
 use std::ops::Deref;
 use std::sync::Arc;
 use std::time::{Instant, SystemTime, UNIX_EPOCH};
 
+use bitcoin::io::Cursor;
+
 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement, SocketAddress, UnsignedChannelAnnouncement, UnsignedChannelUpdate};
 use lightning::routing::gossip::{NetworkGraph, NodeId};
 use lightning::util::ser::Readable;
 use tokio_postgres::Client;
 
 use futures::StreamExt;
+use hex_conservative::DisplayHex;
 use lightning::{log_debug, log_gossip, log_info};
 use lightning::ln::features::NodeFeatures;
 use lightning::util::logger::Logger;
 
 use crate::config;
-use crate::serialization::MutatedProperties;
+use crate::serialization::{MutatedNodeProperties, MutatedProperties, NodeSerializationStrategy};
 
 /// The delta set needs to be a BTreeMap so the keys are sorted.
 /// That way, the scids in the response automatically grow monotonically
@@ -54,25 +56,17 @@ pub(super) struct ChannelDelta {
 
 pub(super) struct NodeDelta {
 	/// The most recently received, but new-to-the-client, node details
-	pub(super) latest_details_after_seen: Option<NodeDetails>,
-
-	/// Between last_details_before_seen and latest_details_after_seen, including any potential
-	/// intermediate updates that are not kept track of here, has the set of features this node
-	/// supports changed?
-	pub(super) has_feature_set_changed: bool,
+	pub(super) latest_details: Option<NodeDetails>,
 
-	/// Between last_details_before_seen and latest_details_after_seen, including any potential
-	/// intermediate updates that are not kept track of here, has the set of socket addresses this
-	/// node listens on changed?
-	pub(super) has_address_set_changed: bool,
+	/// How should this delta be serialized?
+	pub(super) strategy: Option<NodeSerializationStrategy>,
 
 	/// The most recent node details that the client would have seen already
 	pub(super) last_details_before_seen: Option<NodeDetails>
 }
 
 pub(super) struct NodeDetails {
-	#[allow(unused)]
-	pub(super) seen: u32,
+	pub(super) seen: Option<u32>,
 	pub(super) features: NodeFeatures,
 	pub(super) addresses: HashSet<SocketAddress>
 }
@@ -91,10 +85,9 @@ impl Default for ChannelDelta {
 impl Default for NodeDelta {
 	fn default() -> Self {
 		Self {
-			latest_details_after_seen: None,
-			has_feature_set_changed: false,
-			has_address_set_changed: false,
+			latest_details: None,
 			last_details_before_seen: None,
+			strategy: None,
 		}
 	}
 }
@@ -110,6 +103,24 @@ impl Default for DirectedUpdateDelta {
 	}
 }
 
+fn should_snapshot_include_reminders<L: Deref>(last_sync_timestamp: u32, current_timestamp: u64, logger: &L) -> bool where L::Target: Logger {
+	let current_hour = current_timestamp / 3600;
+	let current_day = current_timestamp / (24 * 3600);
+
+	log_debug!(logger, "Current day index: {}", current_day);
+	log_debug!(logger, "Current hour: {}", current_hour);
+
+	// every 5th day at midnight
+	let is_reminder_hour = (current_hour % 24) == 0;
+	let is_reminder_day = (current_day % 5) == 0;
+
+	let snapshot_scope = current_timestamp.saturating_sub(last_sync_timestamp as u64);
+	let is_reminder_scope = snapshot_scope > (50 * 3600);
+	log_debug!(logger, "Snapshot scope: {}s", snapshot_scope);
+
+	(is_reminder_hour && is_reminder_day) || is_reminder_scope
+}
+
 /// Fetch all the channel announcements that are presently in the network graph, regardless of
 /// whether they had been seen before.
 /// Also include all announcements for which the first update was announced
@@ -133,23 +144,7 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
 	let current_timestamp = snapshot_reference_timestamp.unwrap_or(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs());
 	log_info!(logger, "Current timestamp: {}", current_timestamp);
 
-	let include_reminders = {
-		let current_hour = current_timestamp / 3600;
-		let current_day = current_timestamp / (24 * 3600);
-
-		log_debug!(logger, "Current day index: {}", current_day);
-		log_debug!(logger, "Current hour: {}", current_hour);
-
-		// every 5th day at midnight
-		let is_reminder_hour = (current_hour % 24) == 0;
-		let is_reminder_day = (current_day % 5) == 0;
-
-		let snapshot_scope = current_timestamp.saturating_sub(last_sync_timestamp as u64);
-		let is_reminder_scope = snapshot_scope > (50 * 3600);
-		log_debug!(logger, "Snapshot scope: {}s", snapshot_scope);
-
-		(is_reminder_hour && is_reminder_day) || is_reminder_scope
-	};
+	let include_reminders = should_snapshot_include_reminders(last_sync_timestamp, current_timestamp, &logger);
 
 	log_info!(logger, "Obtaining corresponding database entries");
 	// get all the channel announcements that are currently in the network graph
@@ -160,7 +155,7 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
 	while let Some(row_res) = pinned_rows.next().await {
 		let current_announcement_row = row_res.unwrap();
 		let blob: Vec<u8> = current_announcement_row.get("announcement_signed");
-		let mut readable = Cursor::new(blob);
+		let mut readable = Cursor::new(&blob);
 		let unsigned_announcement = ChannelAnnouncement::read(&mut readable).unwrap().contents;
 		let scid = unsigned_announcement.short_channel_id;
 
@@ -231,7 +226,7 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
 		let reminder_threshold_timestamp = current_timestamp.checked_sub(config::CHANNEL_REMINDER_AGE.as_secs()).unwrap() as f64;
 
 		log_info!(logger, "Fetch first time we saw the current value combination for each direction (prior mutations excepted)");
-		let reminder_lookup_threshold_timestamp = current_timestamp.checked_sub(config::CHANNEL_REMINDER_AGE.as_secs() * 3).unwrap() as f64;
+		let reminder_lookup_threshold_timestamp = current_timestamp.checked_sub(config::PRUNE_INTERVAL.as_secs()).unwrap() as f64;
 		let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&channel_ids, &reminder_lookup_threshold_timestamp];
 
 		/*
@@ -287,7 +282,7 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
 
 			if seen < reminder_threshold_timestamp as u32 {
 				let blob: Vec<u8> = current_row.get("blob_signed");
-				let mut readable = Cursor::new(blob);
+				let mut readable = Cursor::new(&blob);
 				let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
 				let scid = unsigned_channel_update.short_channel_id;
 
@@ -365,7 +360,7 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
 		let direction: bool = current_reference.get("direction");
 		let seen = current_reference.get::<_, i64>("seen") as u32;
 		let blob: Vec<u8> = current_reference.get("blob_signed");
-		let mut readable = Cursor::new(blob);
+		let mut readable = Cursor::new(&blob);
 		let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
 		let scid = unsigned_channel_update.short_channel_id;
 
@@ -415,7 +410,7 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
 		let direction: bool = intermediate_update.get("direction");
 		let current_seen_timestamp = intermediate_update.get::<_, i64>("seen") as u32;
 		let blob: Vec<u8> = intermediate_update.get("blob_signed");
-		let mut readable = Cursor::new(blob);
+		let mut readable = Cursor::new(&blob);
 		let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
 		let scid = unsigned_channel_update.short_channel_id;
 
@@ -451,7 +446,7 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
 
 		// determine mutations
 		if let Some(last_seen_update) = update_delta.last_update_before_seen.as_ref() {
-			if unsigned_channel_update.flags != last_seen_update.update.flags {
+			if unsigned_channel_update.channel_flags != last_seen_update.update.channel_flags {
 				update_delta.mutated_properties.flags = true;
 			}
 			if unsigned_channel_update.cltv_expiry_delta != last_seen_update.update.cltv_expiry_delta {
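Factoring the cadence into should_snapshot_include_reminders makes it reusable for node updates further down. A worked example of when it fires (a restatement without the logger plumbing; the timestamps are illustrative):

```rust
// Restated from should_snapshot_include_reminders above, minus logging.
fn includes_reminders(last_sync_timestamp: u32, current_timestamp: u64) -> bool {
	let is_reminder_hour = (current_timestamp / 3600) % 24 == 0;
	let is_reminder_day = (current_timestamp / 86_400) % 5 == 0;
	let snapshot_scope = current_timestamp.saturating_sub(last_sync_timestamp as u64);
	(is_reminder_hour && is_reminder_day) || snapshot_scope > (50 * 3600)
}

fn main() {
	const DAY: u64 = 86_400;
	let t0 = 20_000 * DAY; // midnight of day index 20,000, which is divisible by 5
	// at such a midnight, reminders fire even for a one-hour sync gap
	assert!(includes_reminders((t0 - 3600) as u32, t0));
	// one hour later the cadence no longer matches for a short gap
	assert!(!includes_reminders(t0 as u32, t0 + 3600));
	// ...but any client more than 50 hours behind still gets reminders
	assert!(includes_reminders((t0 - 3 * DAY) as u32, t0 + 3600));
}
```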
@@ -474,19 +469,44 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
 	log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
 }
 
-pub(super) async fn fetch_node_updates<L: Deref>(client: &Client, last_sync_timestamp: u32, logger: L) -> NodeDeltaSet where L::Target: Logger {
+pub(super) async fn fetch_node_updates<L: Deref>(network_graph: Arc<NetworkGraph<L>>, client: &Client, last_sync_timestamp: u32, snapshot_reference_timestamp: Option<u64>, logger: L) -> NodeDeltaSet where L::Target: Logger {
 	let start = Instant::now();
 	let last_sync_timestamp_float = last_sync_timestamp as f64;
 
-	let mut delta_set = NodeDeltaSet::new();
+	let mut delta_set: NodeDeltaSet = {
+		let read_only_graph = network_graph.read_only();
+		read_only_graph.nodes().unordered_iter().flat_map(|(node_id, node_info)| {
+			let details: NodeDetails = if let Some(details) = node_info.announcement_info.as_ref() {
+				NodeDetails {
+					seen: None,
+					features: details.features().clone(),
+					addresses: details.addresses().into_iter().cloned().collect(),
+				}
+			} else {
+				return None;
+			};
+			Some((node_id.clone(), NodeDelta {
+				latest_details: Some(details),
+				strategy: None,
+				last_details_before_seen: None,
+			}))
+		}).collect()
+	};
+
+	let node_ids: Vec<String> = delta_set.keys().into_iter().map(|id| id.as_slice().to_lower_hex_string()).collect();
+
+	#[cfg(test)]
+	log_info!(logger, "Node IDs: {:?}", node_ids);
 
 	// get the latest node updates prior to last_sync_timestamp
+	let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&node_ids, &last_sync_timestamp_float];
 	let reference_rows = client.query_raw("
 		SELECT DISTINCT ON (public_key) public_key, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen, announcement_signed
 		FROM node_announcements
-		WHERE seen < TO_TIMESTAMP($1)
+		WHERE
+			public_key = ANY($1) AND
+			seen < TO_TIMESTAMP($2)
 		ORDER BY public_key ASC, seen DESC
-		", [last_sync_timestamp_float]).await.unwrap();
+		", params).await.unwrap();
 	let mut pinned_rows = Box::pin(reference_rows);
 
 	log_info!(logger, "Fetched node announcement reference rows in {:?}", start.elapsed());
@@ -498,7 +518,7 @@ pub(super) async fn fetch_node_updates<L: Deref>(client: &Client, last_sync_time
 
 		let seen = current_reference.get::<_, i64>("seen") as u32;
 		let blob: Vec<u8> = current_reference.get("announcement_signed");
-		let mut readable = Cursor::new(blob);
+		let mut readable = Cursor::new(&blob);
 		let unsigned_node_announcement = NodeAnnouncement::read(&mut readable).unwrap().contents;
 		let node_id = unsigned_node_announcement.node_id;
 
@@ -506,7 +526,7 @@ pub(super) async fn fetch_node_updates<L: Deref>(client: &Client, last_sync_time
 		(*current_node_delta).last_details_before_seen.get_or_insert_with(|| {
 			let address_set: HashSet<SocketAddress> = unsigned_node_announcement.addresses.into_iter().collect();
 			NodeDetails {
-				seen,
+				seen: Some(seen),
 				features: unsigned_node_announcement.features,
 				addresses: address_set,
 			}
@@ -520,62 +540,106 @@ pub(super) async fn fetch_node_updates<L: Deref>(client: &Client, last_sync_time
 
 	log_info!(logger, "Processed {} node announcement reference rows (delta size: {}) in {:?}", reference_row_count, delta_set.len(), start.elapsed());
 
+	let current_timestamp = snapshot_reference_timestamp.unwrap_or(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs());
+	let reminder_inclusion_threshold_timestamp = current_timestamp.checked_sub(config::CHANNEL_REMINDER_AGE.as_secs()).unwrap() as u32;
+	let reminder_lookup_threshold_timestamp = current_timestamp.checked_sub(config::PRUNE_INTERVAL.as_secs()).unwrap() as u32;
+
+	// this is the timestamp we need to fetch all relevant updates
+	let include_reminders = should_snapshot_include_reminders(last_sync_timestamp, current_timestamp, &logger);
+	let effective_threshold_timestamp = if include_reminders {
+		// If we include reminders, the decision logic is as follows:
+		// If the pre-sync update was more than 6 days ago, serialize in full.
+		// Otherwise:
+		// If the last mutation occurred after the last sync, serialize the mutated properties.
+		// Otherwise:
+		// If the last mutation occurred more than 6 days ago, serialize as a reminder.
+		// Otherwise, don't serialize at all.
+		std::cmp::min(last_sync_timestamp, reminder_lookup_threshold_timestamp) as f64
+	} else {
+		last_sync_timestamp as f64
+	};
+
 	// get all the intermediate node updates
 	// (to calculate the set of mutated fields for snapshotting, where intermediate updates may
 	// have been omitted)
+	let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&node_ids, &effective_threshold_timestamp];
 	let intermediate_updates = client.query_raw("
 		SELECT announcement_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen
 		FROM node_announcements
-		WHERE seen >= TO_TIMESTAMP($1)
+		WHERE
+			public_key = ANY($1) AND
+			seen >= TO_TIMESTAMP($2)
 		ORDER BY public_key ASC, timestamp DESC
-		", [last_sync_timestamp_float]).await.unwrap();
+		", params).await.unwrap();
 	let mut pinned_updates = Box::pin(intermediate_updates);
 	log_info!(logger, "Fetched intermediate node announcement rows in {:?}", start.elapsed());
 
 	let mut previous_node_id: Option<NodeId> = None;
 
 	let mut intermediate_update_count = 0;
+	let mut has_address_set_changed = false;
+	let mut has_feature_set_changed = false;
+	let mut latest_mutation_timestamp = None;
 	while let Some(row_res) = pinned_updates.next().await {
 		let intermediate_update = row_res.unwrap();
 		intermediate_update_count += 1;
 
 		let current_seen_timestamp = intermediate_update.get::<_, i64>("seen") as u32;
 		let blob: Vec<u8> = intermediate_update.get("announcement_signed");
-		let mut readable = Cursor::new(blob);
+		let mut readable = Cursor::new(&blob);
 		let unsigned_node_announcement = NodeAnnouncement::read(&mut readable).unwrap().contents;
 
 		let node_id = unsigned_node_announcement.node_id;
-		let is_previously_processed_node_id = Some(node_id) == previous_node_id;
 
 		// get this node's address set
 		let current_node_delta = delta_set.entry(node_id).or_insert(NodeDelta::default());
 		let address_set: HashSet<SocketAddress> = unsigned_node_announcement.addresses.into_iter().collect();
 
-		// determine mutations
+		if previous_node_id != Some(node_id) {
+			// we're traversing a new node id, initialize the values
+			has_address_set_changed = false;
+			has_feature_set_changed = false;
+			latest_mutation_timestamp = None;
+
+			// this is the highest timestamp value, so set the seen timestamp accordingly
+			current_node_delta.latest_details.as_mut().map(|mut d| d.seen.replace(current_seen_timestamp));
+		}
+
 		if let Some(last_seen_update) = current_node_delta.last_details_before_seen.as_ref() {
-			if unsigned_node_announcement.features != last_seen_update.features {
-				current_node_delta.has_feature_set_changed = true;
-			}
-			if address_set != last_seen_update.addresses {
-				current_node_delta.has_address_set_changed = true;
-			}
-		} else if !is_previously_processed_node_id {
-			if current_node_delta.last_details_before_seen.is_none() {
-				if !address_set.is_empty() {
-					current_node_delta.has_address_set_changed = true;
+			{ // determine the latest mutation timestamp
+				if address_set != last_seen_update.addresses {
+					has_address_set_changed = true;
+					if latest_mutation_timestamp.is_none() {
+						latest_mutation_timestamp = Some(current_seen_timestamp);
+					}
+				}
-				}
-				if unsigned_node_announcement.features != NodeFeatures::empty() {
-					current_node_delta.has_feature_set_changed = true;
+				if unsigned_node_announcement.features != last_seen_update.features {
+					has_feature_set_changed = true;
+					if latest_mutation_timestamp.is_none() {
+						latest_mutation_timestamp = Some(current_seen_timestamp);
+					}
 				}
 			}
-		}
-
-		if !is_previously_processed_node_id {
-			(*current_node_delta).latest_details_after_seen.get_or_insert(NodeDetails {
-				seen: current_seen_timestamp,
-				features: unsigned_node_announcement.features,
-				addresses: address_set,
-			});
+			if current_seen_timestamp >= last_sync_timestamp {
+				if has_address_set_changed || has_feature_set_changed {
+					// if the last mutation occurred since the last sync, send the mutation variant
+					current_node_delta.strategy = Some(NodeSerializationStrategy::Mutated(MutatedNodeProperties {
+						addresses: has_address_set_changed,
+						features: has_feature_set_changed,
+					}));
+				}
+			} else if include_reminders && latest_mutation_timestamp.unwrap_or(u32::MAX) <= reminder_inclusion_threshold_timestamp {
+				// only send a reminder if the latest mutation occurred at least 6 days ago
+				current_node_delta.strategy = Some(NodeSerializationStrategy::Reminder);
+			}
+
+			// Note that we completely ignore the case when the last mutation occurred less than
+			// 6 days ago, but prior to the last sync. In that scenario, we send nothing.
+
+		} else {
+			// absent any update that was seen prior to the last sync, send the full version
+			current_node_delta.strategy = Some(NodeSerializationStrategy::Full);
 		}
 
 		previous_node_id = Some(node_id);
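Taken together, fetch_node_updates now picks a per-node strategy along these lines (a condensed restatement, where Skip stands in for leaving strategy as None; serialize_delta_set below may still upgrade a delta to Full when the client's last-seen announcement is older than the non-incremental threshold):

```rust
enum Strategy { Full, Mutated { addresses: bool, features: bool }, Reminder, Skip }

// Condensed restatement of the branch structure in fetch_node_updates.
fn pick_strategy(has_pre_sync_details: bool, seen_since_last_sync: bool,
	addresses_changed: bool, features_changed: bool,
	include_reminders: bool, mutation_at_least_six_days_old: bool) -> Strategy {
	if !has_pre_sync_details {
		// nothing the client could have seen before: serialize in full
		return Strategy::Full;
	}
	if seen_since_last_sync {
		if addresses_changed || features_changed {
			// the latest mutation postdates the last sync: send only the mutations
			return Strategy::Mutated { addresses: addresses_changed, features: features_changed };
		}
	} else if include_reminders && mutation_at_least_six_days_old {
		// stable for at least CHANNEL_REMINDER_AGE: emit a pruning-delay reminder
		return Strategy::Reminder;
	}
	// mutated less than six days ago, but before the last sync: send nothing
	Strategy::Skip
}
```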
diff --git a/src/persistence.rs b/src/persistence.rs
index 04c6b9a..0db3091 100644
--- a/src/persistence.rs
+++ b/src/persistence.rs
@@ -226,8 +226,8 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
 
 					let timestamp = update.contents.timestamp as i64;
 
-					let direction = (update.contents.flags & 1) == 1;
-					let disable = (update.contents.flags & 2) > 0;
+					let direction = (update.contents.channel_flags & 1) == 1;
+					let disable = (update.contents.channel_flags & 2) > 0;
 
 					let cltv_expiry_delta = update.contents.cltv_expiry_delta as i32;
 					let htlc_minimum_msat = update.contents.htlc_minimum_msat as i64;
@@ -281,7 +281,7 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
 						&timestamp,
 						#[cfg(test)]
 						&_seen_timestamp,
-						&(update.contents.flags as i16),
+						&(update.contents.channel_flags as i16),
 						&direction,
 						&disable,
 						&cltv_expiry_delta,
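The flags rename follows LDK 0.0.124 splitting UnsignedChannelUpdate's single flags byte into message_flags and channel_flags. The direction and disabled bits live in channel_flags, which is what the code above decodes; a minimal illustration (the helper itself is not part of the patch):

```rust
// Per BOLT 7: bit 0 of channel_flags selects the update's direction,
// bit 1 marks the channel as disabled.
fn decode_channel_flags(channel_flags: u8) -> (bool, bool) {
	let direction = (channel_flags & 1) == 1;
	let disable = (channel_flags & 2) > 0;
	(direction, disable)
}
```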
diff --git a/src/serialization.rs b/src/serialization.rs
index 5f11b27..decc580 100644
--- a/src/serialization.rs
+++ b/src/serialization.rs
@@ -93,12 +93,28 @@ impl UpdateSerialization {
 	fn flags(&self) -> u8 {
 		match self {
 			UpdateSerialization::Full(latest_update)|
-			UpdateSerialization::Incremental(latest_update, _) => latest_update.flags,
+			UpdateSerialization::Incremental(latest_update, _) => latest_update.channel_flags,
 			UpdateSerialization::Reminder(_, flags) => *flags,
 		}
 	}
 }
 
+pub(super) struct MutatedNodeProperties {
+	pub(super) addresses: bool,
+	pub(super) features: bool,
+}
+
+pub(super) enum NodeSerializationStrategy {
+	/// Only serialize the aspects of the node ID that have been mutated. Skip if they haven't been
+	Mutated(MutatedNodeProperties),
+	/// Whether or not the addresses or features have been mutated, serialize this node in full. It
+	/// may have been purged from the client.
+	Full,
+	/// This node ID has been seen recently enough to not have been pruned, and this update serves
+	/// solely the purpose of delaying any pruning, without applying any mutations
+	Reminder
+}
+
 struct FullUpdateValueHistograms {
 	cltv_expiry_delta: HashMap<u16, usize>,
 	htlc_minimum_msat: HashMap<u64, usize>,
@@ -220,15 +236,32 @@ pub(super) fn serialize_delta_set(channel_delta_set: DeltaSet, node_delta_set: N
 
 	serialization_set.full_update_defaults = default_update_values;
 
-	serialization_set.node_mutations = node_delta_set.into_iter().filter(|(_id, delta)| {
-		// either something changed, or this node is new
-		delta.has_feature_set_changed || delta.has_address_set_changed || delta.last_details_before_seen.is_none()
+	serialization_set.node_mutations = node_delta_set.into_iter().filter_map(|(id, mut delta)| {
+		if delta.strategy.is_none() {
+			return None;
+		}
+		if let Some(last_details_before_seen) = delta.last_details_before_seen.as_ref() {
+			if let Some(last_details_seen) = last_details_before_seen.seen {
+				if last_details_seen <= non_incremental_previous_update_threshold_timestamp {
+					delta.strategy = Some(NodeSerializationStrategy::Full)
+				}
+			}
+			Some((id, delta))
+		} else {
+			None
+		}
 	}).collect();
 
 	let mut node_feature_histogram: HashMap<&NodeFeatures, usize> = Default::default();
 	for (_id, delta) in serialization_set.node_mutations.iter() {
-		if delta.has_feature_set_changed || delta.last_details_before_seen.is_none() {
-			if let Some(latest_details) = delta.latest_details_after_seen.as_ref() {
+		// consider either full or feature-mutating serializations for histogram
+		let mut should_add_to_histogram = matches!(delta.strategy, Some(NodeSerializationStrategy::Full));
+		if let Some(NodeSerializationStrategy::Mutated(mutation)) = delta.strategy.as_ref() {
+			should_add_to_histogram = mutation.features;
+		}
+
+		if should_add_to_histogram {
+			if let Some(latest_details) = delta.latest_details.as_ref() {
 				*node_feature_histogram.entry(&latest_details.features).or_insert(0) += 1;
 			};
 		}
diff --git a/src/tests/mod.rs b/src/tests/mod.rs
index 7290e6e..3fc37cc 100644
--- a/src/tests/mod.rs
+++ b/src/tests/mod.rs
@@ -118,7 +118,8 @@ fn generate_update(scid: u64, direction: bool, timestamp: u32, expiry_delta: u16
 		chain_hash: genesis_hash(),
 		short_channel_id: scid,
 		timestamp,
-		flags: 0 | flag_mask,
+		message_flags: 0,
+		channel_flags: flag_mask,
 		cltv_expiry_delta: expiry_delta,
 		htlc_minimum_msat: min_msat,
 		htlc_maximum_msat: max_msat,
@@ -346,19 +347,41 @@ async fn test_node_announcement_delta_detection() {
 	let timestamp = current_time() - 10;
 
 	{ // seed the db
+
+		{ // necessary for the node announcements to be considered relevant
+			let announcement = generate_channel_announcement(1);
+			let update_1 = generate_update(1, false, timestamp, 0, 0, 0, 6, 0);
+			let update_2 = generate_update(1, true, timestamp, 0, 0, 0, 6, 0);
+
+			network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
+			network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
+			network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
+
+			receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
+			receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp))).await.unwrap();
+			receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp))).await.unwrap();
+		}
+
 		let mut announcement = generate_node_announcement(None);
-		receiver.send(GossipMessage::NodeAnnouncement(announcement.clone(), Some(timestamp - 10))).await.unwrap();
-		receiver.send(GossipMessage::NodeAnnouncement(announcement.clone(), Some(timestamp - 8))).await.unwrap();
+		announcement.contents.timestamp = timestamp - 10;
+		network_graph_arc.update_node_from_unsigned_announcement(&announcement.contents).unwrap();
+		receiver.send(GossipMessage::NodeAnnouncement(announcement.clone(), Some(announcement.contents.timestamp))).await.unwrap();
+		announcement.contents.timestamp = timestamp - 8;
+		network_graph_arc.update_node_from_unsigned_announcement(&announcement.contents).unwrap();
+		receiver.send(GossipMessage::NodeAnnouncement(announcement.clone(), Some(announcement.contents.timestamp))).await.unwrap();
 
 		{
 			let mut current_announcement = generate_node_announcement(Some(SecretKey::from_slice(&[2; 32]).unwrap()));
 			current_announcement.contents.features = NodeFeatures::from_be_bytes(vec![23, 48]);
+			current_announcement.contents.timestamp = timestamp;
+			network_graph_arc.update_node_from_unsigned_announcement(&current_announcement.contents).unwrap();
 			receiver.send(GossipMessage::NodeAnnouncement(current_announcement, Some(timestamp))).await.unwrap();
 		}
 
 		{
 			let mut current_announcement = generate_node_announcement(Some(SecretKey::from_slice(&[3; 32]).unwrap()));
 			current_announcement.contents.features = NodeFeatures::from_be_bytes(vec![22, 49]);
+			current_announcement.contents.timestamp = timestamp;
 			receiver.send(GossipMessage::NodeAnnouncement(current_announcement, Some(timestamp))).await.unwrap();
 		}
 
@@ -378,23 +401,11 @@ async fn test_node_announcement_delta_detection() {
 				version: 3,
 				port: 4,
 			});
+			announcement.contents.timestamp = timestamp;
 		}
+		network_graph_arc.update_node_from_unsigned_announcement(&announcement.contents).unwrap();
 		receiver.send(GossipMessage::NodeAnnouncement(announcement, Some(timestamp))).await.unwrap();
 
-		{ // necessary for the node announcements to be considered relevant
-			let announcement = generate_channel_announcement(1);
-			let update_1 = generate_update(1, false, timestamp, 0, 0, 0, 6, 0);
-			let update_2 = generate_update(1, true, timestamp, 0, 0, 0, 6, 0);
-
-			network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
-			network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
-			network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
-
-			receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
-			receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp))).await.unwrap();
-			receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp))).await.unwrap();
-		}
-
 		drop(receiver);
 
 		persister.persist_gossip().await;
@@ -408,9 +419,9 @@ async fn test_node_announcement_delta_detection() {
 
 	clean_test_db().await;
 
 	assert_eq!(serialization.message_count, 3);
-	assert_eq!(serialization.node_announcement_count, 3);
-	assert_eq!(serialization.node_update_count, 3);
-	assert_eq!(serialization.node_feature_update_count, 3);
+	assert_eq!(serialization.node_announcement_count, 2);
+	assert_eq!(serialization.node_update_count, 1);
+	assert_eq!(serialization.node_feature_update_count, 1);
 	assert_eq!(serialization.node_address_update_count, 1);
 }
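As a reading aid for the histogram change in src/serialization.rs above: only full serializations and feature-mutating deltas feed the default-feature histogram, which the matches!/if-let pair expresses. An equivalent single-match restatement (assumes the NodeSerializationStrategy and MutatedNodeProperties types added above; not part of the patch):

```rust
use crate::serialization::{MutatedNodeProperties, NodeSerializationStrategy};

fn should_add_to_histogram(strategy: Option<&NodeSerializationStrategy>) -> bool {
	match strategy {
		// full serializations always re-announce features
		Some(NodeSerializationStrategy::Full) => true,
		// mutated deltas only count when the feature set itself changed
		Some(NodeSerializationStrategy::Mutated(MutatedNodeProperties { features, .. })) => *features,
		// reminders and unset strategies never touch the histogram
		_ => false,
	}
}
```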