diff --git a/.gitignore b/.gitignore index d01bd1a..efe3eb1 100644 --- a/.gitignore +++ b/.gitignore @@ -18,4 +18,8 @@ Cargo.lock # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. -#.idea/ \ No newline at end of file +#.idea/ + +# Added by cargo + +/target diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..a48a7ee --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "citrea-e2e" +version = "0.1.0" +edition = "2021" + +[dependencies] +bollard = { version = "0.17.1" } +bitcoin = { version = "0.32.2", features = ["serde", "rand"] } +bitcoincore-rpc = { version = "0.18.0" } +futures = "0.3" +rand = "0.8" +toml = "0.8.0" +serde = { version = "1.0.192", default-features = false, features = ["alloc", "derive"] } +serde_json = { version = "1.0", default-features = false } +tokio = { version = "1.39", features = ["full"] } +anyhow = { version = "1.0.68", default-features = false, features = ["std"] } +tempfile = "3.8" +async-trait = "0.1.71" + +[patch.crates-io] +bitcoincore-rpc = { version = "0.18.0", git = "https://github.com/chainwayxyz/rust-bitcoincore-rpc.git", rev = "0ae498d" } \ No newline at end of file diff --git a/src/bitcoin.rs b/src/bitcoin.rs new file mode 100644 index 0000000..d75cebd --- /dev/null +++ b/src/bitcoin.rs @@ -0,0 +1,366 @@ +use std::collections::HashSet; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use anyhow::{bail, Context}; +use async_trait::async_trait; +use bitcoin::Address; +use bitcoin_da::service::{get_relevant_blobs_from_txs, FINALITY_DEPTH}; +use bitcoin_da::spec::blob::BlobWithSender; +use bitcoincore_rpc::json::AddressType::Bech32m; +use bitcoincore_rpc::{Auth, Client, RpcApi}; +use citrea_primitives::REVEAL_BATCH_PROOF_PREFIX; +use futures::TryStreamExt; +use tokio::process::Command; +use tokio::sync::OnceCell; +use tokio::time::sleep; + +use super::config::BitcoinConfig; +use super::docker::DockerEnv; +use super::framework::TestContext; +use super::node::{LogProvider, Node, Restart, SpawnOutput}; +use super::Result; +use crate::node::NodeKind; + +pub struct BitcoinNode { + spawn_output: SpawnOutput, + pub config: BitcoinConfig, + client: Client, + gen_addr: OnceCell
, + docker_env: Arc>, +} + +impl BitcoinNode { + pub async fn new(config: &BitcoinConfig, docker: Arc>) -> Result { + let spawn_output = Self::spawn(config, &docker).await?; + + let rpc_url = format!( + "http://127.0.0.1:{}/wallet/{}", + config.rpc_port, + NodeKind::Bitcoin + ); + let client = Client::new( + &rpc_url, + Auth::UserPass(config.rpc_user.clone(), config.rpc_password.clone()), + ) + .await + .context("Failed to create RPC client")?; + + wait_for_rpc_ready(&client, None).await?; + + Ok(Self { + spawn_output, + config: config.clone(), + client, + gen_addr: OnceCell::new(), + docker_env: docker, + }) + } + + pub async fn wait_mempool_len( + &self, + target_len: usize, + timeout: Option, + ) -> Result<()> { + let timeout = timeout.unwrap_or(Duration::from_secs(300)); + let start = Instant::now(); + while start.elapsed() < timeout { + let mempool_len = self.get_raw_mempool().await?.len(); + if mempool_len >= target_len { + return Ok(()); + } + sleep(Duration::from_millis(500)).await; + } + bail!("Timeout waiting for mempool to reach length {}", target_len) + } + + pub async fn fund_wallet(&self, name: String, blocks: u64) -> Result<()> { + let rpc_url = format!("http://127.0.0.1:{}/wallet/{}", self.config.rpc_port, name); + let client = Client::new( + &rpc_url, + Auth::UserPass( + self.config.rpc_user.clone(), + self.config.rpc_password.clone(), + ), + ) + .await + .context("Failed to create RPC client")?; + + let gen_addr = client + .get_new_address(None, Some(Bech32m)) + .await? + .assume_checked(); + client.generate_to_address(blocks, &gen_addr).await?; + Ok(()) + } + + pub async fn get_finalized_height(&self) -> Result { + Ok(self.get_block_count().await? - FINALITY_DEPTH + 1) + } + + pub async fn get_relevant_blobs_from_block(&self, height: u64) -> Result> { + let hash = self.get_block_hash(height).await?; + let block = self.get_block(&hash).await?; + + Ok(get_relevant_blobs_from_txs( + block.txdata, + REVEAL_BATCH_PROOF_PREFIX, + )) + } + + async fn wait_for_shutdown(&self) -> Result<()> { + let timeout_duration = Duration::from_secs(30); + let start = std::time::Instant::now(); + + while start.elapsed() < timeout_duration { + if !self.is_process_running().await? { + println!("Bitcoin daemon has stopped successfully"); + return Ok(()); + } + sleep(Duration::from_millis(200)).await; + } + + bail!("Timeout waiting for Bitcoin daemon to stop") + } + + async fn is_process_running(&self) -> Result { + let data_dir = &self.config.data_dir; + let output = Command::new("pgrep") + .args(["-f", &format!("bitcoind.*{}", data_dir.display())]) + .output() + .await?; + + Ok(output.status.success()) + } + + // Infallible, discard already loaded errors + async fn load_wallets(&self) { + let _ = self.load_wallet(&NodeKind::Bitcoin.to_string()).await; + let _ = self.load_wallet(&NodeKind::Sequencer.to_string()).await; + let _ = self.load_wallet(&NodeKind::Prover.to_string()).await; + } + + // Switch this over to Node signature once we add support for docker to citrea nodes + async fn spawn(config: &BitcoinConfig, docker: &Arc>) -> Result { + match docker.as_ref() { + Some(docker) => docker.spawn(config.into()).await, + None => ::spawn(config), + } + } +} + +#[async_trait] +impl RpcApi for BitcoinNode { + async fn call serde::de::Deserialize<'a>>( + &self, + cmd: &str, + args: &[serde_json::Value], + ) -> bitcoincore_rpc::Result { + self.client.call(cmd, args).await + } + + // Override deprecated generate method. 
+ // Uses or lazy init gen_addr and forward to `generate_to_address` + async fn generate( + &self, + block_num: u64, + _maxtries: Option, + ) -> bitcoincore_rpc::Result> { + let addr = self + .gen_addr + .get_or_init(|| async { + self.client + .get_new_address(None, Some(Bech32m)) + .await + .expect("Failed to generate address") + .assume_checked() + }) + .await; + + self.generate_to_address(block_num, addr).await + } +} + +impl Node for BitcoinNode { + type Config = BitcoinConfig; + type Client = Client; + + fn spawn(config: &Self::Config) -> Result { + let args = config.args(); + println!("Running bitcoind with args : {args:?}"); + + Command::new("bitcoind") + .args(&args) + .kill_on_drop(true) + .envs(config.env.clone()) + .spawn() + .context("Failed to spawn bitcoind process") + .map(SpawnOutput::Child) + } + + fn spawn_output(&mut self) -> &mut SpawnOutput { + &mut self.spawn_output + } + + async fn wait_for_ready(&self, timeout: Option) -> Result<()> { + println!("Waiting for ready"); + let start = Instant::now(); + let timeout = timeout.unwrap_or(Duration::from_secs(30)); + while start.elapsed() < timeout { + if wait_for_rpc_ready(&self.client, Some(timeout)) + .await + .is_ok() + { + return Ok(()); + } + tokio::time::sleep(Duration::from_millis(500)).await; + } + anyhow::bail!("Node failed to become ready within the specified timeout") + } + + fn client(&self) -> &Self::Client { + &self.client + } + + fn env(&self) -> Vec<(&'static str, &'static str)> { + self.config.env.clone() + } + + fn config_mut(&mut self) -> &mut Self::Config { + &mut self.config + } +} + +impl Restart for BitcoinNode { + async fn wait_until_stopped(&mut self) -> Result<()> { + self.client.stop().await?; + self.stop().await?; + + match &self.spawn_output { + SpawnOutput::Child(_) => self.wait_for_shutdown().await, + SpawnOutput::Container(output) => { + let Some(env) = self.docker_env.as_ref() else { + bail!("Missing docker environment") + }; + env.docker.stop_container(&output.id, None).await?; + + env.docker + .wait_container::(&output.id, None) + .try_collect::>() + .await?; + env.docker.remove_container(&output.id, None).await?; + println!("Docker container {} succesfully removed", output.id); + Ok(()) + } + } + } + + async fn start(&mut self, config: Option) -> Result<()> { + if let Some(config) = config { + self.config = config + } + self.spawn_output = Self::spawn(&self.config, &self.docker_env).await?; + + self.wait_for_ready(None).await?; + + // Reload wallets after restart + self.load_wallets().await; + + Ok(()) + } +} + +impl LogProvider for BitcoinNode { + fn kind(&self) -> NodeKind { + NodeKind::Bitcoin + } + + fn log_path(&self) -> PathBuf { + self.config.data_dir.join("regtest").join("debug.log") + } +} + +pub struct BitcoinNodeCluster { + inner: Vec, +} + +impl BitcoinNodeCluster { + pub async fn new(ctx: &TestContext) -> Result { + let n_nodes = ctx.config.test_case.n_nodes; + let mut cluster = Self { + inner: Vec::with_capacity(n_nodes), + }; + for config in ctx.config.bitcoin.iter() { + let node = BitcoinNode::new(config, Arc::clone(&ctx.docker)).await?; + cluster.inner.push(node) + } + + Ok(cluster) + } + + pub async fn stop_all(&mut self) -> Result<()> { + for node in &mut self.inner { + RpcApi::stop(node).await?; + node.stop().await?; + } + Ok(()) + } + + pub async fn wait_for_sync(&self, timeout: Duration) -> Result<()> { + let start = Instant::now(); + while start.elapsed() < timeout { + let mut heights = HashSet::new(); + for node in &self.inner { + let height = 
node.get_block_count().await?; + heights.insert(height); + } + + if heights.len() == 1 { + return Ok(()); + } + + sleep(Duration::from_secs(1)).await; + } + bail!("Nodes failed to sync within the specified timeout") + } + + // Connect all bitcoin nodes between them + pub async fn connect_nodes(&self) -> Result<()> { + for (i, from_node) in self.inner.iter().enumerate() { + for (j, to_node) in self.inner.iter().enumerate() { + if i != j { + let ip = match &to_node.spawn_output { + SpawnOutput::Container(container) => container.ip.clone(), + _ => "127.0.0.1".to_string(), + }; + + let add_node_arg = format!("{}:{}", ip, to_node.config.p2p_port); + from_node.add_node(&add_node_arg).await?; + } + } + } + Ok(()) + } + + pub fn get(&self, index: usize) -> Option<&BitcoinNode> { + self.inner.get(index) + } + + #[allow(unused)] + pub fn get_mut(&mut self, index: usize) -> Option<&mut BitcoinNode> { + self.inner.get_mut(index) + } +} + +async fn wait_for_rpc_ready(client: &Client, timeout: Option) -> Result<()> { + let start = Instant::now(); + let timeout = timeout.unwrap_or(Duration::from_secs(300)); + while start.elapsed() < timeout { + match client.get_blockchain_info().await { + Ok(_) => return Ok(()), + Err(_) => sleep(Duration::from_millis(500)).await, + } + } + Err(anyhow::anyhow!("Timeout waiting for RPC to be ready")) +} diff --git a/src/config/bitcoin.rs b/src/config/bitcoin.rs new file mode 100644 index 0000000..447bad3 --- /dev/null +++ b/src/config/bitcoin.rs @@ -0,0 +1,64 @@ +use std::path::PathBuf; + +use bitcoin::Network; +use tempfile::TempDir; + +#[derive(Debug, Clone)] +pub struct BitcoinConfig { + pub p2p_port: u16, + pub rpc_port: u16, + pub rpc_user: String, + pub rpc_password: String, + pub data_dir: PathBuf, + pub extra_args: Vec<&'static str>, + pub network: Network, + pub docker_image: Option, + pub env: Vec<(&'static str, &'static str)>, + pub idx: usize, +} + +impl Default for BitcoinConfig { + fn default() -> Self { + Self { + p2p_port: 0, + rpc_port: 0, + rpc_user: "user".to_string(), + rpc_password: "password".to_string(), + data_dir: TempDir::new() + .expect("Failed to create temporary directory") + .into_path(), + extra_args: Vec::new(), + network: Network::Regtest, + docker_image: Some("bitcoin/bitcoin:latest".to_string()), + env: Vec::new(), + idx: 0, + } + } +} + +impl BitcoinConfig { + fn base_args(&self) -> Vec { + vec![ + "-regtest".to_string(), + format!("-datadir={}", self.data_dir.display()), + format!("-port={}", self.p2p_port), + format!("-rpcport={}", self.rpc_port), + format!("-rpcuser={}", self.rpc_user), + format!("-rpcpassword={}", self.rpc_password), + "-server".to_string(), + "-daemonwait".to_string(), + "-txindex".to_string(), + "-addresstype=bech32m".to_string(), + "-debug=net".to_string(), + "-debug=rpc".to_string(), + ] + } + + pub fn args(&self) -> Vec { + [ + self.base_args(), + self.extra_args.iter().map(|&s| s.to_string()).collect(), + ] + .concat() + } +} diff --git a/src/config/docker.rs b/src/config/docker.rs new file mode 100644 index 0000000..1e5c0ab --- /dev/null +++ b/src/config/docker.rs @@ -0,0 +1,77 @@ +use std::path::PathBuf; + +use super::{BitcoinConfig, FullSequencerConfig}; +use crate::utils::get_genesis_path; + +#[derive(Debug)] +pub struct VolumeConfig { + pub name: String, + pub target: String, +} + +#[derive(Debug)] +pub struct DockerConfig { + pub ports: Vec, + pub image: String, + pub cmd: Vec, + pub log_path: PathBuf, + pub volume: VolumeConfig, +} + +impl From<&BitcoinConfig> for DockerConfig { + fn from(v: 
&BitcoinConfig) -> Self { + let mut args = v.args(); + + // Docker specific args + args.extend([ + "-rpcallowip=0.0.0.0/0".to_string(), + "-rpcbind=0.0.0.0".to_string(), + "-daemonwait=0".to_string(), + ]); + + Self { + ports: vec![v.rpc_port, v.p2p_port], + image: v + .docker_image + .clone() + .unwrap_or_else(|| "bitcoin/bitcoin:latest".to_string()), + cmd: args, + log_path: v.data_dir.join("regtest").join("debug.log"), + volume: VolumeConfig { + name: format!("bitcoin-{}", v.idx), + target: "/home/bitcoin/.bitcoin".to_string(), + }, + } + } +} + +impl From<&FullSequencerConfig> for DockerConfig { + fn from(v: &FullSequencerConfig) -> Self { + let args = vec![ + "--da-layer".to_string(), + "bitcoin".to_string(), + "--rollup-config-path".to_string(), + "sequencer_rollup_config.toml".to_string(), + "--sequencer-config-path".to_string(), + "sequencer_config.toml".to_string(), + "--genesis-paths".to_string(), + get_genesis_path(v.dir.parent().expect("Couldn't get parent dir")) + .display() + .to_string(), + ]; + + Self { + ports: vec![v.rollup.rpc.bind_port], + image: v + .docker_image + .clone() + .unwrap_or_else(|| "citrea:latest".to_string()), // Default to local image + cmd: args, + log_path: v.dir.join("stdout"), + volume: VolumeConfig { + name: "sequencer".to_string(), + target: "/sequencer/data".to_string(), + }, + } + } +} diff --git a/src/config/mod.rs b/src/config/mod.rs new file mode 100644 index 0000000..9bb8453 --- /dev/null +++ b/src/config/mod.rs @@ -0,0 +1,30 @@ +mod bitcoin; +mod docker; +mod rollup; +mod test; +mod test_case; +mod utils; + +use std::path::PathBuf; + +pub use bitcoin::BitcoinConfig; +pub use citrea_sequencer::SequencerConfig; +pub use docker::DockerConfig; +pub use rollup::{default_rollup_config, RollupConfig}; +pub use sov_stf_runner::ProverConfig; +pub use test::TestConfig; +pub use test_case::{TestCaseConfig, TestCaseEnv}; +pub use utils::config_to_file; + +#[derive(Clone, Debug)] +pub struct FullL2NodeConfig { + pub node: T, + pub rollup: RollupConfig, + pub docker_image: Option, + pub dir: PathBuf, + pub env: Vec<(&'static str, &'static str)>, +} + +pub type FullSequencerConfig = FullL2NodeConfig; +pub type FullProverConfig = FullL2NodeConfig; +pub type FullFullNodeConfig = FullL2NodeConfig<()>; diff --git a/src/config/rollup.rs b/src/config/rollup.rs new file mode 100644 index 0000000..33c5525 --- /dev/null +++ b/src/config/rollup.rs @@ -0,0 +1,68 @@ +use bitcoin_da::service::BitcoinServiceConfig; +use sov_stf_runner::{FullNodeConfig, RollupPublicKeys, RpcConfig, StorageConfig}; +use tempfile::TempDir; + +use super::BitcoinConfig; +use crate::utils::get_tx_backup_dir; +pub type RollupConfig = FullNodeConfig; + +pub fn default_rollup_config() -> RollupConfig { + RollupConfig { + rpc: RpcConfig { + bind_host: "127.0.0.1".into(), + bind_port: 0, + max_connections: 100, + max_request_body_size: 10 * 1024 * 1024, + max_response_body_size: 10 * 1024 * 1024, + batch_requests_limit: 50, + enable_subscriptions: true, + max_subscriptions_per_connection: 100, + }, + storage: StorageConfig { + path: TempDir::new() + .expect("Failed to create temporary directory") + .into_path(), + db_max_open_files: None, + }, + runner: None, + da: BitcoinServiceConfig { + node_url: String::new(), + node_username: String::from("user"), + node_password: String::from("password"), + network: bitcoin::Network::Regtest, + da_private_key: None, + tx_backup_dir: get_tx_backup_dir(), + }, + public_keys: RollupPublicKeys { + sequencer_public_key: vec![ + 32, 64, 64, 227, 100, 193, 15, 43, 
236, 156, 31, 229, 0, 161, 205, 76, 36, 124, + 137, 214, 80, 160, 30, 215, 232, 44, 171, 168, 103, 135, 124, 33, + ], + // private key [4, 95, 252, 129, 163, 193, 253, 179, 175, 19, 89, 219, 242, 209, 20, 176, 179, 239, 191, 127, 41, 204, 156, 93, 160, 18, 103, 170, 57, 210, 199, 141] + // Private Key (WIF): KwNDSCvKqZqFWLWN1cUzvMiJQ7ck6ZKqR6XBqVKyftPZtvmbE6YD + sequencer_da_pub_key: vec![ + 3, 136, 195, 18, 11, 187, 25, 37, 38, 109, 184, 237, 247, 208, 131, 219, 162, 70, + 35, 174, 234, 47, 239, 247, 60, 51, 174, 242, 247, 112, 186, 222, 30, + ], + // private key [117, 186, 249, 100, 208, 116, 89, 70, 0, 54, 110, 91, 17, 26, 29, 168, 248, 107, 46, 254, 45, 34, 218, 81, 200, 216, 33, 38, 160, 252, 172, 114] + // Private Key (WIF): L1AZdJXzDGGENBBPZGSL7dKJnwn5xSKqzszgK6CDwiBGThYQEVTo + prover_da_pub_key: vec![ + 2, 138, 232, 157, 214, 46, 7, 210, 235, 33, 105, 239, 71, 169, 105, 233, 239, 84, + 172, 112, 13, 54, 9, 206, 106, 138, 251, 218, 15, 28, 137, 112, 127, + ], + }, + } +} + +impl From for BitcoinServiceConfig { + fn from(v: BitcoinConfig) -> Self { + Self { + node_url: format!("127.0.0.1:{}", v.rpc_port), + node_username: v.rpc_user, + node_password: v.rpc_password, + network: v.network, + da_private_key: None, + tx_backup_dir: "".to_string(), + } + } +} diff --git a/src/config/test.rs b/src/config/test.rs new file mode 100644 index 0000000..3f33d4f --- /dev/null +++ b/src/config/test.rs @@ -0,0 +1,12 @@ +use super::bitcoin::BitcoinConfig; +use super::test_case::TestCaseConfig; +use super::{FullFullNodeConfig, FullProverConfig, FullSequencerConfig}; + +#[derive(Clone)] +pub struct TestConfig { + pub test_case: TestCaseConfig, + pub bitcoin: Vec, + pub sequencer: FullSequencerConfig, + pub prover: FullProverConfig, + pub full_node: FullFullNodeConfig, +} diff --git a/src/config/test_case.rs b/src/config/test_case.rs new file mode 100644 index 0000000..caaeca5 --- /dev/null +++ b/src/config/test_case.rs @@ -0,0 +1,73 @@ +use std::path::PathBuf; +use std::time::Duration; + +use tempfile::TempDir; + +#[derive(Clone, Default)] +pub struct TestCaseEnv { + pub test: Vec<(&'static str, &'static str)>, + pub full_node: Vec<(&'static str, &'static str)>, + pub sequencer: Vec<(&'static str, &'static str)>, + pub prover: Vec<(&'static str, &'static str)>, + pub bitcoin: Vec<(&'static str, &'static str)>, +} + +impl TestCaseEnv { + // Base env that should apply to every test cases + fn base_env() -> Vec<(&'static str, &'static str)> { + vec![("NO_COLOR", "1")] + } + + fn test_env(&self) -> Vec<(&'static str, &'static str)> { + [Self::base_env(), self.test.clone()].concat() + } + + pub fn sequencer(&self) -> Vec<(&'static str, &'static str)> { + [self.test_env(), self.sequencer.clone()].concat() + } + + pub fn prover(&self) -> Vec<(&'static str, &'static str)> { + [self.test_env(), self.prover.clone()].concat() + } + + pub fn full_node(&self) -> Vec<(&'static str, &'static str)> { + [self.test_env(), self.full_node.clone()].concat() + } + + pub fn bitcoin(&self) -> Vec<(&'static str, &'static str)> { + [self.test_env(), self.bitcoin.clone()].concat() + } +} + +#[derive(Clone)] +pub struct TestCaseConfig { + pub n_nodes: usize, + pub with_sequencer: bool, + pub with_full_node: bool, + pub with_prover: bool, + #[allow(unused)] + pub timeout: Duration, + pub dir: PathBuf, + pub docker: bool, + // Either a relative dir from workspace root, i.e. "./resources/genesis/devnet" + // Or an absolute path. 
+ // Defaults to resources/genesis/bitcoin-regtest + pub genesis_dir: Option, +} + +impl Default for TestCaseConfig { + fn default() -> Self { + TestCaseConfig { + n_nodes: 1, + with_sequencer: true, + with_prover: false, + with_full_node: false, + timeout: Duration::from_secs(60), + dir: TempDir::new() + .expect("Failed to create temporary directory") + .into_path(), + docker: std::env::var("USE_DOCKER").map_or(false, |v| v.parse().unwrap_or(false)), + genesis_dir: None, + } + } +} diff --git a/src/config/utils.rs b/src/config/utils.rs new file mode 100644 index 0000000..49beb15 --- /dev/null +++ b/src/config/utils.rs @@ -0,0 +1,14 @@ +use std::path::Path; + +use serde::Serialize; + +pub fn config_to_file(config: &C, path: &P) -> std::io::Result<()> +where + C: Serialize, + P: AsRef, +{ + let toml = + toml::to_string(config).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + std::fs::write(path, toml)?; + Ok(()) +} diff --git a/src/docker.rs b/src/docker.rs new file mode 100644 index 0000000..7119cbe --- /dev/null +++ b/src/docker.rs @@ -0,0 +1,282 @@ +use std::collections::{HashMap, HashSet}; +use std::io::{stdout, Write}; +use std::path::PathBuf; + +use anyhow::{anyhow, Context, Result}; +use bollard::container::{Config, LogOutput, LogsOptions, NetworkingConfig}; +use bollard::image::CreateImageOptions; +use bollard::models::{EndpointSettings, Mount, PortBinding}; +use bollard::network::CreateNetworkOptions; +use bollard::secret::MountTypeEnum; +use bollard::service::HostConfig; +use bollard::volume::CreateVolumeOptions; +use bollard::Docker; +use futures::StreamExt; +use tokio::fs::File; +use tokio::io::AsyncWriteExt; +use tokio::task::JoinHandle; + +use super::config::DockerConfig; +use super::node::SpawnOutput; +use super::utils::generate_test_id; +use crate::node::ContainerSpawnOutput; + +pub struct DockerEnv { + pub docker: Docker, + pub network_id: String, + pub network_name: String, + id: String, + volumes: HashSet, +} + +impl DockerEnv { + pub async fn new(n_nodes: usize) -> Result { + let docker = + Docker::connect_with_local_defaults().context("Failed to connect to Docker")?; + let test_id = generate_test_id(); + let (network_id, network_name) = Self::create_network(&docker, &test_id).await?; + let volumes = Self::create_volumes(&docker, &test_id, n_nodes).await?; + + Ok(Self { + docker, + network_id, + network_name, + id: test_id, + volumes, + }) + } + + async fn create_volumes( + docker: &Docker, + test_case_id: &str, + n_nodes: usize, + ) -> Result> { + let volume_configs = vec![("bitcoin", n_nodes)]; + let mut volumes = HashSet::new(); + + for (name, n) in volume_configs { + for i in 0..n { + let volume_name = format!("{name}-{i}-{test_case_id}"); + docker + .create_volume(CreateVolumeOptions { + name: volume_name.clone(), + driver: "local".to_string(), + driver_opts: HashMap::new(), + labels: HashMap::new(), + }) + .await?; + + volumes.insert(volume_name); + } + } + + Ok(volumes) + } + + async fn create_network(docker: &Docker, test_case_id: &str) -> Result<(String, String)> { + let network_name = format!("test_network_{}", test_case_id); + let options = CreateNetworkOptions { + name: network_name.clone(), + check_duplicate: true, + driver: "bridge".to_string(), + ..Default::default() + }; + + let id = docker + .create_network(options) + .await? 
+ .id + .context("Error getting network id")?; + Ok((id, network_name)) + } + + pub async fn spawn(&self, config: DockerConfig) -> Result { + println!("Spawning docker with config {config:#?}"); + let exposed_ports: HashMap> = config + .ports + .iter() + .map(|port| (format!("{}/tcp", port), HashMap::new())) + .collect(); + + let port_bindings: HashMap>> = config + .ports + .iter() + .map(|port| { + ( + format!("{}/tcp", port), + Some(vec![PortBinding { + host_ip: Some("0.0.0.0".to_string()), + host_port: Some(port.to_string()), + }]), + ) + }) + .collect(); + + let mut network_config = HashMap::new(); + network_config.insert(self.network_id.clone(), EndpointSettings::default()); + + let volume_name = format!("{}-{}", config.volume.name, self.id); + let mount = Mount { + target: Some(config.volume.target.clone()), + source: Some(volume_name), + typ: Some(MountTypeEnum::VOLUME), + ..Default::default() + }; + + let container_config = Config { + image: Some(config.image), + cmd: Some(config.cmd), + exposed_ports: Some(exposed_ports), + host_config: Some(HostConfig { + port_bindings: Some(port_bindings), + // binds: Some(vec![config.dir]), + mounts: Some(vec![mount]), + ..Default::default() + }), + networking_config: Some(NetworkingConfig { + endpoints_config: network_config, + }), + tty: Some(true), + ..Default::default() + }; + + let image = container_config + .image + .as_ref() + .context("Image not specified in config")?; + self.ensure_image_exists(image).await?; + + // println!("options :{options:?}"); + // println!("config :{container_config:?}"); + + let container = self + .docker + .create_container::(None, container_config) + .await + .map_err(|e| anyhow!("Failed to create Docker container {e}"))?; + + self.docker + .start_container::(&container.id, None) + .await + .context("Failed to start Docker container")?; + + let inspect_result = self.docker.inspect_container(&container.id, None).await?; + let ip_address = inspect_result + .network_settings + .and_then(|ns| ns.networks) + .and_then(|networks| { + networks + .values() + .next() + .and_then(|network| network.ip_address.clone()) + }) + .context("Failed to get container IP address")?; + + // Extract container logs to host + // This spawns a background task to continuously stream logs from the container. + // The task will run until the container is stopped or removed during cleanup. 
+ Self::extract_container_logs(self.docker.clone(), container.id.clone(), config.log_path); + + Ok(SpawnOutput::Container(ContainerSpawnOutput { + id: container.id, + ip: ip_address, + })) + } + + async fn ensure_image_exists(&self, image: &str) -> Result<()> { + let images = self + .docker + .list_images::(None) + .await + .context("Failed to list Docker images")?; + if images + .iter() + .any(|img| img.repo_tags.contains(&image.to_string())) + { + return Ok(()); + } + + println!("Pulling image: {}", image); + let options = Some(CreateImageOptions { + from_image: image, + ..Default::default() + }); + + let mut stream = self.docker.create_image(options, None, None); + while let Some(result) = stream.next().await { + match result { + Ok(info) => { + if let (Some(status), Some(progress)) = (info.status, info.progress) { + print!("\r{}: {} ", status, progress); + stdout().flush().unwrap(); + } + } + Err(e) => return Err(anyhow::anyhow!("Failed to pull image: {}", e)), + } + } + println!("Image succesfully pulled"); + + Ok(()) + } + + pub async fn cleanup(&self) -> Result<()> { + let containers = self.docker.list_containers::(None).await?; + for container in containers { + if let (Some(id), Some(networks)) = ( + container.id, + container.network_settings.and_then(|ns| ns.networks), + ) { + if networks.contains_key(&self.network_name) { + self.docker.stop_container(&id, None).await?; + self.docker.remove_container(&id, None).await?; + } + } + } + + self.docker.remove_network(&self.network_name).await?; + + for volume_name in &self.volumes { + self.docker.remove_volume(volume_name, None).await?; + } + + Ok(()) + } + + fn extract_container_logs( + docker: Docker, + container_id: String, + log_path: PathBuf, + ) -> JoinHandle> { + tokio::spawn(async move { + if let Some(parent) = log_path.parent() { + tokio::fs::create_dir_all(parent) + .await + .context("Failed to create log directory")?; + } + let mut log_file = File::create(log_path) + .await + .context("Failed to create log file")?; + let mut log_stream = docker.logs::( + &container_id, + Some(LogsOptions { + follow: true, + stdout: true, + stderr: true, + ..Default::default() + }), + ); + + while let Some(Ok(log_output)) = log_stream.next().await { + let log_line = match log_output { + LogOutput::Console { message } | LogOutput::StdOut { message } => message, + _ => continue, + }; + log_file + .write_all(&log_line) + .await + .context("Failed to write log line")?; + } + Ok(()) + }) + } +} diff --git a/src/framework.rs b/src/framework.rs new file mode 100644 index 0000000..f7c2db1 --- /dev/null +++ b/src/framework.rs @@ -0,0 +1,200 @@ +use std::future::Future; +use std::sync::Arc; + +use bitcoincore_rpc::RpcApi; + +use super::bitcoin::BitcoinNodeCluster; +use super::config::TestConfig; +use super::docker::DockerEnv; +use super::full_node::FullNode; +use super::node::{LogProvider, LogProviderErased, Node, NodeKind}; +use super::sequencer::Sequencer; +use super::Result; +use crate::prover::Prover; +use crate::utils::tail_file; + +pub struct TestContext { + pub config: TestConfig, + pub docker: Arc>, +} + +impl TestContext { + async fn new(config: TestConfig) -> Self { + let docker = if config.test_case.docker { + Some(DockerEnv::new(config.test_case.n_nodes).await.unwrap()) + } else { + None + }; + Self { + config, + docker: Arc::new(docker), + } + } +} + +pub struct TestFramework { + ctx: TestContext, + pub bitcoin_nodes: BitcoinNodeCluster, + pub sequencer: Option, + pub prover: Option, + pub full_node: Option, + show_logs: bool, + pub 
initial_da_height: u64, +} + +async fn create_optional(pred: bool, f: impl Future>) -> Result> { + if pred { + Ok(Some(f.await?)) + } else { + Ok(None) + } +} + +impl TestFramework { + pub async fn new(config: TestConfig) -> Result { + anyhow::ensure!( + config.test_case.n_nodes > 0, + "At least one bitcoin node has to be running" + ); + + let ctx = TestContext::new(config).await; + + let bitcoin_nodes = BitcoinNodeCluster::new(&ctx).await?; + + // tokio::time::sleep(std::time::Duration::from_secs(30)).await; + Ok(Self { + bitcoin_nodes, + sequencer: None, + prover: None, + full_node: None, + ctx, + show_logs: true, + initial_da_height: 0, + }) + } + + pub async fn init_nodes(&mut self) -> Result<()> { + // Has to initialize sequencer first since prover and full node depend on it + self.sequencer = create_optional( + self.ctx.config.test_case.with_sequencer, + Sequencer::new(&self.ctx), + ) + .await?; + + (self.prover, self.full_node) = tokio::try_join!( + create_optional( + self.ctx.config.test_case.with_prover, + Prover::new(&self.ctx) + ), + create_optional( + self.ctx.config.test_case.with_full_node, + FullNode::new(&self.ctx) + ), + )?; + + Ok(()) + } + + fn get_nodes_as_log_provider(&self) -> Vec<&dyn LogProviderErased> { + vec![ + self.bitcoin_nodes.get(0).map(LogProvider::as_erased), + self.sequencer.as_ref().map(LogProvider::as_erased), + self.full_node.as_ref().map(LogProvider::as_erased), + self.prover.as_ref().map(LogProvider::as_erased), + ] + .into_iter() + .flatten() + .collect() + } + + pub fn show_log_paths(&self) { + if self.show_logs { + println!( + "Logs available at {}", + self.ctx.config.test_case.dir.display() + ); + + for node in self.get_nodes_as_log_provider() { + println!( + "{} logs available at : {}", + node.kind(), + node.log_path().display() + ); + } + } + } + + pub fn dump_log(&self) -> Result<()> { + println!("Dumping logs:"); + + let n_lines = std::env::var("TAIL_N_LINES") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(25); + for node in self.get_nodes_as_log_provider() { + println!("{} logs (last {n_lines} lines):", node.kind()); + if let Err(e) = tail_file(&node.log_path(), n_lines) { + eprint!("{e}"); + } + } + Ok(()) + } + + pub async fn stop(&mut self) -> Result<()> { + println!("Stopping framework..."); + + if let Some(sequencer) = &mut self.sequencer { + let _ = sequencer.stop().await; + println!("Successfully stopped sequencer"); + } + + if let Some(prover) = &mut self.prover { + let _ = prover.stop().await; + println!("Successfully stopped prover"); + } + + if let Some(full_node) = &mut self.full_node { + let _ = full_node.stop().await; + println!("Successfully stopped full_node"); + } + + let _ = self.bitcoin_nodes.stop_all().await; + println!("Successfully stopped bitcoin nodes"); + + if let Some(docker) = self.ctx.docker.as_ref() { + let _ = docker.cleanup().await; + println!("Successfully cleaned docker"); + } + + Ok(()) + } + + pub async fn fund_da_wallets(&mut self) -> Result<()> { + let da = self.bitcoin_nodes.get(0).unwrap(); + + da.create_wallet(&NodeKind::Sequencer.to_string(), None, None, None, None) + .await?; + da.create_wallet(&NodeKind::Prover.to_string(), None, None, None, None) + .await?; + da.create_wallet(&NodeKind::Bitcoin.to_string(), None, None, None, None) + .await?; + + let blocks_to_mature = 100; + let blocks_to_fund = 25; + if self.ctx.config.test_case.with_sequencer { + da.fund_wallet(NodeKind::Sequencer.to_string(), blocks_to_fund) + .await?; + } + + if self.ctx.config.test_case.with_prover { + 
da.fund_wallet(NodeKind::Prover.to_string(), blocks_to_fund) + .await?; + } + da.fund_wallet(NodeKind::Bitcoin.to_string(), blocks_to_fund) + .await?; + + da.generate(blocks_to_mature, None).await?; + self.initial_da_height = da.get_block_count().await?; + Ok(()) + } +} diff --git a/src/full_node.rs b/src/full_node.rs new file mode 100644 index 0000000..d53d8ae --- /dev/null +++ b/src/full_node.rs @@ -0,0 +1,181 @@ +use std::fs::File; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::process::Stdio; + +use anyhow::{bail, Context}; +use sov_rollup_interface::rpc::{SequencerCommitmentResponse, VerifiedProofResponse}; +use tokio::process::Command; +use tokio::time::{sleep, Duration, Instant}; + +use super::config::{config_to_file, FullFullNodeConfig, TestConfig}; +use super::framework::TestContext; +use super::node::{LogProvider, Node, NodeKind, SpawnOutput}; +use super::utils::{get_citrea_path, get_stderr_path, get_stdout_path, retry}; +use super::Result; +use crate::utils::get_genesis_path; +use crate::evm::make_test_client; +use crate::test_client::TestClient; + +#[allow(unused)] +pub struct FullNode { + spawn_output: SpawnOutput, + config: FullFullNodeConfig, + pub client: Box, +} + +impl FullNode { + pub async fn new(ctx: &TestContext) -> Result { + let TestConfig { + full_node: full_node_config, + .. + } = &ctx.config; + + let spawn_output = Self::spawn(full_node_config)?; + + let socket_addr = SocketAddr::new( + full_node_config + .rollup + .rpc + .bind_host + .parse() + .context("Failed to parse bind host")?, + full_node_config.rollup.rpc.bind_port, + ); + let client = retry(|| async { make_test_client(socket_addr).await }, None).await?; + + Ok(Self { + spawn_output, + config: full_node_config.clone(), + client, + }) + } + + pub async fn wait_for_sequencer_commitments( + &self, + height: u64, + timeout: Option, + ) -> Result> { + let start = Instant::now(); + let timeout = timeout.unwrap_or(Duration::from_secs(30)); + + loop { + if start.elapsed() >= timeout { + bail!("FullNode failed to get sequencer commitments within the specified timeout"); + } + + match self + .client + .ledger_get_sequencer_commitments_on_slot_by_number(height) + .await + { + Ok(Some(commitments)) => return Ok(commitments), + Ok(None) => sleep(Duration::from_millis(500)).await, + Err(e) => bail!("Error fetching sequencer commitments: {}", e), + } + } + } + + pub async fn wait_for_zkproofs( + &self, + height: u64, + timeout: Option, + ) -> Result> { + let start = Instant::now(); + let timeout = timeout.unwrap_or(Duration::from_secs(30)); + + loop { + if start.elapsed() >= timeout { + bail!("FullNode failed to get zkproofs within the specified timeout"); + } + + match self + .client + .ledger_get_verified_proofs_by_slot_height(height) + .await + { + Some(proofs) => return Ok(proofs), + None => sleep(Duration::from_millis(500)).await, + } + } + } +} + +impl Node for FullNode { + type Config = FullFullNodeConfig; + type Client = TestClient; + + fn spawn(config: &Self::Config) -> Result { + let citrea = get_citrea_path(); + let dir = &config.dir; + + let stdout_file = + File::create(get_stdout_path(dir)).context("Failed to create stdout file")?; + let stderr_file = + File::create(get_stderr_path(dir)).context("Failed to create stderr file")?; + + let rollup_config_path = dir.join("full_node_rollup_config.toml"); + config_to_file(&config.rollup, &rollup_config_path)?; + + Command::new(citrea) + .arg("--da-layer") + .arg("bitcoin") + .arg("--rollup-config-path") + .arg(rollup_config_path) + 
.arg("--genesis-paths") + .arg(get_genesis_path( + dir.parent().expect("Couldn't get parent dir"), + )) + .envs(config.env.clone()) + .stdout(Stdio::from(stdout_file)) + .stderr(Stdio::from(stderr_file)) + .kill_on_drop(true) + .spawn() + .context("Failed to spawn citrea process") + .map(SpawnOutput::Child) + } + + fn spawn_output(&mut self) -> &mut SpawnOutput { + &mut self.spawn_output + } + + async fn wait_for_ready(&self, timeout: Option) -> Result<()> { + let start = Instant::now(); + + let timeout = timeout.unwrap_or(Duration::from_secs(30)); + while start.elapsed() < timeout { + if self + .client + .ledger_get_head_soft_confirmation() + .await + .is_ok() + { + return Ok(()); + } + sleep(Duration::from_millis(500)).await; + } + bail!("FullNode failed to become ready within the specified timeout") + } + + fn client(&self) -> &Self::Client { + &self.client + } + + fn env(&self) -> Vec<(&'static str, &'static str)> { + self.config.env.clone() + } + + fn config_mut(&mut self) -> &mut Self::Config { + &mut self.config + } +} + +impl LogProvider for FullNode { + fn kind(&self) -> NodeKind { + NodeKind::FullNode + } + + fn log_path(&self) -> PathBuf { + get_stdout_path(&self.config.dir) + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..32cc181 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,13 @@ +mod bitcoin; +pub mod config; +mod docker; +pub mod framework; +mod full_node; +pub mod node; +mod prover; +mod sequencer; +pub mod test_case; + +mod utils; + +pub(crate) type Result = anyhow::Result; diff --git a/src/node.rs b/src/node.rs new file mode 100644 index 0000000..d81ddb1 --- /dev/null +++ b/src/node.rs @@ -0,0 +1,165 @@ +use std::fmt; +use std::path::PathBuf; +use std::time::Duration; + +use anyhow::Context; +use bollard::container::StopContainerOptions; +use bollard::Docker; +use tokio::process::Child; + +use super::Result; +use crate::test_client::TestClient; +use crate::test_helpers::wait_for_l2_block; + +#[derive(Debug)] +pub enum NodeKind { + Bitcoin, + Prover, + Sequencer, + FullNode, +} + +impl fmt::Display for NodeKind { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + NodeKind::Bitcoin => write!(f, "bitcoin"), + NodeKind::Prover => write!(f, "prover"), + NodeKind::Sequencer => write!(f, "sequencer"), + NodeKind::FullNode => write!(f, "full-node"), + } + } +} + +#[derive(Debug)] +pub struct ContainerSpawnOutput { + pub id: String, + pub ip: String, +} + +#[derive(Debug)] +pub enum SpawnOutput { + Child(Child), + Container(ContainerSpawnOutput), +} + +/// The Node trait defines the common interface shared between +/// BitcoinNode, Prover, Sequencer and FullNode +pub(crate) trait Node { + type Config; + type Client; + + /// Spawn a new node with specific config and return its child + fn spawn(test_config: &Self::Config) -> Result; + fn spawn_output(&mut self) -> &mut SpawnOutput; + + fn config_mut(&mut self) -> &mut Self::Config; + + /// Stops the running node + async fn stop(&mut self) -> Result<()> { + match self.spawn_output() { + SpawnOutput::Child(process) => { + process + .kill() + .await + .context("Failed to kill child process")?; + Ok(()) + } + SpawnOutput::Container(ContainerSpawnOutput { id, .. 
}) => { + println!("Stopping container {id}"); + let docker = + Docker::connect_with_local_defaults().context("Failed to connect to Docker")?; + docker + .stop_container(id, Some(StopContainerOptions { t: 10 })) + .await + .context("Failed to stop Docker container")?; + Ok(()) + } + } + } + + /// Wait for the node to be reachable by its client. + async fn wait_for_ready(&self, timeout: Option) -> Result<()>; + + fn client(&self) -> &Self::Client; + + #[allow(unused)] + fn env(&self) -> Vec<(&'static str, &'static str)> { + Vec::new() + } +} + +pub trait L2Node: Node { + async fn wait_for_l2_height(&self, height: u64, timeout: Option); +} + +impl L2Node for T +where + T: Node, +{ + async fn wait_for_l2_height(&self, height: u64, timeout: Option) { + wait_for_l2_block(self.client(), height, timeout).await + } +} + +// Two patterns supported : +// - Call wait_until_stopped, runs any extra commands needed for testing purposes, call start again. +// - Call restart if you need to wait for node to be fully shutdown and brough back up with new config. +pub trait Restart: Node { + async fn wait_until_stopped(&mut self) -> Result<()>; + async fn start(&mut self, new_config: Option) -> Result<()>; + + // Default implementation to support waiting for node to be fully shutdown and brough back up with new config. + async fn restart(&mut self, new_config: Option) -> Result<()> { + self.wait_until_stopped().await?; + self.start(new_config).await + } +} + +impl Restart for T +where + T: L2Node, +{ + async fn wait_until_stopped(&mut self) -> Result<()> { + self.stop().await?; + match self.spawn_output() { + SpawnOutput::Child(pid) => pid.wait().await?, + SpawnOutput::Container(_) => unimplemented!("L2 nodes don't run in docker yet"), + }; + Ok(()) + } + + async fn start(&mut self, new_config: Option) -> Result<()> { + let config = self.config_mut(); + if let Some(new_config) = new_config { + *config = new_config + } + *self.spawn_output() = Self::spawn(config)?; + self.wait_for_ready(None).await + } +} + +pub trait LogProvider: Node { + fn kind(&self) -> NodeKind; + fn log_path(&self) -> PathBuf; + fn as_erased(&self) -> &dyn LogProviderErased + where + Self: Sized, + { + self + } +} + +pub trait LogProviderErased { + fn kind(&self) -> NodeKind; + fn log_path(&self) -> PathBuf; +} + +impl LogProviderErased for T { + fn kind(&self) -> NodeKind { + LogProvider::kind(self) + } + + fn log_path(&self) -> PathBuf { + LogProvider::log_path(self) + } +} diff --git a/src/prover.rs b/src/prover.rs new file mode 100644 index 0000000..de5c519 --- /dev/null +++ b/src/prover.rs @@ -0,0 +1,140 @@ +use std::fs::File; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::process::Stdio; + +use anyhow::Context; +use tokio::process::Command; +use tokio::time::{sleep, Duration, Instant}; + +use super::config::{config_to_file, FullProverConfig, TestConfig}; +use super::framework::TestContext; +use super::node::{LogProvider, Node, NodeKind, SpawnOutput}; +use super::utils::{get_citrea_path, get_stderr_path, get_stdout_path, retry}; +use super::Result; +use crate::evm::make_test_client; +use crate::test_client::TestClient; +use crate::test_helpers::wait_for_prover_l1_height; +use crate::utils::get_genesis_path; + +#[allow(unused)] +pub struct Prover { + spawn_output: SpawnOutput, + config: FullProverConfig, + pub client: Box, +} + +impl Prover { + pub async fn new(ctx: &TestContext) -> Result { + let TestConfig { + prover: prover_config, + .. 
+ } = &ctx.config; + + let spawn_output = Self::spawn(prover_config)?; + + let socket_addr = SocketAddr::new( + prover_config + .rollup + .rpc + .bind_host + .parse() + .context("Failed to parse bind host")?, + prover_config.rollup.rpc.bind_port, + ); + let client = retry(|| async { make_test_client(socket_addr).await }, None).await?; + + Ok(Self { + spawn_output, + config: prover_config.to_owned(), + client, + }) + } + + pub async fn wait_for_l1_height(&self, height: u64, timeout: Option) -> Result<()> { + wait_for_prover_l1_height(&self.client, height, timeout).await + } +} + +impl Node for Prover { + type Config = FullProverConfig; + type Client = TestClient; + + fn spawn(config: &Self::Config) -> Result { + let citrea = get_citrea_path(); + let dir = &config.dir; + + let stdout_file = + File::create(get_stdout_path(dir)).context("Failed to create stdout file")?; + let stderr_file = + File::create(get_stderr_path(dir)).context("Failed to create stderr file")?; + + let config_path = dir.join("prover_config.toml"); + config_to_file(&config.node, &config_path)?; + + let rollup_config_path = dir.join("prover_rollup_config.toml"); + config_to_file(&config.rollup, &rollup_config_path)?; + + Command::new(citrea) + .arg("--da-layer") + .arg("bitcoin") + .arg("--rollup-config-path") + .arg(rollup_config_path) + .arg("--prover-config-path") + .arg(config_path) + .arg("--genesis-paths") + .arg(get_genesis_path( + dir.parent().expect("Couldn't get parent dir"), + )) + .envs(config.env.clone()) + .stdout(Stdio::from(stdout_file)) + .stderr(Stdio::from(stderr_file)) + .kill_on_drop(true) + .spawn() + .context("Failed to spawn citrea process") + .map(SpawnOutput::Child) + } + + fn spawn_output(&mut self) -> &mut SpawnOutput { + &mut self.spawn_output + } + + async fn wait_for_ready(&self, timeout: Option) -> Result<()> { + let start = Instant::now(); + let timeout = timeout.unwrap_or(Duration::from_secs(30)); + while start.elapsed() < timeout { + if self + .client + .ledger_get_head_soft_confirmation() + .await + .is_ok() + { + return Ok(()); + } + sleep(Duration::from_millis(500)).await; + } + anyhow::bail!("Prover failed to become ready within the specified timeout") + } + + fn client(&self) -> &Self::Client { + &self.client + } + + fn env(&self) -> Vec<(&'static str, &'static str)> { + self.config.env.clone() + } + + fn config_mut(&mut self) -> &mut Self::Config { + &mut self.config + } +} + +impl LogProvider for Prover { + fn kind(&self) -> NodeKind { + NodeKind::Prover + } + + fn log_path(&self) -> PathBuf { + get_stdout_path(&self.config.dir) + } +} diff --git a/src/sequencer.rs b/src/sequencer.rs new file mode 100644 index 0000000..19cfc02 --- /dev/null +++ b/src/sequencer.rs @@ -0,0 +1,143 @@ +use std::fs::File; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::process::Stdio; + +use anyhow::Context; +use tokio::process::Command; +use tokio::time::{sleep, Duration, Instant}; + +use super::config::{config_to_file, FullSequencerConfig, TestConfig}; +use super::framework::TestContext; +use super::node::{LogProvider, Node, NodeKind, SpawnOutput}; +use super::utils::{get_citrea_path, get_stderr_path, get_stdout_path, retry}; +use super::Result; +use crate::evm::make_test_client; +use crate::test_client::TestClient; +use crate::utils::get_genesis_path; + +#[allow(unused)] +pub struct Sequencer { + spawn_output: SpawnOutput, + config: FullSequencerConfig, + pub client: Box, +} + +impl Sequencer { + pub async fn new(ctx: &TestContext) -> Result { + let TestConfig { + sequencer: config, .. 
+ } = &ctx.config; + + let spawn_output = Self::spawn(config)?; + + let socket_addr = SocketAddr::new( + config + .rollup + .rpc + .bind_host + .parse() + .context("Failed to parse bind host")?, + config.rollup.rpc.bind_port, + ); + + let client = retry(|| async { make_test_client(socket_addr).await }, None).await?; + + Ok(Self { + spawn_output, + config: config.clone(), + client, + }) + } + + pub fn dir(&self) -> &PathBuf { + &self.config.dir + } + + pub fn min_soft_confirmations_per_commitment(&self) -> u64 { + self.config.node.min_soft_confirmations_per_commitment + } +} + +impl Node for Sequencer { + type Config = FullSequencerConfig; + type Client = TestClient; + + fn spawn(config: &Self::Config) -> Result { + let citrea = get_citrea_path(); + let dir = &config.dir; + + let stdout_file = + File::create(get_stdout_path(dir)).context("Failed to create stdout file")?; + let stderr_file = + File::create(get_stderr_path(dir)).context("Failed to create stderr file")?; + + let config_path = dir.join("sequencer_config.toml"); + config_to_file(&config.node, &config_path)?; + + let rollup_config_path = dir.join("sequencer_rollup_config.toml"); + config_to_file(&config.rollup, &rollup_config_path)?; + + Command::new(citrea) + .arg("--da-layer") + .arg("bitcoin") + .arg("--rollup-config-path") + .arg(rollup_config_path) + .arg("--sequencer-config-path") + .arg(config_path) + .arg("--genesis-paths") + .arg(get_genesis_path( + dir.parent().expect("Couldn't get parent dir"), + )) + .envs(config.env.clone()) + .stdout(Stdio::from(stdout_file)) + .stderr(Stdio::from(stderr_file)) + .kill_on_drop(true) + .spawn() + .context("Failed to spawn citrea process") + .map(SpawnOutput::Child) + } + + fn spawn_output(&mut self) -> &mut SpawnOutput { + &mut self.spawn_output + } + + async fn wait_for_ready(&self, timeout: Option) -> Result<()> { + let start = Instant::now(); + let timeout = timeout.unwrap_or(Duration::from_secs(30)); + while start.elapsed() < timeout { + if self + .client + .ledger_get_head_soft_confirmation() + .await + .is_ok() + { + return Ok(()); + } + sleep(Duration::from_millis(500)).await; + } + anyhow::bail!("Sequencer failed to become ready within the specified timeout") + } + + fn client(&self) -> &Self::Client { + &self.client + } + + fn env(&self) -> Vec<(&'static str, &'static str)> { + self.config.env.clone() + } + + fn config_mut(&mut self) -> &mut Self::Config { + &mut self.config + } +} + +impl LogProvider for Sequencer { + fn kind(&self) -> NodeKind { + NodeKind::Sequencer + } + + fn log_path(&self) -> PathBuf { + get_stdout_path(self.dir()) + } +} diff --git a/src/test_case.rs b/src/test_case.rs new file mode 100644 index 0000000..28abba6 --- /dev/null +++ b/src/test_case.rs @@ -0,0 +1,343 @@ +//! This module provides the TestCaseRunner and TestCase trait for running and defining test cases. +//! It handles setup, execution, and cleanup of test environments. 
+ +use std::panic::{self}; +use std::path::{Path, PathBuf}; +use std::time::Duration; + +use anyhow::{bail, Context}; +use async_trait::async_trait; +use bitcoin_da::service::BitcoinServiceConfig; +use citrea_sequencer::SequencerConfig; +use futures::FutureExt; +use sov_stf_runner::{ProverConfig, RpcConfig, RunnerConfig, StorageConfig}; + +use super::config::{ + default_rollup_config, BitcoinConfig, FullFullNodeConfig, FullProverConfig, + FullSequencerConfig, RollupConfig, TestCaseConfig, TestCaseEnv, TestConfig, +}; +use super::framework::TestFramework; +use super::node::NodeKind; +use super::utils::{copy_directory, get_available_port, get_tx_backup_dir}; +use super::Result; +use crate::node::Node; +use crate::utils::{get_default_genesis_path, get_workspace_root}; + +// TestCaseRunner manages the lifecycle of a test case, including setup, execution, and cleanup. +/// It creates a test framework with the associated configs, spawns required nodes, connects them, +/// runs the test case, and performs cleanup afterwards. The `run` method handles any panics that +/// might occur during test execution and takes care of cleaning up and stopping the child processes. +pub struct TestCaseRunner(T); + +impl TestCaseRunner { + /// Creates a new TestCaseRunner with the given test case. + pub fn new(test_case: T) -> Self { + Self(test_case) + } + + /// Internal method to fund the wallets, connect the nodes, wait for them to be ready. + async fn prepare(&self, f: &mut TestFramework) -> Result<()> { + f.fund_da_wallets().await?; + f.init_nodes().await?; + f.show_log_paths(); + f.bitcoin_nodes.connect_nodes().await?; + + if let Some(sequencer) = &f.sequencer { + sequencer + .wait_for_ready(Some(Duration::from_secs(5))) + .await?; + } + + Ok(()) + } + + async fn run_test_case(&mut self, f: &mut TestFramework) -> Result<()> { + self.prepare(f).await?; + self.0.setup(f).await?; + self.0.run_test(f).await + } + + /// Executes the test case, handling any panics and performing cleanup. + /// + /// This sets up the framework, executes the test, and ensures cleanup is performed even if a panic occurs. 
+ pub async fn run(mut self) -> Result<()> { + let mut framework = None; + let result = panic::AssertUnwindSafe(async { + framework = Some(TestFramework::new(Self::generate_test_config()?).await?); + let f = framework.as_mut().unwrap(); + self.run_test_case(f).await + }) + .catch_unwind() + .await; + + let f = framework + .as_mut() + .expect("Framework not correctly initialized"); + + if result.is_err() { + if let Err(e) = f.dump_log() { + eprintln!("Error dumping log: {}", e); + } + } + + f.stop().await?; + + // Additional test cleanup + self.0.cleanup().await?; + + match result { + Ok(Ok(())) => Ok(()), + Ok(Err(e)) => Err(e), + Err(panic_error) => { + let panic_msg = panic_error + .downcast_ref::() + .map(|s| s.to_string()) + .unwrap_or_else(|| "Unknown panic".to_string()); + bail!(panic_msg) + } + } + } + + fn generate_test_config() -> Result { + let test_case = T::test_config(); + let env = T::test_env(); + let bitcoin = T::bitcoin_config(); + let prover = T::prover_config(); + let sequencer = T::sequencer_config(); + let sequencer_rollup = default_rollup_config(); + let prover_rollup = default_rollup_config(); + let full_node_rollup = default_rollup_config(); + + let [bitcoin_dir, dbs_dir, prover_dir, sequencer_dir, full_node_dir, genesis_dir] = + create_dirs(&test_case.dir)?; + + copy_genesis_dir(&test_case.genesis_dir, &genesis_dir)?; + + let mut bitcoin_confs = vec![]; + for i in 0..test_case.n_nodes { + let data_dir = bitcoin_dir.join(i.to_string()); + std::fs::create_dir_all(&data_dir) + .with_context(|| format!("Failed to create {} directory", data_dir.display()))?; + + let p2p_port = get_available_port()?; + let rpc_port = get_available_port()?; + + bitcoin_confs.push(BitcoinConfig { + p2p_port, + rpc_port, + data_dir, + env: env.bitcoin().clone(), + idx: i, + ..bitcoin.clone() + }) + } + + // Target first bitcoin node as DA for now + let da_config: BitcoinServiceConfig = bitcoin_confs[0].clone().into(); + + let sequencer_rollup = { + let bind_port = get_available_port()?; + let node_kind = NodeKind::Sequencer.to_string(); + RollupConfig { + da: BitcoinServiceConfig { + da_private_key: Some( + "045FFC81A3C1FDB3AF1359DBF2D114B0B3EFBF7F29CC9C5DA01267AA39D2C78D" + .to_string(), + ), + node_url: format!("http://{}/wallet/{}", da_config.node_url, node_kind), + tx_backup_dir: get_tx_backup_dir(), + ..da_config.clone() + }, + storage: StorageConfig { + path: dbs_dir.join(format!("{}-db", node_kind)), + db_max_open_files: None, + }, + rpc: RpcConfig { + bind_port, + ..sequencer_rollup.rpc + }, + ..sequencer_rollup + } + }; + + let runner_config = Some(RunnerConfig { + sequencer_client_url: format!( + "http://{}:{}", + sequencer_rollup.rpc.bind_host, sequencer_rollup.rpc.bind_port, + ), + include_tx_body: true, + accept_public_input_as_proven: Some(true), + sync_blocks_count: 10, + }); + + let prover_rollup = { + let bind_port = get_available_port()?; + let node_kind = NodeKind::Prover.to_string(); + RollupConfig { + da: BitcoinServiceConfig { + da_private_key: Some( + "75BAF964D074594600366E5B111A1DA8F86B2EFE2D22DA51C8D82126A0FCAC72" + .to_string(), + ), + node_url: format!("http://{}/wallet/{}", da_config.node_url, node_kind), + tx_backup_dir: get_tx_backup_dir(), + ..da_config.clone() + }, + storage: StorageConfig { + path: dbs_dir.join(format!("{}-db", node_kind)), + db_max_open_files: None, + }, + rpc: RpcConfig { + bind_port, + ..prover_rollup.rpc + }, + runner: runner_config.clone(), + ..prover_rollup + } + }; + + let full_node_rollup = { + let bind_port = get_available_port()?; 
+ let node_kind = NodeKind::FullNode.to_string(); + RollupConfig { + da: BitcoinServiceConfig { + node_url: format!( + "http://{}/wallet/{}", + da_config.node_url, + NodeKind::Bitcoin // Use default wallet + ), + tx_backup_dir: get_tx_backup_dir(), + ..da_config.clone() + }, + storage: StorageConfig { + path: dbs_dir.join(format!("{}-db", node_kind)), + db_max_open_files: None, + }, + rpc: RpcConfig { + bind_port, + ..full_node_rollup.rpc + }, + runner: runner_config.clone(), + ..full_node_rollup + } + }; + + Ok(TestConfig { + bitcoin: bitcoin_confs, + sequencer: FullSequencerConfig { + rollup: sequencer_rollup, + dir: sequencer_dir, + docker_image: None, + node: sequencer, + env: env.sequencer(), + }, + prover: FullProverConfig { + rollup: prover_rollup, + dir: prover_dir, + docker_image: None, + node: prover, + env: env.prover(), + }, + full_node: FullFullNodeConfig { + rollup: full_node_rollup, + dir: full_node_dir, + docker_image: None, + node: (), + env: env.full_node(), + }, + test_case, + }) + } +} + +/// Defines the interface for implementing test cases. +/// +/// This trait should be implemented by every test case to define the configuration +/// and inner test logic. It provides default configurations that should be sane for most test cases, +/// which can be overridden by implementing the associated methods. +#[async_trait] +pub trait TestCase: Send + Sync + 'static { + /// Returns the test case configuration. + /// Override this method to provide custom test configurations. + fn test_config() -> TestCaseConfig { + TestCaseConfig::default() + } + + /// Returns the test case env. + /// Override this method to provide custom env per node. + fn test_env() -> TestCaseEnv { + TestCaseEnv::default() + } + + /// Returns the Bitcoin configuration for the test. + /// Override this method to provide a custom Bitcoin configuration. + fn bitcoin_config() -> BitcoinConfig { + BitcoinConfig::default() + } + + /// Returns the sequencer configuration for the test. + /// Override this method to provide a custom sequencer configuration. + fn sequencer_config() -> SequencerConfig { + SequencerConfig::default() + } + + /// Returns the prover configuration for the test. + /// Override this method to provide a custom prover configuration. + fn prover_config() -> ProverConfig { + ProverConfig::default() + } + + /// Returns the test setup + /// Override this method to add custom initialization logic + async fn setup(&self, _framework: &mut TestFramework) -> Result<()> { + Ok(()) + } + + /// Implements the actual test logic. + /// + /// This method is where the test case should be implemented. It receives + /// a reference to the TestFramework, which provides access to the test environment. 
+ /// + /// # Arguments + /// * `framework` - A reference to the TestFramework instance + async fn run_test(&mut self, framework: &mut TestFramework) -> Result<()>; + + async fn cleanup(&self) -> Result<()> { + Ok(()) + } +} + +fn create_dirs(base_dir: &Path) -> Result<[PathBuf; 6]> { + let paths = [ + NodeKind::Bitcoin.to_string(), + "dbs".to_string(), + NodeKind::Prover.to_string(), + NodeKind::Sequencer.to_string(), + NodeKind::FullNode.to_string(), + "genesis".to_string(), + ] + .map(|dir| base_dir.join(dir)); + + for path in &paths { + std::fs::create_dir_all(path) + .with_context(|| format!("Failed to create {} directory", path.display()))?; + } + + Ok(paths) +} + +fn copy_genesis_dir(genesis_dir: &Option, target_dir: &Path) -> std::io::Result<()> { + let genesis_dir = + genesis_dir + .as_ref() + .map(PathBuf::from) + .map_or_else(get_default_genesis_path, |dir| { + if dir.is_absolute() { + dir + } else { + get_workspace_root().join(dir) + } + }); + + copy_directory(genesis_dir, target_dir) +} diff --git a/src/utils.rs b/src/utils.rs new file mode 100644 index 0000000..05e395f --- /dev/null +++ b/src/utils.rs @@ -0,0 +1,149 @@ +use std::fs::File; +use std::future::Future; +use std::io::{BufRead, BufReader}; +use std::net::TcpListener; +use std::path::{Path, PathBuf}; +use std::{fs, io}; + +use anyhow::bail; +use rand::distributions::Alphanumeric; +use rand::{thread_rng, Rng}; +use tokio::time::{sleep, Duration, Instant}; + +use super::Result; + +pub fn get_available_port() -> Result { + let listener = TcpListener::bind("127.0.0.1:0")?; + Ok(listener.local_addr()?.port()) +} + +pub fn get_workspace_root() -> PathBuf { + let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + manifest_dir + .ancestors() + .nth(2) + .expect("Failed to find workspace root") + .to_path_buf() +} + +/// Get citrea path from CITREA env or resolves to debug build. +pub fn get_citrea_path() -> PathBuf { + std::env::var("CITREA").map_or_else( + |_| { + let workspace_root = get_workspace_root(); + let mut path = workspace_root.to_path_buf(); + path.push("target"); + path.push("debug"); + path.push("citrea"); + path + }, + PathBuf::from, + ) +} + +pub fn get_stdout_path(dir: &Path) -> PathBuf { + dir.join("stdout.log") +} + +pub fn get_stderr_path(dir: &Path) -> PathBuf { + dir.join("stderr.log") +} + +/// Get genesis path from resources +/// TODO: assess need for customable genesis path in e2e tests +pub fn get_default_genesis_path() -> PathBuf { + let workspace_root = get_workspace_root(); + let mut path = workspace_root.to_path_buf(); + path.push("resources"); + path.push("genesis"); + path.push("bitcoin-regtest"); + path +} + +pub fn get_genesis_path(dir: &Path) -> PathBuf { + dir.join("genesis") +} + +pub fn generate_test_id() -> String { + thread_rng() + .sample_iter(&Alphanumeric) + .take(10) + .map(char::from) + .collect() +} + +pub fn copy_directory(src: impl AsRef, dst: impl AsRef) -> io::Result<()> { + let src = src.as_ref(); + let dst = dst.as_ref(); + + if !dst.exists() { + fs::create_dir_all(dst)?; + } + + for entry in fs::read_dir(src)? 
{ + let entry = entry?; + let ty = entry.file_type()?; + let file_name = entry.file_name(); + let src_path = src.join(&file_name); + let dst_path = dst.join(&file_name); + + if ty.is_dir() { + copy_directory(&src_path, &dst_path)?; + } else { + fs::copy(&src_path, &dst_path)?; + } + } + + Ok(()) +} + +pub(crate) async fn retry(f: F, timeout: Option) -> Result +where + F: Fn() -> Fut, + Fut: Future>, +{ + let start = Instant::now(); + let timeout = start + timeout.unwrap_or_else(|| Duration::from_secs(5)); + + loop { + match tokio::time::timeout_at(timeout, f()).await { + Ok(Ok(result)) => return Ok(result), + Ok(Err(e)) => { + if Instant::now() >= timeout { + return Err(e); + } + sleep(Duration::from_millis(500)).await; + } + Err(elapsed) => bail!("Timeout expired {elapsed}"), + } + } +} + +pub fn tail_file(path: &Path, lines: usize) -> Result<()> { + let file = File::open(path)?; + let reader = BufReader::new(file); + let mut last_lines = Vec::with_capacity(lines); + + for line in reader.lines() { + let line = line?; + if last_lines.len() >= lines { + last_lines.remove(0); + } + last_lines.push(line); + } + + for line in last_lines { + println!("{}", line); + } + + Ok(()) +} + +pub fn get_tx_backup_dir() -> String { + let workspace_root = get_workspace_root(); + let mut path = workspace_root.to_path_buf(); + path.push("resources"); + path.push("bitcoin"); + path.push("inscription_txs"); + path.to_str().expect("Failed to convert path").to_string() +}
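
Usage sketch (illustrative, not part of the diff): a minimal, hypothetical test case written against the `TestCase` / `TestCaseRunner` API added above. The test name `BasicDaTest`, the chosen config values, and the `#[tokio::test]` harness are assumptions for illustration; the sketch only relies on items this change exports (`config::TestCaseConfig`, `framework::TestFramework`, `test_case::{TestCase, TestCaseRunner}`) and on the async `bitcoincore_rpc::RpcApi` implementation on `BitcoinNode`.

```rust
// Hypothetical example of driving the framework; not included in this change.
use std::time::Duration;

use async_trait::async_trait;
use bitcoincore_rpc::RpcApi; // brings the async RPC methods (generate, etc.) into scope
use citrea_e2e::config::TestCaseConfig;
use citrea_e2e::framework::TestFramework;
use citrea_e2e::test_case::{TestCase, TestCaseRunner};

struct BasicDaTest; // hypothetical test case name

#[async_trait]
impl TestCase for BasicDaTest {
    fn test_config() -> TestCaseConfig {
        TestCaseConfig {
            n_nodes: 2,           // spin up two regtest bitcoin nodes
            with_sequencer: true, // sequencer is started and funded by the framework
            ..Default::default()
        }
    }

    async fn run_test(&mut self, f: &mut TestFramework) -> anyhow::Result<()> {
        // Mine a block on the first DA node and wait for the cluster to agree on height.
        let da = f.bitcoin_nodes.get(0).expect("DA node should be running");
        da.generate(1, None).await?;
        f.bitcoin_nodes
            .wait_for_sync(Duration::from_secs(30))
            .await?;
        Ok(())
    }
}

// Assumes the consuming crate has tokio with the test macros enabled.
#[tokio::test]
async fn basic_da_test() -> anyhow::Result<()> {
    TestCaseRunner::new(BasicDaTest).run().await
}
```

`TestCaseRunner::run` handles setup (wallet funding, node spawning, peer connection), panic catching, log dumping on failure, and teardown, so the test body only needs to express the scenario itself.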