From 25758b468f831f735a0e7ab99e73d6fb4a8910a7 Mon Sep 17 00:00:00 2001 From: Dakota Brink <779390+codabrink@users.noreply.github.com> Date: Mon, 16 Dec 2024 16:16:19 -0500 Subject: [PATCH] Stream preferences (#1424) * streaming wip * the rest of the implementation * test preference streaming * comments * typo --- bindings_ffi/src/mls.rs | 124 +++++++++++++++++++++++++++++----- xmtp_mls/src/subscriptions.rs | 61 +++++++++++++++++ 2 files changed, 167 insertions(+), 18 deletions(-) diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index a9ae8232b..7ba44c7a6 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -16,6 +16,7 @@ use xmtp_id::{ }, InboxId, }; +use xmtp_mls::groups::device_sync::preference_sync::UserPreferenceUpdate; use xmtp_mls::groups::scoped_client::LocalScopedGroupClient; use xmtp_mls::groups::HmacKey; use xmtp_mls::storage::group::ConversationType; @@ -1072,6 +1073,8 @@ impl FfiConversations { FfiStreamCloser::new(handle) } + /// Get notified when there is a new consent update either locally or is synced from another device + /// allowing the user to re-render the new state appropriately pub async fn stream_consent(&self, callback: Arc) -> FfiStreamCloser { let handle = RustXmtpClient::stream_consent_with_callback(self.inner_client.clone(), move |msg| { @@ -1083,6 +1086,25 @@ impl FfiConversations { FfiStreamCloser::new(handle) } + + /// Get notified when a preference changes either locally or is synced from another device + /// allowing the user to re-render the new state appropriately. + pub async fn stream_preferences( + &self, + callback: Arc, + ) -> FfiStreamCloser { + let handle = RustXmtpClient::stream_preferences_with_callback( + self.inner_client.clone(), + move |msg| match msg { + Ok(m) => callback.on_preference_update( + m.into_iter().filter_map(|v| v.try_into().ok()).collect(), + ), + Err(e) => callback.on_error(e.into()), + }, + ); + + FfiStreamCloser::new(handle) + } } impl From for ConversationType { @@ -1095,6 +1117,20 @@ impl From for ConversationType { } } +impl TryFrom for FfiPreferenceUpdate { + type Error = GenericError; + fn try_from(value: UserPreferenceUpdate) -> Result { + match value { + UserPreferenceUpdate::HmacKeyUpdate { key } => Ok(FfiPreferenceUpdate::HMAC { key }), + // These are filtered out in the stream and should not be here + // We're keeping preference update and consent streams separate right now. + UserPreferenceUpdate::ConsentUpdate(_) => Err(GenericError::Generic { + err: "Consent updates should be filtered out.".to_string(), + }), + } + } +} + #[derive(uniffi::Object)] pub struct FfiConversation { inner: MlsGroup, @@ -1755,6 +1791,17 @@ pub trait FfiConsentCallback: Send + Sync { fn on_error(&self, error: FfiSubscribeError); } +#[uniffi::export(with_foreign)] +pub trait FfiPreferenceCallback: Send + Sync { + fn on_preference_update(&self, preference: Vec); + fn on_error(&self, error: FfiSubscribeError); +} + +#[derive(uniffi::Enum)] +pub enum FfiPreferenceUpdate { + HMAC { key: Vec }, +} + #[derive(uniffi::Object)] pub struct FfiConversationMetadata { inner: Arc, @@ -1818,7 +1865,10 @@ impl FfiGroupPermissions { #[cfg(test)] mod tests { - use super::{create_client, FfiConsentCallback, FfiMessage, FfiMessageCallback, FfiXmtpClient}; + use super::{ + create_client, FfiConsentCallback, FfiMessage, FfiMessageCallback, FfiPreferenceCallback, + FfiPreferenceUpdate, FfiXmtpClient, + }; use crate::{ get_inbox_id_for_address, inbox_owner::SigningError, FfiConsent, FfiConsentEntityType, FfiConsentState, FfiConversation, FfiConversationCallback, FfiConversationMessageKind, @@ -1827,12 +1877,9 @@ mod tests { FfiPermissionPolicySet, FfiPermissionUpdateType, FfiSubscribeError, }; use ethers::utils::hex; - use std::{ - sync::{ - atomic::{AtomicU32, Ordering}, - Arc, Mutex, - }, - time::{Duration, Instant}, + use std::sync::{ + atomic::{AtomicU32, Ordering}, + Arc, Mutex, }; use tokio::{sync::Notify, time::error::Elapsed}; use xmtp_common::tmp_path; @@ -1887,6 +1934,7 @@ mod tests { messages: Mutex>, conversations: Mutex>>, consent_updates: Mutex>, + preference_updates: Mutex>, notify: Notify, inbox_id: Option, installation_id: Option, @@ -1973,6 +2021,23 @@ mod tests { } } + impl FfiPreferenceCallback for RustStreamCallback { + fn on_preference_update(&self, mut preference: Vec) { + log::debug!( + inbox_id = self.inbox_id, + installation_id = self.installation_id, + "received consent update" + ); + let mut preference_updates = self.preference_updates.lock().unwrap(); + preference_updates.append(&mut preference); + self.notify.notify_one(); + } + + fn on_error(&self, error: FfiSubscribeError) { + log::error!("{}", error) + } + } + fn static_enc_key() -> EncryptionKey { [2u8; 32] } @@ -4211,23 +4276,17 @@ mod tests { let result = stream_a_callback.wait_for_delivery(Some(3)).await; assert!(result.is_ok()); - let start = Instant::now(); - loop { - // update the sync group's messages to pipe them into the events + wait_for_ok(|| async { alix_b .conversations() .sync_all_conversations(None) .await .unwrap(); - if stream_b_callback.wait_for_delivery(Some(1)).await.is_ok() { - break; - } - - if start.elapsed() > Duration::from_secs(5) { - panic!("Timed out while waiting for alix_b consent updates."); - } - } + stream_b_callback.wait_for_delivery(Some(1)).await + }) + .await + .unwrap(); // two outgoing consent updates assert_eq!(stream_a_callback.consent_updates_count(), 2); @@ -4238,6 +4297,35 @@ mod tests { b_stream.end_and_wait().await.unwrap(); } + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] + async fn test_stream_preferences() { + let wallet = generate_local_wallet(); + let alix_a = new_test_client_with_wallet_and_history(wallet.clone()).await; + let stream_a_callback = Arc::new(RustStreamCallback::default()); + + let a_stream = alix_a + .conversations() + .stream_preferences(stream_a_callback.clone()) + .await; + + let _alix_b = new_test_client_with_wallet_and_history(wallet).await; + + let result = stream_a_callback.wait_for_delivery(Some(3)).await; + assert!(result.is_ok()); + + let update = { + let mut a_updates = stream_a_callback.preference_updates.lock().unwrap(); + assert_eq!(a_updates.len(), 1); + + a_updates.pop().unwrap() + }; + + // We got the HMAC update + assert!(matches!(update, FfiPreferenceUpdate::HMAC { .. })); + + a_stream.end_and_wait().await.unwrap(); + } + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] async fn test_set_and_get_group_consent() { let alix = new_test_client().await; diff --git a/xmtp_mls/src/subscriptions.rs b/xmtp_mls/src/subscriptions.rs index d285050c2..f9675d500 100644 --- a/xmtp_mls/src/subscriptions.rs +++ b/xmtp_mls/src/subscriptions.rs @@ -113,6 +113,34 @@ impl LocalEvents { _ => None, } } + + fn preference_filter(self) -> Option> { + use LocalEvents::*; + + match self { + OutgoingPreferenceUpdates(updates) => { + let updates = updates + .into_iter() + .filter_map(|pu| match pu { + UserPreferenceUpdate::ConsentUpdate(_) => None, + _ => Some(pu), + }) + .collect(); + Some(updates) + } + IncomingPreferenceUpdate(updates) => { + let updates = updates + .into_iter() + .filter_map(|pu| match pu { + UserPreferenceUpdate::ConsentUpdate(_) => None, + _ => Some(pu), + }) + .collect(); + Some(updates) + } + _ => None, + } + } } pub(crate) trait StreamMessages { @@ -120,6 +148,9 @@ pub(crate) trait StreamMessages { fn stream_consent_updates( self, ) -> impl Stream, SubscribeError>>; + fn stream_preference_updates( + self, + ) -> impl Stream, SubscribeError>>; } impl StreamMessages for broadcast::Receiver> @@ -144,6 +175,16 @@ where .map(Result::Ok) }) } + + fn stream_preference_updates( + self, + ) -> impl Stream, SubscribeError>> { + BroadcastStream::new(self).filter_map(|event| async { + xmtp_common::optify!(event, "Missed message due to event queue lag") + .and_then(LocalEvents::preference_filter) + .map(Result::Ok) + }) + } } impl StreamHandle { @@ -521,6 +562,26 @@ where Ok::<_, ClientError>(()) }) } + + pub fn stream_preferences_with_callback( + client: Arc>, + mut callback: impl FnMut(Result, SubscribeError>) + Send + 'static, + ) -> impl crate::StreamHandle> { + let (tx, rx) = oneshot::channel(); + + crate::spawn(Some(rx), async move { + let receiver = client.local_events.subscribe(); + let stream = receiver.stream_preference_updates(); + + futures::pin_mut!(stream); + let _ = tx.send(()); + while let Some(message) = stream.next().await { + callback(message) + } + tracing::debug!("`stream_consent` stream ended, dropping stream"); + Ok::<_, ClientError>(()) + }) + } } #[cfg(test)]