Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream preferences #1424

Merged
merged 7 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 106 additions & 18 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use xmtp_id::{
},
InboxId,
};
use xmtp_mls::groups::device_sync::preference_sync::UserPreferenceUpdate;
use xmtp_mls::groups::scoped_client::LocalScopedGroupClient;
use xmtp_mls::groups::HmacKey;
use xmtp_mls::storage::group::ConversationType;
Expand Down Expand Up @@ -1072,6 +1073,8 @@ impl FfiConversations {
FfiStreamCloser::new(handle)
}

/// Get notified when there is a new consent update either locally or is synced from another device
/// allowing the user to re-render the new state appropriately
pub async fn stream_consent(&self, callback: Arc<dyn FfiConsentCallback>) -> FfiStreamCloser {
let handle =
RustXmtpClient::stream_consent_with_callback(self.inner_client.clone(), move |msg| {
Expand All @@ -1083,6 +1086,25 @@ impl FfiConversations {

FfiStreamCloser::new(handle)
}

/// Get notified when a preference changes either locally or is synced from another device
/// allowing the user to re-render the new state appropriately.
pub async fn stream_preferences(
&self,
callback: Arc<dyn FfiPreferenceCallback>,
) -> FfiStreamCloser {
let handle = RustXmtpClient::stream_preferences_with_callback(
self.inner_client.clone(),
move |msg| match msg {
Ok(m) => callback.on_preference_update(
m.into_iter().filter_map(|v| v.try_into().ok()).collect(),
),
Err(e) => callback.on_error(e.into()),
},
);

FfiStreamCloser::new(handle)
}
}

impl From<FfiConversationType> for ConversationType {
Expand All @@ -1095,6 +1117,20 @@ impl From<FfiConversationType> for ConversationType {
}
}

impl TryFrom<UserPreferenceUpdate> for FfiPreferenceUpdate {
type Error = GenericError;
fn try_from(value: UserPreferenceUpdate) -> Result<Self, Self::Error> {
match value {
UserPreferenceUpdate::HmacKeyUpdate { key } => Ok(FfiPreferenceUpdate::HMAC { key }),
// These are filtered out in the stream and should not be here
// We're keeping preference update and consent streams separate right now.
UserPreferenceUpdate::ConsentUpdate(_) => Err(GenericError::Generic {
err: "Consent updates should be filtered out.".to_string(),
}),
}
}
}

#[derive(uniffi::Object)]
pub struct FfiConversation {
inner: MlsGroup<RustXmtpClient>,
Expand Down Expand Up @@ -1755,6 +1791,17 @@ pub trait FfiConsentCallback: Send + Sync {
fn on_error(&self, error: FfiSubscribeError);
}

#[uniffi::export(with_foreign)]
pub trait FfiPreferenceCallback: Send + Sync {
fn on_preference_update(&self, preference: Vec<FfiPreferenceUpdate>);
fn on_error(&self, error: FfiSubscribeError);
}

#[derive(uniffi::Enum)]
pub enum FfiPreferenceUpdate {
HMAC { key: Vec<u8> },
}

#[derive(uniffi::Object)]
pub struct FfiConversationMetadata {
inner: Arc<GroupMetadata>,
Expand Down Expand Up @@ -1818,7 +1865,10 @@ impl FfiGroupPermissions {

#[cfg(test)]
mod tests {
use super::{create_client, FfiConsentCallback, FfiMessage, FfiMessageCallback, FfiXmtpClient};
use super::{
create_client, FfiConsentCallback, FfiMessage, FfiMessageCallback, FfiPreferenceCallback,
FfiPreferenceUpdate, FfiXmtpClient,
};
use crate::{
get_inbox_id_for_address, inbox_owner::SigningError, FfiConsent, FfiConsentEntityType,
FfiConsentState, FfiConversation, FfiConversationCallback, FfiConversationMessageKind,
Expand All @@ -1827,12 +1877,9 @@ mod tests {
FfiPermissionPolicySet, FfiPermissionUpdateType, FfiSubscribeError,
};
use ethers::utils::hex;
use std::{
sync::{
atomic::{AtomicU32, Ordering},
Arc, Mutex,
},
time::{Duration, Instant},
use std::sync::{
atomic::{AtomicU32, Ordering},
Arc, Mutex,
};
use tokio::{sync::Notify, time::error::Elapsed};
use xmtp_common::tmp_path;
Expand Down Expand Up @@ -1887,6 +1934,7 @@ mod tests {
messages: Mutex<Vec<FfiMessage>>,
conversations: Mutex<Vec<Arc<FfiConversation>>>,
consent_updates: Mutex<Vec<FfiConsent>>,
preference_updates: Mutex<Vec<FfiPreferenceUpdate>>,
notify: Notify,
inbox_id: Option<String>,
installation_id: Option<String>,
Expand Down Expand Up @@ -1973,6 +2021,23 @@ mod tests {
}
}

impl FfiPreferenceCallback for RustStreamCallback {
fn on_preference_update(&self, mut preference: Vec<super::FfiPreferenceUpdate>) {
log::debug!(
inbox_id = self.inbox_id,
installation_id = self.installation_id,
"received consent update"
);
let mut preference_updates = self.preference_updates.lock().unwrap();
preference_updates.append(&mut preference);
self.notify.notify_one();
}

fn on_error(&self, error: FfiSubscribeError) {
log::error!("{}", error)
}
}

fn static_enc_key() -> EncryptionKey {
[2u8; 32]
}
Expand Down Expand Up @@ -4211,23 +4276,17 @@ mod tests {
let result = stream_a_callback.wait_for_delivery(Some(3)).await;
assert!(result.is_ok());

let start = Instant::now();
loop {
// update the sync group's messages to pipe them into the events
wait_for_ok(|| async {
alix_b
.conversations()
.sync_all_conversations(None)
.await
.unwrap();

if stream_b_callback.wait_for_delivery(Some(1)).await.is_ok() {
break;
}

if start.elapsed() > Duration::from_secs(5) {
panic!("Timed out while waiting for alix_b consent updates.");
}
}
stream_b_callback.wait_for_delivery(Some(1)).await
})
.await
.unwrap();

// two outgoing consent updates
assert_eq!(stream_a_callback.consent_updates_count(), 2);
Expand All @@ -4238,6 +4297,35 @@ mod tests {
b_stream.end_and_wait().await.unwrap();
}

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_stream_preferences() {
let wallet = generate_local_wallet();
let alix_a = new_test_client_with_wallet_and_history(wallet.clone()).await;
let stream_a_callback = Arc::new(RustStreamCallback::default());

let a_stream = alix_a
.conversations()
.stream_preferences(stream_a_callback.clone())
.await;

let _alix_b = new_test_client_with_wallet_and_history(wallet).await;

let result = stream_a_callback.wait_for_delivery(Some(3)).await;
assert!(result.is_ok());

let update = {
let mut a_updates = stream_a_callback.preference_updates.lock().unwrap();
assert_eq!(a_updates.len(), 1);

a_updates.pop().unwrap()
};

// We got the HMAC update
assert!(matches!(update, FfiPreferenceUpdate::HMAC { .. }));

a_stream.end_and_wait().await.unwrap();
}

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_set_and_get_group_consent() {
let alix = new_test_client().await;
Expand Down
61 changes: 61 additions & 0 deletions xmtp_mls/src/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,44 @@ impl<C> LocalEvents<C> {
_ => None,
}
}

fn preference_filter(self) -> Option<Vec<UserPreferenceUpdate>> {
use LocalEvents::*;

match self {
OutgoingPreferenceUpdates(updates) => {
let updates = updates
.into_iter()
.filter_map(|pu| match pu {
UserPreferenceUpdate::ConsentUpdate(_) => None,
_ => Some(pu),
})
.collect();
Some(updates)
}
IncomingPreferenceUpdate(updates) => {
let updates = updates
.into_iter()
.filter_map(|pu| match pu {
UserPreferenceUpdate::ConsentUpdate(_) => None,
_ => Some(pu),
})
.collect();
Some(updates)
}
_ => None,
}
}
}

pub(crate) trait StreamMessages<C> {
fn stream_sync_messages(self) -> impl Stream<Item = Result<LocalEvents<C>, SubscribeError>>;
fn stream_consent_updates(
self,
) -> impl Stream<Item = Result<Vec<StoredConsentRecord>, SubscribeError>>;
fn stream_preference_updates(
self,
) -> impl Stream<Item = Result<Vec<UserPreferenceUpdate>, SubscribeError>>;
}

impl<C> StreamMessages<C> for broadcast::Receiver<LocalEvents<C>>
Expand All @@ -144,6 +175,16 @@ where
.map(Result::Ok)
})
}

fn stream_preference_updates(
self,
) -> impl Stream<Item = Result<Vec<UserPreferenceUpdate>, SubscribeError>> {
BroadcastStream::new(self).filter_map(|event| async {
xmtp_common::optify!(event, "Missed message due to event queue lag")
.and_then(LocalEvents::preference_filter)
.map(Result::Ok)
})
}
}

impl<T> StreamHandle<T> {
Expand Down Expand Up @@ -521,6 +562,26 @@ where
Ok::<_, ClientError>(())
})
}

pub fn stream_preferences_with_callback(
client: Arc<Client<ApiClient, V>>,
mut callback: impl FnMut(Result<Vec<UserPreferenceUpdate>, SubscribeError>) + Send + 'static,
) -> impl crate::StreamHandle<StreamOutput = Result<(), ClientError>> {
let (tx, rx) = oneshot::channel();

crate::spawn(Some(rx), async move {
let receiver = client.local_events.subscribe();
let stream = receiver.stream_preference_updates();

futures::pin_mut!(stream);
let _ = tx.send(());
while let Some(message) = stream.next().await {
callback(message)
}
tracing::debug!("`stream_consent` stream ended, dropping stream");
Ok::<_, ClientError>(())
})
}
}

#[cfg(test)]
Expand Down