diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 45a1fe5b3..7187cc8d8 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -15,6 +15,7 @@ use xmtp_proto::api_client::{XmtpApiClient, XmtpMlsClient}; use self::intents::{AddMembersIntentData, IntentError, PostCommitAction, RemoveMembersIntentData}; use crate::{ + api_client_wrapper::WelcomeMessage, client::ClientError, configuration::CIPHERSUITE, storage::{ @@ -24,7 +25,7 @@ use crate::{ }, utils::{hash::sha256, time::now_ns, topic::get_group_topic}, xmtp_openmls_provider::XmtpOpenMlsProvider, - Client, Store, + Client, Delete, Store, }; #[derive(Debug, Error)] @@ -131,6 +132,8 @@ where intent.store(&mut conn)?; self.publish_intents(&mut conn).await?; + // ... sync state with the network + self.post_commit(&mut conn).await?; Ok(()) } @@ -296,6 +299,39 @@ where } } + pub(crate) async fn post_commit(&self, conn: &mut DbConnection) -> Result<(), GroupError> { + let intents = self.client.store.find_group_intents( + conn, + self.group_id.clone(), + Some(vec![IntentState::Committed]), + None, + )?; + + for intent in intents { + if intent.post_commit_data.is_some() { + let post_commit_data = intent.post_commit_data.unwrap(); + let post_commit_action = PostCommitAction::from_bytes(post_commit_data.as_slice())?; + match post_commit_action { + PostCommitAction::SendWelcomes(action) => { + let welcomes: Vec = action + .installation_ids + .into_iter() + .map(|installation_id| WelcomeMessage { + installation_id, + ciphertext: action.welcome_message.clone(), + }) + .collect(); + self.client.api_client.publish_welcomes(welcomes).await?; + } + } + } + let deleter: &mut dyn Delete = conn; + deleter.delete(intent.id)?; + } + + Ok(()) + } + pub fn topic(&self) -> String { get_group_topic(&self.group_id) } @@ -315,7 +351,7 @@ mod tests { use openmls_traits::OpenMlsProvider; use xmtp_cryptography::utils::generate_local_wallet; - use crate::builder::ClientBuilder; + use crate::{builder::ClientBuilder, utils::topic::get_welcome_topic}; #[tokio::test] async fn test_send_message() { @@ -446,4 +482,43 @@ mod tests { let pending_commit = mls_group.pending_commit(); assert!(pending_commit.is_some()); } + + #[tokio::test] + async fn test_post_commit() { + let client = ClientBuilder::new_test_client(generate_local_wallet().into()).await; + let client_2 = ClientBuilder::new_test_client(generate_local_wallet().into()).await; + client_2.register_identity().await.unwrap(); + let group = client.create_group().expect("create group"); + let conn = &mut client.store.conn().unwrap(); + + group + .add_members_by_installation_id(vec![client_2 + .identity + .installation_keys + .to_public_vec()]) + .await + .unwrap(); + + let intents = client + .store + .find_group_intents(conn, group.group_id.clone(), None, None) + .unwrap(); + let intent = intents.first().unwrap(); + // Set the intent to committed manually + client + .store + .set_group_intent_committed(conn, intent.id) + .unwrap(); + group.post_commit(conn).await.unwrap(); + + // Check if the welcome was actually sent + let welcome_topic = get_welcome_topic(&client_2.identity.installation_keys.to_public_vec()); + let welcome_messages = client + .api_client + .read_topic(welcome_topic.as_str(), 0) + .await + .unwrap(); + + assert_eq!(welcome_messages.len(), 1); + } } diff --git a/xmtp_mls/src/groups/publish.rs b/xmtp_mls/src/groups/publish.rs deleted file mode 100644 index e69de29bb..000000000 diff --git a/xmtp_mls/src/storage/encrypted_store/group_intent.rs b/xmtp_mls/src/storage/encrypted_store/group_intent.rs index 5c0665c86..e81868008 100644 --- a/xmtp_mls/src/storage/encrypted_store/group_intent.rs +++ b/xmtp_mls/src/storage/encrypted_store/group_intent.rs @@ -13,7 +13,7 @@ use super::{ schema::{group_intents, group_intents::dsl}, DbConnection, EncryptedMessageStore, }; -use crate::{impl_fetch, impl_store, storage::StorageError}; +use crate::{impl_fetch, impl_store, storage::StorageError, Delete}; pub type ID = i32; @@ -53,6 +53,13 @@ pub struct StoredGroupIntent { impl_fetch!(StoredGroupIntent, group_intents, ID); +impl Delete for DbConnection { + type Key = ID; + fn delete(&mut self, key: ID) -> Result { + Ok(diesel::delete(dsl::group_intents.find(key)).execute(self)?) + } +} + #[derive(Insertable, Debug, PartialEq, Clone)] #[diesel(table_name = group_intents)] pub struct NewGroupIntent { diff --git a/xmtp_mls/src/storage/sql_key_store.rs b/xmtp_mls/src/storage/sql_key_store.rs index 442bccf69..b7821d962 100644 --- a/xmtp_mls/src/storage/sql_key_store.rs +++ b/xmtp_mls/src/storage/sql_key_store.rs @@ -70,7 +70,8 @@ impl OpenMlsKeyStore for SqlKeyStore<'_> { /// Interface is unclear on expected behavior when item is already deleted - /// we choose to not surface an error if this is the case. fn delete(&self, k: &[u8]) -> Result<(), Self::Error> { - let num_deleted = self.store.conn()?.delete(k.to_vec())?; + let conn: &mut dyn Delete> = &mut self.store.conn()?; + let num_deleted = conn.delete(k.to_vec())?; if num_deleted == 0 { debug!("No entry to delete for key {:?}", k); } diff --git a/xmtp_mls/src/utils/topic.rs b/xmtp_mls/src/utils/topic.rs index 4b2dc05f7..4685984fc 100644 --- a/xmtp_mls/src/utils/topic.rs +++ b/xmtp_mls/src/utils/topic.rs @@ -1,3 +1,7 @@ pub fn get_group_topic(group_id: &Vec) -> String { format!("/xmtp/3/g-{}/proto", hex::encode(group_id)) } + +pub fn get_welcome_topic(installation_id: &Vec) -> String { + format!("/xmtp/3/w-{}/proto", hex::encode(installation_id)) +}