
Merge pull request #84 from arik-so/node_reminders_v2
Node Reminders
TheBlueMatt authored Sep 18, 2024
2 parents c70cacc + d7a9d62 commit dcb1d49
Showing 8 changed files with 279 additions and 149 deletions.
.github/workflows/build.yml: 6 changes (6 additions, 0 deletions)
@@ -30,6 +30,12 @@ jobs:
           toolchain: ${{ matrix.toolchain }}
           override: true
           profile: minimal
+      - name: Pin dependencies
+        if: ${{ matrix.toolchain == '1.63.0' }}
+        run: |
+          cargo update -p tokio --precise "1.37.0" --verbose
+          cargo update -p tokio-macros --precise "2.2.0" --verbose
+          cargo update -p postgres-types --precise "0.2.6" --verbose
       - name: Build on Rust ${{ matrix.toolchain }}
         run: |
           cargo build --verbose --color always
Cargo.toml: 14 changes (7 additions, 7 deletions)
@@ -4,18 +4,18 @@ version = "0.1.0"
edition = "2021"

[dependencies]
bitcoin = "0.30"
hex-conservative = "0.2"
lightning = { version = "0.0.123" }
lightning-block-sync = { version = "0.0.123", features=["rest-client"] }
lightning-net-tokio = { version = "0.0.123" }
bitcoin = "0.32.2"
hex-conservative = "0.2.1"
lightning = { version = "0.0.124" }
lightning-block-sync = { version = "0.0.124", features=["rest-client"] }
lightning-net-tokio = { version = "0.0.124" }
tokio = { version = "1.25", features = ["full"] }
tokio-postgres = { version = "=0.7.5" }
futures = "0.3"

[dev-dependencies]
lightning = { version = "0.0.123", features = ["_test_utils"] }
lightning-rapid-gossip-sync = { version = "0.0.123" }
lightning = { version = "0.0.124", features = ["_test_utils"] }
lightning-rapid-gossip-sync = { version = "0.0.124" }

[profile.dev]
panic = "abort"
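A side effect of the bitcoin 0.30-to-0.32.2 bump shows up in src/config.rs below: rust-bitcoin now ships its own I/O types, so gossip messages are deserialized through bitcoin::io::Cursor rather than std::io::Cursor. A minimal sketch of the pattern, using an illustrative helper name that is not part of this codebase:

    use bitcoin::io::Cursor;
    use lightning::ln::msgs::{ChannelAnnouncement, DecodeError};
    use lightning::util::ser::Readable;

    // Illustrative helper mirroring the upgrade_db change below: parse a
    // signed channel announcement and extract its short channel ID.
    fn read_announcement_scid(announcement: &[u8]) -> Result<u64, DecodeError> {
        let parsed = ChannelAnnouncement::read(&mut Cursor::new(announcement))?;
        Ok(parsed.contents.short_channel_id)
    }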
src/config.rs: 8 changes (6 additions, 2 deletions)
@@ -1,10 +1,10 @@
 use crate::hex_utils;

 use std::env;
-use std::io::Cursor;
 use std::net::{SocketAddr, ToSocketAddrs};
 use std::time::Duration;

+use bitcoin::io::Cursor;
 use bitcoin::Network;
 use bitcoin::hashes::hex::FromHex;
 use bitcoin::secp256k1::PublicKey;
@@ -23,6 +23,10 @@ pub(crate) const MAX_SNAPSHOT_SCOPE: u32 = 3600 * 24 * 21; // three weeks
 /// updates in both directions.
 pub(crate) const CHANNEL_REMINDER_AGE: Duration = Duration::from_secs(6 * 24 * 60 * 60);

+/// The interval after which graph data gets pruned after it was first seen
+/// This should match the LDK default pruning interval, which is 14 days
+pub(crate) const PRUNE_INTERVAL: Duration = Duration::from_secs(14 * 24 * 60 * 60);
+
 /// Maximum number of default features to calculate for node announcements
 pub(crate) const NODE_DEFAULT_FEATURE_COUNT: u8 = 6;

@@ -219,7 +223,7 @@ pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client)
             let announcement: Vec<u8> = row.get("announcement_signed");
             let tx_ref = &tx;
             updates.push(async move {
-                let scid = ChannelAnnouncement::read(&mut Cursor::new(announcement)).unwrap().contents.short_channel_id as i64;
+                let scid = ChannelAnnouncement::read(&mut Cursor::new(&announcement)).unwrap().contents.short_channel_id as i64;
                 assert!(scid > 0); // Will roll over in some 150 years or so
                 tx_ref.execute("UPDATE channel_announcements SET short_channel_id = $1 WHERE id = $2", &[&scid, &id]).await.unwrap();
             });
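The two reminder-related constants above bracket a window: gossip data is worth reminding clients about once it is older than CHANNEL_REMINDER_AGE (six days) but before PRUNE_INTERVAL (LDK's 14-day default) has elapsed and clients have pruned it. A minimal sketch of that window check, assuming an illustrative helper that is not part of this codebase:

    use std::time::Duration;

    const CHANNEL_REMINDER_AGE: Duration = Duration::from_secs(6 * 24 * 60 * 60);
    const PRUNE_INTERVAL: Duration = Duration::from_secs(14 * 24 * 60 * 60);

    // Illustrative: should a reminder be serialized for gossip data whose
    // latest update a client saw `age` ago?
    fn needs_reminder(age: Duration) -> bool {
        age >= CHANNEL_REMINDER_AGE && age < PRUNE_INTERVAL
    }

    fn main() {
        assert!(!needs_reminder(Duration::from_secs(86_400)));             // 1 day: too fresh
        assert!(needs_reminder(Duration::from_secs(7 * 24 * 60 * 60)));    // 7 days: remind
        assert!(!needs_reminder(Duration::from_secs(15 * 24 * 60 * 60)));  // 15 days: likely already pruned
    }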
src/lib.rs: 94 changes (53 additions, 41 deletions)
@@ -26,7 +26,7 @@ use crate::config::SYMLINK_GRANULARITY_INTERVAL;
 use crate::lookup::DeltaSet;

 use crate::persistence::GossipPersister;
-use crate::serialization::{SerializationSet, UpdateSerialization};
+use crate::serialization::{MutatedNodeProperties, NodeSerializationStrategy, SerializationSet, UpdateSerialization};
 use crate::snapshot::Snapshotter;
 use crate::types::RGSSLogger;
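The two new imports are the crux of this PR. Inferred from how they are used in serialize_delta below (the real definitions live in src/serialization.rs, which is not among the files shown here), the types plausibly look like this sketch:

    // Sketch only; reconstructed from usage in this diff, not copied from
    // src/serialization.rs.
    pub(crate) struct MutatedNodeProperties {
        pub(crate) addresses: bool,
        pub(crate) features: bool,
    }

    pub(crate) enum NodeSerializationStrategy {
        /// Serialize only the node properties that changed
        Mutated(MutatedNodeProperties),
        /// Serialize the node's announcement details in full
        Full,
        /// Serialize only a reminder flag, with no announcement details
        Reminder,
    }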

@@ -187,11 +187,11 @@ async fn calculate_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>,
     // for announcement-free incremental-only updates, chain hash can be skipped

     let mut delta_set = DeltaSet::new();
-    lookup::fetch_channel_announcements(&mut delta_set, network_graph, &client, last_sync_timestamp, snapshot_reference_timestamp, logger.clone()).await;
+    lookup::fetch_channel_announcements(&mut delta_set, Arc::clone(&network_graph), &client, last_sync_timestamp, snapshot_reference_timestamp, logger.clone()).await;
     log_info!(logger, "announcement channel count: {}", delta_set.len());
     lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp, logger.clone()).await;
     log_info!(logger, "update-fetched channel count: {}", delta_set.len());
-    let node_delta_set = lookup::fetch_node_updates(&client, last_sync_timestamp, logger.clone()).await;
+    let node_delta_set = lookup::fetch_node_updates(network_graph, &client, last_sync_timestamp, snapshot_reference_timestamp, logger.clone()).await;
     log_info!(logger, "update-fetched node count: {}", node_delta_set.len());
     lookup::filter_delta_set(&mut delta_set, logger.clone());
     log_info!(logger, "update-filtered channel count: {}", delta_set.len());
@@ -306,6 +306,9 @@ fn serialize_delta<L: Deref + Clone>(serialization_details: &SerializationSet, s

     if serialization_version >= 2 {
         if let Some(node_delta) = serialization_details.node_mutations.get(&current_node_id) {
+            let strategy = node_delta.strategy.as_ref().unwrap();
+            let mut node_has_update = false;
+
             /*
             Bitmap:
             7: expect extra data after the pubkey (a u16 for the count, and then that number of bytes)
@@ -317,51 +320,60 @@
             0: used for odd keys
             */

-            if node_delta.has_address_set_changed {
-                node_address_update_count += 1;
-
-                let address_set = &node_delta.latest_details_after_seen.as_ref().unwrap().addresses;
-                let mut address_serialization = Vec::new();
-
-                // we don't know a priori how many are <= 255 bytes
-                let mut total_address_count = 0u8;
-
-                for address in address_set.iter() {
-                    if total_address_count == u8::MAX {
-                        // don't serialize more than 255 addresses
-                        break;
-                    }
-                    if let Ok(serialized_length) = u8::try_from(address.serialized_length()) {
-                        total_address_count += 1;
-                        serialized_length.write(&mut address_serialization).unwrap();
-                        address.write(&mut address_serialization).unwrap();
-                    };
-                }
-
-                // signal the presence of node addresses
-                current_node_delta_serialization[0] |= 1 << 2;
-                // serialize the actual addresses and count
-                total_address_count.write(&mut current_node_delta_serialization).unwrap();
-                current_node_delta_serialization.append(&mut address_serialization);
-            }
-
-            if node_delta.has_feature_set_changed {
-                node_feature_update_count += 1;
-
-                let latest_features = &node_delta.latest_details_after_seen.as_ref().unwrap().features;
-
-                // are these features among the most common ones?
-                if let Some(index) = serialization_details.node_announcement_feature_defaults.iter().position(|f| f == latest_features) {
-                    // this feature set is among the 6 defaults
-                    current_node_delta_serialization[0] |= ((index + 1) as u8) << 3;
-                } else {
-                    current_node_delta_serialization[0] |= 0b_0011_1000; // 7 << 3
-                    latest_features.write(&mut current_node_delta_serialization).unwrap();
-                }
-            }
-
-            if node_delta.has_address_set_changed || node_delta.has_feature_set_changed {
+            match strategy {
+                NodeSerializationStrategy::Mutated(MutatedNodeProperties { addresses: true, .. }) | NodeSerializationStrategy::Full => {
+                    let address_set = &node_delta.latest_details.as_ref().unwrap().addresses;
+                    let mut address_serialization = Vec::new();
+
+                    // we don't know a priori how many are <= 255 bytes
+                    let mut total_address_count = 0u8;
+
+                    for address in address_set.iter() {
+                        if total_address_count == u8::MAX {
+                            // don't serialize more than 255 addresses
+                            break;
+                        }
+                        if let Ok(serialized_length) = u8::try_from(address.serialized_length()) {
+                            total_address_count += 1;
+                            serialized_length.write(&mut address_serialization).unwrap();
+                            address.write(&mut address_serialization).unwrap();
+                        };
+                    }
+
+                    node_address_update_count += 1;
+                    node_has_update = true;
+
+                    // signal the presence of node addresses
+                    current_node_delta_serialization[0] |= 1 << 2;
+                    // serialize the actual addresses and count
+                    total_address_count.write(&mut current_node_delta_serialization).unwrap();
+                    current_node_delta_serialization.append(&mut address_serialization);
+                },
+                _ => {}
+            }
+
+            match strategy {
+                NodeSerializationStrategy::Mutated(MutatedNodeProperties { features: true, .. }) | NodeSerializationStrategy::Full => {
+                    let latest_features = &node_delta.latest_details.as_ref().unwrap().features;
+                    node_feature_update_count += 1;
+                    node_has_update = true;
+
+                    // are these features among the most common ones?
+                    if let Some(index) = serialization_details.node_announcement_feature_defaults.iter().position(|f| f == latest_features) {
+                        // this feature set is among the 6 defaults
+                        current_node_delta_serialization[0] |= ((index + 1) as u8) << 3;
+                    } else {
+                        current_node_delta_serialization[0] |= 0b_0011_1000; // 7 << 3
+                        latest_features.write(&mut current_node_delta_serialization).unwrap();
+                    }
+                },
+                _ => {}
+            }
+
+            if node_has_update {
                 node_update_count += 1;
+            } else if let NodeSerializationStrategy::Reminder = strategy {
+                current_node_delta_serialization[0] |= 1 << 6;
             }
         }
     }
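Putting the v2 node bitmap together: bit 2 signals a serialized address set, bits 3-5 encode features (values 1-6 select one of the precomputed default feature sets, 7 means the raw feature bytes follow), and bit 6, introduced by this PR, marks an announcement-free reminder, set only when nothing else was serialized for the node. An illustrative recombination of those flags, with names that are not from the codebase:

    // Illustrative only; mirrors the flag logic in serialize_delta above.
    // `features` is None when the feature set did not change; Some(Some(i))
    // selects default set i; Some(None) means no default matched.
    fn node_bitmap(has_addresses: bool, features: Option<Option<usize>>, is_reminder: bool) -> u8 {
        let mut bitmap = 0u8;
        if has_addresses {
            bitmap |= 1 << 2; // bit 2: serialized addresses follow
        }
        if let Some(default_index) = features {
            match default_index {
                // indices 0..6 map to bitmap values 1..=6 (one of the 6 defaults)
                Some(index) => bitmap |= ((index + 1) as u8) << 3,
                // 7 << 3: no default matched, raw feature bytes follow
                None => bitmap |= 0b_0011_1000,
            }
        }
        // the reminder bit is only set when no mutated data was serialized
        if is_reminder && bitmap == 0 {
            bitmap |= 1 << 6;
        }
        bitmap
    }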
(Diffs for the remaining 4 changed files are not shown.)
