Skip to content

Commit

Permalink
Process streamed group message (#593)
Browse files Browse the repository at this point in the history
* first pass at the new group message decoder

* move where this logic was

* add the welcome decoding

* I think I want these functions in these places instead

* make everything await

* bad naming

* fix up the bindings

* remove unneeded code

* fix up the formatting
  • Loading branch information
nplasterer authored Mar 28, 2024
1 parent d0bcb73 commit 124c53f
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 0 deletions.
25 changes: 25 additions & 0 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,19 @@ impl FfiConversations {
Ok(out)
}

pub fn process_streamed_welcome_message(&self, envelope_bytes: Vec<u8>) -> Result<Arc<FfiGroup>, GenericError> {
let inner = self.inner_client.as_ref();
let group = inner.process_streamed_welcome_message(envelope_bytes)?;

let out = Arc::new(FfiGroup {
inner_client: self.inner_client.clone(),
group_id: group.group_id,
created_at_ns: group.created_at_ns,
});

Ok(out)
}

pub async fn sync(&self) -> Result<(), GenericError> {
let inner = self.inner_client.as_ref();
inner.sync_welcomes().await?;
Expand Down Expand Up @@ -356,6 +369,18 @@ impl FfiGroup {
Ok(messages)
}

pub async fn process_streamed_group_message(&self, envelope_bytes: Vec<u8>) -> Result<FfiMessage, GenericError> {
let group = MlsGroup::new(
self.inner_client.as_ref(),
self.group_id.clone(),
self.created_at_ns,
);
let message = group.process_streamed_group_message(envelope_bytes).await?;
let ffi_message = message.into();

Ok(ffi_message)
}

pub fn list_members(&self) -> Result<Vec<FfiGroupMember>, GenericError> {
let group = MlsGroup::new(
self.inner_client.as_ref(),
Expand Down
12 changes: 12 additions & 0 deletions xmtp_mls/src/groups/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::sync::Arc;

use futures::Stream;

use prost::Message;
use xmtp_proto::{api_client::XmtpMlsClient, xmtp::mls::api::v1::GroupMessage};

use super::{extract_message_v1, GroupError, MlsGroup};
Expand Down Expand Up @@ -44,6 +45,17 @@ where
Ok(new_message)
}

pub async fn process_streamed_group_message(
&self,
envelope_bytes: Vec<u8>,
) -> Result<StoredGroupMessage, GroupError> {
let envelope = GroupMessage::decode(envelope_bytes.as_slice())
.map_err(|e| GroupError::Generic(e.to_string()))?;

let message = self.process_stream_entry(envelope).await?;
Ok(message.unwrap())
}

pub async fn stream(
&'c self,
) -> Result<Pin<Box<dyn Stream<Item = StoredGroupMessage> + 'c + Send>>, GroupError> {
Expand Down
12 changes: 12 additions & 0 deletions xmtp_mls/src/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
};

use futures::{Stream, StreamExt};
use prost::Message;
use tokio::sync::oneshot::{self, Sender};
use xmtp_proto::{api_client::XmtpMlsClient, xmtp::mls::api::v1::WelcomeMessage};

Expand Down Expand Up @@ -69,6 +70,17 @@ where
.map_err(|e| ClientError::Generic(e.to_string()))
}

pub fn process_streamed_welcome_message(
&self,
envelope_bytes: Vec<u8>,
) -> Result<MlsGroup<ApiClient>, ClientError> {
let envelope = WelcomeMessage::decode(envelope_bytes.as_slice())
.map_err(|e| ClientError::Generic(e.to_string()))?;

let welcome = self.process_streamed_welcome(envelope)?;
Ok(welcome)
}

pub async fn stream_conversations(
&'a self,
) -> Result<Pin<Box<dyn Stream<Item = MlsGroup<ApiClient>> + Send + 'a>>, ClientError> {
Expand Down

0 comments on commit 124c53f

Please sign in to comment.