From 1f87b0f8ddb203f73bb4b11529dba42145a061e1 Mon Sep 17 00:00:00 2001 From: boxdot Date: Tue, 7 Nov 2023 17:24:11 +0100 Subject: [PATCH] feat: Add receiving mode to the message stream (#202) Add a new method `Manager::receive_messages_with_mode` allowing to configure how to handle specific sentinel messages. It is supported to stop the stream after the initial sync (empty Signal message queue) or after contact sync. --- presage/Cargo.toml | 4 +- presage/src/lib.rs | 4 +- presage/src/manager.rs | 84 ++++++++++++++++++++++++++++++++---------- 3 files changed, 69 insertions(+), 23 deletions(-) diff --git a/presage/Cargo.toml b/presage/Cargo.toml index 24e070d2b..09d707416 100644 --- a/presage/Cargo.toml +++ b/presage/Cargo.toml @@ -6,8 +6,8 @@ authors = ["Gabriel FĂ©ron "] edition = "2021" [dependencies] -libsignal-service = { git = "https://github.com/whisperfish/libsignal-service-rs", rev = "454d234" } -libsignal-service-hyper = { git = "https://github.com/whisperfish/libsignal-service-rs", rev = "454d234" } +libsignal-service = { git = "https://github.com/whisperfish/libsignal-service-rs", rev = "a36d43d62" } +libsignal-service-hyper = { git = "https://github.com/whisperfish/libsignal-service-rs", rev = "a36d43d62" } base64 = "0.21" futures = "0.3" diff --git a/presage/src/lib.rs b/presage/src/lib.rs index 60a9b29c4..753aa5516 100644 --- a/presage/src/lib.rs +++ b/presage/src/lib.rs @@ -5,7 +5,9 @@ mod serde; mod store; pub use errors::Error; -pub use manager::{Confirmation, Linking, Manager, Registered, Registration, RegistrationOptions}; +pub use manager::{ + Confirmation, Linking, Manager, ReceivingMode, Registered, Registration, RegistrationOptions, +}; pub use store::{ContentTimestamp, Store, StoreError, Thread}; #[deprecated(note = "Please help use improve the prelude module instead")] diff --git a/presage/src/manager.rs b/presage/src/manager.rs index 824d60508..b03452cc4 100644 --- a/presage/src/manager.rs +++ b/presage/src/manager.rs @@ -16,7 +16,6 @@ use rand::{ use serde::{Deserialize, Serialize}; use url::Url; -use libsignal_service::proto::EditMessage; use libsignal_service::push_service::{RegistrationMethod, VerificationTransport}; use libsignal_service::{ attachment_cipher::decrypt_in_place, @@ -27,10 +26,7 @@ use libsignal_service::{ messagepipe::ServiceCredentials, models::Contact, prelude::{phonenumber::PhoneNumber, Content, ProfileKey, PushService, Uuid}, - proto::{ - data_message::Delete, sync_message, AttachmentPointer, Envelope, GroupContextV2, - NullMessage, - }, + proto::{data_message::Delete, sync_message, AttachmentPointer, GroupContextV2, NullMessage}, protocol::{KeyPair, PrivateKey, PublicKey, SenderCertificate}, provisioning::{generate_registration_id, LinkingManager, SecondaryDeviceProvisioning}, push_service::{ @@ -47,6 +43,7 @@ use libsignal_service::{ websocket::SignalWebSocket, AccountManager, Profile, ServiceAddress, }; +use libsignal_service::{messagepipe::Incoming, proto::EditMessage}; use libsignal_service_hyper::push_service::HyperPushService; use crate::cache::CacheCell; @@ -656,13 +653,14 @@ impl Manager { } async fn sync_contacts(&mut self) -> Result<(), Error> { - let messages = self.receive_messages_stream(true).await?; - pin_mut!(messages); - + let messages = self + .receive_messages_stream(ReceivingMode::WaitForContacts) + .await?; self.request_contacts_sync().await?; info!("waiting for contacts sync for up to 60 seconds"); + pin_mut!(messages); tokio::time::timeout( Duration::from_secs(60), self.wait_for_contacts_sync(messages), @@ -692,7 +690,6 @@ impl Manager { .expect("Time went backwards") .as_millis() as u64; - // first request the sync self.send_message(self.state.service_ids.aci, sync_message, timestamp) .await?; @@ -829,7 +826,7 @@ impl Manager { async fn receive_messages_encrypted( &mut self, - ) -> Result>, Error> { + ) -> Result>, Error> { let credentials = self.credentials()?.ok_or(Error::NotYetRegisteredError)?; let allow_stories = false; let pipe = MessageReceiver::new(self.push_service()?) @@ -857,7 +854,14 @@ impl Manager { pub async fn receive_messages( &mut self, ) -> Result, Error> { - self.receive_messages_stream(false).await + self.receive_messages_stream(ReceivingMode::Forever).await + } + + pub async fn receive_messages_with_mode( + &mut self, + mode: ReceivingMode, + ) -> Result, Error> { + self.receive_messages_stream(mode).await } fn groups_manager( @@ -879,40 +883,60 @@ impl Manager { async fn receive_messages_stream( &mut self, - include_internal_events: bool, + mode: ReceivingMode, ) -> Result, Error> { struct StreamState { encrypted_messages: S, + message_receiver: MessageReceiver, service_cipher: ServiceCipher, config_store: C, groups_manager: GroupsManager, - include_internal_events: bool, + mode: ReceivingMode, } let init = StreamState { encrypted_messages: Box::pin(self.receive_messages_encrypted().await?), + message_receiver: MessageReceiver::new(self.push_service()?), service_cipher: self.new_service_cipher()?, config_store: self.config_store.clone(), groups_manager: self.groups_manager()?, - include_internal_events, + mode, }; Ok(futures::stream::unfold(init, |mut state| async move { loop { match state.encrypted_messages.next().await { - Some(Ok(envelope)) => { + Some(Ok(Incoming::Envelope(envelope))) => { match state.service_cipher.open_envelope(envelope).await { Ok(Some(content)) => { // contacts synchronization sent from the primary device (happens after linking, or on demand) if let ContentBody::SynchronizeMessage(SyncMessage { - contacts: Some(_), + contacts: Some(contacts), .. }) = &content.body { - if state.include_internal_events { - return Some((content, state)); - } else { - continue; + match state.message_receiver.retrieve_contacts(contacts).await { + Ok(contacts) => { + let _ = state.config_store.clear_contacts(); + match state + .config_store + .save_contacts(contacts.filter_map(Result::ok)) + { + Ok(()) => { + info!("saved contacts"); + } + Err(e) => { + warn!("failed to save contacts: {e}"); + } + } + } + Err(e) => { + warn!("failed to retrieve contacts: {e}"); + } + } + + if let ReceivingMode::WaitForContacts = state.mode { + return None; } } @@ -974,6 +998,12 @@ impl Manager { } } } + Some(Ok(Incoming::QueueEmpty)) => { + debug!("empty queue"); + if let ReceivingMode::InitialSync = state.mode { + return None; + } + } Some(Err(e)) => error!("Error: {}", e), None => return None, } @@ -1313,6 +1343,20 @@ impl Manager { } } +/// The mode receiving messages stream +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] +pub enum ReceivingMode { + /// Don't stop the stream + #[default] + Forever, + /// Stop the stream after the initial sync + /// + /// That is, when the Signal's message queue becomes empty. + InitialSync, + /// Stop the stream after contacts are synced + WaitForContacts, +} + async fn upsert_group( config_store: &C, groups_manager: &mut GroupsManager,