Skip to content

Commit

Permalink
Add test gadget abstraction (#7)
Browse files Browse the repository at this point in the history
* Add test gadget abstraction

* Add integration test to ensure gadget works properly with job manager

* Add networking into integration test
  • Loading branch information
tbraun96 authored Nov 3, 2023
1 parent b542210 commit f43877d
Show file tree
Hide file tree
Showing 13 changed files with 639 additions and 8 deletions.
8 changes: 6 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
members = [
"gadget-core",
"webb-gadget",
"zk-gadget"
"zk-gadget",
"test-gadget"
]

[workspace.dependencies]
Expand All @@ -21,4 +22,7 @@ tokio = "1.32.0"
bincode2 = "2"
futures-util = "0.3.28"
serde = "1.0.188"
async-trait = "0.1.73"
async-trait = "0.1.73"
log = "0.4.20"
parking_lot = "0.12.1"
futures = "0.3.28"
4 changes: 2 additions & 2 deletions gadget-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ substrate = [

[dependencies]
sync_wrapper = "0.1.2"
parking_lot = "0.12.1"
parking_lot = { workspace = true }
tokio = { workspace = true, features = ["sync", "time", "macros", "rt"] }
hex = "0.4.3"
async-trait = "0.1.73"

sp-runtime = { optional = true, workspace = true, default-features = false }
sc-client-api = { optional = true, workspace = true, default-features = false }
sp-api = { optional = true, workspace = true, default-features = false }
futures = { optional = true, version = "0.3.28" }
futures = { optional = true, workspace = true }

[dev-dependencies]
11 changes: 9 additions & 2 deletions gadget-core/src/job_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,21 @@ pub trait ProtocolRemote<WM: WorkManagerInterface>: Send + Sync + 'static {
fn start(&self) -> Result<(), WM::Error>;
fn session_id(&self) -> WM::SessionID;
fn set_as_primary(&self);
fn has_stalled(&self, now: WM::Clock) -> bool;
fn started_at(&self) -> WM::Clock;
fn shutdown(&self, reason: ShutdownReason) -> Result<(), WM::Error>;
fn is_done(&self) -> bool;
fn deliver_message(&self, message: WM::ProtocolMessage) -> Result<(), WM::Error>;
fn has_started(&self) -> bool;
fn is_active(&self) -> bool;
fn ssid(&self) -> WM::SSID;

fn has_stalled(&self, now: WM::Clock) -> bool {
now >= self.started_at() + WM::acceptable_block_tolerance()
}

fn is_active(&self) -> bool {
// If the protocol has started, is not done, and has not stalled, then it is active
self.has_started() && !self.is_done() && !self.has_started()
}
}

#[derive(Debug, Eq, PartialEq)]
Expand Down
20 changes: 20 additions & 0 deletions test-gadget/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[package]
name = "test-gadget"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]

gadget-core = { workspace = true }
log = { workspace = true }
parking_lot = { workspace = true }
serde = { workspace = true, features = ["derive"] }
tokio = { workspace = true, features = ["sync"] }
futures = { workspace = true }
async-trait = { workspace = true }

[dev-dependencies]
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread"] }
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
29 changes: 29 additions & 0 deletions test-gadget/src/blockchain.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use crate::error::TestError;
use crate::gadget::TestFinalityNotification;
use std::time::Duration;

/// Mocks the blockchain timing mechanism
pub async fn blockchain(
block_duration: Duration,
blocks_per_session: u64,
broadcaster: tokio::sync::broadcast::Sender<TestFinalityNotification>,
) -> Result<(), TestError> {
let mut current_block = 0;
let mut current_session = 0;

loop {
broadcaster
.send(TestFinalityNotification {
number: current_block,
session_id: current_session,
})
.map_err(|_| TestError {
reason: "Failed to broadcast".to_string(),
})?;

current_block += 1;
current_session = current_block / blocks_per_session;

tokio::time::sleep(block_duration).await;
}
}
15 changes: 15 additions & 0 deletions test-gadget/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use std::error::Error;
use std::fmt::{Display, Formatter};

#[derive(Debug)]
pub struct TestError {
pub reason: String,
}

impl Display for TestError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
std::fmt::Debug::fmt(self, f)
}
}

impl Error for TestError {}
167 changes: 167 additions & 0 deletions test-gadget/src/gadget.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
use crate::error::TestError;
use crate::message::TestProtocolMessage;
use crate::work_manager::{
AsyncProtocolGenerator, TestAsyncProtocolParameters, TestProtocolRemote, TestWorkManager,
};
use async_trait::async_trait;
use gadget_core::gadget::manager::AbstractGadget;
use gadget_core::job_manager::{PollMethod, ProtocolWorkManager, SendFuture, WorkManagerInterface};
use parking_lot::RwLock;
use std::pin::Pin;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use tokio::sync::Mutex;

/// An AbstractGadget endowed with a WorkerManager, a fake blockchain that delivers FinalityNotifications to the gadgets, and a TestProtocolMessage stream
pub struct TestGadget {
job_manager: ProtocolWorkManager<TestWorkManager>,
blockchain_connection: Mutex<tokio::sync::broadcast::Receiver<TestFinalityNotification>>,
network_connection: Mutex<tokio::sync::mpsc::UnboundedReceiver<TestProtocolMessage>>,
// Specifies at which blocks we should start a job on
run_test_at: Arc<Vec<u64>>,
clock: Arc<RwLock<u64>>,
async_protocol_generator: Box<dyn AsyncProtocolGenerator>,
}

impl TestGadget {
pub fn new<T: AsyncProtocolGenerator + 'static>(
blockchain_connection: tokio::sync::broadcast::Receiver<TestFinalityNotification>,
network_connection: tokio::sync::mpsc::UnboundedReceiver<TestProtocolMessage>,
run_test_at: Arc<Vec<u64>>,
async_protocol_generator: T,
) -> Self {
let clock = Arc::new(RwLock::new(0));
Self {
job_manager: ProtocolWorkManager::new(
TestWorkManager {
clock: clock.clone(),
},
10,
5,
PollMethod::Interval { millis: 100 },
),
blockchain_connection: Mutex::new(blockchain_connection),
network_connection: Mutex::new(network_connection),
run_test_at,
async_protocol_generator: Box::new(async_protocol_generator),
clock,
}
}
}

#[derive(Clone)]
pub struct TestFinalityNotification {
pub number: u64,
pub session_id: u64,
}

#[async_trait]
impl AbstractGadget for TestGadget {
type FinalityNotification = TestFinalityNotification;
type BlockImportNotification = ();
type ProtocolMessage = TestProtocolMessage;
type Error = TestError;

async fn get_next_finality_notification(&self) -> Option<Self::FinalityNotification> {
self.blockchain_connection.lock().await.recv().await.ok()
}

async fn get_next_block_import_notification(&self) -> Option<Self::BlockImportNotification> {
// We don't care to test block import notifications in this test gadget
futures::future::pending().await
}

async fn get_next_protocol_message(&self) -> Option<Self::ProtocolMessage> {
self.network_connection.lock().await.recv().await
}

async fn process_finality_notification(
&self,
notification: Self::FinalityNotification,
) -> Result<(), Self::Error> {
let now = notification.number;
let session_id = notification.session_id;
*self.clock.write() = now;

if self.run_test_at.contains(&now) {
log::info!("Running test at block {now}");
let task_hash = now.to_be_bytes();
let ssid = 0; // Assume SSID = 0 for now
let (remote, task) = create_test_async_protocol(
session_id,
now,
ssid,
task_hash,
&*self.async_protocol_generator,
);
self.job_manager
.push_task(task_hash, true, Arc::new(remote), task)
.map_err(|err| TestError {
reason: format!("Failed to push_task: {err:?}"),
})?;
}

Ok(())
}

async fn process_block_import_notification(
&self,
_notification: Self::BlockImportNotification,
) -> Result<(), Self::Error> {
unreachable!("We don't care to test block import notifications in this test gadget")
}

async fn process_protocol_message(
&self,
message: Self::ProtocolMessage,
) -> Result<(), Self::Error> {
self.job_manager
.deliver_message(message)
.map_err(|err| TestError {
reason: format!("{err:?}"),
})
.map(|_| ())
}

async fn process_error(&self, error: Self::Error) {
log::error!("{error:?}")
}
}

fn create_test_async_protocol(
session_id: <TestWorkManager as WorkManagerInterface>::SessionID,
now: <TestWorkManager as WorkManagerInterface>::Clock,
ssid: <TestWorkManager as WorkManagerInterface>::SSID,
task_id: <TestWorkManager as WorkManagerInterface>::TaskID,
proto_gen: &dyn AsyncProtocolGenerator,
) -> (TestProtocolRemote, Pin<Box<dyn SendFuture<'static, ()>>>) {
let is_done = Arc::new(AtomicBool::new(false));
let (to_async_protocol, protocol_message_rx) = tokio::sync::mpsc::unbounded_channel();
let (start_tx, start_rx) = tokio::sync::oneshot::channel();
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();

let params = TestAsyncProtocolParameters {
is_done: is_done.clone(),
protocol_message_rx,
start_rx: Some(start_rx),
shutdown_rx: Some(shutdown_rx),
associated_block_id: now,
associated_ssid: ssid,
associated_session_id: session_id,
associated_task_id: task_id,
};

let remote = TestProtocolRemote {
start_tx: parking_lot::Mutex::new(Some(start_tx)),
shutdown_tx: parking_lot::Mutex::new(Some(shutdown_tx)),
associated_block_id: now,
associated_ssid: ssid,
associated_session_id: session_id,
to_async_protocol,
is_done,
};

let future = proto_gen(params);

(remote, future)
}
6 changes: 6 additions & 0 deletions test-gadget/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pub mod blockchain;
pub mod error;
pub mod gadget;
pub mod message;
pub mod test_network;
pub mod work_manager;
34 changes: 34 additions & 0 deletions test-gadget/src/message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use crate::work_manager::TestWorkManager;
use gadget_core::job_manager::{ProtocolMessageMetadata, WorkManagerInterface};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Clone)]
pub struct TestProtocolMessage {
pub payload: Vec<u8>,
pub from: u32,
pub to: Option<u32>,
pub associated_block_id: <TestWorkManager as WorkManagerInterface>::Clock,
pub associated_session_id: <TestWorkManager as WorkManagerInterface>::SessionID,
pub associated_ssid: <TestWorkManager as WorkManagerInterface>::SSID,
pub associated_task_id: <TestWorkManager as WorkManagerInterface>::TaskID,
}

pub type UserID = u32;

impl ProtocolMessageMetadata<TestWorkManager> for TestProtocolMessage {
fn associated_block_id(&self) -> <TestWorkManager as WorkManagerInterface>::Clock {
self.associated_block_id
}

fn associated_session_id(&self) -> <TestWorkManager as WorkManagerInterface>::SessionID {
self.associated_session_id
}

fn associated_ssid(&self) -> <TestWorkManager as WorkManagerInterface>::SSID {
self.associated_ssid
}

fn associated_task(&self) -> <TestWorkManager as WorkManagerInterface>::TaskID {
self.associated_task_id
}
}
Loading

0 comments on commit f43877d

Please sign in to comment.