From b185a9445044ae1c0a1e895fb5dc1e64eed24d4f Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Sun, 11 Jun 2023 23:40:14 +0200 Subject: [PATCH] test: add upstream integration test (#64) --- src/lib.rs | 5 ++- src/tests/mod.rs | 1 + src/tests/upstream.rs | 81 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 86 insertions(+), 1 deletion(-) create mode 100644 src/tests/mod.rs create mode 100644 src/tests/upstream.rs diff --git a/src/lib.rs b/src/lib.rs index fc057ec9..8510bb4d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,8 @@ pub mod model; pub mod prelude; -pub mod rolldb; pub mod serve; +pub mod storage; pub mod sync; + +#[cfg(test)] +mod tests; diff --git a/src/tests/mod.rs b/src/tests/mod.rs new file mode 100644 index 00000000..1040c1f6 --- /dev/null +++ b/src/tests/mod.rs @@ -0,0 +1 @@ +mod upstream; diff --git a/src/tests/upstream.rs b/src/tests/upstream.rs new file mode 100644 index 00000000..6cb87405 --- /dev/null +++ b/src/tests/upstream.rs @@ -0,0 +1,81 @@ +use gasket::{framework::*, messaging::*, runtime::Policy}; +use tracing::{error, info}; + +use crate::{model::UpstreamEvent, storage::rolldb::RollDB}; + +struct WitnessStage { + input: gasket::messaging::tokio::InputPort, +} + +impl gasket::framework::Stage for WitnessStage { + type Unit = UpstreamEvent; + type Worker = WitnessWorker; + + fn name(&self) -> &str { + "witness" + } +} + +struct WitnessWorker; + +#[async_trait::async_trait(?Send)] +impl Worker for WitnessWorker { + async fn bootstrap(_: &WitnessStage) -> Result { + Ok(Self) + } + + async fn schedule( + &mut self, + stage: &mut WitnessStage, + ) -> Result, WorkerError> { + error!("dequeing form witness"); + let msg = stage.input.recv().await.or_panic()?; + Ok(WorkSchedule::Unit(msg.payload)) + } + + async fn execute( + &mut self, + _: &UpstreamEvent, + _: &mut WitnessStage, + ) -> Result<(), WorkerError> { + info!("witnessing block event"); + + Ok(()) + } +} + +#[test] +#[ignore] +fn test_mainnet_upstream() { + tracing::subscriber::set_global_default( + tracing_subscriber::FmtSubscriber::builder() + .with_max_level(tracing::Level::TRACE) + .finish(), + ) + .unwrap(); + + let rolldb = RollDB::open("tmp", 10).unwrap(); + + let (send, receive) = gasket::messaging::tokio::channel(200); + + let mut upstream = crate::sync::upstream::Stage::new( + "relays-new.cardano-mainnet.iohk.io:3001".into(), + 764824073, + rolldb, + ); + + upstream.downstream.connect(send); + + let mut witness = WitnessStage { + input: Default::default(), + }; + + witness.input.connect(receive); + + let upstream = gasket::runtime::spawn_stage(upstream, Policy::default()); + let witness = gasket::runtime::spawn_stage(witness, Policy::default()); + + let daemon = gasket::daemon::Daemon(vec![upstream, witness]); + + daemon.block(); +}