Skip to content

Commit

Permalink
streaming wip
Browse files Browse the repository at this point in the history
  • Loading branch information
codabrink committed Dec 16, 2024
1 parent 97eca90 commit 7ad6163
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 0 deletions.
26 changes: 26 additions & 0 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1083,6 +1083,21 @@ impl FfiConversations {

FfiStreamCloser::new(handle)
}

pub async fn stream_preferences(
&self,
callback: Arc<dyn FfiPreferenceCallback>,
) -> FfiStreamCloser {
let handle =
RustXmtpClient::stream_consent_with_callback(self.inner_client.clone(), move |msg| {
match msg {
Ok(m) => callback.on_preference_update(m.into_iter().map(Into::into).collect()),
Err(e) => callback.on_error(e.into()),
}
});

FfiStreamCloser::new(handle)
}
}

impl From<FfiConversationType> for ConversationType {
Expand Down Expand Up @@ -1755,6 +1770,17 @@ pub trait FfiConsentCallback: Send + Sync {
fn on_error(&self, error: FfiSubscribeError);
}

#[uniffi::export(with_foreign)]
pub trait FfiPreferenceCallback: Send + Sync {
fn on_preference_update(&self, preference: Vec<FfiPreferenceUpdate>);
fn on_error(&self, error: FfiSubscribeError);
}

#[derive(uniffi::Enum)]
pub enum FfiPreferenceUpdate {
HMAC { key: Vec<u8> },
}

#[derive(uniffi::Object)]
pub struct FfiConversationMetadata {
inner: Arc<GroupMetadata>,
Expand Down
61 changes: 61 additions & 0 deletions xmtp_mls/src/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,44 @@ impl<C> LocalEvents<C> {
_ => None,
}
}

fn preference_filter(self) -> Option<Vec<UserPreferenceUpdate>> {
use LocalEvents::*;

match self {
OutgoingPreferenceUpdates(updates) => {
let updates = updates
.into_iter()
.filter_map(|pu| match pu {
UserPreferenceUpdate::ConsentUpdate(_) => None,
_ => Some(pu),
})
.collect();
Some(updates)
}
IncomingPreferenceUpdate(updates) => {
let updates = updates
.into_iter()
.filter_map(|pu| match pu {
UserPreferenceUpdate::ConsentUpdate(_) => None,
_ => Some(pu),
})
.collect();
Some(updates)
}
_ => None,
}
}
}

pub(crate) trait StreamMessages<C> {
fn stream_sync_messages(self) -> impl Stream<Item = Result<LocalEvents<C>, SubscribeError>>;
fn stream_consent_updates(
self,
) -> impl Stream<Item = Result<Vec<StoredConsentRecord>, SubscribeError>>;
fn stream_preference_updates(
self,
) -> impl Stream<Item = Result<Vec<UserPreferenceUpdate>, SubscribeError>>;
}

impl<C> StreamMessages<C> for broadcast::Receiver<LocalEvents<C>>
Expand All @@ -144,6 +175,16 @@ where
.map(Result::Ok)
})
}

fn stream_preference_updates(
self,
) -> impl Stream<Item = Result<Vec<UserPreferenceUpdate>, SubscribeError>> {
BroadcastStream::new(self).filter_map(|event| async {
xmtp_common::optify!(event, "Missed message due to event queue lag")
.and_then(LocalEvents::preference_filter)
.map(Result::Ok)
})
}
}

impl<T> StreamHandle<T> {
Expand Down Expand Up @@ -521,6 +562,26 @@ where
Ok::<_, ClientError>(())
})
}

pub fn stream_preferences_with_callback(
client: Arc<Client<ApiClient, V>>,
mut callback: impl FnMut(Result<Vec<UserPreferenceUpdate>, SubscribeError>) + Send + 'static,
) -> impl crate::StreamHandle<StreamOutput = Result<(), ClientError>> {
let (tx, rx) = oneshot::channel();

crate::spawn(Some(rx), async move {
let receiver = client.local_events.subscribe();
let stream = receiver.stream_preference_updates();

futures::pin_mut!(stream);
let _ = tx.send(());
while let Some(message) = stream.next().await {
callback(message)
}
tracing::debug!("`stream_consent` stream ended, dropping stream");
Ok::<_, ClientError>(())
})
}
}

#[cfg(test)]
Expand Down

0 comments on commit 7ad6163

Please sign in to comment.