diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 5b97be254..04ade3108 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -28,7 +28,6 @@ use xmtp_mls::groups::group_permissions::PermissionsPolicies; use xmtp_mls::groups::intents::PermissionPolicyOption; use xmtp_mls::groups::intents::PermissionUpdateType; use xmtp_mls::groups::GroupMetadataOptions; -use xmtp_mls::groups::UnpublishedMessage; use xmtp_mls::{ api::ApiClientWrapper, builder::ClientBuilder, @@ -646,28 +645,6 @@ pub struct FfiCreateGroupOptions { pub group_pinned_frame_url: Option, } -#[derive(uniffi::Object)] -pub struct FfiUnpublishedMessage { - message: UnpublishedMessage, -} - -#[uniffi::export(async_runtime = "tokio")] -impl FfiUnpublishedMessage { - pub fn id(&self) -> Vec { - self.message.id().to_vec() - } - - pub async fn publish(&self) -> Result<(), GenericError> { - self.message.publish().await.map_err(Into::into) - } -} - -impl From> for FfiUnpublishedMessage { - fn from(message: UnpublishedMessage) -> FfiUnpublishedMessage { - Self { message } - } -} - impl FfiCreateGroupOptions { pub fn into_group_metadata_options(self) -> GroupMetadataOptions { GroupMetadataOptions { @@ -695,20 +672,27 @@ impl FfiGroup { } /// send a message without immediately publishing to the delivery service. - pub fn send_optimistic( - &self, - content_bytes: Vec, - ) -> Result { + pub fn send_optimistic(&self, content_bytes: Vec) -> Result, GenericError> { let group = MlsGroup::new( self.inner_client.context().clone(), self.group_id.clone(), self.created_at_ns, ); - let message = - group.send_message_optimistic(content_bytes.as_slice(), &self.inner_client)?; + let id = group.send_message_optimistic(content_bytes.as_slice())?; - Ok(message.into()) + Ok(id) + } + + /// Publish all unpublished messages + pub async fn publish_messages(&self) -> Result<(), GenericError> { + let group = MlsGroup::new( + self.inner_client.context().clone(), + self.group_id.clone(), + self.created_at_ns, + ); + group.publish_messages(&self.inner_client).await?; + Ok(()) } pub async fn sync(&self) -> Result<(), GenericError> { diff --git a/bindings_node/src/groups.rs b/bindings_node/src/groups.rs index f9ecf9edd..ab8862f28 100644 --- a/bindings_node/src/groups.rs +++ b/bindings_node/src/groups.rs @@ -10,14 +10,14 @@ use xmtp_mls::groups::{ group_metadata::{ConversationType, GroupMetadata}, group_permissions::GroupMutablePermissions, members::PermissionLevel, - MlsGroup, PreconfiguredPolicies, UnpublishedMessage, UpdateAdminListType, + MlsGroup, PreconfiguredPolicies, UpdateAdminListType, }; use xmtp_proto::xmtp::mls::message_contents::EncodedContent; use crate::{ encoded_content::NapiEncodedContent, messages::{NapiListMessagesOptions, NapiMessage}, - mls_client::{RustXmtpClient, TonicApiClient}, + mls_client::RustXmtpClient, streams::NapiStreamCloser, }; @@ -105,32 +105,6 @@ impl NapiGroupPermissions { } } -#[napi] -pub struct NapiUnpublishedMessage { - message: UnpublishedMessage, -} - -#[napi] -impl NapiUnpublishedMessage { - pub fn id(&self) -> Vec { - self.message.id().to_vec() - } - - pub async fn publish(&self) -> Result<()> { - self - .message - .publish() - .await - .map_err(|e| Error::from_reason(format!("{}", e))) - } -} - -impl From> for NapiUnpublishedMessage { - fn from(message: UnpublishedMessage) -> NapiUnpublishedMessage { - Self { message } - } -} - #[derive(Debug)] #[napi] pub struct NapiGroup { @@ -173,11 +147,9 @@ impl NapiGroup { Ok(hex::encode(message_id.clone())) } + /// send a message without immediately publishing to the delivery service. #[napi] - pub fn send_optimistic( - &self, - encoded_content: NapiEncodedContent, - ) -> Result { + pub fn send_optimistic(&self, encoded_content: NapiEncodedContent) -> Result> { let encoded_content: EncodedContent = encoded_content.into(); let group = MlsGroup::new( self.inner_client.context().clone(), @@ -185,14 +157,26 @@ impl NapiGroup { self.created_at_ns, ); - let message = group - .send_message_optimistic( - encoded_content.encode_to_vec().as_slice(), - &self.inner_client, - ) + let id = group + .send_message_optimistic(encoded_content.encode_to_vec().as_slice()) .map_err(|e| Error::from_reason(format!("{}", e)))?; - Ok(message.into()) + Ok(id) + } + + /// Publish all unpublished messages + #[napi] + pub async fn publish_messages(&self) -> Result<()> { + let group = MlsGroup::new( + self.inner_client.context().clone(), + self.group_id.clone(), + self.created_at_ns, + ); + group + .publish_messages(&self.inner_client) + .await + .map_err(|e| Error::from_reason(format!("{}", e)))?; + Ok(()) } #[napi] diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 9bb7fe4bb..2757158f1 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -53,7 +53,7 @@ use self::{ message_history::MessageHistoryError, validated_commit::CommitValidationError, }; -use std::{collections::HashSet, future::Future, pin::Pin, sync::Arc}; +use std::{collections::HashSet, sync::Arc}; use xmtp_cryptography::signature::{sanitize_evm_addresses, AddressValidationError}; use xmtp_id::InboxId; use xmtp_proto::xmtp::mls::{ @@ -234,46 +234,6 @@ pub enum UpdateAdminListType { RemoveSuper, } -pub type MessagePublishFuture = Pin> + Send>>; - -/// An Unpublished message with an ID that can be `awaited` to publish all messages. -/// This message can be safely dropped, and [`MlsGroup::sync`] called manually instead. -pub struct UnpublishedMessage { - message_id: Vec, - client: Arc>, - group: MlsGroup, -} - -impl UnpublishedMessage -where - ApiClient: XmtpApi, -{ - fn new(message_id: Vec, client: Arc>, group: MlsGroup) -> Self { - Self { - message_id, - client, - group, - } - } - - pub fn id(&self) -> &[u8] { - &self.message_id - } - - /// Publish messages to the delivery service - pub async fn publish(&self) -> Result<(), GroupError> { - let conn = self.group.context.store.conn()?; - let update_interval = Some(5_000_000); - self.group - .maybe_update_installations(conn.clone(), update_interval, self.client.as_ref()) - .await?; - self.group - .publish_intents(conn, self.client.as_ref()) - .await?; - Ok(()) - } -} - impl MlsGroup { // Creates a new group instance. Does not validate that the group exists in the DB pub fn new(context: Arc, group_id: Vec, created_at_ns: i64) -> Self { @@ -488,23 +448,28 @@ impl MlsGroup { message_id } - /// Send a message, optimistically retrieving ID before the result of a message send. - pub fn send_message_optimistic( + /// Publish all unpublished messages + pub async fn publish_messages( &self, - message: &[u8], - client: &Arc>, - ) -> Result, GroupError> + client: &Client, + ) -> Result<(), GroupError> where ApiClient: XmtpApi, { + let conn = self.context.store.conn()?; + let update_interval = Some(5_000_000); + self.maybe_update_installations(conn.clone(), update_interval, client) + .await?; + self.publish_intents(conn, client).await?; + Ok(()) + } + + /// Send a message, optimistically returning the ID of the message before the result of a message publish. + pub fn send_message_optimistic(&self, message: &[u8]) -> Result, GroupError> { let conn = self.context.store.conn()?; let message_id = self.prepare_message(message, &conn)?; - Ok(UnpublishedMessage::new( - message_id, - client.clone(), - self.clone(), - )) + Ok(message_id) } /// Prepare a message (intent & id) on this users XMTP [`Client`]. @@ -1249,7 +1214,7 @@ mod tests { group_mutable_metadata::MetadataField, intents::{PermissionPolicyOption, PermissionUpdateType}, members::{GroupMember, PermissionLevel}, - GroupMetadataOptions, PreconfiguredPolicies, UpdateAdminListType, + DeliveryStatus, GroupMetadataOptions, PreconfiguredPolicies, UpdateAdminListType, }, storage::{ group_intent::IntentState, @@ -2653,35 +2618,75 @@ mod tests { .unwrap(); let bola_group = receive_group_invite(&bola).await; - amal_group - .send_message_optimistic(b"test one", &amal) - .unwrap(); - amal_group - .send_message_optimistic(b"test two", &amal) - .unwrap(); - amal_group - .send_message_optimistic(b"test three", &amal) - .unwrap(); - let four = amal_group - .send_message_optimistic(b"test four", &amal) - .unwrap(); + let ids = vec![ + amal_group.send_message_optimistic(b"test one").unwrap(), + amal_group.send_message_optimistic(b"test two").unwrap(), + amal_group.send_message_optimistic(b"test three").unwrap(), + amal_group.send_message_optimistic(b"test four").unwrap(), + ]; - four.publish().await.unwrap(); + let messages = amal_group + .find_messages(Some(GroupMessageKind::Application), None, None, None, None) + .unwrap() + .into_iter() + .collect::>(); + + let text = messages + .iter() + .cloned() + .map(|m| String::from_utf8_lossy(&m.decrypted_message_bytes).to_string()) + .collect::>(); + assert_eq!( + ids, + messages + .iter() + .cloned() + .map(|m| m.id) + .collect::>>() + ); + assert_eq!( + text, + vec![ + "test one".to_string(), + "test two".to_string(), + "test three".to_string(), + "test four".to_string(), + ] + ); + + let delivery = messages + .iter() + .cloned() + .map(|m| m.delivery_status) + .collect::>(); + assert_eq!( + delivery, + vec![ + DeliveryStatus::Unpublished, + DeliveryStatus::Unpublished, + DeliveryStatus::Unpublished, + DeliveryStatus::Unpublished, + ] + ); + amal_group.publish_messages(&amal).await.unwrap(); bola_group.sync(&bola).await.unwrap(); + let messages = bola_group .find_messages(None, None, None, None, None) .unwrap(); + let delivery = messages + .iter() + .cloned() + .map(|m| m.delivery_status) + .collect::>(); assert_eq!( - messages - .into_iter() - .map(|m| m.decrypted_message_bytes) - .collect::>>(), + delivery, vec![ - b"test one".to_vec(), - b"test two".to_vec(), - b"test three".to_vec(), - b"test four".to_vec(), + DeliveryStatus::Published, + DeliveryStatus::Published, + DeliveryStatus::Published, + DeliveryStatus::Published, ] ); }