Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor syncing and introduce ChainSource #365

Merged
merged 12 commits into from
Oct 15, 2024
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::str::FromStr;
fn main() {
let mut builder = Builder::new();
builder.set_network(Network::Testnet);
builder.set_esplora_server("https://blockstream.info/testnet/api".to_string());
builder.set_chain_source_esplora("https://blockstream.info/testnet/api".to_string(), None);
builder.set_gossip_source_rgs("https://rapidsync.lightningdevkit.org/testnet/snapshot".to_string());

let node = builder.build().unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ class LibraryTest {
println("Config 2: $config2")

val builder1 = Builder.fromConfig(config1)
builder1.setEsploraServer(esploraEndpoint)
builder1.setChainSourceEsplora(esploraEndpoint, null)
val builder2 = Builder.fromConfig(config2)
builder2.setEsploraServer(esploraEndpoint)
builder2.setChainSourceEsplora(esploraEndpoint, null)

val node1 = builder1.build()
val node2 = builder2.build()
Expand Down
14 changes: 9 additions & 5 deletions bindings/ldk_node.udl
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ dictionary Config {
Network network;
sequence<SocketAddress>? listening_addresses;
NodeAlias? node_alias;
u64 onchain_wallet_sync_interval_secs;
u64 wallet_sync_interval_secs;
u64 fee_rate_cache_update_interval_secs;
sequence<PublicKey> trusted_peers_0conf;
u64 probing_liquidity_limit_multiplier;
LogLevel log_level;
Expand All @@ -24,6 +21,12 @@ dictionary AnchorChannelsConfig {
u64 per_channel_reserve_sats;
};

dictionary EsploraSyncConfig {
u64 onchain_wallet_sync_interval_secs;
u64 lightning_wallet_sync_interval_secs;
u64 fee_rate_cache_update_interval_secs;
};

interface Builder {
constructor();
[Name=from_config]
Expand All @@ -32,7 +35,7 @@ interface Builder {
[Throws=BuildError]
void set_entropy_seed_bytes(sequence<u8> seed_bytes);
void set_entropy_bip39_mnemonic(Mnemonic mnemonic, string? passphrase);
void set_esplora_server(string esplora_server_url);
void set_chain_source_esplora(string server_url, EsploraSyncConfig? config);
void set_gossip_source_p2p();
void set_gossip_source_rgs(string rgs_server_url);
void set_liquidity_source_lsps2(SocketAddress address, PublicKey node_id, string? token);
Expand Down Expand Up @@ -218,11 +221,12 @@ dictionary NodeStatus {
boolean is_running;
boolean is_listening;
BestBlock current_best_block;
u64? latest_wallet_sync_timestamp;
u64? latest_lightning_wallet_sync_timestamp;
u64? latest_onchain_wallet_sync_timestamp;
u64? latest_fee_rate_cache_update_timestamp;
u64? latest_rgs_snapshot_timestamp;
u64? latest_node_announcement_broadcast_timestamp;
u32? latest_channel_monitor_archival_height;
};

dictionary BestBlock {
Expand Down
2 changes: 1 addition & 1 deletion bindings/python/src/ldk_node/test_ldk_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def setup_node(tmp_dir, esplora_endpoint, listening_addresses):
config = default_config()
builder = Builder.from_config(config)
builder.set_storage_dir_path(tmp_dir)
builder.set_esplora_server(esplora_endpoint)
builder.set_chain_source_esplora(esplora_endpoint, None)
builder.set_network(DEFAULT_TEST_NETWORK)
builder.set_listening_addresses(listening_addresses)
return builder.build()
Expand Down
173 changes: 87 additions & 86 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
// accordance with one or both of these licenses.

use crate::config::{
default_user_config, Config, DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS, DEFAULT_ESPLORA_SERVER_URL,
WALLET_KEYS_SEED_LEN,
};
use crate::chain::{ChainSource, DEFAULT_ESPLORA_SERVER_URL};
use crate::config::{default_user_config, Config, EsploraSyncConfig, WALLET_KEYS_SEED_LEN};

use crate::connection::ConnectionManager;
use crate::event::EventQueue;
use crate::fee_estimator::OnchainFeeEstimator;
use crate::gossip::GossipSource;
use crate::io;
use crate::io::sqlite_store::SqliteStore;
use crate::io::utils::{read_node_metrics, write_node_metrics};
#[cfg(any(vss, vss_test))]
use crate::io::vss_store::VssStore;
use crate::liquidity::LiquiditySource;
Expand All @@ -29,6 +28,7 @@ use crate::types::{
};
use crate::wallet::persist::KVStoreWalletPersister;
use crate::wallet::Wallet;
use crate::{io, NodeMetrics};
use crate::{LogLevel, Node};

use lightning::chain::{chainmonitor, BestBlock, Watch};
Expand All @@ -52,8 +52,6 @@ use lightning::util::sweep::OutputSweeper;

use lightning_persister::fs_store::FilesystemStore;

use lightning_transaction_sync::EsploraSyncClient;

use lightning_liquidity::lsps2::client::LSPS2ClientConfig;
use lightning_liquidity::{LiquidityClientConfig, LiquidityManager};

Expand All @@ -79,7 +77,7 @@ use std::time::SystemTime;

#[derive(Debug, Clone)]
enum ChainDataSourceConfig {
Esplora(String),
Esplora { server_url: String, sync_config: Option<EsploraSyncConfig> },
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -239,8 +237,14 @@ impl NodeBuilder {
}

/// Configures the [`Node`] instance to source its chain data from the given Esplora server.
pub fn set_esplora_server(&mut self, esplora_server_url: String) -> &mut Self {
self.chain_data_source_config = Some(ChainDataSourceConfig::Esplora(esplora_server_url));
///
/// If no `sync_config` is given, default values are used. See [`EsploraSyncConfig`] for more
/// information.
pub fn set_chain_source_esplora(
&mut self, server_url: String, sync_config: Option<EsploraSyncConfig>,
) -> &mut Self {
self.chain_data_source_config =
Some(ChainDataSourceConfig::Esplora { server_url, sync_config });
self
}

Expand Down Expand Up @@ -466,8 +470,13 @@ impl ArcedNodeBuilder {
}

/// Configures the [`Node`] instance to source its chain data from the given Esplora server.
pub fn set_esplora_server(&self, esplora_server_url: String) {
self.inner.write().unwrap().set_esplora_server(esplora_server_url);
///
/// If no `sync_config` is given, default values are used. See [`EsploraSyncConfig`] for more
/// information.
pub fn set_chain_source_esplora(
&self, server_url: String, sync_config: Option<EsploraSyncConfig>,
) {
self.inner.write().unwrap().set_chain_source_esplora(server_url, sync_config);
}

/// Configures the [`Node`] instance to source its gossip data from the Lightning peer-to-peer
Expand Down Expand Up @@ -555,6 +564,19 @@ fn build_with_store_internal(
liquidity_source_config: Option<&LiquiditySourceConfig>, seed_bytes: [u8; 64],
logger: Arc<FilesystemLogger>, kv_store: Arc<DynStore>,
) -> Result<Node, BuildError> {
// Initialize the status fields.
let is_listening = Arc::new(AtomicBool::new(false));
let node_metrics = match read_node_metrics(Arc::clone(&kv_store), Arc::clone(&logger)) {
Ok(metrics) => Arc::new(RwLock::new(metrics)),
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
Arc::new(RwLock::new(NodeMetrics::default()))
} else {
return Err(BuildError::ReadFailed);
}
},
};

// Initialize the on-chain wallet and chain access
let xprv = bitcoin::bip32::Xpriv::new_master(config.network, &seed_bytes).map_err(|e| {
log_error!(logger, "Failed to derive master secret: {}", e);
Expand Down Expand Up @@ -586,62 +608,54 @@ fn build_with_store_internal(
})?,
};

let (esplora_client, tx_sync, tx_broadcaster, fee_estimator) = match chain_data_source_config {
Some(ChainDataSourceConfig::Esplora(server_url)) => {
let mut client_builder = esplora_client::Builder::new(&server_url.clone());
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
let esplora_client = client_builder.build_async().unwrap();
let tx_sync = Arc::new(EsploraSyncClient::from_client(
esplora_client.clone(),
Arc::clone(&logger),
));
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(
tx_sync.client().clone(),
Arc::clone(&logger),
));
let fee_estimator = Arc::new(OnchainFeeEstimator::new(
tx_sync.client().clone(),
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger)));
let fee_estimator = Arc::new(OnchainFeeEstimator::new());
let wallet = Arc::new(Wallet::new(
bdk_wallet,
wallet_persister,
Arc::clone(&tx_broadcaster),
Arc::clone(&fee_estimator),
Arc::clone(&logger),
));

let chain_source = match chain_data_source_config {
Some(ChainDataSourceConfig::Esplora { server_url, sync_config }) => {
let sync_config = sync_config.unwrap_or(EsploraSyncConfig::default());
Arc::new(ChainSource::new_esplora(
server_url.clone(),
sync_config,
Arc::clone(&wallet),
Arc::clone(&fee_estimator),
Arc::clone(&tx_broadcaster),
Arc::clone(&kv_store),
Arc::clone(&config),
Arc::clone(&logger),
));
(esplora_client, tx_sync, tx_broadcaster, fee_estimator)
Arc::clone(&node_metrics),
))
},
None => {
// Default to Esplora client.
let server_url = DEFAULT_ESPLORA_SERVER_URL.to_string();
let mut client_builder = esplora_client::Builder::new(&server_url);
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
let esplora_client = client_builder.build_async().unwrap();
let tx_sync = Arc::new(EsploraSyncClient::from_client(
esplora_client.clone(),
Arc::clone(&logger),
));
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(
tx_sync.client().clone(),
Arc::clone(&logger),
));
let fee_estimator = Arc::new(OnchainFeeEstimator::new(
tx_sync.client().clone(),
let sync_config = EsploraSyncConfig::default();
Arc::new(ChainSource::new_esplora(
server_url.clone(),
sync_config,
Arc::clone(&wallet),
Arc::clone(&fee_estimator),
Arc::clone(&tx_broadcaster),
Arc::clone(&kv_store),
Arc::clone(&config),
Arc::clone(&logger),
));
(esplora_client, tx_sync, tx_broadcaster, fee_estimator)
Arc::clone(&node_metrics),
))
},
};

let runtime = Arc::new(RwLock::new(None));
let wallet = Arc::new(Wallet::new(
bdk_wallet,
wallet_persister,
esplora_client,
Arc::clone(&tx_broadcaster),
Arc::clone(&fee_estimator),
Arc::clone(&logger),
));

// Initialize the ChainMonitor
let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(
Some(Arc::clone(&tx_sync)),
Some(Arc::clone(&chain_source)),
Arc::clone(&tx_broadcaster),
Arc::clone(&logger),
Arc::clone(&fee_estimator),
Expand Down Expand Up @@ -824,23 +838,24 @@ fn build_with_store_internal(
Arc::new(GossipSource::new_p2p(Arc::clone(&network_graph), Arc::clone(&logger)));

// Reset the RGS sync timestamp in case we somehow switch gossip sources
io::utils::write_latest_rgs_sync_timestamp(
0,
Arc::clone(&kv_store),
Arc::clone(&logger),
)
.map_err(|e| {
log_error!(logger, "Failed writing to store: {}", e);
BuildError::WriteFailed
})?;
{
let mut locked_node_metrics = node_metrics.write().unwrap();
locked_node_metrics.latest_rgs_snapshot_timestamp = None;
write_node_metrics(
&*locked_node_metrics,
Arc::clone(&kv_store),
Arc::clone(&logger),
)
.map_err(|e| {
log_error!(logger, "Failed writing to store: {}", e);
BuildError::WriteFailed
})?;
}
p2p_source
},
GossipSourceConfig::RapidGossipSync(rgs_server) => {
let latest_sync_timestamp = io::utils::read_latest_rgs_sync_timestamp(
Arc::clone(&kv_store),
Arc::clone(&logger),
)
.unwrap_or(0);
let latest_sync_timestamp =
node_metrics.read().unwrap().latest_rgs_snapshot_timestamp.unwrap_or(0);
Arc::new(GossipSource::new_rgs(
rgs_server.clone(),
latest_sync_timestamp,
Expand All @@ -857,7 +872,7 @@ fn build_with_store_internal(
let liquidity_manager = Arc::new(LiquidityManager::new(
Arc::clone(&keys_manager),
Arc::clone(&channel_manager),
Some(Arc::clone(&tx_sync)),
Some(Arc::clone(&chain_source)),
None,
None,
liquidity_client_config,
Expand Down Expand Up @@ -925,7 +940,7 @@ fn build_with_store_internal(
let output_sweeper = match io::utils::read_output_sweeper(
Arc::clone(&tx_broadcaster),
Arc::clone(&fee_estimator),
Arc::clone(&tx_sync),
Arc::clone(&chain_source),
Arc::clone(&keys_manager),
Arc::clone(&kv_store),
Arc::clone(&logger),
Expand All @@ -937,7 +952,7 @@ fn build_with_store_internal(
channel_manager.current_best_block(),
Arc::clone(&tx_broadcaster),
Arc::clone(&fee_estimator),
Some(Arc::clone(&tx_sync)),
Some(Arc::clone(&chain_source)),
Arc::clone(&keys_manager),
Arc::clone(&keys_manager),
Arc::clone(&kv_store),
Expand Down Expand Up @@ -999,23 +1014,14 @@ fn build_with_store_internal(
let (stop_sender, _) = tokio::sync::watch::channel(());
let (event_handling_stopped_sender, _) = tokio::sync::watch::channel(());

let is_listening = Arc::new(AtomicBool::new(false));
let latest_wallet_sync_timestamp = Arc::new(RwLock::new(None));
let latest_onchain_wallet_sync_timestamp = Arc::new(RwLock::new(None));
let latest_fee_rate_cache_update_timestamp = Arc::new(RwLock::new(None));
let latest_rgs_snapshot_timestamp = Arc::new(RwLock::new(None));
let latest_node_announcement_broadcast_timestamp = Arc::new(RwLock::new(None));
let latest_channel_monitor_archival_height = Arc::new(RwLock::new(None));

Ok(Node {
runtime,
stop_sender,
event_handling_stopped_sender,
config,
wallet,
tx_sync,
chain_source,
tx_broadcaster,
fee_estimator,
event_queue,
channel_manager,
chain_monitor,
Expand All @@ -1034,12 +1040,7 @@ fn build_with_store_internal(
peer_store,
payment_store,
is_listening,
latest_wallet_sync_timestamp,
latest_onchain_wallet_sync_timestamp,
latest_fee_rate_cache_update_timestamp,
latest_rgs_snapshot_timestamp,
latest_node_announcement_broadcast_timestamp,
latest_channel_monitor_archival_height,
node_metrics,
})
}

Expand Down
Loading
Loading