Skip to content

Commit

Permalink
Merge branch 'main' into mc/thread-safe-groups
Browse files Browse the repository at this point in the history
  • Loading branch information
mchenani authored Dec 17, 2024
2 parents 37ed3ec + 5e0761f commit 3e43b68
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 22 deletions.
56 changes: 38 additions & 18 deletions xmtp_mls/src/groups/mls_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1603,33 +1603,53 @@ pub(crate) mod tests {

use super::*;
use crate::builder::ClientBuilder;
use futures::future;
use std::sync::Arc;
use xmtp_cryptography::utils::generate_local_wallet;

#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
#[cfg_attr(not(target_arch = "wasm32"), tokio::test(flavor = "multi_thread"))]
/// This test is not reproducible in webassembly, b/c webassembly has only one thread.
#[cfg_attr(
not(target_arch = "wasm32"),
tokio::test(flavor = "multi_thread", worker_threads = 10)
)]
#[cfg(not(target_family = "wasm"))]
async fn publish_intents_worst_case_scenario() {
let wallet = generate_local_wallet();
let amal = Arc::new(ClientBuilder::new_test_client(&wallet).await);
let amal_group: Arc<MlsGroup<_>> =
Arc::new(amal.create_group(None, Default::default()).unwrap());
let amal_a = Arc::new(ClientBuilder::new_test_client(&wallet).await);
let amal_group_a: Arc<MlsGroup<_>> =
Arc::new(amal_a.create_group(None, Default::default()).unwrap());

amal_group.send_message_optimistic(b"1").unwrap();
amal_group.send_message_optimistic(b"2").unwrap();
amal_group.send_message_optimistic(b"3").unwrap();
amal_group.send_message_optimistic(b"4").unwrap();
amal_group.send_message_optimistic(b"5").unwrap();
amal_group.send_message_optimistic(b"6").unwrap();
let conn = amal_a.context().store().conn().unwrap();
let provider: Arc<XmtpOpenMlsProvider> = Arc::new(conn.into());

let conn = amal.context().store().conn().unwrap();
let provider: XmtpOpenMlsProvider = conn.into();
// create group intent
amal_group_a.sync().await.unwrap();
assert_eq!(provider.conn_ref().intents_deleted(), 1);

let mut futures = vec![];
for _ in 0..10 {
futures.push(amal_group.publish_intents(&provider))
for _ in 0..100 {
let s = xmtp_common::rand_string::<100>();
amal_group_a.send_message_optimistic(s.as_bytes()).unwrap();
}

let mut set = tokio::task::JoinSet::new();
for _ in 0..50 {
let g = amal_group_a.clone();
let p = provider.clone();
set.spawn(async move { g.publish_intents(&p).await });
}

let res = set.join_all().await;
let errs: Vec<&Result<_, _>> = res.iter().filter(|r| r.is_err()).collect();
errs.iter().for_each(|e| {
tracing::error!("{}", e.as_ref().unwrap_err());
});

let published = provider.conn_ref().intents_published();
assert_eq!(published, 101);
let created = provider.conn_ref().intents_created();
assert_eq!(created, 101);
if !errs.is_empty() {
panic!("Errors during publish");
}
future::join_all(futures).await;
}

#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
Expand Down
20 changes: 16 additions & 4 deletions xmtp_mls/src/storage/encrypted_store/group_intent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,22 @@ impl DbConnection {
})?;

match res {
// If nothing matched the query, return an error. Either ID or state was wrong
0 => Err(StorageError::NotFound(format!(
"ToPublish intent {intent_id} for publish"
))),
// If nothing matched the query, check if its already published, otherwise return an error. Either ID or state was wrong
0 => {
let already_published = self.raw_query(|conn| {
dsl::group_intents
.filter(dsl::id.eq(intent_id))
.first::<StoredGroupIntent>(conn)
});

if already_published.is_ok() {
Ok(())
} else {
Err(StorageError::NotFound(format!(
"Published intent {intent_id} for commit"
)))
}
}
_ => Ok(()),
}
}
Expand Down

0 comments on commit 3e43b68

Please sign in to comment.