From 03ba9622f5b5ecce8fed854c7118277ec1caccf9 Mon Sep 17 00:00:00 2001 From: Mojtaba Chenani Date: Thu, 19 Dec 2024 19:05:11 +0100 Subject: [PATCH 1/7] wip --- Cargo.lock | 1 - bindings_ffi/src/mls.rs | 20 ++ xmtp_mls/src/client.rs | 26 +++ xmtp_mls/src/groups/mod.rs | 2 + .../encrypted_store/conversation_list.rs | 179 ++++++++++++++++++ xmtp_mls/src/storage/encrypted_store/group.rs | 9 +- .../storage/encrypted_store/group_message.rs | 2 +- xmtp_mls/src/storage/encrypted_store/mod.rs | 1 + .../src/storage/encrypted_store/schema.rs | 22 +++ 9 files changed, 259 insertions(+), 3 deletions(-) create mode 100644 xmtp_mls/src/storage/encrypted_store/conversation_list.rs diff --git a/Cargo.lock b/Cargo.lock index 5627ff5d8..cf5f4a245 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7518,7 +7518,6 @@ dependencies = [ "tracing-oslog", "tracing-subscriber", "uniffi", - "url", "uuid 1.11.0", "xmtp_api_grpc", "xmtp_common", diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 339c9c23e..b2bcaa262 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -973,6 +973,20 @@ impl FfiConversations { Ok(convo_list) } + pub async fn list_conversations( + &self, + opts: FfiListConversationsOptions, + ) -> Result>, GenericError> { + let inner = self.inner_client.as_ref(); + let convo_list: Vec> = inner + .list_conversations()? + .into_iter() + .map(|group| Arc::new(group.into())) + .collect(); + + Ok(convo_list) + } + pub async fn list_groups( &self, opts: FfiListConversationsOptions, @@ -1149,6 +1163,12 @@ pub struct FfiConversation { inner: MlsGroup, } +#[derive(uniffi::Object)] +pub struct FfiConversationListItem { + conversation: FfiConversation, + last_message: FfiMessage +} + impl From> for FfiConversation { fn from(mls_group: MlsGroup) -> FfiConversation { FfiConversation { inner: mls_group } diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index 02327f11b..09fa09142 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -684,6 +684,32 @@ where .collect()) } + pub fn list_conversations( + &self, + ) -> Result, StoredGroupMessage)>, ClientError> { + Ok(self + .store() + .conn()? + .fetch_conversation_list()? + .into_iter() + .map(|conversation_item| { + ( + StoredGroupMessage { + id: conversation_item.message_id?, + group_id: conversation_item.id.clone(), + decrypted_message_bytes: conversation_item.decrypted_message_bytes?, + sent_at_ns: conversation_item.sent_at_ns?, + sender_installation_id: conversation_item.sender_installation_id?, + sender_inbox_id: conversation_item.sender_inbox_id?, + kind: conversation_item.kind?, + delivery_status: conversation_item.delivery_status?, + }, + MlsGroup::new(self.clone(), conversation_item.id, conversation_item.created_at_ns), + ) + }) + .collect()) + } + /// Upload a Key Package to the network and publish the signed identity update /// from the provided SignatureRequest pub async fn register_identity( diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 35662bc7d..f9b3e1e7a 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -272,6 +272,7 @@ impl RetryableError for GroupError { pub struct MlsGroup { pub group_id: Vec, pub created_at_ns: i64, + pub last_message: Option, pub client: Arc, mutex: Arc>, } @@ -289,6 +290,7 @@ impl Clone for MlsGroup { Self { group_id: self.group_id.clone(), created_at_ns: self.created_at_ns, + last_message: self.last_message.clone(), client: self.client.clone(), mutex: self.mutex.clone(), } diff --git a/xmtp_mls/src/storage/encrypted_store/conversation_list.rs b/xmtp_mls/src/storage/encrypted_store/conversation_list.rs new file mode 100644 index 000000000..ef12837d7 --- /dev/null +++ b/xmtp_mls/src/storage/encrypted_store/conversation_list.rs @@ -0,0 +1,179 @@ +use crate::storage::group::{ConversationType, GroupMembershipState}; +use crate::storage::group_message::{DeliveryStatus, GroupMessageKind}; +use crate::storage::schema::conversation_list::dsl::conversation_list; +use crate::storage::{DbConnection, StorageError}; +use crate::{Fetch, FetchListWithKey}; +use diesel::{Identifiable, QueryDsl, Queryable, RunQueryDsl, Table}; +use serde::{Deserialize, Serialize}; + +#[derive(Queryable, Debug, Clone, Deserialize, Serialize)] +#[diesel(table_name = conversation_list)] +#[diesel(primary_key(id))] +/// Combined view of a group and its messages, now named `conversation_list`. +pub struct ConversationListItem { + /// group_id + pub id: Vec, + /// Based on timestamp of the welcome message + pub created_at_ns: i64, + /// Enum, [`GroupMembershipState`] representing access to the group + pub membership_state: GroupMembershipState, + /// Track when the latest, most recent installations were checked + pub installations_last_checked: i64, + /// The inbox_id of who added the user to the group + pub added_by_inbox_id: String, + /// The sequence id of the welcome message + pub welcome_id: Option, + /// The inbox_id of the DM target + pub dm_inbox_id: Option, + /// The last time the leaf node encryption key was rotated + pub rotated_at_ns: i64, + /// Enum, [`ConversationType`] signifies the group conversation type which extends to who can access it. + pub conversation_type: ConversationType, + /// Id of the message. Nullable because not every group has messages. + pub message_id: Option>, + /// Contents of message after decryption. + pub decrypted_message_bytes: Option>, + /// Time in nanoseconds the message was sent. + pub sent_at_ns: Option, + /// Group Message Kind Enum: 1 = Application, 2 = MembershipChange + pub kind: Option, + /// The ID of the App Installation this message was sent from. + pub sender_installation_id: Option>, + /// The Inbox ID of the Sender + pub sender_inbox_id: Option, + /// We optimistically store messages before sending. + pub delivery_status: Option, +} + +impl DbConnection { + pub fn fetch_conversation_list(&self) -> Result, StorageError> { + let query = conversation_list + .select(conversation_list::all_columns()) + .into_boxed(); + Ok(self.raw_query(|conn| query.load::(conn))?) + } +} + +#[cfg(test)] +pub(crate) mod tests { + use crate::storage::group::tests::{generate_group, generate_group_with_created_at}; + use crate::storage::tests::with_connection; + use crate::Store; + use wasm_bindgen_test::wasm_bindgen_test; + + #[wasm_bindgen_test(unsupported = tokio::test)] + async fn test_single_group_multiple_messages() { + with_connection(|conn| { + // Create a group + let group = generate_group(None); + group.store(conn).unwrap(); + + // Insert multiple messages into the group + for i in 1..5 { + let message = + crate::storage::encrypted_store::group_message::tests::generate_message( + None, + Some(&group.id), + Some(i * 1000), // Increment timestamp for each message + ); + message.store(conn).unwrap(); + } + + // Fetch the conversation list + let conversation_list = conn.fetch_conversation_list().unwrap(); + assert_eq!(conversation_list.len(), 1, "Should return one group"); + assert_eq!( + conversation_list[0].id, group.id, + "Returned group ID should match the created group" + ); + assert_eq!( + conversation_list[0].sent_at_ns.unwrap(), + 4000, + "Last message should be the most recent one" + ); + }) + .await + } + #[wasm_bindgen_test(unsupported = tokio::test)] + async fn test_three_groups_specific_ordering() { + with_connection(|conn| { + // Create three groups + let group_a = generate_group_with_created_at(None,5000); // Created after last message + let group_b = generate_group_with_created_at(None,2000); // Created before last message + let group_c = generate_group_with_created_at(None,1000); // Created before last message with no messages + + group_a.store(conn).unwrap(); + group_b.store(conn).unwrap(); + group_c.store(conn).unwrap(); + // Add a message to group_b + let message = crate::storage::encrypted_store::group_message::tests::generate_message( + None, + Some(&group_b.id), + Some(3000), // Last message timestamp + ); + message.store(conn).unwrap(); + + // Fetch the conversation list + let conversation_list = conn.fetch_conversation_list().unwrap(); + + assert_eq!(conversation_list.len(), 3, "Should return all three groups"); + assert_eq!( + conversation_list[0].id, group_a.id, + "Group created after the last message should come first" + ); + assert_eq!( + conversation_list[1].id, group_b.id, + "Group with the last message should come second" + ); + assert_eq!( + conversation_list[2].id, group_c.id, + "Group created before the last message with no messages should come last" + ); + }) + .await + } + #[wasm_bindgen_test(unsupported = tokio::test)] + async fn test_group_with_newer_message_update() { + with_connection(|conn| { + // Create a group + let group = generate_group(None); + group.store(conn).unwrap(); + + // Add an initial message + let first_message = + crate::storage::encrypted_store::group_message::tests::generate_message( + None, + Some(&group.id), + Some(1000), + ); + first_message.store(conn).unwrap(); + + // Fetch the conversation list and check last message + let mut conversation_list = conn.fetch_conversation_list().unwrap(); + assert_eq!(conversation_list.len(), 1, "Should return one group"); + assert_eq!( + conversation_list[0].sent_at_ns.unwrap(), + 1000, + "Last message should match the first message" + ); + + // Add a newer message + let second_message = + crate::storage::encrypted_store::group_message::tests::generate_message( + None, + Some(&group.id), + Some(2000), + ); + second_message.store(conn).unwrap(); + + // Fetch the conversation list again and validate the last message is updated + conversation_list = conn.fetch_conversation_list().unwrap(); + assert_eq!( + conversation_list[0].sent_at_ns.unwrap(), + 2000, + "Last message should now match the second (newest) message" + ); + }) + .await + } +} diff --git a/xmtp_mls/src/storage/encrypted_store/group.rs b/xmtp_mls/src/storage/encrypted_store/group.rs index 010eab713..971034702 100644 --- a/xmtp_mls/src/storage/encrypted_store/group.rs +++ b/xmtp_mls/src/storage/encrypted_store/group.rs @@ -558,8 +558,15 @@ pub(crate) mod tests { /// Generate a test group pub fn generate_group(state: Option) -> StoredGroup { + // Default behavior: Use `now_ns()` as the creation time + generate_group_with_created_at(state, now_ns()) + } + + pub fn generate_group_with_created_at( + state: Option, + created_at_ns: i64, + ) -> StoredGroup { let id = rand_vec::<24>(); - let created_at_ns = now_ns(); let membership_state = state.unwrap_or(GroupMembershipState::Allowed); StoredGroup::new( id, diff --git a/xmtp_mls/src/storage/encrypted_store/group_message.rs b/xmtp_mls/src/storage/encrypted_store/group_message.rs index 743800d79..6410a4e86 100644 --- a/xmtp_mls/src/storage/encrypted_store/group_message.rs +++ b/xmtp_mls/src/storage/encrypted_store/group_message.rs @@ -290,7 +290,7 @@ pub(crate) mod tests { use wasm_bindgen_test::wasm_bindgen_test; use xmtp_common::{assert_err, assert_ok, rand_time, rand_vec}; - fn generate_message( + pub fn generate_message( kind: Option, group_id: Option<&[u8]>, sent_at_ns: Option, diff --git a/xmtp_mls/src/storage/encrypted_store/mod.rs b/xmtp_mls/src/storage/encrypted_store/mod.rs index e89e306bd..72cc3751a 100644 --- a/xmtp_mls/src/storage/encrypted_store/mod.rs +++ b/xmtp_mls/src/storage/encrypted_store/mod.rs @@ -28,6 +28,7 @@ pub mod schema; mod sqlcipher_connection; pub mod user_preferences; pub mod wallet_addresses; +mod conversation_list; #[cfg(target_arch = "wasm32")] mod wasm; diff --git a/xmtp_mls/src/storage/encrypted_store/schema.rs b/xmtp_mls/src/storage/encrypted_store/schema.rs index c82818161..3834e0443 100644 --- a/xmtp_mls/src/storage/encrypted_store/schema.rs +++ b/xmtp_mls/src/storage/encrypted_store/schema.rs @@ -121,6 +121,27 @@ diesel::table! { } } +diesel::table!{ + conversation_list (id) { + id -> Binary, + created_at_ns -> BigInt, + membership_state -> Integer, + installations_last_checked -> BigInt, + added_by_inbox_id -> Text, + welcome_id -> Nullable, + dm_inbox_id -> Nullable, + rotated_at_ns -> BigInt, + conversation_type -> Integer, + message_id -> Nullable, + decrypted_message_bytes -> Nullable, + sent_at_ns -> Nullable, + message_kind -> Nullable, + sender_installation_id -> Nullable, + sender_inbox_id -> Nullable, + delivery_status -> Nullable, + } +} + diesel::joinable!(group_intents -> groups (group_id)); diesel::joinable!(group_messages -> groups (group_id)); @@ -138,4 +159,5 @@ diesel::allow_tables_to_appear_in_same_query!( refresh_state, user_preferences, wallet_addresses, + conversation_list ); From 6cb3b20dc33f60587f38a598895fdb762e059fc3 Mon Sep 17 00:00:00 2001 From: Mojtaba Chenani Date: Thu, 19 Dec 2024 19:06:24 +0100 Subject: [PATCH 2/7] add migrations --- .../down.sql | 1 + .../up.sql | 39 +++++++++++++++++++ 2 files changed, 40 insertions(+) create mode 100644 xmtp_mls/migrations/2024-12-18-160859_create_conversation_list_view/down.sql create mode 100644 xmtp_mls/migrations/2024-12-18-160859_create_conversation_list_view/up.sql diff --git a/xmtp_mls/migrations/2024-12-18-160859_create_conversation_list_view/down.sql b/xmtp_mls/migrations/2024-12-18-160859_create_conversation_list_view/down.sql new file mode 100644 index 000000000..64a220cd8 --- /dev/null +++ b/xmtp_mls/migrations/2024-12-18-160859_create_conversation_list_view/down.sql @@ -0,0 +1 @@ +DROP VIEW IF EXISTS conversation_list; \ No newline at end of file diff --git a/xmtp_mls/migrations/2024-12-18-160859_create_conversation_list_view/up.sql b/xmtp_mls/migrations/2024-12-18-160859_create_conversation_list_view/up.sql new file mode 100644 index 000000000..0cb4bdf08 --- /dev/null +++ b/xmtp_mls/migrations/2024-12-18-160859_create_conversation_list_view/up.sql @@ -0,0 +1,39 @@ +CREATE VIEW conversation_list AS +WITH ranked_messages AS ( + SELECT + gm.group_id, + gm.id AS message_id, + gm.decrypted_message_bytes, + gm.sent_at_ns, + gm.kind AS message_kind, + gm.sender_installation_id, + gm.sender_inbox_id, + gm.delivery_status, + ROW_NUMBER() OVER (PARTITION BY gm.group_id ORDER BY gm.sent_at_ns DESC) AS row_num + FROM + group_messages gm + WHERE + gm.kind = 1 +) +SELECT + g.id AS id, + g.created_at_ns, + g.membership_state, + g.installations_last_checked, + g.added_by_inbox_id, + g.welcome_id, + g.dm_inbox_id, + g.rotated_at_ns, + g.conversation_type, + rm.message_id, + rm.decrypted_message_bytes, + rm.sent_at_ns, + rm.message_kind, + rm.sender_installation_id, + rm.sender_inbox_id, + rm.delivery_status +FROM + groups g + LEFT JOIN ranked_messages rm + ON g.id = rm.group_id AND rm.row_num = 1 +ORDER BY COALESCE(rm.sent_at_ns, g.created_at_ns) DESC; \ No newline at end of file From 0396ec0e3361593081e0bc201a8f709ba2803633 Mon Sep 17 00:00:00 2001 From: Mojtaba Chenani Date: Thu, 19 Dec 2024 19:15:34 +0100 Subject: [PATCH 3/7] Update xmtp_mls/src/storage/encrypted_store/group_message.rs Co-authored-by: Dakota Brink <779390+codabrink@users.noreply.github.com> --- xmtp_mls/src/storage/encrypted_store/group_message.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xmtp_mls/src/storage/encrypted_store/group_message.rs b/xmtp_mls/src/storage/encrypted_store/group_message.rs index 6410a4e86..7365ffd67 100644 --- a/xmtp_mls/src/storage/encrypted_store/group_message.rs +++ b/xmtp_mls/src/storage/encrypted_store/group_message.rs @@ -290,7 +290,7 @@ pub(crate) mod tests { use wasm_bindgen_test::wasm_bindgen_test; use xmtp_common::{assert_err, assert_ok, rand_time, rand_vec}; - pub fn generate_message( + pub(crate) fn generate_message( kind: Option, group_id: Option<&[u8]>, sent_at_ns: Option, From 7aec1c4590e5effd7f1076e86d868bd53bc25c50 Mon Sep 17 00:00:00 2001 From: Mojtaba Chenani Date: Fri, 20 Dec 2024 10:35:31 +0100 Subject: [PATCH 4/7] add tests to list_conversations function --- bindings_ffi/src/mls.rs | 91 ++++++++++++++++++++++++++++++++++++-- xmtp_mls/src/client.rs | 34 +++++++++----- xmtp_mls/src/groups/mod.rs | 2 - 3 files changed, 110 insertions(+), 17 deletions(-) diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index a4082e5d8..5283116bd 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -967,13 +967,19 @@ impl FfiConversations { pub async fn list_conversations( &self, - opts: FfiListConversationsOptions, ) -> Result>, GenericError> { let inner = self.inner_client.as_ref(); let convo_list: Vec> = inner .list_conversations()? .into_iter() - .map(|group| Arc::new(group.into())) + .filter_map(|(mls_group, stored_message)| { + let last_message = stored_message.map(|msg| msg.into()); + + Some(Arc::new(FfiConversationListItem { + conversation: mls_group.into(), + last_message, + })) + }) .collect(); Ok(convo_list) @@ -1158,7 +1164,7 @@ pub struct FfiConversation { #[derive(uniffi::Object)] pub struct FfiConversationListItem { conversation: FfiConversation, - last_message: FfiMessage + last_message: Option } impl From> for FfiConversation { @@ -2686,6 +2692,85 @@ mod tests { assert!(stream_messages.is_closed()); } + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] + async fn test_list_conversations_last_message() { + // Step 1: Setup test client Alix and bo + let alix = new_test_client().await; + let bo = new_test_client().await; + + // Step 2: Create a group and add messages + let alix_conversations = alix.conversations(); + + // Create a group + let group = alix_conversations + .create_group( + vec![bo.account_address.clone()], + FfiCreateGroupOptions::default(), + ) + .await + .unwrap(); + + // Add messages to the group + group.send("First message".as_bytes().to_vec()).await.unwrap(); + group.send("Second message".as_bytes().to_vec()).await.unwrap(); + + // Step 3: Synchronize conversations + alix_conversations.sync_all_conversations(None).await.unwrap(); + + // Step 4: List conversations and verify + let conversations = alix_conversations + .list_conversations() + .await + .unwrap(); + + // Ensure the group is included + assert_eq!(conversations.len(), 1, "Alix should have exactly 1 group"); + + let last_message = conversations[0].last_message.as_ref().unwrap(); + assert_eq!( + last_message.content, + "Second message".as_bytes().to_vec(), + "Last message content should be the most recent" + ); + + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] + async fn test_list_conversations_no_messages() { + // Step 1: Setup test clients Alix and Bo + let alix = new_test_client().await; + let bo = new_test_client().await; + + let alix_conversations = alix.conversations(); + + // Step 2: Create a group with Bo but do not send messages + let group = alix_conversations + .create_group( + vec![bo.account_address.clone()], + FfiCreateGroupOptions::default(), + ) + .await + .unwrap(); + + // Step 3: Synchronize conversations + alix_conversations.sync_all_conversations(None).await.unwrap(); + + // Step 4: List conversations and verify + let conversations = alix_conversations + .list_conversations() + .await + .unwrap(); + + // Ensure the group is included + assert_eq!(conversations.len(), 1, "Alix should have exactly 1 group"); + + // Verify that the last_message is None + assert!( + conversations[0].last_message.is_none(), + "Last message should be None since no messages were sent" + ); + } + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] async fn test_can_sync_all_groups() { let alix = new_test_client().await; diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index 09fa09142..d68f84f73 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -686,25 +686,35 @@ where pub fn list_conversations( &self, - ) -> Result, StoredGroupMessage)>, ClientError> { + ) -> Result, Option)>, ClientError> { Ok(self .store() .conn()? .fetch_conversation_list()? .into_iter() .map(|conversation_item| { - ( - StoredGroupMessage { - id: conversation_item.message_id?, + let message = if let Some(message_id) = conversation_item.message_id { + Some(StoredGroupMessage { + id: message_id, group_id: conversation_item.id.clone(), - decrypted_message_bytes: conversation_item.decrypted_message_bytes?, - sent_at_ns: conversation_item.sent_at_ns?, - sender_installation_id: conversation_item.sender_installation_id?, - sender_inbox_id: conversation_item.sender_inbox_id?, - kind: conversation_item.kind?, - delivery_status: conversation_item.delivery_status?, - }, - MlsGroup::new(self.clone(), conversation_item.id, conversation_item.created_at_ns), + decrypted_message_bytes: conversation_item.decrypted_message_bytes.unwrap(), + sent_at_ns: conversation_item.sent_at_ns.unwrap(), + sender_installation_id: conversation_item.sender_installation_id.unwrap(), + sender_inbox_id: conversation_item.sender_inbox_id.unwrap(), + kind: conversation_item.kind.unwrap(), + delivery_status: conversation_item.delivery_status.unwrap(), + }) + } else { + None + }; + + ( + MlsGroup::new( + self.clone(), + conversation_item.id, + conversation_item.created_at_ns, + ), + message, ) }) .collect()) diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index f9b3e1e7a..35662bc7d 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -272,7 +272,6 @@ impl RetryableError for GroupError { pub struct MlsGroup { pub group_id: Vec, pub created_at_ns: i64, - pub last_message: Option, pub client: Arc, mutex: Arc>, } @@ -290,7 +289,6 @@ impl Clone for MlsGroup { Self { group_id: self.group_id.clone(), created_at_ns: self.created_at_ns, - last_message: self.last_message.clone(), client: self.client.clone(), mutex: self.mutex.clone(), } From 1b783df9292d5cdc951849afccd9281b2ad535f1 Mon Sep 17 00:00:00 2001 From: Mojtaba Chenani Date: Fri, 20 Dec 2024 11:11:08 +0100 Subject: [PATCH 5/7] add a data model instead of pair --- bindings_ffi/src/mls.rs | 129 +++++++++++++++--- xmtp_mls/src/client.rs | 32 ++--- xmtp_mls/src/groups/mod.rs | 5 + .../encrypted_store/conversation_list.rs | 9 +- xmtp_mls/src/storage/encrypted_store/mod.rs | 2 +- .../src/storage/encrypted_store/schema.rs | 2 +- 6 files changed, 133 insertions(+), 46 deletions(-) diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 5283116bd..28bb3fa73 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -972,13 +972,13 @@ impl FfiConversations { let convo_list: Vec> = inner .list_conversations()? .into_iter() - .filter_map(|(mls_group, stored_message)| { - let last_message = stored_message.map(|msg| msg.into()); - - Some(Arc::new(FfiConversationListItem { - conversation: mls_group.into(), - last_message, - })) + .map(|conversation_item| { + Arc::new(FfiConversationListItem { + conversation: conversation_item.group.into(), + last_message: conversation_item + .last_message + .map(|stored_message| stored_message.into()), + }) }) .collect(); @@ -1164,7 +1164,7 @@ pub struct FfiConversation { #[derive(uniffi::Object)] pub struct FfiConversationListItem { conversation: FfiConversation, - last_message: Option + last_message: Option, } impl From> for FfiConversation { @@ -2711,18 +2711,24 @@ mod tests { .unwrap(); // Add messages to the group - group.send("First message".as_bytes().to_vec()).await.unwrap(); - group.send("Second message".as_bytes().to_vec()).await.unwrap(); + group + .send("First message".as_bytes().to_vec()) + .await + .unwrap(); + group + .send("Second message".as_bytes().to_vec()) + .await + .unwrap(); // Step 3: Synchronize conversations - alix_conversations.sync_all_conversations(None).await.unwrap(); - - // Step 4: List conversations and verify - let conversations = alix_conversations - .list_conversations() + alix_conversations + .sync_all_conversations(None) .await .unwrap(); + // Step 4: List conversations and verify + let conversations = alix_conversations.list_conversations().await.unwrap(); + // Ensure the group is included assert_eq!(conversations.len(), 1, "Alix should have exactly 1 group"); @@ -2732,7 +2738,6 @@ mod tests { "Second message".as_bytes().to_vec(), "Last message content should be the most recent" ); - } #[tokio::test(flavor = "multi_thread", worker_threads = 5)] @@ -2744,7 +2749,7 @@ mod tests { let alix_conversations = alix.conversations(); // Step 2: Create a group with Bo but do not send messages - let group = alix_conversations + alix_conversations .create_group( vec![bo.account_address.clone()], FfiCreateGroupOptions::default(), @@ -2753,14 +2758,14 @@ mod tests { .unwrap(); // Step 3: Synchronize conversations - alix_conversations.sync_all_conversations(None).await.unwrap(); - - // Step 4: List conversations and verify - let conversations = alix_conversations - .list_conversations() + alix_conversations + .sync_all_conversations(None) .await .unwrap(); + // Step 4: List conversations and verify + let conversations = alix_conversations.list_conversations().await.unwrap(); + // Ensure the group is included assert_eq!(conversations.len(), 1, "Alix should have exactly 1 group"); @@ -2771,6 +2776,86 @@ mod tests { ); } + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] + async fn test_conversation_list_ordering() { + // Step 1: Setup test client + let client = new_test_client().await; + let conversations_api = client.conversations(); + + // Step 2: Create Group A + let group_a = conversations_api + .create_group(vec![], FfiCreateGroupOptions::default()) + .await + .unwrap(); + + // Step 3: Create Group B + let group_b = conversations_api + .create_group(vec![], FfiCreateGroupOptions::default()) + .await + .unwrap(); + + // Step 4: Send a message to Group A + group_a + .send("Message to Group A".as_bytes().to_vec()) + .await + .unwrap(); + + // Step 5: Create Group C + let group_c = conversations_api + .create_group(vec![], FfiCreateGroupOptions::default()) + .await + .unwrap(); + + // Step 6: Synchronize conversations + conversations_api + .sync_all_conversations(None) + .await + .unwrap(); + + // Step 7: Fetch the conversation list + let conversations = conversations_api.list_conversations().await.unwrap(); + + // Step 8: Assert the correct order of conversations + assert_eq!( + conversations.len(), + 3, + "There should be exactly 3 conversations" + ); + + // Verify the order: Group C, Group A, Group B + assert_eq!( + conversations[0].conversation.inner.group_id, group_c.inner.group_id, + "Group C should be the first conversation" + ); + assert_eq!( + conversations[1].conversation.inner.group_id, group_a.inner.group_id, + "Group A should be the second conversation" + ); + assert_eq!( + conversations[2].conversation.inner.group_id, group_b.inner.group_id, + "Group B should be the third conversation" + ); + + // Verify the last_message field for Group A and None for others + assert!( + conversations[0].last_message.is_none(), + "Group C should have no messages" + ); + assert!( + conversations[1].last_message.is_some(), + "Group A should have a last message" + ); + assert_eq!( + conversations[1].last_message.as_ref().unwrap().content, + "Message to Group A".as_bytes().to_vec(), + "Group A's last message content should match" + ); + assert!( + conversations[2].last_message.is_none(), + "Group B should have no messages" + ); + } + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] async fn test_can_sync_all_groups() { let alix = new_test_client().await; diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index d68f84f73..a817f1199 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -33,6 +33,7 @@ use xmtp_proto::xmtp::mls::api::v1::{ #[cfg(any(test, feature = "test-utils"))] use crate::groups::device_sync::WorkerHandle; +use crate::groups::ConversationListItem; use crate::{ api::ApiClientWrapper, groups::{ @@ -684,38 +685,35 @@ where .collect()) } - pub fn list_conversations( - &self, - ) -> Result, Option)>, ClientError> { + pub fn list_conversations(&self) -> Result>, ClientError> { Ok(self .store() .conn()? .fetch_conversation_list()? .into_iter() .map(|conversation_item| { - let message = if let Some(message_id) = conversation_item.message_id { + let message = conversation_item.message_id.and_then(|message_id| { + // Only construct StoredGroupMessage if all fields are Some Some(StoredGroupMessage { id: message_id, group_id: conversation_item.id.clone(), - decrypted_message_bytes: conversation_item.decrypted_message_bytes.unwrap(), - sent_at_ns: conversation_item.sent_at_ns.unwrap(), - sender_installation_id: conversation_item.sender_installation_id.unwrap(), - sender_inbox_id: conversation_item.sender_inbox_id.unwrap(), - kind: conversation_item.kind.unwrap(), - delivery_status: conversation_item.delivery_status.unwrap(), + decrypted_message_bytes: conversation_item.decrypted_message_bytes?, + sent_at_ns: conversation_item.sent_at_ns?, + sender_installation_id: conversation_item.sender_installation_id?, + sender_inbox_id: conversation_item.sender_inbox_id?, + kind: conversation_item.kind?, + delivery_status: conversation_item.delivery_status?, }) - } else { - None - }; + }); - ( - MlsGroup::new( + ConversationListItem { + group: MlsGroup::new( self.clone(), conversation_item.id, conversation_item.created_at_ns, ), - message, - ) + last_message: message, + } }) .collect()) } diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 35662bc7d..0d535df0f 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -276,6 +276,11 @@ pub struct MlsGroup { mutex: Arc>, } +pub struct ConversationListItem { + pub group: MlsGroup, + pub last_message: Option, +} + #[derive(Default)] pub struct GroupMetadataOptions { pub name: Option, diff --git a/xmtp_mls/src/storage/encrypted_store/conversation_list.rs b/xmtp_mls/src/storage/encrypted_store/conversation_list.rs index ef12837d7..8d2b40bf0 100644 --- a/xmtp_mls/src/storage/encrypted_store/conversation_list.rs +++ b/xmtp_mls/src/storage/encrypted_store/conversation_list.rs @@ -2,8 +2,7 @@ use crate::storage::group::{ConversationType, GroupMembershipState}; use crate::storage::group_message::{DeliveryStatus, GroupMessageKind}; use crate::storage::schema::conversation_list::dsl::conversation_list; use crate::storage::{DbConnection, StorageError}; -use crate::{Fetch, FetchListWithKey}; -use diesel::{Identifiable, QueryDsl, Queryable, RunQueryDsl, Table}; +use diesel::{QueryDsl, Queryable, RunQueryDsl, Table}; use serde::{Deserialize, Serialize}; #[derive(Queryable, Debug, Clone, Deserialize, Serialize)] @@ -98,9 +97,9 @@ pub(crate) mod tests { async fn test_three_groups_specific_ordering() { with_connection(|conn| { // Create three groups - let group_a = generate_group_with_created_at(None,5000); // Created after last message - let group_b = generate_group_with_created_at(None,2000); // Created before last message - let group_c = generate_group_with_created_at(None,1000); // Created before last message with no messages + let group_a = generate_group_with_created_at(None, 5000); // Created after last message + let group_b = generate_group_with_created_at(None, 2000); // Created before last message + let group_c = generate_group_with_created_at(None, 1000); // Created before last message with no messages group_a.store(conn).unwrap(); group_b.store(conn).unwrap(); diff --git a/xmtp_mls/src/storage/encrypted_store/mod.rs b/xmtp_mls/src/storage/encrypted_store/mod.rs index 72cc3751a..dfeecd843 100644 --- a/xmtp_mls/src/storage/encrypted_store/mod.rs +++ b/xmtp_mls/src/storage/encrypted_store/mod.rs @@ -12,6 +12,7 @@ pub mod association_state; pub mod consent_record; +mod conversation_list; pub mod db_connection; pub mod group; pub mod group_intent; @@ -28,7 +29,6 @@ pub mod schema; mod sqlcipher_connection; pub mod user_preferences; pub mod wallet_addresses; -mod conversation_list; #[cfg(target_arch = "wasm32")] mod wasm; diff --git a/xmtp_mls/src/storage/encrypted_store/schema.rs b/xmtp_mls/src/storage/encrypted_store/schema.rs index 3834e0443..5ed3eed92 100644 --- a/xmtp_mls/src/storage/encrypted_store/schema.rs +++ b/xmtp_mls/src/storage/encrypted_store/schema.rs @@ -121,7 +121,7 @@ diesel::table! { } } -diesel::table!{ +diesel::table! { conversation_list (id) { id -> Binary, created_at_ns -> BigInt, From 0f88bc4d98a0b034ac8505cbdf80d4192884fced Mon Sep 17 00:00:00 2001 From: Mojtaba Chenani Date: Fri, 20 Dec 2024 18:02:30 +0100 Subject: [PATCH 6/7] fix after merge --- xmtp_mls/Cargo.toml | 1 + .../down.sql | 0 .../up.sql | 10 +++++- xmtp_mls/src/client.rs | 4 +++ .../encrypted_store/conversation_list.rs | 16 +++++++-- .../src/storage/encrypted_store/schema.rs | 36 ++++++++++--------- 6 files changed, 48 insertions(+), 19 deletions(-) rename xmtp_mls/migrations/{2024-12-18-160859_create_conversation_list_view => 2024-12-20-143210_create_conversation_list_view}/down.sql (100%) rename xmtp_mls/migrations/{2024-12-18-160859_create_conversation_list_view => 2024-12-20-143210_create_conversation_list_view}/up.sql (82%) diff --git a/xmtp_mls/Cargo.toml b/xmtp_mls/Cargo.toml index 0e160e485..9954151a5 100644 --- a/xmtp_mls/Cargo.toml +++ b/xmtp_mls/Cargo.toml @@ -107,6 +107,7 @@ diesel = { workspace = true, features = [ "r2d2", "returning_clauses_for_sqlite_3_35", "sqlite", + "32-column-tables" ] } dyn-clone.workspace = true libsqlite3-sys = { workspace = true } diff --git a/xmtp_mls/migrations/2024-12-18-160859_create_conversation_list_view/down.sql b/xmtp_mls/migrations/2024-12-20-143210_create_conversation_list_view/down.sql similarity index 100% rename from xmtp_mls/migrations/2024-12-18-160859_create_conversation_list_view/down.sql rename to xmtp_mls/migrations/2024-12-20-143210_create_conversation_list_view/down.sql diff --git a/xmtp_mls/migrations/2024-12-18-160859_create_conversation_list_view/up.sql b/xmtp_mls/migrations/2024-12-20-143210_create_conversation_list_view/up.sql similarity index 82% rename from xmtp_mls/migrations/2024-12-18-160859_create_conversation_list_view/up.sql rename to xmtp_mls/migrations/2024-12-20-143210_create_conversation_list_view/up.sql index 0cb4bdf08..01d5433ab 100644 --- a/xmtp_mls/migrations/2024-12-18-160859_create_conversation_list_view/up.sql +++ b/xmtp_mls/migrations/2024-12-20-143210_create_conversation_list_view/up.sql @@ -9,6 +9,10 @@ WITH ranked_messages AS ( gm.sender_installation_id, gm.sender_inbox_id, gm.delivery_status, + gm.content_type, + gm.version_major, + gm.version_minor, + gm.authority_id, ROW_NUMBER() OVER (PARTITION BY gm.group_id ORDER BY gm.sent_at_ns DESC) AS row_num FROM group_messages gm @@ -31,7 +35,11 @@ SELECT rm.message_kind, rm.sender_installation_id, rm.sender_inbox_id, - rm.delivery_status + rm.delivery_status, + rm.content_type, + rm.version_major, + rm.version_minor, + rm.authority_id FROM groups g LEFT JOIN ranked_messages rm diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index a817f1199..de06a6061 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -703,6 +703,10 @@ where sender_inbox_id: conversation_item.sender_inbox_id?, kind: conversation_item.kind?, delivery_status: conversation_item.delivery_status?, + content_type: conversation_item.content_type?, + version_major: conversation_item.version_major?, + version_minor: conversation_item.version_minor?, + authority_id: conversation_item.authority_id?, }) }); diff --git a/xmtp_mls/src/storage/encrypted_store/conversation_list.rs b/xmtp_mls/src/storage/encrypted_store/conversation_list.rs index 8d2b40bf0..e82d2fd66 100644 --- a/xmtp_mls/src/storage/encrypted_store/conversation_list.rs +++ b/xmtp_mls/src/storage/encrypted_store/conversation_list.rs @@ -1,5 +1,5 @@ use crate::storage::group::{ConversationType, GroupMembershipState}; -use crate::storage::group_message::{DeliveryStatus, GroupMessageKind}; +use crate::storage::group_message::{ContentType, DeliveryStatus, GroupMessageKind}; use crate::storage::schema::conversation_list::dsl::conversation_list; use crate::storage::{DbConnection, StorageError}; use diesel::{QueryDsl, Queryable, RunQueryDsl, Table}; @@ -42,6 +42,14 @@ pub struct ConversationListItem { pub sender_inbox_id: Option, /// We optimistically store messages before sending. pub delivery_status: Option, + /// The Content Type of the message + pub content_type: Option, + /// The content type version major + pub version_major: Option, + /// The content type version minor + pub version_minor: Option, + /// The ID of the authority defining the content type + pub authority_id: Option, } impl DbConnection { @@ -73,7 +81,8 @@ pub(crate) mod tests { crate::storage::encrypted_store::group_message::tests::generate_message( None, Some(&group.id), - Some(i * 1000), // Increment timestamp for each message + Some(i * 1000), + None, ); message.store(conn).unwrap(); } @@ -109,6 +118,7 @@ pub(crate) mod tests { None, Some(&group_b.id), Some(3000), // Last message timestamp + None, ); message.store(conn).unwrap(); @@ -144,6 +154,7 @@ pub(crate) mod tests { None, Some(&group.id), Some(1000), + None, ); first_message.store(conn).unwrap(); @@ -162,6 +173,7 @@ pub(crate) mod tests { None, Some(&group.id), Some(2000), + None, ); second_message.store(conn).unwrap(); diff --git a/xmtp_mls/src/storage/encrypted_store/schema.rs b/xmtp_mls/src/storage/encrypted_store/schema.rs index 8af0cee31..640b71f44 100644 --- a/xmtp_mls/src/storage/encrypted_store/schema.rs +++ b/xmtp_mls/src/storage/encrypted_store/schema.rs @@ -127,22 +127,26 @@ diesel::table! { diesel::table! { conversation_list (id) { - id -> Binary, - created_at_ns -> BigInt, - membership_state -> Integer, - installations_last_checked -> BigInt, - added_by_inbox_id -> Text, - welcome_id -> Nullable, - dm_inbox_id -> Nullable, - rotated_at_ns -> BigInt, - conversation_type -> Integer, - message_id -> Nullable, - decrypted_message_bytes -> Nullable, - sent_at_ns -> Nullable, - message_kind -> Nullable, - sender_installation_id -> Nullable, - sender_inbox_id -> Nullable, - delivery_status -> Nullable, + id -> Binary, + created_at_ns -> BigInt, + membership_state -> Integer, + installations_last_checked -> BigInt, + added_by_inbox_id -> Text, + welcome_id -> Nullable, + dm_inbox_id -> Nullable, + rotated_at_ns -> BigInt, + conversation_type -> Integer, + message_id -> Nullable, + decrypted_message_bytes -> Nullable, + sent_at_ns -> Nullable, + message_kind -> Nullable, + sender_installation_id -> Nullable, + sender_inbox_id -> Nullable, + delivery_status -> Nullable, + content_type -> Nullable, + version_major -> Nullable, + version_minor -> Nullable, + authority_id -> Nullable } } From f88131afb080a2afbb58aec28e8c5712a5064a0f Mon Sep 17 00:00:00 2001 From: Mojtaba Chenani Date: Fri, 20 Dec 2024 18:15:58 +0100 Subject: [PATCH 7/7] fix clippy --- bindings_ffi/src/mls.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index bb5fee28d..ec9ce607b 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -1163,6 +1163,7 @@ pub struct FfiConversation { } #[derive(uniffi::Object)] +#[allow(unused_variables, dead_code)] pub struct FfiConversationListItem { conversation: FfiConversation, last_message: Option,