Skip to content

Commit

Permalink
demonstrates find_messages_with_reactions FFI function
Browse files Browse the repository at this point in the history
  • Loading branch information
cameronvoell committed Nov 28, 2024
1 parent 3696fc8 commit fa72df4
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 7 deletions.
78 changes: 74 additions & 4 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use xmtp_mls::groups::scoped_client::LocalScopedGroupClient;
use xmtp_mls::storage::group::ConversationType;
use xmtp_mls::storage::group_message::MsgQueryArgs;
use xmtp_mls::storage::group_message::SortDirection;
use xmtp_mls::storage::group_message::StoredGroupMessageWithReactions;
use xmtp_mls::{
api::ApiClientWrapper,
builder::ClientBuilder,
Expand Down Expand Up @@ -1285,6 +1286,35 @@ impl FfiConversation {
Ok(messages)
}

pub async fn find_messages_with_reactions(
&self,
opts: FfiListMessagesOptions,
) -> Result<Vec<FfiMessageWithReactions>, GenericError> {
let delivery_status = opts.delivery_status.map(|status| status.into());
let direction = opts.direction.map(|dir| dir.into());
let kind = match self.conversation_type()? {
FfiConversationType::Group => None,
FfiConversationType::Dm => Some(GroupMessageKind::Application),
FfiConversationType::Sync => None,
};

let messages: Vec<FfiMessageWithReactions> = self
.inner
.find_messages_with_reactions(
&MsgQueryArgs::default()
.maybe_sent_before_ns(opts.sent_before_ns)
.maybe_sent_after_ns(opts.sent_after_ns)
.maybe_kind(kind)
.maybe_delivery_status(delivery_status)
.maybe_limit(opts.limit)
.maybe_direction(direction),
)?
.into_iter()
.map(|msg| msg.into())
.collect();
Ok(messages)
}

pub async fn process_streamed_conversation_message(
&self,
envelope_bytes: Vec<u8>,
Expand Down Expand Up @@ -1626,6 +1656,12 @@ pub struct FfiMessage {
pub delivery_status: FfiDeliveryStatus,
}

#[derive(uniffi::Record)]
pub struct FfiMessageWithReactions {
pub message: FfiMessage,
pub reactions: Vec<FfiMessage>,
}

impl From<StoredGroupMessage> for FfiMessage {
fn from(msg: StoredGroupMessage) -> Self {
Self {
Expand All @@ -1640,6 +1676,19 @@ impl From<StoredGroupMessage> for FfiMessage {
}
}

impl From<StoredGroupMessageWithReactions> for FfiMessageWithReactions {
fn from(msg_with_reactions: StoredGroupMessageWithReactions) -> Self {
Self {
message: msg_with_reactions.message.into(),
reactions: msg_with_reactions
.reactions
.into_iter()
.map(|reaction| reaction.into())
.collect(),
}
}
}

#[derive(uniffi::Record, Clone, Default)]
pub struct FfiReaction {
pub reference: String,
Expand Down Expand Up @@ -1868,9 +1917,10 @@ mod tests {
get_inbox_id_for_address, inbox_owner::SigningError, logger::FfiLogger, FfiConsent,
FfiConsentEntityType, FfiConsentState, FfiConversation, FfiConversationCallback,
FfiConversationMessageKind, FfiCreateGroupOptions, FfiGroupPermissionsOptions,
FfiInboxOwner, FfiListConversationsOptions, FfiListMessagesOptions, FfiMetadataField,
FfiPermissionPolicy, FfiPermissionPolicySet, FfiPermissionUpdateType, FfiReaction,
FfiReactionAction, FfiReactionSchema, FfiSubscribeError,
FfiInboxOwner, FfiListConversationsOptions, FfiListMessagesOptions,
FfiMessageWithReactions, FfiMetadataField, FfiPermissionPolicy, FfiPermissionPolicySet,
FfiPermissionUpdateType, FfiReaction, FfiReactionAction, FfiReactionSchema,
FfiSubscribeError,
};
use ethers::utils::hex;
use prost::Message;
Expand Down Expand Up @@ -4513,7 +4563,27 @@ mod tests {
let slice: &[u8] = message_content.as_slice();
let encoded_content = EncodedContent::decode(slice).unwrap();
let reaction = Reaction::decode(encoded_content.content.as_slice()).unwrap();
println!("Encoded content: {:?}", encoded_content);
assert_eq!(reaction.content, "👍");
assert_eq!(reaction.action, ReactionAction::ActionAdded as i32);
assert_eq!(reaction.reference_inbox_id, alix.inbox_id());
assert_eq!(
reaction.reference,
hex::encode(message_to_react_to.id.clone())
);
assert_eq!(reaction.schema, ReactionSchema::SchemaUnicode as i32);

// Test find_messages_with_reactions query
let messages_with_reactions: Vec<FfiMessageWithReactions> = alix_conversation
.find_messages_with_reactions(FfiListMessagesOptions::default())
.await
.unwrap();
assert_eq!(messages_with_reactions.len(), 2);
let message_with_reactions = &messages_with_reactions[1];
assert_eq!(message_with_reactions.reactions.len(), 1);
let message_content = message_with_reactions.reactions[0].content.clone();
let slice: &[u8] = message_content.as_slice();
let encoded_content = EncodedContent::decode(slice).unwrap();
let reaction = Reaction::decode(encoded_content.content.as_slice()).unwrap();
assert_eq!(reaction.content, "👍");
assert_eq!(reaction.action, ReactionAction::ActionAdded as i32);
assert_eq!(reaction.reference_inbox_id, alix.inbox_id());
Expand Down
30 changes: 28 additions & 2 deletions xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ use crate::{
db_connection::DbConnection,
group::{ConversationType, GroupMembershipState, StoredGroup},
group_intent::IntentKind,
group_message::{DeliveryStatus, GroupMessageKind, MsgQueryArgs, StoredGroupMessage},
group_message::{
DeliveryStatus, GroupMessageKind, MsgQueryArgs, StoredGroupMessage,
StoredGroupMessageWithReactions,
},
sql_key_store,
},
subscriptions::{LocalEventError, LocalEvents},
Expand Down Expand Up @@ -725,7 +728,19 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
Some(content_type) if content_type.type_id == ReactionCodec::TYPE_ID => {
// Attempt to decode as reaction
match ReactionCodec::decode(encoded_content_clone) {
Ok(reaction) => Some(reaction.reference.into_bytes()),
Ok(reaction) => {
// Decode hex string into bytes
match hex::decode(&reaction.reference) {
Ok(bytes) => Some(bytes),
Err(e) => {
tracing::debug!(
"Failed to decode reaction reference as hex: {}",
e
);
None
}
}
}
Err(e) => {
tracing::debug!("Failed to decode reaction: {}", e);
None
Expand Down Expand Up @@ -761,6 +776,17 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
Ok(messages)
}

/// Query the database for stored messages. Optionally filtered by time, kind, delivery_status
/// and limit
pub fn find_messages_with_reactions(
&self,
args: &MsgQueryArgs,
) -> Result<Vec<StoredGroupMessageWithReactions>, GroupError> {
let conn = self.context().store().conn()?;
let messages = conn.get_group_messages_with_reactions(&self.group_id, args)?;
Ok(messages)
}

///
/// Add members to the group by account address
///
Expand Down
73 changes: 72 additions & 1 deletion xmtp_mls/src/storage/encrypted_store/group_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ use super::{
};
use crate::{impl_fetch, impl_store, impl_store_or_ignore, StorageError};

pub struct StoredGroupMessageWithReactions {
pub message: StoredGroupMessage,
// Messages who's parent_id matches this message's id
pub reactions: Vec<StoredGroupMessage>,
}

#[derive(
Debug, Clone, Serialize, Deserialize, Insertable, Identifiable, Queryable, Eq, PartialEq,
)]
Expand Down Expand Up @@ -116,14 +122,15 @@ impl_fetch!(StoredGroupMessage, group_messages, Vec<u8>);
impl_store!(StoredGroupMessage, group_messages);
impl_store_or_ignore!(StoredGroupMessage, group_messages);

#[derive(Default)]
#[derive(Default, Clone)]
pub struct MsgQueryArgs {
sent_after_ns: Option<i64>,
sent_before_ns: Option<i64>,
kind: Option<GroupMessageKind>,
delivery_status: Option<DeliveryStatus>,
limit: Option<i64>,
direction: Option<SortDirection>,
include_reactions: Option<bool>,
}

impl MsgQueryArgs {
Expand Down Expand Up @@ -194,6 +201,10 @@ impl DbConnection {
let mut query = dsl::group_messages
.filter(dsl::group_id.eq(group_id))
.into_boxed();
// Add filter for reactions based on include_reactions flag
if args.include_reactions == Some(false) {
query = query.filter(dsl::parent_id.is_null());
}

if let Some(sent_after) = args.sent_after_ns {
query = query.filter(dsl::sent_at_ns.gt(sent_after));
Expand Down Expand Up @@ -223,6 +234,66 @@ impl DbConnection {
Ok(self.raw_query(|conn| query.load::<StoredGroupMessage>(conn))?)
}

/// Query for group messages with their reactions
#[allow(clippy::too_many_arguments)]
pub fn get_group_messages_with_reactions(
&self,
group_id: &[u8],
args: &MsgQueryArgs,
) -> Result<Vec<StoredGroupMessageWithReactions>, StorageError> {
// First get all the main messages
let mut modified_args = args.clone();
modified_args.include_reactions = Some(false); // Force include_reactions to false for main query
let messages = self.get_group_messages(group_id, &modified_args)?;

// Then get all reactions for these messages in a single query
let message_ids: Vec<&[u8]> = messages.iter().map(|m| m.id.as_slice()).collect();

let mut reactions_query = dsl::group_messages
.filter(dsl::group_id.eq(group_id))
.filter(dsl::parent_id.is_not_null())
.filter(dsl::parent_id.eq_any(message_ids))
.into_boxed();

// Apply the same sorting as the main messages
reactions_query = match args.direction.as_ref().unwrap_or(&SortDirection::Ascending) {
SortDirection::Ascending => reactions_query.order(dsl::sent_at_ns.asc()),
SortDirection::Descending => reactions_query.order(dsl::sent_at_ns.desc()),
};

let reactions: Vec<StoredGroupMessage> =
self.raw_query(|conn| reactions_query.load(conn))?;

// Group reactions by parent message id
let mut reactions_by_parent: std::collections::HashMap<Vec<u8>, Vec<StoredGroupMessage>> =
std::collections::HashMap::new();

for reaction in reactions {
if let Some(parent_id) = &reaction.parent_id {
reactions_by_parent
.entry(parent_id.clone())
.or_default()
.push(reaction);
}
}

// Combine messages with their reactions
let messages_with_reactions: Vec<StoredGroupMessageWithReactions> = messages
.into_iter()
.map(|message| {
let message_clone = message.clone();
StoredGroupMessageWithReactions {
message,
reactions: reactions_by_parent
.remove(&message_clone.id)
.unwrap_or_default(),
}
})
.collect();

Ok(messages_with_reactions)
}

/// Get a particular group message
pub fn get_group_message<MessageId: AsRef<[u8]>>(
&self,
Expand Down

0 comments on commit fa72df4

Please sign in to comment.