From 9d7aa5d16a31e23b6aa21c02506ae9154b054259 Mon Sep 17 00:00:00 2001 From: Dakota Brink <779390+codabrink@users.noreply.github.com> Date: Wed, 20 Nov 2024 15:46:12 -0500 Subject: [PATCH] Stream consent (#1296) * stream consent * getting closer * passes, but needs a better way to wait for event stream * update filter * cleanup * race condition fix --- bindings_ffi/src/mls.rs | 244 +++++++++++++++++++++++------ xmtp_mls/src/client.rs | 2 +- xmtp_mls/src/groups/device_sync.rs | 11 +- xmtp_mls/src/groups/mls_sync.rs | 142 ++++++++--------- xmtp_mls/src/groups/mod.rs | 2 +- xmtp_mls/src/subscriptions.rs | 51 +++++- 6 files changed, 323 insertions(+), 129 deletions(-) diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 4e935c10e..7ab18fe12 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -906,6 +906,12 @@ impl FfiConversations { Ok(()) } + pub fn get_sync_group(&self) -> Result { + let inner = self.inner_client.as_ref(); + let sync_group = inner.get_sync_group()?; + Ok(sync_group.into()) + } + pub async fn sync_all_conversations(&self) -> Result { let inner = self.inner_client.as_ref(); let groups = inner.find_groups(GroupQueryArgs::default().include_sync_groups())?; @@ -1058,6 +1064,18 @@ impl FfiConversations { FfiStreamCloser::new(handle) } + + pub async fn stream_consent(&self, callback: Arc) -> FfiStreamCloser { + let handle = + RustXmtpClient::stream_consent_with_callback(self.inner_client.clone(), move |msg| { + match msg { + Ok(m) => callback.on_consent_update(m.into_iter().map(Into::into).collect()), + Err(e) => callback.on_error(e.into()), + } + }); + + FfiStreamCloser::new(handle) + } } #[derive(uniffi::Object)] @@ -1071,6 +1089,20 @@ impl From> for FfiConversation { } } +impl From for FfiConsent { + fn from(value: StoredConsentRecord) -> Self { + FfiConsent { + entity: value.entity, + entity_type: match value.entity_type { + ConsentType::Address => FfiConsentEntityType::Address, + ConsentType::ConversationId => FfiConsentEntityType::ConversationId, + ConsentType::InboxId => FfiConsentEntityType::InboxId, + }, + state: value.state.into(), + } + } +} + #[derive(uniffi::Record)] pub struct FfiConversationMember { pub inbox_id: String, @@ -1699,6 +1731,12 @@ pub trait FfiConversationCallback: Send + Sync { fn on_error(&self, error: FfiSubscribeError); } +#[uniffi::export(with_foreign)] +pub trait FfiConsentCallback: Send + Sync { + fn on_consent_update(&self, consent: Vec); + fn on_error(&self, error: FfiSubscribeError); +} + #[derive(uniffi::Object)] pub struct FfiConversationMetadata { inner: Arc, @@ -1762,7 +1800,7 @@ impl FfiGroupPermissions { #[cfg(test)] mod tests { - use super::{create_client, FfiMessage, FfiMessageCallback, FfiXmtpClient}; + use super::{create_client, FfiConsentCallback, FfiMessage, FfiMessageCallback, FfiXmtpClient}; use crate::{ get_inbox_id_for_address, inbox_owner::SigningError, logger::FfiLogger, FfiConsent, FfiConsentEntityType, FfiConsentState, FfiConversation, FfiConversationCallback, @@ -1778,6 +1816,7 @@ mod tests { atomic::{AtomicU32, Ordering}, Arc, Mutex, }, + time::{Duration, Instant}, }; use tokio::{sync::Notify, time::error::Elapsed}; use xmtp_cryptography::{signature::RecoverableSignature, utils::rng}; @@ -1785,7 +1824,13 @@ mod tests { generate_inbox_id, unverified::{UnverifiedRecoverableEcdsaSignature, UnverifiedSignature}, }; - use xmtp_mls::{groups::GroupError, storage::EncryptionKey, InboxOwner}; + use xmtp_mls::{ + groups::{scoped_client::LocalScopedGroupClient, GroupError}, + storage::EncryptionKey, + InboxOwner, + }; + + const HISTORY_SYNC_URL: &str = "http://localhost:5558"; #[derive(Clone)] pub struct LocalWalletInboxOwner { @@ -1826,12 +1871,13 @@ mod tests { } } - #[derive(Default, Clone)] + #[derive(Default)] struct RustStreamCallback { - num_messages: Arc, - messages: Arc>>, - conversations: Arc>>>, - notify: Arc, + num_messages: AtomicU32, + messages: Mutex>, + conversations: Mutex>>, + consent_updates: Mutex>, + notify: Notify, } impl RustStreamCallback { @@ -1839,6 +1885,10 @@ mod tests { self.num_messages.load(Ordering::SeqCst) } + pub fn consent_updates_count(&self) -> usize { + self.consent_updates.lock().unwrap().len() + } + pub async fn wait_for_delivery(&self, timeout_secs: Option) -> Result<(), Elapsed> { tokio::time::timeout( std::time::Duration::from_secs(timeout_secs.unwrap_or(60)), @@ -1880,6 +1930,19 @@ mod tests { } } + impl FfiConsentCallback for RustStreamCallback { + fn on_consent_update(&self, mut consent: Vec) { + log::debug!("received consent update"); + let mut consent_updates = self.consent_updates.lock().unwrap(); + consent_updates.append(&mut consent); + self.notify.notify_one(); + } + + fn on_error(&self, error: FfiSubscribeError) { + log::error!("{}", error) + } + } + pub fn rand_string() -> String { Alphanumeric.sample_string(&mut rand::thread_rng(), 24) } @@ -1909,6 +1972,20 @@ mod tests { /// Create a new test client with a given wallet. async fn new_test_client_with_wallet( wallet: xmtp_cryptography::utils::LocalWallet, + ) -> Arc { + new_test_client_with_wallet_and_history_sync_url(wallet, None).await + } + + async fn new_test_client_with_wallet_and_history( + wallet: xmtp_cryptography::utils::LocalWallet, + ) -> Arc { + new_test_client_with_wallet_and_history_sync_url(wallet, Some(HISTORY_SYNC_URL.to_string())) + .await + } + + async fn new_test_client_with_wallet_and_history_sync_url( + wallet: xmtp_cryptography::utils::LocalWallet, + history_sync_url: Option, ) -> Arc { let ffi_inbox_owner = LocalWalletInboxOwner::with_wallet(wallet); let nonce = 1; @@ -1924,7 +2001,7 @@ mod tests { ffi_inbox_owner.get_address(), nonce, None, - None, + history_sync_url, ) .await .unwrap(); @@ -1938,6 +2015,12 @@ mod tests { new_test_client_with_wallet(wallet).await } + async fn new_test_client_with_history() -> Arc { + let wallet = xmtp_cryptography::utils::LocalWallet::new(&mut rng()); + new_test_client_with_wallet_and_history_sync_url(wallet, Some(HISTORY_SYNC_URL.to_string())) + .await + } + impl FfiConversation { #[cfg(test)] async fn update_installations(&self) -> Result<(), GroupError> { @@ -2414,10 +2497,10 @@ mod tests { let bo = new_test_client().await; // Stream all group messages - let message_callbacks = RustStreamCallback::default(); + let message_callbacks = Arc::new(RustStreamCallback::default()); let stream_messages = bo .conversations() - .stream_all_messages(Arc::new(message_callbacks.clone())) + .stream_all_messages(message_callbacks.clone()) .await; stream_messages.wait_for_ready().await; @@ -2706,10 +2789,10 @@ mod tests { let caro = new_test_client().await; // Alix begins a stream for all messages - let message_callbacks = RustStreamCallback::default(); + let message_callbacks = Arc::new(RustStreamCallback::default()); let stream_messages = alix .conversations() - .stream_all_messages(Arc::new(message_callbacks.clone())) + .stream_all_messages(message_callbacks.clone()) .await; stream_messages.wait_for_ready().await; @@ -2753,10 +2836,10 @@ mod tests { let bo2 = new_test_client_with_wallet(bo_wallet).await; // Bo begins a stream for all messages - let bo_message_callbacks = RustStreamCallback::default(); + let bo_message_callbacks = Arc::new(RustStreamCallback::default()); let bo_stream_messages = bo2 .conversations() - .stream_all_messages(Arc::new(bo_message_callbacks.clone())) + .stream_all_messages(bo_message_callbacks.clone()) .await; bo_stream_messages.wait_for_ready().await; @@ -3012,10 +3095,10 @@ mod tests { let bo = new_test_client().await; // Stream all group messages - let message_callbacks = RustStreamCallback::default(); + let message_callbacks = Arc::new(RustStreamCallback::default()); let stream_messages = bo .conversations() - .stream_all_messages(Arc::new(message_callbacks.clone())) + .stream_all_messages(message_callbacks.clone()) .await; stream_messages.wait_for_ready().await; @@ -3085,12 +3168,9 @@ mod tests { let amal = new_test_client().await; let bola = new_test_client().await; - let stream_callback = RustStreamCallback::default(); + let stream_callback = Arc::new(RustStreamCallback::default()); - let stream = bola - .conversations() - .stream(Arc::new(stream_callback.clone())) - .await; + let stream = bola.conversations().stream(stream_callback.clone()).await; amal.conversations() .create_group( @@ -3136,11 +3216,11 @@ mod tests { .await .unwrap(); - let stream_callback = RustStreamCallback::default(); + let stream_callback = Arc::new(RustStreamCallback::default()); let stream = caro .conversations() - .stream_all_messages(Arc::new(stream_callback.clone())) + .stream_all_messages(stream_callback.clone()) .await; stream.wait_for_ready().await; @@ -3188,8 +3268,8 @@ mod tests { bola.inner_client.sync_welcomes(&bola_conn).await.unwrap(); let bola_group = bola.conversation(amal_group.id()).unwrap(); - let stream_callback = RustStreamCallback::default(); - let stream_closer = bola_group.stream(Arc::new(stream_callback.clone())).await; + let stream_callback = Arc::new(RustStreamCallback::default()); + let stream_closer = bola_group.stream(stream_callback.clone()).await; stream_closer.wait_for_ready().await; @@ -3225,10 +3305,10 @@ mod tests { .await .unwrap(); - let stream_callback = RustStreamCallback::default(); + let stream_callback = Arc::new(RustStreamCallback::default()); let stream_closer = bola .conversations() - .stream_all_messages(Arc::new(stream_callback.clone())) + .stream_all_messages(stream_callback.clone()) .await; stream_closer.wait_for_ready().await; @@ -3318,16 +3398,13 @@ mod tests { let bo = new_test_client().await; // Stream all group messages - let message_callback = RustStreamCallback::default(); - let group_callback = RustStreamCallback::default(); - let stream_groups = bo - .conversations() - .stream(Arc::new(group_callback.clone())) - .await; + let message_callback = Arc::new(RustStreamCallback::default()); + let group_callback = Arc::new(RustStreamCallback::default()); + let stream_groups = bo.conversations().stream(group_callback.clone()).await; let stream_messages = bo .conversations() - .stream_all_messages(Arc::new(message_callback.clone())) + .stream_all_messages(message_callback.clone()) .await; stream_messages.wait_for_ready().await; @@ -3847,11 +3924,8 @@ mod tests { let bo = new_test_client().await; // Stream all conversations - let stream_callback = RustStreamCallback::default(); - let stream = bo - .conversations() - .stream(Arc::new(stream_callback.clone())) - .await; + let stream_callback = Arc::new(RustStreamCallback::default()); + let stream = bo.conversations().stream(stream_callback.clone()).await; alix.conversations() .create_group( @@ -3876,10 +3950,10 @@ mod tests { assert!(stream.is_closed()); // Stream just groups - let stream_callback = RustStreamCallback::default(); + let stream_callback = Arc::new(RustStreamCallback::default()); let stream = bo .conversations() - .stream_groups(Arc::new(stream_callback.clone())) + .stream_groups(stream_callback.clone()) .await; alix.conversations() @@ -3905,11 +3979,8 @@ mod tests { assert!(stream.is_closed()); // Stream just dms - let stream_callback = RustStreamCallback::default(); - let stream = bo - .conversations() - .stream_dms(Arc::new(stream_callback.clone())) - .await; + let stream_callback = Arc::new(RustStreamCallback::default()); + let stream = bo.conversations().stream_dms(stream_callback.clone()).await; alix.conversations() .create_dm(bo.account_address.clone()) @@ -3954,10 +4025,10 @@ mod tests { .unwrap(); // Stream all conversations - let stream_callback = RustStreamCallback::default(); + let stream_callback = Arc::new(RustStreamCallback::default()); let stream = bo .conversations() - .stream_all_messages(Arc::new(stream_callback.clone())) + .stream_all_messages(stream_callback.clone()) .await; stream.wait_for_ready().await; @@ -3973,10 +4044,10 @@ mod tests { assert!(stream.is_closed()); // Stream just groups - let stream_callback = RustStreamCallback::default(); + let stream_callback = Arc::new(RustStreamCallback::default()); let stream = bo .conversations() - .stream_all_group_messages(Arc::new(stream_callback.clone())) + .stream_all_group_messages(stream_callback.clone()) .await; stream.wait_for_ready().await; @@ -3993,10 +4064,10 @@ mod tests { assert!(stream.is_closed()); // Stream just dms - let stream_callback = RustStreamCallback::default(); + let stream_callback = Arc::new(RustStreamCallback::default()); let stream = bo .conversations() - .stream_all_dm_messages(Arc::new(stream_callback.clone())) + .stream_all_dm_messages(stream_callback.clone()) .await; stream.wait_for_ready().await; @@ -4016,6 +4087,75 @@ mod tests { assert!(stream.is_closed()); } + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] + async fn test_stream_consent() { + let wallet = generate_local_wallet(); + let alix_a = new_test_client_with_wallet_and_history(wallet.clone()).await; + let alix_b = new_test_client_with_wallet_and_history(wallet).await; + let bo = new_test_client_with_history().await; + + // have alix_a pull down the new sync group created by alix_b + assert!(alix_a.conversations().sync().await.is_ok()); + + // check that they have the same sync group + let sync_group_a = alix_a.conversations().get_sync_group().unwrap(); + let sync_group_b = alix_b.conversations().get_sync_group().unwrap(); + assert_eq!(sync_group_a.id(), sync_group_b.id()); + + // create a stream from both installations + let stream_a_callback = Arc::new(RustStreamCallback::default()); + let stream_b_callback = Arc::new(RustStreamCallback::default()); + let a_stream = alix_a + .conversations() + .stream_consent(stream_a_callback.clone()) + .await; + let b_stream = alix_b + .conversations() + .stream_consent(stream_b_callback.clone()) + .await; + a_stream.wait_for_ready().await; + b_stream.wait_for_ready().await; + + // consent with bo + alix_a + .set_consent_states(vec![FfiConsent { + entity: bo.account_address.clone(), + entity_type: FfiConsentEntityType::Address, + state: FfiConsentState::Allowed, + }]) + .await + .unwrap(); + + let result = stream_a_callback.wait_for_delivery(Some(3)).await; + assert!(result.is_ok()); + + let start = Instant::now(); + loop { + // update the sync group's messages to pipe them into the events + alix_b + .conversations() + .sync_all_conversations() + .await + .unwrap(); + + if stream_b_callback.wait_for_delivery(Some(1)).await.is_ok() { + break; + } + + if start.elapsed() > Duration::from_secs(5) { + panic!("Timed out while waiting for alix_b consent updates."); + } + } + + // two outgoing consent updates + assert_eq!(stream_a_callback.consent_updates_count(), 2); + // and two incoming consent updates + assert_eq!(stream_b_callback.consent_updates_count(), 2); + + a_stream.end_and_wait().await.unwrap(); + b_stream.end_and_wait().await.unwrap(); + } + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] async fn test_set_and_get_group_consent() { let alix = new_test_client().await; diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index f3ea97435..d025705d9 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -429,7 +429,7 @@ where let mut records = records.to_vec(); records.append(&mut new_records); self.local_events - .send(LocalEvents::ConsentUpdate(records)) + .send(LocalEvents::OutgoingConsentUpdates(records)) .map_err(|e| ClientError::Generic(e.to_string()))?; } diff --git a/xmtp_mls/src/groups/device_sync.rs b/xmtp_mls/src/groups/device_sync.rs index f8161d87c..872a2e53e 100644 --- a/xmtp_mls/src/groups/device_sync.rs +++ b/xmtp_mls/src/groups/device_sync.rs @@ -210,11 +210,16 @@ where } } }, - LocalEvents::ConsentUpdate(consent_records) => { + LocalEvents::OutgoingConsentUpdates(consent_records) => { for consent_record in consent_records { self.send_consent_update(&provider, &consent_record).await?; } } + LocalEvents::IncomingConsentUpdates(consent_records) => { + let conn = provider.conn_ref(); + + conn.insert_or_replace_consent_records(&consent_records)?; + } _ => {} } } @@ -567,7 +572,9 @@ where if existing_consent_record.state != consent_record.state { warn!("Existing consent record exists and does not match payload state. Streaming consent_record update to sync group."); self.local_events - .send(LocalEvents::ConsentUpdate(vec![existing_consent_record])) + .send(LocalEvents::OutgoingConsentUpdates(vec![ + existing_consent_record, + ])) .map_err(|e| DeviceSyncError::Generic(e.to_string()))?; } } diff --git a/xmtp_mls/src/groups/mls_sync.rs b/xmtp_mls/src/groups/mls_sync.rs index cab052e72..32e6591a1 100644 --- a/xmtp_mls/src/groups/mls_sync.rs +++ b/xmtp_mls/src/groups/mls_sync.rs @@ -549,80 +549,82 @@ where Some(Content::V2(V2 { idempotency_key, message_type, - })) => match message_type { - Some(MessageType::DeviceSyncRequest(history_request)) => { - let content: DeviceSyncContent = - DeviceSyncContent::Request(history_request); - let content_bytes = serde_json::to_vec(&content)?; - let message_id = calculate_message_id( - &self.group_id, - &content_bytes, - &idempotency_key, - ); - - // store the request message - StoredGroupMessage { - id: message_id.clone(), - group_id: self.group_id.clone(), - decrypted_message_bytes: content_bytes, - sent_at_ns: envelope_timestamp_ns as i64, - kind: GroupMessageKind::Application, - sender_installation_id, - sender_inbox_id: sender_inbox_id.clone(), - delivery_status: DeliveryStatus::Published, + })) => { + match message_type { + Some(MessageType::DeviceSyncRequest(history_request)) => { + let content: DeviceSyncContent = + DeviceSyncContent::Request(history_request); + let content_bytes = serde_json::to_vec(&content)?; + let message_id = calculate_message_id( + &self.group_id, + &content_bytes, + &idempotency_key, + ); + + // store the request message + StoredGroupMessage { + id: message_id.clone(), + group_id: self.group_id.clone(), + decrypted_message_bytes: content_bytes, + sent_at_ns: envelope_timestamp_ns as i64, + kind: GroupMessageKind::Application, + sender_installation_id, + sender_inbox_id: sender_inbox_id.clone(), + delivery_status: DeliveryStatus::Published, + } + .store_or_ignore(provider.conn_ref())?; + + tracing::info!("Received a history request."); + let _ = self.client.local_events().send(LocalEvents::SyncMessage( + SyncMessage::Request { message_id }, + )); } - .store_or_ignore(provider.conn_ref())?; - - tracing::info!("Received a history request."); - let _ = self.client.local_events().send(LocalEvents::SyncMessage( - SyncMessage::Request { message_id }, - )); - } - - Some(MessageType::DeviceSyncReply(history_reply)) => { - let content: DeviceSyncContent = - DeviceSyncContent::Reply(history_reply); - let content_bytes = serde_json::to_vec(&content)?; - let message_id = calculate_message_id( - &self.group_id, - &content_bytes, - &idempotency_key, - ); - // store the reply message - StoredGroupMessage { - id: message_id.clone(), - group_id: self.group_id.clone(), - decrypted_message_bytes: content_bytes, - sent_at_ns: envelope_timestamp_ns as i64, - kind: GroupMessageKind::Application, - sender_installation_id, - sender_inbox_id, - delivery_status: DeliveryStatus::Published, + Some(MessageType::DeviceSyncReply(history_reply)) => { + let content: DeviceSyncContent = + DeviceSyncContent::Reply(history_reply); + let content_bytes = serde_json::to_vec(&content)?; + let message_id = calculate_message_id( + &self.group_id, + &content_bytes, + &idempotency_key, + ); + + // store the reply message + StoredGroupMessage { + id: message_id.clone(), + group_id: self.group_id.clone(), + decrypted_message_bytes: content_bytes, + sent_at_ns: envelope_timestamp_ns as i64, + kind: GroupMessageKind::Application, + sender_installation_id, + sender_inbox_id, + delivery_status: DeliveryStatus::Published, + } + .store_or_ignore(provider.conn_ref())?; + + tracing::info!("Received a history reply."); + let _ = self.client.local_events().send(LocalEvents::SyncMessage( + SyncMessage::Reply { message_id }, + )); + } + Some(MessageType::ConsentUpdate(update)) => { + tracing::info!( + "Incoming streamed consent update: {:?} {} updated to {:?}.", + update.entity_type(), + update.entity, + update.state() + ); + + let _ = self.client.local_events().send( + LocalEvents::IncomingConsentUpdates(vec![update.try_into()?]), + ); + } + _ => { + return Err(GroupMessageProcessingError::InvalidPayload); } - .store_or_ignore(provider.conn_ref())?; - - tracing::info!("Received a history reply."); - let _ = self - .client - .local_events() - .send(LocalEvents::SyncMessage(SyncMessage::Reply { message_id })); - } - Some(MessageType::ConsentUpdate(update)) => { - tracing::info!( - "Incoming streamed consent update: {:?} {} updated to {:?}.", - update.entity_type(), - update.entity, - update.state() - ); - - let conn = provider.conn_ref(); - conn.insert_or_replace_consent_records(&[update.try_into()?])?; - } - _ => { - return Err(GroupMessageProcessingError::InvalidPayload); } - }, + } None => return Err(GroupMessageProcessingError::InvalidPayload), } } diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 05aaf1eb4..c10ee8dbd 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -1083,7 +1083,7 @@ impl MlsGroup { // Dispatch an update event so it can be synced across devices self.client .local_events() - .send(LocalEvents::ConsentUpdate(vec![consent_record])) + .send(LocalEvents::OutgoingConsentUpdates(vec![consent_record])) .map_err(|e| GroupError::Generic(e.to_string()))?; } diff --git a/xmtp_mls/src/subscriptions.rs b/xmtp_mls/src/subscriptions.rs index 4fa241650..90de8461f 100644 --- a/xmtp_mls/src/subscriptions.rs +++ b/xmtp_mls/src/subscriptions.rs @@ -51,7 +51,8 @@ pub enum LocalEvents { // a new group was created NewGroup(MlsGroup), SyncMessage(SyncMessage), - ConsentUpdate(Vec), + OutgoingConsentUpdates(Vec), + IncomingConsentUpdates(Vec), } #[derive(Clone)] @@ -70,12 +71,23 @@ impl LocalEvents { } } - pub(crate) fn sync_filter(self) -> Option { + fn sync_filter(self) -> Option { use LocalEvents::*; match &self { SyncMessage(_) => Some(self), - ConsentUpdate(_) => Some(self), + OutgoingConsentUpdates(_) => Some(self), + IncomingConsentUpdates(_) => Some(self), + _ => None, + } + } + + fn consent_filter(self) -> Option> { + use LocalEvents::*; + + match self { + OutgoingConsentUpdates(cr) => Some(cr), + IncomingConsentUpdates(cr) => Some(cr), _ => None, } } @@ -83,6 +95,9 @@ impl LocalEvents { pub(crate) trait StreamMessages { fn stream_sync_messages(self) -> impl Stream, SubscribeError>>; + fn stream_consent_updates( + self, + ) -> impl Stream, SubscribeError>>; } impl StreamMessages for broadcast::Receiver> @@ -96,6 +111,16 @@ where .map(Result::Ok) }) } + + fn stream_consent_updates( + self, + ) -> impl Stream, SubscribeError>> { + BroadcastStream::new(self).filter_map(|event| async { + crate::optify!(event, "Missed message due to event queue lag") + .and_then(LocalEvents::consent_filter) + .map(Result::Ok) + }) + } } impl StreamHandle { @@ -429,6 +454,26 @@ where Ok::<_, ClientError>(()) }) } + + pub fn stream_consent_with_callback( + client: Arc>, + mut callback: impl FnMut(Result, SubscribeError>) + Send + 'static, + ) -> impl crate::StreamHandle> { + let (tx, rx) = oneshot::channel(); + + crate::spawn(Some(rx), async move { + let receiver = client.local_events.subscribe(); + let stream = receiver.stream_consent_updates(); + + futures::pin_mut!(stream); + let _ = tx.send(()); + while let Some(message) = stream.next().await { + callback(message) + } + tracing::debug!("`stream_consent` stream ended, dropping stream"); + Ok::<_, ClientError>(()) + }) + } } #[cfg(test)]