Skip to content

Commit

Permalink
test preference streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
codabrink committed Dec 16, 2024
1 parent 6ae1179 commit aab546f
Showing 1 changed file with 59 additions and 18 deletions.
77 changes: 59 additions & 18 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1861,7 +1861,10 @@ impl FfiGroupPermissions {

#[cfg(test)]
mod tests {
use super::{create_client, FfiConsentCallback, FfiMessage, FfiMessageCallback, FfiXmtpClient};
use super::{
create_client, FfiConsentCallback, FfiMessage, FfiMessageCallback, FfiPreferenceCallback,
FfiPreferenceUpdate, FfiXmtpClient,
};
use crate::{
get_inbox_id_for_address, inbox_owner::SigningError, FfiConsent, FfiConsentEntityType,
FfiConsentState, FfiConversation, FfiConversationCallback, FfiConversationMessageKind,
Expand All @@ -1870,12 +1873,9 @@ mod tests {
FfiPermissionPolicySet, FfiPermissionUpdateType, FfiSubscribeError,
};
use ethers::utils::hex;
use std::{
sync::{
atomic::{AtomicU32, Ordering},
Arc, Mutex,
},
time::{Duration, Instant},
use std::sync::{
atomic::{AtomicU32, Ordering},
Arc, Mutex,
};
use tokio::{sync::Notify, time::error::Elapsed};
use xmtp_common::tmp_path;
Expand Down Expand Up @@ -1930,6 +1930,7 @@ mod tests {
messages: Mutex<Vec<FfiMessage>>,
conversations: Mutex<Vec<Arc<FfiConversation>>>,
consent_updates: Mutex<Vec<FfiConsent>>,
preference_updates: Mutex<Vec<FfiPreferenceUpdate>>,
notify: Notify,
inbox_id: Option<String>,
installation_id: Option<String>,
Expand Down Expand Up @@ -2016,6 +2017,23 @@ mod tests {
}
}

impl FfiPreferenceCallback for RustStreamCallback {
fn on_preference_update(&self, mut preference: Vec<super::FfiPreferenceUpdate>) {
log::debug!(
inbox_id = self.inbox_id,
installation_id = self.installation_id,
"received consent update"
);
let mut preference_updates = self.preference_updates.lock().unwrap();
preference_updates.append(&mut preference);
self.notify.notify_one();
}

fn on_error(&self, error: FfiSubscribeError) {
log::error!("{}", error)
}
}

fn static_enc_key() -> EncryptionKey {
[2u8; 32]
}
Expand Down Expand Up @@ -4254,23 +4272,17 @@ mod tests {
let result = stream_a_callback.wait_for_delivery(Some(3)).await;
assert!(result.is_ok());

let start = Instant::now();
loop {
// update the sync group's messages to pipe them into the events
wait_for_ok(|| async {
alix_b
.conversations()
.sync_all_conversations(None)
.await
.unwrap();

if stream_b_callback.wait_for_delivery(Some(1)).await.is_ok() {
break;
}

if start.elapsed() > Duration::from_secs(5) {
panic!("Timed out while waiting for alix_b consent updates.");
}
}
stream_b_callback.wait_for_delivery(Some(1)).await
})
.await
.unwrap();

// two outgoing consent updates
assert_eq!(stream_a_callback.consent_updates_count(), 2);
Expand All @@ -4281,6 +4293,35 @@ mod tests {
b_stream.end_and_wait().await.unwrap();
}

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_stream_preferences() {
let wallet = generate_local_wallet();
let alix_a = new_test_client_with_wallet_and_history(wallet.clone()).await;
let stream_a_callback = Arc::new(RustStreamCallback::default());

let a_stream = alix_a
.conversations()
.stream_preferences(stream_a_callback.clone())
.await;

let _alix_b = new_test_client_with_wallet_and_history(wallet).await;

let result = stream_a_callback.wait_for_delivery(Some(3)).await;
assert!(result.is_ok());

let update = {
let mut a_updates = stream_a_callback.preference_updates.lock().unwrap();
assert_eq!(a_updates.len(), 1);

a_updates.pop().unwrap()
};

// We got the HMAC update
assert!(matches!(update, FfiPreferenceUpdate::HMAC { .. }));

a_stream.end_and_wait().await.unwrap();
}

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_set_and_get_group_consent() {
let alix = new_test_client().await;
Expand Down

0 comments on commit aab546f

Please sign in to comment.