diff --git a/cdn-broker/benches/broadcast.rs b/cdn-broker/benches/broadcast.rs index 51f1336..bc9e191 100644 --- a/cdn-broker/benches/broadcast.rs +++ b/cdn-broker/benches/broadcast.rs @@ -3,8 +3,9 @@ use std::time::Duration; -use cdn_broker::reexports::tests::{TestDefinition, TestRun}; +use cdn_broker::reexports::tests::{TestBroker, TestDefinition, TestRun, TestUser}; use cdn_broker::{assert_received, send_message_as}; +use cdn_proto::connection::protocols::memory::Memory; use cdn_proto::connection::Bytes; use cdn_proto::def::TestTopic; use cdn_proto::message::{Broadcast, Message}; @@ -49,11 +50,14 @@ fn bench_broadcast_user(c: &mut Criterion) { // Set up our broker under test let run = benchmark_runtime.block_on(async move { let run_definition = TestDefinition { - connected_users: vec![vec![TestTopic::Global as u8], vec![TestTopic::Global as u8]], + connected_users: vec![ + TestUser::with_index(0, vec![TestTopic::Global.into()]), + TestUser::with_index(1, vec![TestTopic::Global.into()]), + ], connected_brokers: vec![], }; - run_definition.into_run().await + run_definition.into_run::().await }); // Benchmark @@ -71,14 +75,18 @@ fn bench_broadcast_broker(c: &mut Criterion) { // Set up our broker under test let run = benchmark_runtime.block_on(async move { let run_definition = TestDefinition { - connected_users: vec![vec![]], + connected_users: vec![TestUser::with_index(0, vec![])], connected_brokers: vec![ - (vec![], vec![TestTopic::Global as u8]), - (vec![], vec![TestTopic::Global as u8]), + TestBroker { + connected_users: vec![TestUser::with_index(1, vec![TestTopic::Global.into()])], + }, + TestBroker { + connected_users: vec![TestUser::with_index(2, vec![TestTopic::Global.into()])], + }, ], }; - run_definition.into_run().await + run_definition.into_run::().await }); // Benchmark diff --git a/cdn-broker/benches/direct.rs b/cdn-broker/benches/direct.rs index a5b9e8e..184e1fd 100644 --- a/cdn-broker/benches/direct.rs +++ b/cdn-broker/benches/direct.rs @@ -3,8 +3,9 @@ use std::time::Duration; -use cdn_broker::reexports::tests::{TestDefinition, TestRun}; -use cdn_broker::{assert_received, send_message_as}; +use cdn_broker::reexports::tests::{TestBroker, TestDefinition, TestRun, TestUser}; +use cdn_broker::{assert_received, at_index, send_message_as}; +use cdn_proto::connection::protocols::memory::Memory; use cdn_proto::connection::Bytes; use cdn_proto::def::TestTopic; use cdn_proto::message::{Direct, Message}; @@ -15,7 +16,7 @@ use pprof::criterion::{Output, PProfProfiler}; async fn direct_user_to_self(run: &TestRun) { // Allocate a rather large message let message = Message::Direct(Direct { - recipient: vec![0], + recipient: at_index![0], message: vec![0; 10000], }); @@ -29,7 +30,7 @@ async fn direct_user_to_self(run: &TestRun) { async fn direct_user_to_user(run: &TestRun) { // Allocate a rather large message let message = Message::Direct(Direct { - recipient: vec![1], + recipient: at_index![1], message: vec![0; 10000], }); @@ -43,7 +44,7 @@ async fn direct_user_to_user(run: &TestRun) { async fn direct_user_to_broker(run: &TestRun) { // Allocate a rather large message let message = Message::Direct(Direct { - recipient: vec![2], + recipient: at_index![2], message: vec![0; 10000], }); @@ -57,7 +58,7 @@ async fn direct_user_to_broker(run: &TestRun) { async fn direct_broker_to_user(run: &TestRun) { // Allocate a rather large message let message = Message::Direct(Direct { - recipient: vec![0], + recipient: at_index![0], message: vec![0; 10000], }); @@ -76,11 +77,11 @@ fn bench_direct_user_to_self(c: &mut Criterion) { // Set up our broker under test let run = benchmark_runtime.block_on(async move { let run_definition = TestDefinition { - connected_users: vec![vec![TestTopic::Global as u8]], + connected_users: vec![TestUser::with_index(0, vec![TestTopic::Global as u8])], connected_brokers: vec![], }; - run_definition.into_run().await + run_definition.into_run::().await }); // Run the benchmark @@ -99,11 +100,14 @@ fn bench_direct_user_to_user(c: &mut Criterion) { // Set up our broker under test let run = benchmark_runtime.block_on(async move { let run_definition = TestDefinition { - connected_users: vec![vec![TestTopic::Global as u8], vec![TestTopic::Global as u8]], + connected_users: vec![ + TestUser::with_index(0, vec![TestTopic::Global as u8]), + TestUser::with_index(1, vec![TestTopic::Global as u8]), + ], connected_brokers: vec![], }; - run_definition.into_run().await + run_definition.into_run::().await }); // Run the benchmark @@ -122,11 +126,16 @@ fn bench_direct_user_to_broker(c: &mut Criterion) { // Set up our broker under test let run = benchmark_runtime.block_on(async move { let run_definition = TestDefinition { - connected_users: vec![vec![TestTopic::Global as u8], vec![TestTopic::Global as u8]], - connected_brokers: vec![(vec![2], vec![TestTopic::Global as u8])], + connected_users: vec![ + TestUser::with_index(0, vec![TestTopic::Global as u8]), + TestUser::with_index(1, vec![TestTopic::Global as u8]), + ], + connected_brokers: vec![TestBroker { + connected_users: vec![TestUser::with_index(2, vec![TestTopic::Global as u8])], + }], }; - run_definition.into_run().await + run_definition.into_run::().await }); // Run the benchmark @@ -145,11 +154,16 @@ fn bench_direct_broker_to_user(c: &mut Criterion) { // Set up our broker under test let run = benchmark_runtime.block_on(async move { let run_definition = TestDefinition { - connected_users: vec![vec![TestTopic::Global as u8], vec![TestTopic::Global as u8]], - connected_brokers: vec![(vec![2], vec![TestTopic::Global as u8])], + connected_users: vec![ + TestUser::with_index(0, vec![TestTopic::Global as u8]), + TestUser::with_index(1, vec![TestTopic::Global as u8]), + ], + connected_brokers: vec![TestBroker { + connected_users: vec![TestUser::with_index(0, vec![TestTopic::Global as u8])], + }], }; - run_definition.into_run().await + run_definition.into_run::().await }); // Run the benchmark diff --git a/cdn-broker/src/reexports.rs b/cdn-broker/src/reexports.rs index 948b59b..120bd97 100644 --- a/cdn-broker/src/reexports.rs +++ b/cdn-broker/src/reexports.rs @@ -29,5 +29,5 @@ pub mod error { /// This is not guarded by `![cfg(test)]` because we use the same functions /// when doing benchmarks. pub mod tests { - pub use crate::tests::{TestDefinition, TestRun}; + pub use crate::tests::{TestBroker, TestDefinition, TestRun, TestUser}; } diff --git a/cdn-broker/src/tests/broadcast.rs b/cdn-broker/src/tests/broadcast.rs index 92eca01..c044528 100644 --- a/cdn-broker/src/tests/broadcast.rs +++ b/cdn-broker/src/tests/broadcast.rs @@ -4,13 +4,13 @@ use std::time::Duration; use cdn_proto::{ - connection::Bytes, + connection::{protocols::memory::Memory, Bytes}, def::TestTopic, message::{Broadcast, Message}, }; use tokio::time::{sleep, timeout}; -use super::TestDefinition; +use super::{TestBroker, TestDefinition, TestUser}; use crate::{assert_received, send_message_as}; /// Test sending a broadcast message from a user. @@ -22,19 +22,28 @@ async fn test_broadcast_user() { // This run definition: 3 brokers, 6 users let run_definition = TestDefinition { connected_users: vec![ - vec![TestTopic::Global as u8, TestTopic::DA as u8], - vec![TestTopic::DA as u8], - vec![TestTopic::Global as u8], + TestUser::with_index(0, vec![TestTopic::Global.into(), TestTopic::DA.into()]), + TestUser::with_index(1, vec![TestTopic::DA.into()]), + TestUser::with_index(2, vec![TestTopic::Global.into()]), ], connected_brokers: vec![ - (vec![3], vec![TestTopic::DA as u8]), - (vec![4], vec![TestTopic::Global as u8, TestTopic::DA as u8]), - (vec![5], vec![]), + TestBroker { + connected_users: vec![TestUser::with_index(3, vec![TestTopic::DA.into()])], + }, + TestBroker { + connected_users: vec![TestUser::with_index( + 4, + vec![TestTopic::Global.into(), TestTopic::DA.into()], + )], + }, + TestBroker { + connected_users: vec![TestUser::with_index(5, vec![])], + }, ], }; // Start the run - let run = run_definition.into_run().await; + let run = run_definition.into_run::().await; // We need a little time for our subscribe messages to propagate sleep(Duration::from_millis(25)).await; @@ -88,19 +97,28 @@ async fn test_broadcast_broker() { // This run definition: 3 brokers, 6 users let run_definition = TestDefinition { connected_users: vec![ - vec![TestTopic::Global as u8, TestTopic::DA as u8], - vec![TestTopic::DA as u8], - vec![TestTopic::Global as u8], + TestUser::with_index(0, vec![TestTopic::Global.into(), TestTopic::DA.into()]), + TestUser::with_index(1, vec![TestTopic::DA.into()]), + TestUser::with_index(2, vec![TestTopic::Global.into()]), ], connected_brokers: vec![ - (vec![3], vec![TestTopic::DA as u8]), - (vec![4], vec![TestTopic::Global as u8, TestTopic::DA as u8]), - (vec![5], vec![]), + TestBroker { + connected_users: vec![TestUser::with_index(3, vec![TestTopic::DA.into()])], + }, + TestBroker { + connected_users: vec![TestUser::with_index( + 4, + vec![TestTopic::Global.into(), TestTopic::DA.into()], + )], + }, + TestBroker { + connected_users: vec![TestUser::with_index(5, vec![])], + }, ], }; // Start the run - let run = run_definition.into_run().await; + let run = run_definition.into_run::().await; // We need a little time for our subscribe messages to propagate sleep(Duration::from_millis(25)).await; diff --git a/cdn-broker/src/tests/direct.rs b/cdn-broker/src/tests/direct.rs index 15b503f..9c74653 100644 --- a/cdn-broker/src/tests/direct.rs +++ b/cdn-broker/src/tests/direct.rs @@ -4,14 +4,14 @@ use std::time::Duration; use cdn_proto::{ - connection::Bytes, + connection::{protocols::memory::Memory, Bytes}, def::TestTopic, message::{Direct, Message}, }; use tokio::time::{sleep, timeout}; -use super::TestDefinition; -use crate::{assert_received, send_message_as}; +use super::{TestBroker, TestDefinition, TestUser}; +use crate::{assert_received, at_index, send_message_as}; /// This test tests that: /// 1. A user sending a message to itself on a broker has it delivered @@ -23,22 +23,28 @@ async fn test_direct_user_to_user() { // This run definition: 3 brokers, 6 users let run_definition = TestDefinition { connected_users: vec![ - vec![TestTopic::Global as u8], - vec![TestTopic::Global as u8, TestTopic::DA as u8], + TestUser::with_index(0, vec![TestTopic::Global.into()]), + TestUser::with_index(1, vec![TestTopic::DA.into()]), ], connected_brokers: vec![ - (vec![2], vec![TestTopic::DA as u8]), - (vec![3], vec![]), - (vec![4], vec![]), + TestBroker { + connected_users: vec![TestUser::with_index(2, vec![TestTopic::DA.into()])], + }, + TestBroker { + connected_users: vec![TestUser::with_index(3, vec![])], + }, + TestBroker { + connected_users: vec![TestUser::with_index(4, vec![])], + }, ], }; // Start the run - let run = run_definition.into_run().await; + let run = run_definition.into_run::().await; // Send a message from user_0 to itself let message = Message::Direct(Direct { - recipient: vec![0], + recipient: at_index![0], message: b"test direct 0".to_vec(), }); @@ -54,7 +60,7 @@ async fn test_direct_user_to_user() { // Create a message that user_1 will use to send to user_0 let message = Message::Direct(Direct { - recipient: vec![1], + recipient: at_index![1], message: b"test direct 1".to_vec(), }); @@ -78,30 +84,36 @@ async fn test_direct_user_to_broker() { // This run definition: 3 brokers, 6 users let run_definition = TestDefinition { connected_users: vec![ - vec![TestTopic::Global as u8], - vec![TestTopic::Global as u8, TestTopic::DA as u8], + TestUser::with_index(0, vec![TestTopic::Global.into()]), + TestUser::with_index(1, vec![TestTopic::Global.into(), TestTopic::DA.into()]), ], connected_brokers: vec![ - (vec![3], vec![TestTopic::DA as u8]), - (vec![2], vec![]), - (vec![4], vec![]), + TestBroker { + connected_users: vec![TestUser::with_index(2, vec![])], + }, + TestBroker { + connected_users: vec![TestUser::with_index(3, vec![TestTopic::DA.into()])], + }, + TestBroker { + connected_users: vec![TestUser::with_index(4, vec![])], + }, ], }; // Start the run - let run = run_definition.into_run().await; + let run = run_definition.into_run::().await; // Send a message as a user to another user that another broker owns (user_0 to user_2) let message = Message::Direct(Direct { - recipient: vec![2], + recipient: at_index![2], message: b"test direct 2".to_vec(), }); // Send the message as user_0 send_message_as!(run.connected_users[0], message); - // Assert broker_1 received it - assert_received!(yes, run.connected_brokers[1], message); + // Assert broker_0 received it + assert_received!(yes, run.connected_brokers[0], message); // Assert no one else got it, and we didn't get it again assert_received!(no, all, run.connected_users); @@ -117,23 +129,29 @@ async fn test_direct_broker_to_user() { // This run definition: 3 brokers, 6 users let run_definition = TestDefinition { connected_users: vec![ - vec![TestTopic::Global as u8], - vec![TestTopic::Global as u8, TestTopic::DA as u8], + TestUser::with_index(0, vec![TestTopic::Global.into()]), + TestUser::with_index(1, vec![TestTopic::Global.into(), TestTopic::DA.into()]), ], connected_brokers: vec![ - (vec![3], vec![TestTopic::DA as u8]), - (vec![2], vec![]), - (vec![4], vec![]), + TestBroker { + connected_users: vec![TestUser::with_index(2, vec![])], + }, + TestBroker { + connected_users: vec![TestUser::with_index(3, vec![TestTopic::DA.into()])], + }, + TestBroker { + connected_users: vec![TestUser::with_index(4, vec![])], + }, ], }; // Start the run - let run = run_definition.into_run().await; + let run = run_definition.into_run::().await; // Send a message as a broker through the test broker to a user that we own // Tests that broker_1 -> test_broker should not come back to us. let message = Message::Direct(Direct { - recipient: vec![2], + recipient: at_index![2], message: b"test direct 2".to_vec(), }); diff --git a/cdn-broker/src/tests/mod.rs b/cdn-broker/src/tests/mod.rs index aac6048..e5eb053 100644 --- a/cdn-broker/src/tests/mod.rs +++ b/cdn-broker/src/tests/mod.rs @@ -5,8 +5,16 @@ use std::sync::Arc; use cdn_proto::{ - connection::protocols::{memory::Memory, Connection}, - crypto::{rng::DeterministicRng, signature::KeyPair}, + connection::{ + middleware::Middleware, + protocols::{Connection, Listener, Protocol, UnfinalizedConnection}, + UserPublicKey, + }, + crypto::{ + rng::DeterministicRng, + signature::KeyPair, + tls::{generate_cert_from_ca, LOCAL_CA_CERT, LOCAL_CA_KEY}, + }, def::TestingRunDef, discovery::BrokerIdentifier, message::{Message, Topic}, @@ -15,29 +23,20 @@ use jf_signature::{bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS, Sign use rand::{rngs::StdRng, RngCore, SeedableRng}; use tokio::spawn; +use crate::{connections::DirectMap, Broker, Config}; + #[cfg(test)] mod broadcast; #[cfg(test)] mod direct; -use crate::{connections::DirectMap, Broker, Config}; - -/// An actor is a [user/broker] that we inject to test message send functionality. -pub struct InjectedActor { - /// The in-memory sender that sends to the broker under test - pub sender: Connection, - /// The in-memory receiver that receives from the broker under test - pub receiver: Connection, -} - /// This lets us send a message as a particular network actor. It just helps /// readability. #[macro_export] macro_rules! send_message_as { ($obj:expr, $message: expr) => { - $obj.sender - .send_message($message.clone()) + $obj.send_message($message.clone()) .await .expect("failed to send message"); }; @@ -70,7 +69,7 @@ macro_rules! assert_received { // Make sure we haven't received this message (no, $actor: expr) => { assert!( - timeout(Duration::from_millis(100), $actor.receiver.recv_message()) + timeout(Duration::from_millis(100), $actor.recv_message()) .await .is_err(), "wasn't supposed to receive a message but did" @@ -80,11 +79,7 @@ macro_rules! assert_received { // Make sure we have received the message in a timeframe of 50ms (yes, $actor: expr, $message:expr) => { // Receive the message with a timeout - let Ok(message) = timeout( - Duration::from_millis(50), - $actor.receiver.recv_message_raw(), - ) - .await + let Ok(message) = timeout(Duration::from_millis(50), $actor.recv_message_raw()).await else { panic!("timed out trying to receive message"); }; @@ -102,206 +97,294 @@ macro_rules! assert_received { }; } +/// Get the public key of a user at a particular index +#[macro_export] +macro_rules! at_index { + ($index: expr) => { + ($index as usize).to_le_bytes().to_vec() + }; +} + +/// A test user is a user that will be connected to the broker under test. +pub struct TestUser { + /// The public key of the user + pub public_key: UserPublicKey, + + /// The topics the user is subscribed to + pub subscribed_topics: Vec, +} + +impl TestUser { + /// Create a new test user with a particular index and subscribed topics + pub fn with_index(index: usize, subscribed_topics: Vec) -> Self { + let public_key = Arc::new(at_index!(index)); + Self { + public_key, + subscribed_topics, + } + } +} + +/// A test broker is a broker that will be connected to the broker under test. +pub struct TestBroker { + /// The users connected to this broker + pub connected_users: Vec, +} + +impl TestBroker { + /// Create a new test broker with a set of connected users + pub fn new(connected_users: Vec) -> Self { + Self { connected_users } + } +} + /// This is what we use to describe tests. These are the [brokers/users] connected /// _DIRECTLY_ to the broker under test, along with the topics they're subscribed to, /// and the user index they are responsible for. A connected user has the same "identity" /// as its index in the `connected_users` vector. pub struct TestDefinition { - pub connected_brokers: Vec<(Vec, Vec)>, - pub connected_users: Vec>, + pub connected_brokers: Vec, + pub connected_users: Vec, } /// A `TestRun` is converted from a `TestDefinition`. It contains actors with their -/// sending and receiving channels so we can pretend to be talking to the broker. +/// connections so we can pretend to be talking to the broker. pub struct TestRun { - /// The connected brokers and their handles - pub connected_brokers: Vec, + /// The connected brokers and their connections + pub connected_brokers: Vec, - /// The connected users and their handles - pub connected_users: Vec, + /// The connected users and their connections + pub connected_users: Vec, } -impl TestDefinition { - /// Creates a new broker under test. This configures and starts a local broker - /// who will be deterministically tested. - async fn new_broker_under_test() -> Broker { - // Create a key for our broker [under test] - let (private_key, public_key) = BLS::key_gen(&(), &mut DeterministicRng(0)).unwrap(); - - // Create a temporary SQLite file for the broker's discovery endpoint - let temp_dir = std::env::temp_dir(); - let discovery_endpoint = temp_dir - .join(format!("test-{}.sqlite", StdRng::from_entropy().next_u64())) - .to_string_lossy() - .into(); - - // Build the broker's config - let broker_config: Config = Config { - metrics_bind_endpoint: None, - public_advertise_endpoint: String::new(), - public_bind_endpoint: String::new(), - private_advertise_endpoint: String::new(), - private_bind_endpoint: String::new(), - discovery_endpoint, - keypair: KeyPair { - public_key, - private_key, - }, - global_memory_pool_size: None, - ca_cert_path: None, - ca_key_path: None, - }; +/// Generate `n` connection pairs for a given protocol +async fn gen_connection_pairs(num: usize) -> Vec<(Connection, Connection)> { + // Generate cert signed by local CA + let (cert, key) = + generate_cert_from_ca(LOCAL_CA_CERT, LOCAL_CA_KEY).expect("failed to generate cert"); - // Create the broker - Broker::new(broker_config) + // Get random port to bind to + let bind_endpoint = format!( + "127.0.0.1:{}", + portpicker::pick_unused_port().expect("failed to get unused port") + ); + + // Create the listener + let listener = P::bind(bind_endpoint.as_str(), cert, key) + .await + .expect("failed to bind"); + + // Create the list of connection pairs we will return + let mut connection_pairs = Vec::new(); + + for _ in 0..num { + // Spawn a task to connect the user to the broker + let bind_endpoint_ = bind_endpoint.clone(); + let unfinalized_outgoing_connection = + spawn(async move { P::connect(&bind_endpoint_, true, Middleware::none()).await }); + + // Accept the connection from the user + let incoming_connection = listener + .accept() + .await + .expect("failed to accept connection") + .finalize(Middleware::none()) .await - .expect("failed to create broker") + .expect("failed to finalize connection"); + + // Finalize the outgoing connection + let outgoing_connection = unfinalized_outgoing_connection + .await + .expect("failed to connect to broker") + .expect("failed to connect to broker"); + + // Add the connection pair to the list + connection_pairs.push((incoming_connection, outgoing_connection)); } - /// This is a helper function to inject users from our `TestDefinition` into the broker under test. - /// It creates sending and receiving channels, spawns a receive loop on the broker, - /// and adds the user to the internal state. - /// - /// Then, it sends subscription messages to the broker for the topics described in `TestDefinition` - fn inject_users( - broker_under_test: &Broker, - users: &[Vec], - ) -> Vec { - // Return this at the end, our running list of users - let mut injected_users: Vec = Vec::new(); - - // For each user, - for (i, topics) in users.iter().enumerate() { - // Extrapolate identifier - #[allow(clippy::cast_possible_truncation)] - let identifier: Arc> = Arc::from(vec![i as u8]); - - // Generate a testing pair of memory network channels - let connection1 = Memory::gen_testing_connection(); - let connection2 = Memory::gen_testing_connection(); - - // Create our user object - let injected_user = InjectedActor { - sender: connection1.clone(), - receiver: connection2.clone(), - }; - - // Spawn our user receiver in the broker under test - let inner = broker_under_test.inner.clone(); - let identifier_ = identifier.clone(); - let receive_handle = - spawn(async move { inner.user_receive_loop(&identifier_, connection1).await }) - .abort_handle(); - - // Inject our user into the connections - broker_under_test.inner.connections.write().add_user( - &identifier, - connection2, - topics, - receive_handle, - ); - - // Add to our running total - injected_users.push(injected_user); - } + connection_pairs +} +/// Create a new broker under test. All test users and brokers will be connected to this broker. +async fn new_broker_under_test() -> Broker> { + // Create a key for our broker [under test] + let (private_key, public_key) = BLS::key_gen(&(), &mut DeterministicRng(0)).unwrap(); + + // Create a temporary SQLite file for the broker's discovery endpoint + let temp_dir = std::env::temp_dir(); + let discovery_endpoint = temp_dir + .join(format!("test-{}.sqlite", StdRng::from_entropy().next_u64())) + .to_string_lossy() + .into(); + + // Build the broker's config + let broker_config = Config { + metrics_bind_endpoint: None, + public_advertise_endpoint: String::new(), + public_bind_endpoint: String::new(), + private_advertise_endpoint: String::new(), + private_bind_endpoint: String::new(), + discovery_endpoint, + keypair: KeyPair { + public_key, + private_key, + }, + global_memory_pool_size: None, + ca_cert_path: None, + ca_key_path: None, + }; + + // Create and return the broker + Broker::new(broker_config) + .await + .expect("failed to create broker") +} - injected_users +/// This is a helper function to inject users from our `TestDefinition` into the broker under test. +/// It creates the relevant connections, spawns a receive loop on the broker, and adds the user to +/// the internal state. +/// +/// After that, it sends subscription messages to the broker for the topics described in `TestDefinition` +async fn inject_users( + broker_under_test: &Broker>, + users: Vec, +) -> Vec { + // Generate a set of connected pairs, one for each user + // incoming (listener), outgoing (connect) + let mut connection_pairs = gen_connection_pairs::(users.len()).await; + + // Create the list of users we will return + let mut connected_users = Vec::new(); + + // For each user, + for user in users { + // Pop the next connection + let (incoming_connection, outgoing_connection) = connection_pairs + .pop() + .expect("not enough connections spawned"); + + // Spawn a task to handle the user inside of the broker + let inner = broker_under_test.inner.clone(); + let user_public_key = user.public_key.clone(); + let incoming_connection_ = incoming_connection.clone(); + let receive_handle = spawn(async move { + inner + .user_receive_loop(&user_public_key, incoming_connection_) + .await + }) + .abort_handle(); + + // Inject our user into the connections + broker_under_test.inner.connections.write().add_user( + &user.public_key.clone(), + incoming_connection, + &user.subscribed_topics, + receive_handle, + ); + + // Add our connection with our user so we can return it + connected_users.push(outgoing_connection); } - /// This is a helper function to inject brokers from our `TestDefinition` into the broker under test. - /// It creates sending and receiving channels, spawns a receive loop on the broker, - /// and adds the broker to the internal state. - /// - /// Then, it sends subscription messages to the broker for the topics described in `TestDefinition`, - /// and syncs the users up so the broker knows where to send messages. - async fn inject_brokers( - broker_under_test: &Broker, - brokers: Vec<(Vec, Vec)>, - ) -> Vec { - // Return this at the end, our running list of brokers - let mut injected_brokers: Vec = Vec::new(); - - // For each broker, - for (i, broker) in brokers.iter().enumerate() { - // Create our identifier - let identifier: BrokerIdentifier = format!("{i}/{i}") - .try_into() - .expect("failed to create broker identifier"); - - // Generate a testing pair of memory network channels - let connection1 = Memory::gen_testing_connection(); - let connection2 = Memory::gen_testing_connection(); - - // Create our broker object - let injected_broker = InjectedActor { - sender: connection1.clone(), - receiver: connection2.clone(), - }; - - // Spawn our receiver in the broker under test - let inner = broker_under_test.inner.clone(); - let identifier_ = identifier.clone(); - let receive_handle = spawn(async move { - inner - .broker_receive_loop(&identifier_, connection1) - .await - .unwrap(); - }) - .abort_handle(); - - // Inject our broker into the connections - broker_under_test.inner.connections.write().add_broker( - identifier.clone(), - connection2, - receive_handle, - ); - - // Send our subscriptions to it - let subscribe_message = Message::Subscribe(broker.1.clone()); - send_message_as!(injected_broker, subscribe_message); - - // Create a map of our users - let mut user_map = DirectMap::new(identifier.clone()); - - for user in broker.0.clone() { - user_map.insert(Arc::from(vec![user]), identifier.clone()); - } - - // Sync the map to the broker under test - let user_sync_message = Message::UserSync( - rkyv::to_bytes::<_, 256>(&user_map.diff()) - .expect("failed to serialize map") - .to_vec(), - ); - send_message_as!(injected_broker, user_sync_message); - - // Add to our running total - injected_brokers.push(injected_broker); + connected_users +} + +/// This is a helper function to inject brokers from our `TestDefinition` into the broker under test. +/// It creates the relevant connections, spawns a receive loop on the broker, and adds the broker to +/// the internal state. +/// +/// After that, it sends subscription messages to the broker for the topics described in `TestDefinition`, +/// and syncs the users up so the broker knows where to send messages. +async fn inject_brokers( + broker_under_test: &Broker>, + brokers: Vec, +) -> Vec { + // Generate a set of connected pairs, one for each broker + // incoming (listener), outgoing (connect) + let mut connection_pairs = gen_connection_pairs::(brokers.len()).await; + + // Create the list of brokers we will return + let mut connected_brokers = Vec::new(); + + // For each broker + for (i, broker) in brokers.into_iter().enumerate() { + // Create an identifier for the broker + let identifier: BrokerIdentifier = format!("{i}/{i}") + .try_into() + .expect("failed to create broker identifier"); + + // Pop the next connection + let (incoming_connection, outgoing_connection) = connection_pairs + .pop() + .expect("not enough connections spawned"); + + // Spawn a task to handle the broker inside of the broker under test + let inner = broker_under_test.inner.clone(); + let identifier_ = identifier.clone(); + let incoming_connection_ = incoming_connection.clone(); + let receive_handle = spawn(async move { + inner + .broker_receive_loop(&identifier_, incoming_connection_) + .await + }) + .abort_handle(); + + // Inject the broker into our connections + broker_under_test.inner.connections.write().add_broker( + identifier.clone(), + incoming_connection, + receive_handle, + ); + + // Aggregate the topics we should be subscribed to + let mut topics = Vec::new(); + for user in broker.connected_users.iter() { + topics.extend(user.subscribed_topics.clone()); } - injected_brokers + // Send our subscriptions to it + let subscribe_message = Message::Subscribe(topics); + send_message_as!(outgoing_connection, subscribe_message); + + // Create a map of our users + let mut user_map = DirectMap::new(identifier.clone()); + for user in broker.connected_users { + user_map.insert(Arc::from(user.public_key), identifier.clone()); + } + + // Sync the map to the broker under test + let user_sync_message = Message::UserSync( + rkyv::to_bytes::<_, 256>(&user_map.diff()) + .expect("failed to serialize map") + .to_vec(), + ); + send_message_as!(outgoing_connection, user_sync_message); + + // Add our connection with our broker so we can return it + connected_brokers.push(outgoing_connection); } - /// This is the conversion from a `TestDefinition` into a `Run`. Implicitly, the broker is started - /// and all sending and receiving operations on that broker start. - pub async fn into_run(self) -> TestRun { - // Create a new `Run`, which we will be returning + connected_brokers +} + +impl TestDefinition { + /// Start the test run, connecting all users and brokers to the broker under test. + pub async fn into_run(self) -> TestRun { + // Create the `Run` we will return let mut run = TestRun { - connected_users: vec![], - connected_brokers: vec![], + connected_users: Vec::new(), + connected_brokers: Vec::new(), }; - // Create our broker under test - let broker_under_test = Self::new_broker_under_test().await; + // Create a new broker under test with the provided protocols + let broker_under_test = new_broker_under_test::().await; - // Inject our brokers - run.connected_brokers = - Self::inject_brokers(&broker_under_test, self.connected_brokers).await; + // Inject the users into the broker under test + run.connected_users = inject_users(&broker_under_test, self.connected_users).await; - // Inject our users - run.connected_users = Self::inject_users(&broker_under_test, &self.connected_users); + // Inject the brokers into the broker under test + run.connected_brokers = inject_brokers(&broker_under_test, self.connected_brokers).await; - // Return our injected brokers and users + // Return the run run } } diff --git a/cdn-proto/src/def.rs b/cdn-proto/src/def.rs index 6d6f316..316742c 100644 --- a/cdn-proto/src/def.rs +++ b/cdn-proto/src/def.rs @@ -1,9 +1,9 @@ //! Compile-time run configuration for all CDN components. +use std::marker::PhantomData; use jf_signature::bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS; use num_enum::{IntoPrimitive, TryFromPrimitive}; -use crate::connection::protocols::memory::Memory; use crate::connection::protocols::{quic::Quic, tcp::Tcp, Protocol as ProtocolType}; use crate::crypto::signature::SignatureScheme; use crate::discovery::embedded::Embedded; @@ -92,21 +92,25 @@ impl ConnectionDef for ProductionClientConnection { } /// The testing run configuration. -/// Uses in-memory protocols and an embedded discovery client. -pub struct TestingRunDef; -impl RunDef for TestingRunDef { - type Broker = TestingConnection; - type User = TestingConnection; +/// Uses generic protocols and an embedded discovery client. +pub struct TestingRunDef { + pd: PhantomData<(B, U)>, +} +impl RunDef for TestingRunDef { + type Broker = TestingConnection; + type User = TestingConnection; type DiscoveryClientType = Embedded; type Topic = TestTopic; } /// The testing connection configuration. -/// Uses BLS signatures, in-memory protocols, and no middleware. -pub struct TestingConnection; -impl ConnectionDef for TestingConnection { +/// Uses BLS signatures, generic protocols, and no middleware. +pub struct TestingConnection { + pd: PhantomData

, +} +impl ConnectionDef for TestingConnection

{ type Scheme = BLS; - type Protocol = Memory; + type Protocol = P; } // Type aliases to automatically disambiguate usage diff --git a/tests/src/tests/mod.rs b/tests/src/tests/mod.rs index 8ed0297..77401a2 100644 --- a/tests/src/tests/mod.rs +++ b/tests/src/tests/mod.rs @@ -2,6 +2,7 @@ use cdn_broker::{Broker, Config as BrokerConfig}; use cdn_client::{Client, Config as ClientConfig}; use cdn_marshal::{Config as MarshalConfig, Marshal}; use cdn_proto::{ + connection::protocols::memory::Memory, crypto::signature::{KeyPair, Serializable, SignatureScheme}, def::{TestingConnection, TestingRunDef}, discovery::{embedded::Embedded, BrokerIdentifier, DiscoveryClient}, @@ -57,7 +58,7 @@ async fn new_broker(key: u64, public_ep: &str, private_ep: &str, discovery_ep: & let (private_key, public_key) = keypair_from_seed(key); // Create config - let config: BrokerConfig = BrokerConfig { + let config: BrokerConfig> = BrokerConfig { ca_cert_path: None, ca_key_path: None, discovery_endpoint: discovery_ep.to_string(), @@ -74,7 +75,7 @@ async fn new_broker(key: u64, public_ep: &str, private_ep: &str, discovery_ep: & }; // Create broker - let broker = Broker::::new(config) + let broker = Broker::>::new(config) .await .expect("failed to create broker"); @@ -96,7 +97,7 @@ async fn new_marshal(ep: &str, discovery_ep: &str) { }; // Create a new marshal - let marshal = Marshal::::new(config) + let marshal = Marshal::>::new(config) .await .expect("failed to create marshal"); @@ -106,7 +107,7 @@ async fn new_marshal(ep: &str, discovery_ep: &str) { /// Create a new client, supplying it with the given topics and marshal /// endpoint. `Key` is a deterministic, seeded keypair. -fn new_client(key: u64, topics: Vec, marshal_ep: &str) -> Client { +fn new_client(key: u64, topics: Vec, marshal_ep: &str) -> Client> { // Generate keypair let (private_key, public_key) = keypair_from_seed(key); @@ -122,7 +123,7 @@ fn new_client(key: u64, topics: Vec, marshal_ep: &str) -> Client::new(config) + Client::>::new(config) } /// Create a new database client with the given endpoint and identity.