Skip to content

Commit

Permalink
Merge pull request #79 from arik-so/persist-node-data
Browse files Browse the repository at this point in the history
Persist NodeAnnouncement Gossip
  • Loading branch information
TheBlueMatt authored May 15, 2024
2 parents a18c256 + f46a139 commit 6cb80e0
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 39 deletions.
10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@ edition = "2021"
[dependencies]
bitcoin = "0.30"
hex-conservative = "0.2"
lightning = { version = "0.0.121" }
lightning-block-sync = { version = "0.0.121", features=["rest-client"] }
lightning-net-tokio = { version = "0.0.121" }
lightning = { version = "0.0.123" }
lightning-block-sync = { version = "0.0.123", features=["rest-client"] }
lightning-net-tokio = { version = "0.0.123" }
tokio = { version = "1.25", features = ["full"] }
tokio-postgres = { version = "=0.7.5" }
futures = "0.3"

[dev-dependencies]
lightning = { version = "0.0.121", features = ["_test_utils"] }
lightning-rapid-gossip-sync = { version = "0.0.121" }
lightning = { version = "0.0.123", features = ["_test_utils"] }
lightning-rapid-gossip-sync = { version = "0.0.123" }

[profile.dev]
panic = "abort"
Expand Down
14 changes: 13 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use lightning::util::ser::Readable;
use lightning_block_sync::http::HttpEndpoint;
use tokio_postgres::Config;

pub(crate) const SCHEMA_VERSION: i32 = 13;
pub(crate) const SCHEMA_VERSION: i32 = 14;
pub(crate) const SYMLINK_GRANULARITY_INTERVAL: u32 = 3600 * 3; // three hours
pub(crate) const MAX_SNAPSHOT_SCOPE: u32 = 3600 * 24 * 21; // three weeks
// generate symlinks based on a 3-hour-granularity
Expand Down Expand Up @@ -135,6 +135,18 @@ pub(crate) fn db_channel_update_table_creation_query() -> &'static str {
)"
}

pub(crate) fn db_node_announcement_table_creation_query() -> &'static str {
"CREATE TABLE IF NOT EXISTS node_announcements (
id SERIAL PRIMARY KEY,
public_key varchar(66) NOT NULL,
features BYTEA NOT NULL,
socket_addresses BYTEA NOT NULL,
timestamp bigint NOT NULL,
announcement_signed BYTEA,
seen timestamp NOT NULL DEFAULT NOW()
)"
}

pub(crate) fn db_index_creation_query() -> &'static str {
"
CREATE INDEX IF NOT EXISTS channel_updates_seen_scid ON channel_updates(seen, short_channel_id);
Expand Down
25 changes: 23 additions & 2 deletions src/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::types::{GossipMessage, GossipChainAccess, GossipPeerManager};
use crate::verifier::ChainVerifier;

pub(crate) struct GossipCounter {
pub(crate) node_announcements: u64,
pub(crate) channel_announcements: u64,
pub(crate) channel_updates: u64,
pub(crate) channel_updates_without_htlc_max_msats: u64,
Expand All @@ -23,6 +24,7 @@ pub(crate) struct GossipCounter {
impl GossipCounter {
pub(crate) fn new() -> Self {
Self {
node_announcements: 0,
channel_announcements: 0,
channel_updates: 0,
channel_updates_without_htlc_max_msats: 0,
Expand Down Expand Up @@ -71,6 +73,21 @@ impl<L: Deref + Clone + Send + Sync> GossipRouter<L> where L::Target: Logger {
}
}

fn new_node_announcement(&self, msg: NodeAnnouncement) {
{
let mut counter = self.counter.write().unwrap();
counter.node_announcements += 1;
}

let gossip_message = GossipMessage::NodeAnnouncement(msg, None);
if let Err(err) = self.sender.try_send(gossip_message) {
let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
self.sender.send(gossip_message).await.unwrap();
})});
}
}

fn new_channel_update(&self, msg: ChannelUpdate) {
self.counter.write().unwrap().channel_updates += 1;
let gossip_message = GossipMessage::ChannelUpdate(msg, None);
Expand All @@ -92,7 +109,9 @@ impl<L: Deref + Clone + Send + Sync> MessageSendEventsProvider for GossipRouter<
MessageSendEvent::BroadcastChannelAnnouncement { msg, .. } => {
self.new_channel_announcement(msg);
},
MessageSendEvent::BroadcastNodeAnnouncement { .. } => {},
MessageSendEvent::BroadcastNodeAnnouncement { msg } => {
self.new_node_announcement(msg);
},
MessageSendEvent::BroadcastChannelUpdate { msg } => {
self.new_channel_update(msg);
},
Expand All @@ -105,7 +124,9 @@ impl<L: Deref + Clone + Send + Sync> MessageSendEventsProvider for GossipRouter<

impl<L: Deref + Clone + Send + Sync> RoutingMessageHandler for GossipRouter<L> where L::Target: Logger {
fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError> {
self.native_router.handle_node_announcement(msg)
let res = self.native_router.handle_node_announcement(msg)?;
self.new_node_announcement(msg.clone());
Ok(res)
}

fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result<bool, LightningError> {
Expand Down
80 changes: 66 additions & 14 deletions src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,20 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
panic!("db init error: {}", initialization_error);
}

let initialization = client
.execute(config::db_announcement_table_creation_query(), &[])
.await;
if let Err(initialization_error) = initialization {
panic!("db init error: {}", initialization_error);
}
let table_creation_queries = [
config::db_announcement_table_creation_query(),
config::db_channel_update_table_creation_query(),
config::db_channel_update_table_creation_query(),
config::db_node_announcement_table_creation_query()
];

let initialization = client
.execute(
config::db_channel_update_table_creation_query(),
&[],
)
.await;
if let Err(initialization_error) = initialization {
panic!("db init error: {}", initialization_error);
for current_table_creation_query in table_creation_queries {
let initialization = client
.execute(current_table_creation_query, &[])
.await;
if let Err(initialization_error) = initialization {
panic!("db init error: {}", initialization_error);
}
}

let initialization = client
Expand Down Expand Up @@ -133,6 +132,59 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {

let connections_cache_ref = Arc::clone(&connections_cache);
match gossip_message {
GossipMessage::NodeAnnouncement(announcement, seen_override) => {
let public_key_hex = announcement.contents.node_id.to_string();

let mut announcement_signed = Vec::new();
announcement.write(&mut announcement_signed).unwrap();

let features = announcement.contents.features.encode();
let timestamp = announcement.contents.timestamp as i64;

let mut serialized_addresses = Vec::new();
announcement.contents.addresses.write(&mut serialized_addresses).unwrap();

let _task = self.tokio_runtime.spawn(async move {
if cfg!(test) && seen_override.is_some() {
tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
.execute("INSERT INTO node_announcements (\
public_key, \
features, \
socket_addresses, \
timestamp, \
announcement_signed, \
seen \
) VALUES ($1, $2, $3, $4, $5, TO_TIMESTAMP($6))", &[
&public_key_hex,
&features,
&serialized_addresses,
&timestamp,
&announcement_signed,
&(seen_override.unwrap() as f64)
])).await.unwrap().unwrap();
} else {
tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
.execute("INSERT INTO node_announcements (\
public_key, \
features, \
socket_addresses, \
timestamp, \
announcement_signed \
) VALUES ($1, $2, $3, $4, $5)", &[
&public_key_hex,
&features,
&serialized_addresses,
&timestamp,
&announcement_signed,
])).await.unwrap().unwrap();
}
let mut connections_set = connections_cache_ref.lock().await;
connections_set.push(client);
limiter_ref.add_permits(1);
});
#[cfg(test)]
tasks_spawned.push(_task);
},
GossipMessage::ChannelAnnouncement(announcement, seen_override) => {
let scid = announcement.contents.short_channel_id as i64;

Expand Down
Loading

0 comments on commit 6cb80e0

Please sign in to comment.