From 363daec242727cc8d7eb07901ed77b60f83f7f8e Mon Sep 17 00:00:00 2001 From: Martin Kysel Date: Fri, 22 Nov 2024 13:48:33 -0500 Subject: [PATCH] Replication Client: Fix Batch message send (#1300) In the previous code we would break down a batch message containing N messages to N network calls with 1 message each. Fix that. --- xmtp_api_grpc/src/replication_client.rs | 44 ++++++++++--------- xmtp_proto/src/convert.rs | 57 +++++++++++-------------- 2 files changed, 48 insertions(+), 53 deletions(-) diff --git a/xmtp_api_grpc/src/replication_client.rs b/xmtp_api_grpc/src/replication_client.rs index d7ae88e2c..97f7fa5c5 100644 --- a/xmtp_api_grpc/src/replication_client.rs +++ b/xmtp_api_grpc/src/replication_client.rs @@ -362,14 +362,8 @@ impl XmtpIdentityClient for ClientV4 { &self, request: PublishIdentityUpdateRequest, ) -> Result { - let client = &mut self.payer_client.clone(); - let res = client - .publish_client_envelopes(PublishClientEnvelopesRequest::try_from(request)?) - .await; - match res { - Ok(_) => Ok(PublishIdentityUpdateResponse {}), - Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)), - } + self.publish_envelopes_to_payer(vec![request]).await?; + Ok(PublishIdentityUpdateResponse {}) } #[tracing::instrument(level = "trace", skip_all)] @@ -482,20 +476,30 @@ impl ClientV4 { } #[tracing::instrument(level = "trace", skip_all)] - async fn publish_envelopes_to_payer< - T: TryInto, - >( + async fn publish_envelopes_to_payer( &self, - items: impl IntoIterator, - ) -> Result<(), Error> { + messages: impl IntoIterator, + ) -> Result<(), Error> + where + T: TryInto, + >::Error: std::error::Error + Send + Sync + 'static, + { let client = &mut self.payer_client.clone(); - for item in items { - let request = item.try_into()?; - let res = client.publish_client_envelopes(request).await; - if let Err(e) = res { - return Err(Error::new(ErrorKind::MlsError).with(e)); - } - } + + let envelopes: Vec = messages + .into_iter() + .map(|message| { + message + .try_into() + .map_err(|e| Error::new(ErrorKind::MlsError).with(e)) + }) + .collect::>()?; + + client + .publish_client_envelopes(PublishClientEnvelopesRequest { envelopes }) + .await + .map_err(|e| Error::new(ErrorKind::MlsError).with(e))?; + Ok(()) } } diff --git a/xmtp_proto/src/convert.rs b/xmtp_proto/src/convert.rs index fe6f2a1e8..7e265f27c 100644 --- a/xmtp_proto/src/convert.rs +++ b/xmtp_proto/src/convert.rs @@ -6,7 +6,6 @@ use crate::xmtp::mls::api::v1::{ }; use crate::xmtp::xmtpv4::envelopes::client_envelope::Payload; use crate::xmtp::xmtpv4::envelopes::{AuthenticatedData, ClientEnvelope}; -use crate::xmtp::xmtpv4::payer_api::PublishClientEnvelopesRequest; use crate::v4_utils::{ build_identity_topic_from_hex_encoded, build_welcome_message_topic, get_group_message_topic, @@ -34,18 +33,16 @@ mod inbox_id { } } -impl TryFrom for PublishClientEnvelopesRequest { +impl TryFrom for ClientEnvelope { type Error = Error; fn try_from(req: UploadKeyPackageRequest) -> Result { if let Some(key_package) = req.key_package.as_ref() { - Ok(PublishClientEnvelopesRequest { - envelopes: vec![ClientEnvelope { - aad: Some(AuthenticatedData::with_topic(get_key_package_topic( - key_package, - )?)), - payload: Some(Payload::UploadKeyPackage(req)), - }], + Ok(ClientEnvelope { + aad: Some(AuthenticatedData::with_topic(get_key_package_topic( + key_package, + )?)), + payload: Some(Payload::UploadKeyPackage(req)), }) } else { Err(Error::new(InternalError(MissingPayloadError))) @@ -53,18 +50,16 @@ impl TryFrom for PublishClientEnvelopesRequest { } } -impl TryFrom for PublishClientEnvelopesRequest { +impl TryFrom for ClientEnvelope { type Error = Error; fn try_from(req: PublishIdentityUpdateRequest) -> Result { if let Some(identity_update) = req.identity_update { - Ok(PublishClientEnvelopesRequest { - envelopes: vec![ClientEnvelope { - aad: Some(AuthenticatedData::with_topic( - build_identity_topic_from_hex_encoded(&identity_update.inbox_id)?, - )), - payload: Some(Payload::IdentityUpdate(identity_update)), - }], + Ok(ClientEnvelope { + aad: Some(AuthenticatedData::with_topic( + build_identity_topic_from_hex_encoded(&identity_update.inbox_id)?, + )), + payload: Some(Payload::IdentityUpdate(identity_update)), }) } else { Err(Error::new(InternalError(MissingPayloadError))) @@ -72,18 +67,16 @@ impl TryFrom for PublishClientEnvelopesRequest { } } -impl TryFrom for PublishClientEnvelopesRequest { +impl TryFrom for ClientEnvelope { type Error = crate::Error; fn try_from(req: GroupMessageInput) -> Result { if let Some(GroupMessageInputVersion::V1(ref version)) = req.version { - Ok(PublishClientEnvelopesRequest { - envelopes: vec![ClientEnvelope { - aad: Some(AuthenticatedData::with_topic(get_group_message_topic( - version.data.clone(), - )?)), - payload: Some(Payload::GroupMessage(req)), - }], + Ok(ClientEnvelope { + aad: Some(AuthenticatedData::with_topic(get_group_message_topic( + version.data.clone(), + )?)), + payload: Some(Payload::GroupMessage(req)), }) } else { Err(Error::new(InternalError(MissingPayloadError))) @@ -91,18 +84,16 @@ impl TryFrom for PublishClientEnvelopesRequest { } } -impl TryFrom for PublishClientEnvelopesRequest { +impl TryFrom for ClientEnvelope { type Error = crate::Error; fn try_from(req: WelcomeMessageInput) -> Result { if let Some(WelcomeMessageVersion::V1(ref version)) = req.version { - Ok(PublishClientEnvelopesRequest { - envelopes: vec![ClientEnvelope { - aad: Some(AuthenticatedData::with_topic(build_welcome_message_topic( - version.installation_key.as_slice(), - ))), - payload: Some(Payload::WelcomeMessage(req)), - }], + Ok(ClientEnvelope { + aad: Some(AuthenticatedData::with_topic(build_welcome_message_topic( + version.installation_key.as_slice(), + ))), + payload: Some(Payload::WelcomeMessage(req)), }) } else { Err(Error::new(InternalError(MissingPayloadError)))