diff --git a/Cargo.lock b/Cargo.lock index f54a2ff83..98fc3b557 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6409,6 +6409,7 @@ dependencies = [ "thiserror", "timeago", "tokio", + "tokio-stream", "url", "xmtp_api_grpc", "xmtp_cryptography", diff --git a/xmtp_mls/src/groups/message_history.rs b/xmtp_mls/src/groups/message_history.rs index a5e6deb21..e5d2f8acf 100644 --- a/xmtp_mls/src/groups/message_history.rs +++ b/xmtp_mls/src/groups/message_history.rs @@ -99,7 +99,8 @@ where Ok(()) } - pub async fn send_history_request(&self) -> Result { + // returns (sync_group_id, pin_code) + pub async fn send_history_request(&self) -> Result<(Vec, String), GroupError> { // find the sync group let conn = self.store().conn()?; let sync_group_id = conn @@ -134,9 +135,7 @@ where log::error!("error publishing sync group intents: {:?}", err); } - // TODO: set up stream here?? for the history sync requester - - Ok(pin_code) + Ok((sync_group.group_id, pin_code)) } pub(crate) async fn send_history_reply( @@ -650,7 +649,7 @@ mod tests { assert_ok!(client.allow_history_sync().await); // test that the request is sent, and that the pin code is returned - let pin_code = client + let (_group_id, pin_code) = client .send_history_request() .await .expect("history request"); @@ -682,7 +681,7 @@ mod tests { amal_a.sync_welcomes().await.expect("sync_welcomes"); - let _sent = amal_b + let (_group_id, _pin_code) = amal_b .send_history_request() .await .expect("history request"); @@ -715,7 +714,7 @@ mod tests { amal_a.sync_welcomes().await.expect("sync_welcomes"); - let pin_code = amal_b + let (_, pin_code) = amal_b .send_history_request() .await .expect("history request"); @@ -792,7 +791,7 @@ mod tests { amal_a.sync_welcomes().await.expect("sync_welcomes"); // amal_b sends a message history request to sync group messages - let _pin_code = amal_b + let (_group_id, _pin_code) = amal_b .send_history_request() .await .expect("history request"); diff --git a/xmtp_mls/src/subscriptions.rs b/xmtp_mls/src/subscriptions.rs index 490a8193f..6a1b6a5bc 100644 --- a/xmtp_mls/src/subscriptions.rs +++ b/xmtp_mls/src/subscriptions.rs @@ -128,9 +128,11 @@ where .map_err(|e| ClientError::Generic(e.to_string()))?; let welcome = self.process_streamed_welcome(envelope).await?; + Ok(welcome) } + // really, stream *groups* pub async fn stream_conversations( &self, ) -> Result + Send + '_>>, ClientError> { @@ -173,6 +175,12 @@ where Ok(Box::pin(futures::stream::select(stream, event_queue))) } + pub async fn stream_sync_groups( + &self, + ) -> Result + Send + '_>>, ClientError> { + Self::stream_conversations(self).await + } + #[tracing::instrument(skip(self, group_id_to_info))] pub(crate) async fn stream_messages( self: Arc, @@ -278,26 +286,40 @@ where pub async fn stream_all_messages( client: Arc>, + is_for_sync_groups: bool, ) -> Result, ClientError> { let (tx, rx) = mpsc::unbounded_channel(); client.sync_welcomes().await?; - let mut group_id_to_info = client - .store() - .conn()? - .find_groups(None, None, None, None)? - .into_iter() - .map(Into::into) - .collect::, MessagesStreamInfo>>(); + let mut group_id_to_info; + + if !is_for_sync_groups { + // Gather all regular conversational groups + group_id_to_info = client + .store() + .conn()? + .find_groups(None, None, None, None)? + .into_iter() + .map(Into::into) + .collect::, MessagesStreamInfo>>(); + } else { + // Gather the sync groups + group_id_to_info = client + .store() + .conn()? + .find_sync_groups()? + .into_iter() + .map(Into::into) + .collect::, MessagesStreamInfo>>(); + } tokio::spawn(async move { - let client = client.clone(); + let mut convo_stream = Self::stream_conversations(&client).await?; let mut messages_stream = client .clone() .stream_messages(group_id_to_info.clone()) .await?; - let mut convo_stream = Self::stream_conversations(&client).await?; let mut extra_messages = Vec::new(); loop { @@ -362,7 +384,7 @@ where let (tx, rx) = oneshot::channel(); let handle = tokio::spawn(async move { - let mut stream = Self::stream_all_messages(client).await?; + let mut stream = Self::stream_all_messages(client, false).await?; let _ = tx.send(()); while let Some(message) = stream.next().await { callback(message)