diff --git a/CHANGELOG.md b/CHANGELOG.md index fd6b6deeb68..0f82d86a9ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,8 @@ As a minor extension, we have adopted a slightly different versioning convention - Update website and explorer user interface to use the new mithril logo. +- Implement a Chain Reader which retrieves blocks from the Cardano chain with Pallas through the `chainsync` mini-protocol. + - Crates versions: | Crate | Version | diff --git a/Cargo.lock b/Cargo.lock index 74a0524e074..f2b929c5caa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3607,7 +3607,7 @@ dependencies = [ [[package]] name = "mithril-common" -version = "0.4.7" +version = "0.4.8" dependencies = [ "anyhow", "async-trait", diff --git a/mithril-common/Cargo.toml b/mithril-common/Cargo.toml index 4ae87cef133..56ee1d75636 100644 --- a/mithril-common/Cargo.toml +++ b/mithril-common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-common" -version = "0.4.7" +version = "0.4.8" description = "Common types, interfaces, and utilities for Mithril nodes." authors = { workspace = true } edition = { workspace = true } diff --git a/mithril-common/src/chain_reader/entity.rs b/mithril-common/src/chain_reader/entity.rs new file mode 100644 index 00000000000..77cf3dc706f --- /dev/null +++ b/mithril-common/src/chain_reader/entity.rs @@ -0,0 +1,18 @@ +use crate::{cardano_block_scanner::ScannedBlock, entities::ChainPoint}; + +/// The action that indicates what to do next when scanning the chain +#[derive(Debug, Clone, PartialEq)] +pub enum ChainBlockNextAction { + /// RollForward event (we are still on the correct fork) + RollForward { + /// The next point in the chain to read + next_point: ChainPoint, + /// The parsed chain block + parsed_block: ScannedBlock, + }, + /// RollBackward event (we are on an incorrect fork, we need to get back a point to roll forward again) + RollBackward { + /// The rollback point in the chain to read (as a new valid point to read from on the main chain, which has already been seen) + rollback_point: ChainPoint, + }, +} diff --git a/mithril-common/src/chain_reader/fake_chain_reader.rs b/mithril-common/src/chain_reader/fake_chain_reader.rs new file mode 100644 index 00000000000..f16604e0bb9 --- /dev/null +++ b/mithril-common/src/chain_reader/fake_chain_reader.rs @@ -0,0 +1,73 @@ +use async_trait::async_trait; +use std::{collections::VecDeque, sync::Mutex}; + +use crate::{entities::ChainPoint, StdResult}; + +use super::{ChainBlockNextAction, ChainBlockReader}; + +/// [FakeChainReader] is a fake implementation of [ChainBlockReader] for testing purposes. +pub struct FakeChainReader { + chain_point_next_actions: Mutex>, +} + +impl FakeChainReader { + /// Creates a new [FakeChainReader] instance. + pub fn new(chain_point_next_actions: Vec) -> Self { + Self { + chain_point_next_actions: Mutex::new(chain_point_next_actions.into()), + } + } +} + +#[async_trait] +impl ChainBlockReader for FakeChainReader { + async fn set_chain_point(&mut self, _point: &ChainPoint) -> StdResult<()> { + Ok(()) + } + + async fn get_next_chain_block(&mut self) -> StdResult> { + Ok(self.chain_point_next_actions.lock().unwrap().pop_front()) + } +} + +#[cfg(test)] +mod tests { + use crate::cardano_block_scanner::ScannedBlock; + + use super::*; + + fn build_chain_point(id: u64) -> ChainPoint { + ChainPoint { + slot_number: id, + block_number: id, + block_hash: format!("point-hash-{id}"), + } + } + + #[tokio::test] + async fn test_get_next_chain_block() { + let expected_chain_point_next_actions = vec![ + ChainBlockNextAction::RollForward { + next_point: build_chain_point(1), + parsed_block: ScannedBlock::new("hash-1", 1, 10, 20, Vec::<&str>::new()), + }, + ChainBlockNextAction::RollForward { + next_point: build_chain_point(2), + parsed_block: ScannedBlock::new("hash-2", 2, 11, 21, Vec::<&str>::new()), + }, + ChainBlockNextAction::RollBackward { + rollback_point: build_chain_point(1), + }, + ]; + + let mut chain_reader = FakeChainReader::new(expected_chain_point_next_actions.clone()); + + let mut chain_point_next_actions = vec![]; + while let Some(chain_block_next_action) = chain_reader.get_next_chain_block().await.unwrap() + { + chain_point_next_actions.push(chain_block_next_action); + } + + assert_eq!(expected_chain_point_next_actions, chain_point_next_actions); + } +} diff --git a/mithril-common/src/chain_reader/interface.rs b/mithril-common/src/chain_reader/interface.rs new file mode 100644 index 00000000000..1db2df4deb3 --- /dev/null +++ b/mithril-common/src/chain_reader/interface.rs @@ -0,0 +1,18 @@ +use async_trait::async_trait; + +use crate::{entities::ChainPoint, StdResult}; + +use super::ChainBlockNextAction; + +/// The trait that reads events to either: +/// - read next block on the chain +/// - rollback to another point in case of rollback +/// - do nothing when tip of the chain is reached +#[async_trait] +pub trait ChainBlockReader { + /// Sets the chain point + async fn set_chain_point(&mut self, point: &ChainPoint) -> StdResult<()>; + + /// Get the next chain block + async fn get_next_chain_block(&mut self) -> StdResult>; +} diff --git a/mithril-common/src/chain_reader/mod.rs b/mithril-common/src/chain_reader/mod.rs new file mode 100644 index 00000000000..c6ab1d9fa5e --- /dev/null +++ b/mithril-common/src/chain_reader/mod.rs @@ -0,0 +1,11 @@ +//! Tools to read chain blocks sequentially + +mod entity; +mod fake_chain_reader; +mod interface; +mod pallas_chain_reader; + +pub use entity::*; +pub use fake_chain_reader::*; +pub use interface::*; +pub use pallas_chain_reader::*; diff --git a/mithril-common/src/chain_reader/pallas_chain_reader.rs b/mithril-common/src/chain_reader/pallas_chain_reader.rs new file mode 100644 index 00000000000..b075f674e93 --- /dev/null +++ b/mithril-common/src/chain_reader/pallas_chain_reader.rs @@ -0,0 +1,357 @@ +use std::path::{Path, PathBuf}; + +use anyhow::{anyhow, Context}; +use async_trait::async_trait; +use pallas_network::{ + facades::NodeClient, + miniprotocols::chainsync::{BlockContent, NextResponse}, +}; +use pallas_traverse::MultiEraBlock; + +use crate::{cardano_block_scanner::ScannedBlock, entities::ChainPoint, CardanoNetwork, StdResult}; + +use super::{ChainBlockNextAction, ChainBlockReader}; + +/// [PallasChainReader] reads blocks with 'chainsync' mini-protocol +pub struct PallasChainReader { + socket: PathBuf, + network: CardanoNetwork, + client: Option, +} + +impl PallasChainReader { + /// Creates a new `PallasChainReader` with the specified socket and network. + pub fn new(socket: &Path, network: CardanoNetwork) -> Self { + Self { + socket: socket.to_owned(), + network, + client: None, + } + } + + /// Creates and returns a new `NodeClient` connected to the specified socket. + async fn new_client(&self) -> StdResult { + let magic = self.network.code(); + NodeClient::connect(&self.socket, magic) + .await + .map_err(|err| anyhow!(err)) + .with_context(|| "PallasChainReader failed to create a new client") + } + + /// Returns a mutable reference to the client. + async fn get_client(&mut self) -> StdResult<&mut NodeClient> { + if self.client.is_none() { + self.client = Some(self.new_client().await?); + } + + self.client + .as_mut() + .with_context(|| "PallasChainReader failed to get a client") + } + + /// Intersects the point of the chain with the given point. + async fn find_intersect_point(&mut self, point: &ChainPoint) -> StdResult<()> { + let client = self.get_client().await?; + let chainsync = client.chainsync(); + + chainsync + .find_intersect(vec![point.to_owned().into()]) + .await?; + + Ok(()) + } + + /// Processes a block content next response and returns the appropriate chain block next action. + async fn process_chain_block_next_action( + &mut self, + next: NextResponse, + ) -> StdResult> { + match next { + NextResponse::RollForward(raw_block, forward_tip) => { + let multi_era_block = MultiEraBlock::decode(&raw_block) + .with_context(|| "PallasChainReader failed to decode raw block")?; + let parsed_block = ScannedBlock::convert(multi_era_block, 0); + Ok(Some(ChainBlockNextAction::RollForward { + next_point: forward_tip.into(), + parsed_block, + })) + } + NextResponse::RollBackward(rollback_point, _) => { + Ok(Some(ChainBlockNextAction::RollBackward { + rollback_point: rollback_point.into(), + })) + } + NextResponse::Await => Ok(None), + } + } +} + +impl Drop for PallasChainReader { + fn drop(&mut self) { + if let Some(client) = self.client.take() { + tokio::spawn(async move { + let _ = client.abort().await; + }); + } + } +} + +#[async_trait] +impl ChainBlockReader for PallasChainReader { + async fn set_chain_point(&mut self, point: &ChainPoint) -> StdResult<()> { + self.find_intersect_point(point).await + } + + async fn get_next_chain_block(&mut self) -> StdResult> { + let client = self.get_client().await?; + let chainsync = client.chainsync(); + + let next = match chainsync.has_agency() { + true => chainsync.request_next().await?, + false => chainsync.recv_while_must_reply().await?, + }; + + self.process_chain_block_next_action(next).await + } +} + +#[cfg(test)] +mod tests { + use std::fs; + + use pallas_network::{ + facades::NodeServer, + miniprotocols::{ + chainsync::{BlockContent, Tip}, + Point, + }, + }; + use tokio::net::UnixListener; + + use super::*; + + use crate::{entities::BlockNumber, test_utils::TempDir}; + + /// Enum representing the action to be performed by the server. + enum ServerAction { + RollBackward, + RollForward, + } + + /// Enum representing whether the node has agency or not. + #[derive(Debug, PartialEq)] + enum HasAgency { + Yes, + No, + } + + /// Returns a fake specific point for testing purposes. + fn get_fake_specific_point() -> Point { + Point::Specific( + 1654413, + hex::decode("7de1f036df5a133ce68a82877d14354d0ba6de7625ab918e75f3e2ecb29771c2") + .unwrap(), + ) + } + + /// Returns a fake block number for testing purposes. + fn get_fake_block_number() -> BlockNumber { + 1337 + } + + /// Returns a fake chain point for testing purposes. + fn get_fake_chain_point_backwards() -> ChainPoint { + ChainPoint::from(get_fake_specific_point()) + } + + /// Returns a fake chain point for testing purposes. + fn get_fake_chain_point_forwards() -> ChainPoint { + Tip(get_fake_specific_point(), get_fake_block_number()).into() + } + + /// Creates a new work directory in the system's temporary folder. + fn create_temp_dir(folder_name: &str) -> PathBuf { + TempDir::create_with_short_path("pallas_chain_observer_test", folder_name) + } + + fn get_fake_raw_block() -> Vec { + let raw_block = include_str!("../../../mithril-test-lab/test_data/blocks/shelley1.block"); + + hex::decode(raw_block).unwrap() + } + + fn get_fake_scanned_block() -> ScannedBlock { + let raw_block = get_fake_raw_block(); + let multi_era_block = MultiEraBlock::decode(&raw_block).unwrap(); + + ScannedBlock::convert(multi_era_block, 0) + } + + /// Sets up a mock server for related tests. + /// + /// Use the `action` parameter to specify the action to be performed by the server. + async fn setup_server( + socket_path: PathBuf, + action: ServerAction, + has_agency: HasAgency, + ) -> tokio::task::JoinHandle<()> { + tokio::spawn({ + async move { + if socket_path.exists() { + fs::remove_file(&socket_path).expect("Previous socket removal failed"); + } + + let known_point = get_fake_specific_point(); + let tip_block_number = get_fake_block_number(); + let unix_listener = UnixListener::bind(socket_path.as_path()).unwrap(); + let mut server = NodeServer::accept(&unix_listener, 10).await.unwrap(); + + let chainsync_server = server.chainsync(); + + chainsync_server.recv_while_idle().await.unwrap(); + + chainsync_server + .send_intersect_found( + known_point.clone(), + Tip(known_point.clone(), tip_block_number), + ) + .await + .unwrap(); + + chainsync_server.recv_while_idle().await.unwrap(); + + if has_agency == HasAgency::No { + chainsync_server.send_await_reply().await.unwrap(); + } + + match action { + ServerAction::RollBackward => { + chainsync_server + .send_roll_backward( + known_point.clone(), + Tip(known_point.clone(), tip_block_number), + ) + .await + .unwrap(); + } + ServerAction::RollForward => { + let block = BlockContent(get_fake_raw_block()); + chainsync_server + .send_roll_forward(block, Tip(known_point.clone(), tip_block_number)) + .await + .unwrap(); + } + } + } + }) + } + + #[tokio::test] + async fn get_next_chain_block_rolls_backward() { + let socket_path = + create_temp_dir("get_next_chain_block_rolls_backward").join("node.socket"); + let known_point = get_fake_specific_point(); + let server = setup_server( + socket_path.clone(), + ServerAction::RollBackward, + HasAgency::Yes, + ) + .await; + let client = tokio::spawn(async move { + let mut chain_reader = + PallasChainReader::new(socket_path.as_path(), CardanoNetwork::TestNet(10)); + + chain_reader + .set_chain_point(&ChainPoint::from(known_point.clone())) + .await + .unwrap(); + + chain_reader.get_next_chain_block().await.unwrap().unwrap() + }); + + let (_, client_res) = tokio::join!(server, client); + let chain_block = client_res.expect("Client failed to get next chain block"); + match chain_block { + ChainBlockNextAction::RollBackward { rollback_point } => { + assert_eq!(rollback_point, get_fake_chain_point_backwards()); + } + _ => panic!("Unexpected chain block action"), + } + } + + #[tokio::test] + async fn get_next_chain_block_rolls_forward() { + let socket_path = create_temp_dir("get_next_chain_block_rolls_forward").join("node.socket"); + let known_point = get_fake_specific_point(); + let server = setup_server( + socket_path.clone(), + ServerAction::RollForward, + HasAgency::Yes, + ) + .await; + let client = tokio::spawn(async move { + let mut chain_reader = + PallasChainReader::new(socket_path.as_path(), CardanoNetwork::TestNet(10)); + + chain_reader + .set_chain_point(&ChainPoint::from(known_point.clone())) + .await + .unwrap(); + + chain_reader.get_next_chain_block().await.unwrap().unwrap() + }); + + let (_, client_res) = tokio::join!(server, client); + let chain_block = client_res.expect("Client failed to get next chain block"); + match chain_block { + ChainBlockNextAction::RollForward { + next_point, + parsed_block, + } => { + assert_eq!(next_point, get_fake_chain_point_forwards()); + assert_eq!(parsed_block, get_fake_scanned_block()); + } + _ => panic!("Unexpected chain block action"), + } + } + + #[tokio::test] + async fn get_next_chain_block_has_no_agency() { + let socket_path = create_temp_dir("get_next_chain_block_has_no_agency").join("node.socket"); + let known_point = get_fake_specific_point(); + let server = setup_server( + socket_path.clone(), + ServerAction::RollForward, + HasAgency::No, + ) + .await; + let client = tokio::spawn(async move { + let mut chain_reader = + PallasChainReader::new(socket_path.as_path(), CardanoNetwork::TestNet(10)); + + chain_reader + .set_chain_point(&ChainPoint::from(known_point.clone())) + .await + .unwrap(); + + // forces the client to change the chainsync server agency state + let client = chain_reader.get_client().await.unwrap(); + client.chainsync().request_next().await.unwrap(); + + chain_reader.get_next_chain_block().await.unwrap().unwrap() + }); + + let (_, client_res) = tokio::join!(server, client); + let chain_block = client_res.expect("Client failed to get next chain block"); + match chain_block { + ChainBlockNextAction::RollForward { + next_point, + parsed_block, + } => { + assert_eq!(next_point, get_fake_chain_point_forwards()); + assert_eq!(parsed_block, get_fake_scanned_block()); + } + _ => panic!("Unexpected chain block action"), + } + } +} diff --git a/mithril-common/src/entities/cardano_chain_point.rs b/mithril-common/src/entities/cardano_chain_point.rs index d49262ed7de..2066cc12016 100644 --- a/mithril-common/src/entities/cardano_chain_point.rs +++ b/mithril-common/src/entities/cardano_chain_point.rs @@ -1,4 +1,7 @@ use serde::{Deserialize, Serialize}; +cfg_fs! { + use pallas_network::miniprotocols::{chainsync::Tip, Point}; +} /// [Cardano Slot number](https://docs.cardano.org/learn/cardano-node/#slotsandepochs) pub type SlotNumber = u64; @@ -21,3 +24,60 @@ pub struct ChainPoint { /// The hex encoded block hash pub block_hash: BlockHash, } + +impl ChainPoint { + /// Check if origin chain point + pub fn is_origin(&self) -> bool { + self.slot_number == 0 && self.block_number == 0 && self.block_hash.is_empty() + } +} + +cfg_fs! { + impl From for Point { + fn from(chain_point: ChainPoint) -> Self { + match chain_point.is_origin() { + true => Self::Origin, + false => Self::Specific( + chain_point.slot_number, + hex::decode(&chain_point.block_hash).unwrap(), // TODO: keep block_hash as a Vec + ), + } + } + } + + impl From for ChainPoint { + fn from(point: Point) -> Self { + match point { + Point::Specific(slot_number, block_hash) => Self { + slot_number, + block_number: 0, + block_hash: hex::encode(block_hash), + }, + Point::Origin => Self { + slot_number: 0, + block_number: 0, + block_hash: String::new(), + }, + } + } + } + + impl From for ChainPoint { + fn from(tip: Tip) -> Self { + let chain_point: Self = tip.0.into(); + Self { + slot_number: chain_point.slot_number, + block_number: tip.1, + block_hash: chain_point.block_hash, + } + } + } + + impl From for Tip { + fn from(chain_point: ChainPoint) -> Self { + let block_number = chain_point.block_number; + let point: Point = chain_point.into(); + Tip(point, block_number) + } + } +} diff --git a/mithril-common/src/lib.rs b/mithril-common/src/lib.rs index 6fb6924cfc4..cbb97b071b8 100644 --- a/mithril-common/src/lib.rs +++ b/mithril-common/src/lib.rs @@ -69,6 +69,7 @@ cfg_fs! { mod time_point_provider; pub mod digesters; pub mod cardano_block_scanner; + pub mod chain_reader; pub use time_point_provider::{TimePointProvider, TimePointProviderImpl}; }