From 1c5c59f33726ba182ee91d381685b3da5373a803 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Thu, 19 Dec 2024 17:02:48 -0800 Subject: [PATCH 1/2] Hmac Key updates (#1439) * update the hmac keys to return the conversation id * fix lint issue --- bindings_ffi/src/mls.rs | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 4ac46b3c1..193ea8732 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -534,24 +534,6 @@ impl FfiXmtpClient { scw_verifier: self.inner_client.scw_verifier().clone().clone(), })) } - - pub fn get_hmac_keys(&self) -> Result, GenericError> { - let inner = self.inner_client.as_ref(); - let conversations = inner.find_groups(GroupQueryArgs::default())?; - - let mut keys = vec![]; - for conversation in conversations { - let mut k = conversation - .hmac_keys(-1..=1)? - .into_iter() - .map(Into::into) - .collect::>(); - - keys.append(&mut k); - } - - Ok(keys) - } } impl From for FfiHmacKey { @@ -1110,6 +1092,25 @@ impl FfiConversations { FfiStreamCloser::new(handle) } + + pub fn get_hmac_keys(&self) -> Result, Vec>, GenericError> { + let inner = self.inner_client.as_ref(); + let conversations = inner.find_groups(GroupQueryArgs::default())?; + + let mut hmac_map = HashMap::new(); + for conversation in conversations { + let id = conversation.group_id.clone(); + let keys = conversation + .hmac_keys(-1..=1)? + .into_iter() + .map(Into::into) + .collect::>(); + + hmac_map.insert(id, keys); + } + + Ok(hmac_map) + } } impl From for ConversationType { From db841995bc454452170238d54c13ca4a4da2155e Mon Sep 17 00:00:00 2001 From: Cameron Voell <1103838+cameronvoell@users.noreply.github.com> Date: Thu, 19 Dec 2024 17:18:01 -0800 Subject: [PATCH 2/2] Save group message content types (#1435) Part of https://github.com/xmtp/libxmtp/issues/1403 Adds fields to `group_messages` table corresponding to `ContentTypeId` proto and saves them when sending and receiving messages. `ContentTypeId` for reference: https://github.com/xmtp/proto/blob/404a0f41a6dc00f5de5fcfc24856c8b4e417fe59/proto/mls/message_contents/content.proto#L10-L16 ```proto // ContentTypeId is used to identify the type of content stored in a Message. message ContentTypeId { string authority_id = 1; // authority governing this content type string type_id = 2; // type identifier uint32 version_major = 3; // major version of the type uint32 version_minor = 4; // minor version of the type } ``` --- xmtp_content_types/src/attachment.rs | 6 + xmtp_content_types/src/group_updated.rs | 2 +- xmtp_content_types/src/lib.rs | 12 +- xmtp_content_types/src/membership_change.rs | 2 +- xmtp_content_types/src/reaction.rs | 6 + xmtp_content_types/src/read_receipt.rs | 6 + xmtp_content_types/src/remote_attachment.rs | 6 + xmtp_content_types/src/reply.rs | 6 + xmtp_content_types/src/text.rs | 2 +- .../src/transaction_reference.rs | 6 + .../down.sql | 11 + .../up.sql | 11 + xmtp_mls/src/groups/mls_sync.rs | 44 +++- xmtp_mls/src/groups/mod.rs | 50 ++++- .../storage/encrypted_store/group_message.rs | 203 +++++++++++++++++- .../src/storage/encrypted_store/schema.rs | 4 + 16 files changed, 347 insertions(+), 30 deletions(-) create mode 100644 xmtp_content_types/src/attachment.rs create mode 100644 xmtp_content_types/src/reaction.rs create mode 100644 xmtp_content_types/src/read_receipt.rs create mode 100644 xmtp_content_types/src/remote_attachment.rs create mode 100644 xmtp_content_types/src/reply.rs create mode 100644 xmtp_content_types/src/transaction_reference.rs create mode 100644 xmtp_mls/migrations/2024-12-18-175338_messages_content_type/down.sql create mode 100644 xmtp_mls/migrations/2024-12-18-175338_messages_content_type/up.sql diff --git a/xmtp_content_types/src/attachment.rs b/xmtp_content_types/src/attachment.rs new file mode 100644 index 000000000..fcd908944 --- /dev/null +++ b/xmtp_content_types/src/attachment.rs @@ -0,0 +1,6 @@ +pub struct AttachmentCodec {} + +//. Legacy content type id at https://github.com/xmtp/xmtp-js/blob/main/content-types/content-type-remote-attachment/src/Attachment.ts +impl AttachmentCodec { + pub const TYPE_ID: &'static str = "attachment"; +} diff --git a/xmtp_content_types/src/group_updated.rs b/xmtp_content_types/src/group_updated.rs index 2ab08917a..6ef0e33f8 100644 --- a/xmtp_content_types/src/group_updated.rs +++ b/xmtp_content_types/src/group_updated.rs @@ -10,7 +10,7 @@ pub struct GroupUpdatedCodec {} impl GroupUpdatedCodec { const AUTHORITY_ID: &'static str = "xmtp.org"; - const TYPE_ID: &'static str = "group_updated"; + pub const TYPE_ID: &'static str = "group_updated"; } impl ContentCodec for GroupUpdatedCodec { diff --git a/xmtp_content_types/src/lib.rs b/xmtp_content_types/src/lib.rs index 04a7a3fb9..e6e8d4397 100644 --- a/xmtp_content_types/src/lib.rs +++ b/xmtp_content_types/src/lib.rs @@ -1,16 +1,16 @@ +pub mod attachment; pub mod group_updated; pub mod membership_change; +pub mod reaction; +pub mod read_receipt; +pub mod remote_attachment; +pub mod reply; pub mod text; +pub mod transaction_reference; use thiserror::Error; use xmtp_proto::xmtp::mls::message_contents::{ContentTypeId, EncodedContent}; -pub enum ContentType { - GroupMembershipChange, - GroupUpdated, - Text, -} - #[derive(Debug, Error)] pub enum CodecError { #[error("encode error {0}")] diff --git a/xmtp_content_types/src/membership_change.rs b/xmtp_content_types/src/membership_change.rs index 14401bbb8..bfb65fea7 100644 --- a/xmtp_content_types/src/membership_change.rs +++ b/xmtp_content_types/src/membership_change.rs @@ -12,7 +12,7 @@ pub struct GroupMembershipChangeCodec {} impl GroupMembershipChangeCodec { const AUTHORITY_ID: &'static str = "xmtp.org"; - const TYPE_ID: &'static str = "group_membership_change"; + pub const TYPE_ID: &'static str = "group_membership_change"; } impl ContentCodec for GroupMembershipChangeCodec { diff --git a/xmtp_content_types/src/reaction.rs b/xmtp_content_types/src/reaction.rs new file mode 100644 index 000000000..771f03119 --- /dev/null +++ b/xmtp_content_types/src/reaction.rs @@ -0,0 +1,6 @@ +pub struct ReactionCodec {} + +/// Legacy content type id at https://github.com/xmtp/xmtp-js/blob/main/content-types/content-type-reaction/src/Reaction.ts +impl ReactionCodec { + pub const TYPE_ID: &'static str = "reaction"; +} diff --git a/xmtp_content_types/src/read_receipt.rs b/xmtp_content_types/src/read_receipt.rs new file mode 100644 index 000000000..7c0f34e06 --- /dev/null +++ b/xmtp_content_types/src/read_receipt.rs @@ -0,0 +1,6 @@ +pub struct ReadReceiptCodec {} + +/// Legacy content type id at https://github.com/xmtp/xmtp-js/blob/main/content-types/content-type-read-receipt/src/ReadReceipt.ts +impl ReadReceiptCodec { + pub const TYPE_ID: &'static str = "readReceipt"; +} diff --git a/xmtp_content_types/src/remote_attachment.rs b/xmtp_content_types/src/remote_attachment.rs new file mode 100644 index 000000000..7d190d32b --- /dev/null +++ b/xmtp_content_types/src/remote_attachment.rs @@ -0,0 +1,6 @@ +pub struct RemoteAttachmentCodec {} + +//. Legacy content type id at https://github.com/xmtp/xmtp-js/blob/main/content-types/content-type-remote-attachment/src/RemoteAttachment.ts +impl RemoteAttachmentCodec { + pub const TYPE_ID: &'static str = "remoteStaticAttachment"; +} diff --git a/xmtp_content_types/src/reply.rs b/xmtp_content_types/src/reply.rs new file mode 100644 index 000000000..513effe9d --- /dev/null +++ b/xmtp_content_types/src/reply.rs @@ -0,0 +1,6 @@ +pub struct ReplyCodec {} + +/// Legacy content type id at https://github.com/xmtp/xmtp-js/blob/main/content-types/content-type-reply/src/Reply.ts +impl ReplyCodec { + pub const TYPE_ID: &'static str = "reply"; +} diff --git a/xmtp_content_types/src/text.rs b/xmtp_content_types/src/text.rs index 124fb7831..64bf0c93a 100644 --- a/xmtp_content_types/src/text.rs +++ b/xmtp_content_types/src/text.rs @@ -8,7 +8,7 @@ pub struct TextCodec {} impl TextCodec { const AUTHORITY_ID: &'static str = "xmtp.org"; - const TYPE_ID: &'static str = "text"; + pub const TYPE_ID: &'static str = "text"; const ENCODING_KEY: &'static str = "encoding"; const ENCODING_UTF8: &'static str = "UTF-8"; } diff --git a/xmtp_content_types/src/transaction_reference.rs b/xmtp_content_types/src/transaction_reference.rs new file mode 100644 index 000000000..44df54d6c --- /dev/null +++ b/xmtp_content_types/src/transaction_reference.rs @@ -0,0 +1,6 @@ +pub struct TransactionReferenceCodec {} + +/// Legacy content type id at https://github.com/xmtp/xmtp-js/blob/main/content-types/content-type-transaction-reference/src/TransactionReference.ts +impl TransactionReferenceCodec { + pub const TYPE_ID: &'static str = "transactionReference"; +} diff --git a/xmtp_mls/migrations/2024-12-18-175338_messages_content_type/down.sql b/xmtp_mls/migrations/2024-12-18-175338_messages_content_type/down.sql new file mode 100644 index 000000000..3c9d1a4ec --- /dev/null +++ b/xmtp_mls/migrations/2024-12-18-175338_messages_content_type/down.sql @@ -0,0 +1,11 @@ +ALTER TABLE group_messages + DROP COLUMN authority_id; + +ALTER TABLE group_messages + DROP COLUMN version_major; + +ALTER TABLE group_messages + DROP COLUMN version_minor; + +ALTER TABLE group_messages + DROP COLUMN content_type; diff --git a/xmtp_mls/migrations/2024-12-18-175338_messages_content_type/up.sql b/xmtp_mls/migrations/2024-12-18-175338_messages_content_type/up.sql new file mode 100644 index 000000000..fc3ace48f --- /dev/null +++ b/xmtp_mls/migrations/2024-12-18-175338_messages_content_type/up.sql @@ -0,0 +1,11 @@ +ALTER TABLE group_messages + ADD COLUMN content_type INTEGER NOT NULL DEFAULT 0; + +ALTER TABLE group_messages + ADD COLUMN version_minor INTEGER NOT NULL DEFAULT 0; + +ALTER TABLE group_messages + ADD COLUMN version_major INTEGER NOT NULL DEFAULT 0; + +ALTER TABLE group_messages + ADD COLUMN authority_id TEXT NOT NULL DEFAULT ''; diff --git a/xmtp_mls/src/groups/mls_sync.rs b/xmtp_mls/src/groups/mls_sync.rs index 87109e81a..4550abfc3 100644 --- a/xmtp_mls/src/groups/mls_sync.rs +++ b/xmtp_mls/src/groups/mls_sync.rs @@ -13,9 +13,9 @@ use crate::{ GRPC_DATA_LIMIT, HMAC_SALT, MAX_GROUP_SIZE, MAX_INTENT_PUBLISH_ATTEMPTS, MAX_PAST_EPOCHS, SYNC_UPDATE_INSTALLATIONS_INTERVAL_NS, }, - groups::device_sync::DeviceSyncContent, groups::{ - device_sync::preference_sync::UserPreferenceUpdate, intents::UpdateMetadataIntentData, + device_sync::{preference_sync::UserPreferenceUpdate, DeviceSyncContent}, + intents::UpdateMetadataIntentData, validated_commit::ValidatedCommit, }, hpke::{encrypt_welcome, HpkeError}, @@ -25,15 +25,14 @@ use crate::{ storage::{ db_connection::DbConnection, group_intent::{IntentKind, IntentState, StoredGroupIntent, ID}, - group_message::{DeliveryStatus, GroupMessageKind, StoredGroupMessage}, + group_message::{ContentType, DeliveryStatus, GroupMessageKind, StoredGroupMessage}, refresh_state::EntityKind, serialization::{db_deserialize, db_serialize}, sql_key_store, user_preferences::StoredUserPreferences, StorageError, }, - subscriptions::LocalEvents, - subscriptions::SyncMessage, + subscriptions::{LocalEvents, SyncMessage}, utils::{hash::sha256, id::calculate_message_id, time::hmac_epoch}, xmtp_openmls_provider::XmtpOpenMlsProvider, Delete, Fetch, StoreOrIgnore, @@ -44,7 +43,7 @@ use hmac::{Hmac, Mac}; use openmls::{ credentials::BasicCredential, extensions::Extensions, - framing::{ContentType, ProtocolMessage}, + framing::{ContentType as MlsContentType, ProtocolMessage}, group::{GroupEpoch, StagedCommit}, key_packages::KeyPackage, prelude::{ @@ -546,6 +545,7 @@ where })) => { let message_id = calculate_message_id(&self.group_id, &content, &idempotency_key); + let queryable_content_fields = Self::extract_queryable_content_fields(&content); StoredGroupMessage { id: message_id, group_id: self.group_id.clone(), @@ -555,6 +555,10 @@ where sender_installation_id, sender_inbox_id, delivery_status: DeliveryStatus::Published, + content_type: queryable_content_fields.content_type, + version_major: queryable_content_fields.version_major, + version_minor: queryable_content_fields.version_minor, + authority_id: queryable_content_fields.authority_id, } .store_or_ignore(provider.conn_ref())? } @@ -583,6 +587,10 @@ where sender_installation_id, sender_inbox_id: sender_inbox_id.clone(), delivery_status: DeliveryStatus::Published, + content_type: ContentType::Unknown, + version_major: 0, + version_minor: 0, + authority_id: "unknown".to_string(), } .store_or_ignore(provider.conn_ref())?; @@ -612,6 +620,10 @@ where sender_installation_id, sender_inbox_id, delivery_status: DeliveryStatus::Published, + content_type: ContentType::Unknown, + version_major: 0, + version_minor: 0, + authority_id: "unknown".to_string(), } .store_or_ignore(provider.conn_ref())?; @@ -712,7 +724,7 @@ where discriminant(&other), )), }?; - if !allow_epoch_increment && message.content_type() == ContentType::Commit { + if !allow_epoch_increment && message.content_type() == MlsContentType::Commit { return Err(GroupMessageProcessingError::EpochIncrementNotAllowed); } @@ -933,7 +945,19 @@ where encoded_payload_bytes.as_slice(), ×tamp_ns.to_string(), ); - + let content_type = match encoded_payload.r#type { + Some(ct) => ct, + None => { + tracing::warn!("Missing content type in encoded payload, using default values"); + // Default content type values + xmtp_proto::xmtp::mls::message_contents::ContentTypeId { + authority_id: "unknown".to_string(), + type_id: "unknown".to_string(), + version_major: 0, + version_minor: 0, + } + } + }; let msg = StoredGroupMessage { id: message_id, group_id: group_id.to_vec(), @@ -943,6 +967,10 @@ where sender_installation_id, sender_inbox_id, delivery_status: DeliveryStatus::Published, + content_type: content_type.type_id.into(), + version_major: content_type.version_major as i32, + version_minor: content_type.version_minor as i32, + authority_id: content_type.authority_id.to_string(), }; msg.store_or_ignore(conn)?; diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 35662bc7d..3c1320f6c 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -58,7 +58,7 @@ use self::{ intents::IntentError, validated_commit::CommitValidationError, }; -use crate::storage::StorageError; +use crate::storage::{group_message::ContentType, StorageError}; use xmtp_common::time::now_ns; use xmtp_proto::xmtp::mls::{ api::v1::{ @@ -67,7 +67,7 @@ use xmtp_proto::xmtp::mls::{ }, message_contents::{ plaintext_envelope::{Content, V1}, - PlaintextEnvelope, + EncodedContent, PlaintextEnvelope, }, }; @@ -309,6 +309,38 @@ pub enum UpdateAdminListType { RemoveSuper, } +/// Fields extracted from content of a message that should be stored in the DB +pub struct QueryableContentFields { + pub content_type: ContentType, + pub version_major: i32, + pub version_minor: i32, + pub authority_id: String, +} + +impl Default for QueryableContentFields { + fn default() -> Self { + Self { + content_type: ContentType::Unknown, // Or whatever the appropriate default is + version_major: 0, + version_minor: 0, + authority_id: String::new(), + } + } +} + +impl From for QueryableContentFields { + fn from(content: EncodedContent) -> Self { + let content_type_id = content.r#type.unwrap_or_default(); + + QueryableContentFields { + content_type: content_type_id.type_id.into(), + version_major: content_type_id.version_major as i32, + version_minor: content_type_id.version_minor as i32, + authority_id: content_type_id.authority_id.to_string(), + } + } +} + /// Represents a group, which can contain anywhere from 1 to MAX_GROUP_SIZE inboxes. /// /// This is a wrapper around OpenMLS's `MlsGroup` that handles our application-level configuration @@ -706,6 +738,15 @@ impl MlsGroup { Ok(message_id) } + /// Helper function to extract queryable content fields from a message + fn extract_queryable_content_fields(message: &[u8]) -> QueryableContentFields { + // Return early with default if decoding fails or type is missing + EncodedContent::decode(message) + .inspect_err(|e| tracing::debug!("Failed to decode message as EncodedContent: {}", e)) + .map(QueryableContentFields::from) + .unwrap_or_default() + } + /// Prepare a [`IntentKind::SendMessage`] intent, and [`StoredGroupMessage`] on this users XMTP [`Client`]. /// /// # Arguments @@ -734,6 +775,7 @@ impl MlsGroup { // store this unpublished message locally before sending let message_id = calculate_message_id(&self.group_id, message, &now.to_string()); + let queryable_content_fields = Self::extract_queryable_content_fields(message); let group_message = StoredGroupMessage { id: message_id.clone(), group_id: self.group_id.clone(), @@ -743,6 +785,10 @@ impl MlsGroup { sender_installation_id: self.context().installation_public_key().into(), sender_inbox_id: self.context().inbox_id().to_string(), delivery_status: DeliveryStatus::Unpublished, + content_type: queryable_content_fields.content_type, + version_major: queryable_content_fields.version_major, + version_minor: queryable_content_fields.version_minor, + authority_id: queryable_content_fields.authority_id, }; group_message.store(provider.conn_ref())?; diff --git a/xmtp_mls/src/storage/encrypted_store/group_message.rs b/xmtp_mls/src/storage/encrypted_store/group_message.rs index 743800d79..cfc3d599c 100644 --- a/xmtp_mls/src/storage/encrypted_store/group_message.rs +++ b/xmtp_mls/src/storage/encrypted_store/group_message.rs @@ -7,6 +7,10 @@ use diesel::{ sql_types::Integer, }; use serde::{Deserialize, Serialize}; +use xmtp_content_types::{ + attachment, group_updated, membership_change, reaction, read_receipt, remote_attachment, reply, + text, transaction_reference, +}; use super::{ db_connection::DbConnection, @@ -38,6 +42,14 @@ pub struct StoredGroupMessage { pub sender_inbox_id: String, /// We optimistically store messages before sending. pub delivery_status: DeliveryStatus, + /// The Content Type of the message + pub content_type: ContentType, + /// The content type version major + pub version_major: i32, + /// The content type version minor + pub version_minor: i32, + /// The ID of the authority defining the content type + pub authority_id: String, } #[derive(Clone, Debug, PartialEq)] @@ -77,6 +89,90 @@ where } } +//Legacy content types found at https://github.com/xmtp/xmtp-js/tree/main/content-types +#[repr(i32)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, FromSqlRow, AsExpression)] +#[diesel(sql_type = diesel::sql_types::Integer)] +pub enum ContentType { + Unknown = 0, + Text = 1, + GroupMembershipChange = 2, + GroupUpdated = 3, + Reaction = 4, + ReadReceipt = 5, + Reply = 6, + Attachment = 7, + RemoteAttachment = 8, + TransactionReference = 9, +} + +impl std::fmt::Display for ContentType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let as_string = match self { + Self::Unknown => "unknown", + Self::Text => text::TextCodec::TYPE_ID, + Self::GroupMembershipChange => membership_change::GroupMembershipChangeCodec::TYPE_ID, + Self::GroupUpdated => group_updated::GroupUpdatedCodec::TYPE_ID, + Self::Reaction => reaction::ReactionCodec::TYPE_ID, + Self::ReadReceipt => read_receipt::ReadReceiptCodec::TYPE_ID, + Self::Attachment => attachment::AttachmentCodec::TYPE_ID, + Self::RemoteAttachment => remote_attachment::RemoteAttachmentCodec::TYPE_ID, + Self::Reply => reply::ReplyCodec::TYPE_ID, + Self::TransactionReference => transaction_reference::TransactionReferenceCodec::TYPE_ID, + }; + + write!(f, "{}", as_string) + } +} + +impl From for ContentType { + fn from(type_id: String) -> Self { + match type_id.as_str() { + text::TextCodec::TYPE_ID => Self::Text, + membership_change::GroupMembershipChangeCodec::TYPE_ID => Self::GroupMembershipChange, + group_updated::GroupUpdatedCodec::TYPE_ID => Self::GroupUpdated, + reaction::ReactionCodec::TYPE_ID => Self::Reaction, + read_receipt::ReadReceiptCodec::TYPE_ID => Self::ReadReceipt, + reply::ReplyCodec::TYPE_ID => Self::Reply, + attachment::AttachmentCodec::TYPE_ID => Self::Attachment, + remote_attachment::RemoteAttachmentCodec::TYPE_ID => Self::RemoteAttachment, + transaction_reference::TransactionReferenceCodec::TYPE_ID => Self::TransactionReference, + _ => Self::Unknown, + } + } +} + +impl ToSql for ContentType +where + i32: ToSql, +{ + fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Sqlite>) -> serialize::Result { + out.set_value(*self as i32); + Ok(IsNull::No) + } +} + +impl FromSql for ContentType +where + i32: FromSql, +{ + fn from_sql(bytes: ::RawValue<'_>) -> deserialize::Result { + match i32::from_sql(bytes)? { + 0 => Ok(ContentType::Unknown), + 1 => Ok(ContentType::Text), + 2 => Ok(ContentType::GroupMembershipChange), + 3 => Ok(ContentType::GroupUpdated), + 4 => Ok(ContentType::Reaction), + 5 => Ok(ContentType::ReadReceipt), + 6 => Ok(ContentType::Reply), + 7 => Ok(ContentType::Attachment), + 8 => Ok(ContentType::RemoteAttachment), + 9 => Ok(ContentType::TransactionReference), + x => Err(format!("Unrecognized variant {}", x).into()), + } + } +} + #[repr(i32)] #[derive(Debug, Copy, Clone, Serialize, Deserialize, Eq, PartialEq, FromSqlRow, AsExpression)] #[diesel(sql_type = Integer)] @@ -122,6 +218,7 @@ pub struct MsgQueryArgs { delivery_status: Option, limit: Option, direction: Option, + content_types: Option>, } impl MsgQueryArgs { @@ -179,6 +276,16 @@ impl MsgQueryArgs { self.limit = limit; self } + + pub fn content_types(mut self, content_types: Vec) -> Self { + self.content_types = Some(content_types); + self + } + + pub fn maybe_content_types(mut self, content_types: Option>) -> Self { + self.content_types = content_types; + self + } } impl DbConnection { @@ -209,6 +316,10 @@ impl DbConnection { query = query.filter(dsl::delivery_status.eq(status)); } + if let Some(content_types) = &args.content_types { + query = query.filter(dsl::content_type.eq_any(content_types)); + } + query = match args.direction.as_ref().unwrap_or(&SortDirection::Ascending) { SortDirection::Ascending => query.order(dsl::sent_at_ns.asc()), SortDirection::Descending => query.order(dsl::sent_at_ns.desc()), @@ -294,6 +405,7 @@ pub(crate) mod tests { kind: Option, group_id: Option<&[u8]>, sent_at_ns: Option, + content_type: Option, ) -> StoredGroupMessage { StoredGroupMessage { id: rand_vec::<24>(), @@ -304,6 +416,10 @@ pub(crate) mod tests { sender_inbox_id: "0x0".to_string(), kind: kind.unwrap_or(GroupMessageKind::Application), delivery_status: DeliveryStatus::Unpublished, + content_type: content_type.unwrap_or(ContentType::Unknown), + version_major: 0, + version_minor: 0, + authority_id: "unknown".to_string(), } } @@ -320,7 +436,7 @@ pub(crate) mod tests { async fn it_gets_messages() { with_connection(|conn| { let group = generate_group(None); - let message = generate_message(None, Some(&group.id), None); + let message = generate_message(None, Some(&group.id), None, None); group.store(conn).unwrap(); let id = message.id.clone(); @@ -337,7 +453,7 @@ pub(crate) mod tests { use diesel::result::{DatabaseErrorKind::ForeignKeyViolation, Error::DatabaseError}; with_connection(|conn| { - let message = generate_message(None, None, None); + let message = generate_message(None, None, None, None); assert_err!( message.store(conn), StorageError::DieselResult(DatabaseError(ForeignKeyViolation, _)) @@ -355,7 +471,7 @@ pub(crate) mod tests { group.store(conn).unwrap(); for idx in 0..50 { - let msg = generate_message(None, Some(&group.id), Some(idx)); + let msg = generate_message(None, Some(&group.id), Some(idx), None); assert_ok!(msg.store(conn)); } @@ -388,10 +504,10 @@ pub(crate) mod tests { group.store(conn).unwrap(); let messages = vec![ - generate_message(None, Some(&group.id), Some(1_000)), - generate_message(None, Some(&group.id), Some(100_000)), - generate_message(None, Some(&group.id), Some(10_000)), - generate_message(None, Some(&group.id), Some(1_000_000)), + generate_message(None, Some(&group.id), Some(1_000), None), + generate_message(None, Some(&group.id), Some(100_000), None), + generate_message(None, Some(&group.id), Some(10_000), None), + generate_message(None, Some(&group.id), Some(1_000_000), None), ]; assert_ok!(messages.store(conn)); let message = conn @@ -432,6 +548,7 @@ pub(crate) mod tests { Some(GroupMessageKind::Application), Some(&group.id), None, + Some(ContentType::Text), ); msg.store(conn).unwrap(); } @@ -440,6 +557,7 @@ pub(crate) mod tests { Some(GroupMessageKind::MembershipChange), Some(&group.id), None, + Some(ContentType::GroupMembershipChange), ); msg.store(conn).unwrap(); } @@ -472,10 +590,10 @@ pub(crate) mod tests { group.store(conn).unwrap(); let messages = vec![ - generate_message(None, Some(&group.id), Some(10_000)), - generate_message(None, Some(&group.id), Some(1_000)), - generate_message(None, Some(&group.id), Some(100_000)), - generate_message(None, Some(&group.id), Some(1_000_000)), + generate_message(None, Some(&group.id), Some(10_000), None), + generate_message(None, Some(&group.id), Some(1_000), None), + generate_message(None, Some(&group.id), Some(100_000), None), + generate_message(None, Some(&group.id), Some(1_000_000), None), ]; assert_ok!(messages.store(conn)); @@ -506,4 +624,67 @@ pub(crate) mod tests { }) .await } + + #[wasm_bindgen_test(unsupported = tokio::test)] + async fn it_gets_messages_by_content_type() { + with_connection(|conn| { + let group = generate_group(None); + group.store(conn).unwrap(); + + let messages = vec![ + generate_message(None, Some(&group.id), Some(1_000), Some(ContentType::Text)), + generate_message( + None, + Some(&group.id), + Some(2_000), + Some(ContentType::GroupMembershipChange), + ), + generate_message( + None, + Some(&group.id), + Some(3_000), + Some(ContentType::GroupUpdated), + ), + ]; + assert_ok!(messages.store(conn)); + + // Query for text messages + let text_messages = conn + .get_group_messages( + &group.id, + &MsgQueryArgs::default().content_types(vec![ContentType::Text]), + ) + .unwrap(); + assert_eq!(text_messages.len(), 1); + assert_eq!(text_messages[0].content_type, ContentType::Text); + assert_eq!(text_messages[0].sent_at_ns, 1_000); + + // Query for membership change messages + let membership_messages = conn + .get_group_messages( + &group.id, + &MsgQueryArgs::default() + .content_types(vec![ContentType::GroupMembershipChange]), + ) + .unwrap(); + assert_eq!(membership_messages.len(), 1); + assert_eq!( + membership_messages[0].content_type, + ContentType::GroupMembershipChange + ); + assert_eq!(membership_messages[0].sent_at_ns, 2_000); + + // Query for group updated messages + let updated_messages = conn + .get_group_messages( + &group.id, + &MsgQueryArgs::default().content_types(vec![ContentType::GroupUpdated]), + ) + .unwrap(); + assert_eq!(updated_messages.len(), 1); + assert_eq!(updated_messages[0].content_type, ContentType::GroupUpdated); + assert_eq!(updated_messages[0].sent_at_ns, 3_000); + }) + .await + } } diff --git a/xmtp_mls/src/storage/encrypted_store/schema.rs b/xmtp_mls/src/storage/encrypted_store/schema.rs index c82818161..9dba148f6 100644 --- a/xmtp_mls/src/storage/encrypted_store/schema.rs +++ b/xmtp_mls/src/storage/encrypted_store/schema.rs @@ -41,6 +41,10 @@ diesel::table! { sender_installation_id -> Binary, sender_inbox_id -> Text, delivery_status -> Integer, + content_type -> Integer, + version_minor -> Integer, + version_major -> Integer, + authority_id -> Text, } }