From acf79a4d240ee123db26122ad16cd2221ef618cf Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Mon, 28 Oct 2024 15:09:50 +0100 Subject: [PATCH] refactor(base): rename all send-queue related "events" to "requests" Changelog: Renamed all the send-queue related "events" to "requests", so as to generalize usage of the send queue to not-events (e.g. medias, redactions, etc.). --- bindings/matrix-sdk-ffi/src/client.rs | 2 +- .../src/store/integration_tests.rs | 72 +-- .../matrix-sdk-base/src/store/memory_store.rs | 42 +- crates/matrix-sdk-base/src/store/mod.rs | 4 +- .../matrix-sdk-base/src/store/send_queue.rs | 50 +- crates/matrix-sdk-base/src/store/traits.rs | 120 ++--- .../src/state_store/mod.rs | 126 ++--- .../005_send_queue_dependent_events.sql | 2 +- crates/matrix-sdk-sqlite/src/state_store.rs | 46 +- crates/matrix-sdk/src/send_queue.rs | 430 ++++++++++-------- .../tests/integration/send_queue.rs | 2 +- 11 files changed, 466 insertions(+), 430 deletions(-) diff --git a/bindings/matrix-sdk-ffi/src/client.rs b/bindings/matrix-sdk-ffi/src/client.rs index 5e928e571f5..be6176a58df 100644 --- a/bindings/matrix-sdk-ffi/src/client.rs +++ b/bindings/matrix-sdk-ffi/src/client.rs @@ -503,7 +503,7 @@ impl Client { Arc::new(TaskHandle::new(RUNTIME.spawn(async move { // Respawn tasks for rooms that had unsent events. At this point we've just // created the subscriber, so it'll be notified about errors. - q.respawn_tasks_for_rooms_with_unsent_events().await; + q.respawn_tasks_for_rooms_with_unsent_requests().await; loop { match subscriber.recv().await { diff --git a/crates/matrix-sdk-base/src/store/integration_tests.rs b/crates/matrix-sdk-base/src/store/integration_tests.rs index 1d8933dba0c..d83eef9793e 100644 --- a/crates/matrix-sdk-base/src/store/integration_tests.rs +++ b/crates/matrix-sdk-base/src/store/integration_tests.rs @@ -33,7 +33,7 @@ use ruma::{ }; use serde_json::{json, value::Value as JsonValue}; -use super::{DependentQueuedEventKind, DynStateStore, ServerCapabilities}; +use super::{DependentQueuedRequestKind, DynStateStore, ServerCapabilities}; use crate::{ deserialized_responses::MemberEvent, store::{ChildTransactionId, QueueWedgeError, Result, SerializableEventContent, StateStoreExt}, @@ -1202,7 +1202,7 @@ impl StateStoreIntegrationTests for DynStateStore { let room_id = room_id!("!test_send_queue:localhost"); // No queued event in store at first. - let events = self.load_send_queue_events(room_id).await.unwrap(); + let events = self.load_send_queue_requests(room_id).await.unwrap(); assert!(events.is_empty()); // Saving one thing should work. @@ -1210,10 +1210,10 @@ impl StateStoreIntegrationTests for DynStateStore { let event0 = SerializableEventContent::new(&RoomMessageEventContent::text_plain("msg0").into()) .unwrap(); - self.save_send_queue_event(room_id, txn0.clone(), event0).await.unwrap(); + self.save_send_queue_request(room_id, txn0.clone(), event0).await.unwrap(); // Reading it will work. - let pending = self.load_send_queue_events(room_id).await.unwrap(); + let pending = self.load_send_queue_requests(room_id).await.unwrap(); assert_eq!(pending.len(), 1); { @@ -1234,11 +1234,11 @@ impl StateStoreIntegrationTests for DynStateStore { ) .unwrap(); - self.save_send_queue_event(room_id, txn, event).await.unwrap(); + self.save_send_queue_request(room_id, txn, event).await.unwrap(); } // Reading all the events should work. - let pending = self.load_send_queue_events(room_id).await.unwrap(); + let pending = self.load_send_queue_requests(room_id).await.unwrap(); // All the events should be retrieved, in the same order. assert_eq!(pending.len(), 4); @@ -1254,7 +1254,7 @@ impl StateStoreIntegrationTests for DynStateStore { // Marking an event as wedged works. let txn2 = &pending[2].transaction_id; - self.update_send_queue_event_status( + self.update_send_queue_request_status( room_id, txn2, Some(QueueWedgeError::GenericApiError { msg: "Oops".to_owned() }), @@ -1263,7 +1263,7 @@ impl StateStoreIntegrationTests for DynStateStore { .unwrap(); // And it is reflected. - let pending = self.load_send_queue_events(room_id).await.unwrap(); + let pending = self.load_send_queue_requests(room_id).await.unwrap(); // All the events should be retrieved, in the same order. assert_eq!(pending.len(), 4); @@ -1284,10 +1284,10 @@ impl StateStoreIntegrationTests for DynStateStore { &RoomMessageEventContent::text_plain("wow that's a cool test").into(), ) .unwrap(); - self.update_send_queue_event(room_id, txn2, event0).await.unwrap(); + self.update_send_queue_request(room_id, txn2, event0).await.unwrap(); // And it is reflected. - let pending = self.load_send_queue_events(room_id).await.unwrap(); + let pending = self.load_send_queue_requests(room_id).await.unwrap(); assert_eq!(pending.len(), 4); { @@ -1311,10 +1311,10 @@ impl StateStoreIntegrationTests for DynStateStore { } // Removing an event works. - self.remove_send_queue_event(room_id, &txn0).await.unwrap(); + self.remove_send_queue_request(room_id, &txn0).await.unwrap(); // And it is reflected. - let pending = self.load_send_queue_events(room_id).await.unwrap(); + let pending = self.load_send_queue_requests(room_id).await.unwrap(); assert_eq!(pending.len(), 3); assert_eq!(pending[1].transaction_id, *txn2); @@ -1332,7 +1332,7 @@ impl StateStoreIntegrationTests for DynStateStore { let event = SerializableEventContent::new(&RoomMessageEventContent::text_plain("room2").into()) .unwrap(); - self.save_send_queue_event(room_id2, txn.clone(), event).await.unwrap(); + self.save_send_queue_request(room_id2, txn.clone(), event).await.unwrap(); } // Add and remove one event for room3. @@ -1342,14 +1342,14 @@ impl StateStoreIntegrationTests for DynStateStore { let event = SerializableEventContent::new(&RoomMessageEventContent::text_plain("room3").into()) .unwrap(); - self.save_send_queue_event(room_id3, txn.clone(), event).await.unwrap(); + self.save_send_queue_request(room_id3, txn.clone(), event).await.unwrap(); - self.remove_send_queue_event(room_id3, &txn).await.unwrap(); + self.remove_send_queue_request(room_id3, &txn).await.unwrap(); } // Query all the rooms which have unsent events. Per the previous steps, // it should be room1 and room2, not room3. - let outstanding_rooms = self.load_rooms_with_unsent_events().await.unwrap(); + let outstanding_rooms = self.load_rooms_with_unsent_requests().await.unwrap(); assert_eq!(outstanding_rooms.len(), 2); assert!(outstanding_rooms.iter().any(|room| room == room_id)); assert!(outstanding_rooms.iter().any(|room| room == room_id2)); @@ -1363,53 +1363,53 @@ impl StateStoreIntegrationTests for DynStateStore { let event0 = SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey").into()) .unwrap(); - self.save_send_queue_event(room_id, txn0.clone(), event0).await.unwrap(); + self.save_send_queue_request(room_id, txn0.clone(), event0).await.unwrap(); // No dependents, to start with. - assert!(self.load_dependent_send_queue_events(room_id).await.unwrap().is_empty()); + assert!(self.load_dependent_queued_requests(room_id).await.unwrap().is_empty()); // Save a redaction for that event. let child_txn = ChildTransactionId::new(); - self.save_dependent_send_queue_event( + self.save_dependent_queued_request( room_id, &txn0, child_txn.clone(), - DependentQueuedEventKind::Redact, + DependentQueuedRequestKind::RedactEvent, ) .await .unwrap(); // It worked. - let dependents = self.load_dependent_send_queue_events(room_id).await.unwrap(); + let dependents = self.load_dependent_queued_requests(room_id).await.unwrap(); assert_eq!(dependents.len(), 1); assert_eq!(dependents[0].parent_transaction_id, txn0); assert_eq!(dependents[0].own_transaction_id, child_txn); assert!(dependents[0].event_id.is_none()); - assert_matches!(dependents[0].kind, DependentQueuedEventKind::Redact); + assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent); // Update the event id. let event_id = owned_event_id!("$1"); let num_updated = - self.update_dependent_send_queue_event(room_id, &txn0, event_id.clone()).await.unwrap(); + self.update_dependent_queued_request(room_id, &txn0, event_id.clone()).await.unwrap(); assert_eq!(num_updated, 1); // It worked. - let dependents = self.load_dependent_send_queue_events(room_id).await.unwrap(); + let dependents = self.load_dependent_queued_requests(room_id).await.unwrap(); assert_eq!(dependents.len(), 1); assert_eq!(dependents[0].parent_transaction_id, txn0); assert_eq!(dependents[0].own_transaction_id, child_txn); assert_eq!(dependents[0].event_id.as_ref(), Some(&event_id)); - assert_matches!(dependents[0].kind, DependentQueuedEventKind::Redact); + assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent); // Now remove it. let removed = self - .remove_dependent_send_queue_event(room_id, &dependents[0].own_transaction_id) + .remove_dependent_queued_request(room_id, &dependents[0].own_transaction_id) .await .unwrap(); assert!(removed); // It worked. - assert!(self.load_dependent_send_queue_events(room_id).await.unwrap().is_empty()); + assert!(self.load_dependent_queued_requests(room_id).await.unwrap().is_empty()); // Now, inserting a dependent event and removing the original send queue event // will NOT remove the dependent event. @@ -1417,23 +1417,23 @@ impl StateStoreIntegrationTests for DynStateStore { let event1 = SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey2").into()) .unwrap(); - self.save_send_queue_event(room_id, txn1.clone(), event1).await.unwrap(); + self.save_send_queue_request(room_id, txn1.clone(), event1).await.unwrap(); - self.save_dependent_send_queue_event( + self.save_dependent_queued_request( room_id, &txn0, ChildTransactionId::new(), - DependentQueuedEventKind::Redact, + DependentQueuedRequestKind::RedactEvent, ) .await .unwrap(); - assert_eq!(self.load_dependent_send_queue_events(room_id).await.unwrap().len(), 1); + assert_eq!(self.load_dependent_queued_requests(room_id).await.unwrap().len(), 1); - self.save_dependent_send_queue_event( + self.save_dependent_queued_request( room_id, &txn1, ChildTransactionId::new(), - DependentQueuedEventKind::Edit { + DependentQueuedRequestKind::EditEvent { new_content: SerializableEventContent::new( &RoomMessageEventContent::text_plain("edit").into(), ) @@ -1442,14 +1442,14 @@ impl StateStoreIntegrationTests for DynStateStore { ) .await .unwrap(); - assert_eq!(self.load_dependent_send_queue_events(room_id).await.unwrap().len(), 2); + assert_eq!(self.load_dependent_queued_requests(room_id).await.unwrap().len(), 2); // Remove event0 / txn0. - let removed = self.remove_send_queue_event(room_id, &txn0).await.unwrap(); + let removed = self.remove_send_queue_request(room_id, &txn0).await.unwrap(); assert!(removed); // This has removed none of the dependent events. - let dependents = self.load_dependent_send_queue_events(room_id).await.unwrap(); + let dependents = self.load_dependent_queued_requests(room_id).await.unwrap(); assert_eq!(dependents.len(), 2); } } diff --git a/crates/matrix-sdk-base/src/store/memory_store.rs b/crates/matrix-sdk-base/src/store/memory_store.rs index 47fa167b193..e55ec14caf3 100644 --- a/crates/matrix-sdk-base/src/store/memory_store.rs +++ b/crates/matrix-sdk-base/src/store/memory_store.rs @@ -36,9 +36,9 @@ use ruma::{ use tracing::{debug, instrument, trace, warn}; use super::{ - send_queue::{ChildTransactionId, QueuedEvent, SerializableEventContent}, + send_queue::{ChildTransactionId, QueuedRequest, SerializableEventContent}, traits::{ComposerDraft, ServerCapabilities}, - DependentQueuedEvent, DependentQueuedEventKind, QueuedRequestKind, Result, RoomInfo, + DependentQueuedRequest, DependentQueuedRequestKind, QueuedRequestKind, Result, RoomInfo, StateChanges, StateStore, StoreError, }; use crate::{ @@ -88,8 +88,8 @@ pub struct MemoryStore { >, >, custom: StdRwLock, Vec>>, - send_queue_events: StdRwLock>>, - dependent_send_queue_events: StdRwLock>>, + send_queue_events: StdRwLock>>, + dependent_send_queue_events: StdRwLock>>, } impl MemoryStore { @@ -802,19 +802,23 @@ impl StateStore for MemoryStore { Ok(()) } - async fn save_send_queue_event( + async fn save_send_queue_request( &self, room_id: &RoomId, transaction_id: OwnedTransactionId, content: SerializableEventContent, ) -> Result<(), Self::Error> { self.send_queue_events.write().unwrap().entry(room_id.to_owned()).or_default().push( - QueuedEvent { kind: QueuedRequestKind::Event { content }, transaction_id, error: None }, + QueuedRequest { + kind: QueuedRequestKind::Event { content }, + transaction_id, + error: None, + }, ); Ok(()) } - async fn update_send_queue_event( + async fn update_send_queue_request( &self, room_id: &RoomId, transaction_id: &TransactionId, @@ -837,7 +841,7 @@ impl StateStore for MemoryStore { } } - async fn remove_send_queue_event( + async fn remove_send_queue_request( &self, room_id: &RoomId, transaction_id: &TransactionId, @@ -860,14 +864,14 @@ impl StateStore for MemoryStore { Ok(false) } - async fn load_send_queue_events( + async fn load_send_queue_requests( &self, room_id: &RoomId, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { Ok(self.send_queue_events.write().unwrap().entry(room_id.to_owned()).or_default().clone()) } - async fn update_send_queue_event_status( + async fn update_send_queue_request_status( &self, room_id: &RoomId, transaction_id: &TransactionId, @@ -887,19 +891,19 @@ impl StateStore for MemoryStore { Ok(()) } - async fn load_rooms_with_unsent_events(&self) -> Result, Self::Error> { + async fn load_rooms_with_unsent_requests(&self) -> Result, Self::Error> { Ok(self.send_queue_events.read().unwrap().keys().cloned().collect()) } - async fn save_dependent_send_queue_event( + async fn save_dependent_queued_request( &self, room: &RoomId, parent_transaction_id: &TransactionId, own_transaction_id: ChildTransactionId, - content: DependentQueuedEventKind, + content: DependentQueuedRequestKind, ) -> Result<(), Self::Error> { self.dependent_send_queue_events.write().unwrap().entry(room.to_owned()).or_default().push( - DependentQueuedEvent { + DependentQueuedRequest { kind: content, parent_transaction_id: parent_transaction_id.to_owned(), own_transaction_id, @@ -909,7 +913,7 @@ impl StateStore for MemoryStore { Ok(()) } - async fn update_dependent_send_queue_event( + async fn update_dependent_queued_request( &self, room: &RoomId, parent_txn_id: &TransactionId, @@ -925,7 +929,7 @@ impl StateStore for MemoryStore { Ok(num_updated) } - async fn remove_dependent_send_queue_event( + async fn remove_dependent_queued_request( &self, room: &RoomId, txn_id: &ChildTransactionId, @@ -944,10 +948,10 @@ impl StateStore for MemoryStore { /// /// This returns absolutely all the dependent send queue events, whether /// they have an event id or not. - async fn load_dependent_send_queue_events( + async fn load_dependent_queued_requests( &self, room: &RoomId, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { Ok(self.dependent_send_queue_events.read().unwrap().get(room).cloned().unwrap_or_default()) } } diff --git a/crates/matrix-sdk-base/src/store/mod.rs b/crates/matrix-sdk-base/src/store/mod.rs index 8cc5de5dc4c..392dd6e5bb8 100644 --- a/crates/matrix-sdk-base/src/store/mod.rs +++ b/crates/matrix-sdk-base/src/store/mod.rs @@ -75,8 +75,8 @@ pub use self::integration_tests::StateStoreIntegrationTests; pub use self::{ memory_store::MemoryStore, send_queue::{ - ChildTransactionId, DependentQueuedEvent, DependentQueuedEventKind, QueueWedgeError, - QueuedEvent, QueuedRequestKind, SerializableEventContent, + ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError, + QueuedRequest, QueuedRequestKind, SerializableEventContent, }, traits::{ ComposerDraft, ComposerDraftType, DynStateStore, IntoStateStore, ServerCapabilities, diff --git a/crates/matrix-sdk-base/src/store/send_queue.rs b/crates/matrix-sdk-base/src/store/send_queue.rs index f76f6d95e3d..9a70c579641 100644 --- a/crates/matrix-sdk-base/src/store/send_queue.rs +++ b/crates/matrix-sdk-base/src/store/send_queue.rs @@ -80,7 +80,7 @@ pub enum QueuedRequestKind { /// A request to be sent with a send queue. #[derive(Clone)] -pub struct QueuedEvent { +pub struct QueuedRequest { /// The kind of queued request we're going to send. pub kind: QueuedRequestKind, @@ -94,13 +94,13 @@ pub struct QueuedEvent { pub error: Option, } -impl QueuedEvent { +impl QueuedRequest { /// Returns `Some` if the queued request is about sending an event. pub fn as_event(&self) -> Option<&SerializableEventContent> { as_variant!(&self.kind, QueuedRequestKind::Event { content } => content) } - /// True if the event couldn't be sent because of an unrecoverable API + /// True if the request couldn't be sent because of an unrecoverable API /// error. See [`Self::error`] for more details on the reason. pub fn is_wedged(&self) -> bool { self.error.is_some() @@ -145,27 +145,31 @@ pub enum QueueWedgeError { }, } -/// The specific user intent that characterizes a [`DependentQueuedEvent`]. +/// The specific user intent that characterizes a +/// [`DependentQueuedRequestKind`]. #[derive(Clone, Debug, Serialize, Deserialize)] -pub enum DependentQueuedEventKind { +pub enum DependentQueuedRequestKind { /// The event should be edited. - Edit { + #[serde(rename = "Edit")] + EditEvent { /// The new event for the content. new_content: SerializableEventContent, }, /// The event should be redacted/aborted/removed. - Redact, + #[serde(rename = "Redact")] + RedactEvent, /// The event should be reacted to, with the given key. - React { + #[serde(rename = "React")] + ReactEvent { /// Key used for the reaction. key: String, }, } -/// A transaction id identifying a [`DependentQueuedEvent`] rather than its -/// parent [`QueuedEvent`]. +/// A transaction id identifying a [`DependentQueuedRequest`] rather than its +/// parent [`QueuedRequest`]. /// /// This thin wrapper adds some safety to some APIs, making it possible to /// distinguish between the parent's `TransactionId` and the dependent event's @@ -203,42 +207,42 @@ impl From for OwnedTransactionId { } } -/// An event to be sent, depending on a [`QueuedEvent`] to be sent first. +/// A request to be sent, depending on a [`QueuedRequest`] to be sent first. /// -/// Depending on whether the event has been sent or not, this will either update -/// the local echo in the storage, or send an event equivalent to the user -/// intent to the homeserver. +/// Depending on whether the parent request has been sent or not, this will +/// either update the local echo in the storage, or materialize an equivalent +/// request implementing the user intent to the homeserver. #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct DependentQueuedEvent { - /// Unique identifier for this dependent queued event. +pub struct DependentQueuedRequest { + /// Unique identifier for this dependent queued request. /// /// Useful for deletion. pub own_transaction_id: ChildTransactionId, /// The kind of user intent. - pub kind: DependentQueuedEventKind, + pub kind: DependentQueuedRequestKind, /// Transaction id for the parent's local echo / used in the server request. /// - /// Note: this is the transaction id used for the depended-on event, i.e. + /// Note: this is the transaction id used for the depended-on request, i.e. /// the one that was originally sent and that's being modified with this - /// dependent event. + /// dependent request. pub parent_transaction_id: OwnedTransactionId, - /// If the parent event has been sent, the parent's event identifier + /// If the parent request has been sent, the parent's request identifier /// returned by the server once the local echo has been sent out. /// /// Note: this is the event id used for the depended-on event after it's /// been sent, not for a possible event that could have been sent - /// because of this [`DependentQueuedEvent`]. + /// because of this [`DependentQueuedRequest`]. pub event_id: Option, } #[cfg(not(tarpaulin_include))] -impl fmt::Debug for QueuedEvent { +impl fmt::Debug for QueuedRequest { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // Hide the content from the debug log. - f.debug_struct("QueuedEvent") + f.debug_struct("QueuedRequest") .field("transaction_id", &self.transaction_id) .field("is_wedged", &self.is_wedged()) .finish_non_exhaustive() diff --git a/crates/matrix-sdk-base/src/store/traits.rs b/crates/matrix-sdk-base/src/store/traits.rs index 50f2a52fae4..e3ef3c9de59 100644 --- a/crates/matrix-sdk-base/src/store/traits.rs +++ b/crates/matrix-sdk-base/src/store/traits.rs @@ -41,8 +41,8 @@ use ruma::{ use serde::{Deserialize, Serialize}; use super::{ - ChildTransactionId, DependentQueuedEvent, DependentQueuedEventKind, QueueWedgeError, - QueuedEvent, SerializableEventContent, StateChanges, StoreError, + ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError, + QueuedRequest, SerializableEventContent, StateChanges, StoreError, }; use crate::{ deserialized_responses::{RawAnySyncOrStrippedState, RawMemberEvent, RawSyncOrStrippedState}, @@ -343,7 +343,7 @@ pub trait StateStore: AsyncTraitDeps { /// * `room_id` - The `RoomId` of the room to delete. async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error>; - /// Save an event to be sent by a send queue later. + /// Save a request to be sent by a send queue later (e.g. sending an event). /// /// # Arguments /// @@ -352,98 +352,100 @@ pub trait StateStore: AsyncTraitDeps { /// (and its transaction). Note: this is expected to be randomly generated /// and thus unique. /// * `content` - Serializable event content to be sent. - async fn save_send_queue_event( + async fn save_send_queue_request( &self, room_id: &RoomId, transaction_id: OwnedTransactionId, content: SerializableEventContent, ) -> Result<(), Self::Error>; - /// Updates a send queue event with the given content, and resets its wedged - /// status to false. + /// Updates a send queue request with the given content, and resets its + /// error status. /// /// # Arguments /// /// * `room_id` - The `RoomId` of the send queue's room. - /// * `transaction_id` - The unique key identifying the event to be sent + /// * `transaction_id` - The unique key identifying the request to be sent /// (and its transaction). /// * `content` - Serializable event content to replace the original one. /// - /// Returns true if an event has been updated, or false otherwise. - async fn update_send_queue_event( + /// Returns true if a request has been updated, or false otherwise. + async fn update_send_queue_request( &self, room_id: &RoomId, transaction_id: &TransactionId, content: SerializableEventContent, ) -> Result; - /// Remove an event previously inserted with [`Self::save_send_queue_event`] - /// from the database, based on its transaction id. + /// Remove a request previously inserted with + /// [`Self::save_send_queue_request`] from the database, based on its + /// transaction id. /// - /// Returns true if an event has been removed, or false otherwise. - async fn remove_send_queue_event( + /// Returns true if something has been removed, or false otherwise. + async fn remove_send_queue_request( &self, room_id: &RoomId, transaction_id: &TransactionId, ) -> Result; - /// Loads all the send queue events for the given room. - async fn load_send_queue_events( + /// Loads all the send queue requests for the given room. + async fn load_send_queue_requests( &self, room_id: &RoomId, - ) -> Result, Self::Error>; + ) -> Result, Self::Error>; /// Updates the send queue error status (wedge) for a given send queue - /// event. - /// Set `error` to None if the problem has been resolved and the event was - /// finally sent. - async fn update_send_queue_event_status( + /// request. + async fn update_send_queue_request_status( &self, room_id: &RoomId, transaction_id: &TransactionId, error: Option, ) -> Result<(), Self::Error>; - /// Loads all the rooms which have any pending events in their send queue. - async fn load_rooms_with_unsent_events(&self) -> Result, Self::Error>; + /// Loads all the rooms which have any pending requests in their send queue. + async fn load_rooms_with_unsent_requests(&self) -> Result, Self::Error>; - /// Add a new entry to the list of dependent send queue event for an event. - async fn save_dependent_send_queue_event( + /// Add a new entry to the list of dependent send queue requests for a + /// parent request. + async fn save_dependent_queued_request( &self, room_id: &RoomId, parent_txn_id: &TransactionId, own_txn_id: ChildTransactionId, - content: DependentQueuedEventKind, + content: DependentQueuedRequestKind, ) -> Result<(), Self::Error>; - /// Update a set of dependent send queue events with an event id, + /// Update a set of dependent send queue requests with an event id, /// effectively marking them as ready. /// - /// Returns the number of updated events. - async fn update_dependent_send_queue_event( + /// Returns the number of updated requests. + async fn update_dependent_queued_request( &self, room_id: &RoomId, parent_txn_id: &TransactionId, event_id: OwnedEventId, ) -> Result; - /// Remove a specific dependent send queue event by id. + /// Remove a specific dependent send queue request by id. /// - /// Returns true if the dependent send queue event has been indeed removed. - async fn remove_dependent_send_queue_event( + /// Returns true if the dependent send queue request has been indeed + /// removed. + async fn remove_dependent_queued_request( &self, room: &RoomId, own_txn_id: &ChildTransactionId, ) -> Result; - /// List all the dependent send queue events. + /// List all the dependent send queue requests. /// - /// This returns absolutely all the dependent send queue events, whether - /// they have an event id or not. They must be returned in insertion order. - async fn load_dependent_send_queue_events( + /// This returns absolutely all the dependent send queue requests, whether + /// they have a parent event id or not. As a contract for implementors, they + /// must be returned in insertion order. + async fn load_dependent_queued_requests( &self, room: &RoomId, - ) -> Result, Self::Error>; + ) -> Result, Self::Error>; } #[repr(transparent)] @@ -629,93 +631,93 @@ impl StateStore for EraseStateStoreError { self.0.remove_room(room_id).await.map_err(Into::into) } - async fn save_send_queue_event( + async fn save_send_queue_request( &self, room_id: &RoomId, transaction_id: OwnedTransactionId, content: SerializableEventContent, ) -> Result<(), Self::Error> { - self.0.save_send_queue_event(room_id, transaction_id, content).await.map_err(Into::into) + self.0.save_send_queue_request(room_id, transaction_id, content).await.map_err(Into::into) } - async fn update_send_queue_event( + async fn update_send_queue_request( &self, room_id: &RoomId, transaction_id: &TransactionId, content: SerializableEventContent, ) -> Result { - self.0.update_send_queue_event(room_id, transaction_id, content).await.map_err(Into::into) + self.0.update_send_queue_request(room_id, transaction_id, content).await.map_err(Into::into) } - async fn remove_send_queue_event( + async fn remove_send_queue_request( &self, room_id: &RoomId, transaction_id: &TransactionId, ) -> Result { - self.0.remove_send_queue_event(room_id, transaction_id).await.map_err(Into::into) + self.0.remove_send_queue_request(room_id, transaction_id).await.map_err(Into::into) } - async fn load_send_queue_events( + async fn load_send_queue_requests( &self, room_id: &RoomId, - ) -> Result, Self::Error> { - self.0.load_send_queue_events(room_id).await.map_err(Into::into) + ) -> Result, Self::Error> { + self.0.load_send_queue_requests(room_id).await.map_err(Into::into) } - async fn update_send_queue_event_status( + async fn update_send_queue_request_status( &self, room_id: &RoomId, transaction_id: &TransactionId, error: Option, ) -> Result<(), Self::Error> { self.0 - .update_send_queue_event_status(room_id, transaction_id, error) + .update_send_queue_request_status(room_id, transaction_id, error) .await .map_err(Into::into) } - async fn load_rooms_with_unsent_events(&self) -> Result, Self::Error> { - self.0.load_rooms_with_unsent_events().await.map_err(Into::into) + async fn load_rooms_with_unsent_requests(&self) -> Result, Self::Error> { + self.0.load_rooms_with_unsent_requests().await.map_err(Into::into) } - async fn save_dependent_send_queue_event( + async fn save_dependent_queued_request( &self, room_id: &RoomId, parent_txn_id: &TransactionId, own_txn_id: ChildTransactionId, - content: DependentQueuedEventKind, + content: DependentQueuedRequestKind, ) -> Result<(), Self::Error> { self.0 - .save_dependent_send_queue_event(room_id, parent_txn_id, own_txn_id, content) + .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, content) .await .map_err(Into::into) } - async fn update_dependent_send_queue_event( + async fn update_dependent_queued_request( &self, room_id: &RoomId, parent_txn_id: &TransactionId, event_id: OwnedEventId, ) -> Result { self.0 - .update_dependent_send_queue_event(room_id, parent_txn_id, event_id) + .update_dependent_queued_request(room_id, parent_txn_id, event_id) .await .map_err(Into::into) } - async fn remove_dependent_send_queue_event( + async fn remove_dependent_queued_request( &self, room_id: &RoomId, own_txn_id: &ChildTransactionId, ) -> Result { - self.0.remove_dependent_send_queue_event(room_id, own_txn_id).await.map_err(Into::into) + self.0.remove_dependent_queued_request(room_id, own_txn_id).await.map_err(Into::into) } - async fn load_dependent_send_queue_events( + async fn load_dependent_queued_requests( &self, room_id: &RoomId, - ) -> Result, Self::Error> { - self.0.load_dependent_send_queue_events(room_id).await.map_err(Into::into) + ) -> Result, Self::Error> { + self.0.load_dependent_queued_requests(room_id).await.map_err(Into::into) } } diff --git a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs index 0a90b1b8cc4..3dd7fab7a6b 100644 --- a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs +++ b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs @@ -25,9 +25,9 @@ use indexed_db_futures::prelude::*; use matrix_sdk_base::{ deserialized_responses::RawAnySyncOrStrippedState, store::{ - ChildTransactionId, ComposerDraft, DependentQueuedEvent, DependentQueuedEventKind, - QueuedEvent, QueuedRequestKind, SerializableEventContent, ServerCapabilities, StateChanges, - StateStore, StoreError, + ChildTransactionId, ComposerDraft, DependentQueuedRequest, DependentQueuedRequestKind, + QueuedRequest, QueuedRequestKind, SerializableEventContent, ServerCapabilities, + StateChanges, StateStore, StoreError, }, MinimalRoomMemberEvent, RoomInfo, RoomMemberships, StateStoreDataKey, StateStoreDataValue, }; @@ -423,14 +423,14 @@ impl IndexeddbStateStore { } } -/// A superset of [`QueuedEvent`] that also contains the room id, since we want -/// to return them. +/// A superset of [`QueuedRequest`] that also contains the room id, since we +/// want to return them. #[derive(Serialize, Deserialize)] -struct PersistedQueuedEvent { +struct PersistedQueuedRequest { /// In which room is this event going to be sent. pub room_id: OwnedRoomId, - // All these fields are the same as in [`QueuedEvent`]. + // All these fields are the same as in [`QueuedRequest`]. /// Kind. Optional because it might be missing from previous formats. kind: Option, transaction_id: OwnedTransactionId, @@ -444,8 +444,8 @@ struct PersistedQueuedEvent { event: Option, } -impl PersistedQueuedEvent { - fn into_queued_event(self) -> Option { +impl PersistedQueuedRequest { + fn into_queued_request(self) -> Option { let kind = self.kind.or_else(|| self.event.map(|content| QueuedRequestKind::Event { content }))?; @@ -459,7 +459,7 @@ impl PersistedQueuedEvent { _ => self.error, }; - Some(QueuedEvent { kind, transaction_id: self.transaction_id, error }) + Some(QueuedRequest { kind, transaction_id: self.transaction_id, error }) } } @@ -1324,7 +1324,7 @@ impl_state_store!({ self.get_user_ids_inner(room_id, memberships, false).await } - async fn save_send_queue_event( + async fn save_send_queue_request( &self, room_id: &RoomId, transaction_id: OwnedTransactionId, @@ -1338,18 +1338,19 @@ impl_state_store!({ let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?; - // We store an encoded vector of the queued events, with their transaction ids. + // We store an encoded vector of the queued requests, with their transaction + // ids. // Reload the previous vector for this room, or create an empty one. let prev = obj.get(&encoded_key)?.await?; let mut prev = prev.map_or_else( || Ok(Vec::new()), - |val| self.deserialize_value::>(&val), + |val| self.deserialize_value::>(&val), )?; - // Push the new event. - prev.push(PersistedQueuedEvent { + // Push the new request. + prev.push(PersistedQueuedRequest { room_id: room_id.to_owned(), kind: Some(QueuedRequestKind::Event { content }), transaction_id, @@ -1366,7 +1367,7 @@ impl_state_store!({ Ok(()) } - async fn update_send_queue_event( + async fn update_send_queue_request( &self, room_id: &RoomId, transaction_id: &TransactionId, @@ -1380,17 +1381,18 @@ impl_state_store!({ let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?; - // We store an encoded vector of the queued events, with their transaction ids. + // We store an encoded vector of the queued requests, with their transaction + // ids. // Reload the previous vector for this room, or create an empty one. let prev = obj.get(&encoded_key)?.await?; let mut prev = prev.map_or_else( || Ok(Vec::new()), - |val| self.deserialize_value::>(&val), + |val| self.deserialize_value::>(&val), )?; - // Modify the one event. + // Modify the one request. if let Some(entry) = prev.iter_mut().find(|entry| entry.transaction_id == transaction_id) { entry.kind = Some(QueuedRequestKind::Event { content }); // Reset the error state. @@ -1409,7 +1411,7 @@ impl_state_store!({ } } - async fn remove_send_queue_event( + async fn remove_send_queue_request( &self, room_id: &RoomId, transaction_id: &TransactionId, @@ -1423,11 +1425,12 @@ impl_state_store!({ let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?; - // We store an encoded vector of the queued events, with their transaction ids. + // We store an encoded vector of the queued requests, with their transaction + // ids. // Reload the previous vector for this room. if let Some(val) = obj.get(&encoded_key)?.await? { - let mut prev = self.deserialize_value::>(&val)?; + let mut prev = self.deserialize_value::>(&val)?; if let Some(pos) = prev.iter().position(|item| item.transaction_id == transaction_id) { prev.remove(pos); @@ -1445,10 +1448,11 @@ impl_state_store!({ Ok(false) } - async fn load_send_queue_events(&self, room_id: &RoomId) -> Result> { + async fn load_send_queue_requests(&self, room_id: &RoomId) -> Result> { let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id); - // We store an encoded vector of the queued events, with their transaction ids. + // We store an encoded vector of the queued requests, with their transaction + // ids. let prev = self .inner .transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)? @@ -1458,13 +1462,13 @@ impl_state_store!({ let prev = prev.map_or_else( || Ok(Vec::new()), - |val| self.deserialize_value::>(&val), + |val| self.deserialize_value::>(&val), )?; - Ok(prev.into_iter().filter_map(PersistedQueuedEvent::into_queued_event).collect()) + Ok(prev.into_iter().filter_map(PersistedQueuedRequest::into_queued_request).collect()) } - async fn update_send_queue_event_status( + async fn update_send_queue_request_status( &self, room_id: &RoomId, transaction_id: &TransactionId, @@ -1479,12 +1483,12 @@ impl_state_store!({ let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?; if let Some(val) = obj.get(&encoded_key)?.await? { - let mut prev = self.deserialize_value::>(&val)?; - if let Some(queued_event) = + let mut prev = self.deserialize_value::>(&val)?; + if let Some(request) = prev.iter_mut().find(|item| item.transaction_id == transaction_id) { - queued_event.is_wedged = None; - queued_event.error = error; + request.is_wedged = None; + request.error = error; obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?; } } @@ -1494,7 +1498,7 @@ impl_state_store!({ Ok(()) } - async fn load_rooms_with_unsent_events(&self) -> Result> { + async fn load_rooms_with_unsent_requests(&self) -> Result> { let tx = self .inner .transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?; @@ -1505,8 +1509,8 @@ impl_state_store!({ .get_all()? .await? .into_iter() - .map(|item| self.deserialize_value::>(&item)) - .collect::>, _>>()? + .map(|item| self.deserialize_value::>(&item)) + .collect::>, _>>()? .into_iter() .flat_map(|vec| vec.into_iter().map(|item| item.room_id)) .collect::>(); @@ -1514,12 +1518,12 @@ impl_state_store!({ Ok(all_entries.into_iter().collect()) } - async fn save_dependent_send_queue_event( + async fn save_dependent_queued_request( &self, room_id: &RoomId, parent_txn_id: &TransactionId, own_txn_id: ChildTransactionId, - content: DependentQueuedEventKind, + content: DependentQueuedRequestKind, ) -> Result<()> { let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id); @@ -1530,17 +1534,17 @@ impl_state_store!({ let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?; - // We store an encoded vector of the dependent events. + // We store an encoded vector of the dependent requests. // Reload the previous vector for this room, or create an empty one. let prev = obj.get(&encoded_key)?.await?; let mut prev = prev.map_or_else( || Ok(Vec::new()), - |val| self.deserialize_value::>(&val), + |val| self.deserialize_value::>(&val), )?; - // Push the new event. - prev.push(DependentQueuedEvent { + // Push the new request. + prev.push(DependentQueuedRequest { kind: content, parent_transaction_id: parent_txn_id.to_owned(), own_transaction_id: own_txn_id, @@ -1555,7 +1559,7 @@ impl_state_store!({ Ok(()) } - async fn update_dependent_send_queue_event( + async fn update_dependent_queued_request( &self, room_id: &RoomId, parent_txn_id: &TransactionId, @@ -1570,16 +1574,16 @@ impl_state_store!({ let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?; - // We store an encoded vector of the dependent events. + // We store an encoded vector of the dependent requests. // Reload the previous vector for this room, or create an empty one. let prev = obj.get(&encoded_key)?.await?; let mut prev = prev.map_or_else( || Ok(Vec::new()), - |val| self.deserialize_value::>(&val), + |val| self.deserialize_value::>(&val), )?; - // Modify all events that match. + // Modify all requests that match. let mut num_updated = 0; for entry in prev.iter_mut().filter(|entry| entry.parent_transaction_id == parent_txn_id) { entry.event_id = Some(event_id.clone()); @@ -1594,7 +1598,7 @@ impl_state_store!({ Ok(num_updated) } - async fn remove_dependent_send_queue_event( + async fn remove_dependent_queued_request( &self, room_id: &RoomId, txn_id: &ChildTransactionId, @@ -1608,10 +1612,10 @@ impl_state_store!({ let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?; - // We store an encoded vector of the dependent events. + // We store an encoded vector of the dependent requests. // Reload the previous vector for this room. if let Some(val) = obj.get(&encoded_key)?.await? { - let mut prev = self.deserialize_value::>(&val)?; + let mut prev = self.deserialize_value::>(&val)?; if let Some(pos) = prev.iter().position(|item| item.own_transaction_id == *txn_id) { prev.remove(pos); @@ -1629,13 +1633,13 @@ impl_state_store!({ Ok(false) } - async fn load_dependent_send_queue_events( + async fn load_dependent_queued_requests( &self, room_id: &RoomId, - ) -> Result> { + ) -> Result> { let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id); - // We store an encoded vector of the dependent events. + // We store an encoded vector of the dependent requests. let prev = self .inner .transaction_on_one_with_mode( @@ -1648,7 +1652,7 @@ impl_state_store!({ prev.map_or_else( || Ok(Vec::new()), - |val| self.deserialize_value::>(&val), + |val| self.deserialize_value::>(&val), ) } }); @@ -1682,17 +1686,13 @@ mod migration_tests { }; use serde::{Deserialize, Serialize}; - use crate::state_store::PersistedQueuedEvent; + use crate::state_store::PersistedQueuedRequest; #[derive(Serialize, Deserialize)] - struct OldPersistedQueuedEvent { - /// In which room is this event going to be sent. - pub room_id: OwnedRoomId, - - // All these fields are the same as in [`QueuedEvent`]. + struct OldPersistedQueuedRequest { + room_id: OwnedRoomId, event: SerializableEventContent, transaction_id: OwnedTransactionId, - is_wedged: bool, } @@ -1707,18 +1707,18 @@ mod migration_tests { SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into()) .unwrap(); - let old_persisted_queue_event = OldPersistedQueuedEvent { + let old_persisted_queue_event = OldPersistedQueuedRequest { room_id: room_a_id.to_owned(), event: content, transaction_id: transaction_id.clone(), is_wedged: true, }; - let serialized_persisted_event = serde_json::to_vec(&old_persisted_queue_event).unwrap(); + let serialized_persisted = serde_json::to_vec(&old_persisted_queue_event).unwrap(); // Load it with the new version. - let new_persisted: PersistedQueuedEvent = - serde_json::from_slice(&serialized_persisted_event).unwrap(); + let new_persisted: PersistedQueuedRequest = + serde_json::from_slice(&serialized_persisted).unwrap(); assert_eq!(new_persisted.is_wedged, Some(true)); assert!(new_persisted.error.is_none()); @@ -1726,7 +1726,7 @@ mod migration_tests { assert!(new_persisted.event.is_some()); assert!(new_persisted.kind.is_none()); - let queued = new_persisted.into_queued_event().unwrap(); + let queued = new_persisted.into_queued_request().unwrap(); assert_matches!(queued.kind, QueuedRequestKind::Event { .. }); assert_eq!(queued.transaction_id, transaction_id); assert!(queued.error.is_some()); diff --git a/crates/matrix-sdk-sqlite/migrations/state_store/005_send_queue_dependent_events.sql b/crates/matrix-sdk-sqlite/migrations/state_store/005_send_queue_dependent_events.sql index bb522d2d008..03e65c05748 100644 --- a/crates/matrix-sdk-sqlite/migrations/state_store/005_send_queue_dependent_events.sql +++ b/crates/matrix-sdk-sqlite/migrations/state_store/005_send_queue_dependent_events.sql @@ -14,6 +14,6 @@ CREATE TABLE "dependent_send_queue_events" ( -- Used as a value (thus encrypted/decrypted), can be null. "event_id" BLOB NULL, - -- Serialized `DependentQueuedEventKind`, used as a value (thus encrypted/decrypted). + -- Serialized `DependentQueuedRequestKind`, used as a value (thus encrypted/decrypted). "content" BLOB NOT NULL ); diff --git a/crates/matrix-sdk-sqlite/src/state_store.rs b/crates/matrix-sdk-sqlite/src/state_store.rs index bc8b9d00327..86f2b95d525 100644 --- a/crates/matrix-sdk-sqlite/src/state_store.rs +++ b/crates/matrix-sdk-sqlite/src/state_store.rs @@ -11,8 +11,8 @@ use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime}; use matrix_sdk_base::{ deserialized_responses::{RawAnySyncOrStrippedState, SyncOrStrippedState}, store::{ - migration_helpers::RoomInfoV1, ChildTransactionId, DependentQueuedEvent, - DependentQueuedEventKind, QueueWedgeError, QueuedEvent, QueuedRequestKind, + migration_helpers::RoomInfoV1, ChildTransactionId, DependentQueuedRequest, + DependentQueuedRequestKind, QueueWedgeError, QueuedRequest, QueuedRequestKind, SerializableEventContent, }, MinimalRoomMemberEvent, RoomInfo, RoomMemberships, RoomState, StateChanges, StateStore, @@ -1670,7 +1670,7 @@ impl StateStore for SqliteStateStore { .await } - async fn save_send_queue_event( + async fn save_send_queue_request( &self, room_id: &RoomId, transaction_id: OwnedTransactionId, @@ -1695,7 +1695,7 @@ impl StateStore for SqliteStateStore { .await } - async fn update_send_queue_event( + async fn update_send_queue_request( &self, room_id: &RoomId, transaction_id: &TransactionId, @@ -1718,7 +1718,7 @@ impl StateStore for SqliteStateStore { Ok(num_updated > 0) } - async fn remove_send_queue_event( + async fn remove_send_queue_request( &self, room_id: &RoomId, transaction_id: &TransactionId, @@ -1742,10 +1742,10 @@ impl StateStore for SqliteStateStore { Ok(num_deleted > 0) } - async fn load_send_queue_events( + async fn load_send_queue_requests( &self, room_id: &RoomId, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { let room_id = self.encode_key(keys::SEND_QUEUE, room_id); // Note: ROWID is always present and is an auto-incremented integer counter. We @@ -1764,19 +1764,19 @@ impl StateStore for SqliteStateStore { ) .await?; - let mut queued_events = Vec::with_capacity(res.len()); + let mut requests = Vec::with_capacity(res.len()); for entry in res { - queued_events.push(QueuedEvent { + requests.push(QueuedRequest { transaction_id: entry.0.into(), kind: QueuedRequestKind::Event { content: self.deserialize_json(&entry.1)? }, error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?, }); } - Ok(queued_events) + Ok(requests) } - async fn update_send_queue_event_status( + async fn update_send_queue_request_status( &self, room_id: &RoomId, transaction_id: &TransactionId, @@ -1799,7 +1799,7 @@ impl StateStore for SqliteStateStore { .await } - async fn load_rooms_with_unsent_events(&self) -> Result, Self::Error> { + async fn load_rooms_with_unsent_requests(&self) -> Result, Self::Error> { // If the values were not encrypted, we could use `SELECT DISTINCT` here, but we // have to manually do the deduplication: indeed, for all X, encrypt(X) // != encrypted(X), since we use a nonce in the encryption process. @@ -1822,12 +1822,12 @@ impl StateStore for SqliteStateStore { .collect()) } - async fn save_dependent_send_queue_event( + async fn save_dependent_queued_request( &self, room_id: &RoomId, parent_txn_id: &TransactionId, own_txn_id: ChildTransactionId, - content: DependentQueuedEventKind, + content: DependentQueuedRequestKind, ) -> Result<()> { let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id); let content = self.serialize_json(&content)?; @@ -1850,7 +1850,7 @@ impl StateStore for SqliteStateStore { .await } - async fn update_dependent_send_queue_event( + async fn update_dependent_queued_request( &self, room_id: &RoomId, parent_txn_id: &TransactionId, @@ -1873,7 +1873,7 @@ impl StateStore for SqliteStateStore { .await } - async fn remove_dependent_send_queue_event( + async fn remove_dependent_queued_request( &self, room_id: &RoomId, txn_id: &ChildTransactionId, @@ -1897,10 +1897,10 @@ impl StateStore for SqliteStateStore { Ok(num_deleted > 0) } - async fn load_dependent_send_queue_events( + async fn load_dependent_queued_requests( &self, room_id: &RoomId, - ) -> Result> { + ) -> Result> { let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id); // Note: transaction_id is not encoded, see why in `save_send_queue_event`. @@ -1919,7 +1919,7 @@ impl StateStore for SqliteStateStore { let mut dependent_events = Vec::with_capacity(res.len()); for entry in res { - dependent_events.push(DependentQueuedEvent { + dependent_events.push(DependentQueuedRequest { own_transaction_id: entry.0.into(), parent_transaction_id: entry.1.into(), event_id: entry.2.map(|bytes| self.deserialize_value(&bytes)).transpose()?, @@ -2290,12 +2290,12 @@ mod migration_tests { // This transparently migrates to the latest version. let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap(); - let queued_event = store.load_send_queue_events(room_a_id).await.unwrap(); + let requests = store.load_send_queue_requests(room_a_id).await.unwrap(); - assert_eq!(queued_event.len(), 2); + assert_eq!(requests.len(), 2); let migrated_wedged = - queued_event.iter().find(|e| e.transaction_id == wedged_event_transaction_id).unwrap(); + requests.iter().find(|e| e.transaction_id == wedged_event_transaction_id).unwrap(); assert!(migrated_wedged.is_wedged()); assert_matches!( @@ -2303,7 +2303,7 @@ mod migration_tests { Some(QueueWedgeError::GenericApiError { .. }) ); - let migrated_ok = queued_event + let migrated_ok = requests .iter() .find(|e| e.transaction_id == local_event_transaction_id.clone()) .unwrap(); diff --git a/crates/matrix-sdk/src/send_queue.rs b/crates/matrix-sdk/src/send_queue.rs index 5642b61d42e..fcee608eed2 100644 --- a/crates/matrix-sdk/src/send_queue.rs +++ b/crates/matrix-sdk/src/send_queue.rs @@ -38,7 +38,7 @@ //! - enable/disable them all at once with [`SendQueue::set_enabled()`]. //! - get notifications about send errors with [`SendQueue::subscribe_errors`]. //! - reload all unsent events that had been persisted in storage using -//! [`SendQueue::respawn_tasks_for_rooms_with_unsent_events()`]. It is +//! [`SendQueue::respawn_tasks_for_rooms_with_unsent_requests()`]. It is //! recommended to call this method during initialization of a client, //! otherwise persisted unsent events will only be re-sent after the send //! queue for the given room has been reopened for the first time. @@ -53,8 +53,8 @@ use std::{ use matrix_sdk_base::{ store::{ - ChildTransactionId, DependentQueuedEvent, DependentQueuedEventKind, QueueWedgeError, - QueuedEvent, QueuedRequestKind, SerializableEventContent, + ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError, + QueuedRequest, QueuedRequestKind, SerializableEventContent, }, RoomState, StoreError, }; @@ -97,16 +97,16 @@ impl SendQueue { Self { client } } - /// Reload all the rooms which had unsent events, and respawn tasks for + /// Reload all the rooms which had unsent requests, and respawn tasks for /// those rooms. - pub async fn respawn_tasks_for_rooms_with_unsent_events(&self) { + pub async fn respawn_tasks_for_rooms_with_unsent_requests(&self) { if !self.is_enabled() { return; } let room_ids = - self.client.store().load_rooms_with_unsent_events().await.unwrap_or_else(|err| { - warn!("error when loading rooms with unsent events: {err}"); + self.client.store().load_rooms_with_unsent_requests().await.unwrap_or_else(|err| { + warn!("error when loading rooms with unsent requests: {err}"); Vec::new() }); @@ -118,11 +118,14 @@ impl SendQueue { } } + /// Tiny helper to get the send queue's global context from the [`Client`]. #[inline(always)] fn data(&self) -> &SendQueueData { &self.client.inner.send_queue_data } + /// Get or create a new send queue for a given room, and insert it into our + /// memoized rooms mapping. fn for_room(&self, room: Room) -> RoomSendQueue { let data = self.data(); @@ -141,7 +144,9 @@ impl SendQueue { &self.client, owned_room_id.clone(), ); + map.insert(owned_room_id, room_q.clone()); + room_q } @@ -150,9 +155,9 @@ impl SendQueue { /// If we're disabling the queue, and requests were being sent, they're not /// aborted, and will continue until a status resolves (error responses /// will keep the events in the buffer of events to send later). The - /// disablement will happen before the next event is sent. + /// disablement will happen before the next request is sent. /// - /// This may wake up background tasks and resume sending of events in the + /// This may wake up background tasks and resume sending of requests in the /// background. pub async fn set_enabled(&self, enabled: bool) { debug!(?enabled, "setting global send queue enablement"); @@ -165,8 +170,8 @@ impl SendQueue { } // Reload some extra rooms that might not have been awaken yet, but could have - // events from previous sessions. - self.respawn_tasks_for_rooms_with_unsent_events().await; + // requests from previous sessions. + self.respawn_tasks_for_rooms_with_unsent_requests().await; } /// Returns whether the send queue is enabled, at a client-wide @@ -176,32 +181,32 @@ impl SendQueue { } /// A subscriber to the enablement status (enabled or disabled) of the - /// send queue. + /// send queue, along with useful errors. pub fn subscribe_errors(&self) -> broadcast::Receiver { self.data().error_reporter.subscribe() } } -/// A specific room ran into an error, and has disabled itself. +/// A specific room's send queue ran into an error, and it has disabled itself. #[derive(Clone, Debug)] pub struct SendQueueRoomError { - /// Which room is failing? + /// For which room is the send queue failing? pub room_id: OwnedRoomId, - /// The error the room has ran into, when trying to send an event. + /// The error the room has ran into, when trying to send a request. pub error: Arc, /// Whether the error is considered recoverable or not. /// /// An error that's recoverable will disable the room's send queue, while an - /// unrecoverable error will be parked, until the user decides to cancel - /// sending it. + /// unrecoverable error will be parked, until the user decides to do + /// something about it. pub is_recoverable: bool, } impl Client { /// Returns a [`SendQueue`] that handles sending, retrying and not - /// forgetting about messages that are to be sent. + /// forgetting about requests that are to be sent. pub fn send_queue(&self) -> SendQueue { SendQueue::new(self.clone()) } @@ -321,10 +326,11 @@ impl RoomSendQueue { /// the [`Self::subscribe()`] method to get updates about the sending of /// that event. /// - /// By default, if sending the event fails on the first attempt, it will be - /// retried a few times. If sending failed, the entire client's sending - /// queue will be disabled, and it will need to be manually re-enabled - /// by the caller. + /// By default, if sending failed on the first attempt, it will be retried a + /// few times. If sending failed after those retries, the entire + /// client's sending queue will be disabled, and it will need to be + /// manually re-enabled by the caller (e.g. after network is back, or when + /// something has been done about the faulty requests). pub async fn send_raw( &self, content: Raw, @@ -368,10 +374,11 @@ impl RoomSendQueue { /// the [`Self::subscribe()`] method to get updates about the sending of /// that event. /// - /// By default, if sending the event fails on the first attempt, it will be - /// retried a few times. If sending failed, the entire client's sending - /// queue will be disabled, and it will need to be manually re-enabled - /// by the caller. + /// By default, if sending failed on the first attempt, it will be retried a + /// few times. If sending failed after those retries, the entire + /// client's sending queue will be disabled, and it will need to be + /// manually re-enabled by the caller (e.g. after network is back, or when + /// something has been done about the faulty requests). pub async fn send( &self, content: AnyMessageLikeEventContent, @@ -383,8 +390,8 @@ impl RoomSendQueue { .await } - /// Returns the current local events as well as a receiver to listen to the - /// send queue updates, as defined in [`RoomSendQueueUpdate`]. + /// Returns the current local requests as well as a receiver to listen to + /// the send queue updates, as defined in [`RoomSendQueueUpdate`]. pub async fn subscribe( &self, ) -> Result<(Vec, broadcast::Receiver), RoomSendQueueError> @@ -394,6 +401,11 @@ impl RoomSendQueue { Ok((local_echoes, self.inner.updates.subscribe())) } + /// A task that must be spawned in the async runtime, running in the + /// background for each room that has a send queue. + /// + /// It only progresses forward: nothing can be cancelled at any point, which + /// makes the implementation not overly complicated to follow. #[instrument(skip_all, fields(room_id = %room.room_id()))] async fn sending_task( room: WeakRoom, @@ -413,10 +425,10 @@ impl RoomSendQueue { break; } - // Try to apply dependent events now; those applying to previously failed + // Try to apply dependent requests now; those applying to previously failed // attempts (local echoes) would succeed now. - if let Err(err) = queue.apply_dependent_events().await { - warn!("errors when applying dependent events: {err}"); + if let Err(err) = queue.apply_dependent_requests().await { + warn!("errors when applying dependent requests: {err}"); } if !locally_enabled.load(Ordering::SeqCst) { @@ -426,8 +438,8 @@ impl RoomSendQueue { continue; } - let queued_event = match queue.peek_next_to_send().await { - Ok(Some(event)) => event, + let queued_request = match queue.peek_next_to_send().await { + Ok(Some(request)) => request, Ok(None) => { trace!("queue is empty, sleeping"); @@ -437,12 +449,12 @@ impl RoomSendQueue { } Err(err) => { - warn!("error when loading next event to send: {err}"); + warn!("error when loading next request to send: {err}"); continue; } }; - trace!(txn_id = %queued_event.transaction_id, "received an event to send!"); + trace!(txn_id = %queued_request.transaction_id, "received a request to send!"); let Some(room) = room.get() else { if is_dropping.load(Ordering::SeqCst) { @@ -452,20 +464,23 @@ impl RoomSendQueue { continue; }; - let (event, event_type) = queued_event.as_event().unwrap().raw(); + let (event, event_type) = match &queued_request.kind { + QueuedRequestKind::Event { content } => content.raw(), + }; + match room - .send_raw(&event_type.to_string(), event) - .with_transaction_id(&queued_event.transaction_id) + .send_raw(event_type, event) + .with_transaction_id(&queued_request.transaction_id) .with_request_config(RequestConfig::short_retry()) .await { Ok(res) => { - trace!(txn_id = %queued_event.transaction_id, event_id = %res.event_id, "successfully sent"); + trace!(txn_id = %queued_request.transaction_id, event_id = %res.event_id, "successfully sent"); - match queue.mark_as_sent(&queued_event.transaction_id, &res.event_id).await { + match queue.mark_as_sent(&queued_request.transaction_id, &res.event_id).await { Ok(()) => { let _ = updates.send(RoomSendQueueUpdate::SentEvent { - transaction_id: queued_event.transaction_id, + transaction_id: queued_request.transaction_id, event_id: res.event_id, }); } @@ -497,35 +512,34 @@ impl RoomSendQueue { }; if is_recoverable { - warn!(txn_id = %queued_event.transaction_id, error = ?err, "Recoverable error when sending event: {err}, disabling send queue"); + warn!(txn_id = %queued_request.transaction_id, error = ?err, "Recoverable error when sending request: {err}, disabling send queue"); - // In this case, we intentionally keep the event in the queue, but mark it + // In this case, we intentionally keep the request in the queue, but mark it // as not being sent anymore. - queue.mark_as_not_being_sent(&queued_event.transaction_id).await; + queue.mark_as_not_being_sent(&queued_request.transaction_id).await; // Let observers know about a failure *after* we've marked the item as not // being sent anymore. Otherwise, there's a possible race where a caller - // might try to remove an item, while it's still - // marked as being sent, resulting in a cancellation - // failure. + // might try to remove an item, while it's still marked as being sent, + // resulting in a cancellation failure. // Disable the queue for this room after a recoverable error happened. This // should be the sign that this error is temporary (maybe network // disconnected, maybe the server had a hiccup). locally_enabled.store(false, Ordering::SeqCst); } else { - warn!(txn_id = %queued_event.transaction_id, error = ?err, "Unrecoverable error when sending event: {err}"); + warn!(txn_id = %queued_request.transaction_id, error = ?err, "Unrecoverable error when sending request: {err}"); - // Mark the event as wedged, so it's not picked at any future point. + // Mark the request as wedged, so it's not picked at any future point. if let Err(storage_error) = queue .mark_as_wedged( - &queued_event.transaction_id, + &queued_request.transaction_id, QueueWedgeError::from(&err), ) .await { - warn!("unable to mark event as wedged: {storage_error}"); + warn!("unable to mark request as wedged: {storage_error}"); } } @@ -538,7 +552,7 @@ impl RoomSendQueue { }); let _ = updates.send(RoomSendQueueUpdate::SendError { - transaction_id: queued_event.transaction_id, + transaction_id: queued_request.transaction_id, error, is_recoverable, }); @@ -574,7 +588,7 @@ impl RoomSendQueue { .await .map_err(RoomSendQueueError::StorageError)?; - // Wake up the queue, in case the room was asleep before unwedging the event. + // Wake up the queue, in case the room was asleep before unwedging the request. self.inner.notifier.notify_one(); let _ = self @@ -595,14 +609,17 @@ impl From<&crate::Error> for QueueWedgeError { SessionRecipientCollectionError::VerifiedUserHasUnsignedDevice(user_map) => { QueueWedgeError::InsecureDevices { user_device_map: user_map.clone() } } + SessionRecipientCollectionError::VerifiedUserChangedIdentity(users) => { QueueWedgeError::IdentityViolations { users: users.clone() } } + SessionRecipientCollectionError::CrossSigningNotSetup | SessionRecipientCollectionError::SendingFromUnverifiedDevice => { QueueWedgeError::CrossVerificationRequired } }, + _ => QueueWedgeError::GenericApiError { msg: value.to_string() }, } } @@ -612,23 +629,24 @@ struct RoomSendQueueInner { /// The room which this send queue relates to. room: WeakRoom, - /// Broadcaster for notifications about the statuses of events to be sent. + /// Broadcaster for notifications about the statuses of requests to be sent. /// /// Can be subscribed to from the outside. updates: broadcast::Sender, - /// Queue of events that are either to be sent, or being sent. + /// Queue of requests that are either to be sent, or being sent. /// - /// When an event has been sent to the server, it is removed from that queue - /// *after* being sent. That way, we will retry sending upon failure, in - /// the same order events have been inserted in the first place. + /// When a request has been sent to the server, it is removed from that + /// queue *after* being sent. That way, we will retry sending upon + /// failure, in the same order requests have been inserted in the first + /// place. queue: QueueStorage, /// A notifier that's updated any time common data is touched (stopped or /// enabled statuses), or the associated room [`QueueStorage`]. notifier: Arc, - /// Should the room process new events or not (because e.g. it might be + /// Should the room process new requests or not (because e.g. it might be /// running off the network)? locally_enabled: Arc, @@ -645,14 +663,14 @@ struct QueueStorage { /// To which room is this storage related. room_id: OwnedRoomId, - /// All the queued events that are being sent at the moment. + /// All the queued requests that are being sent at the moment. /// /// It also serves as an internal lock on the storage backend. being_sent: Arc>>, } impl QueueStorage { - /// Create a new synchronized queue for queuing events to be sent later. + /// Create a new queue for queuing requests to be sent later. fn new(client: WeakClient, room: OwnedRoomId) -> Self { Self { room_id: room, being_sent: Default::default(), client } } @@ -673,39 +691,40 @@ impl QueueStorage { self.client()? .store() - .save_send_queue_event(&self.room_id, transaction_id.clone(), serializable) + .save_send_queue_request(&self.room_id, transaction_id.clone(), serializable) .await?; Ok(transaction_id) } - /// Peeks the next event to be sent, marking it as being sent. + /// Peeks the next request to be sent, marking it as being sent. /// /// It is required to call [`Self::mark_as_sent`] after it's been /// effectively sent. - async fn peek_next_to_send(&self) -> Result, RoomSendQueueStorageError> { + async fn peek_next_to_send(&self) -> Result, RoomSendQueueStorageError> { // Keep the lock until we're done touching the storage. let mut being_sent = self.being_sent.write().await; - let queued_events = self.client()?.store().load_send_queue_events(&self.room_id).await?; + let queued_requests = + self.client()?.store().load_send_queue_requests(&self.room_id).await?; - if let Some(event) = queued_events.iter().find(|queued| !queued.is_wedged()) { - being_sent.insert(event.transaction_id.clone()); + if let Some(request) = queued_requests.iter().find(|queued| !queued.is_wedged()) { + being_sent.insert(request.transaction_id.clone()); - Ok(Some(event.clone())) + Ok(Some(request.clone())) } else { Ok(None) } } - /// Marks an event popped with [`Self::peek_next_to_send`] and identified + /// Marks a request popped with [`Self::peek_next_to_send`] and identified /// with the given transaction id as not being sent anymore, so it can /// be removed from the queue later. async fn mark_as_not_being_sent(&self, transaction_id: &TransactionId) { self.being_sent.write().await.remove(transaction_id); } - /// Marks an event popped with [`Self::peek_next_to_send`] and identified + /// Marks a request popped with [`Self::peek_next_to_send`] and identified /// with the given transaction id as being wedged (and not being sent /// anymore), so it can be removed from the queue later. async fn mark_as_wedged( @@ -720,11 +739,11 @@ impl QueueStorage { Ok(self .client()? .store() - .update_send_queue_event_status(&self.room_id, transaction_id, Some(reason)) + .update_send_queue_request_status(&self.room_id, transaction_id, Some(reason)) .await?) } - /// Marks an event identified with the given transaction id as being now + /// Marks a request identified with the given transaction id as being now /// unwedged and adds it back to the queue. async fn mark_as_unwedged( &self, @@ -733,12 +752,12 @@ impl QueueStorage { Ok(self .client()? .store() - .update_send_queue_event_status(&self.room_id, transaction_id, None) + .update_send_queue_request_status(&self.room_id, transaction_id, None) .await?) } - /// Marks an event pushed with [`Self::push`] and identified with the given - /// transaction id as sent by removing it from the local queue. + /// Marks a request pushed with [`Self::push`] and identified with the given + /// transaction id as sent, by removing it from the local queue. async fn mark_as_sent( &self, transaction_id: &TransactionId, @@ -751,15 +770,15 @@ impl QueueStorage { let client = self.client()?; let store = client.store(); - // Update all dependent events. + // Update all dependent requests. store - .update_dependent_send_queue_event(&self.room_id, transaction_id, event_id.to_owned()) + .update_dependent_queued_request(&self.room_id, transaction_id, event_id.to_owned()) .await?; - let removed = store.remove_send_queue_event(&self.room_id, transaction_id).await?; + let removed = store.remove_send_queue_request(&self.room_id, transaction_id).await?; if !removed { - warn!(txn_id = %transaction_id, "event marked as sent was missing from storage"); + warn!(txn_id = %transaction_id, "request marked as sent was missing from storage"); } Ok(()) @@ -770,8 +789,8 @@ impl QueueStorage { /// /// Returns whether the given transaction has been effectively removed. If /// false, this either means that the transaction id was unrelated to - /// this queue, or that the event was sent before we cancelled it. - async fn cancel( + /// this queue, or that the request was sent before we cancelled it. + async fn cancel_event( &self, transaction_id: &TransactionId, ) -> Result { @@ -782,11 +801,11 @@ impl QueueStorage { // Save the intent to redact the event. self.client()? .store() - .save_dependent_send_queue_event( + .save_dependent_queued_request( &self.room_id, transaction_id, ChildTransactionId::new(), - DependentQueuedEventKind::Redact, + DependentQueuedRequestKind::RedactEvent, ) .await?; @@ -794,19 +813,18 @@ impl QueueStorage { } let removed = - self.client()?.store().remove_send_queue_event(&self.room_id, transaction_id).await?; + self.client()?.store().remove_send_queue_request(&self.room_id, transaction_id).await?; Ok(removed) } - /// Replace an event that has been sent with - /// [`Self::push`] with the given transaction id, before it's been actually - /// sent. + /// Replace an event that has been sent with [`Self::push`] with the given + /// transaction id, before it's been actually sent. /// /// Returns whether the given transaction has been effectively edited. If /// false, this either means that the transaction id was unrelated to - /// this queue, or that the event was sent before we edited it. - async fn replace( + /// this queue, or that the request was sent before we edited it. + async fn replace_event( &self, transaction_id: &TransactionId, serializable: SerializableEventContent, @@ -815,14 +833,14 @@ impl QueueStorage { let being_sent = self.being_sent.read().await; if being_sent.contains(transaction_id) { - // Save the intent to redact the event. + // Save the intent to edit the associated event. self.client()? .store() - .save_dependent_send_queue_event( + .save_dependent_queued_request( &self.room_id, transaction_id, ChildTransactionId::new(), - DependentQueuedEventKind::Edit { new_content: serializable }, + DependentQueuedRequestKind::EditEvent { new_content: serializable }, ) .await?; @@ -832,12 +850,13 @@ impl QueueStorage { let edited = self .client()? .store() - .update_send_queue_event(&self.room_id, transaction_id, serializable) + .update_send_queue_request(&self.room_id, transaction_id, serializable) .await?; Ok(edited) } + /// Reacts to the given local echo of an event. #[instrument(skip(self))] async fn react( &self, @@ -847,28 +866,28 @@ impl QueueStorage { let client = self.client()?; let store = client.store(); - let queued_events = store.load_send_queue_events(&self.room_id).await?; + let requests = store.load_send_queue_requests(&self.room_id).await?; - // If the event has been already sent, abort immediately. - if !queued_events.iter().any(|item| item.transaction_id == transaction_id) { + // If the target event has been already sent, abort immediately. + if !requests.iter().any(|item| item.transaction_id == transaction_id) { return Ok(None); } - // Record the dependent event. + // Record the dependent request. let reaction_txn_id = ChildTransactionId::new(); store - .save_dependent_send_queue_event( + .save_dependent_queued_request( &self.room_id, transaction_id, reaction_txn_id.clone(), - DependentQueuedEventKind::React { key }, + DependentQueuedRequestKind::ReactEvent { key }, ) .await?; Ok(Some(reaction_txn_id)) } - /// Returns a list of the local echoes, that is, all the events that we're + /// Returns a list of the local echoes, that is, all the requests that we're /// about to send but that haven't been sent yet (or are being sent). async fn local_echoes( &self, @@ -877,8 +896,8 @@ impl QueueStorage { let client = self.client()?; let store = client.store(); - let local_events = - store.load_send_queue_events(&self.room_id).await?.into_iter().map(|queued| { + let local_requests = + store.load_send_queue_requests(&self.room_id).await?.into_iter().map(|queued| { LocalEcho { transaction_id: queued.transaction_id.clone(), content: match queued.kind { @@ -894,45 +913,48 @@ impl QueueStorage { } }); - let local_reactions = store - .load_dependent_send_queue_events(&self.room_id) - .await? - .into_iter() - .filter_map(|dep| match dep.kind { - DependentQueuedEventKind::Edit { .. } | DependentQueuedEventKind::Redact => None, - DependentQueuedEventKind::React { key } => Some(LocalEcho { - transaction_id: dep.own_transaction_id.clone().into(), - content: LocalEchoContent::React { - key, - send_handle: SendReactionHandle { - room: room.clone(), - transaction_id: dep.own_transaction_id, + let local_reactions = + store.load_dependent_queued_requests(&self.room_id).await?.into_iter().filter_map( + |dep| match dep.kind { + DependentQueuedRequestKind::EditEvent { .. } + | DependentQueuedRequestKind::RedactEvent => { + // TODO: reflect local edits/redacts too? + None + } + DependentQueuedRequestKind::ReactEvent { key } => Some(LocalEcho { + transaction_id: dep.own_transaction_id.clone().into(), + content: LocalEchoContent::React { + key, + send_handle: SendReactionHandle { + room: room.clone(), + transaction_id: dep.own_transaction_id, + }, + applies_to: dep.parent_transaction_id, }, - applies_to: dep.parent_transaction_id, - }, - }), - }); + }), + }, + ); - Ok(local_events.chain(local_reactions).collect()) + Ok(local_requests.chain(local_reactions).collect()) } - /// Try to apply a single dependent event, whether it's local or remote. + /// Try to apply a single dependent request, whether it's local or remote. /// /// This swallows errors that would retrigger every time if we retried - /// applying the dependent event: invalid edit content, etc. + /// applying the dependent request: invalid edit content, etc. /// - /// Returns true if the dependent event has been sent (or should not be + /// Returns true if the dependent request has been sent (or should not be /// retried later). #[instrument(skip_all)] - async fn try_apply_single_dependent_event( + async fn try_apply_single_dependent_request( &self, client: &Client, - de: DependentQueuedEvent, + de: DependentQueuedRequest, ) -> Result { let store = client.store(); match de.kind { - DependentQueuedEventKind::Edit { new_content } => { + DependentQueuedRequestKind::EditEvent { new_content } => { if let Some(event_id) = de.event_id { // The parent event has been sent, so send an edit event. let room = client @@ -983,7 +1005,7 @@ impl QueueStorage { ); store - .save_send_queue_event( + .save_send_queue_request( &self.room_id, de.own_transaction_id.into(), serializable, @@ -991,10 +1013,9 @@ impl QueueStorage { .await .map_err(RoomSendQueueStorageError::StorageError)?; } else { - // The parent event is still local (sending must have failed); update the local - // echo. + // The parent event is still local; update the local echo. let edited = store - .update_send_queue_event( + .update_send_queue_request( &self.room_id, &de.parent_transaction_id, new_content, @@ -1008,7 +1029,7 @@ impl QueueStorage { } } - DependentQueuedEventKind::Redact => { + DependentQueuedRequestKind::RedactEvent => { if let Some(event_id) = de.event_id { // The parent event has been sent; send a redaction. let room = client @@ -1032,7 +1053,7 @@ impl QueueStorage { // The parent event is still local (sending must have failed); redact the local // echo. let removed = store - .remove_send_queue_event(&self.room_id, &de.parent_transaction_id) + .remove_send_queue_request(&self.room_id, &de.parent_transaction_id) .await .map_err(RoomSendQueueStorageError::StorageError)?; @@ -1042,7 +1063,7 @@ impl QueueStorage { } } - DependentQueuedEventKind::React { key } => { + DependentQueuedRequestKind::ReactEvent { key } => { if let Some(event_id) = de.event_id { // Queue the reaction event in the send queue 🧠. let react_event = @@ -1054,7 +1075,7 @@ impl QueueStorage { ); store - .save_send_queue_event( + .save_send_queue_request( &self.room_id, de.own_transaction_id.into(), serializable, @@ -1072,78 +1093,78 @@ impl QueueStorage { } #[instrument(skip(self))] - async fn apply_dependent_events(&self) -> Result<(), RoomSendQueueError> { + async fn apply_dependent_requests(&self) -> Result<(), RoomSendQueueError> { // Keep the lock until we're done touching the storage. let _being_sent = self.being_sent.read().await; let client = self.client()?; let store = client.store(); - let dependent_events = store - .load_dependent_send_queue_events(&self.room_id) + let dependent_requests = store + .load_dependent_queued_requests(&self.room_id) .await .map_err(RoomSendQueueStorageError::StorageError)?; - let num_initial_dependent_events = dependent_events.len(); - if num_initial_dependent_events == 0 { + let num_initial_dependent_requests = dependent_requests.len(); + if num_initial_dependent_requests == 0 { // Returning early here avoids a bit of useless logging. return Ok(()); } - let canonicalized_dependent_events = canonicalize_dependent_events(&dependent_events); + let canonicalized_dependent_requests = canonicalize_dependent_requests(&dependent_requests); // Get rid of the all non-canonical dependent events. - for original in &dependent_events { - if !canonicalized_dependent_events + for original in &dependent_requests { + if !canonicalized_dependent_requests .iter() .any(|canonical| canonical.own_transaction_id == original.own_transaction_id) { store - .remove_dependent_send_queue_event(&self.room_id, &original.own_transaction_id) + .remove_dependent_queued_request(&self.room_id, &original.own_transaction_id) .await .map_err(RoomSendQueueStorageError::StorageError)?; } } - let mut num_dependent_events = canonicalized_dependent_events.len(); + let mut num_dependent_requests = canonicalized_dependent_requests.len(); debug!( - num_dependent_events, - num_initial_dependent_events, "starting handling of dependent events" + num_dependent_requests, + num_initial_dependent_requests, "starting handling of dependent requests" ); - for dependent in canonicalized_dependent_events { + for dependent in canonicalized_dependent_requests { let dependent_id = dependent.own_transaction_id.clone(); - match self.try_apply_single_dependent_event(&client, dependent).await { + match self.try_apply_single_dependent_request(&client, dependent).await { Ok(should_remove) => { if should_remove { - // The dependent event has been successfully applied, forget about it. + // The dependent request has been successfully applied, forget about it. store - .remove_dependent_send_queue_event(&self.room_id, &dependent_id) + .remove_dependent_queued_request(&self.room_id, &dependent_id) .await .map_err(RoomSendQueueStorageError::StorageError)?; - num_dependent_events -= 1; + num_dependent_requests -= 1; } } Err(err) => { - warn!("error when applying single dependent event: {err}"); + warn!("error when applying single dependent request: {err}"); } } } debug!( - leftover_dependent_events = num_dependent_events, - "stopped handling dependent events" + leftover_dependent_requests = num_dependent_requests, + "stopped handling dependent request" ); Ok(()) } - /// Remove a single dependent event from storage. - async fn remove_dependent_send_queue_event( + /// Remove a single dependent request from storage. + async fn remove_dependent_send_queue_request( &self, dependent_event_id: &ChildTransactionId, ) -> Result { @@ -1153,7 +1174,7 @@ impl QueueStorage { Ok(self .client()? .store() - .remove_dependent_send_queue_event(&self.room_id, dependent_event_id) + .remove_dependent_queued_request(&self.room_id, dependent_event_id) .await?) } } @@ -1184,10 +1205,11 @@ pub enum LocalEchoContent { }, } -/// An event that has been locally queued for sending, but hasn't been sent yet. +/// A local representation for a request that hasn't been sent yet to the user's +/// homeserver. #[derive(Clone, Debug)] pub struct LocalEcho { - /// Transaction id used to identify this event. + /// Transaction id used to identify the associated request. pub transaction_id: OwnedTransactionId, /// The content for the local echo. pub content: LocalEchoContent, @@ -1259,8 +1281,9 @@ pub enum RoomSendQueueError { #[error("the room isn't in the joined state")] RoomNotJoined, - /// The room is missing from the client. This could happen if the client is - /// shutting down. + /// The room is missing from the client. + /// + /// This happens only whenever the client is shutting down. #[error("the room is now missing from the client")] RoomDisappeared, @@ -1286,6 +1309,7 @@ pub enum RoomSendQueueStorageError { } /// A handle to manipulate an event that was scheduled to be sent to a room. +// TODO (bnjbvr): consider renaming `SendEventHandle`, unless we can reuse it for medias too. #[derive(Clone, Debug)] pub struct SendHandle { room: RoomSendQueue, @@ -1301,7 +1325,7 @@ impl SendHandle { pub async fn abort(&self) -> Result { trace!("received an abort request"); - if self.room.inner.queue.cancel(&self.transaction_id).await? { + if self.room.inner.queue.cancel_event(&self.transaction_id).await? { trace!("successful abort"); // Propagate a cancelled update too. @@ -1330,7 +1354,7 @@ impl SendHandle { let serializable = SerializableEventContent::from_raw(new_content, event_type); - if self.room.inner.queue.replace(&self.transaction_id, serializable.clone()).await? { + if self.room.inner.queue.replace_event(&self.transaction_id, serializable.clone()).await? { trace!("successful edit"); // Wake up the queue, in case the room was asleep before the edit. @@ -1423,7 +1447,7 @@ impl SendReactionHandle { /// Will return true if the reaction could be aborted, false if it's been /// sent (and there's no matching local echo anymore). pub async fn abort(&self) -> Result { - if self.room.inner.queue.remove_dependent_send_queue_event(&self.transaction_id).await? { + if self.room.inner.queue.remove_dependent_send_queue_request(&self.transaction_id).await? { // Simple case: the reaction was found in the dependent event list. // Propagate a cancelled update too. @@ -1450,27 +1474,29 @@ impl SendReactionHandle { } } -/// From a given source of [`DependentQueuedEvent`], return only the most +/// From a given source of [`DependentQueuedRequest`], return only the most /// meaningful, i.e. the ones that wouldn't be overridden after applying the /// others. -fn canonicalize_dependent_events(dependent: &[DependentQueuedEvent]) -> Vec { - let mut by_event_id = HashMap::>::new(); +fn canonicalize_dependent_requests( + dependent: &[DependentQueuedRequest], +) -> Vec { + let mut by_event_id = HashMap::>::new(); for d in dependent { let prevs = by_event_id.entry(d.parent_transaction_id.clone()).or_default(); - if prevs.iter().any(|prev| matches!(prev.kind, DependentQueuedEventKind::Redact)) { - // The event has already been flagged for redaction, don't consider the other - // dependent events. + if prevs.iter().any(|prev| matches!(prev.kind, DependentQueuedRequestKind::RedactEvent)) { + // The parent event has already been flagged for redaction, don't consider the + // other dependent events. continue; } match &d.kind { - DependentQueuedEventKind::Edit { .. } => { + DependentQueuedRequestKind::EditEvent { .. } => { // Replace any previous edit with this one. if let Some(prev_edit) = prevs .iter_mut() - .find(|prev| matches!(prev.kind, DependentQueuedEventKind::Edit { .. })) + .find(|prev| matches!(prev.kind, DependentQueuedRequestKind::EditEvent { .. })) { *prev_edit = d; } else { @@ -1478,11 +1504,11 @@ fn canonicalize_dependent_events(dependent: &[DependentQueuedEvent]) -> Vec { + DependentQueuedRequestKind::ReactEvent { .. } => { prevs.push(d); } - DependentQueuedEventKind::Redact => { + DependentQueuedRequestKind::RedactEvent => { // Remove every other dependent action. prevs.clear(); prevs.push(d); @@ -1502,7 +1528,7 @@ mod tests { use assert_matches2::{assert_let, assert_matches}; use matrix_sdk_base::store::{ - ChildTransactionId, DependentQueuedEvent, DependentQueuedEventKind, + ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, SerializableEventContent, }; use matrix_sdk_test::{async_test, JoinedRoomBuilder, SyncResponseBuilder}; @@ -1511,7 +1537,7 @@ mod tests { room_id, TransactionId, }; - use super::canonicalize_dependent_events; + use super::canonicalize_dependent_requests; use crate::{client::WeakClient, test_utils::logged_in_client}; #[async_test] @@ -1564,10 +1590,10 @@ mod tests { // Smoke test: canonicalizing a single dependent event returns it. let txn = TransactionId::new(); - let edit = DependentQueuedEvent { + let edit = DependentQueuedRequest { own_transaction_id: ChildTransactionId::new(), parent_transaction_id: txn.clone(), - kind: DependentQueuedEventKind::Edit { + kind: DependentQueuedRequestKind::EditEvent { new_content: SerializableEventContent::new( &RoomMessageEventContent::text_plain("edit").into(), ) @@ -1575,10 +1601,10 @@ mod tests { }, event_id: None, }; - let res = canonicalize_dependent_events(&[edit]); + let res = canonicalize_dependent_requests(&[edit]); assert_eq!(res.len(), 1); - assert_matches!(&res[0].kind, DependentQueuedEventKind::Edit { .. }); + assert_matches!(&res[0].kind, DependentQueuedRequestKind::EditEvent { .. }); assert_eq!(res[0].parent_transaction_id, txn); assert!(res[0].event_id.is_none()); } @@ -1589,17 +1615,17 @@ mod tests { let txn = TransactionId::new(); let mut inputs = Vec::with_capacity(100); - let redact = DependentQueuedEvent { + let redact = DependentQueuedRequest { own_transaction_id: ChildTransactionId::new(), parent_transaction_id: txn.clone(), - kind: DependentQueuedEventKind::Redact, + kind: DependentQueuedRequestKind::RedactEvent, event_id: None, }; - let edit = DependentQueuedEvent { + let edit = DependentQueuedRequest { own_transaction_id: ChildTransactionId::new(), parent_transaction_id: txn.clone(), - kind: DependentQueuedEventKind::Edit { + kind: DependentQueuedRequestKind::EditEvent { new_content: SerializableEventContent::new( &RoomMessageEventContent::text_plain("edit").into(), ) @@ -1622,10 +1648,10 @@ mod tests { inputs.push(edit); } - let res = canonicalize_dependent_events(&inputs); + let res = canonicalize_dependent_requests(&inputs); assert_eq!(res.len(), 1); - assert_matches!(&res[0].kind, DependentQueuedEventKind::Redact); + assert_matches!(&res[0].kind, DependentQueuedRequestKind::RedactEvent); assert_eq!(res[0].parent_transaction_id, txn); } @@ -1635,10 +1661,10 @@ mod tests { // The latest edit of a list is always preferred. let inputs = (0..10) - .map(|i| DependentQueuedEvent { + .map(|i| DependentQueuedRequest { own_transaction_id: ChildTransactionId::new(), parent_transaction_id: parent_txn.clone(), - kind: DependentQueuedEventKind::Edit { + kind: DependentQueuedRequestKind::EditEvent { new_content: SerializableEventContent::new( &RoomMessageEventContent::text_plain(format!("edit{i}")).into(), ) @@ -1650,10 +1676,10 @@ mod tests { let txn = inputs[9].parent_transaction_id.clone(); - let res = canonicalize_dependent_events(&inputs); + let res = canonicalize_dependent_requests(&inputs); assert_eq!(res.len(), 1); - assert_let!(DependentQueuedEventKind::Edit { new_content } = &res[0].kind); + assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind); assert_let!( AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap() ); @@ -1671,16 +1697,16 @@ mod tests { let inputs = vec![ // This one pertains to txn1. - DependentQueuedEvent { + DependentQueuedRequest { own_transaction_id: child1.clone(), - kind: DependentQueuedEventKind::Redact, + kind: DependentQueuedRequestKind::RedactEvent, parent_transaction_id: txn1.clone(), event_id: None, }, // This one pertains to txn2. - DependentQueuedEvent { + DependentQueuedRequest { own_transaction_id: child2, - kind: DependentQueuedEventKind::Edit { + kind: DependentQueuedRequestKind::EditEvent { new_content: SerializableEventContent::new( &RoomMessageEventContent::text_plain("edit").into(), ) @@ -1691,7 +1717,7 @@ mod tests { }, ]; - let res = canonicalize_dependent_events(&inputs); + let res = canonicalize_dependent_requests(&inputs); // The canonicalization shouldn't depend per event id. assert_eq!(res.len(), 2); @@ -1699,10 +1725,10 @@ mod tests { for dependent in res { if dependent.own_transaction_id == child1 { assert_eq!(dependent.parent_transaction_id, txn1); - assert_matches!(dependent.kind, DependentQueuedEventKind::Redact); + assert_matches!(dependent.kind, DependentQueuedRequestKind::RedactEvent); } else { assert_eq!(dependent.parent_transaction_id, txn2); - assert_matches!(dependent.kind, DependentQueuedEventKind::Edit { .. }); + assert_matches!(dependent.kind, DependentQueuedRequestKind::EditEvent { .. }); } } } @@ -1713,17 +1739,17 @@ mod tests { let txn = TransactionId::new(); let react_id = ChildTransactionId::new(); - let react = DependentQueuedEvent { + let react = DependentQueuedRequest { own_transaction_id: react_id.clone(), - kind: DependentQueuedEventKind::React { key: "🧠".to_owned() }, + kind: DependentQueuedRequestKind::ReactEvent { key: "🧠".to_owned() }, parent_transaction_id: txn.clone(), event_id: None, }; let edit_id = ChildTransactionId::new(); - let edit = DependentQueuedEvent { + let edit = DependentQueuedRequest { own_transaction_id: edit_id.clone(), - kind: DependentQueuedEventKind::Edit { + kind: DependentQueuedRequestKind::EditEvent { new_content: SerializableEventContent::new( &RoomMessageEventContent::text_plain("edit").into(), ) @@ -1733,7 +1759,7 @@ mod tests { event_id: None, }; - let res = canonicalize_dependent_events(&[react, edit]); + let res = canonicalize_dependent_requests(&[react, edit]); assert_eq!(res.len(), 2); assert_eq!(res[0].own_transaction_id, edit_id); diff --git a/crates/matrix-sdk/tests/integration/send_queue.rs b/crates/matrix-sdk/tests/integration/send_queue.rs index c88ea5c6934..55455dd0ff9 100644 --- a/crates/matrix-sdk/tests/integration/send_queue.rs +++ b/crates/matrix-sdk/tests/integration/send_queue.rs @@ -1799,7 +1799,7 @@ async fn test_reloading_rooms_with_unsent_events() { .unwrap(); set_client_session(&client).await; - client.send_queue().respawn_tasks_for_rooms_with_unsent_events().await; + client.send_queue().respawn_tasks_for_rooms_with_unsent_requests().await; // Let the sending queues process events. sleep(Duration::from_secs(1)).await;