Skip to content

Commit

Permalink
vastly improve testing infra
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Jun 7, 2024
1 parent fe2fc82 commit 2d7902d
Show file tree
Hide file tree
Showing 8 changed files with 419 additions and 273 deletions.
22 changes: 15 additions & 7 deletions cdn-broker/benches/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@

use std::time::Duration;

use cdn_broker::reexports::tests::{TestDefinition, TestRun};
use cdn_broker::reexports::tests::{TestBroker, TestDefinition, TestRun, TestUser};
use cdn_broker::{assert_received, send_message_as};
use cdn_proto::connection::protocols::memory::Memory;
use cdn_proto::connection::Bytes;
use cdn_proto::def::TestTopic;
use cdn_proto::message::{Broadcast, Message};
Expand Down Expand Up @@ -49,11 +50,14 @@ fn bench_broadcast_user(c: &mut Criterion) {
// Set up our broker under test
let run = benchmark_runtime.block_on(async move {
let run_definition = TestDefinition {
connected_users: vec![vec![TestTopic::Global as u8], vec![TestTopic::Global as u8]],
connected_users: vec![
TestUser::with_index(0, vec![TestTopic::Global.into()]),
TestUser::with_index(1, vec![TestTopic::Global.into()]),
],
connected_brokers: vec![],
};

run_definition.into_run().await
run_definition.into_run::<Memory, Memory>().await
});

// Benchmark
Expand All @@ -71,14 +75,18 @@ fn bench_broadcast_broker(c: &mut Criterion) {
// Set up our broker under test
let run = benchmark_runtime.block_on(async move {
let run_definition = TestDefinition {
connected_users: vec![vec![]],
connected_users: vec![TestUser::with_index(0, vec![])],
connected_brokers: vec![
(vec![], vec![TestTopic::Global as u8]),
(vec![], vec![TestTopic::Global as u8]),
TestBroker {
connected_users: vec![TestUser::with_index(1, vec![TestTopic::Global.into()])],
},
TestBroker {
connected_users: vec![TestUser::with_index(2, vec![TestTopic::Global.into()])],
},
],
};

run_definition.into_run().await
run_definition.into_run::<Memory, Memory>().await
});

// Benchmark
Expand Down
46 changes: 30 additions & 16 deletions cdn-broker/benches/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@

use std::time::Duration;

use cdn_broker::reexports::tests::{TestDefinition, TestRun};
use cdn_broker::{assert_received, send_message_as};
use cdn_broker::reexports::tests::{TestBroker, TestDefinition, TestRun, TestUser};
use cdn_broker::{assert_received, at_index, send_message_as};
use cdn_proto::connection::protocols::memory::Memory;
use cdn_proto::connection::Bytes;
use cdn_proto::def::TestTopic;
use cdn_proto::message::{Direct, Message};
Expand All @@ -15,7 +16,7 @@ use pprof::criterion::{Output, PProfProfiler};
async fn direct_user_to_self(run: &TestRun) {
// Allocate a rather large message
let message = Message::Direct(Direct {
recipient: vec![0],
recipient: at_index![0],
message: vec![0; 10000],
});

Expand All @@ -29,7 +30,7 @@ async fn direct_user_to_self(run: &TestRun) {
async fn direct_user_to_user(run: &TestRun) {
// Allocate a rather large message
let message = Message::Direct(Direct {
recipient: vec![1],
recipient: at_index![1],
message: vec![0; 10000],
});

Expand All @@ -43,7 +44,7 @@ async fn direct_user_to_user(run: &TestRun) {
async fn direct_user_to_broker(run: &TestRun) {
// Allocate a rather large message
let message = Message::Direct(Direct {
recipient: vec![2],
recipient: at_index![2],
message: vec![0; 10000],
});

Expand All @@ -57,7 +58,7 @@ async fn direct_user_to_broker(run: &TestRun) {
async fn direct_broker_to_user(run: &TestRun) {
// Allocate a rather large message
let message = Message::Direct(Direct {
recipient: vec![0],
recipient: at_index![0],
message: vec![0; 10000],
});

Expand All @@ -76,11 +77,11 @@ fn bench_direct_user_to_self(c: &mut Criterion) {
// Set up our broker under test
let run = benchmark_runtime.block_on(async move {
let run_definition = TestDefinition {
connected_users: vec![vec![TestTopic::Global as u8]],
connected_users: vec![TestUser::with_index(0, vec![TestTopic::Global as u8])],
connected_brokers: vec![],
};

run_definition.into_run().await
run_definition.into_run::<Memory, Memory>().await
});

// Run the benchmark
Expand All @@ -99,11 +100,14 @@ fn bench_direct_user_to_user(c: &mut Criterion) {
// Set up our broker under test
let run = benchmark_runtime.block_on(async move {
let run_definition = TestDefinition {
connected_users: vec![vec![TestTopic::Global as u8], vec![TestTopic::Global as u8]],
connected_users: vec![
TestUser::with_index(0, vec![TestTopic::Global as u8]),
TestUser::with_index(1, vec![TestTopic::Global as u8]),
],
connected_brokers: vec![],
};

run_definition.into_run().await
run_definition.into_run::<Memory, Memory>().await
});

// Run the benchmark
Expand All @@ -122,11 +126,16 @@ fn bench_direct_user_to_broker(c: &mut Criterion) {
// Set up our broker under test
let run = benchmark_runtime.block_on(async move {
let run_definition = TestDefinition {
connected_users: vec![vec![TestTopic::Global as u8], vec![TestTopic::Global as u8]],
connected_brokers: vec![(vec![2], vec![TestTopic::Global as u8])],
connected_users: vec![
TestUser::with_index(0, vec![TestTopic::Global as u8]),
TestUser::with_index(1, vec![TestTopic::Global as u8]),
],
connected_brokers: vec![TestBroker {
connected_users: vec![TestUser::with_index(2, vec![TestTopic::Global as u8])],
}],
};

run_definition.into_run().await
run_definition.into_run::<Memory, Memory>().await
});

// Run the benchmark
Expand All @@ -145,11 +154,16 @@ fn bench_direct_broker_to_user(c: &mut Criterion) {
// Set up our broker under test
let run = benchmark_runtime.block_on(async move {
let run_definition = TestDefinition {
connected_users: vec![vec![TestTopic::Global as u8], vec![TestTopic::Global as u8]],
connected_brokers: vec![(vec![2], vec![TestTopic::Global as u8])],
connected_users: vec![
TestUser::with_index(0, vec![TestTopic::Global as u8]),
TestUser::with_index(1, vec![TestTopic::Global as u8]),
],
connected_brokers: vec![TestBroker {
connected_users: vec![TestUser::with_index(0, vec![TestTopic::Global as u8])],
}],
};

run_definition.into_run().await
run_definition.into_run::<Memory, Memory>().await
});

// Run the benchmark
Expand Down
2 changes: 1 addition & 1 deletion cdn-broker/src/reexports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ pub mod error {
/// This is not guarded by `![cfg(test)]` because we use the same functions
/// when doing benchmarks.
pub mod tests {
pub use crate::tests::{TestDefinition, TestRun};
pub use crate::tests::{TestBroker, TestDefinition, TestRun, TestUser};
}
50 changes: 34 additions & 16 deletions cdn-broker/src/tests/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
use std::time::Duration;

use cdn_proto::{
connection::Bytes,
connection::{protocols::memory::Memory, Bytes},
def::TestTopic,
message::{Broadcast, Message},
};
use tokio::time::{sleep, timeout};

use super::TestDefinition;
use super::{TestBroker, TestDefinition, TestUser};
use crate::{assert_received, send_message_as};

/// Test sending a broadcast message from a user.
Expand All @@ -22,19 +22,28 @@ async fn test_broadcast_user() {
// This run definition: 3 brokers, 6 users
let run_definition = TestDefinition {
connected_users: vec![
vec![TestTopic::Global as u8, TestTopic::DA as u8],
vec![TestTopic::DA as u8],
vec![TestTopic::Global as u8],
TestUser::with_index(0, vec![TestTopic::Global.into(), TestTopic::DA.into()]),
TestUser::with_index(1, vec![TestTopic::DA.into()]),
TestUser::with_index(2, vec![TestTopic::Global.into()]),
],
connected_brokers: vec![
(vec![3], vec![TestTopic::DA as u8]),
(vec![4], vec![TestTopic::Global as u8, TestTopic::DA as u8]),
(vec![5], vec![]),
TestBroker {
connected_users: vec![TestUser::with_index(3, vec![TestTopic::DA.into()])],
},
TestBroker {
connected_users: vec![TestUser::with_index(
4,
vec![TestTopic::Global.into(), TestTopic::DA.into()],
)],
},
TestBroker {
connected_users: vec![TestUser::with_index(5, vec![])],
},
],
};

// Start the run
let run = run_definition.into_run().await;
let run = run_definition.into_run::<Memory, Memory>().await;

// We need a little time for our subscribe messages to propagate
sleep(Duration::from_millis(25)).await;
Expand Down Expand Up @@ -88,19 +97,28 @@ async fn test_broadcast_broker() {
// This run definition: 3 brokers, 6 users
let run_definition = TestDefinition {
connected_users: vec![
vec![TestTopic::Global as u8, TestTopic::DA as u8],
vec![TestTopic::DA as u8],
vec![TestTopic::Global as u8],
TestUser::with_index(0, vec![TestTopic::Global.into(), TestTopic::DA.into()]),
TestUser::with_index(1, vec![TestTopic::DA.into()]),
TestUser::with_index(2, vec![TestTopic::Global.into()]),
],
connected_brokers: vec![
(vec![3], vec![TestTopic::DA as u8]),
(vec![4], vec![TestTopic::Global as u8, TestTopic::DA as u8]),
(vec![5], vec![]),
TestBroker {
connected_users: vec![TestUser::with_index(3, vec![TestTopic::DA.into()])],
},
TestBroker {
connected_users: vec![TestUser::with_index(
4,
vec![TestTopic::Global.into(), TestTopic::DA.into()],
)],
},
TestBroker {
connected_users: vec![TestUser::with_index(5, vec![])],
},
],
};

// Start the run
let run = run_definition.into_run().await;
let run = run_definition.into_run::<Memory, Memory>().await;

// We need a little time for our subscribe messages to propagate
sleep(Duration::from_millis(25)).await;
Expand Down
Loading

0 comments on commit 2d7902d

Please sign in to comment.