Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Save group message content types #1435

Merged
merged 10 commits into from
Dec 20, 2024
6 changes: 6 additions & 0 deletions xmtp_content_types/src/attachment.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pub struct AttachmentCodec {}

//. Legacy content type id at https://github.com/xmtp/xmtp-js/blob/main/content-types/content-type-remote-attachment/src/Attachment.ts
impl AttachmentCodec {
pub const TYPE_ID: &'static str = "attachment";
}
2 changes: 1 addition & 1 deletion xmtp_content_types/src/group_updated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub struct GroupUpdatedCodec {}

impl GroupUpdatedCodec {
const AUTHORITY_ID: &'static str = "xmtp.org";
const TYPE_ID: &'static str = "group_updated";
pub const TYPE_ID: &'static str = "group_updated";
}

impl ContentCodec<GroupUpdated> for GroupUpdatedCodec {
Expand Down
12 changes: 6 additions & 6 deletions xmtp_content_types/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
pub mod attachment;
pub mod group_updated;
pub mod membership_change;
pub mod reaction;
pub mod read_receipt;
pub mod remote_attachment;
pub mod reply;
pub mod text;
pub mod transaction_reference;

use thiserror::Error;
use xmtp_proto::xmtp::mls::message_contents::{ContentTypeId, EncodedContent};

pub enum ContentType {
GroupMembershipChange,
GroupUpdated,
Text,
}

#[derive(Debug, Error)]
pub enum CodecError {
#[error("encode error {0}")]
Expand Down
2 changes: 1 addition & 1 deletion xmtp_content_types/src/membership_change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub struct GroupMembershipChangeCodec {}

impl GroupMembershipChangeCodec {
const AUTHORITY_ID: &'static str = "xmtp.org";
const TYPE_ID: &'static str = "group_membership_change";
pub const TYPE_ID: &'static str = "group_membership_change";
}

impl ContentCodec<GroupMembershipChanges> for GroupMembershipChangeCodec {
Expand Down
6 changes: 6 additions & 0 deletions xmtp_content_types/src/reaction.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pub struct ReactionCodec {}

/// Legacy content type id at https://github.com/xmtp/xmtp-js/blob/main/content-types/content-type-reaction/src/Reaction.ts
impl ReactionCodec {
pub const TYPE_ID: &'static str = "reaction";
}
6 changes: 6 additions & 0 deletions xmtp_content_types/src/read_receipt.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pub struct ReadReceiptCodec {}

/// Legacy content type id at https://github.com/xmtp/xmtp-js/blob/main/content-types/content-type-read-receipt/src/ReadReceipt.ts
impl ReadReceiptCodec {
pub const TYPE_ID: &'static str = "readReceipt";
}
6 changes: 6 additions & 0 deletions xmtp_content_types/src/remote_attachment.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pub struct RemoteAttachmentCodec {}

//. Legacy content type id at https://github.com/xmtp/xmtp-js/blob/main/content-types/content-type-remote-attachment/src/RemoteAttachment.ts
impl RemoteAttachmentCodec {
pub const TYPE_ID: &'static str = "remoteStaticAttachment";
}
6 changes: 6 additions & 0 deletions xmtp_content_types/src/reply.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pub struct ReplyCodec {}

/// Legacy content type id at https://github.com/xmtp/xmtp-js/blob/main/content-types/content-type-reply/src/Reply.ts
impl ReplyCodec {
pub const TYPE_ID: &'static str = "reply";
}
2 changes: 1 addition & 1 deletion xmtp_content_types/src/text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub struct TextCodec {}

impl TextCodec {
const AUTHORITY_ID: &'static str = "xmtp.org";
const TYPE_ID: &'static str = "text";
pub const TYPE_ID: &'static str = "text";
const ENCODING_KEY: &'static str = "encoding";
const ENCODING_UTF8: &'static str = "UTF-8";
}
Expand Down
6 changes: 6 additions & 0 deletions xmtp_content_types/src/transaction_reference.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pub struct TransactionReferenceCodec {}

/// Legacy content type id at https://github.com/xmtp/xmtp-js/blob/main/content-types/content-type-transaction-reference/src/TransactionReference.ts
impl TransactionReferenceCodec {
pub const TYPE_ID: &'static str = "transactionReference";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
ALTER TABLE group_messages
DROP COLUMN authority_id;

ALTER TABLE group_messages
DROP COLUMN version_major;

ALTER TABLE group_messages
DROP COLUMN version_minor;

ALTER TABLE group_messages
DROP COLUMN content_type;
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
ALTER TABLE group_messages
ADD COLUMN content_type INTEGER NOT NULL DEFAULT 0;

ALTER TABLE group_messages
ADD COLUMN version_minor INTEGER NOT NULL DEFAULT 0;

ALTER TABLE group_messages
ADD COLUMN version_major INTEGER NOT NULL DEFAULT 0;

ALTER TABLE group_messages
ADD COLUMN authority_id TEXT NOT NULL DEFAULT '';
44 changes: 36 additions & 8 deletions xmtp_mls/src/groups/mls_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ use crate::{
GRPC_DATA_LIMIT, HMAC_SALT, MAX_GROUP_SIZE, MAX_INTENT_PUBLISH_ATTEMPTS, MAX_PAST_EPOCHS,
SYNC_UPDATE_INSTALLATIONS_INTERVAL_NS,
},
groups::device_sync::DeviceSyncContent,
groups::{
device_sync::preference_sync::UserPreferenceUpdate, intents::UpdateMetadataIntentData,
device_sync::{preference_sync::UserPreferenceUpdate, DeviceSyncContent},
intents::UpdateMetadataIntentData,
validated_commit::ValidatedCommit,
},
hpke::{encrypt_welcome, HpkeError},
Expand All @@ -25,15 +25,14 @@ use crate::{
storage::{
db_connection::DbConnection,
group_intent::{IntentKind, IntentState, StoredGroupIntent, ID},
group_message::{DeliveryStatus, GroupMessageKind, StoredGroupMessage},
group_message::{ContentType, DeliveryStatus, GroupMessageKind, StoredGroupMessage},
refresh_state::EntityKind,
serialization::{db_deserialize, db_serialize},
sql_key_store,
user_preferences::StoredUserPreferences,
StorageError,
},
subscriptions::LocalEvents,
subscriptions::SyncMessage,
subscriptions::{LocalEvents, SyncMessage},
utils::{hash::sha256, id::calculate_message_id, time::hmac_epoch},
xmtp_openmls_provider::XmtpOpenMlsProvider,
Delete, Fetch, StoreOrIgnore,
Expand All @@ -44,7 +43,7 @@ use hmac::{Hmac, Mac};
use openmls::{
credentials::BasicCredential,
extensions::Extensions,
framing::{ContentType, ProtocolMessage},
framing::{ContentType as MlsContentType, ProtocolMessage},
group::{GroupEpoch, StagedCommit},
key_packages::KeyPackage,
prelude::{
Expand Down Expand Up @@ -546,6 +545,7 @@ where
})) => {
let message_id =
calculate_message_id(&self.group_id, &content, &idempotency_key);
let queryable_content_fields = Self::extract_queryable_content_fields(&content);
StoredGroupMessage {
id: message_id,
group_id: self.group_id.clone(),
Expand All @@ -555,6 +555,10 @@ where
sender_installation_id,
sender_inbox_id,
delivery_status: DeliveryStatus::Published,
content_type: queryable_content_fields.content_type,
version_major: queryable_content_fields.version_major,
version_minor: queryable_content_fields.version_minor,
authority_id: queryable_content_fields.authority_id,
}
.store_or_ignore(provider.conn_ref())?
}
Expand Down Expand Up @@ -583,6 +587,10 @@ where
sender_installation_id,
sender_inbox_id: sender_inbox_id.clone(),
delivery_status: DeliveryStatus::Published,
content_type: ContentType::Unknown,
version_major: 0,
version_minor: 0,
authority_id: "unknown".to_string(),
}
.store_or_ignore(provider.conn_ref())?;

Expand Down Expand Up @@ -612,6 +620,10 @@ where
sender_installation_id,
sender_inbox_id,
delivery_status: DeliveryStatus::Published,
content_type: ContentType::Unknown,
version_major: 0,
version_minor: 0,
authority_id: "unknown".to_string(),
}
.store_or_ignore(provider.conn_ref())?;

Expand Down Expand Up @@ -712,7 +724,7 @@ where
discriminant(&other),
)),
}?;
if !allow_epoch_increment && message.content_type() == ContentType::Commit {
if !allow_epoch_increment && message.content_type() == MlsContentType::Commit {
return Err(GroupMessageProcessingError::EpochIncrementNotAllowed);
}

Expand Down Expand Up @@ -933,7 +945,19 @@ where
encoded_payload_bytes.as_slice(),
&timestamp_ns.to_string(),
);

let content_type = match encoded_payload.r#type {
Some(ct) => ct,
None => {
tracing::warn!("Missing content type in encoded payload, using default values");
// Default content type values
xmtp_proto::xmtp::mls::message_contents::ContentTypeId {
authority_id: "unknown".to_string(),
type_id: "unknown".to_string(),
version_major: 0,
version_minor: 0,
}
}
};
let msg = StoredGroupMessage {
id: message_id,
group_id: group_id.to_vec(),
Expand All @@ -943,6 +967,10 @@ where
sender_installation_id,
sender_inbox_id,
delivery_status: DeliveryStatus::Published,
content_type: content_type.type_id.into(),
version_major: content_type.version_major as i32,
version_minor: content_type.version_minor as i32,
authority_id: content_type.authority_id.to_string(),
};

msg.store_or_ignore(conn)?;
Expand Down
50 changes: 48 additions & 2 deletions xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ use self::{
intents::IntentError,
validated_commit::CommitValidationError,
};
use crate::storage::StorageError;
use crate::storage::{group_message::ContentType, StorageError};
use xmtp_common::time::now_ns;
use xmtp_proto::xmtp::mls::{
api::v1::{
Expand All @@ -67,7 +67,7 @@ use xmtp_proto::xmtp::mls::{
},
message_contents::{
plaintext_envelope::{Content, V1},
PlaintextEnvelope,
EncodedContent, PlaintextEnvelope,
},
};

Expand Down Expand Up @@ -309,6 +309,38 @@ pub enum UpdateAdminListType {
RemoveSuper,
}

/// Fields extracted from content of a message that should be stored in the DB
pub struct QueryableContentFields {
insipx marked this conversation as resolved.
Show resolved Hide resolved
pub content_type: ContentType,
pub version_major: i32,
pub version_minor: i32,
pub authority_id: String,
}

impl Default for QueryableContentFields {
fn default() -> Self {
Self {
content_type: ContentType::Unknown, // Or whatever the appropriate default is
version_major: 0,
version_minor: 0,
authority_id: String::new(),
}
}
}

impl From<EncodedContent> for QueryableContentFields {
fn from(content: EncodedContent) -> Self {
let content_type_id = content.r#type.unwrap_or_default();

QueryableContentFields {
content_type: content_type_id.type_id.into(),
version_major: content_type_id.version_major as i32,
version_minor: content_type_id.version_minor as i32,
authority_id: content_type_id.authority_id.to_string(),
}
}
}

/// Represents a group, which can contain anywhere from 1 to MAX_GROUP_SIZE inboxes.
///
/// This is a wrapper around OpenMLS's `MlsGroup` that handles our application-level configuration
Expand Down Expand Up @@ -706,6 +738,15 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
Ok(message_id)
}

/// Helper function to extract queryable content fields from a message
fn extract_queryable_content_fields(message: &[u8]) -> QueryableContentFields {
// Return early with default if decoding fails or type is missing
EncodedContent::decode(message)
.inspect_err(|e| tracing::debug!("Failed to decode message as EncodedContent: {}", e))
.map(QueryableContentFields::from)
.unwrap_or_default()
}

/// Prepare a [`IntentKind::SendMessage`] intent, and [`StoredGroupMessage`] on this users XMTP [`Client`].
///
/// # Arguments
Expand Down Expand Up @@ -734,6 +775,7 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {

// store this unpublished message locally before sending
let message_id = calculate_message_id(&self.group_id, message, &now.to_string());
let queryable_content_fields = Self::extract_queryable_content_fields(message);
let group_message = StoredGroupMessage {
id: message_id.clone(),
group_id: self.group_id.clone(),
Expand All @@ -743,6 +785,10 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
sender_installation_id: self.context().installation_public_key().into(),
sender_inbox_id: self.context().inbox_id().to_string(),
delivery_status: DeliveryStatus::Unpublished,
content_type: queryable_content_fields.content_type,
version_major: queryable_content_fields.version_major,
version_minor: queryable_content_fields.version_minor,
authority_id: queryable_content_fields.authority_id,
};
group_message.store(provider.conn_ref())?;

Expand Down
Loading
Loading