diff --git a/README.md b/README.md index df072999d..22ef1a1b2 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ use std::str::FromStr; fn main() { let mut builder = Builder::new(); builder.set_network(Network::Testnet); - builder.set_esplora_server("https://blockstream.info/testnet/api".to_string()); + builder.set_chain_source_esplora("https://blockstream.info/testnet/api".to_string(), None); builder.set_gossip_source_rgs("https://rapidsync.lightningdevkit.org/testnet/snapshot".to_string()); let node = builder.build().unwrap(); diff --git a/bindings/kotlin/ldk-node-jvm/lib/src/test/kotlin/org/lightningdevkit/ldknode/LibraryTest.kt b/bindings/kotlin/ldk-node-jvm/lib/src/test/kotlin/org/lightningdevkit/ldknode/LibraryTest.kt index e2bcd4c89..786534b84 100644 --- a/bindings/kotlin/ldk-node-jvm/lib/src/test/kotlin/org/lightningdevkit/ldknode/LibraryTest.kt +++ b/bindings/kotlin/ldk-node-jvm/lib/src/test/kotlin/org/lightningdevkit/ldknode/LibraryTest.kt @@ -130,9 +130,9 @@ class LibraryTest { println("Config 2: $config2") val builder1 = Builder.fromConfig(config1) - builder1.setEsploraServer(esploraEndpoint) + builder1.setChainSourceEsplora(esploraEndpoint, null) val builder2 = Builder.fromConfig(config2) - builder2.setEsploraServer(esploraEndpoint) + builder2.setChainSourceEsplora(esploraEndpoint, null) val node1 = builder1.build() val node2 = builder2.build() diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 96490f2b7..b4fc7ec79 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -9,9 +9,6 @@ dictionary Config { Network network; sequence? listening_addresses; NodeAlias? node_alias; - u64 onchain_wallet_sync_interval_secs; - u64 wallet_sync_interval_secs; - u64 fee_rate_cache_update_interval_secs; sequence trusted_peers_0conf; u64 probing_liquidity_limit_multiplier; LogLevel log_level; @@ -24,6 +21,12 @@ dictionary AnchorChannelsConfig { u64 per_channel_reserve_sats; }; +dictionary EsploraSyncConfig { + u64 onchain_wallet_sync_interval_secs; + u64 lightning_wallet_sync_interval_secs; + u64 fee_rate_cache_update_interval_secs; +}; + interface Builder { constructor(); [Name=from_config] @@ -32,7 +35,7 @@ interface Builder { [Throws=BuildError] void set_entropy_seed_bytes(sequence seed_bytes); void set_entropy_bip39_mnemonic(Mnemonic mnemonic, string? passphrase); - void set_esplora_server(string esplora_server_url); + void set_chain_source_esplora(string server_url, EsploraSyncConfig? config); void set_gossip_source_p2p(); void set_gossip_source_rgs(string rgs_server_url); void set_liquidity_source_lsps2(SocketAddress address, PublicKey node_id, string? token); @@ -218,11 +221,12 @@ dictionary NodeStatus { boolean is_running; boolean is_listening; BestBlock current_best_block; - u64? latest_wallet_sync_timestamp; + u64? latest_lightning_wallet_sync_timestamp; u64? latest_onchain_wallet_sync_timestamp; u64? latest_fee_rate_cache_update_timestamp; u64? latest_rgs_snapshot_timestamp; u64? latest_node_announcement_broadcast_timestamp; + u32? latest_channel_monitor_archival_height; }; dictionary BestBlock { diff --git a/bindings/python/src/ldk_node/test_ldk_node.py b/bindings/python/src/ldk_node/test_ldk_node.py index 4f2931440..82c493e32 100644 --- a/bindings/python/src/ldk_node/test_ldk_node.py +++ b/bindings/python/src/ldk_node/test_ldk_node.py @@ -84,7 +84,7 @@ def setup_node(tmp_dir, esplora_endpoint, listening_addresses): config = default_config() builder = Builder.from_config(config) builder.set_storage_dir_path(tmp_dir) - builder.set_esplora_server(esplora_endpoint) + builder.set_chain_source_esplora(esplora_endpoint, None) builder.set_network(DEFAULT_TEST_NETWORK) builder.set_listening_addresses(listening_addresses) return builder.build() diff --git a/src/builder.rs b/src/builder.rs index f6b201c54..43171db1f 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -5,16 +5,15 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. -use crate::config::{ - default_user_config, Config, DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS, DEFAULT_ESPLORA_SERVER_URL, - WALLET_KEYS_SEED_LEN, -}; +use crate::chain::{ChainSource, DEFAULT_ESPLORA_SERVER_URL}; +use crate::config::{default_user_config, Config, EsploraSyncConfig, WALLET_KEYS_SEED_LEN}; + use crate::connection::ConnectionManager; use crate::event::EventQueue; use crate::fee_estimator::OnchainFeeEstimator; use crate::gossip::GossipSource; -use crate::io; use crate::io::sqlite_store::SqliteStore; +use crate::io::utils::{read_node_metrics, write_node_metrics}; #[cfg(any(vss, vss_test))] use crate::io::vss_store::VssStore; use crate::liquidity::LiquiditySource; @@ -29,6 +28,7 @@ use crate::types::{ }; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; +use crate::{io, NodeMetrics}; use crate::{LogLevel, Node}; use lightning::chain::{chainmonitor, BestBlock, Watch}; @@ -52,8 +52,6 @@ use lightning::util::sweep::OutputSweeper; use lightning_persister::fs_store::FilesystemStore; -use lightning_transaction_sync::EsploraSyncClient; - use lightning_liquidity::lsps2::client::LSPS2ClientConfig; use lightning_liquidity::{LiquidityClientConfig, LiquidityManager}; @@ -79,7 +77,7 @@ use std::time::SystemTime; #[derive(Debug, Clone)] enum ChainDataSourceConfig { - Esplora(String), + Esplora { server_url: String, sync_config: Option }, } #[derive(Debug, Clone)] @@ -239,8 +237,14 @@ impl NodeBuilder { } /// Configures the [`Node`] instance to source its chain data from the given Esplora server. - pub fn set_esplora_server(&mut self, esplora_server_url: String) -> &mut Self { - self.chain_data_source_config = Some(ChainDataSourceConfig::Esplora(esplora_server_url)); + /// + /// If no `sync_config` is given, default values are used. See [`EsploraSyncConfig`] for more + /// information. + pub fn set_chain_source_esplora( + &mut self, server_url: String, sync_config: Option, + ) -> &mut Self { + self.chain_data_source_config = + Some(ChainDataSourceConfig::Esplora { server_url, sync_config }); self } @@ -466,8 +470,13 @@ impl ArcedNodeBuilder { } /// Configures the [`Node`] instance to source its chain data from the given Esplora server. - pub fn set_esplora_server(&self, esplora_server_url: String) { - self.inner.write().unwrap().set_esplora_server(esplora_server_url); + /// + /// If no `sync_config` is given, default values are used. See [`EsploraSyncConfig`] for more + /// information. + pub fn set_chain_source_esplora( + &self, server_url: String, sync_config: Option, + ) { + self.inner.write().unwrap().set_chain_source_esplora(server_url, sync_config); } /// Configures the [`Node`] instance to source its gossip data from the Lightning peer-to-peer @@ -555,6 +564,19 @@ fn build_with_store_internal( liquidity_source_config: Option<&LiquiditySourceConfig>, seed_bytes: [u8; 64], logger: Arc, kv_store: Arc, ) -> Result { + // Initialize the status fields. + let is_listening = Arc::new(AtomicBool::new(false)); + let node_metrics = match read_node_metrics(Arc::clone(&kv_store), Arc::clone(&logger)) { + Ok(metrics) => Arc::new(RwLock::new(metrics)), + Err(e) => { + if e.kind() == std::io::ErrorKind::NotFound { + Arc::new(RwLock::new(NodeMetrics::default())) + } else { + return Err(BuildError::ReadFailed); + } + }, + }; + // Initialize the on-chain wallet and chain access let xprv = bitcoin::bip32::Xpriv::new_master(config.network, &seed_bytes).map_err(|e| { log_error!(logger, "Failed to derive master secret: {}", e); @@ -586,62 +608,54 @@ fn build_with_store_internal( })?, }; - let (esplora_client, tx_sync, tx_broadcaster, fee_estimator) = match chain_data_source_config { - Some(ChainDataSourceConfig::Esplora(server_url)) => { - let mut client_builder = esplora_client::Builder::new(&server_url.clone()); - client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS); - let esplora_client = client_builder.build_async().unwrap(); - let tx_sync = Arc::new(EsploraSyncClient::from_client( - esplora_client.clone(), - Arc::clone(&logger), - )); - let tx_broadcaster = Arc::new(TransactionBroadcaster::new( - tx_sync.client().clone(), - Arc::clone(&logger), - )); - let fee_estimator = Arc::new(OnchainFeeEstimator::new( - tx_sync.client().clone(), + let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger))); + let fee_estimator = Arc::new(OnchainFeeEstimator::new()); + let wallet = Arc::new(Wallet::new( + bdk_wallet, + wallet_persister, + Arc::clone(&tx_broadcaster), + Arc::clone(&fee_estimator), + Arc::clone(&logger), + )); + + let chain_source = match chain_data_source_config { + Some(ChainDataSourceConfig::Esplora { server_url, sync_config }) => { + let sync_config = sync_config.unwrap_or(EsploraSyncConfig::default()); + Arc::new(ChainSource::new_esplora( + server_url.clone(), + sync_config, + Arc::clone(&wallet), + Arc::clone(&fee_estimator), + Arc::clone(&tx_broadcaster), + Arc::clone(&kv_store), Arc::clone(&config), Arc::clone(&logger), - )); - (esplora_client, tx_sync, tx_broadcaster, fee_estimator) + Arc::clone(&node_metrics), + )) }, None => { // Default to Esplora client. let server_url = DEFAULT_ESPLORA_SERVER_URL.to_string(); - let mut client_builder = esplora_client::Builder::new(&server_url); - client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS); - let esplora_client = client_builder.build_async().unwrap(); - let tx_sync = Arc::new(EsploraSyncClient::from_client( - esplora_client.clone(), - Arc::clone(&logger), - )); - let tx_broadcaster = Arc::new(TransactionBroadcaster::new( - tx_sync.client().clone(), - Arc::clone(&logger), - )); - let fee_estimator = Arc::new(OnchainFeeEstimator::new( - tx_sync.client().clone(), + let sync_config = EsploraSyncConfig::default(); + Arc::new(ChainSource::new_esplora( + server_url.clone(), + sync_config, + Arc::clone(&wallet), + Arc::clone(&fee_estimator), + Arc::clone(&tx_broadcaster), + Arc::clone(&kv_store), Arc::clone(&config), Arc::clone(&logger), - )); - (esplora_client, tx_sync, tx_broadcaster, fee_estimator) + Arc::clone(&node_metrics), + )) }, }; let runtime = Arc::new(RwLock::new(None)); - let wallet = Arc::new(Wallet::new( - bdk_wallet, - wallet_persister, - esplora_client, - Arc::clone(&tx_broadcaster), - Arc::clone(&fee_estimator), - Arc::clone(&logger), - )); // Initialize the ChainMonitor let chain_monitor: Arc = Arc::new(chainmonitor::ChainMonitor::new( - Some(Arc::clone(&tx_sync)), + Some(Arc::clone(&chain_source)), Arc::clone(&tx_broadcaster), Arc::clone(&logger), Arc::clone(&fee_estimator), @@ -824,23 +838,24 @@ fn build_with_store_internal( Arc::new(GossipSource::new_p2p(Arc::clone(&network_graph), Arc::clone(&logger))); // Reset the RGS sync timestamp in case we somehow switch gossip sources - io::utils::write_latest_rgs_sync_timestamp( - 0, - Arc::clone(&kv_store), - Arc::clone(&logger), - ) - .map_err(|e| { - log_error!(logger, "Failed writing to store: {}", e); - BuildError::WriteFailed - })?; + { + let mut locked_node_metrics = node_metrics.write().unwrap(); + locked_node_metrics.latest_rgs_snapshot_timestamp = None; + write_node_metrics( + &*locked_node_metrics, + Arc::clone(&kv_store), + Arc::clone(&logger), + ) + .map_err(|e| { + log_error!(logger, "Failed writing to store: {}", e); + BuildError::WriteFailed + })?; + } p2p_source }, GossipSourceConfig::RapidGossipSync(rgs_server) => { - let latest_sync_timestamp = io::utils::read_latest_rgs_sync_timestamp( - Arc::clone(&kv_store), - Arc::clone(&logger), - ) - .unwrap_or(0); + let latest_sync_timestamp = + node_metrics.read().unwrap().latest_rgs_snapshot_timestamp.unwrap_or(0); Arc::new(GossipSource::new_rgs( rgs_server.clone(), latest_sync_timestamp, @@ -857,7 +872,7 @@ fn build_with_store_internal( let liquidity_manager = Arc::new(LiquidityManager::new( Arc::clone(&keys_manager), Arc::clone(&channel_manager), - Some(Arc::clone(&tx_sync)), + Some(Arc::clone(&chain_source)), None, None, liquidity_client_config, @@ -925,7 +940,7 @@ fn build_with_store_internal( let output_sweeper = match io::utils::read_output_sweeper( Arc::clone(&tx_broadcaster), Arc::clone(&fee_estimator), - Arc::clone(&tx_sync), + Arc::clone(&chain_source), Arc::clone(&keys_manager), Arc::clone(&kv_store), Arc::clone(&logger), @@ -937,7 +952,7 @@ fn build_with_store_internal( channel_manager.current_best_block(), Arc::clone(&tx_broadcaster), Arc::clone(&fee_estimator), - Some(Arc::clone(&tx_sync)), + Some(Arc::clone(&chain_source)), Arc::clone(&keys_manager), Arc::clone(&keys_manager), Arc::clone(&kv_store), @@ -999,23 +1014,14 @@ fn build_with_store_internal( let (stop_sender, _) = tokio::sync::watch::channel(()); let (event_handling_stopped_sender, _) = tokio::sync::watch::channel(()); - let is_listening = Arc::new(AtomicBool::new(false)); - let latest_wallet_sync_timestamp = Arc::new(RwLock::new(None)); - let latest_onchain_wallet_sync_timestamp = Arc::new(RwLock::new(None)); - let latest_fee_rate_cache_update_timestamp = Arc::new(RwLock::new(None)); - let latest_rgs_snapshot_timestamp = Arc::new(RwLock::new(None)); - let latest_node_announcement_broadcast_timestamp = Arc::new(RwLock::new(None)); - let latest_channel_monitor_archival_height = Arc::new(RwLock::new(None)); - Ok(Node { runtime, stop_sender, event_handling_stopped_sender, config, wallet, - tx_sync, + chain_source, tx_broadcaster, - fee_estimator, event_queue, channel_manager, chain_monitor, @@ -1034,12 +1040,7 @@ fn build_with_store_internal( peer_store, payment_store, is_listening, - latest_wallet_sync_timestamp, - latest_onchain_wallet_sync_timestamp, - latest_fee_rate_cache_update_timestamp, - latest_rgs_snapshot_timestamp, - latest_node_announcement_broadcast_timestamp, - latest_channel_monitor_archival_height, + node_metrics, }) } diff --git a/src/chain/mod.rs b/src/chain/mod.rs new file mode 100644 index 000000000..7501c9809 --- /dev/null +++ b/src/chain/mod.rs @@ -0,0 +1,619 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use crate::config::{ + Config, EsploraSyncConfig, BDK_CLIENT_CONCURRENCY, BDK_CLIENT_STOP_GAP, + BDK_WALLET_SYNC_TIMEOUT_SECS, FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS, LDK_WALLET_SYNC_TIMEOUT_SECS, + RESOLVED_CHANNEL_MONITOR_ARCHIVAL_INTERVAL, TX_BROADCAST_TIMEOUT_SECS, + WALLET_SYNC_INTERVAL_MINIMUM_SECS, +}; +use crate::fee_estimator::{ + apply_post_estimation_adjustments, get_all_conf_targets, get_num_block_defaults_for_target, + OnchainFeeEstimator, +}; +use crate::io::utils::write_node_metrics; +use crate::logger::{log_bytes, log_error, log_info, log_trace, FilesystemLogger, Logger}; +use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; +use crate::{Error, NodeMetrics}; + +use lightning::chain::{Confirm, Filter}; +use lightning::util::ser::Writeable; + +use lightning_transaction_sync::EsploraSyncClient; + +use bdk_esplora::EsploraAsyncExt; + +use esplora_client::AsyncClient as EsploraAsyncClient; + +use bitcoin::{FeeRate, Network}; + +use std::collections::HashMap; +use std::sync::{Arc, Mutex, RwLock}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; + +// The default Esplora server we're using. +pub(crate) const DEFAULT_ESPLORA_SERVER_URL: &str = "https://blockstream.info/api"; + +// The default Esplora client timeout we're using. +pub(crate) const DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS: u64 = 10; + +pub(crate) enum WalletSyncStatus { + Completed, + InProgress { subscribers: tokio::sync::broadcast::Sender> }, +} + +impl WalletSyncStatus { + fn register_or_subscribe_pending_sync( + &mut self, + ) -> Option>> { + match self { + WalletSyncStatus::Completed => { + // We're first to register for a sync. + let (tx, _) = tokio::sync::broadcast::channel(1); + *self = WalletSyncStatus::InProgress { subscribers: tx }; + None + }, + WalletSyncStatus::InProgress { subscribers } => { + // A sync is in-progress, we subscribe. + let rx = subscribers.subscribe(); + Some(rx) + }, + } + } + + fn propagate_result_to_subscribers(&mut self, res: Result<(), Error>) { + // Send the notification to any other tasks that might be waiting on it by now. + { + match self { + WalletSyncStatus::Completed => { + // No sync in-progress, do nothing. + return; + }, + WalletSyncStatus::InProgress { subscribers } => { + // A sync is in-progress, we notify subscribers. + if subscribers.receiver_count() > 0 { + match subscribers.send(res) { + Ok(_) => (), + Err(e) => { + debug_assert!( + false, + "Failed to send wallet sync result to subscribers: {:?}", + e + ); + }, + } + } + *self = WalletSyncStatus::Completed; + }, + } + } + } +} + +pub(crate) enum ChainSource { + Esplora { + sync_config: EsploraSyncConfig, + esplora_client: EsploraAsyncClient, + onchain_wallet: Arc, + onchain_wallet_sync_status: Mutex, + tx_sync: Arc>>, + lightning_wallet_sync_status: Mutex, + fee_estimator: Arc, + tx_broadcaster: Arc, + kv_store: Arc, + config: Arc, + logger: Arc, + node_metrics: Arc>, + }, +} + +impl ChainSource { + pub(crate) fn new_esplora( + server_url: String, sync_config: EsploraSyncConfig, onchain_wallet: Arc, + fee_estimator: Arc, tx_broadcaster: Arc, + kv_store: Arc, config: Arc, logger: Arc, + node_metrics: Arc>, + ) -> Self { + let mut client_builder = esplora_client::Builder::new(&server_url); + client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS); + let esplora_client = client_builder.build_async().unwrap(); + let tx_sync = + Arc::new(EsploraSyncClient::from_client(esplora_client.clone(), Arc::clone(&logger))); + let onchain_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed); + let lightning_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed); + Self::Esplora { + sync_config, + esplora_client, + onchain_wallet, + onchain_wallet_sync_status, + tx_sync, + lightning_wallet_sync_status, + fee_estimator, + tx_broadcaster, + kv_store, + config, + logger, + node_metrics, + } + } + + pub(crate) async fn continuously_sync_wallets( + &self, mut stop_sync_receiver: tokio::sync::watch::Receiver<()>, + channel_manager: Arc, chain_monitor: Arc, + output_sweeper: Arc, + ) { + match self { + Self::Esplora { sync_config, logger, .. } => { + // Setup syncing intervals + let onchain_wallet_sync_interval_secs = sync_config + .onchain_wallet_sync_interval_secs + .max(WALLET_SYNC_INTERVAL_MINIMUM_SECS); + let mut onchain_wallet_sync_interval = + tokio::time::interval(Duration::from_secs(onchain_wallet_sync_interval_secs)); + onchain_wallet_sync_interval + .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + let fee_rate_cache_update_interval_secs = sync_config + .fee_rate_cache_update_interval_secs + .max(WALLET_SYNC_INTERVAL_MINIMUM_SECS); + let mut fee_rate_update_interval = + tokio::time::interval(Duration::from_secs(fee_rate_cache_update_interval_secs)); + // When starting up, we just blocked on updating, so skip the first tick. + fee_rate_update_interval.reset(); + fee_rate_update_interval + .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + let lightning_wallet_sync_interval_secs = sync_config + .lightning_wallet_sync_interval_secs + .max(WALLET_SYNC_INTERVAL_MINIMUM_SECS); + let mut lightning_wallet_sync_interval = + tokio::time::interval(Duration::from_secs(lightning_wallet_sync_interval_secs)); + lightning_wallet_sync_interval + .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + // Start the syncing loop. + loop { + tokio::select! { + _ = stop_sync_receiver.changed() => { + log_trace!( + logger, + "Stopping background syncing on-chain wallet.", + ); + return; + } + _ = onchain_wallet_sync_interval.tick() => { + let _ = self.sync_onchain_wallet().await; + } + _ = fee_rate_update_interval.tick() => { + let _ = self.update_fee_rate_estimates().await; + } + _ = lightning_wallet_sync_interval.tick() => { + let _ = self.sync_lightning_wallet( + Arc::clone(&channel_manager), + Arc::clone(&chain_monitor), + Arc::clone(&output_sweeper), + ).await; + } + } + } + }, + } + } + + pub(crate) async fn sync_onchain_wallet(&self) -> Result<(), Error> { + match self { + Self::Esplora { + esplora_client, + onchain_wallet, + onchain_wallet_sync_status, + kv_store, + logger, + node_metrics, + .. + } => { + let receiver_res = { + let mut status_lock = onchain_wallet_sync_status.lock().unwrap(); + status_lock.register_or_subscribe_pending_sync() + }; + if let Some(mut sync_receiver) = receiver_res { + log_info!(logger, "Sync in progress, skipping."); + return sync_receiver.recv().await.map_err(|e| { + debug_assert!(false, "Failed to receive wallet sync result: {:?}", e); + log_error!(logger, "Failed to receive wallet sync result: {:?}", e); + Error::WalletOperationFailed + })?; + } + + let res = { + // If this is our first sync, do a full scan with the configured gap limit. + // Otherwise just do an incremental sync. + let incremental_sync = + node_metrics.read().unwrap().latest_onchain_wallet_sync_timestamp.is_some(); + + macro_rules! get_and_apply_wallet_update { + ($sync_future: expr) => {{ + let now = Instant::now(); + match $sync_future.await { + Ok(res) => match res { + Ok(update) => match onchain_wallet.apply_update(update) { + Ok(()) => { + log_info!( + logger, + "{} of on-chain wallet finished in {}ms.", + if incremental_sync { "Incremental sync" } else { "Sync" }, + now.elapsed().as_millis() + ); + let unix_time_secs_opt = SystemTime::now() + .duration_since(UNIX_EPOCH) + .ok() + .map(|d| d.as_secs()); + { + let mut locked_node_metrics = node_metrics.write().unwrap(); + locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt; + write_node_metrics(&*locked_node_metrics, Arc::clone(&kv_store), Arc::clone(&logger))?; + } + Ok(()) + }, + Err(e) => Err(e), + }, + Err(e) => match *e { + esplora_client::Error::Reqwest(he) => { + log_error!( + logger, + "{} of on-chain wallet failed due to HTTP connection error: {}", + if incremental_sync { "Incremental sync" } else { "Sync" }, + he + ); + Err(Error::WalletOperationFailed) + }, + _ => { + log_error!( + logger, + "{} of on-chain wallet failed due to Esplora error: {}", + if incremental_sync { "Incremental sync" } else { "Sync" }, + e + ); + Err(Error::WalletOperationFailed) + }, + }, + }, + Err(e) => { + log_error!( + logger, + "{} of on-chain wallet timed out: {}", + if incremental_sync { "Incremental sync" } else { "Sync" }, + e + ); + Err(Error::WalletOperationTimeout) + }, + } + }} + } + + if incremental_sync { + let full_scan_request = onchain_wallet.get_full_scan_request(); + let wallet_sync_timeout_fut = tokio::time::timeout( + Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), + esplora_client.full_scan( + full_scan_request, + BDK_CLIENT_STOP_GAP, + BDK_CLIENT_CONCURRENCY, + ), + ); + get_and_apply_wallet_update!(wallet_sync_timeout_fut) + } else { + let sync_request = onchain_wallet.get_incremental_sync_request(); + let wallet_sync_timeout_fut = tokio::time::timeout( + Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), + esplora_client.sync(sync_request, BDK_CLIENT_CONCURRENCY), + ); + get_and_apply_wallet_update!(wallet_sync_timeout_fut) + } + }; + + onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); + + res + }, + } + } + + pub(crate) async fn sync_lightning_wallet( + &self, channel_manager: Arc, chain_monitor: Arc, + output_sweeper: Arc, + ) -> Result<(), Error> { + match self { + Self::Esplora { + tx_sync, + lightning_wallet_sync_status, + kv_store, + logger, + node_metrics, + .. + } => { + let sync_cman = Arc::clone(&channel_manager); + let sync_cmon = Arc::clone(&chain_monitor); + let sync_sweeper = Arc::clone(&output_sweeper); + let confirmables = vec![ + &*sync_cman as &(dyn Confirm + Sync + Send), + &*sync_cmon as &(dyn Confirm + Sync + Send), + &*sync_sweeper as &(dyn Confirm + Sync + Send), + ]; + + let receiver_res = { + let mut status_lock = lightning_wallet_sync_status.lock().unwrap(); + status_lock.register_or_subscribe_pending_sync() + }; + if let Some(mut sync_receiver) = receiver_res { + log_info!(logger, "Sync in progress, skipping."); + return sync_receiver.recv().await.map_err(|e| { + debug_assert!(false, "Failed to receive wallet sync result: {:?}", e); + log_error!(logger, "Failed to receive wallet sync result: {:?}", e); + Error::WalletOperationFailed + })?; + } + let res = { + let timeout_fut = tokio::time::timeout( + Duration::from_secs(LDK_WALLET_SYNC_TIMEOUT_SECS), + tx_sync.sync(confirmables), + ); + let now = Instant::now(); + match timeout_fut.await { + Ok(res) => match res { + Ok(()) => { + log_info!( + logger, + "Sync of Lightning wallet finished in {}ms.", + now.elapsed().as_millis() + ); + + let unix_time_secs_opt = SystemTime::now() + .duration_since(UNIX_EPOCH) + .ok() + .map(|d| d.as_secs()); + { + let mut locked_node_metrics = node_metrics.write().unwrap(); + locked_node_metrics.latest_lightning_wallet_sync_timestamp = + unix_time_secs_opt; + write_node_metrics( + &*locked_node_metrics, + Arc::clone(&kv_store), + Arc::clone(&logger), + )?; + } + + periodically_archive_fully_resolved_monitors( + Arc::clone(&channel_manager), + Arc::clone(&chain_monitor), + Arc::clone(&kv_store), + Arc::clone(&logger), + Arc::clone(&node_metrics), + )?; + Ok(()) + }, + Err(e) => { + log_error!(logger, "Sync of Lightning wallet failed: {}", e); + Err(e.into()) + }, + }, + Err(e) => { + log_error!(logger, "Lightning wallet sync timed out: {}", e); + Err(Error::TxSyncTimeout) + }, + } + }; + + lightning_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); + + res + }, + } + } + + pub(crate) async fn update_fee_rate_estimates(&self) -> Result<(), Error> { + match self { + Self::Esplora { + esplora_client, + fee_estimator, + config, + kv_store, + logger, + node_metrics, + .. + } => { + let now = Instant::now(); + let estimates = tokio::time::timeout( + Duration::from_secs(FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS), + esplora_client.get_fee_estimates(), + ) + .await + .map_err(|e| { + log_error!(logger, "Updating fee rate estimates timed out: {}", e); + Error::FeerateEstimationUpdateTimeout + })? + .map_err(|e| { + log_error!(logger, "Failed to retrieve fee rate estimates: {}", e); + Error::FeerateEstimationUpdateFailed + })?; + + if estimates.is_empty() && config.network == Network::Bitcoin { + // Ensure we fail if we didn't receive any estimates. + log_error!( + logger, + "Failed to retrieve fee rate estimates: empty fee estimates are dissallowed on Mainnet.", + ); + return Err(Error::FeerateEstimationUpdateFailed); + } + + let confirmation_targets = get_all_conf_targets(); + + let mut new_fee_rate_cache = HashMap::with_capacity(10); + for target in confirmation_targets { + let num_blocks = get_num_block_defaults_for_target(target); + + let converted_estimate_sat_vb = + esplora_client::convert_fee_rate(num_blocks, estimates.clone()).map_err( + |e| { + log_error!( + logger, + "Failed to convert fee rate estimates for {:?}: {}", + target, + e + ); + Error::FeerateEstimationUpdateFailed + }, + )?; + + let fee_rate = + FeeRate::from_sat_per_kwu((converted_estimate_sat_vb * 250.0) as u64); + + // LDK 0.0.118 introduced changes to the `ConfirmationTarget` semantics that + // require some post-estimation adjustments to the fee rates, which we do here. + let adjusted_fee_rate = apply_post_estimation_adjustments(target, fee_rate); + + new_fee_rate_cache.insert(target, adjusted_fee_rate); + + log_trace!( + logger, + "Fee rate estimation updated for {:?}: {} sats/kwu", + target, + adjusted_fee_rate.to_sat_per_kwu(), + ); + } + + fee_estimator.set_fee_rate_cache(new_fee_rate_cache); + + log_info!( + logger, + "Fee rate cache update finished in {}ms.", + now.elapsed().as_millis() + ); + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + { + let mut locked_node_metrics = node_metrics.write().unwrap(); + locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt; + write_node_metrics( + &*locked_node_metrics, + Arc::clone(&kv_store), + Arc::clone(&logger), + )?; + } + + Ok(()) + }, + } + } + + pub(crate) async fn process_broadcast_queue(&self) { + match self { + Self::Esplora { esplora_client, tx_broadcaster, logger, .. } => { + let mut receiver = tx_broadcaster.get_broadcast_queue().await; + while let Some(next_package) = receiver.recv().await { + for tx in &next_package { + let txid = tx.compute_txid(); + let timeout_fut = tokio::time::timeout( + Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS), + esplora_client.broadcast(tx), + ); + match timeout_fut.await { + Ok(res) => match res { + Ok(()) => { + log_trace!( + logger, + "Successfully broadcast transaction {}", + txid + ); + }, + Err(e) => match e { + esplora_client::Error::Reqwest(err) => { + if err.status() == reqwest::StatusCode::from_u16(400).ok() { + // Ignore 400, as this just means bitcoind already knows the + // transaction. + // FIXME: We can further differentiate here based on the error + // message which will be available with rust-esplora-client 0.7 and + // later. + } else { + log_error!( + logger, + "Failed to broadcast due to HTTP connection error: {}", + err + ); + } + log_trace!( + logger, + "Failed broadcast transaction bytes: {}", + log_bytes!(tx.encode()) + ); + }, + _ => { + log_error!( + logger, + "Failed to broadcast transaction {}: {}", + txid, + e + ); + log_trace!( + logger, + "Failed broadcast transaction bytes: {}", + log_bytes!(tx.encode()) + ); + }, + }, + }, + Err(e) => { + log_error!( + logger, + "Failed to broadcast transaction due to timeout {}: {}", + txid, + e + ); + log_trace!( + logger, + "Failed broadcast transaction bytes: {}", + log_bytes!(tx.encode()) + ); + }, + } + } + } + }, + } + } +} + +impl Filter for ChainSource { + fn register_tx(&self, txid: &bitcoin::Txid, script_pubkey: &bitcoin::Script) { + match self { + Self::Esplora { tx_sync, .. } => tx_sync.register_tx(txid, script_pubkey), + } + } + fn register_output(&self, output: lightning::chain::WatchedOutput) { + match self { + Self::Esplora { tx_sync, .. } => tx_sync.register_output(output), + } + } +} + +fn periodically_archive_fully_resolved_monitors( + channel_manager: Arc, chain_monitor: Arc, + kv_store: Arc, logger: Arc, node_metrics: Arc>, +) -> Result<(), Error> { + let mut locked_node_metrics = node_metrics.write().unwrap(); + let cur_height = channel_manager.current_best_block().height; + let should_archive = locked_node_metrics + .latest_channel_monitor_archival_height + .as_ref() + .map_or(true, |h| cur_height >= h + RESOLVED_CHANNEL_MONITOR_ARCHIVAL_INTERVAL); + + if should_archive { + chain_monitor.archive_fully_resolved_channel_monitors(); + locked_node_metrics.latest_channel_monitor_archival_height = Some(cur_height); + write_node_metrics(&*locked_node_metrics, kv_store, logger)?; + } + Ok(()) +} diff --git a/src/config.rs b/src/config.rs index b69e73ecf..d82b64f32 100644 --- a/src/config.rs +++ b/src/config.rs @@ -5,10 +5,14 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. +//! Objects for configuring the node. + use crate::payment::SendingParameters; use lightning::ln::msgs::SocketAddress; use lightning::routing::gossip::NodeAlias; +use lightning::util::config::ChannelConfig as LdkChannelConfig; +use lightning::util::config::MaxDustHTLCExposure as LdkMaxDustHTLCExposure; use lightning::util::config::UserConfig; use lightning::util::logger::Level as LogLevel; @@ -34,12 +38,6 @@ pub(crate) const BDK_CLIENT_STOP_GAP: usize = 20; // The number of concurrent requests made against the API provider. pub(crate) const BDK_CLIENT_CONCURRENCY: usize = 4; -// The default Esplora server we're using. -pub(crate) const DEFAULT_ESPLORA_SERVER_URL: &str = "https://blockstream.info/api"; - -// The default Esplora client timeout we're using. -pub(crate) const DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS: u64 = 10; - // The timeout after which we abandon retrying failed payments. pub(crate) const LDK_PAYMENT_RETRY_TIMEOUT: Duration = Duration::from_secs(10); @@ -123,18 +121,6 @@ pub struct Config { /// **Note**: We will only allow opening and accepting public channels if the `node_alias` and the /// `listening_addresses` are set. pub node_alias: Option, - /// The time in-between background sync attempts of the onchain wallet, in seconds. - /// - /// **Note:** A minimum of 10 seconds is always enforced. - pub onchain_wallet_sync_interval_secs: u64, - /// The time in-between background sync attempts of the LDK wallet, in seconds. - /// - /// **Note:** A minimum of 10 seconds is always enforced. - pub wallet_sync_interval_secs: u64, - /// The time in-between background update attempts to our fee rate cache, in seconds. - /// - /// **Note:** A minimum of 10 seconds is always enforced. - pub fee_rate_cache_update_interval_secs: u64, /// A list of peers that we allow to establish zero confirmation channels to us. /// /// **Note:** Allowing payments via zero-confirmation channels is potentially insecure if the @@ -184,9 +170,6 @@ impl Default for Config { log_dir_path: None, network: DEFAULT_NETWORK, listening_addresses: None, - onchain_wallet_sync_interval_secs: DEFAULT_BDK_WALLET_SYNC_INTERVAL_SECS, - wallet_sync_interval_secs: DEFAULT_LDK_WALLET_SYNC_INTERVAL_SECS, - fee_rate_cache_update_interval_secs: DEFAULT_FEE_RATE_CACHE_UPDATE_INTERVAL_SECS, trusted_peers_0conf: Vec::new(), probing_liquidity_limit_multiplier: DEFAULT_PROBING_LIQUIDITY_LIMIT_MULTIPLIER, log_level: DEFAULT_LOG_LEVEL, @@ -304,6 +287,157 @@ pub(crate) fn default_user_config(config: &Config) -> UserConfig { user_config } +/// Options related to syncing the Lightning and on-chain wallets via an Esplora backend. +/// +/// ### Defaults +/// +/// | Parameter | Value | +/// |----------------------------------------|--------------------| +/// | `onchain_wallet_sync_interval_secs` | 80 | +/// | `lightning_wallet_sync_interval_secs` | 30 | +/// | `fee_rate_cache_update_interval_secs` | 600 | +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct EsploraSyncConfig { + /// The time in-between background sync attempts of the onchain wallet, in seconds. + /// + /// **Note:** A minimum of 10 seconds is always enforced. + pub onchain_wallet_sync_interval_secs: u64, + /// The time in-between background sync attempts of the LDK wallet, in seconds. + /// + /// **Note:** A minimum of 10 seconds is always enforced. + pub lightning_wallet_sync_interval_secs: u64, + /// The time in-between background update attempts to our fee rate cache, in seconds. + /// + /// **Note:** A minimum of 10 seconds is always enforced. + pub fee_rate_cache_update_interval_secs: u64, +} + +impl Default for EsploraSyncConfig { + fn default() -> Self { + Self { + onchain_wallet_sync_interval_secs: DEFAULT_BDK_WALLET_SYNC_INTERVAL_SECS, + lightning_wallet_sync_interval_secs: DEFAULT_LDK_WALLET_SYNC_INTERVAL_SECS, + fee_rate_cache_update_interval_secs: DEFAULT_FEE_RATE_CACHE_UPDATE_INTERVAL_SECS, + } + } +} + +/// Options which apply on a per-channel basis and may change at runtime or based on negotiation +/// with our counterparty. +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub struct ChannelConfig { + /// Amount (in millionths of a satoshi) charged per satoshi for payments forwarded outbound + /// over the channel. + /// This may be allowed to change at runtime in a later update, however doing so must result in + /// update messages sent to notify all nodes of our updated relay fee. + /// + /// Please refer to [`LdkChannelConfig`] for further details. + pub forwarding_fee_proportional_millionths: u32, + /// Amount (in milli-satoshi) charged for payments forwarded outbound over the channel, in + /// excess of [`ChannelConfig::forwarding_fee_proportional_millionths`]. + /// This may be allowed to change at runtime in a later update, however doing so must result in + /// update messages sent to notify all nodes of our updated relay fee. + /// + /// Please refer to [`LdkChannelConfig`] for further details. + pub forwarding_fee_base_msat: u32, + /// The difference in the CLTV value between incoming HTLCs and an outbound HTLC forwarded over + /// the channel this config applies to. + /// + /// Please refer to [`LdkChannelConfig`] for further details. + pub cltv_expiry_delta: u16, + /// Limit our total exposure to potential loss to on-chain fees on close, including in-flight + /// HTLCs which are burned to fees as they are too small to claim on-chain and fees on + /// commitment transaction(s) broadcasted by our counterparty in excess of our own fee estimate. + /// + /// Please refer to [`LdkChannelConfig`] for further details. + pub max_dust_htlc_exposure: MaxDustHTLCExposure, + /// The additional fee we're willing to pay to avoid waiting for the counterparty's + /// `to_self_delay` to reclaim funds. + /// + /// Please refer to [`LdkChannelConfig`] for further details. + pub force_close_avoidance_max_fee_satoshis: u64, + /// If set, allows this channel's counterparty to skim an additional fee off this node's inbound + /// HTLCs. Useful for liquidity providers to offload on-chain channel costs to end users. + /// + /// Please refer to [`LdkChannelConfig`] for further details. + pub accept_underpaying_htlcs: bool, +} + +impl From for ChannelConfig { + fn from(value: LdkChannelConfig) -> Self { + Self { + forwarding_fee_proportional_millionths: value.forwarding_fee_proportional_millionths, + forwarding_fee_base_msat: value.forwarding_fee_base_msat, + cltv_expiry_delta: value.cltv_expiry_delta, + max_dust_htlc_exposure: value.max_dust_htlc_exposure.into(), + force_close_avoidance_max_fee_satoshis: value.force_close_avoidance_max_fee_satoshis, + accept_underpaying_htlcs: value.accept_underpaying_htlcs, + } + } +} + +impl From for LdkChannelConfig { + fn from(value: ChannelConfig) -> Self { + Self { + forwarding_fee_proportional_millionths: value.forwarding_fee_proportional_millionths, + forwarding_fee_base_msat: value.forwarding_fee_base_msat, + cltv_expiry_delta: value.cltv_expiry_delta, + max_dust_htlc_exposure: value.max_dust_htlc_exposure.into(), + force_close_avoidance_max_fee_satoshis: value.force_close_avoidance_max_fee_satoshis, + accept_underpaying_htlcs: value.accept_underpaying_htlcs, + } + } +} + +impl Default for ChannelConfig { + fn default() -> Self { + LdkChannelConfig::default().into() + } +} + +/// Options for how to set the max dust exposure allowed on a channel. +/// +/// See [`LdkChannelConfig::max_dust_htlc_exposure`] for details. +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum MaxDustHTLCExposure { + /// This sets a fixed limit on the total dust exposure in millisatoshis. + /// + /// Please refer to [`LdkMaxDustHTLCExposure`] for further details. + FixedLimit { + /// The fixed limit, in millisatoshis. + limit_msat: u64, + }, + /// This sets a multiplier on the feerate to determine the maximum allowed dust exposure. + /// + /// Please refer to [`LdkMaxDustHTLCExposure`] for further details. + FeeRateMultiplier { + /// The applied fee rate multiplier. + multiplier: u64, + }, +} + +impl From for MaxDustHTLCExposure { + fn from(value: LdkMaxDustHTLCExposure) -> Self { + match value { + LdkMaxDustHTLCExposure::FixedLimitMsat(limit_msat) => Self::FixedLimit { limit_msat }, + LdkMaxDustHTLCExposure::FeeRateMultiplier(multiplier) => { + Self::FeeRateMultiplier { multiplier } + }, + } + } +} + +impl From for LdkMaxDustHTLCExposure { + fn from(value: MaxDustHTLCExposure) -> Self { + match value { + MaxDustHTLCExposure::FixedLimit { limit_msat } => Self::FixedLimitMsat(limit_msat), + MaxDustHTLCExposure::FeeRateMultiplier { multiplier } => { + Self::FeeRateMultiplier(multiplier) + }, + } + } +} + #[cfg(test)] mod tests { use std::str::FromStr; diff --git a/src/fee_estimator.rs b/src/fee_estimator.rs index 62b4b8882..0ecc71586 100644 --- a/src/fee_estimator.rs +++ b/src/fee_estimator.rs @@ -5,23 +5,14 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. -use crate::config::FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS; -use crate::logger::{log_error, log_trace, Logger}; -use crate::{Config, Error}; - use lightning::chain::chaininterface::ConfirmationTarget as LdkConfirmationTarget; use lightning::chain::chaininterface::FeeEstimator as LdkFeeEstimator; use lightning::chain::chaininterface::FEERATE_FLOOR_SATS_PER_KW; use bitcoin::FeeRate; -use esplora_client::AsyncClient as EsploraClient; - -use bitcoin::Network; use std::collections::HashMap; -use std::ops::Deref; -use std::sync::{Arc, RwLock}; -use std::time::Duration; +use std::sync::RwLock; #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)] pub(crate) enum ConfirmationTarget { @@ -43,141 +34,26 @@ impl From for ConfirmationTarget { } } -pub(crate) struct OnchainFeeEstimator -where - L::Target: Logger, -{ +pub(crate) struct OnchainFeeEstimator { fee_rate_cache: RwLock>, - esplora_client: EsploraClient, - config: Arc, - logger: L, } -impl OnchainFeeEstimator -where - L::Target: Logger, -{ - pub(crate) fn new(esplora_client: EsploraClient, config: Arc, logger: L) -> Self { +impl OnchainFeeEstimator { + pub(crate) fn new() -> Self { let fee_rate_cache = RwLock::new(HashMap::new()); - Self { fee_rate_cache, esplora_client, config, logger } + Self { fee_rate_cache } } - pub(crate) async fn update_fee_estimates(&self) -> Result<(), Error> { - let estimates = tokio::time::timeout( - Duration::from_secs(FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS), - self.esplora_client.get_fee_estimates(), - ) - .await - .map_err(|e| { - log_error!(self.logger, "Updating fee rate estimates timed out: {}", e); - Error::FeerateEstimationUpdateTimeout - })? - .map_err(|e| { - log_error!(self.logger, "Failed to retrieve fee rate estimates: {}", e); - Error::FeerateEstimationUpdateFailed - })?; - - if estimates.is_empty() && self.config.network == Network::Bitcoin { - // Ensure we fail if we didn't receive any estimates. - log_error!( - self.logger, - "Failed to retrieve fee rate estimates: empty fee estimates are dissallowed on Mainnet.", - ); - return Err(Error::FeerateEstimationUpdateFailed); - } - - let confirmation_targets = vec![ - ConfirmationTarget::OnchainPayment, - ConfirmationTarget::ChannelFunding, - LdkConfirmationTarget::MaximumFeeEstimate.into(), - LdkConfirmationTarget::UrgentOnChainSweep.into(), - LdkConfirmationTarget::MinAllowedAnchorChannelRemoteFee.into(), - LdkConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee.into(), - LdkConfirmationTarget::AnchorChannelFee.into(), - LdkConfirmationTarget::NonAnchorChannelFee.into(), - LdkConfirmationTarget::ChannelCloseMinimum.into(), - LdkConfirmationTarget::OutputSpendingFee.into(), - ]; - - for target in confirmation_targets { - let num_blocks = match target { - ConfirmationTarget::OnchainPayment => 6, - ConfirmationTarget::ChannelFunding => 12, - ConfirmationTarget::Lightning(ldk_target) => match ldk_target { - LdkConfirmationTarget::MaximumFeeEstimate => 1, - LdkConfirmationTarget::UrgentOnChainSweep => 6, - LdkConfirmationTarget::MinAllowedAnchorChannelRemoteFee => 1008, - LdkConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee => 144, - LdkConfirmationTarget::AnchorChannelFee => 1008, - LdkConfirmationTarget::NonAnchorChannelFee => 12, - LdkConfirmationTarget::ChannelCloseMinimum => 144, - LdkConfirmationTarget::OutputSpendingFee => 12, - }, - }; - - let converted_estimate_sat_vb = - esplora_client::convert_fee_rate(num_blocks, estimates.clone()).map_err(|e| { - log_error!( - self.logger, - "Failed to convert fee rate estimates for {:?}: {}", - target, - e - ); - Error::FeerateEstimationUpdateFailed - })?; - - let fee_rate = FeeRate::from_sat_per_kwu((converted_estimate_sat_vb * 250.0) as u64); - - // LDK 0.0.118 introduced changes to the `ConfirmationTarget` semantics that - // require some post-estimation adjustments to the fee rates, which we do here. - let adjusted_fee_rate = match target { - ConfirmationTarget::Lightning( - LdkConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee, - ) => { - let slightly_less_than_background = fee_rate.to_sat_per_kwu() - 250; - FeeRate::from_sat_per_kwu(slightly_less_than_background) - }, - _ => fee_rate, - }; - - let mut locked_fee_rate_cache = self.fee_rate_cache.write().unwrap(); - locked_fee_rate_cache.insert(target, adjusted_fee_rate); - log_trace!( - self.logger, - "Fee rate estimation updated for {:?}: {} sats/kwu", - target, - adjusted_fee_rate.to_sat_per_kwu(), - ); - } - Ok(()) + pub(crate) fn set_fee_rate_cache(&self, fee_rate_cache: HashMap) { + *self.fee_rate_cache.write().unwrap() = fee_rate_cache; } } -impl FeeEstimator for OnchainFeeEstimator -where - L::Target: Logger, -{ +impl FeeEstimator for OnchainFeeEstimator { fn estimate_fee_rate(&self, confirmation_target: ConfirmationTarget) -> FeeRate { let locked_fee_rate_cache = self.fee_rate_cache.read().unwrap(); - let fallback_sats_kwu = match confirmation_target { - ConfirmationTarget::OnchainPayment => 5000, - ConfirmationTarget::ChannelFunding => 1000, - ConfirmationTarget::Lightning(ldk_target) => match ldk_target { - LdkConfirmationTarget::MaximumFeeEstimate => 8000, - LdkConfirmationTarget::UrgentOnChainSweep => 5000, - LdkConfirmationTarget::MinAllowedAnchorChannelRemoteFee => { - FEERATE_FLOOR_SATS_PER_KW - }, - LdkConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee => { - FEERATE_FLOOR_SATS_PER_KW - }, - LdkConfirmationTarget::AnchorChannelFee => 500, - LdkConfirmationTarget::NonAnchorChannelFee => 1000, - LdkConfirmationTarget::ChannelCloseMinimum => 500, - LdkConfirmationTarget::OutputSpendingFee => 1000, - }, - }; + let fallback_sats_kwu = get_fallback_rate_for_target(confirmation_target); // We'll fall back on this, if we really don't have any other information. let fallback_rate = FeeRate::from_sat_per_kwu(fallback_sats_kwu as u64); @@ -190,11 +66,71 @@ where } } -impl LdkFeeEstimator for OnchainFeeEstimator -where - L::Target: Logger, -{ +impl LdkFeeEstimator for OnchainFeeEstimator { fn get_est_sat_per_1000_weight(&self, confirmation_target: LdkConfirmationTarget) -> u32 { self.estimate_fee_rate(confirmation_target.into()).to_sat_per_kwu() as u32 } } + +pub(crate) fn get_num_block_defaults_for_target(target: ConfirmationTarget) -> usize { + match target { + ConfirmationTarget::OnchainPayment => 6, + ConfirmationTarget::ChannelFunding => 12, + ConfirmationTarget::Lightning(ldk_target) => match ldk_target { + LdkConfirmationTarget::MaximumFeeEstimate => 1, + LdkConfirmationTarget::UrgentOnChainSweep => 6, + LdkConfirmationTarget::MinAllowedAnchorChannelRemoteFee => 1008, + LdkConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee => 144, + LdkConfirmationTarget::AnchorChannelFee => 1008, + LdkConfirmationTarget::NonAnchorChannelFee => 12, + LdkConfirmationTarget::ChannelCloseMinimum => 144, + LdkConfirmationTarget::OutputSpendingFee => 12, + }, + } +} + +pub(crate) fn get_fallback_rate_for_target(target: ConfirmationTarget) -> u32 { + match target { + ConfirmationTarget::OnchainPayment => 5000, + ConfirmationTarget::ChannelFunding => 1000, + ConfirmationTarget::Lightning(ldk_target) => match ldk_target { + LdkConfirmationTarget::MaximumFeeEstimate => 8000, + LdkConfirmationTarget::UrgentOnChainSweep => 5000, + LdkConfirmationTarget::MinAllowedAnchorChannelRemoteFee => FEERATE_FLOOR_SATS_PER_KW, + LdkConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee => FEERATE_FLOOR_SATS_PER_KW, + LdkConfirmationTarget::AnchorChannelFee => 500, + LdkConfirmationTarget::NonAnchorChannelFee => 1000, + LdkConfirmationTarget::ChannelCloseMinimum => 500, + LdkConfirmationTarget::OutputSpendingFee => 1000, + }, + } +} + +pub(crate) fn get_all_conf_targets() -> [ConfirmationTarget; 10] { + [ + ConfirmationTarget::OnchainPayment, + ConfirmationTarget::ChannelFunding, + LdkConfirmationTarget::MaximumFeeEstimate.into(), + LdkConfirmationTarget::UrgentOnChainSweep.into(), + LdkConfirmationTarget::MinAllowedAnchorChannelRemoteFee.into(), + LdkConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee.into(), + LdkConfirmationTarget::AnchorChannelFee.into(), + LdkConfirmationTarget::NonAnchorChannelFee.into(), + LdkConfirmationTarget::ChannelCloseMinimum.into(), + LdkConfirmationTarget::OutputSpendingFee.into(), + ] +} + +pub(crate) fn apply_post_estimation_adjustments( + target: ConfirmationTarget, estimated_rate: FeeRate, +) -> FeeRate { + match target { + ConfirmationTarget::Lightning( + LdkConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee, + ) => { + let slightly_less_than_background = estimated_rate.to_sat_per_kwu() - 250; + FeeRate::from_sat_per_kwu(slightly_less_than_background) + }, + _ => estimated_rate, + } +} diff --git a/src/io/mod.rs b/src/io/mod.rs index 22caff50f..fab0a27f9 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -33,15 +33,10 @@ pub(crate) const DEPRECATED_SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: "spendable_outputs"; pub(crate) const DEPRECATED_SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; -/// RapidGossipSync's `latest_sync_timestamp` will be persisted under this key. -pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_PRIMARY_NAMESPACE: &str = ""; -pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_SECONDARY_NAMESPACE: &str = ""; -pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_KEY: &str = "latest_rgs_sync_timestamp"; - -/// The last time we broadcast a node announcement will be persisted under this key. -pub(crate) const LATEST_NODE_ANN_BCAST_TIMESTAMP_PRIMARY_NAMESPACE: &str = ""; -pub(crate) const LATEST_NODE_ANN_BCAST_TIMESTAMP_SECONDARY_NAMESPACE: &str = ""; -pub(crate) const LATEST_NODE_ANN_BCAST_TIMESTAMP_KEY: &str = "latest_node_ann_bcast_timestamp"; +/// The node metrics will be persisted under this key. +pub(crate) const NODE_METRICS_PRIMARY_NAMESPACE: &str = ""; +pub(crate) const NODE_METRICS_SECONDARY_NAMESPACE: &str = ""; +pub(crate) const NODE_METRICS_KEY: &str = "node_metrics"; /// The BDK wallet's [`ChangeSet::descriptor`] will be persisted under this key. /// diff --git a/src/io/utils.rs b/src/io/utils.rs index f6fd10b41..218fec473 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -8,12 +8,17 @@ use super::*; use crate::config::WALLET_KEYS_SEED_LEN; +use crate::chain::ChainSource; +use crate::fee_estimator::OnchainFeeEstimator; +use crate::io::{ + NODE_METRICS_KEY, NODE_METRICS_PRIMARY_NAMESPACE, NODE_METRICS_SECONDARY_NAMESPACE, +}; use crate::logger::{log_error, FilesystemLogger}; use crate::peer_store::PeerStore; use crate::sweep::DeprecatedSpendableOutputInfo; -use crate::types::{Broadcaster, ChainSource, DynStore, FeeEstimator, KeysManager, Sweeper}; +use crate::types::{Broadcaster, DynStore, KeysManager, Sweeper}; use crate::wallet::ser::{ChangeSetDeserWrapper, ChangeSetSerWrapper}; -use crate::{Error, EventQueue, PaymentDetails}; +use crate::{Error, EventQueue, NodeMetrics, PaymentDetails}; use lightning::io::Cursor; use lightning::ln::msgs::DecodeError; @@ -219,7 +224,7 @@ where /// Read `OutputSweeper` state from the store. pub(crate) fn read_output_sweeper( - broadcaster: Arc, fee_estimator: Arc, + broadcaster: Arc, fee_estimator: Arc, chain_data_source: Arc, keys_manager: Arc, kv_store: Arc, logger: Arc, ) -> Result { @@ -340,98 +345,44 @@ where Ok(()) } -pub(crate) fn read_latest_rgs_sync_timestamp( +pub(crate) fn read_node_metrics( kv_store: Arc, logger: L, -) -> Result +) -> Result where L::Target: Logger, { let mut reader = Cursor::new(kv_store.read( - LATEST_RGS_SYNC_TIMESTAMP_PRIMARY_NAMESPACE, - LATEST_RGS_SYNC_TIMESTAMP_SECONDARY_NAMESPACE, - LATEST_RGS_SYNC_TIMESTAMP_KEY, + NODE_METRICS_PRIMARY_NAMESPACE, + NODE_METRICS_SECONDARY_NAMESPACE, + NODE_METRICS_KEY, )?); - u32::read(&mut reader).map_err(|e| { - log_error!(logger, "Failed to deserialize latest RGS sync timestamp: {}", e); - std::io::Error::new( - std::io::ErrorKind::InvalidData, - "Failed to deserialize latest RGS sync timestamp", - ) - }) -} - -pub(crate) fn write_latest_rgs_sync_timestamp( - updated_timestamp: u32, kv_store: Arc, logger: L, -) -> Result<(), Error> -where - L::Target: Logger, -{ - let data = updated_timestamp.encode(); - kv_store - .write( - LATEST_RGS_SYNC_TIMESTAMP_PRIMARY_NAMESPACE, - LATEST_RGS_SYNC_TIMESTAMP_SECONDARY_NAMESPACE, - LATEST_RGS_SYNC_TIMESTAMP_KEY, - &data, - ) - .map_err(|e| { - log_error!( - logger, - "Writing data to key {}/{}/{} failed due to: {}", - LATEST_RGS_SYNC_TIMESTAMP_PRIMARY_NAMESPACE, - LATEST_RGS_SYNC_TIMESTAMP_SECONDARY_NAMESPACE, - LATEST_RGS_SYNC_TIMESTAMP_KEY, - e - ); - Error::PersistenceFailed - }) -} - -pub(crate) fn read_latest_node_ann_bcast_timestamp( - kv_store: Arc, logger: L, -) -> Result -where - L::Target: Logger, -{ - let mut reader = Cursor::new(kv_store.read( - LATEST_NODE_ANN_BCAST_TIMESTAMP_PRIMARY_NAMESPACE, - LATEST_NODE_ANN_BCAST_TIMESTAMP_SECONDARY_NAMESPACE, - LATEST_NODE_ANN_BCAST_TIMESTAMP_KEY, - )?); - u64::read(&mut reader).map_err(|e| { - log_error!( - logger, - "Failed to deserialize latest node announcement broadcast timestamp: {}", - e - ); - std::io::Error::new( - std::io::ErrorKind::InvalidData, - "Failed to deserialize latest node announcement broadcast timestamp", - ) + NodeMetrics::read(&mut reader).map_err(|e| { + log_error!(logger, "Failed to deserialize NodeMetrics: {}", e); + std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize NodeMetrics") }) } -pub(crate) fn write_latest_node_ann_bcast_timestamp( - updated_timestamp: u64, kv_store: Arc, logger: L, +pub(crate) fn write_node_metrics( + node_metrics: &NodeMetrics, kv_store: Arc, logger: L, ) -> Result<(), Error> where L::Target: Logger, { - let data = updated_timestamp.encode(); + let data = node_metrics.encode(); kv_store .write( - LATEST_NODE_ANN_BCAST_TIMESTAMP_PRIMARY_NAMESPACE, - LATEST_NODE_ANN_BCAST_TIMESTAMP_SECONDARY_NAMESPACE, - LATEST_NODE_ANN_BCAST_TIMESTAMP_KEY, + NODE_METRICS_PRIMARY_NAMESPACE, + NODE_METRICS_SECONDARY_NAMESPACE, + NODE_METRICS_KEY, &data, ) .map_err(|e| { log_error!( logger, "Writing data to key {}/{}/{} failed due to: {}", - LATEST_NODE_ANN_BCAST_TIMESTAMP_PRIMARY_NAMESPACE, - LATEST_NODE_ANN_BCAST_TIMESTAMP_SECONDARY_NAMESPACE, - LATEST_NODE_ANN_BCAST_TIMESTAMP_KEY, + NODE_METRICS_PRIMARY_NAMESPACE, + NODE_METRICS_SECONDARY_NAMESPACE, + NODE_METRICS_KEY, e ); Error::PersistenceFailed diff --git a/src/lib.rs b/src/lib.rs index 67e49fd38..42b99406a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,7 +33,7 @@ //! fn main() { //! let mut builder = Builder::new(); //! builder.set_network(Network::Testnet); -//! builder.set_esplora_server("https://blockstream.info/testnet/api".to_string()); +//! builder.set_chain_source_esplora("https://blockstream.info/testnet/api".to_string(), None); //! builder.set_gossip_source_rgs("https://rapidsync.lightningdevkit.org/testnet/snapshot".to_string()); //! //! let node = builder.build().unwrap(); @@ -74,7 +74,8 @@ mod balance; mod builder; -mod config; +mod chain; +pub mod config; mod connection; mod error; mod event; @@ -101,12 +102,10 @@ pub use lightning; pub use lightning_invoice; pub use balance::{BalanceDetails, LightningBalance, PendingSweepBalance}; -pub use config::{default_config, AnchorChannelsConfig, Config}; pub use error::Error as NodeError; use error::Error; pub use event::Event; -pub use types::{ChannelConfig, MaxDustHTLCExposure}; pub use io::utils::generate_entropy_mnemonic; @@ -119,16 +118,16 @@ pub use builder::BuildError; #[cfg(not(feature = "uniffi"))] pub use builder::NodeBuilder as Builder; +use chain::ChainSource; use config::{ - default_user_config, may_announce_channel, LDK_WALLET_SYNC_TIMEOUT_SECS, - NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, - RESOLVED_CHANNEL_MONITOR_ARCHIVAL_INTERVAL, RGS_SYNC_INTERVAL, - WALLET_SYNC_INTERVAL_MINIMUM_SECS, + default_user_config, may_announce_channel, ChannelConfig, Config, NODE_ANN_BCAST_INTERVAL, + PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL, }; use connection::ConnectionManager; use event::{EventHandler, EventQueue}; use gossip::GossipSource; use graph::NetworkGraph; +use io::utils::write_node_metrics; use liquidity::LiquiditySource; use payment::store::PaymentStore; use payment::{ @@ -137,15 +136,16 @@ use payment::{ }; use peer_store::{PeerInfo, PeerStore}; use types::{ - Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, FeeEstimator, - Graph, KeysManager, OnionMessenger, PeerManager, Router, Scorer, Sweeper, Wallet, + Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph, + KeysManager, OnionMessenger, PeerManager, Router, Scorer, Sweeper, Wallet, }; pub use types::{ChannelDetails, PeerDetails, UserChannelId}; use logger::{log_error, log_info, log_trace, FilesystemLogger, Logger}; -use lightning::chain::{BestBlock, Confirm}; +use lightning::chain::BestBlock; use lightning::events::bump_transaction::Wallet as LdkWallet; +use lightning::impl_writeable_tlv_based; use lightning::ln::channel_state::ChannelShutdownState; use lightning::ln::channelmanager::PaymentId; use lightning::ln::msgs::SocketAddress; @@ -155,8 +155,6 @@ pub use lightning::util::logger::Level as LogLevel; use lightning_background_processor::process_events_async; -use lightning_transaction_sync::EsploraSyncClient; - use bitcoin::secp256k1::PublicKey; use rand::Rng; @@ -179,9 +177,8 @@ pub struct Node { event_handling_stopped_sender: tokio::sync::watch::Sender<()>, config: Arc, wallet: Arc, - tx_sync: Arc>>, + chain_source: Arc, tx_broadcaster: Arc, - fee_estimator: Arc, event_queue: Arc>>, channel_manager: Arc, chain_monitor: Arc, @@ -200,12 +197,7 @@ pub struct Node { peer_store: Arc>>, payment_store: Arc>>, is_listening: Arc, - latest_wallet_sync_timestamp: Arc>>, - latest_onchain_wallet_sync_timestamp: Arc>>, - latest_fee_rate_cache_update_timestamp: Arc>>, - latest_rgs_snapshot_timestamp: Arc>>, - latest_node_announcement_broadcast_timestamp: Arc>>, - latest_channel_monitor_archival_height: Arc>>, + node_metrics: Arc>, } impl Node { @@ -244,200 +236,29 @@ impl Node { ); // Block to ensure we update our fee rate cache once on startup - let fee_estimator = Arc::clone(&self.fee_estimator); - let sync_logger = Arc::clone(&self.logger); - let sync_fee_rate_update_timestamp = - Arc::clone(&self.latest_fee_rate_cache_update_timestamp); + let chain_source = Arc::clone(&self.chain_source); let runtime_ref = &runtime; tokio::task::block_in_place(move || { - runtime_ref.block_on(async move { - let now = Instant::now(); - match fee_estimator.update_fee_estimates().await { - Ok(()) => { - log_info!( - sync_logger, - "Initial fee rate cache update finished in {}ms.", - now.elapsed().as_millis() - ); - let unix_time_secs_opt = - SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); - *sync_fee_rate_update_timestamp.write().unwrap() = unix_time_secs_opt; - Ok(()) - }, - Err(e) => { - log_error!(sync_logger, "Initial fee rate cache update failed: {}", e,); - Err(e) - }, - } - }) + runtime_ref.block_on(async move { chain_source.update_fee_rate_estimates().await }) })?; - // Setup wallet sync - let wallet = Arc::clone(&self.wallet); - let sync_logger = Arc::clone(&self.logger); - let sync_onchain_wallet_timestamp = Arc::clone(&self.latest_onchain_wallet_sync_timestamp); - let mut stop_sync = self.stop_sender.subscribe(); - let onchain_wallet_sync_interval_secs = self - .config - .onchain_wallet_sync_interval_secs - .max(config::WALLET_SYNC_INTERVAL_MINIMUM_SECS); - runtime.spawn(async move { - let mut onchain_wallet_sync_interval = - tokio::time::interval(Duration::from_secs(onchain_wallet_sync_interval_secs)); - onchain_wallet_sync_interval - .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - loop { - tokio::select! { - _ = stop_sync.changed() => { - log_trace!( - sync_logger, - "Stopping background syncing on-chain wallet.", - ); - return; - } - _ = onchain_wallet_sync_interval.tick() => { - let now = Instant::now(); - match wallet.sync().await { - Ok(()) => { - log_trace!( - sync_logger, - "Background sync of on-chain wallet finished in {}ms.", - now.elapsed().as_millis() - ); - let unix_time_secs_opt = - SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); - *sync_onchain_wallet_timestamp.write().unwrap() = unix_time_secs_opt; - } - Err(err) => { - log_error!( - sync_logger, - "Background sync of on-chain wallet failed: {}", - err - ) - } - } - } - } - } - }); - - let mut stop_fee_updates = self.stop_sender.subscribe(); - let fee_update_logger = Arc::clone(&self.logger); - let fee_update_timestamp = Arc::clone(&self.latest_fee_rate_cache_update_timestamp); - let fee_estimator = Arc::clone(&self.fee_estimator); - let fee_rate_cache_update_interval_secs = - self.config.fee_rate_cache_update_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS); - runtime.spawn(async move { - let mut fee_rate_update_interval = - tokio::time::interval(Duration::from_secs(fee_rate_cache_update_interval_secs)); - // We just blocked on updating, so skip the first tick. - fee_rate_update_interval.reset(); - fee_rate_update_interval - .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - loop { - tokio::select! { - _ = stop_fee_updates.changed() => { - log_trace!( - fee_update_logger, - "Stopping background updates of fee rate cache.", - ); - return; - } - _ = fee_rate_update_interval.tick() => { - let now = Instant::now(); - match fee_estimator.update_fee_estimates().await { - Ok(()) => { - log_trace!( - fee_update_logger, - "Background update of fee rate cache finished in {}ms.", - now.elapsed().as_millis() - ); - let unix_time_secs_opt = - SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); - *fee_update_timestamp.write().unwrap() = unix_time_secs_opt; - } - Err(err) => { - log_error!( - fee_update_logger, - "Background update of fee rate cache failed: {}", - err - ) - } - } - } - } - } - }); - - let tx_sync = Arc::clone(&self.tx_sync); + // Spawn background task continuously syncing onchain, lightning, and fee rate cache. + let stop_sync_receiver = self.stop_sender.subscribe(); + let chain_source = Arc::clone(&self.chain_source); let sync_cman = Arc::clone(&self.channel_manager); - let archive_cman = Arc::clone(&self.channel_manager); let sync_cmon = Arc::clone(&self.chain_monitor); - let archive_cmon = Arc::clone(&self.chain_monitor); let sync_sweeper = Arc::clone(&self.output_sweeper); - let sync_logger = Arc::clone(&self.logger); - let sync_wallet_timestamp = Arc::clone(&self.latest_wallet_sync_timestamp); - let sync_monitor_archival_height = Arc::clone(&self.latest_channel_monitor_archival_height); - let mut stop_sync = self.stop_sender.subscribe(); - let wallet_sync_interval_secs = - self.config.wallet_sync_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS); runtime.spawn(async move { - let mut wallet_sync_interval = - tokio::time::interval(Duration::from_secs(wallet_sync_interval_secs)); - wallet_sync_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - loop { - tokio::select! { - _ = stop_sync.changed() => { - log_trace!( - sync_logger, - "Stopping background syncing Lightning wallet.", - ); - return; - } - _ = wallet_sync_interval.tick() => { - let confirmables = vec![ - &*sync_cman as &(dyn Confirm + Sync + Send), - &*sync_cmon as &(dyn Confirm + Sync + Send), - &*sync_sweeper as &(dyn Confirm + Sync + Send), - ]; - let now = Instant::now(); - let timeout_fut = tokio::time::timeout(Duration::from_secs(LDK_WALLET_SYNC_TIMEOUT_SECS), tx_sync.sync(confirmables)); - match timeout_fut.await { - Ok(res) => match res { - Ok(()) => { - log_trace!( - sync_logger, - "Background sync of Lightning wallet finished in {}ms.", - now.elapsed().as_millis() - ); - let unix_time_secs_opt = - SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); - *sync_wallet_timestamp.write().unwrap() = unix_time_secs_opt; - - periodically_archive_fully_resolved_monitors( - Arc::clone(&archive_cman), - Arc::clone(&archive_cmon), - Arc::clone(&sync_monitor_archival_height) - ); - } - Err(e) => { - log_error!(sync_logger, "Background sync of Lightning wallet failed: {}", e) - } - } - Err(e) => { - log_error!(sync_logger, "Background sync of Lightning wallet timed out: {}", e) - } - } - } - } - } + chain_source + .continuously_sync_wallets(stop_sync_receiver, sync_cman, sync_cmon, sync_sweeper) + .await; }); if self.gossip_source.is_rgs() { let gossip_source = Arc::clone(&self.gossip_source); let gossip_sync_store = Arc::clone(&self.kv_store); let gossip_sync_logger = Arc::clone(&self.logger); - let gossip_rgs_sync_timestamp = Arc::clone(&self.latest_rgs_snapshot_timestamp); + let gossip_node_metrics = Arc::clone(&self.node_metrics); let mut stop_gossip_sync = self.stop_sender.subscribe(); runtime.spawn(async move { let mut interval = tokio::time::interval(RGS_SYNC_INTERVAL); @@ -460,22 +281,22 @@ impl Node { "Background sync of RGS gossip data finished in {}ms.", now.elapsed().as_millis() ); - io::utils::write_latest_rgs_sync_timestamp( - updated_timestamp, - Arc::clone(&gossip_sync_store), - Arc::clone(&gossip_sync_logger), - ) - .unwrap_or_else(|e| { - log_error!(gossip_sync_logger, "Persistence failed: {}", e); - panic!("Persistence failed"); - }); - *gossip_rgs_sync_timestamp.write().unwrap() = Some(updated_timestamp as u64); + { + let mut locked_node_metrics = gossip_node_metrics.write().unwrap(); + locked_node_metrics.latest_rgs_snapshot_timestamp = Some(updated_timestamp); + write_node_metrics(&*locked_node_metrics, Arc::clone(&gossip_sync_store), Arc::clone(&gossip_sync_logger)) + .unwrap_or_else(|e| { + log_error!(gossip_sync_logger, "Persistence failed: {}", e); + }); + } + } + Err(e) => { + log_error!( + gossip_sync_logger, + "Background sync of RGS gossip data failed: {}", + e + ) } - Err(e) => log_error!( - gossip_sync_logger, - "Background sync of RGS gossip data failed: {}", - e - ), } } } @@ -597,7 +418,7 @@ impl Node { let bcast_config = Arc::clone(&self.config); let bcast_store = Arc::clone(&self.kv_store); let bcast_logger = Arc::clone(&self.logger); - let bcast_ann_timestamp = Arc::clone(&self.latest_node_announcement_broadcast_timestamp); + let bcast_node_metrics = Arc::clone(&self.node_metrics); let mut stop_bcast = self.stop_sender.subscribe(); let node_alias = self.config.node_alias.clone(); if may_announce_channel(&self.config) { @@ -617,13 +438,13 @@ impl Node { return; } _ = interval.tick() => { - let skip_broadcast = match io::utils::read_latest_node_ann_bcast_timestamp(Arc::clone(&bcast_store), Arc::clone(&bcast_logger)) { - Ok(latest_bcast_time_secs) => { + let skip_broadcast = match bcast_node_metrics.read().unwrap().latest_node_announcement_broadcast_timestamp { + Some(latest_bcast_time_secs) => { // Skip if the time hasn't elapsed yet. let next_bcast_unix_time = SystemTime::UNIX_EPOCH + Duration::from_secs(latest_bcast_time_secs) + NODE_ANN_BCAST_INTERVAL; next_bcast_unix_time.elapsed().is_err() } - Err(_) => { + None => { // Don't skip if we haven't broadcasted before. false } @@ -655,20 +476,18 @@ impl Node { let unix_time_secs_opt = SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); - *bcast_ann_timestamp.write().unwrap() = unix_time_secs_opt; - - if let Some(unix_time_secs) = unix_time_secs_opt { - io::utils::write_latest_node_ann_bcast_timestamp(unix_time_secs, Arc::clone(&bcast_store), Arc::clone(&bcast_logger)) + { + let mut locked_node_metrics = bcast_node_metrics.write().unwrap(); + locked_node_metrics.latest_node_announcement_broadcast_timestamp = unix_time_secs_opt; + write_node_metrics(&*locked_node_metrics, Arc::clone(&bcast_store), Arc::clone(&bcast_logger)) .unwrap_or_else(|e| { log_error!(bcast_logger, "Persistence failed: {}", e); - panic!("Persistence failed"); }); } } else { debug_assert!(false, "We checked whether the node may announce, so node alias should always be set"); continue } - } } } @@ -676,7 +495,7 @@ impl Node { } let mut stop_tx_bcast = self.stop_sender.subscribe(); - let tx_bcaster = Arc::clone(&self.tx_broadcaster); + let chain_source = Arc::clone(&self.chain_source); let tx_bcast_logger = Arc::clone(&self.logger); runtime.spawn(async move { // Every second we try to clear our broadcasting queue. @@ -692,7 +511,7 @@ impl Node { return; } _ = interval.tick() => { - tx_bcaster.process_queue().await; + chain_source.process_broadcast_queue().await; } } } @@ -895,24 +714,30 @@ impl Node { let is_running = self.runtime.read().unwrap().is_some(); let is_listening = self.is_listening.load(Ordering::Acquire); let current_best_block = self.channel_manager.current_best_block().into(); - let latest_wallet_sync_timestamp = *self.latest_wallet_sync_timestamp.read().unwrap(); + let locked_node_metrics = self.node_metrics.read().unwrap(); + let latest_lightning_wallet_sync_timestamp = + locked_node_metrics.latest_lightning_wallet_sync_timestamp; let latest_onchain_wallet_sync_timestamp = - *self.latest_onchain_wallet_sync_timestamp.read().unwrap(); + locked_node_metrics.latest_onchain_wallet_sync_timestamp; let latest_fee_rate_cache_update_timestamp = - *self.latest_fee_rate_cache_update_timestamp.read().unwrap(); - let latest_rgs_snapshot_timestamp = *self.latest_rgs_snapshot_timestamp.read().unwrap(); + locked_node_metrics.latest_fee_rate_cache_update_timestamp; + let latest_rgs_snapshot_timestamp = + locked_node_metrics.latest_rgs_snapshot_timestamp.map(|val| val as u64); let latest_node_announcement_broadcast_timestamp = - *self.latest_node_announcement_broadcast_timestamp.read().unwrap(); + locked_node_metrics.latest_node_announcement_broadcast_timestamp; + let latest_channel_monitor_archival_height = + locked_node_metrics.latest_channel_monitor_archival_height; NodeStatus { is_running, is_listening, current_best_block, - latest_wallet_sync_timestamp, + latest_lightning_wallet_sync_timestamp, latest_onchain_wallet_sync_timestamp, latest_fee_rate_cache_update_timestamp, latest_rgs_snapshot_timestamp, latest_node_announcement_broadcast_timestamp, + latest_channel_monitor_archival_height, } } @@ -1304,6 +1129,8 @@ impl Node { /// opening the channel. /// /// Returns a [`UserChannelId`] allowing to locally keep track of the channel. + /// + /// [`AnchorChannelsConfig::per_channel_reserve_sats`]: crate::config::AnchorChannelsConfig::per_channel_reserve_sats pub fn open_channel( &self, node_id: PublicKey, address: SocketAddress, channel_amount_sats: u64, push_to_counterparty_msat: Option, channel_config: Option, @@ -1337,6 +1164,8 @@ impl Node { /// opening the channel. /// /// Returns a [`UserChannelId`] allowing to locally keep track of the channel. + /// + /// [`AnchorChannelsConfig::per_channel_reserve_sats`]: crate::config::AnchorChannelsConfig::per_channel_reserve_sats pub fn open_announced_channel( &self, node_id: PublicKey, address: SocketAddress, channel_amount_sats: u64, push_to_counterparty_msat: Option, channel_config: Option, @@ -1359,120 +1188,31 @@ impl Node { /// Manually sync the LDK and BDK wallets with the current chain state and update the fee rate /// cache. /// - /// **Note:** The wallets are regularly synced in the background, which is configurable via - /// [`Config::onchain_wallet_sync_interval_secs`] and [`Config::wallet_sync_interval_secs`]. - /// Therefore, using this blocking sync method is almost always redundant and should be avoided - /// where possible. + /// **Note:** The wallets are regularly synced in the background, which is configurable via the + /// respective config object, e.g., via + /// [`EsploraSyncConfig::onchain_wallet_sync_interval_secs`] and + /// [`EsploraSyncConfig::lightning_wallet_sync_interval_secs`]. Therefore, using this blocking + /// sync method is almost always redundant and should be avoided where possible. + /// + /// [`EsploraSyncConfig::onchain_wallet_sync_interval_secs`]: crate::config::EsploraSyncConfig::onchain_wallet_sync_interval_secs + /// [`EsploraSyncConfig::lightning_wallet_sync_interval_secs`]: crate::config::EsploraSyncConfig::lightning_wallet_sync_interval_secs pub fn sync_wallets(&self) -> Result<(), Error> { let rt_lock = self.runtime.read().unwrap(); if rt_lock.is_none() { return Err(Error::NotRunning); } - let wallet = Arc::clone(&self.wallet); - let tx_sync = Arc::clone(&self.tx_sync); + let chain_source = Arc::clone(&self.chain_source); let sync_cman = Arc::clone(&self.channel_manager); - let archive_cman = Arc::clone(&self.channel_manager); let sync_cmon = Arc::clone(&self.chain_monitor); - let archive_cmon = Arc::clone(&self.chain_monitor); - let fee_estimator = Arc::clone(&self.fee_estimator); let sync_sweeper = Arc::clone(&self.output_sweeper); - let sync_logger = Arc::clone(&self.logger); - let confirmables = vec![ - &*sync_cman as &(dyn Confirm + Sync + Send), - &*sync_cmon as &(dyn Confirm + Sync + Send), - &*sync_sweeper as &(dyn Confirm + Sync + Send), - ]; - let sync_wallet_timestamp = Arc::clone(&self.latest_wallet_sync_timestamp); - let sync_fee_rate_update_timestamp = - Arc::clone(&self.latest_fee_rate_cache_update_timestamp); - let sync_onchain_wallet_timestamp = Arc::clone(&self.latest_onchain_wallet_sync_timestamp); - let sync_monitor_archival_height = Arc::clone(&self.latest_channel_monitor_archival_height); - tokio::task::block_in_place(move || { tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap().block_on( async move { - let now = Instant::now(); - // We don't add an additional timeout here, as `Wallet::sync` already returns - // after a timeout. - match wallet.sync().await { - Ok(()) => { - log_info!( - sync_logger, - "Sync of on-chain wallet finished in {}ms.", - now.elapsed().as_millis() - ); - let unix_time_secs_opt = SystemTime::now() - .duration_since(UNIX_EPOCH) - .ok() - .map(|d| d.as_secs()); - *sync_onchain_wallet_timestamp.write().unwrap() = unix_time_secs_opt; - }, - Err(e) => { - log_error!(sync_logger, "Sync of on-chain wallet failed: {}", e); - return Err(e); - }, - }; - - let now = Instant::now(); - // We don't add an additional timeout here, as - // `FeeEstimator::update_fee_estimates` already returns after a timeout. - match fee_estimator.update_fee_estimates().await { - Ok(()) => { - log_info!( - sync_logger, - "Fee rate cache update finished in {}ms.", - now.elapsed().as_millis() - ); - let unix_time_secs_opt = SystemTime::now() - .duration_since(UNIX_EPOCH) - .ok() - .map(|d| d.as_secs()); - *sync_fee_rate_update_timestamp.write().unwrap() = unix_time_secs_opt; - }, - Err(e) => { - log_error!(sync_logger, "Fee rate cache update failed: {}", e,); - return Err(e); - }, - } - - let now = Instant::now(); - let tx_sync_timeout_fut = tokio::time::timeout( - Duration::from_secs(LDK_WALLET_SYNC_TIMEOUT_SECS), - tx_sync.sync(confirmables), - ); - match tx_sync_timeout_fut.await { - Ok(res) => match res { - Ok(()) => { - log_info!( - sync_logger, - "Sync of Lightning wallet finished in {}ms.", - now.elapsed().as_millis() - ); - - let unix_time_secs_opt = SystemTime::now() - .duration_since(UNIX_EPOCH) - .ok() - .map(|d| d.as_secs()); - *sync_wallet_timestamp.write().unwrap() = unix_time_secs_opt; - - periodically_archive_fully_resolved_monitors( - archive_cman, - archive_cmon, - sync_monitor_archival_height, - ); - Ok(()) - }, - Err(e) => { - log_error!(sync_logger, "Sync of Lightning wallet failed: {}", e); - Err(e.into()) - }, - }, - Err(e) => { - log_error!(sync_logger, "Sync of Lightning wallet timed out: {}", e); - Err(Error::TxSyncTimeout) - }, - } + chain_source.update_fee_rate_estimates().await?; + chain_source.sync_lightning_wallet(sync_cman, sync_cmon, sync_sweeper).await?; + chain_source.sync_onchain_wallet().await?; + Ok(()) }, ) }) @@ -1499,6 +1239,8 @@ impl Node { /// Broadcasting the closing transactions will be omitted for Anchor channels if we trust the /// counterparty to broadcast for us (see [`AnchorChannelsConfig::trusted_peers_no_reserve`] /// for more information). + /// + /// [`AnchorChannelsConfig::trusted_peers_no_reserve`]: crate::config::AnchorChannelsConfig::trusted_peers_no_reserve pub fn force_close_channel( &self, user_channel_id: &UserChannelId, counterparty_node_id: PublicKey, reason: Option, @@ -1655,7 +1397,8 @@ impl Node { /// /// For example, you could retrieve all stored outbound payments as follows: /// ``` - /// # use ldk_node::{Builder, Config}; + /// # use ldk_node::Builder; + /// # use ldk_node::config::Config; /// # use ldk_node::payment::PaymentDirection; /// # use ldk_node::bitcoin::Network; /// # let mut config = Config::default(); @@ -1766,30 +1509,67 @@ pub struct NodeStatus { /// The timestamp, in seconds since start of the UNIX epoch, when we last successfully synced /// our Lightning wallet to the chain tip. /// - /// Will be `None` if the wallet hasn't been synced since the [`Node`] was initialized. - pub latest_wallet_sync_timestamp: Option, + /// Will be `None` if the wallet hasn't been synced yet. + pub latest_lightning_wallet_sync_timestamp: Option, /// The timestamp, in seconds since start of the UNIX epoch, when we last successfully synced /// our on-chain wallet to the chain tip. /// - /// Will be `None` if the wallet hasn't been synced since the [`Node`] was initialized. + /// Will be `None` if the wallet hasn't been synced yet. pub latest_onchain_wallet_sync_timestamp: Option, /// The timestamp, in seconds since start of the UNIX epoch, when we last successfully update /// our fee rate cache. /// - /// Will be `None` if the cache hasn't been updated since the [`Node`] was initialized. + /// Will be `None` if the cache hasn't been updated yet. pub latest_fee_rate_cache_update_timestamp: Option, /// The timestamp, in seconds since start of the UNIX epoch, when the last rapid gossip sync /// (RGS) snapshot we successfully applied was generated. /// - /// Will be `None` if RGS isn't configured or the snapshot hasn't been updated since the [`Node`] was initialized. + /// Will be `None` if RGS isn't configured or the snapshot hasn't been updated yet. pub latest_rgs_snapshot_timestamp: Option, /// The timestamp, in seconds since start of the UNIX epoch, when we last broadcasted a node /// announcement. /// - /// Will be `None` if we have no public channels or we haven't broadcasted since the [`Node`] was initialized. + /// Will be `None` if we have no public channels or we haven't broadcasted yet. pub latest_node_announcement_broadcast_timestamp: Option, + /// The block height when we last archived closed channel monitor data. + /// + /// Will be `None` if we haven't archived any monitors of closed channels yet. + pub latest_channel_monitor_archival_height: Option, } +/// Status fields that are persisted across restarts. +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct NodeMetrics { + latest_lightning_wallet_sync_timestamp: Option, + latest_onchain_wallet_sync_timestamp: Option, + latest_fee_rate_cache_update_timestamp: Option, + latest_rgs_snapshot_timestamp: Option, + latest_node_announcement_broadcast_timestamp: Option, + latest_channel_monitor_archival_height: Option, +} + +impl Default for NodeMetrics { + fn default() -> Self { + Self { + latest_lightning_wallet_sync_timestamp: None, + latest_onchain_wallet_sync_timestamp: None, + latest_fee_rate_cache_update_timestamp: None, + latest_rgs_snapshot_timestamp: None, + latest_node_announcement_broadcast_timestamp: None, + latest_channel_monitor_archival_height: None, + } + } +} + +impl_writeable_tlv_based!(NodeMetrics, { + (0, latest_lightning_wallet_sync_timestamp, option), + (2, latest_onchain_wallet_sync_timestamp, option), + (4, latest_fee_rate_cache_update_timestamp, option), + (6, latest_rgs_snapshot_timestamp, option), + (8, latest_node_announcement_broadcast_timestamp, option), + (10, latest_channel_monitor_archival_height, option), +}); + pub(crate) fn total_anchor_channels_reserve_sats( channel_manager: &ChannelManager, config: &Config, ) -> u64 { @@ -1809,19 +1589,3 @@ pub(crate) fn total_anchor_channels_reserve_sats( * anchor_channels_config.per_channel_reserve_sats }) } - -fn periodically_archive_fully_resolved_monitors( - channel_manager: Arc, chain_monitor: Arc, - latest_channel_monitor_archival_height: Arc>>, -) { - let mut latest_archival_height_lock = latest_channel_monitor_archival_height.write().unwrap(); - let cur_height = channel_manager.current_best_block().height; - let should_archive = latest_archival_height_lock - .as_ref() - .map_or(true, |h| cur_height >= h + RESOLVED_CHANNEL_MONITOR_ARCHIVAL_INTERVAL); - - if should_archive { - chain_monitor.archive_fully_resolved_channel_monitors(); - *latest_archival_height_lock = Some(cur_height); - } -} diff --git a/src/tx_broadcaster.rs b/src/tx_broadcaster.rs index 37bd616dc..5aded03c6 100644 --- a/src/tx_broadcaster.rs +++ b/src/tx_broadcaster.rs @@ -5,22 +5,16 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. -use crate::config::TX_BROADCAST_TIMEOUT_SECS; -use crate::logger::{log_bytes, log_error, log_trace, Logger}; +use crate::logger::{log_error, Logger}; use lightning::chain::chaininterface::BroadcasterInterface; -use lightning::util::ser::Writeable; - -use esplora_client::AsyncClient as EsploraClient; use bitcoin::Transaction; -use reqwest::StatusCode; use tokio::sync::mpsc; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, MutexGuard}; use std::ops::Deref; -use std::time::Duration; const BCAST_PACKAGE_QUEUE_SIZE: usize = 50; @@ -30,7 +24,6 @@ where { queue_sender: mpsc::Sender>, queue_receiver: Mutex>>, - esplora_client: EsploraClient, logger: L, } @@ -38,77 +31,13 @@ impl TransactionBroadcaster where L::Target: Logger, { - pub(crate) fn new(esplora_client: EsploraClient, logger: L) -> Self { + pub(crate) fn new(logger: L) -> Self { let (queue_sender, queue_receiver) = mpsc::channel(BCAST_PACKAGE_QUEUE_SIZE); - Self { queue_sender, queue_receiver: Mutex::new(queue_receiver), esplora_client, logger } + Self { queue_sender, queue_receiver: Mutex::new(queue_receiver), logger } } - pub(crate) async fn process_queue(&self) { - let mut receiver = self.queue_receiver.lock().await; - while let Some(next_package) = receiver.recv().await { - for tx in &next_package { - let txid = tx.compute_txid(); - let timeout_fut = tokio::time::timeout( - Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS), - self.esplora_client.broadcast(tx), - ); - match timeout_fut.await { - Ok(res) => match res { - Ok(()) => { - log_trace!(self.logger, "Successfully broadcast transaction {}", txid); - }, - Err(e) => match e { - esplora_client::Error::Reqwest(err) => { - if err.status() == StatusCode::from_u16(400).ok() { - // Ignore 400, as this just means bitcoind already knows the - // transaction. - // FIXME: We can further differentiate here based on the error - // message which will be available with rust-esplora-client 0.7 and - // later. - } else { - log_error!( - self.logger, - "Failed to broadcast due to HTTP connection error: {}", - err - ); - } - log_trace!( - self.logger, - "Failed broadcast transaction bytes: {}", - log_bytes!(tx.encode()) - ); - }, - _ => { - log_error!( - self.logger, - "Failed to broadcast transaction {}: {}", - txid, - e - ); - log_trace!( - self.logger, - "Failed broadcast transaction bytes: {}", - log_bytes!(tx.encode()) - ); - }, - }, - }, - Err(e) => { - log_error!( - self.logger, - "Failed to broadcast transaction due to timeout {}: {}", - txid, - e - ); - log_trace!( - self.logger, - "Failed broadcast transaction bytes: {}", - log_bytes!(tx.encode()) - ); - }, - } - } - } + pub(crate) async fn get_broadcast_queue(&self) -> MutexGuard>> { + self.queue_receiver.lock().await } } diff --git a/src/types.rs b/src/types.rs index 5005d93a6..9fae37e18 100644 --- a/src/types.rs +++ b/src/types.rs @@ -5,6 +5,9 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. +use crate::chain::ChainSource; +use crate::config::ChannelConfig; +use crate::fee_estimator::OnchainFeeEstimator; use crate::logger::FilesystemLogger; use crate::message_handler::NodeCustomMessageHandler; @@ -18,13 +21,10 @@ use lightning::routing::gossip; use lightning::routing::router::DefaultRouter; use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters}; use lightning::sign::InMemorySigner; -use lightning::util::config::ChannelConfig as LdkChannelConfig; -use lightning::util::config::MaxDustHTLCExposure as LdkMaxDustHTLCExposure; use lightning::util::persist::KVStore; use lightning::util::ser::{Readable, Writeable, Writer}; use lightning::util::sweep::OutputSweeper; use lightning_net_tokio::SocketDescriptor; -use lightning_transaction_sync::EsploraSyncClient; use bitcoin::secp256k1::PublicKey; use bitcoin::OutPoint; @@ -37,7 +37,7 @@ pub(crate) type ChainMonitor = chainmonitor::ChainMonitor< InMemorySigner, Arc, Arc, - Arc, + Arc, Arc, Arc, >; @@ -52,8 +52,6 @@ pub(crate) type PeerManager = lightning::ln::peer_handler::PeerManager< Arc, >; -pub(crate) type ChainSource = EsploraSyncClient>; - pub(crate) type LiquidityManager = lightning_liquidity::LiquidityManager, Arc, Arc>; @@ -63,20 +61,21 @@ pub(crate) type ChannelManager = lightning::ln::channelmanager::ChannelManager< Arc, Arc, Arc, - Arc, + Arc, Arc, Arc, >; pub(crate) type Broadcaster = crate::tx_broadcaster::TransactionBroadcaster>; -pub(crate) type FeeEstimator = crate::fee_estimator::OnchainFeeEstimator>; - pub(crate) type Wallet = - crate::wallet::Wallet, Arc, Arc>; + crate::wallet::Wallet, Arc, Arc>; -pub(crate) type KeysManager = - crate::wallet::WalletKeysManager, Arc, Arc>; +pub(crate) type KeysManager = crate::wallet::WalletKeysManager< + Arc, + Arc, + Arc, +>; pub(crate) type Router = DefaultRouter< Arc, @@ -125,7 +124,7 @@ pub(crate) type MessageRouter = lightning::onion_message::messenger::DefaultMess pub(crate) type Sweeper = OutputSweeper< Arc, Arc, - Arc, + Arc, Arc, Arc, Arc, @@ -349,119 +348,3 @@ pub struct PeerDetails { /// Indicates whether we currently have an active connection with the peer. pub is_connected: bool, } - -/// Options which apply on a per-channel basis and may change at runtime or based on negotiation -/// with our counterparty. -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -pub struct ChannelConfig { - /// Amount (in millionths of a satoshi) charged per satoshi for payments forwarded outbound - /// over the channel. - /// This may be allowed to change at runtime in a later update, however doing so must result in - /// update messages sent to notify all nodes of our updated relay fee. - /// - /// Please refer to [`LdkChannelConfig`] for further details. - pub forwarding_fee_proportional_millionths: u32, - /// Amount (in milli-satoshi) charged for payments forwarded outbound over the channel, in - /// excess of [`ChannelConfig::forwarding_fee_proportional_millionths`]. - /// This may be allowed to change at runtime in a later update, however doing so must result in - /// update messages sent to notify all nodes of our updated relay fee. - /// - /// Please refer to [`LdkChannelConfig`] for further details. - pub forwarding_fee_base_msat: u32, - /// The difference in the CLTV value between incoming HTLCs and an outbound HTLC forwarded over - /// the channel this config applies to. - /// - /// Please refer to [`LdkChannelConfig`] for further details. - pub cltv_expiry_delta: u16, - /// Limit our total exposure to potential loss to on-chain fees on close, including in-flight - /// HTLCs which are burned to fees as they are too small to claim on-chain and fees on - /// commitment transaction(s) broadcasted by our counterparty in excess of our own fee estimate. - /// - /// Please refer to [`LdkChannelConfig`] for further details. - pub max_dust_htlc_exposure: MaxDustHTLCExposure, - /// The additional fee we're willing to pay to avoid waiting for the counterparty's - /// `to_self_delay` to reclaim funds. - /// - /// Please refer to [`LdkChannelConfig`] for further details. - pub force_close_avoidance_max_fee_satoshis: u64, - /// If set, allows this channel's counterparty to skim an additional fee off this node's inbound - /// HTLCs. Useful for liquidity providers to offload on-chain channel costs to end users. - /// - /// Please refer to [`LdkChannelConfig`] for further details. - pub accept_underpaying_htlcs: bool, -} - -impl From for ChannelConfig { - fn from(value: LdkChannelConfig) -> Self { - Self { - forwarding_fee_proportional_millionths: value.forwarding_fee_proportional_millionths, - forwarding_fee_base_msat: value.forwarding_fee_base_msat, - cltv_expiry_delta: value.cltv_expiry_delta, - max_dust_htlc_exposure: value.max_dust_htlc_exposure.into(), - force_close_avoidance_max_fee_satoshis: value.force_close_avoidance_max_fee_satoshis, - accept_underpaying_htlcs: value.accept_underpaying_htlcs, - } - } -} - -impl From for LdkChannelConfig { - fn from(value: ChannelConfig) -> Self { - Self { - forwarding_fee_proportional_millionths: value.forwarding_fee_proportional_millionths, - forwarding_fee_base_msat: value.forwarding_fee_base_msat, - cltv_expiry_delta: value.cltv_expiry_delta, - max_dust_htlc_exposure: value.max_dust_htlc_exposure.into(), - force_close_avoidance_max_fee_satoshis: value.force_close_avoidance_max_fee_satoshis, - accept_underpaying_htlcs: value.accept_underpaying_htlcs, - } - } -} - -impl Default for ChannelConfig { - fn default() -> Self { - LdkChannelConfig::default().into() - } -} - -/// Options for how to set the max dust exposure allowed on a channel. -/// -/// See [`LdkChannelConfig::max_dust_htlc_exposure`] for details. -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -pub enum MaxDustHTLCExposure { - /// This sets a fixed limit on the total dust exposure in millisatoshis. - /// - /// Please refer to [`LdkMaxDustHTLCExposure`] for further details. - FixedLimit { - /// The fixed limit, in millisatoshis. - limit_msat: u64, - }, - /// This sets a multiplier on the feerate to determine the maximum allowed dust exposure. - /// - /// Please refer to [`LdkMaxDustHTLCExposure`] for further details. - FeeRateMultiplier { - /// The applied fee rate multiplier. - multiplier: u64, - }, -} - -impl From for MaxDustHTLCExposure { - fn from(value: LdkMaxDustHTLCExposure) -> Self { - match value { - LdkMaxDustHTLCExposure::FixedLimitMsat(limit_msat) => Self::FixedLimit { limit_msat }, - LdkMaxDustHTLCExposure::FeeRateMultiplier(multiplier) => { - Self::FeeRateMultiplier { multiplier } - }, - } - } -} - -impl From for LdkMaxDustHTLCExposure { - fn from(value: MaxDustHTLCExposure) -> Self { - match value { - MaxDustHTLCExposure::FixedLimit { limit_msat } => Self::FixedLimitMsat(limit_msat), - MaxDustHTLCExposure::FeeRateMultiplier { multiplier } => { - Self::FeeRateMultiplier(multiplier) - }, - } - } -} diff --git a/src/uniffi_types.rs b/src/uniffi_types.rs index a66bcddea..894e5d739 100644 --- a/src/uniffi_types.rs +++ b/src/uniffi_types.rs @@ -10,6 +10,9 @@ // // Make sure to add any re-exported items that need to be used in uniffi below. +pub use crate::config::{ + default_config, AnchorChannelsConfig, EsploraSyncConfig, MaxDustHTLCExposure, +}; pub use crate::graph::{ChannelInfo, ChannelUpdateInfo, NodeAnnouncementInfo, NodeInfo}; pub use crate::payment::store::{LSPFeeLimits, PaymentDirection, PaymentKind, PaymentStatus}; pub use crate::payment::{MaxTotalRoutingFeeLimit, QrPaymentResult, SendingParameters}; diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index b1c053f66..30da1682d 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -9,7 +9,6 @@ use persist::KVStoreWalletPersister; use crate::logger::{log_error, log_info, log_trace, Logger}; -use crate::config::{BDK_CLIENT_CONCURRENCY, BDK_CLIENT_STOP_GAP, BDK_WALLET_SYNC_TIMEOUT_SECS}; use crate::fee_estimator::{ConfirmationTarget, FeeEstimator}; use crate::Error; @@ -26,9 +25,9 @@ use lightning::sign::{ use lightning::util::message_signing; use lightning_invoice::RawBolt11Invoice; +use bdk_chain::spk_client::{FullScanRequest, SyncRequest}; use bdk_chain::ChainPosition; -use bdk_esplora::EsploraAsyncExt; -use bdk_wallet::{KeychainKind, PersistedWallet, SignOptions}; +use bdk_wallet::{KeychainKind, PersistedWallet, SignOptions, Update}; use bitcoin::blockdata::constants::WITNESS_SCALE_FACTOR; use bitcoin::blockdata::locktime::absolute::LockTime; @@ -42,20 +41,12 @@ use bitcoin::{ Amount, ScriptBuf, Transaction, TxOut, Txid, WPubkeyHash, WitnessProgram, WitnessVersion, }; -use esplora_client::AsyncClient as EsploraAsyncClient; - -use std::ops::{Deref, DerefMut}; +use std::ops::Deref; use std::sync::{Arc, Mutex}; -use std::time::Duration; pub(crate) mod persist; pub(crate) mod ser; -enum WalletSyncStatus { - Completed, - InProgress { subscribers: tokio::sync::broadcast::Sender> }, -} - pub(crate) struct Wallet where B::Target: BroadcasterInterface, @@ -65,11 +56,8 @@ where // A BDK on-chain wallet. inner: Mutex>, persister: Mutex, - esplora_client: EsploraAsyncClient, broadcaster: B, fee_estimator: E, - // A Mutex holding the current sync status. - sync_status: Mutex, logger: L, } @@ -81,86 +69,38 @@ where { pub(crate) fn new( wallet: bdk_wallet::PersistedWallet, - wallet_persister: KVStoreWalletPersister, esplora_client: EsploraAsyncClient, - broadcaster: B, fee_estimator: E, logger: L, + wallet_persister: KVStoreWalletPersister, broadcaster: B, fee_estimator: E, logger: L, ) -> Self { let inner = Mutex::new(wallet); let persister = Mutex::new(wallet_persister); - let sync_status = Mutex::new(WalletSyncStatus::Completed); - Self { inner, persister, esplora_client, broadcaster, fee_estimator, sync_status, logger } + Self { inner, persister, broadcaster, fee_estimator, logger } } - pub(crate) async fn sync(&self) -> Result<(), Error> { - if let Some(mut sync_receiver) = self.register_or_subscribe_pending_sync() { - log_info!(self.logger, "Sync in progress, skipping."); - return sync_receiver.recv().await.map_err(|e| { - debug_assert!(false, "Failed to receive wallet sync result: {:?}", e); - log_error!(self.logger, "Failed to receive wallet sync result: {:?}", e); - Error::WalletOperationFailed - })?; - } - - let res = { - let full_scan_request = self.inner.lock().unwrap().start_full_scan().build(); - - let wallet_sync_timeout_fut = tokio::time::timeout( - Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), - self.esplora_client.full_scan( - full_scan_request, - BDK_CLIENT_STOP_GAP, - BDK_CLIENT_CONCURRENCY, - ), - ); - - match wallet_sync_timeout_fut.await { - Ok(res) => match res { - Ok(update) => { - let mut locked_wallet = self.inner.lock().unwrap(); - match locked_wallet.apply_update(update) { - Ok(()) => { - let mut locked_persister = self.persister.lock().unwrap(); - locked_wallet.persist(&mut locked_persister).map_err(|e| { - log_error!(self.logger, "Failed to persist wallet: {}", e); - Error::PersistenceFailed - })?; - - Ok(()) - }, - Err(e) => { - log_error!( - self.logger, - "Sync failed due to chain connection error: {}", - e - ); - Err(Error::WalletOperationFailed) - }, - } - }, - Err(e) => match *e { - esplora_client::Error::Reqwest(he) => { - log_error!( - self.logger, - "Sync failed due to HTTP connection error: {}", - he - ); - Err(Error::WalletOperationFailed) - }, - _ => { - log_error!(self.logger, "Sync failed due to Esplora error: {}", e); - Err(Error::WalletOperationFailed) - }, - }, - }, - Err(e) => { - log_error!(self.logger, "On-chain wallet sync timed out: {}", e); - Err(Error::WalletOperationTimeout) - }, - } - }; + pub(crate) fn get_full_scan_request(&self) -> FullScanRequest { + self.inner.lock().unwrap().start_full_scan().build() + } - self.propagate_result_to_subscribers(res); + pub(crate) fn get_incremental_sync_request(&self) -> SyncRequest<(KeychainKind, u32)> { + self.inner.lock().unwrap().start_sync_with_revealed_spks().build() + } - res + pub(crate) fn apply_update(&self, update: impl Into) -> Result<(), Error> { + let mut locked_wallet = self.inner.lock().unwrap(); + match locked_wallet.apply_update(update) { + Ok(()) => { + let mut locked_persister = self.persister.lock().unwrap(); + locked_wallet.persist(&mut locked_persister).map_err(|e| { + log_error!(self.logger, "Failed to persist wallet: {}", e); + Error::PersistenceFailed + })?; + + Ok(()) + }, + Err(e) => { + log_error!(self.logger, "Sync failed due to chain connection error: {}", e); + Err(Error::WalletOperationFailed) + }, + } } pub(crate) fn create_funding_transaction( @@ -343,59 +283,6 @@ where Ok(txid) } - - fn register_or_subscribe_pending_sync( - &self, - ) -> Option>> { - let mut sync_status_lock = self.sync_status.lock().unwrap(); - match sync_status_lock.deref_mut() { - WalletSyncStatus::Completed => { - // We're first to register for a sync. - let (tx, _) = tokio::sync::broadcast::channel(1); - *sync_status_lock = WalletSyncStatus::InProgress { subscribers: tx }; - None - }, - WalletSyncStatus::InProgress { subscribers } => { - // A sync is in-progress, we subscribe. - let rx = subscribers.subscribe(); - Some(rx) - }, - } - } - - fn propagate_result_to_subscribers(&self, res: Result<(), Error>) { - // Send the notification to any other tasks that might be waiting on it by now. - { - let mut sync_status_lock = self.sync_status.lock().unwrap(); - match sync_status_lock.deref_mut() { - WalletSyncStatus::Completed => { - // No sync in-progress, do nothing. - return; - }, - WalletSyncStatus::InProgress { subscribers } => { - // A sync is in-progress, we notify subscribers. - if subscribers.receiver_count() > 0 { - match subscribers.send(res) { - Ok(_) => (), - Err(e) => { - debug_assert!( - false, - "Failed to send wallet sync result to subscribers: {:?}", - e - ); - log_error!( - self.logger, - "Failed to send wallet sync result to subscribers: {:?}", - e - ); - }, - } - } - *sync_status_lock = WalletSyncStatus::Completed; - }, - } - } - } } impl WalletSource for Wallet diff --git a/tests/common/mod.rs b/tests/common/mod.rs index a7cd87323..9c712286a 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -8,11 +8,10 @@ #![cfg(any(test, cln_test, vss_test))] #![allow(dead_code)] +use ldk_node::config::{Config, EsploraSyncConfig}; use ldk_node::io::sqlite_store::SqliteStore; use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus}; -use ldk_node::{ - Builder, Config, Event, LightningBalance, LogLevel, Node, NodeError, PendingSweepBalance, -}; +use ldk_node::{Builder, Event, LightningBalance, LogLevel, Node, NodeError, PendingSweepBalance}; use lightning::ln::msgs::SocketAddress; use lightning::ln::{PaymentHash, PaymentPreimage}; @@ -218,8 +217,6 @@ pub(crate) fn random_config(anchor_channels: bool) -> Config { } config.network = Network::Regtest; - config.onchain_wallet_sync_interval_secs = 100000; - config.wallet_sync_interval_secs = 100000; println!("Setting network: {}", config.network); let rand_dir = random_storage_path(); @@ -281,8 +278,11 @@ pub(crate) fn setup_two_nodes( pub(crate) fn setup_node(electrsd: &ElectrsD, config: Config) -> TestNode { let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); + let mut sync_config = EsploraSyncConfig::default(); + sync_config.onchain_wallet_sync_interval_secs = 100000; + sync_config.lightning_wallet_sync_interval_secs = 100000; setup_builder!(builder, config); - builder.set_esplora_server(esplora_url.clone()); + builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); let test_sync_store = Arc::new(TestSyncStore::new(config.storage_dir_path.into())); let node = builder.build_with_store(test_sync_store).unwrap(); node.start().unwrap(); diff --git a/tests/integration_tests_cln.rs b/tests/integration_tests_cln.rs index 13b5c44c6..c3ade673f 100644 --- a/tests/integration_tests_cln.rs +++ b/tests/integration_tests_cln.rs @@ -45,7 +45,7 @@ fn test_cln() { // Setup LDK Node let config = common::random_config(true); let mut builder = Builder::from_config(config); - builder.set_esplora_server("http://127.0.0.1:3002".to_string()); + builder.set_chain_source_esplora("http://127.0.0.1:3002".to_string(), None); let node = builder.build().unwrap(); node.start().unwrap(); diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 6d33e80c6..13f3ab0be 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -14,6 +14,7 @@ use common::{ setup_node, setup_two_nodes, wait_for_tx, TestSyncStore, }; +use ldk_node::config::EsploraSyncConfig; use ldk_node::payment::{PaymentKind, QrPaymentResult, SendingParameters}; use ldk_node::{Builder, Event, NodeError}; @@ -102,8 +103,11 @@ fn multi_hop_sending() { let mut nodes = Vec::new(); for _ in 0..5 { let config = random_config(true); + let mut sync_config = EsploraSyncConfig::default(); + sync_config.onchain_wallet_sync_interval_secs = 100000; + sync_config.lightning_wallet_sync_interval_secs = 100000; setup_builder!(builder, config); - builder.set_esplora_server(esplora_url.clone()); + builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); let node = builder.build().unwrap(); node.start().unwrap(); nodes.push(node); @@ -182,7 +186,7 @@ fn connect_to_public_testnet_esplora() { let mut config = random_config(true); config.network = Network::Testnet; setup_builder!(builder, config); - builder.set_esplora_server("https://blockstream.info/testnet/api".to_string()); + builder.set_chain_source_esplora("https://blockstream.info/testnet/api".to_string(), None); let node = builder.build().unwrap(); node.start().unwrap(); node.stop().unwrap(); @@ -198,8 +202,11 @@ fn start_stop_reinit() { let test_sync_store: Arc = Arc::new(TestSyncStore::new(config.storage_dir_path.clone().into())); + let mut sync_config = EsploraSyncConfig::default(); + sync_config.onchain_wallet_sync_interval_secs = 100000; + sync_config.lightning_wallet_sync_interval_secs = 100000; setup_builder!(builder, config); - builder.set_esplora_server(esplora_url.clone()); + builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); let node = builder.build_with_store(Arc::clone(&test_sync_store)).unwrap(); node.start().unwrap(); @@ -236,7 +243,7 @@ fn start_stop_reinit() { drop(node); setup_builder!(builder, config); - builder.set_esplora_server(esplora_url.clone()); + builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); let reinitialized_node = builder.build_with_store(Arc::clone(&test_sync_store)).unwrap(); reinitialized_node.start().unwrap(); diff --git a/tests/integration_tests_vss.rs b/tests/integration_tests_vss.rs index c572fbcd8..483902375 100644 --- a/tests/integration_tests_vss.rs +++ b/tests/integration_tests_vss.rs @@ -18,7 +18,7 @@ fn channel_full_cycle_with_vss_store() { let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); let config_a = common::random_config(true); let mut builder_a = Builder::from_config(config_a); - builder_a.set_esplora_server(esplora_url.clone()); + builder_a.set_chain_source_esplora(esplora_url.clone(), None); let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap(); let node_a = builder_a.build_with_vss_store(vss_base_url.clone(), "node_1_store".to_string()).unwrap(); @@ -27,7 +27,7 @@ fn channel_full_cycle_with_vss_store() { println!("\n== Node B =="); let config_b = common::random_config(true); let mut builder_b = Builder::from_config(config_b); - builder_b.set_esplora_server(esplora_url); + builder_b.set_chain_source_esplora(esplora_url.clone(), None); let node_b = builder_b.build_with_vss_store(vss_base_url, "node_2_store".to_string()).unwrap(); node_b.start().unwrap();