Skip to content

Commit

Permalink
Implement post-commit actions (#323)
Browse files Browse the repository at this point in the history
* Implement post-commit actions

* Remove empty file

* Actually call post-commit
  • Loading branch information
neekolas authored Nov 9, 2023
1 parent c7b6cb2 commit ded57cc
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 4 deletions.
79 changes: 77 additions & 2 deletions xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use xmtp_proto::api_client::{XmtpApiClient, XmtpMlsClient};

use self::intents::{AddMembersIntentData, IntentError, PostCommitAction, RemoveMembersIntentData};
use crate::{
api_client_wrapper::WelcomeMessage,
client::ClientError,
configuration::CIPHERSUITE,
storage::{
Expand All @@ -24,7 +25,7 @@ use crate::{
},
utils::{hash::sha256, time::now_ns, topic::get_group_topic},
xmtp_openmls_provider::XmtpOpenMlsProvider,
Client, Store,
Client, Delete, Store,
};

#[derive(Debug, Error)]
Expand Down Expand Up @@ -131,6 +132,8 @@ where
intent.store(&mut conn)?;

self.publish_intents(&mut conn).await?;
// ... sync state with the network
self.post_commit(&mut conn).await?;

Ok(())
}
Expand Down Expand Up @@ -296,6 +299,39 @@ where
}
}

pub(crate) async fn post_commit(&self, conn: &mut DbConnection) -> Result<(), GroupError> {
let intents = self.client.store.find_group_intents(
conn,
self.group_id.clone(),
Some(vec![IntentState::Committed]),
None,
)?;

for intent in intents {
if intent.post_commit_data.is_some() {
let post_commit_data = intent.post_commit_data.unwrap();
let post_commit_action = PostCommitAction::from_bytes(post_commit_data.as_slice())?;
match post_commit_action {
PostCommitAction::SendWelcomes(action) => {
let welcomes: Vec<WelcomeMessage> = action
.installation_ids
.into_iter()
.map(|installation_id| WelcomeMessage {
installation_id,
ciphertext: action.welcome_message.clone(),
})
.collect();
self.client.api_client.publish_welcomes(welcomes).await?;
}
}
}
let deleter: &mut dyn Delete<StoredGroupIntent, Key = i32> = conn;
deleter.delete(intent.id)?;
}

Ok(())
}

pub fn topic(&self) -> String {
get_group_topic(&self.group_id)
}
Expand All @@ -315,7 +351,7 @@ mod tests {
use openmls_traits::OpenMlsProvider;
use xmtp_cryptography::utils::generate_local_wallet;

use crate::builder::ClientBuilder;
use crate::{builder::ClientBuilder, utils::topic::get_welcome_topic};

#[tokio::test]
async fn test_send_message() {
Expand Down Expand Up @@ -446,4 +482,43 @@ mod tests {
let pending_commit = mls_group.pending_commit();
assert!(pending_commit.is_some());
}

#[tokio::test]
async fn test_post_commit() {
let client = ClientBuilder::new_test_client(generate_local_wallet().into()).await;
let client_2 = ClientBuilder::new_test_client(generate_local_wallet().into()).await;
client_2.register_identity().await.unwrap();
let group = client.create_group().expect("create group");
let conn = &mut client.store.conn().unwrap();

group
.add_members_by_installation_id(vec![client_2
.identity
.installation_keys
.to_public_vec()])
.await
.unwrap();

let intents = client
.store
.find_group_intents(conn, group.group_id.clone(), None, None)
.unwrap();
let intent = intents.first().unwrap();
// Set the intent to committed manually
client
.store
.set_group_intent_committed(conn, intent.id)
.unwrap();
group.post_commit(conn).await.unwrap();

// Check if the welcome was actually sent
let welcome_topic = get_welcome_topic(&client_2.identity.installation_keys.to_public_vec());
let welcome_messages = client
.api_client
.read_topic(welcome_topic.as_str(), 0)
.await
.unwrap();

assert_eq!(welcome_messages.len(), 1);
}
}
Empty file removed xmtp_mls/src/groups/publish.rs
Empty file.
9 changes: 8 additions & 1 deletion xmtp_mls/src/storage/encrypted_store/group_intent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use super::{
schema::{group_intents, group_intents::dsl},
DbConnection, EncryptedMessageStore,
};
use crate::{impl_fetch, impl_store, storage::StorageError};
use crate::{impl_fetch, impl_store, storage::StorageError, Delete};

pub type ID = i32;

Expand Down Expand Up @@ -53,6 +53,13 @@ pub struct StoredGroupIntent {

impl_fetch!(StoredGroupIntent, group_intents, ID);

impl Delete<StoredGroupIntent> for DbConnection {
type Key = ID;
fn delete(&mut self, key: ID) -> Result<usize, StorageError> {
Ok(diesel::delete(dsl::group_intents.find(key)).execute(self)?)
}
}

#[derive(Insertable, Debug, PartialEq, Clone)]
#[diesel(table_name = group_intents)]
pub struct NewGroupIntent {
Expand Down
3 changes: 2 additions & 1 deletion xmtp_mls/src/storage/sql_key_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ impl OpenMlsKeyStore for SqlKeyStore<'_> {
/// Interface is unclear on expected behavior when item is already deleted -
/// we choose to not surface an error if this is the case.
fn delete<V: MlsEntity>(&self, k: &[u8]) -> Result<(), Self::Error> {
let num_deleted = self.store.conn()?.delete(k.to_vec())?;
let conn: &mut dyn Delete<StoredKeyStoreEntry, Key = Vec<u8>> = &mut self.store.conn()?;
let num_deleted = conn.delete(k.to_vec())?;
if num_deleted == 0 {
debug!("No entry to delete for key {:?}", k);
}
Expand Down
4 changes: 4 additions & 0 deletions xmtp_mls/src/utils/topic.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
pub fn get_group_topic(group_id: &Vec<u8>) -> String {
format!("/xmtp/3/g-{}/proto", hex::encode(group_id))
}

pub fn get_welcome_topic(installation_id: &Vec<u8>) -> String {
format!("/xmtp/3/w-{}/proto", hex::encode(installation_id))
}

0 comments on commit ded57cc

Please sign in to comment.