diff --git a/xmtp_mls/src/groups/mls_sync.rs b/xmtp_mls/src/groups/mls_sync.rs index 6a642bc41..e2183e58a 100644 --- a/xmtp_mls/src/groups/mls_sync.rs +++ b/xmtp_mls/src/groups/mls_sync.rs @@ -1619,33 +1619,43 @@ pub(crate) mod tests { use super::*; use crate::builder::ClientBuilder; - use futures::future; use std::sync::Arc; use xmtp_cryptography::utils::generate_local_wallet; #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)] - #[cfg_attr(not(target_arch = "wasm32"), tokio::test(flavor = "multi_thread"))] + #[cfg_attr( + not(target_arch = "wasm32"), + tokio::test(flavor = "multi_thread", worker_threads = 10) + )] async fn publish_intents_worst_case_scenario() { let wallet = generate_local_wallet(); - let amal = Arc::new(ClientBuilder::new_test_client(&wallet).await); - let amal_group: Arc> = - Arc::new(amal.create_group(None, Default::default()).unwrap()); + let amal_a = Arc::new(ClientBuilder::new_test_client(&wallet).await); + let amal_group_a: Arc> = + Arc::new(amal_a.create_group(None, Default::default()).unwrap()); - amal_group.send_message_optimistic(b"1").unwrap(); - amal_group.send_message_optimistic(b"2").unwrap(); - amal_group.send_message_optimistic(b"3").unwrap(); - amal_group.send_message_optimistic(b"4").unwrap(); - amal_group.send_message_optimistic(b"5").unwrap(); - amal_group.send_message_optimistic(b"6").unwrap(); + for _ in 0..500 { + let s = xmtp_common::rand_string::<100>(); + amal_group_a.send_message_optimistic(s.as_bytes()).unwrap(); + } - let conn = amal.context().store().conn().unwrap(); - let provider: XmtpOpenMlsProvider = conn.into(); + let conn = amal_a.context().store().conn().unwrap(); + let provider: Arc = Arc::new(conn.into()); - let mut futures = vec![]; - for _ in 0..10 { - futures.push(amal_group.publish_intents(&provider)) + let mut set = tokio::task::JoinSet::new(); + for _ in 0..100 { + let g = amal_group_a.clone(); + let p = provider.clone(); + set.spawn(async move { g.publish_intents(&p).await }); + } + + let res = set.join_all().await; + let errs: Vec<&Result<_, _>> = res.iter().filter(|r| r.is_err()).collect(); + errs.iter().for_each(|e| { + tracing::error!("{}", e.as_ref().unwrap_err()); + }); + if !errs.is_empty() { + panic!("Errors during publish"); } - future::join_all(futures).await; } #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]