From d4d8134a6e892c705eb9720f1cb952e88f2b1055 Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Mon, 8 Jul 2024 11:44:32 -0400 Subject: [PATCH] Fix optimistic sending (#887) * make test more explicit * add `send_optimistic` to the bindings --- .gitignore | 3 + bindings_ffi/src/mls.rs | 40 +++++++++ bindings_node/src/groups.rs | 52 +++++++++++- bindings_node/src/mls_client.rs | 2 +- xmtp_mls/src/groups/mod.rs | 140 +++++++++++++++++++++++++++++--- xmtp_mls/src/groups/sync.rs | 6 +- 6 files changed, 227 insertions(+), 16 deletions(-) diff --git a/.gitignore b/.gitignore index 99499948b..6dff7d3cd 100644 --- a/.gitignore +++ b/.gitignore @@ -122,6 +122,9 @@ dist # Stores VSCode versions used for testing VSCode extensions .vscode-test +# JetBrains IDE Info +.idea/ + # yarn v2 .yarn/cache .yarn/unplugged diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 4361685ae..5b97be254 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -28,6 +28,7 @@ use xmtp_mls::groups::group_permissions::PermissionsPolicies; use xmtp_mls::groups::intents::PermissionPolicyOption; use xmtp_mls::groups::intents::PermissionUpdateType; use xmtp_mls::groups::GroupMetadataOptions; +use xmtp_mls::groups::UnpublishedMessage; use xmtp_mls::{ api::ApiClientWrapper, builder::ClientBuilder, @@ -645,6 +646,28 @@ pub struct FfiCreateGroupOptions { pub group_pinned_frame_url: Option, } +#[derive(uniffi::Object)] +pub struct FfiUnpublishedMessage { + message: UnpublishedMessage, +} + +#[uniffi::export(async_runtime = "tokio")] +impl FfiUnpublishedMessage { + pub fn id(&self) -> Vec { + self.message.id().to_vec() + } + + pub async fn publish(&self) -> Result<(), GenericError> { + self.message.publish().await.map_err(Into::into) + } +} + +impl From> for FfiUnpublishedMessage { + fn from(message: UnpublishedMessage) -> FfiUnpublishedMessage { + Self { message } + } +} + impl FfiCreateGroupOptions { pub fn into_group_metadata_options(self) -> GroupMetadataOptions { GroupMetadataOptions { @@ -671,6 +694,23 @@ impl FfiGroup { Ok(message_id) } + /// send a message without immediately publishing to the delivery service. + pub fn send_optimistic( + &self, + content_bytes: Vec, + ) -> Result { + let group = MlsGroup::new( + self.inner_client.context().clone(), + self.group_id.clone(), + self.created_at_ns, + ); + + let message = + group.send_message_optimistic(content_bytes.as_slice(), &self.inner_client)?; + + Ok(message.into()) + } + pub async fn sync(&self) -> Result<(), GenericError> { let group = MlsGroup::new( self.inner_client.context().clone(), diff --git a/bindings_node/src/groups.rs b/bindings_node/src/groups.rs index 2f3ae2354..f9ecf9edd 100644 --- a/bindings_node/src/groups.rs +++ b/bindings_node/src/groups.rs @@ -10,14 +10,14 @@ use xmtp_mls::groups::{ group_metadata::{ConversationType, GroupMetadata}, group_permissions::GroupMutablePermissions, members::PermissionLevel, - MlsGroup, PreconfiguredPolicies, UpdateAdminListType, + MlsGroup, PreconfiguredPolicies, UnpublishedMessage, UpdateAdminListType, }; use xmtp_proto::xmtp::mls::message_contents::EncodedContent; use crate::{ encoded_content::NapiEncodedContent, messages::{NapiListMessagesOptions, NapiMessage}, - mls_client::RustXmtpClient, + mls_client::{RustXmtpClient, TonicApiClient}, streams::NapiStreamCloser, }; @@ -105,6 +105,32 @@ impl NapiGroupPermissions { } } +#[napi] +pub struct NapiUnpublishedMessage { + message: UnpublishedMessage, +} + +#[napi] +impl NapiUnpublishedMessage { + pub fn id(&self) -> Vec { + self.message.id().to_vec() + } + + pub async fn publish(&self) -> Result<()> { + self + .message + .publish() + .await + .map_err(|e| Error::from_reason(format!("{}", e))) + } +} + +impl From> for NapiUnpublishedMessage { + fn from(message: UnpublishedMessage) -> NapiUnpublishedMessage { + Self { message } + } +} + #[derive(Debug)] #[napi] pub struct NapiGroup { @@ -147,6 +173,28 @@ impl NapiGroup { Ok(hex::encode(message_id.clone())) } + #[napi] + pub fn send_optimistic( + &self, + encoded_content: NapiEncodedContent, + ) -> Result { + let encoded_content: EncodedContent = encoded_content.into(); + let group = MlsGroup::new( + self.inner_client.context().clone(), + self.group_id.clone(), + self.created_at_ns, + ); + + let message = group + .send_message_optimistic( + encoded_content.encode_to_vec().as_slice(), + &self.inner_client, + ) + .map_err(|e| Error::from_reason(format!("{}", e)))?; + + Ok(message.into()) + } + #[napi] pub async fn sync(&self) -> Result<()> { let group = MlsGroup::new( diff --git a/bindings_node/src/mls_client.rs b/bindings_node/src/mls_client.rs index fc641cd08..bf5af92e4 100644 --- a/bindings_node/src/mls_client.rs +++ b/bindings_node/src/mls_client.rs @@ -4,7 +4,7 @@ use napi_derive::napi; use std::collections::HashMap; use std::ops::Deref; use std::sync::Arc; -use xmtp_api_grpc::grpc_api_helper::Client as TonicApiClient; +pub use xmtp_api_grpc::grpc_api_helper::Client as TonicApiClient; use xmtp_cryptography::signature::ed25519_public_key_to_address; use xmtp_id::associations::generate_inbox_id as xmtp_id_generate_inbox_id; use xmtp_id::associations::{ diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 8349e0616..9bb7fe4bb 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -53,7 +53,7 @@ use self::{ message_history::MessageHistoryError, validated_commit::CommitValidationError, }; -use std::{collections::HashSet, sync::Arc}; +use std::{collections::HashSet, future::Future, pin::Pin, sync::Arc}; use xmtp_cryptography::signature::{sanitize_evm_addresses, AddressValidationError}; use xmtp_id::InboxId; use xmtp_proto::xmtp::mls::{ @@ -174,6 +174,10 @@ pub enum GroupError { NoPSKSupport, #[error("Metadata update must specify a metadata field")] InvalidPermissionUpdate, + #[error("The intent publishing task was cancelled")] + PublishCancelled, + #[error("the publish failed to complete due to panic")] + PublishPanicked, } impl RetryableError for GroupError { @@ -230,6 +234,46 @@ pub enum UpdateAdminListType { RemoveSuper, } +pub type MessagePublishFuture = Pin> + Send>>; + +/// An Unpublished message with an ID that can be `awaited` to publish all messages. +/// This message can be safely dropped, and [`MlsGroup::sync`] called manually instead. +pub struct UnpublishedMessage { + message_id: Vec, + client: Arc>, + group: MlsGroup, +} + +impl UnpublishedMessage +where + ApiClient: XmtpApi, +{ + fn new(message_id: Vec, client: Arc>, group: MlsGroup) -> Self { + Self { + message_id, + client, + group, + } + } + + pub fn id(&self) -> &[u8] { + &self.message_id + } + + /// Publish messages to the delivery service + pub async fn publish(&self) -> Result<(), GroupError> { + let conn = self.group.context.store.conn()?; + let update_interval = Some(5_000_000); + self.group + .maybe_update_installations(conn.clone(), update_interval, self.client.as_ref()) + .await?; + self.group + .publish_intents(conn, self.client.as_ref()) + .await?; + Ok(()) + } +} + impl MlsGroup { // Creates a new group instance. Does not validate that the group exists in the DB pub fn new(context: Arc, group_id: Vec, created_at_ns: i64) -> Self { @@ -420,6 +464,7 @@ impl MlsGroup { )) } + /// Send a message on this users XMTP [`Client`]. pub async fn send_message( &self, message: &[u8], @@ -428,12 +473,42 @@ impl MlsGroup { where ApiClient: XmtpApi, { - let conn = self.context.store.conn()?; - let update_interval = Some(5_000_000); // 5 seconds in nanoseconds + let conn = self.context.store.conn()?; self.maybe_update_installations(conn.clone(), update_interval, client) .await?; + let message_id = self.prepare_message(message, &conn); + + // Skipping a full sync here and instead just firing and forgetting + if let Err(err) = self.publish_intents(conn, client).await { + log::error!("Send: error publishing intents: {:?}", err); + } + + message_id + } + + /// Send a message, optimistically retrieving ID before the result of a message send. + pub fn send_message_optimistic( + &self, + message: &[u8], + client: &Arc>, + ) -> Result, GroupError> + where + ApiClient: XmtpApi, + { + let conn = self.context.store.conn()?; + let message_id = self.prepare_message(message, &conn)?; + + Ok(UnpublishedMessage::new( + message_id, + client.clone(), + self.clone(), + )) + } + + /// Prepare a message (intent & id) on this users XMTP [`Client`]. + fn prepare_message(&self, message: &[u8], conn: &DbConnection) -> Result, GroupError> { let now = now_ns(); let plain_envelope = Self::into_envelope(message, &now.to_string()); let mut encoded_envelope = vec![]; @@ -444,7 +519,7 @@ impl MlsGroup { let intent_data: Vec = SendMessageIntentData::new(encoded_envelope).into(); let intent = NewGroupIntent::new(IntentKind::SendMessage, self.group_id.clone(), intent_data); - intent.store(&conn)?; + intent.store(conn)?; // store this unpublished message locally before sending let message_id = calculate_message_id(&self.group_id, message, &now.to_string()); @@ -458,12 +533,8 @@ impl MlsGroup { sender_inbox_id: self.context.inbox_id(), delivery_status: DeliveryStatus::Unpublished, }; - group_message.store(&conn)?; + group_message.store(conn)?; - // Skipping a full sync here and instead just firing and forgetting - if let Err(err) = self.publish_intents(conn, client).await { - log::error!("Send: error publishing intents: {:?}", err); - } Ok(message_id) } @@ -1162,6 +1233,7 @@ fn build_group_join_config() -> MlsGroupJoinConfig { mod tests { use openmls::prelude::{tls_codec::Serialize, Member, MlsGroup as OpenMlsGroup}; use prost::Message; + use std::sync::Arc; use tracing_test::traced_test; use xmtp_cryptography::utils::generate_local_wallet; use xmtp_proto::xmtp::mls::message_contents::EncodedContent; @@ -1211,7 +1283,6 @@ mod tests { { group.sync(client).await.unwrap(); let mut messages = group.find_messages(None, None, None, None, None).unwrap(); - messages.pop().unwrap() } @@ -2565,4 +2636,53 @@ mod tests { let members = bola_group.members().unwrap(); assert_eq!(members.len(), 3); } + + #[tokio::test(flavor = "multi_thread")] + async fn test_optimistic_send() { + let amal = Arc::new(ClientBuilder::new_test_client(&generate_local_wallet()).await); + let bola_wallet = generate_local_wallet(); + let bola = Arc::new(ClientBuilder::new_test_client(&bola_wallet).await); + let amal_group = amal + .create_group(None, GroupMetadataOptions::default()) + .unwrap(); + amal_group.sync(&amal).await.unwrap(); + // Add bola to the group + amal_group + .add_members(&amal, vec![bola_wallet.get_address()]) + .await + .unwrap(); + let bola_group = receive_group_invite(&bola).await; + + amal_group + .send_message_optimistic(b"test one", &amal) + .unwrap(); + amal_group + .send_message_optimistic(b"test two", &amal) + .unwrap(); + amal_group + .send_message_optimistic(b"test three", &amal) + .unwrap(); + let four = amal_group + .send_message_optimistic(b"test four", &amal) + .unwrap(); + + four.publish().await.unwrap(); + + bola_group.sync(&bola).await.unwrap(); + let messages = bola_group + .find_messages(None, None, None, None, None) + .unwrap(); + assert_eq!( + messages + .into_iter() + .map(|m| m.decrypted_message_bytes) + .collect::>>(), + vec![ + b"test one".to_vec(), + b"test two".to_vec(), + b"test three".to_vec(), + b"test four".to_vec(), + ] + ); + } } diff --git a/xmtp_mls/src/groups/sync.rs b/xmtp_mls/src/groups/sync.rs index f6472cd66..2ddd2dc75 100644 --- a/xmtp_mls/src/groups/sync.rs +++ b/xmtp_mls/src/groups/sync.rs @@ -734,13 +734,13 @@ impl MlsGroup { } #[tracing::instrument(level = "trace", skip(conn, self, client))] - pub(super) async fn publish_intents( + pub(super) async fn publish_intents( &self, conn: DbConnection, - client: &Client, + client: &Client, ) -> Result<(), GroupError> where - ClientApi: XmtpApi, + ApiClient: XmtpApi, { let provider = self.context.mls_provider(conn); let mut openmls_group = self.load_mls_group(&provider)?;