From bb7a38f73e68140499d4f9646928b50fb50e18d9 Mon Sep 17 00:00:00 2001 From: Cameron Voell <1103838+cameronvoell@users.noreply.github.com> Date: Tue, 1 Oct 2024 11:08:33 -0700 Subject: [PATCH] MLS Dms - Dual Sending Pre-requisites (#1076) * Adds functions for creating a DM group (#901) * update bindings cargo locks * Added dm group functionality * dm members can update all metadata * fix tests * fix indentation * fix test imports * gen protos back to main --------- Co-authored-by: cameronvoell * Private Preferences DB (#946) * create the database migration for the private preference work * update the table to be focused on consent * first pass at database storage structure * update the get method for consent records * fix up the set method * add a test * fix up the test * fix up the clippy error with consent record * fix up the clippy error with consent record * fix up all clippy issues * cargo fmt * Validate dm group metadata + permissions from welcome (#1075) * validate dm group before creating from welcome * lint fix * lint fix --------- Co-authored-by: cameronvoell * fixes after merge * DM updates - default to not displaying dm groups (#1046) * bindings create_dm function * find groups by default does not include dm groups * fmt fix * dont execute callbacks when dm group welcomes are streamed * Update bindings_ffi/src/mls.rs Co-authored-by: Andrew Plaza * fixed bad merge * filter dms in stream_conversations * surface include_dm_groups in bindings list function more clearly --------- Co-authored-by: cameronvoell Co-authored-by: Andrew Plaza * fix merge conflicts * cargo clippy * Remove tracing from test * Fix test * try and fix the tests * fix up the test --------- Co-authored-by: cameronvoell Co-authored-by: Naomi Plasterer Co-authored-by: Andrew Plaza Co-authored-by: Ry Racherbaumer --- bindings_ffi/src/mls.rs | 79 ++- bindings_node/src/conversations.rs | 22 +- examples/cli/cli-client.rs | 3 +- .../down.sql | 3 + .../up.sql | 3 + xmtp_mls/src/client.rs | 77 ++- xmtp_mls/src/groups/group_metadata.rs | 87 +++- xmtp_mls/src/groups/group_mutable_metadata.rs | 29 ++ xmtp_mls/src/groups/group_permissions.rs | 268 ++++++++-- xmtp_mls/src/groups/message_history.rs | 8 +- xmtp_mls/src/groups/mod.rs | 467 ++++++++++++++++-- xmtp_mls/src/groups/validated_commit.rs | 4 +- xmtp_mls/src/storage/encrypted_store/group.rs | 49 +- .../storage/encrypted_store/group_intent.rs | 1 + xmtp_mls/src/storage/encrypted_store/mod.rs | 2 + .../src/storage/encrypted_store/schema.rs | 1 + xmtp_mls/src/subscriptions.rs | 141 +++++- 17 files changed, 1098 insertions(+), 146 deletions(-) create mode 100644 xmtp_mls/migrations/2024-09-09-231735_create_dm_inbox_id/down.sql create mode 100644 xmtp_mls/migrations/2024-09-09-231735_create_dm_inbox_id/up.sql diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index b081cf455..968b5843a 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -17,6 +17,7 @@ use xmtp_id::{ associations::{builder::SignatureRequest, generate_inbox_id as xmtp_id_generate_inbox_id}, InboxId, }; +use xmtp_mls::client::FindGroupParams; use xmtp_mls::groups::group_mutable_metadata::MetadataField; use xmtp_mls::groups::group_permissions::BasePolicies; use xmtp_mls::groups::group_permissions::GroupMutablePermissionsError; @@ -780,6 +781,20 @@ impl FfiConversations { Ok(out) } + pub async fn create_dm(&self, account_address: String) -> Result, GenericError> { + log::info!("creating dm with target address: {}", account_address); + + let convo = self.inner_client.create_dm(account_address).await?; + + let out = Arc::new(FfiGroup { + inner_client: self.inner_client.clone(), + group_id: convo.group_id, + created_at_ns: convo.created_at_ns, + }); + + Ok(out) + } + pub async fn process_streamed_welcome_message( &self, envelope_bytes: Vec, @@ -804,7 +819,16 @@ impl FfiConversations { pub async fn sync_all_groups(&self) -> Result { let inner = self.inner_client.as_ref(); - let groups = inner.find_groups(None, None, None, None)?; + let groups = inner.find_groups(FindGroupParams { + include_dm_groups: true, + ..FindGroupParams::default() + })?; + + log::info!( + "groups for client inbox id {:?}: {:?}", + self.inner_client.inbox_id(), + groups.len() + ); let num_groups_synced: usize = inner.sync_all_groups(groups).await?; // Uniffi does not work with usize, so we need to convert to u32 @@ -824,12 +848,13 @@ impl FfiConversations { ) -> Result>, GenericError> { let inner = self.inner_client.as_ref(); let convo_list: Vec> = inner - .find_groups( - None, - opts.created_after_ns, - opts.created_before_ns, - opts.limit, - )? + .find_groups(FindGroupParams { + allowed_states: None, + created_after_ns: opts.created_after_ns, + created_before_ns: opts.created_before_ns, + limit: opts.limit, + include_dm_groups: false, + })? .into_iter() .map(|group| { Arc::new(FfiGroup { @@ -845,14 +870,17 @@ impl FfiConversations { pub async fn stream(&self, callback: Box) -> FfiStreamCloser { let client = self.inner_client.clone(); - let handle = - RustXmtpClient::stream_conversations_with_callback(client.clone(), move |convo| { + let handle = RustXmtpClient::stream_conversations_with_callback( + client.clone(), + move |convo| { callback.on_conversation(Arc::new(FfiGroup { inner_client: client.clone(), group_id: convo.group_id, created_at_ns: convo.created_at_ns, })) - }); + }, + false, + ); FfiStreamCloser::new(handle) } @@ -3700,6 +3728,37 @@ mod tests { ); } + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] + async fn test_dms_sync_but_do_not_list() { + let alix = new_test_client().await; + let bola = new_test_client().await; + + let alix_conversations = alix.conversations(); + let bola_conversations = bola.conversations(); + + let _alix_group = alix_conversations + .create_dm(bola.account_address.clone()) + .await + .unwrap(); + let alix_num_sync = alix_conversations.sync_all_groups().await.unwrap(); + bola_conversations.sync().await.unwrap(); + let bola_num_sync = bola_conversations.sync_all_groups().await.unwrap(); + assert_eq!(alix_num_sync, 1); + assert_eq!(bola_num_sync, 1); + + let alix_groups = alix_conversations + .list(FfiListConversationsOptions::default()) + .await + .unwrap(); + assert_eq!(alix_groups.len(), 0); + + let bola_groups = bola_conversations + .list(FfiListConversationsOptions::default()) + .await + .unwrap(); + assert_eq!(bola_groups.len(), 0); + } + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] async fn test_set_and_get_group_consent() { let alix = new_test_client().await; diff --git a/bindings_node/src/conversations.rs b/bindings_node/src/conversations.rs index 3928c36f0..ed39f37fa 100644 --- a/bindings_node/src/conversations.rs +++ b/bindings_node/src/conversations.rs @@ -6,6 +6,7 @@ use napi::bindgen_prelude::{Error, Result, Uint8Array}; use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFunctionCallMode}; use napi::JsFunction; use napi_derive::napi; +use xmtp_mls::client::FindGroupParams; use xmtp_mls::groups::{GroupMetadataOptions, PreconfiguredPolicies}; use crate::messages::NapiMessage; @@ -171,12 +172,12 @@ impl NapiConversations { }; let convo_list: Vec = self .inner_client - .find_groups( - None, - opts.created_after_ns, - opts.created_before_ns, - opts.limit, - ) + .find_groups(FindGroupParams { + created_after_ns: opts.created_after_ns, + created_before_ns: opts.created_before_ns, + limit: opts.limit, + ..FindGroupParams::default() + }) .map_err(|e| Error::from_reason(format!("{}", e)))? .into_iter() .map(|group| { @@ -196,8 +197,9 @@ impl NapiConversations { let tsfn: ThreadsafeFunction = callback.create_threadsafe_function(0, |ctx| Ok(vec![ctx.value]))?; let client = self.inner_client.clone(); - let stream_closer = - RustXmtpClient::stream_conversations_with_callback(client.clone(), move |convo| { + let stream_closer = RustXmtpClient::stream_conversations_with_callback( + client.clone(), + move |convo| { tsfn.call( Ok(NapiGroup::new( client.clone(), @@ -206,7 +208,9 @@ impl NapiConversations { )), ThreadsafeFunctionCallMode::Blocking, ); - }); + }, + false, + ); Ok(NapiStreamCloser::new(stream_closer)) } diff --git a/examples/cli/cli-client.rs b/examples/cli/cli-client.rs index 6ede48fa3..9125e2ff6 100755 --- a/examples/cli/cli-client.rs +++ b/examples/cli/cli-client.rs @@ -19,6 +19,7 @@ use futures::future::join_all; use kv_log_macro::{error, info}; use prost::Message; use xmtp_id::associations::unverified::{UnverifiedRecoverableEcdsaSignature, UnverifiedSignature}; +use xmtp_mls::client::FindGroupParams; use xmtp_mls::groups::message_history::MessageHistoryContent; use xmtp_mls::storage::group_message::GroupMessageKind; @@ -209,7 +210,7 @@ async fn main() { // recv(&client).await.unwrap(); let group_list = client - .find_groups(None, None, None, None) + .find_groups(FindGroupParams::default()) .expect("failed to list groups"); for group in group_list.iter() { group.sync(&client).await.expect("error syncing group"); diff --git a/xmtp_mls/migrations/2024-09-09-231735_create_dm_inbox_id/down.sql b/xmtp_mls/migrations/2024-09-09-231735_create_dm_inbox_id/down.sql new file mode 100644 index 000000000..2dbbf5787 --- /dev/null +++ b/xmtp_mls/migrations/2024-09-09-231735_create_dm_inbox_id/down.sql @@ -0,0 +1,3 @@ +-- This file should undo anything in `up.sql` +ALTER TABLE groups DROP COLUMN dm_inbox_id; +DROP INDEX idx_dm_target; diff --git a/xmtp_mls/migrations/2024-09-09-231735_create_dm_inbox_id/up.sql b/xmtp_mls/migrations/2024-09-09-231735_create_dm_inbox_id/up.sql new file mode 100644 index 000000000..6614f371a --- /dev/null +++ b/xmtp_mls/migrations/2024-09-09-231735_create_dm_inbox_id/up.sql @@ -0,0 +1,3 @@ +-- Your SQL goes here +ALTER TABLE groups ADD COLUMN dm_inbox_id text; +CREATE INDEX idx_dm_target ON groups(dm_inbox_id); diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index 6f5bc6246..9e2e5e369 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -218,6 +218,15 @@ impl From<&str> for ClientError { } } +#[derive(Debug, Default)] +pub struct FindGroupParams { + pub allowed_states: Option>, + pub created_after_ns: Option, + pub created_before_ns: Option, + pub limit: Option, + pub include_dm_groups: bool, +} + /// Clients manage access to the network, identity, and data store #[derive(Debug)] pub struct Client { @@ -495,6 +504,49 @@ where Ok(group) } + /// Create a new Direct Message with the default settings + pub async fn create_dm(&self, account_address: String) -> Result { + tracing::info!("creating dm with address: {}", account_address); + + let inbox_id = match self + .find_inbox_id_from_address(account_address.clone()) + .await? + { + Some(id) => id, + None => { + return Err(ClientError::Storage(StorageError::NotFound(format!( + "inbox id for address {} not found", + account_address + )))) + } + }; + + self.create_dm_by_inbox_id(inbox_id).await + } + + /// Create a new Direct Message with the default settings + pub async fn create_dm_by_inbox_id( + &self, + dm_target_inbox_id: InboxId, + ) -> Result { + tracing::info!("creating dm with {}", dm_target_inbox_id); + + let group = MlsGroup::create_dm_and_insert( + self.context.clone(), + GroupMembershipState::Allowed, + dm_target_inbox_id.clone(), + )?; + + group + .add_members_by_inbox_id(self, vec![dm_target_inbox_id]) + .await?; + + // notify any streams of the new group + let _ = self.local_events.send(LocalEvents::NewGroup(group.clone())); + + Ok(group) + } + #[cfg(feature = "message-history")] pub(crate) fn create_sync_group(&self) -> Result { tracing::info!("creating sync group"); @@ -542,17 +594,17 @@ where /// - created_after_ns: only return groups created after the given timestamp (in nanoseconds) /// - created_before_ns: only return groups created before the given timestamp (in nanoseconds) /// - limit: only return the first `limit` groups - pub fn find_groups( - &self, - allowed_states: Option>, - created_after_ns: Option, - created_before_ns: Option, - limit: Option, - ) -> Result, ClientError> { + pub fn find_groups(&self, params: FindGroupParams) -> Result, ClientError> { Ok(self .store() .conn()? - .find_groups(allowed_states, created_after_ns, created_before_ns, limit)? + .find_groups( + params.allowed_states, + params.created_after_ns, + params.created_before_ns, + params.limit, + params.include_dm_groups, + )? .into_iter() .map(|stored_group| { MlsGroup::new( @@ -857,6 +909,7 @@ mod tests { use crate::{ builder::ClientBuilder, + client::FindGroupParams, groups::GroupMetadataOptions, hpke::{decrypt_welcome, encrypt_welcome}, identity::serialize_key_package_hash_ref, @@ -958,7 +1011,7 @@ mod tests { .create_group(None, GroupMetadataOptions::default()) .unwrap(); - let groups = client.find_groups(None, None, None, None).unwrap(); + let groups = client.find_groups(FindGroupParams::default()).unwrap(); assert_eq!(groups.len(), 2); assert_eq!(groups[0].group_id, group_1.group_id); assert_eq!(groups[1].group_id, group_2.group_id); @@ -1024,7 +1077,7 @@ mod tests { let bob_received_groups = bo.sync_welcomes().await.unwrap(); assert_eq!(bob_received_groups.len(), 2); - let bo_groups = bo.find_groups(None, None, None, None).unwrap(); + let bo_groups = bo.find_groups(FindGroupParams::default()).unwrap(); let bo_group1 = bo.group(alix_bo_group1.clone().group_id).unwrap(); let bo_messages1 = bo_group1 .find_messages(None, None, None, None, None) @@ -1129,7 +1182,7 @@ mod tests { tracing::info!("Syncing bolas welcomes"); // See if Bola can see that they were added to the group bola.sync_welcomes().await.unwrap(); - let bola_groups = bola.find_groups(None, None, None, None).unwrap(); + let bola_groups = bola.find_groups(FindGroupParams::default()).unwrap(); assert_eq!(bola_groups.len(), 1); let bola_group = bola_groups.first().unwrap(); tracing::info!("Syncing bolas messages"); @@ -1262,7 +1315,7 @@ mod tests { bo.sync_welcomes().await.unwrap(); // Bo should have two groups now - let bo_groups = bo.find_groups(None, None, None, None).unwrap(); + let bo_groups = bo.find_groups(FindGroupParams::default()).unwrap(); assert_eq!(bo_groups.len(), 2); // Bo's original key should be deleted diff --git a/xmtp_mls/src/groups/group_metadata.rs b/xmtp_mls/src/groups/group_metadata.rs index 608294e14..d80ea93ec 100644 --- a/xmtp_mls/src/groups/group_metadata.rs +++ b/xmtp_mls/src/groups/group_metadata.rs @@ -3,7 +3,8 @@ use prost::Message; use thiserror::Error; use xmtp_proto::xmtp::mls::message_contents::{ - ConversationType as ConversationTypeProto, GroupMetadataV1 as GroupMetadataProto, + ConversationType as ConversationTypeProto, DmMembers as DmMembersProto, + GroupMetadataV1 as GroupMetadataProto, Inbox as InboxProto, }; #[derive(Debug, Error)] @@ -16,6 +17,10 @@ pub enum GroupMetadataError { InvalidConversationType, #[error("missing extension")] MissingExtension, + #[error("invalid dm members")] + InvalidDmMembers, + #[error("missing a dm member")] + MissingDmMember, } #[derive(Debug, Clone, PartialEq)] @@ -23,40 +28,35 @@ pub struct GroupMetadata { pub conversation_type: ConversationType, // TODO: Remove this once transition is completed pub creator_inbox_id: String, + pub dm_members: Option, } impl GroupMetadata { - pub fn new(conversation_type: ConversationType, creator_inbox_id: String) -> Self { + pub fn new( + conversation_type: ConversationType, + creator_inbox_id: String, + dm_members: Option, + ) -> Self { Self { conversation_type, creator_inbox_id, + dm_members, } } - - pub(crate) fn from_proto(proto: GroupMetadataProto) -> Result { - Ok(Self::new( - proto.conversation_type.try_into()?, - proto.creator_inbox_id.clone(), - )) - } - - pub(crate) fn to_proto(&self) -> Result { - let conversation_type: ConversationTypeProto = self.conversation_type.clone().into(); - Ok(GroupMetadataProto { - conversation_type: conversation_type as i32, - creator_inbox_id: self.creator_inbox_id.clone(), - creator_account_address: "".to_string(), // TODO: remove from proto - dm_members: None, - }) - } } impl TryFrom for Vec { type Error = GroupMetadataError; fn try_from(value: GroupMetadata) -> Result { - let mut buf = Vec::new(); - let proto_val = value.to_proto()?; + let conversation_type: ConversationTypeProto = value.conversation_type.clone().into(); + let proto_val = GroupMetadataProto { + conversation_type: conversation_type as i32, + creator_inbox_id: value.creator_inbox_id.clone(), + creator_account_address: "".to_string(), // TODO: remove from proto + dm_members: value.dm_members.clone().map(|dm| dm.into()), + }; + let mut buf: Vec = Vec::new(); proto_val.encode(&mut buf)?; Ok(buf) @@ -68,7 +68,7 @@ impl TryFrom<&Vec> for GroupMetadata { fn try_from(value: &Vec) -> Result { let proto_val = GroupMetadataProto::decode(value.as_slice())?; - Self::from_proto(proto_val) + proto_val.try_into() } } @@ -76,7 +76,12 @@ impl TryFrom for GroupMetadata { type Error = GroupMetadataError; fn try_from(value: GroupMetadataProto) -> Result { - Self::from_proto(value) + let dm_members = value.dm_members.map(DmMembers::try_from).transpose()?; + Ok(Self::new( + value.conversation_type.try_into()?, + value.creator_inbox_id, + dm_members, + )) } } @@ -121,6 +126,42 @@ impl TryFrom for ConversationType { } } +#[derive(Debug, Clone, PartialEq)] +pub struct DmMembers { + pub member_one_inbox_id: String, + pub member_two_inbox_id: String, +} + +impl From for DmMembersProto { + fn from(value: DmMembers) -> Self { + DmMembersProto { + dm_member_one: Some(InboxProto { + inbox_id: value.member_one_inbox_id.clone(), + }), + dm_member_two: Some(InboxProto { + inbox_id: value.member_two_inbox_id.clone(), + }), + } + } +} + +impl TryFrom for DmMembers { + type Error = GroupMetadataError; + + fn try_from(value: DmMembersProto) -> Result { + Ok(Self { + member_one_inbox_id: value + .dm_member_one + .ok_or(GroupMetadataError::MissingDmMember)? + .inbox_id, + member_two_inbox_id: value + .dm_member_two + .ok_or(GroupMetadataError::MissingDmMember)? + .inbox_id, + }) + } +} + pub fn extract_group_metadata(group: &OpenMlsGroup) -> Result { let extension = group .export_group_context() diff --git a/xmtp_mls/src/groups/group_mutable_metadata.rs b/xmtp_mls/src/groups/group_mutable_metadata.rs index 9b1ac6ae7..c693782e5 100644 --- a/xmtp_mls/src/groups/group_mutable_metadata.rs +++ b/xmtp_mls/src/groups/group_mutable_metadata.rs @@ -130,6 +130,35 @@ impl GroupMutableMetadata { } } + // Admin / super admin is not needed for a DM + pub fn new_dm_default(_creator_inbox_id: String, _dm_target_inbox_id: &str) -> Self { + let mut attributes = HashMap::new(); + // TODO: would it be helpful to incorporate the dm inbox ids in the name or description? + attributes.insert( + MetadataField::GroupName.to_string(), + DEFAULT_GROUP_NAME.to_string(), + ); + attributes.insert( + MetadataField::Description.to_string(), + DEFAULT_GROUP_DESCRIPTION.to_string(), + ); + attributes.insert( + MetadataField::GroupImageUrlSquare.to_string(), + DEFAULT_GROUP_IMAGE_URL_SQUARE.to_string(), + ); + attributes.insert( + MetadataField::GroupPinnedFrameUrl.to_string(), + DEFAULT_GROUP_PINNED_FRAME_URL.to_string(), + ); + let admin_list = vec![]; + let super_admin_list = vec![]; + Self { + attributes, + admin_list, + super_admin_list, + } + } + /// Returns a vector of supported metadata fields. /// /// These fields will receive default permission policies for new groups. diff --git a/xmtp_mls/src/groups/group_permissions.rs b/xmtp_mls/src/groups/group_permissions.rs index cdd2b33fc..cff50081d 100644 --- a/xmtp_mls/src/groups/group_permissions.rs +++ b/xmtp_mls/src/groups/group_permissions.rs @@ -232,6 +232,15 @@ impl MetadataPolicies { map } + // by default members of DM groups can update all metadata + pub fn dm_map() -> HashMap { + let mut map: HashMap = HashMap::new(); + for field in GroupMutableMetadata::supported_fields() { + map.insert(field.to_string(), MetadataPolicies::allow()); + } + map + } + /// Creates an "Allow" metadata policy. pub fn allow() -> Self { MetadataPolicies::Standard(MetadataBasePolicies::Allow) @@ -898,17 +907,41 @@ impl PolicySet { } } + pub fn new_dm() -> Self { + Self { + add_member_policy: MembershipPolicies::deny(), + remove_member_policy: MembershipPolicies::deny(), + update_metadata_policy: MetadataPolicies::dm_map(), + add_admin_policy: PermissionsPolicies::deny(), + remove_admin_policy: PermissionsPolicies::deny(), + update_permissions_policy: PermissionsPolicies::deny(), + } + } + /// The [evaluate_commit] function is the core function for client side verification /// that [ValidatedCommit](crate::groups::validated_commit::ValidatedCommit) /// adheres to the XMTP permission policies set in the PolicySet. pub fn evaluate_commit(&self, commit: &ValidatedCommit) -> bool { // Verify add member policy was not violated - let added_inboxes_valid = self.evaluate_policy( + let mut added_inboxes_valid = self.evaluate_policy( commit.added_inboxes.iter(), &self.add_member_policy, &commit.actor, ); + // We can always add DM member's inboxId to a DM + if let Some(dm_members) = &commit.dm_members { + if commit.added_inboxes.len() == 1 { + let added_inbox_id = &commit.added_inboxes[0].inbox_id; + if (added_inbox_id == &dm_members.member_one_inbox_id + || added_inbox_id == &dm_members.member_two_inbox_id) + && added_inbox_id != &commit.actor_inbox_id() + { + added_inboxes_valid = true; + } + } + } + // Verify remove member policy was not violated // Super admin can not be removed from a group let removed_inboxes_valid = self.evaluate_policy( @@ -1240,7 +1273,10 @@ impl std::fmt::Display for PreconfiguredPolicies { #[cfg(test)] mod tests { use crate::{ - groups::{group_mutable_metadata::MetadataField, validated_commit::MutableMetadataChanges}, + groups::{ + group_metadata::DmMembers, group_mutable_metadata::MetadataField, + validated_commit::MutableMetadataChanges, + }, utils::test::{rand_string, rand_vec}, }; @@ -1271,27 +1307,35 @@ mod tests { } } + enum MemberType { + SameAsActor, + DmTarget, + Random, + } + /// Test helper function for building a ValidatedCommit. fn build_validated_commit( // Add a member with the same account address as the actor if true, random account address if false - member_added: Option, - member_removed: Option, + member_added: Option, + member_removed: Option, metadata_fields_changed: Option>, permissions_changed: bool, actor_is_admin: bool, actor_is_super_admin: bool, + dm_target_inbox_id: Option, ) -> ValidatedCommit { let actor = build_actor(None, None, actor_is_admin, actor_is_super_admin); - let build_membership_change = |same_address_as_actor| { - if same_address_as_actor { - vec![build_change( - Some(actor.inbox_id.clone()), - actor_is_admin, - actor_is_super_admin, - )] - } else { - vec![build_change(None, false, false)] + let dm_target_inbox_id_clone = dm_target_inbox_id.clone(); + let build_membership_change = |member_type: MemberType| match member_type { + MemberType::SameAsActor => vec![build_change( + Some(actor.inbox_id.clone()), + actor_is_admin, + actor_is_super_admin, + )], + MemberType::DmTarget => { + vec![build_change(dm_target_inbox_id_clone.clone(), false, false)] } + MemberType::Random => vec![build_change(None, false, false)], }; let field_changes = metadata_fields_changed @@ -1300,6 +1344,15 @@ mod tests { .map(|field| MetadataFieldChange::new(field, Some(rand_string()), Some(rand_string()))) .collect(); + let dm_members = if let Some(dm_target_inbox_id) = dm_target_inbox_id { + Some(DmMembers { + member_one_inbox_id: actor.inbox_id.clone(), + member_two_inbox_id: dm_target_inbox_id, + }) + } else { + None + }; + ValidatedCommit { actor: actor.clone(), added_inboxes: member_added @@ -1313,6 +1366,7 @@ mod tests { ..Default::default() }, permissions_changed, + dm_members, } } @@ -1329,7 +1383,15 @@ mod tests { PermissionsPolicies::allow_if_actor_super_admin(), ); - let commit = build_validated_commit(Some(true), Some(true), None, false, false, false); + let commit = build_validated_commit( + Some(MemberType::SameAsActor), + Some(MemberType::SameAsActor), + None, + false, + false, + false, + None, + ); assert!(permissions.evaluate_commit(&commit)); } @@ -1345,12 +1407,26 @@ mod tests { PermissionsPolicies::allow_if_actor_super_admin(), ); - let member_added_commit = - build_validated_commit(Some(false), None, None, false, false, false); + let member_added_commit = build_validated_commit( + Some(MemberType::Random), + None, + None, + false, + false, + false, + None, + ); assert!(!permissions.evaluate_commit(&member_added_commit)); - let member_removed_commit = - build_validated_commit(None, Some(false), None, false, false, false); + let member_removed_commit = build_validated_commit( + None, + Some(MemberType::Random), + None, + false, + false, + false, + None, + ); assert!(!permissions.evaluate_commit(&member_removed_commit)); } @@ -1367,16 +1443,37 @@ mod tests { ); // Can not remove the creator if they are the only super admin - let commit_with_creator = - build_validated_commit(Some(true), Some(true), None, false, false, true); + let commit_with_creator = build_validated_commit( + Some(MemberType::SameAsActor), + Some(MemberType::SameAsActor), + None, + false, + false, + true, + None, + ); assert!(!permissions.evaluate_commit(&commit_with_creator)); - let commit_with_creator = - build_validated_commit(Some(true), Some(false), None, false, false, true); + let commit_with_creator = build_validated_commit( + Some(MemberType::SameAsActor), + Some(MemberType::Random), + None, + false, + false, + true, + None, + ); assert!(permissions.evaluate_commit(&commit_with_creator)); - let commit_without_creator = - build_validated_commit(Some(true), Some(true), None, false, false, false); + let commit_without_creator = build_validated_commit( + Some(MemberType::SameAsActor), + Some(MemberType::SameAsActor), + None, + false, + false, + false, + None, + ); assert!(!permissions.evaluate_commit(&commit_without_creator)); } @@ -1395,8 +1492,15 @@ mod tests { PermissionsPolicies::allow_if_actor_super_admin(), ); - let member_added_commit = - build_validated_commit(Some(true), None, None, false, false, false); + let member_added_commit = build_validated_commit( + Some(MemberType::SameAsActor), + None, + None, + false, + false, + false, + None, + ); assert!(!permissions.evaluate_commit(&member_added_commit)); } @@ -1415,8 +1519,15 @@ mod tests { PermissionsPolicies::allow_if_actor_super_admin(), ); - let member_added_commit = - build_validated_commit(Some(true), None, None, false, false, false); + let member_added_commit = build_validated_commit( + Some(MemberType::SameAsActor), + None, + None, + false, + false, + false, + None, + ); assert!(permissions.evaluate_commit(&member_added_commit)); } @@ -1461,12 +1572,13 @@ mod tests { ); let member_added_commit = build_validated_commit( - Some(true), + Some(MemberType::SameAsActor), None, Some(vec![MetadataField::GroupName.to_string()]), false, false, false, + None, ); assert!(allow_permissions.evaluate_commit(&member_added_commit)); @@ -1551,11 +1663,11 @@ mod tests { ); // Commit should fail because actor is not superadmin - let commit = build_validated_commit(None, None, None, true, false, false); + let commit = build_validated_commit(None, None, None, true, false, false, None); assert!(!permissions.evaluate_commit(&commit)); // Commit should pass because actor is superadmin - let commit = build_validated_commit(None, None, None, true, false, true); + let commit = build_validated_commit(None, None, None, true, false, true, None); assert!(permissions.evaluate_commit(&commit)); } @@ -1580,6 +1692,7 @@ mod tests { false, false, false, + None, ); assert!(permissions.evaluate_commit(&name_updated_commit)); @@ -1591,6 +1704,7 @@ mod tests { false, false, false, + None, ); assert!(!permissions.evaluate_commit(&non_existing_field_updated_commit)); @@ -1602,6 +1716,7 @@ mod tests { false, true, false, + None, ); assert!(permissions.evaluate_commit(&non_existing_field_updated_commit)); @@ -1615,6 +1730,7 @@ mod tests { false, true, false, + None, ); assert!(!permissions.evaluate_commit(&non_existing_field_updated_commit)); @@ -1628,7 +1744,97 @@ mod tests { false, false, true, + None, ); assert!(permissions.evaluate_commit(&non_existing_field_updated_commit)); } + + #[test] + fn test_dm_group_permissions() { + // Simulate a group with DM Permissions + let permissions = PolicySet::new_dm(); + + // String below represents the inbox id of the DM target + const TARGET_INBOX_ID: &str = "example_target_dm_id"; + + // DM group can not add a random inbox + let commit = build_validated_commit( + Some(MemberType::Random), + None, + None, + false, + false, + false, + Some(TARGET_INBOX_ID.to_string()), + ); + assert!(!permissions.evaluate_commit(&commit)); + + // DM group can not add themselves + let commit = build_validated_commit( + Some(MemberType::SameAsActor), + None, + None, + false, + false, + false, + Some(TARGET_INBOX_ID.to_string()), + ); + assert!(!permissions.evaluate_commit(&commit)); + + // DM group can add the target inbox + let commit = build_validated_commit( + Some(MemberType::DmTarget), + None, + None, + false, + false, + false, + Some(TARGET_INBOX_ID.to_string()), + ); + assert!(permissions.evaluate_commit(&commit)); + + // DM group can not remove + let commit = build_validated_commit( + None, + Some(MemberType::Random), + None, + false, + false, + false, + Some(TARGET_INBOX_ID.to_string()), + ); + assert!(!permissions.evaluate_commit(&commit)); + let commit = build_validated_commit( + None, + Some(MemberType::DmTarget), + None, + false, + false, + false, + Some(TARGET_INBOX_ID.to_string()), + ); + assert!(!permissions.evaluate_commit(&commit)); + let commit = build_validated_commit( + None, + Some(MemberType::SameAsActor), + None, + false, + false, + false, + Some(TARGET_INBOX_ID.to_string()), + ); + assert!(!permissions.evaluate_commit(&commit)); + + // DM group can update metadata + let commit = build_validated_commit( + None, + None, + Some(vec![MetadataField::GroupName.to_string()]), + false, + false, + false, + Some(TARGET_INBOX_ID.to_string()), + ); + assert!(permissions.evaluate_commit(&commit)); + } } diff --git a/xmtp_mls/src/groups/message_history.rs b/xmtp_mls/src/groups/message_history.rs index 3b08eb9a8..ed9896c10 100644 --- a/xmtp_mls/src/groups/message_history.rs +++ b/xmtp_mls/src/groups/message_history.rs @@ -135,7 +135,7 @@ where pub async fn ensure_member_of_all_groups(&self, inbox_id: String) -> Result<(), GroupError> { let conn = self.store().conn()?; - let groups = conn.find_groups(None, None, None, None)?; + let groups = conn.find_groups(None, None, None, None, false)?; for group in groups { let group = self.group(group.id)?; Box::pin(group.add_members_by_inbox_id(self, vec![inbox_id.clone()])).await?; @@ -384,7 +384,7 @@ where self.sync_welcomes().await?; let conn = self.store().conn()?; - let groups = conn.find_groups(None, None, None, None)?; + let groups = conn.find_groups(None, None, None, None, false)?; for crate::storage::group::StoredGroup { id, .. } in groups.into_iter() { let group = self.group(id)?; Box::pin(group.sync(self)).await?; @@ -502,14 +502,14 @@ where async fn prepare_groups_to_sync(&self) -> Result, MessageHistoryError> { let conn = self.store().conn()?; - Ok(conn.find_groups(None, None, None, None)?) + Ok(conn.find_groups(None, None, None, None, false)?) } async fn prepare_messages_to_sync( &self, ) -> Result, MessageHistoryError> { let conn = self.store().conn()?; - let groups = conn.find_groups(None, None, None, None)?; + let groups = conn.find_groups(None, None, None, None, false)?; let mut all_messages: Vec = vec![]; for StoredGroup { id, .. } in groups.into_iter() { diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 1bf4775d3..bf6357cfb 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -40,7 +40,7 @@ pub use self::intents::{AddressesOrInstallationIds, IntentError}; use self::message_history::MessageHistoryError; use self::{ group_membership::GroupMembership, - group_metadata::extract_group_metadata, + group_metadata::{extract_group_metadata, DmMembers}, group_mutable_metadata::{GroupMutableMetadata, GroupMutableMetadataError, MetadataField}, group_permissions::{ extract_group_permissions, GroupMutablePermissions, GroupMutablePermissionsError, @@ -79,7 +79,7 @@ use crate::{ SEND_MESSAGE_UPDATE_INSTALLATIONS_INTERVAL_NS, }, hpke::{decrypt_welcome, HpkeError}, - identity::{parse_credential, Identity, IdentityError}, + identity::{parse_credential, IdentityError}, identity_updates::{load_identity_updates, InstallationDiffError}, retry::RetryableError, storage::{ @@ -184,6 +184,8 @@ pub enum GroupError { PublishCancelled, #[error("the publish failed to complete due to panic")] PublishPanicked, + #[error("dm requires target inbox_id")] + InvalidDmMissingInboxId, #[error("Missing metadata field {name}")] MissingMetadataField { name: String }, #[error("Message was processed but is missing")] @@ -293,9 +295,10 @@ impl MlsGroup { ) -> Result { let conn = context.store.conn()?; let provider = XmtpOpenMlsProvider::new(conn); + let creator_inbox_id = context.inbox_id(); let protected_metadata = - build_protected_metadata_extension(&context.identity, Purpose::Conversation)?; - let mutable_metadata = build_mutable_metadata_extension_default(&context.identity, opts)?; + build_protected_metadata_extension(creator_inbox_id.clone(), Purpose::Conversation)?; + let mutable_metadata = build_mutable_metadata_extension_default(creator_inbox_id, opts)?; let group_membership = build_starting_group_membership_extension(context.inbox_id(), 0); let mutable_permissions = build_mutable_permissions_extension(permissions_policy_set)?; let group_config = build_group_config( @@ -321,6 +324,7 @@ impl MlsGroup { now_ns(), membership_state, context.inbox_id(), + None, ); stored_group.store(provider.conn_ref())?; @@ -331,6 +335,56 @@ impl MlsGroup { Ok(new_group) } + // Create a new DM and save it to the DB + pub fn create_dm_and_insert( + context: Arc, + membership_state: GroupMembershipState, + dm_target_inbox_id: InboxId, + ) -> Result { + let conn = context.store.conn()?; + let provider = XmtpOpenMlsProvider::new(conn); + let protected_metadata = + build_dm_protected_metadata_extension(context.inbox_id(), dm_target_inbox_id.clone())?; + let mutable_metadata = + build_dm_mutable_metadata_extension_default(context.inbox_id(), &dm_target_inbox_id)?; + let group_membership = build_starting_group_membership_extension(context.inbox_id(), 0); + let mutable_permissions = PolicySet::new_dm(); + let mutable_permission_extension = + build_mutable_permissions_extension(mutable_permissions)?; + let group_config = build_group_config( + protected_metadata, + mutable_metadata, + group_membership, + mutable_permission_extension, + )?; + + let mls_group = OpenMlsGroup::new( + &provider, + &context.identity.installation_keys, + &group_config, + CredentialWithKey { + credential: context.identity.credential(), + signature_key: context.identity.installation_keys.to_public_vec().into(), + }, + )?; + + let group_id = mls_group.group_id().to_vec(); + let stored_group = StoredGroup::new( + group_id.clone(), + now_ns(), + membership_state, + context.inbox_id(), + Some(dm_target_inbox_id), + ); + + stored_group.store(provider.conn_ref())?; + Ok(Self::new( + context.clone(), + group_id, + stored_group.created_at_ns, + )) + } + // Create a group from a decrypted and decoded welcome message // If the group already exists in the store, overwrite the MLS state and do not update the group entry async fn create_from_welcome( @@ -347,17 +401,40 @@ impl MlsGroup { let mls_group = mls_welcome.into_group(provider)?; let group_id = mls_group.group_id().to_vec(); let metadata = extract_group_metadata(&mls_group)?; + let dm_members = metadata.dm_members; + let dm_inbox_id = if let Some(dm_members) = &dm_members { + if dm_members.member_one_inbox_id == client.inbox_id() { + Some(dm_members.member_two_inbox_id.clone()) + } else { + Some(dm_members.member_one_inbox_id.clone()) + } + } else { + None + }; let group_type = metadata.conversation_type; let to_store = match group_type { - ConversationType::Group | ConversationType::Dm => StoredGroup::new_from_welcome( + ConversationType::Group => StoredGroup::new_from_welcome( group_id.clone(), now_ns(), GroupMembershipState::Pending, added_by_inbox, welcome_id, Purpose::Conversation, + dm_inbox_id, ), + ConversationType::Dm => { + validate_dm_group(client, &mls_group, &added_by_inbox)?; + StoredGroup::new_from_welcome( + group_id.clone(), + now_ns(), + GroupMembershipState::Pending, + added_by_inbox, + welcome_id, + Purpose::Conversation, + dm_inbox_id, + ) + } ConversationType::Sync => StoredGroup::new_from_welcome( group_id.clone(), now_ns(), @@ -365,6 +442,7 @@ impl MlsGroup { added_by_inbox, welcome_id, Purpose::Sync, + dm_inbox_id, ), }; @@ -417,11 +495,12 @@ impl MlsGroup { ) -> Result { let conn = context.store.conn()?; // let my_sequence_id = context.inbox_sequence_id(&conn)?; + let creator_inbox_id = context.inbox_id().to_string(); let provider = XmtpOpenMlsProvider::new(conn); let protected_metadata = - build_protected_metadata_extension(&context.identity, Purpose::Sync)?; + build_protected_metadata_extension(creator_inbox_id.clone(), Purpose::Sync)?; let mutable_metadata = build_mutable_metadata_extension_default( - &context.identity, + creator_inbox_id, GroupMetadataOptions::default(), )?; let group_membership = build_starting_group_membership_extension(context.inbox_id(), 0); @@ -1022,6 +1101,68 @@ impl MlsGroup { Ok(extract_group_permissions(&mls_group)?) } + /// Used for testing that dm group validation works as expected. + /// + /// See the `test_validate_dm_group` test function for more details. + #[cfg(test)] + pub fn create_test_dm_group( + context: Arc, + dm_target_inbox_id: InboxId, + custom_protected_metadata: Option, + custom_mutable_metadata: Option, + custom_group_membership: Option, + custom_mutable_permissions: Option, + ) -> Result { + let conn = context.store.conn()?; + let provider = XmtpOpenMlsProvider::new(conn); + + let protected_metadata = custom_protected_metadata.unwrap_or_else(|| { + build_dm_protected_metadata_extension(context.inbox_id(), dm_target_inbox_id.clone()) + .unwrap() + }); + let mutable_metadata = custom_mutable_metadata.unwrap_or_else(|| { + build_dm_mutable_metadata_extension_default(context.inbox_id(), &dm_target_inbox_id) + .unwrap() + }); + let group_membership = custom_group_membership + .unwrap_or_else(|| build_starting_group_membership_extension(context.inbox_id(), 0)); + let mutable_permissions = custom_mutable_permissions.unwrap_or_else(PolicySet::new_dm); + let mutable_permission_extension = + build_mutable_permissions_extension(mutable_permissions)?; + + let group_config = build_group_config( + protected_metadata, + mutable_metadata, + group_membership, + mutable_permission_extension, + )?; + + let mls_group = OpenMlsGroup::new( + &provider, + &context.identity.installation_keys, + &group_config, + CredentialWithKey { + credential: context.identity.credential(), + signature_key: context.identity.installation_keys.to_public_vec().into(), + }, + )?; + + let group_id = mls_group.group_id().to_vec(); + let stored_group = StoredGroup::new( + group_id.clone(), + now_ns(), + GroupMembershipState::Allowed, // Use Allowed as default for tests + context.inbox_id(), + Some(dm_target_inbox_id), + ); + + stored_group.store(provider.conn_ref())?; + Ok(Self::new( + context.clone(), + group_id, + stored_group.created_at_ns, + )) + } } fn extract_message_v1(message: GroupMessage) -> Result { @@ -1039,14 +1180,30 @@ pub fn extract_group_id(message: &GroupMessage) -> Result, MessageProces } fn build_protected_metadata_extension( - identity: &Identity, + creator_inbox_id: String, group_purpose: Purpose, ) -> Result { let group_type = match group_purpose { Purpose::Conversation => ConversationType::Group, Purpose::Sync => ConversationType::Sync, }; - let metadata = GroupMetadata::new(group_type, identity.inbox_id().clone()); + + let metadata = GroupMetadata::new(group_type, creator_inbox_id, None); + let protected_metadata = Metadata::new(metadata.try_into()?); + + Ok(Extension::ImmutableMetadata(protected_metadata)) +} + +fn build_dm_protected_metadata_extension( + creator_inbox_id: String, + dm_inbox_id: InboxId, +) -> Result { + let dm_members = Some(DmMembers { + member_one_inbox_id: creator_inbox_id.clone(), + member_two_inbox_id: dm_inbox_id, + }); + + let metadata = GroupMetadata::new(ConversationType::Dm, creator_inbox_id, dm_members); let protected_metadata = Metadata::new(metadata.try_into()?); Ok(Extension::ImmutableMetadata(protected_metadata)) @@ -1063,11 +1220,25 @@ fn build_mutable_permissions_extension(policies: PolicySet) -> Result Result { let mutable_metadata: Vec = - GroupMutableMetadata::new_default(identity.inbox_id.clone(), opts).try_into()?; + GroupMutableMetadata::new_default(creator_inbox_id, opts).try_into()?; + let unknown_gc_extension = UnknownExtension(mutable_metadata); + + Ok(Extension::Unknown( + MUTABLE_METADATA_EXTENSION_ID, + unknown_gc_extension, + )) +} + +pub fn build_dm_mutable_metadata_extension_default( + creator_inbox_id: String, + dm_target_inbox_id: &str, +) -> Result { + let mutable_metadata: Vec = + GroupMutableMetadata::new_dm_default(creator_inbox_id, dm_target_inbox_id).try_into()?; let unknown_gc_extension = UnknownExtension(mutable_metadata); Ok(Extension::Unknown( @@ -1304,6 +1475,59 @@ async fn validate_initial_group_membership( Ok(()) } +fn validate_dm_group( + client: &Client, + mls_group: &OpenMlsGroup, + added_by_inbox: &str, +) -> Result<(), GroupError> { + let metadata = extract_group_metadata(mls_group)?; + + // Check if the conversation type is DM + if metadata.conversation_type != ConversationType::Dm { + return Err(GroupError::Generic( + "Invalid conversation type for DM group".to_string(), + )); + } + + // Check if DmMembers are set and validate their contents + if let Some(dm_members) = metadata.dm_members { + let our_inbox_id = client.context.identity.inbox_id().clone(); + if !((dm_members.member_one_inbox_id == added_by_inbox + && dm_members.member_two_inbox_id == our_inbox_id) + || (dm_members.member_one_inbox_id == our_inbox_id + && dm_members.member_two_inbox_id == added_by_inbox)) + { + return Err(GroupError::Generic( + "DM members do not match expected inboxes".to_string(), + )); + } + } else { + return Err(GroupError::Generic( + "DM group must have DmMembers set".to_string(), + )); + } + + // Validate mutable metadata + let mutable_metadata: GroupMutableMetadata = mls_group.try_into()?; + + // Check if the admin list and super admin list are empty + if !mutable_metadata.admin_list.is_empty() || !mutable_metadata.super_admin_list.is_empty() { + return Err(GroupError::Generic( + "DM group must have empty admin and super admin lists".to_string(), + )); + } + + // Validate permissions + let permissions = extract_group_permissions(mls_group)?; + if permissions != GroupMutablePermissions::new(PolicySet::new_dm()) { + return Err(GroupError::Generic( + "Invalid permissions for DM group".to_string(), + )); + } + + Ok(()) +} + fn build_group_join_config() -> MlsGroupJoinConfig { MlsGroupJoinConfig::builder() .wire_format_policy(WireFormatPolicy::default()) @@ -1325,19 +1549,22 @@ mod tests { use crate::{ assert_err, assert_logged, builder::ClientBuilder, - client::MessageProcessingError, + client::{FindGroupParams, MessageProcessingError}, codecs::{group_updated::GroupUpdatedCodec, ContentCodec}, groups::{ - build_group_membership_extension, + build_dm_protected_metadata_extension, build_group_membership_extension, + build_mutable_metadata_extension_default, build_protected_metadata_extension, group_membership::GroupMembership, group_metadata::{ConversationType, GroupMetadata}, group_mutable_metadata::MetadataField, intents::{PermissionPolicyOption, PermissionUpdateType}, members::{GroupMember, PermissionLevel}, - DeliveryStatus, GroupMetadataOptions, PreconfiguredPolicies, UpdateAdminListType, + validate_dm_group, DeliveryStatus, GroupMetadataOptions, PreconfiguredPolicies, + UpdateAdminListType, }, storage::{ consent_record::ConsentState, + group::Purpose, group_intent::{IntentKind, IntentState, NewGroupIntent}, group_message::{GroupMessageKind, StoredGroupMessage}, }, @@ -1346,6 +1573,7 @@ mod tests { }; use super::{ + group_permissions::PolicySet, intents::{Installation, SendWelcomesAction}, GroupError, MlsGroup, }; @@ -1355,7 +1583,7 @@ mod tests { ApiClient: XmtpApi, { client.sync_welcomes().await.unwrap(); - let mut groups = client.find_groups(None, None, None, None).unwrap(); + let mut groups = client.find_groups(FindGroupParams::default()).unwrap(); groups.remove(0) } @@ -1667,7 +1895,7 @@ mod tests { // Bo should not be able to actually read this group bo.sync_welcomes().await.unwrap(); - let groups = bo.find_groups(None, None, None, None).unwrap(); + let groups = bo.find_groups(FindGroupParams::default()).unwrap(); assert_eq!(groups.len(), 0); assert_logged!("failed to create group from welcome", 1); }); @@ -1796,7 +2024,7 @@ mod tests { .expect("send message"); bola_client.sync_welcomes().await.unwrap(); - let bola_groups = bola_client.find_groups(None, None, None, None).unwrap(); + let bola_groups = bola_client.find_groups(FindGroupParams::default()).unwrap(); let bola_group = bola_groups.first().unwrap(); bola_group.sync(&bola_client).await.unwrap(); let bola_messages = bola_group @@ -2149,7 +2377,7 @@ mod tests { .await .unwrap(); bola.sync_welcomes().await.unwrap(); - let bola_groups = bola.find_groups(None, None, None, None).unwrap(); + let bola_groups = bola.find_groups(FindGroupParams::default()).unwrap(); assert_eq!(bola_groups.len(), 1); let bola_group = bola_groups.first().unwrap(); bola_group.sync(&bola).await.unwrap(); @@ -2317,7 +2545,7 @@ mod tests { .await .unwrap(); bola.sync_welcomes().await.unwrap(); - let bola_groups = bola.find_groups(None, None, None, None).unwrap(); + let bola_groups = bola.find_groups(FindGroupParams::default()).unwrap(); assert_eq!(bola_groups.len(), 1); let bola_group = bola_groups.first().unwrap(); bola_group.sync(&bola).await.unwrap(); @@ -2396,7 +2624,7 @@ mod tests { .await .unwrap(); bola.sync_welcomes().await.unwrap(); - let bola_groups = bola.find_groups(None, None, None, None).unwrap(); + let bola_groups = bola.find_groups(FindGroupParams::default()).unwrap(); assert_eq!(bola_groups.len(), 1); let bola_group = bola_groups.first().unwrap(); bola_group.sync(&bola).await.unwrap(); @@ -2412,7 +2640,7 @@ mod tests { // Verify that bola can not add caro because they are not an admin bola.sync_welcomes().await.unwrap(); - let bola_groups = bola.find_groups(None, None, None, None).unwrap(); + let bola_groups = bola.find_groups(FindGroupParams::default()).unwrap(); assert_eq!(bola_groups.len(), 1); let bola_group: &MlsGroup = bola_groups.first().unwrap(); bola_group.sync(&bola).await.unwrap(); @@ -2476,7 +2704,7 @@ mod tests { // Verify that bola can not add charlie because they are not an admin bola.sync_welcomes().await.unwrap(); - let bola_groups = bola.find_groups(None, None, None, None).unwrap(); + let bola_groups = bola.find_groups(FindGroupParams::default()).unwrap(); assert_eq!(bola_groups.len(), 1); let bola_group: &MlsGroup = bola_groups.first().unwrap(); bola_group.sync(&bola).await.unwrap(); @@ -2504,7 +2732,7 @@ mod tests { .await .unwrap(); bola.sync_welcomes().await.unwrap(); - let bola_groups = bola.find_groups(None, None, None, None).unwrap(); + let bola_groups = bola.find_groups(FindGroupParams::default()).unwrap(); assert_eq!(bola_groups.len(), 1); let bola_group = bola_groups.first().unwrap(); bola_group.sync(&bola).await.unwrap(); @@ -2520,7 +2748,7 @@ mod tests { // Verify that bola can not add caro as an admin because they are not a super admin bola.sync_welcomes().await.unwrap(); - let bola_groups = bola.find_groups(None, None, None, None).unwrap(); + let bola_groups = bola.find_groups(FindGroupParams::default()).unwrap(); assert_eq!(bola_groups.len(), 1); let bola_group: &MlsGroup = bola_groups.first().unwrap(); bola_group.sync(&bola).await.unwrap(); @@ -2767,7 +2995,7 @@ mod tests { // Step 3: Verify that Bola can update the group name, and amal sees the update bola.sync_welcomes().await.unwrap(); - let bola_groups = bola.find_groups(None, None, None, None).unwrap(); + let bola_groups = bola.find_groups(FindGroupParams::default()).unwrap(); let bola_group: &MlsGroup = bola_groups.first().unwrap(); bola_group.sync(&bola).await.unwrap(); bola_group @@ -2832,7 +3060,7 @@ mod tests { // Step 3: Bola attemps to add Caro, but fails because group is admin only let caro = ClientBuilder::new_test_client(&generate_local_wallet()).await; bola.sync_welcomes().await.unwrap(); - let bola_groups = bola.find_groups(None, None, None, None).unwrap(); + let bola_groups = bola.find_groups(FindGroupParams::default()).unwrap(); let bola_group: &MlsGroup = bola_groups.first().unwrap(); bola_group.sync(&bola).await.unwrap(); let result = bola_group @@ -2969,6 +3197,78 @@ mod tests { ); } + #[tokio::test(flavor = "multi_thread")] + async fn test_dm_creation() { + let amal = ClientBuilder::new_test_client(&generate_local_wallet()).await; + let bola = ClientBuilder::new_test_client(&generate_local_wallet()).await; + let caro = ClientBuilder::new_test_client(&generate_local_wallet()).await; + + // Amal creates a dm group targetting bola + let amal_dm: MlsGroup = amal.create_dm_by_inbox_id(bola.inbox_id()).await.unwrap(); + + // Amal can not add caro to the dm group + let result = amal_dm + .add_members_by_inbox_id(&amal, vec![caro.inbox_id()]) + .await; + assert!(result.is_err()); + + // Bola is already a member + let result = amal_dm + .add_members_by_inbox_id(&amal, vec![bola.inbox_id(), caro.inbox_id()]) + .await; + assert!(result.is_err()); + amal_dm.sync(&amal).await.unwrap(); + let members = amal_dm.members(&amal).await.unwrap(); + assert_eq!(members.len(), 2); + + // Bola can message amal + let _ = bola.sync_welcomes().await; + let bola_groups = bola + .find_groups(FindGroupParams { + include_dm_groups: true, + ..FindGroupParams::default() + }) + .unwrap(); + let bola_dm: &MlsGroup = bola_groups.first().unwrap(); + bola_dm.send_message(b"test one", &bola).await.unwrap(); + + // Amal sync and reads message + amal_dm.sync(&amal).await.unwrap(); + let messages = amal_dm.find_messages(None, None, None, None, None).unwrap(); + assert_eq!(messages.len(), 2); + let message = messages.last().unwrap(); + assert_eq!(message.decrypted_message_bytes, b"test one"); + + // Amal can not remove bola + let result = amal_dm + .remove_members_by_inbox_id(&amal, vec![bola.inbox_id()]) + .await; + assert!(result.is_err()); + amal_dm.sync(&amal).await.unwrap(); + let members = amal_dm.members(&amal).await.unwrap(); + assert_eq!(members.len(), 2); + + // Neither Amal nor Bola is an admin or super admin + amal_dm.sync(&amal).await.unwrap(); + bola_dm.sync(&bola).await.unwrap(); + let is_amal_admin = amal_dm + .is_admin(amal.inbox_id(), amal.mls_provider().unwrap()) + .unwrap(); + let is_bola_admin = amal_dm + .is_admin(bola.inbox_id(), bola.mls_provider().unwrap()) + .unwrap(); + let is_amal_super_admin = amal_dm + .is_super_admin(amal.inbox_id(), amal.mls_provider().unwrap()) + .unwrap(); + let is_bola_super_admin = amal_dm + .is_super_admin(bola.inbox_id(), bola.mls_provider().unwrap()) + .unwrap(); + assert!(!is_amal_admin); + assert!(!is_bola_admin); + assert!(!is_amal_super_admin); + assert!(!is_bola_super_admin); + } + #[tokio::test(flavor = "multi_thread")] async fn process_messages_abort_on_retryable_error() { let alix = ClientBuilder::new_test_client(&generate_local_wallet()).await; @@ -3285,7 +3585,7 @@ mod tests { .unwrap(); bola.sync_welcomes().await.unwrap(); - let bola_groups = bola.find_groups(None, None, None, None).unwrap(); + let bola_groups = bola.find_groups(FindGroupParams::default()).unwrap(); let bola_group = bola_groups.first().unwrap(); // group consent state should default to unknown for users who did not create the group assert_eq!(bola_group.consent_state().unwrap(), ConsentState::Unknown); @@ -3304,7 +3604,7 @@ mod tests { .unwrap(); caro.sync_welcomes().await.unwrap(); - let caro_groups = caro.find_groups(None, None, None, None).unwrap(); + let caro_groups = caro.find_groups(FindGroupParams::default()).unwrap(); let caro_group = caro_groups.first().unwrap(); caro_group @@ -3316,4 +3616,115 @@ mod tests { // group consent state should be allowed if user publishes a message to the group assert_eq!(caro_group.consent_state().unwrap(), ConsentState::Allowed); } + + #[tokio::test] + async fn test_validate_dm_group() { + let client = ClientBuilder::new_test_client(&generate_local_wallet()).await; + let added_by_inbox = "added_by_inbox_id"; + let creator_inbox_id = client.context.identity.inbox_id().clone(); + let dm_target_inbox_id = added_by_inbox.to_string(); + + // Test case 1: Valid DM group + let valid_dm_group = MlsGroup::create_test_dm_group( + client.context.clone(), + dm_target_inbox_id.clone(), + None, + None, + None, + None, + ) + .unwrap(); + assert!(validate_dm_group( + &client, + &valid_dm_group + .load_mls_group(client.mls_provider().unwrap()) + .unwrap(), + added_by_inbox + ) + .is_ok()); + + // Test case 2: Invalid conversation type + let invalid_protected_metadata = + build_protected_metadata_extension(creator_inbox_id.clone(), Purpose::Conversation) + .unwrap(); + let invalid_type_group = MlsGroup::create_test_dm_group( + client.context.clone(), + dm_target_inbox_id.clone(), + Some(invalid_protected_metadata), + None, + None, + None, + ) + .unwrap(); + assert!(matches!( + validate_dm_group(&client, &invalid_type_group.load_mls_group(client.mls_provider().unwrap()).unwrap(), added_by_inbox), + Err(GroupError::Generic(msg)) if msg.contains("Invalid conversation type") + )); + + // Test case 3: Missing DmMembers + // This case is not easily testable with the current structure, as DmMembers are set in the protected metadata + + // Test case 4: Mismatched DM members + let mismatched_dm_members = build_dm_protected_metadata_extension( + creator_inbox_id.clone(), + "wrong_inbox_id".to_string(), + ) + .unwrap(); + let mismatched_dm_members_group = MlsGroup::create_test_dm_group( + client.context.clone(), + dm_target_inbox_id.clone(), + Some(mismatched_dm_members), + None, + None, + None, + ) + .unwrap(); + assert!(matches!( + validate_dm_group(&client, &mismatched_dm_members_group.load_mls_group(client.mls_provider().unwrap()).unwrap(), added_by_inbox), + Err(GroupError::Generic(msg)) if msg.contains("DM members do not match expected inboxes") + )); + + // Test case 5: Non-empty admin list + let non_empty_admin_list = build_mutable_metadata_extension_default( + creator_inbox_id.clone(), + GroupMetadataOptions::default(), + ) + .unwrap(); + let non_empty_admin_list_group = MlsGroup::create_test_dm_group( + client.context.clone(), + dm_target_inbox_id.clone(), + None, + Some(non_empty_admin_list), + None, + None, + ) + .unwrap(); + assert!(matches!( + validate_dm_group(&client, &non_empty_admin_list_group.load_mls_group(client.mls_provider().unwrap()).unwrap(), added_by_inbox), + Err(GroupError::Generic(msg)) if msg.contains("DM group must have empty admin and super admin lists") + )); + + // Test case 6: Non-empty super admin list + // Similar to test case 5, but with super_admin_list + + // Test case 7: Invalid permissions + let invalid_permissions = PolicySet::default(); + let invalid_permissions_group = MlsGroup::create_test_dm_group( + client.context.clone(), + dm_target_inbox_id.clone(), + None, + None, + None, + Some(invalid_permissions), + ) + .unwrap(); + assert!(matches!( + validate_dm_group( + &client, + &invalid_permissions_group.load_mls_group(client.mls_provider().unwrap()).unwrap(), + added_by_inbox + ), + Err(GroupError::Generic(msg)) if msg.contains("Invalid permissions for DM group") + )); + } } diff --git a/xmtp_mls/src/groups/validated_commit.rs b/xmtp_mls/src/groups/validated_commit.rs index 3ce770bc4..ead219534 100644 --- a/xmtp_mls/src/groups/validated_commit.rs +++ b/xmtp_mls/src/groups/validated_commit.rs @@ -32,7 +32,7 @@ use crate::{ use super::{ group_membership::{GroupMembership, MembershipDiff}, - group_metadata::{GroupMetadata, GroupMetadataError}, + group_metadata::{DmMembers, GroupMetadata, GroupMetadataError}, group_mutable_metadata::{ find_mutable_metadata_extension, GroupMutableMetadata, GroupMutableMetadataError, }, @@ -210,6 +210,7 @@ pub struct ValidatedCommit { pub removed_inboxes: Vec, pub metadata_changes: MutableMetadataChanges, pub permissions_changed: bool, + pub dm_members: Option, } impl ValidatedCommit { @@ -326,6 +327,7 @@ impl ValidatedCommit { removed_inboxes, metadata_changes, permissions_changed, + dm_members: immutable_metadata.dm_members, }; let policy_set = extract_group_permissions(openmls_group)?; diff --git a/xmtp_mls/src/storage/encrypted_store/group.rs b/xmtp_mls/src/storage/encrypted_store/group.rs index 1142e7dff..717191218 100644 --- a/xmtp_mls/src/storage/encrypted_store/group.rs +++ b/xmtp_mls/src/storage/encrypted_store/group.rs @@ -39,6 +39,8 @@ pub struct StoredGroup { pub added_by_inbox_id: String, /// The sequence id of the welcome message pub welcome_id: Option, + /// The inbox_id of the DM target + pub dm_inbox_id: Option, } impl_fetch!(StoredGroup, groups, Vec); @@ -53,6 +55,7 @@ impl StoredGroup { added_by_inbox_id: String, welcome_id: i64, purpose: Purpose, + dm_inbox_id: Option, ) -> Self { Self { id, @@ -62,6 +65,7 @@ impl StoredGroup { purpose, added_by_inbox_id, welcome_id: Some(welcome_id), + dm_inbox_id, } } @@ -71,6 +75,7 @@ impl StoredGroup { created_at_ns: i64, membership_state: GroupMembershipState, added_by_inbox_id: String, + dm_inbox_id: Option, ) -> Self { Self { id, @@ -80,6 +85,7 @@ impl StoredGroup { purpose: Purpose::Conversation, added_by_inbox_id, welcome_id: None, + dm_inbox_id, } } @@ -98,6 +104,7 @@ impl StoredGroup { purpose: Purpose::Sync, added_by_inbox_id: "".into(), welcome_id: None, + dm_inbox_id: None, } } } @@ -110,6 +117,7 @@ impl DbConnection { created_after_ns: Option, created_before_ns: Option, limit: Option, + include_dm_groups: bool, ) -> Result, StorageError> { let mut query = dsl::groups.order(dsl::created_at_ns.asc()).into_boxed(); @@ -129,6 +137,10 @@ impl DbConnection { query = query.limit(limit); } + if !include_dm_groups { + query = query.filter(dsl::dm_inbox_id.is_null()); + } + query = query.filter(dsl::purpose.eq(Purpose::Conversation)); Ok(self.raw_query(|conn| query.load(conn))?) @@ -336,6 +348,22 @@ pub(crate) mod tests { created_at_ns, membership_state, "placeholder_address".to_string(), + None, + ) + } + + /// Generate a test dm group + pub fn generate_dm(state: Option) -> StoredGroup { + let id = rand_vec(); + let created_at_ns = now_ns(); + let membership_state = state.unwrap_or(GroupMembershipState::Allowed); + let dm_inbox_id = Some("placeholder_inbox_id".to_string()); + StoredGroup::new( + id, + created_at_ns, + membership_state, + "placeholder_address".to_string(), + dm_inbox_id, ) } @@ -397,23 +425,31 @@ pub(crate) mod tests { test_group_1.store(conn).unwrap(); let test_group_2 = generate_group(Some(GroupMembershipState::Allowed)); test_group_2.store(conn).unwrap(); + let test_group_3 = generate_dm(Some(GroupMembershipState::Allowed)); + test_group_3.store(conn).unwrap(); - let all_results = conn.find_groups(None, None, None, None).unwrap(); + let all_results = conn.find_groups(None, None, None, None, false).unwrap(); assert_eq!(all_results.len(), 2); let pending_results = conn - .find_groups(Some(vec![GroupMembershipState::Pending]), None, None, None) + .find_groups( + Some(vec![GroupMembershipState::Pending]), + None, + None, + None, + false, + ) .unwrap(); assert_eq!(pending_results[0].id, test_group_1.id); assert_eq!(pending_results.len(), 1); // Offset and limit - let results_with_limit = conn.find_groups(None, None, None, Some(1)).unwrap(); + let results_with_limit = conn.find_groups(None, None, None, Some(1), false).unwrap(); assert_eq!(results_with_limit.len(), 1); assert_eq!(results_with_limit[0].id, test_group_1.id); let results_with_created_at_ns_after = conn - .find_groups(None, Some(test_group_1.created_at_ns), None, Some(1)) + .find_groups(None, Some(test_group_1.created_at_ns), None, Some(1), false) .unwrap(); assert_eq!(results_with_created_at_ns_after.len(), 1); assert_eq!(results_with_created_at_ns_after[0].id, test_group_2.id); @@ -422,7 +458,10 @@ pub(crate) mod tests { let synced_groups = conn.find_sync_groups().unwrap(); assert_eq!(synced_groups.len(), 0); - // test that ONLY normal groups show up. + // test that dm groups are included + let dm_results = conn.find_groups(None, None, None, None, true).unwrap(); + assert_eq!(dm_results.len(), 3); + assert_eq!(dm_results[2].id, test_group_3.id); }) } diff --git a/xmtp_mls/src/storage/encrypted_store/group_intent.rs b/xmtp_mls/src/storage/encrypted_store/group_intent.rs index 4ff7615e7..b5104f9ac 100644 --- a/xmtp_mls/src/storage/encrypted_store/group_intent.rs +++ b/xmtp_mls/src/storage/encrypted_store/group_intent.rs @@ -404,6 +404,7 @@ mod tests { 100, GroupMembershipState::Allowed, "placeholder_address".to_string(), + None, ); group.store(conn).unwrap(); } diff --git a/xmtp_mls/src/storage/encrypted_store/mod.rs b/xmtp_mls/src/storage/encrypted_store/mod.rs index 8cd5baf9e..fb3561b72 100644 --- a/xmtp_mls/src/storage/encrypted_store/mod.rs +++ b/xmtp_mls/src/storage/encrypted_store/mod.rs @@ -681,6 +681,7 @@ mod tests { 0, GroupMembershipState::Allowed, "goodbye".to_string(), + None, ); group.store(connection)?; Ok(()) @@ -732,6 +733,7 @@ mod tests { 0, GroupMembershipState::Allowed, "goodbye".to_string(), + None, ); group.store(conn1).unwrap(); diff --git a/xmtp_mls/src/storage/encrypted_store/schema.rs b/xmtp_mls/src/storage/encrypted_store/schema.rs index 7d835a5b9..7266406cd 100644 --- a/xmtp_mls/src/storage/encrypted_store/schema.rs +++ b/xmtp_mls/src/storage/encrypted_store/schema.rs @@ -53,6 +53,7 @@ diesel::table! { purpose -> Integer, added_by_inbox_id -> Text, welcome_id -> Nullable, + dm_inbox_id -> Nullable, } } diff --git a/xmtp_mls/src/subscriptions.rs b/xmtp_mls/src/subscriptions.rs index 2e19338a3..85449af0c 100644 --- a/xmtp_mls/src/subscriptions.rs +++ b/xmtp_mls/src/subscriptions.rs @@ -1,5 +1,6 @@ use std::{collections::HashMap, sync::Arc}; +use crate::xmtp_openmls_provider::XmtpOpenMlsProvider; use futures::{FutureExt, Stream, StreamExt}; use prost::Message; use tokio::{sync::oneshot, task::JoinHandle}; @@ -9,7 +10,7 @@ use xmtp_proto::xmtp::mls::api::v1::WelcomeMessage; use crate::{ api::GroupFilter, client::{extract_welcome_message, ClientError}, - groups::{extract_group_id, GroupError, MlsGroup}, + groups::{extract_group_id, group_metadata::ConversationType, GroupError, MlsGroup}, retry::Retry, retry_async, storage::{group::StoredGroup, group_message::StoredGroupMessage}, @@ -131,18 +132,42 @@ where pub async fn stream_conversations( &self, + include_dm: bool, ) -> Result + '_, ClientError> { + let provider = Arc::new(self.context.mls_provider()?); + let event_queue = tokio_stream::wrappers::BroadcastStream::new(self.local_events.subscribe()); - let event_queue = event_queue.filter_map(|event| async move { - match event { - Ok(LocalEvents::NewGroup(g)) => Some(g), - Err(BroadcastStreamRecvError::Lagged(missed)) => { - tracing::warn!("Missed {missed} messages due to local event queue lagging"); + // Helper function for filtering Dm groups + let filter_group = move |group: MlsGroup, provider: Arc| async move { + match group.metadata(provider.as_ref()) { + Ok(metadata) => { + if include_dm || metadata.conversation_type != ConversationType::Dm { + Some(group) + } else { + None + } + } + Err(err) => { + tracing::error!("Error processing group metadata: {:?}", err); None } } + }; + + let event_provider = Arc::clone(&provider); + let event_queue = event_queue.filter_map(move |event| { + let provider = Arc::clone(&event_provider); + async move { + match event { + Ok(LocalEvents::NewGroup(group)) => filter_group(group, provider).await, + Err(BroadcastStreamRecvError::Lagged(missed)) => { + tracing::warn!("Missed {missed} messages due to local event queue lagging"); + None + } + } + } }); let installation_key = self.installation_public_key(); @@ -154,20 +179,24 @@ where .subscribe_welcome_messages(installation_key, Some(id_cursor)) .await?; + let stream_provider = Arc::clone(&provider); let stream = subscription .map(|welcome| async { tracing::info!("Received conversation streaming payload"); self.process_streamed_welcome(welcome?).await }) - .filter_map(|res| async { - match res.await { - Ok(group) => Some(group), - Err(err) => { - tracing::error!( - "Error processing stream entry for conversation: {:?}", - err - ); - None + .filter_map(move |res| { + let provider = Arc::clone(&stream_provider); + async move { + match res.await { + Ok(group) => filter_group(group, provider).await, + Err(err) => { + tracing::error!( + "Error processing stream entry for conversation: {:?}", + err + ); + None + } } } }); @@ -239,11 +268,12 @@ where pub fn stream_conversations_with_callback( client: Arc>, mut convo_callback: impl FnMut(MlsGroup) + Send + 'static, + include_dm: bool, ) -> StreamHandle> { let (tx, rx) = oneshot::channel(); let handle = tokio::spawn(async move { - let stream = client.stream_conversations().await?; + let stream = client.stream_conversations(include_dm).await?; futures::pin_mut!(stream); let _ = tx.send(()); while let Some(convo) = stream.next().await { @@ -293,7 +323,7 @@ where let mut group_id_to_info = self .store() .conn()? - .find_groups(None, None, None, None)? + .find_groups(None, None, None, None, false)? .into_iter() .map(Into::into) .collect::, MessagesStreamInfo>>(); @@ -305,7 +335,8 @@ where futures::pin_mut!(messages_stream); tracing::info!("Setting up conversation stream in stream_all_messages"); - let convo_stream = self.stream_conversations().await?; + let convo_stream = self.stream_conversations(true).await?; + futures::pin_mut!(convo_stream); let mut extra_messages = Vec::new(); @@ -424,7 +455,7 @@ mod tests { let mut stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx); let bob_ptr = bob.clone(); tokio::spawn(async move { - let bob_stream = bob_ptr.stream_conversations().await.unwrap(); + let bob_stream = bob_ptr.stream_conversations(true).await.unwrap(); futures::pin_mut!(bob_stream); while let Some(item) = bob_stream.next().await { let _ = tx.send(item); @@ -734,12 +765,15 @@ mod tests { let notify = Delivery::new(None); let (notify_pointer, groups_pointer) = (notify.clone(), groups.clone()); - let closer = - Client::::stream_conversations_with_callback(alix.clone(), move |g| { + let closer = Client::::stream_conversations_with_callback( + alix.clone(), + move |g| { let mut groups = groups_pointer.lock(); groups.push(g); notify_pointer.notify_one(); - }); + }, + false, + ); alix.create_group(None, GroupMetadataOptions::default()) .unwrap(); @@ -771,4 +805,67 @@ mod tests { closer.handle.abort(); } + + #[tokio::test(flavor = "multi_thread")] + async fn test_dm_streaming() { + let alix = Arc::new(ClientBuilder::new_test_client(&generate_local_wallet()).await); + let bo = Arc::new(ClientBuilder::new_test_client(&generate_local_wallet()).await); + + let groups = Arc::new(Mutex::new(Vec::new())); + // Wait for 2 seconds for the group creation to be streamed + let notify = Delivery::new(Some(std::time::Duration::from_secs(1))); + let (notify_pointer, groups_pointer) = (notify.clone(), groups.clone()); + + // Start a stream with enableDm set to false + let closer = Client::::stream_conversations_with_callback( + alix.clone(), + move |g| { + let mut groups = groups_pointer.lock(); + groups.push(g); + notify_pointer.notify_one(); + }, + false, + ); + + alix.create_dm_by_inbox_id(bo.inbox_id()).await.unwrap(); + + let result = notify.wait_for_delivery().await; + assert!(result.is_err(), "Stream unexpectedly received a DM group"); + + closer.handle.abort(); + + // Start a stream with enableDm set to true + let groups = Arc::new(Mutex::new(Vec::new())); + // Wait for 2 seconds for the group creation to be streamed + let notify = Delivery::new(Some(std::time::Duration::from_secs(60))); + let (notify_pointer, groups_pointer) = (notify.clone(), groups.clone()); + let closer = Client::::stream_conversations_with_callback( + alix.clone(), + move |g| { + let mut groups = groups_pointer.lock(); + groups.push(g); + notify_pointer.notify_one(); + }, + true, + ); + + alix.create_dm_by_inbox_id(bo.inbox_id()).await.unwrap(); + notify.wait_for_delivery().await.unwrap(); + { + let grps = groups.lock(); + assert_eq!(grps.len(), 1); + } + + let dm = bo.create_dm_by_inbox_id(alix.inbox_id()).await.unwrap(); + dm.add_members_by_inbox_id(&bo, vec![alix.inbox_id()]) + .await + .unwrap(); + notify.wait_for_delivery().await.unwrap(); + { + let grps = groups.lock(); + assert_eq!(grps.len(), 2); + } + + closer.handle.abort(); + } }