From b962ebed4c488daa7c9faf7bd4ba4836dcfb3c95 Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Mon, 16 Dec 2024 13:19:17 -0500 Subject: [PATCH 1/2] intent not found tests --- xmtp_mls/src/groups/mls_sync.rs | 49 +++++++++++++++++++++------------ 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/xmtp_mls/src/groups/mls_sync.rs b/xmtp_mls/src/groups/mls_sync.rs index 6a642bc41..dc2d46d2d 100644 --- a/xmtp_mls/src/groups/mls_sync.rs +++ b/xmtp_mls/src/groups/mls_sync.rs @@ -1619,33 +1619,48 @@ pub(crate) mod tests { use super::*; use crate::builder::ClientBuilder; - use futures::future; use std::sync::Arc; use xmtp_cryptography::utils::generate_local_wallet; #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)] - #[cfg_attr(not(target_arch = "wasm32"), tokio::test(flavor = "multi_thread"))] + #[cfg_attr( + not(target_arch = "wasm32"), + tokio::test(flavor = "multi_thread", worker_threads = 10) + )] async fn publish_intents_worst_case_scenario() { let wallet = generate_local_wallet(); - let amal = Arc::new(ClientBuilder::new_test_client(&wallet).await); - let amal_group: Arc> = - Arc::new(amal.create_group(None, Default::default()).unwrap()); + let amal_a = Arc::new(ClientBuilder::new_test_client(&wallet).await); + let amal_group_a: Arc> = + Arc::new(amal_a.create_group(None, Default::default()).unwrap()); - amal_group.send_message_optimistic(b"1").unwrap(); - amal_group.send_message_optimistic(b"2").unwrap(); - amal_group.send_message_optimistic(b"3").unwrap(); - amal_group.send_message_optimistic(b"4").unwrap(); - amal_group.send_message_optimistic(b"5").unwrap(); - amal_group.send_message_optimistic(b"6").unwrap(); + for _ in 0..500 { + let s = xmtp_common::rand_string::<100>(); + amal_group_a.send_message_optimistic(s.as_bytes()).unwrap(); + } - let conn = amal.context().store().conn().unwrap(); - let provider: XmtpOpenMlsProvider = conn.into(); + let conn = amal_a.context().store().conn().unwrap(); + let provider: Arc = Arc::new(conn.into()); - let mut futures = vec![]; - for _ in 0..10 { - futures.push(amal_group.publish_intents(&provider)) + let mut set = tokio::task::JoinSet::new(); + for _ in 0..100 { + let g = amal_group_a.clone(); + let p = provider.clone(); + set.spawn(async move { g.publish_intents(&p).await }); + } + + let res = set.join_all().await; + let errs: Vec<&Result<_, _>> = res.iter().filter(|r| r.is_err()).collect(); + errs.iter().for_each(|e| { + tracing::error!("{}", e.as_ref().unwrap_err()); + }); + + let published = provider.conn_ref().intents_published(); + assert_eq!(published, 501); + // let created = provider.conn_ref().intents_created(); + // assert_eq!(created, 501); + if !errs.is_empty() { + panic!("Errors during publish"); } - future::join_all(futures).await; } #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)] From 4c7723453df50c05e4a5d0622ba7ea4551c4f494 Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Mon, 16 Dec 2024 17:52:09 -0500 Subject: [PATCH 2/2] dont err if intent was already published --- xmtp_mls/src/groups/mls_sync.rs | 23 +++++++++++-------- .../storage/encrypted_store/group_intent.rs | 20 ++++++++++++---- 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/xmtp_mls/src/groups/mls_sync.rs b/xmtp_mls/src/groups/mls_sync.rs index dc2d46d2d..8b243002c 100644 --- a/xmtp_mls/src/groups/mls_sync.rs +++ b/xmtp_mls/src/groups/mls_sync.rs @@ -1622,27 +1622,32 @@ pub(crate) mod tests { use std::sync::Arc; use xmtp_cryptography::utils::generate_local_wallet; - #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)] + /// This test is not reproducible in webassembly, b/c webassembly has only one thread. #[cfg_attr( not(target_arch = "wasm32"), tokio::test(flavor = "multi_thread", worker_threads = 10) )] + #[cfg(not(target_family = "wasm"))] async fn publish_intents_worst_case_scenario() { let wallet = generate_local_wallet(); let amal_a = Arc::new(ClientBuilder::new_test_client(&wallet).await); let amal_group_a: Arc> = Arc::new(amal_a.create_group(None, Default::default()).unwrap()); - for _ in 0..500 { + let conn = amal_a.context().store().conn().unwrap(); + let provider: Arc = Arc::new(conn.into()); + + // create group intent + amal_group_a.sync().await.unwrap(); + assert_eq!(provider.conn_ref().intents_deleted(), 1); + + for _ in 0..100 { let s = xmtp_common::rand_string::<100>(); amal_group_a.send_message_optimistic(s.as_bytes()).unwrap(); } - let conn = amal_a.context().store().conn().unwrap(); - let provider: Arc = Arc::new(conn.into()); - let mut set = tokio::task::JoinSet::new(); - for _ in 0..100 { + for _ in 0..50 { let g = amal_group_a.clone(); let p = provider.clone(); set.spawn(async move { g.publish_intents(&p).await }); @@ -1655,9 +1660,9 @@ pub(crate) mod tests { }); let published = provider.conn_ref().intents_published(); - assert_eq!(published, 501); - // let created = provider.conn_ref().intents_created(); - // assert_eq!(created, 501); + assert_eq!(published, 101); + let created = provider.conn_ref().intents_created(); + assert_eq!(created, 101); if !errs.is_empty() { panic!("Errors during publish"); } diff --git a/xmtp_mls/src/storage/encrypted_store/group_intent.rs b/xmtp_mls/src/storage/encrypted_store/group_intent.rs index 40db6fd6c..bfb3c1399 100644 --- a/xmtp_mls/src/storage/encrypted_store/group_intent.rs +++ b/xmtp_mls/src/storage/encrypted_store/group_intent.rs @@ -214,10 +214,22 @@ impl DbConnection { })?; match res { - // If nothing matched the query, return an error. Either ID or state was wrong - 0 => Err(StorageError::NotFound(format!( - "ToPublish intent {intent_id} for publish" - ))), + // If nothing matched the query, check if its already published, otherwise return an error. Either ID or state was wrong + 0 => { + let already_published = self.raw_query(|conn| { + dsl::group_intents + .filter(dsl::id.eq(intent_id)) + .first::(conn) + }); + + if already_published.is_ok() { + Ok(()) + } else { + Err(StorageError::NotFound(format!( + "Published intent {intent_id} for commit" + ))) + } + } _ => Ok(()), } }