diff --git a/.env-template b/.env-template index e179811b0d..1ff2e79d9a 100644 --- a/.env-template +++ b/.env-template @@ -34,9 +34,15 @@ YAGNA_DATADIR="." #ACCOUNT_LIST="${YAGNA_DATADIR}/accounts.json" #PAYMENT_SHUTDOWN_TIMEOUT_SECS=10 -## All drivers -#RINKEBY_GETH_ADDR=http://1.geth.testnet.golem.network:55555 +### All drivers + +## Set list of Ethereum nodes used by payment driver for specified networks. +## By default payment driver will query nodes addresses from DNS record `{network_name}.rpc-node.dev.golem.network`. +## Setting any of these variables will disable DNS lookup mechanism and use custom list of nodes instead for chosen network. #MAINNET_GETH_ADDR=https://geth.golem.network:55555 +#GOERLI_GETH_ADDR=https://rpc.ankr.com/eth_goerli +#POLYGON_GETH_ADDR=https://bor.golem.network,https://polygon-rpc.com +#MUMBAI_GETH_ADDR=https://matic-mumbai.chainstacklabs.com ## ERC20 driver. #ETH_FAUCET_ADDRESS=http://faucet.testnet.golem.network:4000/donate diff --git a/Cargo.lock b/Cargo.lock index 5426f69bae..f3c9f515a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1901,18 +1901,6 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" -[[package]] -name = "enum-as-inner" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21cdad81446a7f7dc43f6a77409efeb9733d2fa65553efef6018ef257c959b73" -dependencies = [ - "heck 0.4.1", - "proc-macro2", - "quote", - "syn 1.0.107", -] - [[package]] name = "enum-as-inner" version = "0.5.1" @@ -5436,7 +5424,7 @@ dependencies = [ "tokio 1.25.0", "tokio-native-tls", "tower-service", - "trust-dns-resolver 0.22.0", + "trust-dns-resolver", "url", "wasm-bindgen", "wasm-bindgen-futures", @@ -7162,31 +7150,6 @@ dependencies = [ "tracing-serde", ] -[[package]] -name = "trust-dns-proto" -version = "0.21.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c31f240f59877c3d4bb3b3ea0ec5a6a0cff07323580ff8c7a605cd7d08b255d" -dependencies = [ - "async-trait", - "cfg-if 1.0.0", - "data-encoding", - "enum-as-inner 0.4.0", - "futures-channel", - "futures-io", - "futures-util", - "idna 0.2.3", - "ipnet", - "lazy_static", - "log", - "rand 0.8.5", - "smallvec", - "thiserror", - "tinyvec", - "tokio 1.25.0", - "url", -] - [[package]] name = "trust-dns-proto" version = "0.22.0" @@ -7196,7 +7159,7 @@ dependencies = [ "async-trait", "cfg-if 1.0.0", "data-encoding", - "enum-as-inner 0.5.1", + "enum-as-inner", "futures-channel", "futures-io", "futures-util", @@ -7212,27 +7175,6 @@ dependencies = [ "url", ] -[[package]] -name = "trust-dns-resolver" -version = "0.21.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4ba72c2ea84515690c9fcef4c6c660bb9df3036ed1051686de84605b74fd558" -dependencies = [ - "cfg-if 1.0.0", - "futures-util", - "ipconfig", - "lazy_static", - "log", - "lru-cache", - "parking_lot 0.12.1", - "resolv-conf", - "smallvec", - "thiserror", - "tokio 1.25.0", - "tokio-openssl", - "trust-dns-proto 0.21.2", -] - [[package]] name = "trust-dns-resolver" version = "0.22.0" @@ -7249,8 +7191,9 @@ dependencies = [ "smallvec", "thiserror", "tokio 1.25.0", + "tokio-openssl", "tracing", - "trust-dns-proto 0.22.0", + "trust-dns-proto", ] [[package]] @@ -8150,14 +8093,18 @@ dependencies = [ "maplit", "num-bigint 0.3.3", "num-traits", + "rand 0.8.5", "rlp", "serde", "serde_json", "sha3 0.8.2", "structopt", + "test-case 3.1.0", "thiserror", "tiny-keccak 2.0.2", "tokio 1.25.0", + "trust-dns-resolver", + "url", "uuid 0.8.2", "web3 0.16.0", "ya-client-model", @@ -9115,7 +9062,7 @@ dependencies = [ "log", "regex", "thiserror", - "trust-dns-resolver 0.21.2", + "trust-dns-resolver", "url", "ya-relay-stack", ] diff --git a/Cargo.toml b/Cargo.toml index 7afe4d5e54..beb642cdb6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -218,6 +218,11 @@ members = [ "test-utils/test-framework/framework-macro" ] +[workspace.dependencies] +rand = "0.8.5" +url = "2.3.1" +trust-dns-resolver = "0.22" + [patch.crates-io] ## SERVICES ya-identity = { path = "core/identity" } diff --git a/core/payment-driver/base/src/db/models.rs b/core/payment-driver/base/src/db/models.rs index fffa65c363..8fe0de520f 100644 --- a/core/payment-driver/base/src/db/models.rs +++ b/core/payment-driver/base/src/db/models.rs @@ -97,7 +97,19 @@ pub struct PaymentEntity { pub network: Network, } -#[derive(AsExpression, FromSqlRow, PartialEq, Eq, Debug, Clone, Copy, FromPrimitive, Default)] +#[derive( + AsExpression, + FromSqlRow, + PartialEq, + Eq, + PartialOrd, + Ord, + Debug, + Clone, + Copy, + FromPrimitive, + Default, +)] #[sql_type = "Integer"] pub enum Network { Mainnet = 1, //Main Ethereum chain diff --git a/core/payment-driver/erc20/Cargo.toml b/core/payment-driver/erc20/Cargo.toml index 514e5872f6..295fc5b1c0 100644 --- a/core/payment-driver/erc20/Cargo.toml +++ b/core/payment-driver/erc20/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" [features] default = [] +integration = [] [dependencies] async-trait = "0.1" @@ -33,7 +34,9 @@ tiny-keccak = { version = "2.0", features = ["keccak"] } tokio = { version = "1", features = ["full"] } uuid = { version = "0.8", features = ["v4"] } web3 = { version = "0.16", default-features = false, features = [ "http-tls", "signing", "ws-tls-tokio" ] } - +rand = { workspace = true } +url = { workspace=true } +trust-dns-resolver = { workspace=true, default-features = false } ## yagna dependencies ya-payment-driver = "0.3" ya-client-model = "0.5" @@ -46,3 +49,4 @@ actix-rt = "2.7" dotenv = "0.15.0" env_logger = "0.7.1" structopt = "0.3" +test-case = "3.1.0" diff --git a/core/payment-driver/erc20/src/erc20/ethereum.rs b/core/payment-driver/erc20/src/erc20/ethereum.rs index c90e3ad757..627abea62e 100644 --- a/core/payment-driver/erc20/src/erc20/ethereum.rs +++ b/core/payment-driver/erc20/src/erc20/ethereum.rs @@ -1,6 +1,8 @@ #![allow(clippy::too_many_arguments)] +use futures::prelude::*; use std::collections::HashMap; +use std::pin::pin; use std::sync::Arc; use bigdecimal::BigDecimal; @@ -199,12 +201,19 @@ async fn get_next_nonce_pending_with( pub async fn with_clients(network: Network, mut f: F) -> Result where F: FnMut(Web3) -> R, - R: futures::Future>, + R: Future>, { - let clients = get_clients(network).await?; + lazy_static! { + static ref RESOLVER: super::rpc_resolv::RpcResolver = super::rpc_resolv::RpcResolver::new(); + }; + + let mut clients = pin!(RESOLVER + .clients_for(network) + .await + .map_err(GenericError::new)?); let mut last_err: Option = None; - for client in clients { + while let Some(client) = clients.next().await { match f(client).await { Ok(result) => return Ok(result), Err(ClientError::Web3(e)) => match e { @@ -526,67 +535,6 @@ async fn get_tx_receipt_with( .map_err(Into::into) } -fn get_rpc_addr_from_env(network: Network) -> Vec { - match network { - Network::Mainnet => { - collect_rpc_addr_from("MAINNET_GETH_ADDR", "https://geth.golem.network:55555") - } - Network::Rinkeby => collect_rpc_addr_from( - "RINKEBY_GETH_ADDR", - "http://geth.testnet.golem.network:55555", - ), - Network::Goerli => { - collect_rpc_addr_from("GOERLI_GETH_ADDR", "https://rpc.ankr.com/eth_goerli") - } - Network::Polygon => collect_rpc_addr_from( - "POLYGON_GETH_ADDR", - "https://bor.golem.network,https://polygon-rpc.com", - ), - Network::Mumbai => collect_rpc_addr_from( - "MUMBAI_GETH_ADDR", - "https://matic-mumbai.chainstacklabs.com", - ), - } -} - -fn collect_rpc_addr_from(env: &str, default: &str) -> Vec { - std::env::var(env) - .ok() - .unwrap_or_else(|| default.to_string()) - .split(',') - .map(|path| path.to_string()) - .collect() -} - -async fn get_clients(network: Network) -> Result>, GenericError> { - let geth_addrs = get_rpc_addr_from_env(network); - let mut clients: Vec> = Default::default(); - - for geth_addr in geth_addrs { - { - let client_map = WEB3_CLIENT_MAP.read().await; - if let Some(client) = client_map.get(&geth_addr).cloned() { - clients.push(client); - continue; - } - } - - let transport = match web3::transports::Http::new(&geth_addr) { - Ok(t) => t, - Err(_) => continue, - }; - - let client = Web3::new(transport); - - let mut client_map = WEB3_CLIENT_MAP.write().await; - client_map.insert(geth_addr, client.clone()); - - clients.push(client); - } - - Ok(clients) -} - fn get_env(network: Network) -> config::EnvConfiguration { match network { Network::Mainnet => *config::MAINNET_CONFIG, diff --git a/core/payment-driver/erc20/src/erc20/mod.rs b/core/payment-driver/erc20/src/erc20/mod.rs index ff7487775c..6aeb35d288 100644 --- a/core/payment-driver/erc20/src/erc20/mod.rs +++ b/core/payment-driver/erc20/src/erc20/mod.rs @@ -10,4 +10,5 @@ pub mod wallet; mod config; pub mod eth_utils; mod gasless_transfer; +mod rpc_resolv; pub mod transaction; diff --git a/core/payment-driver/erc20/src/erc20/rpc_resolv.rs b/core/payment-driver/erc20/src/erc20/rpc_resolv.rs new file mode 100644 index 0000000000..50143b2b97 --- /dev/null +++ b/core/payment-driver/erc20/src/erc20/rpc_resolv.rs @@ -0,0 +1,271 @@ +use crate::erc20::rpc_resolv::NameResolver::{DnsLookup, StaticList}; +use anyhow::Context; +use futures::prelude::*; +use rand::prelude::*; +use rand::thread_rng; +use std::collections::btree_map::Entry; +use std::collections::BTreeMap; +use std::env; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; +use tokio::sync::RwLock; +use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; +use trust_dns_resolver::TokioAsyncResolver; +use web3::transports::Http; +use web3::Web3; +use ya_payment_driver::db::models::Network; + +pub struct RpcResolver { + network_resolvers: Arc>>, +} + +impl RpcResolver { + pub fn new() -> Self { + let network_resolvers = Arc::new(Mutex::new(BTreeMap::new())); + Self { network_resolvers } + } + + pub async fn clients_for( + &self, + network: Network, + ) -> anyhow::Result>> { + let n = { + let mut g = self + .network_resolvers + .lock() + .expect("RpcResolver mutex poisoned"); + match g.entry(network) { + Entry::Occupied(v) => v.get().clone(), + Entry::Vacant(v) => { + let network_resolver = NetworkResolver::from_env(network)?; + v.insert(network_resolver.clone()); + network_resolver + } + } + }; + Ok(n.clients().await) + } +} + +#[derive(Clone)] +struct NetworkResolver { + inner: Arc>, +} + +struct NetworkResolverInner { + name_resolver: NameResolver, + last_ep: Option<(Web3, Arc)>, +} + +impl NetworkResolver { + fn from_env(network: Network) -> anyhow::Result { + let name_resolver = NameResolver::from_env(network)?; + let last_ep = None; + let inner = Arc::new(RwLock::new(NetworkResolverInner { + name_resolver, + last_ep, + })); + + Ok(Self { inner }) + } + + pub async fn clients(&self) -> impl Stream> { + let last_ep = self.inner.read().await.last_ep.clone(); + if let Some((web3, web3url)) = last_ep { + let resolver = self.clone(); + return ResolvStream::Ready { + web3: Some(web3), + web3url, + resolver, + } + .stream(); + } + let mut g = self.clone().inner.write_owned().await; + if let Some(ts) = g.name_resolver.expires() { + if ts < Instant::now() { + g.name_resolver.refresh().await; + } + } + let mut t = thread_rng(); + let mut names: Vec<_> = g.name_resolver.names().collect(); + names.as_mut_slice().shuffle(&mut t); + ResolvStream::TryNames { names, g }.stream() + } +} + +enum NameResolver { + StaticList(Vec>), + DnsLookup { + resolver: TokioAsyncResolver, + dns_name: String, + last_names: Vec>, + expire: Instant, + }, +} + +fn parse_env_list(names: &'_ str) -> impl Iterator> + '_ { + names.split(',').filter_map(|name| { + let name = name.trim(); + if url::Url::parse(name).is_ok() { + Some(name.into()) + } else { + None + } + }) +} + +const DNS_REFRESH_TIMEOUT: Duration = Duration::from_secs(300); + +impl NameResolver { + fn from_env(network: Network) -> anyhow::Result { + let network_upper = network.to_string().to_uppercase(); + let env_name = format!("{network_upper}_GETH_ADDR"); + if let Ok(names) = env::var(env_name) { + return Ok(StaticList(parse_env_list(&names).collect())); + } + let resolver = TokioAsyncResolver::tokio(ResolverConfig::google(), ResolverOpts::default()) + .context("Failed to create dns resolver for rpc-node lookup")?; + let expire = Instant::now() - Duration::from_secs(1); + let last_names = Default::default(); + let network_lower = network.to_string().to_lowercase(); + let dns_name = format!("{network_lower}.rpc-node.dev.golem.network"); + + Ok(DnsLookup { + resolver, + dns_name, + last_names, + expire, + }) + } + + fn expires(&self) -> Option { + match self { + Self::StaticList(_) => None, + Self::DnsLookup { expire, .. } => Some(*expire), + } + } + + fn names(&self) -> impl Iterator> + 'static { + match self { + Self::StaticList(items) => items.clone().into_iter(), + Self::DnsLookup { last_names, .. } => last_names.clone().into_iter(), + } + } + + async fn refresh(&mut self) { + match self { + Self::StaticList(_) => (), + Self::DnsLookup { + resolver, + expire, + last_names, + dns_name, + } => { + if let Ok(names) = resolver.txt_lookup(dns_name.as_str()).await { + *last_names = names + .iter() + .filter_map(|x| x.iter().next().and_then(|v| std::str::from_utf8(v).ok())) + .map(Into::into) + .collect(); + *expire = Instant::now() + DNS_REFRESH_TIMEOUT; + } + } + } + } +} + +enum ResolvStream { + Ready { + web3: Option>, + web3url: Arc, + resolver: NetworkResolver, + }, + TryNames { + names: Vec>, + g: tokio::sync::OwnedRwLockWriteGuard, + }, +} + +impl ResolvStream { + fn stream(self) -> impl Stream> { + fn do_try_names( + mut names: Vec>, + mut g: tokio::sync::OwnedRwLockWriteGuard, + ) -> Option<(Web3, ResolvStream)> { + while let Some(web3name) = names.pop() { + if let Ok(web3t) = web3::transports::Http::new(&web3name) { + let web3 = Web3::new(web3t); + g.last_ep = Some((web3.clone(), web3name)); + return Some((web3, ResolvStream::TryNames { names, g })); + } + } + g.last_ep = None; + + None + } + + stream::unfold(self, |state| async move { + match state { + Self::Ready { + mut web3, + web3url, + resolver, + } => { + if let Some(web3) = web3.take() { + Some(( + web3, + Self::Ready { + web3: None, + web3url, + resolver, + }, + )) + } + // case 2: find other values + else { + let g = resolver.clone().inner.write_owned().await; + let mut t = thread_rng(); + let mut names: Vec<_> = + g.name_resolver.names().filter(|u| u != &web3url).collect(); + names.as_mut_slice().shuffle(&mut t); + + do_try_names(names, g) + } + } + Self::TryNames { names, g } => do_try_names(names, g), + } + }) + } +} + +#[cfg(test)] +mod integration_test { + use super::*; + use std::sync::atomic::{AtomicUsize, Ordering}; + use test_case::test_case; + + #[test_case(Network::Mainnet; "check chain_id on Eth Mainnet")] + #[test_case(Network::Goerli; "check chain_id on Goerli")] + #[test_case(Network::Mumbai; "check chain_id on Mumbai")] + #[test_case(Network::Polygon; "check chain_id on Polygon Mainnet")] + #[test_case(Network::Rinkeby; "check chain_id on Rinkeby")] + #[tokio::test] + #[cfg_attr(not(feature = "integration"), ignore)] + async fn test_resolver(network: Network) { + let resolver = NetworkResolver::from_env(network).unwrap(); + eprintln!("starting check for: {}", network); + let cnt = AtomicUsize::new(0); + resolver + .clients() + .await + .for_each(|client: Web3| { + let _ = cnt.fetch_add(1, Ordering::Relaxed); + async move { + let chain_id = client.eth().chain_id().await; + assert_eq!(network as usize, chain_id.unwrap().as_usize()); + } + }) + .await; + assert!(cnt.into_inner() > 0); + } +} diff --git a/utils/networking/Cargo.toml b/utils/networking/Cargo.toml index 17f16f1b12..bd1b22261e 100644 --- a/utils/networking/Cargo.toml +++ b/utils/networking/Cargo.toml @@ -18,8 +18,8 @@ regex = "1" ya-relay-stack = { version = "0.5.0", optional = true } anyhow = { version = "1.0", optional = true } -trust-dns-resolver = { version = "0.21", optional = true } -url = { version = "2.2", optional = true } +trust-dns-resolver = { workspace=true, optional = true } +url = { workspace=true, optional = true } ipnet = { version = "2.3", optional = true } thiserror = { version = "1.0", optional = true }