Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(conversations): create conversation list with last message #1437

Merged
merged 15 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -965,6 +965,26 @@ impl FfiConversations {
Ok(convo_list)
}

pub async fn list_conversations(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

default order is:
ORDER BY COALESCE(rm.sent_at_ns, g.created_at_ns) DESC

Copy link
Contributor

@nplasterer nplasterer Dec 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand when this is supposed to be used instead of the list() function above? cc @mchenani

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I saw in android, we fetched the conversations and the get the last message one by one and map them together.
we can replace it with this.

&self,
) -> Result<Vec<Arc<FfiConversationListItem>>, GenericError> {
let inner = self.inner_client.as_ref();
let convo_list: Vec<Arc<FfiConversationListItem>> = inner
.list_conversations()?
.into_iter()
.filter_map(|(mls_group, stored_message)| {
let last_message = stored_message.map(|msg| msg.into());

Some(Arc::new(FfiConversationListItem {
conversation: mls_group.into(),
last_message,
}))
})
.collect();

Ok(convo_list)
}

pub async fn list_groups(
&self,
opts: FfiListConversationsOptions,
Expand Down Expand Up @@ -1141,6 +1161,12 @@ pub struct FfiConversation {
inner: MlsGroup<RustXmtpClient>,
}

#[derive(uniffi::Object)]
pub struct FfiConversationListItem {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

introduced this struct to avoid changing all the current functions!

conversation: FfiConversation,
last_message: Option<FfiMessage>
}

impl From<MlsGroup<RustXmtpClient>> for FfiConversation {
fn from(mls_group: MlsGroup<RustXmtpClient>) -> FfiConversation {
FfiConversation { inner: mls_group }
Expand Down Expand Up @@ -2666,6 +2692,85 @@ mod tests {
assert!(stream_messages.is_closed());
}

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_list_conversations_last_message() {
// Step 1: Setup test client Alix and bo
let alix = new_test_client().await;
let bo = new_test_client().await;

// Step 2: Create a group and add messages
let alix_conversations = alix.conversations();

// Create a group
let group = alix_conversations
.create_group(
vec![bo.account_address.clone()],
FfiCreateGroupOptions::default(),
)
.await
.unwrap();

// Add messages to the group
group.send("First message".as_bytes().to_vec()).await.unwrap();
group.send("Second message".as_bytes().to_vec()).await.unwrap();

// Step 3: Synchronize conversations
alix_conversations.sync_all_conversations(None).await.unwrap();

// Step 4: List conversations and verify
let conversations = alix_conversations
.list_conversations()
.await
.unwrap();

// Ensure the group is included
assert_eq!(conversations.len(), 1, "Alix should have exactly 1 group");

let last_message = conversations[0].last_message.as_ref().unwrap();
assert_eq!(
last_message.content,
"Second message".as_bytes().to_vec(),
"Last message content should be the most recent"
);

}

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_list_conversations_no_messages() {
// Step 1: Setup test clients Alix and Bo
let alix = new_test_client().await;
let bo = new_test_client().await;

let alix_conversations = alix.conversations();

// Step 2: Create a group with Bo but do not send messages
let group = alix_conversations
.create_group(
vec![bo.account_address.clone()],
FfiCreateGroupOptions::default(),
)
.await
.unwrap();

// Step 3: Synchronize conversations
alix_conversations.sync_all_conversations(None).await.unwrap();

// Step 4: List conversations and verify
let conversations = alix_conversations
.list_conversations()
.await
.unwrap();

// Ensure the group is included
assert_eq!(conversations.len(), 1, "Alix should have exactly 1 group");

// Verify that the last_message is None
assert!(
conversations[0].last_message.is_none(),
"Last message should be None since no messages were sent"
);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_can_sync_all_groups() {
let alix = new_test_client().await;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP VIEW IF EXISTS conversation_list;
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
CREATE VIEW conversation_list AS
WITH ranked_messages AS (
SELECT
gm.group_id,
gm.id AS message_id,
gm.decrypted_message_bytes,
gm.sent_at_ns,
gm.kind AS message_kind,
gm.sender_installation_id,
gm.sender_inbox_id,
gm.delivery_status,
ROW_NUMBER() OVER (PARTITION BY gm.group_id ORDER BY gm.sent_at_ns DESC) AS row_num
FROM
group_messages gm
WHERE
gm.kind = 1
)
SELECT
g.id AS id,
g.created_at_ns,
g.membership_state,
g.installations_last_checked,
g.added_by_inbox_id,
g.welcome_id,
g.dm_inbox_id,
g.rotated_at_ns,
g.conversation_type,
rm.message_id,
rm.decrypted_message_bytes,
rm.sent_at_ns,
rm.message_kind,
rm.sender_installation_id,
rm.sender_inbox_id,
rm.delivery_status
FROM
groups g
LEFT JOIN ranked_messages rm
ON g.id = rm.group_id AND rm.row_num = 1
ORDER BY COALESCE(rm.sent_at_ns, g.created_at_ns) DESC;
36 changes: 36 additions & 0 deletions xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,42 @@ where
.collect())
}

pub fn list_conversations(
&self,
) -> Result<Vec<(MlsGroup<Self>, Option<StoredGroupMessage>)>, ClientError> {
Ok(self
.store()
.conn()?
.fetch_conversation_list()?
.into_iter()
.map(|conversation_item| {
let message = if let Some(message_id) = conversation_item.message_id {
Some(StoredGroupMessage {
id: message_id,
group_id: conversation_item.id.clone(),
decrypted_message_bytes: conversation_item.decrypted_message_bytes.unwrap(),
sent_at_ns: conversation_item.sent_at_ns.unwrap(),
sender_installation_id: conversation_item.sender_installation_id.unwrap(),
sender_inbox_id: conversation_item.sender_inbox_id.unwrap(),
kind: conversation_item.kind.unwrap(),
delivery_status: conversation_item.delivery_status.unwrap(),
})
} else {
None
};

(
MlsGroup::new(
self.clone(),
conversation_item.id,
conversation_item.created_at_ns,
),
message,
)
})
.collect())
}

/// Upload a Key Package to the network and publish the signed identity update
/// from the provided SignatureRequest
pub async fn register_identity(
Expand Down
179 changes: 179 additions & 0 deletions xmtp_mls/src/storage/encrypted_store/conversation_list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
use crate::storage::group::{ConversationType, GroupMembershipState};
use crate::storage::group_message::{DeliveryStatus, GroupMessageKind};
use crate::storage::schema::conversation_list::dsl::conversation_list;
use crate::storage::{DbConnection, StorageError};
use crate::{Fetch, FetchListWithKey};
use diesel::{Identifiable, QueryDsl, Queryable, RunQueryDsl, Table};
use serde::{Deserialize, Serialize};

#[derive(Queryable, Debug, Clone, Deserialize, Serialize)]
#[diesel(table_name = conversation_list)]
#[diesel(primary_key(id))]
/// Combined view of a group and its messages, now named `conversation_list`.
pub struct ConversationListItem {
/// group_id
pub id: Vec<u8>,
/// Based on timestamp of the welcome message
pub created_at_ns: i64,
/// Enum, [`GroupMembershipState`] representing access to the group
pub membership_state: GroupMembershipState,
/// Track when the latest, most recent installations were checked
pub installations_last_checked: i64,
/// The inbox_id of who added the user to the group
pub added_by_inbox_id: String,
/// The sequence id of the welcome message
pub welcome_id: Option<i64>,
/// The inbox_id of the DM target
pub dm_inbox_id: Option<String>,
/// The last time the leaf node encryption key was rotated
pub rotated_at_ns: i64,
/// Enum, [`ConversationType`] signifies the group conversation type which extends to who can access it.
pub conversation_type: ConversationType,
/// Id of the message. Nullable because not every group has messages.
pub message_id: Option<Vec<u8>>,
/// Contents of message after decryption.
pub decrypted_message_bytes: Option<Vec<u8>>,
/// Time in nanoseconds the message was sent.
pub sent_at_ns: Option<i64>,
/// Group Message Kind Enum: 1 = Application, 2 = MembershipChange
pub kind: Option<GroupMessageKind>,
/// The ID of the App Installation this message was sent from.
pub sender_installation_id: Option<Vec<u8>>,
/// The Inbox ID of the Sender
pub sender_inbox_id: Option<String>,
/// We optimistically store messages before sending.
pub delivery_status: Option<DeliveryStatus>,
}

impl DbConnection {
pub fn fetch_conversation_list(&self) -> Result<Vec<ConversationListItem>, StorageError> {
let query = conversation_list
.select(conversation_list::all_columns())
.into_boxed();
Ok(self.raw_query(|conn| query.load::<ConversationListItem>(conn))?)
}
}

#[cfg(test)]
pub(crate) mod tests {
use crate::storage::group::tests::{generate_group, generate_group_with_created_at};
use crate::storage::tests::with_connection;
use crate::Store;
use wasm_bindgen_test::wasm_bindgen_test;

#[wasm_bindgen_test(unsupported = tokio::test)]
async fn test_single_group_multiple_messages() {
with_connection(|conn| {
// Create a group
let group = generate_group(None);
group.store(conn).unwrap();

// Insert multiple messages into the group
for i in 1..5 {
let message =
crate::storage::encrypted_store::group_message::tests::generate_message(
None,
Some(&group.id),
Some(i * 1000), // Increment timestamp for each message
);
message.store(conn).unwrap();
}

// Fetch the conversation list
let conversation_list = conn.fetch_conversation_list().unwrap();
assert_eq!(conversation_list.len(), 1, "Should return one group");
assert_eq!(
conversation_list[0].id, group.id,
"Returned group ID should match the created group"
);
assert_eq!(
conversation_list[0].sent_at_ns.unwrap(),
4000,
"Last message should be the most recent one"
);
})
.await
}
#[wasm_bindgen_test(unsupported = tokio::test)]
async fn test_three_groups_specific_ordering() {
with_connection(|conn| {
// Create three groups
let group_a = generate_group_with_created_at(None,5000); // Created after last message
let group_b = generate_group_with_created_at(None,2000); // Created before last message
let group_c = generate_group_with_created_at(None,1000); // Created before last message with no messages

group_a.store(conn).unwrap();
group_b.store(conn).unwrap();
group_c.store(conn).unwrap();
// Add a message to group_b
let message = crate::storage::encrypted_store::group_message::tests::generate_message(
None,
Some(&group_b.id),
Some(3000), // Last message timestamp
);
message.store(conn).unwrap();

// Fetch the conversation list
let conversation_list = conn.fetch_conversation_list().unwrap();

assert_eq!(conversation_list.len(), 3, "Should return all three groups");
assert_eq!(
conversation_list[0].id, group_a.id,
"Group created after the last message should come first"
);
assert_eq!(
conversation_list[1].id, group_b.id,
"Group with the last message should come second"
);
assert_eq!(
conversation_list[2].id, group_c.id,
"Group created before the last message with no messages should come last"
);
})
.await
}
#[wasm_bindgen_test(unsupported = tokio::test)]
async fn test_group_with_newer_message_update() {
with_connection(|conn| {
// Create a group
let group = generate_group(None);
group.store(conn).unwrap();

// Add an initial message
let first_message =
crate::storage::encrypted_store::group_message::tests::generate_message(
None,
Some(&group.id),
Some(1000),
);
first_message.store(conn).unwrap();

// Fetch the conversation list and check last message
let mut conversation_list = conn.fetch_conversation_list().unwrap();
assert_eq!(conversation_list.len(), 1, "Should return one group");
assert_eq!(
conversation_list[0].sent_at_ns.unwrap(),
1000,
"Last message should match the first message"
);

// Add a newer message
let second_message =
crate::storage::encrypted_store::group_message::tests::generate_message(
None,
Some(&group.id),
Some(2000),
);
second_message.store(conn).unwrap();

// Fetch the conversation list again and validate the last message is updated
conversation_list = conn.fetch_conversation_list().unwrap();
assert_eq!(
conversation_list[0].sent_at_ns.unwrap(),
2000,
"Last message should now match the second (newest) message"
);
})
.await
}
}
Loading
Loading