diff --git a/indexer/Cargo.toml b/indexer/Cargo.toml index c350d6b1..ac94de25 100644 --- a/indexer/Cargo.toml +++ b/indexer/Cargo.toml @@ -9,9 +9,9 @@ strip = true [dependencies] # [core] -dcspark-core = { git = "https://github.com/dcSpark/dcspark-core.git", rev = "ebb245ac047f9d45dba97f07a5bb525ffd81a539" } -dcspark-blockchain-source = { git = "https://github.com/dcSpark/dcspark-core.git", rev = "ebb245ac047f9d45dba97f07a5bb525ffd81a539" } -multiverse = { git = "https://github.com/dcSpark/dcspark-core.git", rev = "ebb245ac047f9d45dba97f07a5bb525ffd81a539" } +dcspark-core = { git = "https://github.com/dcSpark/dcspark-core.git", rev = "986ad8495c3894782774e2db0fc3fe65f44ca89d" } +dcspark-blockchain-source = { git = "https://github.com/dcSpark/dcspark-core.git", rev = "986ad8495c3894782774e2db0fc3fe65f44ca89d" } +multiverse = { git = "https://github.com/dcSpark/dcspark-core.git", rev = "986ad8495c3894782774e2db0fc3fe65f44ca89d" } # [local] entity = { path = "entity" } @@ -29,7 +29,6 @@ clap = { version = "3.1", features = ["derive"] } ctrlc = { version = "3.2.4", features = ["termination"] } dotenv = { version = "0.15.0" } hex = { version = "0.4.3" } -oura = { git = "https://github.com/txpipe/oura.git", rev = "e1b971394a394bde13fb601ad3f6d4ad343b02f0" } serde = { version = "1.0.152", features = ["derive", "rc"] } serde_json = { version = "1.0.92" } serde_yaml = { version = "0.9.17" } diff --git a/indexer/src/main.rs b/indexer/src/main.rs index 6de33882..7dc3aa3f 100644 --- a/indexer/src/main.rs +++ b/indexer/src/main.rs @@ -1,12 +1,11 @@ use crate::sink::Sink; use crate::sinks::CardanoSink; -use crate::sources::{CardanoSource, OuraSource}; +use crate::sources::{CardanoSource, N2CSource}; use crate::types::StoppableService; use anyhow::{anyhow, Context}; use clap::Parser; use dcspark_blockchain_source::{GetNextFrom, Source}; use migration::async_std::path::PathBuf; -use oura::sources::BearerKind; use serde::Deserialize; use std::fs::File; use std::process::exit; @@ -68,7 +67,7 @@ pub enum SinkConfig { #[serde(tag = "type", rename_all = "snake_case")] #[serde(deny_unknown_fields)] pub enum SourceConfig { - Oura { socket: String, bearer: BearerKind }, + N2c { socket: String }, CardanoNet { relay: (String, u16) }, } @@ -169,7 +168,7 @@ async fn main() -> anyhow::Result<()> { config }; - let (network, base_config, mut sink) = match &config.sink { + let (_network, base_config, mut sink) = match &config.sink { SinkConfig::Cardano { network, custom_config, @@ -213,9 +212,21 @@ async fn main() -> anyhow::Result<()> { .context("Can't get starting point from sink")?; match &config.source { - SourceConfig::Oura { .. } => { - let source = OuraSource::new(config.source, network, start_from.clone()) - .context("Can't create oura source")?; + SourceConfig::N2c { socket } => { + let network_config = dcspark_blockchain_source::cardano::NetworkConfiguration { + relay: dcspark_blockchain_source::cardano::Relay::UnixSocket(socket.clone()), + from: None, + ..base_config + }; + + let source = dcspark_blockchain_source::cardano::N2CSource::connect( + network_config, + start_from.clone(), + ) + .await?; + + let source = N2CSource(source); + let start_from = start_from .last() .cloned() diff --git a/indexer/src/sources/mod.rs b/indexer/src/sources/mod.rs index 0071ef24..9b014a95 100644 --- a/indexer/src/sources/mod.rs +++ b/indexer/src/sources/mod.rs @@ -1,5 +1,5 @@ mod cardano; -mod oura_source; +mod n2c; pub use cardano::CardanoSource; -pub use oura_source::OuraSource; +pub use n2c::N2CSource; diff --git a/indexer/src/sources/n2c.rs b/indexer/src/sources/n2c.rs new file mode 100644 index 00000000..a13bbe43 --- /dev/null +++ b/indexer/src/sources/n2c.rs @@ -0,0 +1,50 @@ +use async_trait::async_trait; +use dcspark_blockchain_source::{cardano::Point, Source}; +use dcspark_core::StoppableService as _; + +pub struct N2CSource(pub dcspark_blockchain_source::cardano::N2CSource); + +#[async_trait] +impl crate::types::StoppableService for N2CSource { + async fn stop(self) -> anyhow::Result<()> { + self.0.stop().await?; + + Ok(()) + } +} + +#[async_trait::async_trait] +impl Source for N2CSource { + type Event = crate::common::CardanoEventType; + + type From = Point; + + #[tracing::instrument(skip(self))] + async fn pull(&mut self, _from: &Self::From) -> anyhow::Result> { + let event = self.0.pull(&()).await?; + + let Some(event) = event else { + return Ok(None); + }; + + match event { + dcspark_blockchain_source::cardano::N2CSourceEvent::RollBack { + block_slot, + block_hash, + } => Ok(Some(crate::common::CardanoEventType::RollBack { + block_slot, + block_hash, + })), + dcspark_blockchain_source::cardano::N2CSourceEvent::Block(block_event) => { + Ok(Some(crate::common::CardanoEventType::Block { + cbor_hex: hex::encode(block_event.raw_block), + epoch: block_event.epoch, + epoch_slot: Some(block_event.slot_number.into()), + block_number: block_event.block_number.into(), + block_hash: block_event.id.to_string(), + block_slot: block_event.slot_number.into(), + })) + } + } + } +} diff --git a/indexer/src/sources/oura_source.rs b/indexer/src/sources/oura_source.rs deleted file mode 100644 index b868af28..00000000 --- a/indexer/src/sources/oura_source.rs +++ /dev/null @@ -1,232 +0,0 @@ -use crate::SourceConfig; -use anyhow::anyhow; -use dcspark_blockchain_source::cardano::Point; - -use std::{str::FromStr, sync::Arc, thread::JoinHandle}; - -use crate::common::CardanoEventType; -use crate::types::StoppableService; -use oura::model::EventData; -use oura::pipelining::SourceProvider; -use oura::{ - filters::selection::{self, Predicate}, - mapper, - pipelining::{FilterProvider, StageReceiver}, - sources::{n2c, n2n, AddressArg, BearerKind, IntersectArg, MagicArg, PointArg}, - utils::{ChainWellKnownInfo, Utils, WithUtils}, -}; - -pub struct OuraSource { - _handles: Vec>, - input: StageReceiver, - - // cardano-node always triggers a rollback event when you connect to it - // if all the intersection points existed, if will return the most recent point you gave it - // to avoid this causing a rollback when applying a migration starting from an old block, we skip this rollback - expected_rollback: Option, -} - -impl OuraSource { - pub fn new( - config: SourceConfig, - network: String, - start_from: Vec, - ) -> anyhow::Result { - match config { - SourceConfig::Oura { socket, bearer } => { - let (intersect, rollback) = match start_from { - points if points.is_empty() => { - // we need a special intersection type when bootstrapping from genesis - (IntersectArg::Origin, None) - } - points => { - let (slot_nb, hash) = match points.last().unwrap() { - Point::Origin => { - return Err(anyhow!("Origin point is not supported here")); - } - Point::BlockHeader { slot_nb, hash } => (slot_nb, hash), - }; - tracing::info!("Starting sync at block #{} ({})", slot_nb, hash,); - // if last block synced was at slot 0, - // that means it was the genesis block so we start from origin - match (*slot_nb).into() { - 0u64 => (IntersectArg::Origin, None), - _ => { - let point_args: Vec = points - .into_iter() - .flat_map(|p| match p { - Point::Origin => vec![], - Point::BlockHeader { slot_nb, hash } => { - vec![PointArg(slot_nb.into(), hash.to_string())] - } - }) - .collect(); - let rollback = point_args.first().cloned(); - (IntersectArg::Fallbacks(point_args), rollback) - } - } - } - }; - - let (handles, input) = oura_bootstrap(bearer, intersect, &network, socket)?; - - Ok(OuraSource { - _handles: handles, - input, - expected_rollback: rollback, - }) - } - _ => Err(anyhow!( - "Config {:?} is not supported as oura config", - config - )), - } - } -} - -#[async_trait::async_trait] -impl dcspark_blockchain_source::Source for OuraSource { - type Event = CardanoEventType; - type From = Point; - - /// note: from is ignored here since oura is set up just once - async fn pull(&mut self, _from: &Self::From) -> anyhow::Result> { - let input = self - .input - .recv() - .map_err(|error| anyhow!("Can't fetch oura event: {:?}", error))?; - - match input.data { - EventData::Block(block_record) => { - let cbor = block_record - .cbor_hex - .ok_or_else(|| anyhow!("cbor is not presented"))?; - Ok(Some(CardanoEventType::Block { - cbor_hex: cbor, - epoch: block_record.epoch, - epoch_slot: block_record.epoch_slot, - block_number: block_record.number, - block_hash: block_record.hash, - block_slot: block_record.slot, - })) - } - EventData::RollBack { - block_slot, - block_hash, - } => { - if let Some(expected) = self.expected_rollback.clone() { - if expected.1 == *block_hash { - self.expected_rollback = None; - return Ok(None); - } - }; - Ok(Some(CardanoEventType::RollBack { - block_slot, - block_hash, - })) - } - _ => Ok(None), - } - } -} - -#[async_trait::async_trait] -impl StoppableService for OuraSource { - async fn stop(self) -> anyhow::Result<()> { - Ok(()) - } -} - -fn oura_bootstrap( - mode: BearerKind, - intersect: IntersectArg, - network: &str, - socket: String, -) -> anyhow::Result<(Vec>, StageReceiver)> { - let magic = match network { - "sanchonet" => MagicArg(4), - _ => MagicArg::from_str(network).map_err(|_| anyhow!("magic arg failed"))?, - }; - - let well_known = if magic.0 == 4 { - ChainWellKnownInfo { - byron_epoch_length: 86400, - byron_slot_length: 20, - byron_known_slot: 0, - byron_known_hash: "".to_string(), - byron_known_time: 1686789000, - shelley_epoch_length: 86400, - shelley_slot_length: 1, - shelley_known_slot: 0, - shelley_known_hash: "".to_string(), - shelley_known_time: 1686789000, - address_hrp: "addr_test".to_string(), - adahandle_policy: "".to_string(), - } - } else { - ChainWellKnownInfo::try_from_magic(*magic) - .map_err(|_| anyhow!("chain well known info failed"))? - }; - - let utils = Arc::new(Utils::new(well_known)); - - let mapper = mapper::Config { - include_transaction_details: true, - include_block_cbor: true, - ..Default::default() - }; - - tracing::info!("{}", "Attempting to connect to node..."); - let (source_handle, source_rx) = match mode { - #[allow(deprecated)] - BearerKind::Unix => { - let source_config = n2c::Config { - address: AddressArg(BearerKind::Unix, socket), - magic: Some(magic), - well_known: None, - mapper, - since: None, - min_depth: 0, - intersect: Some(intersect), - retry_policy: None, - finalize: None, // TODO: configurable - }; - WithUtils::new(source_config, utils).bootstrap() - } - #[allow(deprecated)] - BearerKind::Tcp => { - let source_config = n2n::Config { - address: AddressArg(BearerKind::Tcp, socket), - magic: Some(magic), - well_known: None, - mapper, - since: None, - min_depth: 0, - intersect: Some(intersect), - retry_policy: None, - finalize: None, // TODO: configurable - }; - WithUtils::new(source_config, utils).bootstrap() - } - } - .map_err(|e| { - tracing::error!("{}", e); - anyhow!("failed to bootstrap source. Are you sure cardano-node is running?") - })?; - tracing::info!("{}", "Connection to node established"); - - let mut handles = Vec::new(); - handles.push(source_handle); - - let check = Predicate::VariantIn(vec![String::from("Block"), String::from("Rollback")]); - - let filter_setup = selection::Config { check }; - - let (filter_handle, filter_rx) = filter_setup - .bootstrap(source_rx) - .map_err(|_| anyhow!("failed to bootstrap filter"))?; - - handles.push(filter_handle); - - Ok((handles, filter_rx)) -}