diff --git a/xmtp_mls/src/groups/mls_sync.rs b/xmtp_mls/src/groups/mls_sync.rs index 0889c05ca..87109e81a 100644 --- a/xmtp_mls/src/groups/mls_sync.rs +++ b/xmtp_mls/src/groups/mls_sync.rs @@ -1603,33 +1603,53 @@ pub(crate) mod tests { use super::*; use crate::builder::ClientBuilder; - use futures::future; use std::sync::Arc; use xmtp_cryptography::utils::generate_local_wallet; - #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)] - #[cfg_attr(not(target_arch = "wasm32"), tokio::test(flavor = "multi_thread"))] + /// This test is not reproducible in webassembly, b/c webassembly has only one thread. + #[cfg_attr( + not(target_arch = "wasm32"), + tokio::test(flavor = "multi_thread", worker_threads = 10) + )] + #[cfg(not(target_family = "wasm"))] async fn publish_intents_worst_case_scenario() { let wallet = generate_local_wallet(); - let amal = Arc::new(ClientBuilder::new_test_client(&wallet).await); - let amal_group: Arc> = - Arc::new(amal.create_group(None, Default::default()).unwrap()); + let amal_a = Arc::new(ClientBuilder::new_test_client(&wallet).await); + let amal_group_a: Arc> = + Arc::new(amal_a.create_group(None, Default::default()).unwrap()); - amal_group.send_message_optimistic(b"1").unwrap(); - amal_group.send_message_optimistic(b"2").unwrap(); - amal_group.send_message_optimistic(b"3").unwrap(); - amal_group.send_message_optimistic(b"4").unwrap(); - amal_group.send_message_optimistic(b"5").unwrap(); - amal_group.send_message_optimistic(b"6").unwrap(); + let conn = amal_a.context().store().conn().unwrap(); + let provider: Arc = Arc::new(conn.into()); - let conn = amal.context().store().conn().unwrap(); - let provider: XmtpOpenMlsProvider = conn.into(); + // create group intent + amal_group_a.sync().await.unwrap(); + assert_eq!(provider.conn_ref().intents_deleted(), 1); - let mut futures = vec![]; - for _ in 0..10 { - futures.push(amal_group.publish_intents(&provider)) + for _ in 0..100 { + let s = xmtp_common::rand_string::<100>(); + amal_group_a.send_message_optimistic(s.as_bytes()).unwrap(); + } + + let mut set = tokio::task::JoinSet::new(); + for _ in 0..50 { + let g = amal_group_a.clone(); + let p = provider.clone(); + set.spawn(async move { g.publish_intents(&p).await }); + } + + let res = set.join_all().await; + let errs: Vec<&Result<_, _>> = res.iter().filter(|r| r.is_err()).collect(); + errs.iter().for_each(|e| { + tracing::error!("{}", e.as_ref().unwrap_err()); + }); + + let published = provider.conn_ref().intents_published(); + assert_eq!(published, 101); + let created = provider.conn_ref().intents_created(); + assert_eq!(created, 101); + if !errs.is_empty() { + panic!("Errors during publish"); } - future::join_all(futures).await; } #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)] diff --git a/xmtp_mls/src/storage/encrypted_store/group_intent.rs b/xmtp_mls/src/storage/encrypted_store/group_intent.rs index 7392ec885..70edb0956 100644 --- a/xmtp_mls/src/storage/encrypted_store/group_intent.rs +++ b/xmtp_mls/src/storage/encrypted_store/group_intent.rs @@ -214,10 +214,22 @@ impl DbConnection { })?; match res { - // If nothing matched the query, return an error. Either ID or state was wrong - 0 => Err(StorageError::NotFound(format!( - "ToPublish intent {intent_id} for publish" - ))), + // If nothing matched the query, check if its already published, otherwise return an error. Either ID or state was wrong + 0 => { + let already_published = self.raw_query(|conn| { + dsl::group_intents + .filter(dsl::id.eq(intent_id)) + .first::(conn) + }); + + if already_published.is_ok() { + Ok(()) + } else { + Err(StorageError::NotFound(format!( + "Published intent {intent_id} for commit" + ))) + } + } _ => Ok(()), } }