Skip to content

Commit

Permalink
refactor(base): add a QueuedRequestKind enum
Browse files Browse the repository at this point in the history
In a next commit, the `QueuedEvent` will be renamed to `QueuedRequest`.
This specifies which kind of request we want to send with the send
queue; for now, it can only be an event.
  • Loading branch information
bnjbvr committed Oct 25, 2024
1 parent 3071a68 commit 7c314ef
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 61 deletions.
8 changes: 4 additions & 4 deletions crates/matrix-sdk-base/src/store/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1219,7 +1219,7 @@ impl StateStoreIntegrationTests for DynStateStore {
{
assert_eq!(pending[0].transaction_id, txn0);

let deserialized = pending[0].event.deserialize().unwrap();
let deserialized = pending[0].as_event().unwrap().deserialize().unwrap();
assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
assert_eq!(content.body(), "msg0");

Expand All @@ -1246,7 +1246,7 @@ impl StateStoreIntegrationTests for DynStateStore {
assert_eq!(pending[0].transaction_id, txn0);

for i in 0..4 {
let deserialized = pending[i].event.deserialize().unwrap();
let deserialized = pending[i].as_event().unwrap().deserialize().unwrap();
assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
assert_eq!(content.body(), format!("msg{i}"));
assert!(!pending[i].is_wedged());
Expand Down Expand Up @@ -1293,15 +1293,15 @@ impl StateStoreIntegrationTests for DynStateStore {
{
assert_eq!(pending[2].transaction_id, *txn2);

let deserialized = pending[2].event.deserialize().unwrap();
let deserialized = pending[2].as_event().unwrap().deserialize().unwrap();
assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
assert_eq!(content.body(), "wow that's a cool test");

assert!(!pending[2].is_wedged());

for i in 0..4 {
if i != 2 {
let deserialized = pending[i].event.deserialize().unwrap();
let deserialized = pending[i].as_event().unwrap().deserialize().unwrap();
assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
assert_eq!(content.body(), format!("msg{i}"));

Expand Down
17 changes: 7 additions & 10 deletions crates/matrix-sdk-base/src/store/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ use tracing::{debug, instrument, trace, warn};
use super::{
send_queue::{ChildTransactionId, QueuedEvent, SerializableEventContent},
traits::{ComposerDraft, ServerCapabilities},
DependentQueuedEvent, DependentQueuedEventKind, Result, RoomInfo, StateChanges, StateStore,
StoreError,
DependentQueuedEvent, DependentQueuedEventKind, QueuedRequestKind, Result, RoomInfo,
StateChanges, StateStore, StoreError,
};
use crate::{
deserialized_responses::RawAnySyncOrStrippedState, store::QueueWedgeError,
Expand Down Expand Up @@ -806,14 +806,11 @@ impl StateStore for MemoryStore {
&self,
room_id: &RoomId,
transaction_id: OwnedTransactionId,
event: SerializableEventContent,
content: SerializableEventContent,
) -> Result<(), Self::Error> {
self.send_queue_events
.write()
.unwrap()
.entry(room_id.to_owned())
.or_default()
.push(QueuedEvent { event, transaction_id, error: None });
self.send_queue_events.write().unwrap().entry(room_id.to_owned()).or_default().push(
QueuedEvent { kind: QueuedRequestKind::Event { content }, transaction_id, error: None },
);
Ok(())
}

Expand All @@ -832,7 +829,7 @@ impl StateStore for MemoryStore {
.iter_mut()
.find(|item| item.transaction_id == transaction_id)
{
entry.event = content;
entry.kind = QueuedRequestKind::Event { content };
entry.error = None;
Ok(true)
} else {
Expand Down
2 changes: 1 addition & 1 deletion crates/matrix-sdk-base/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pub use self::{
memory_store::MemoryStore,
send_queue::{
ChildTransactionId, DependentQueuedEvent, DependentQueuedEventKind, QueueWedgeError,
QueuedEvent, SerializableEventContent,
QueuedEvent, QueuedRequestKind, SerializableEventContent,
},
traits::{
ComposerDraft, ComposerDraftType, DynStateStore, IntoStateStore, ServerCapabilities,
Expand Down
34 changes: 26 additions & 8 deletions crates/matrix-sdk-base/src/store/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use std::{collections::BTreeMap, fmt, ops::Deref};

use as_variant::as_variant;
use ruma::{
events::{AnyMessageLikeEventContent, EventContent as _, RawExt as _},
serde::Raw,
Expand Down Expand Up @@ -62,26 +63,43 @@ impl SerializableEventContent {
/// Returns the raw event content along with its type.
///
/// Useful for callers manipulating custom events.
pub fn raw(self) -> (Raw<AnyMessageLikeEventContent>, String) {
(self.event, self.event_type)
pub fn raw(&self) -> (&Raw<AnyMessageLikeEventContent>, &str) {
(&self.event, &self.event_type)
}
}

/// An event to be sent with a send queue.
/// The kind of a send queue request.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum QueuedRequestKind {
/// An event to be sent via the send queue.
Event {
/// The content of the message-like event we'd like to send.
content: SerializableEventContent,
},
}

/// A request to be sent with a send queue.
#[derive(Clone)]
pub struct QueuedEvent {
/// The content of the message-like event we'd like to send.
pub event: SerializableEventContent,
/// The kind of queued request we're going to send.
pub kind: QueuedRequestKind,

/// Unique transaction id for the queued event, acting as a key.
/// Unique transaction id for the queued request, acting as a key.
pub transaction_id: OwnedTransactionId,

/// Set when the event couldn't be sent because of an unrecoverable API
/// error. `None` if the event is in queue for being sent.
/// Error returned when the request couldn't be sent and is stuck in the
/// unrecoverable state.
///
/// `None` if the request is in the queue, waiting to be sent.
pub error: Option<QueueWedgeError>,
}

impl QueuedEvent {
/// Returns `Some` if the queued request is about sending an event.
pub fn as_event(&self) -> Option<&SerializableEventContent> {
as_variant!(&self.kind, QueuedRequestKind::Event { content } => content)
}

/// True if the event couldn't be sent because of an unrecoverable API
/// error. See [`Self::error`] for more details on the reason.
pub fn is_wedged(&self) -> bool {
Expand Down
76 changes: 48 additions & 28 deletions crates/matrix-sdk-indexeddb/src/state_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use matrix_sdk_base::{
deserialized_responses::RawAnySyncOrStrippedState,
store::{
ChildTransactionId, ComposerDraft, DependentQueuedEvent, DependentQueuedEventKind,
QueuedEvent, SerializableEventContent, ServerCapabilities, StateChanges, StateStore,
StoreError,
QueuedEvent, QueuedRequestKind, SerializableEventContent, ServerCapabilities, StateChanges,
StateStore, StoreError,
},
MinimalRoomMemberEvent, RoomInfo, RoomMemberships, StateStoreDataKey, StateStoreDataValue,
};
Expand Down Expand Up @@ -431,14 +431,36 @@ struct PersistedQueuedEvent {
pub room_id: OwnedRoomId,

// All these fields are the same as in [`QueuedEvent`].
event: SerializableEventContent,
/// Kind. Optional because it might be missing from previous formats.
kind: Option<QueuedRequestKind>,
transaction_id: OwnedTransactionId,

// Deprecated (from old format), now replaced with error field.
// Kept here for migration
pub error: Option<QueueWedgeError>,

// Migrated fields: keep these private, they're not used anymore elsewhere in the code base.
/// Deprecated (from old format), now replaced with error field.
is_wedged: Option<bool>,

pub error: Option<QueueWedgeError>,
event: Option<SerializableEventContent>,
}

impl PersistedQueuedEvent {
fn into_queued_event(self) -> Option<QueuedEvent> {
let kind =
self.kind.or_else(|| self.event.map(|content| QueuedRequestKind::Event { content }))?;

let error = match self.is_wedged {
Some(true) => {
// Migrate to a generic error.
Some(QueueWedgeError::GenericApiError {
msg: "local echo failed to send in a previous session".into(),
})
}
_ => self.error,
};

Some(QueuedEvent { kind, transaction_id: self.transaction_id, error })
}
}

// Small hack to have the following macro invocation act as the appropriate
Expand Down Expand Up @@ -1329,10 +1351,11 @@ impl_state_store!({
// Push the new event.
prev.push(PersistedQueuedEvent {
room_id: room_id.to_owned(),
event: content,
kind: Some(QueuedRequestKind::Event { content }),
transaction_id,
is_wedged: None,
error: None,
is_wedged: None,
event: None,
});

// Save the new vector into db.
Expand Down Expand Up @@ -1369,9 +1392,12 @@ impl_state_store!({

// Modify the one event.
if let Some(entry) = prev.iter_mut().find(|entry| entry.transaction_id == transaction_id) {
entry.event = content;
entry.is_wedged = None;
entry.kind = Some(QueuedRequestKind::Event { content });
// Reset the error state.
entry.error = None;
// Remove migrated fields.
entry.is_wedged = None;
entry.event = None;

// Save the new vector into db.
obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
Expand Down Expand Up @@ -1435,22 +1461,7 @@ impl_state_store!({
|val| self.deserialize_value::<Vec<PersistedQueuedEvent>>(&val),
)?;

Ok(prev
.into_iter()
.map(|item| QueuedEvent {
event: item.event,
transaction_id: item.transaction_id,
error: match item.is_wedged {
Some(true) => {
// migrate a generic error
Some(QueueWedgeError::GenericApiError {
msg: "local echo failed to send in a previous session".into(),
})
}
_ => item.error,
},
})
.collect())
Ok(prev.into_iter().filter_map(PersistedQueuedEvent::into_queued_event).collect())
}

async fn update_send_queue_event_status(
Expand Down Expand Up @@ -1663,7 +1674,8 @@ impl From<&StrippedRoomMemberEvent> for RoomMember {

#[cfg(test)]
mod migration_tests {
use matrix_sdk_base::store::SerializableEventContent;
use assert_matches2::assert_matches;
use matrix_sdk_base::store::{QueuedRequestKind, SerializableEventContent};
use ruma::{
events::room::message::RoomMessageEventContent, room_id, OwnedRoomId, OwnedTransactionId,
TransactionId,
Expand Down Expand Up @@ -1698,7 +1710,7 @@ mod migration_tests {
let old_persisted_queue_event = OldPersistedQueuedEvent {
room_id: room_a_id.to_owned(),
event: content,
transaction_id,
transaction_id: transaction_id.clone(),
is_wedged: true,
};

Expand All @@ -1710,6 +1722,14 @@ mod migration_tests {

assert_eq!(new_persisted.is_wedged, Some(true));
assert!(new_persisted.error.is_none());

assert!(new_persisted.event.is_some());
assert!(new_persisted.kind.is_none());

let queued = new_persisted.into_queued_event().unwrap();
assert_matches!(queued.kind, QueuedRequestKind::Event { .. });
assert_eq!(queued.transaction_id, transaction_id);
assert!(queued.error.is_some());
}
}

Expand Down
5 changes: 3 additions & 2 deletions crates/matrix-sdk-sqlite/src/state_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use matrix_sdk_base::{
deserialized_responses::{RawAnySyncOrStrippedState, SyncOrStrippedState},
store::{
migration_helpers::RoomInfoV1, ChildTransactionId, DependentQueuedEvent,
DependentQueuedEventKind, QueueWedgeError, QueuedEvent, SerializableEventContent,
DependentQueuedEventKind, QueueWedgeError, QueuedEvent, QueuedRequestKind,
SerializableEventContent,
},
MinimalRoomMemberEvent, RoomInfo, RoomMemberships, RoomState, StateChanges, StateStore,
StateStoreDataKey, StateStoreDataValue,
Expand Down Expand Up @@ -1767,7 +1768,7 @@ impl StateStore for SqliteStateStore {
for entry in res {
queued_events.push(QueuedEvent {
transaction_id: entry.0.into(),
event: self.deserialize_json(&entry.1)?,
kind: QueuedRequestKind::Event { content: self.deserialize_json(&entry.1)? },
error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
});
}
Expand Down
18 changes: 10 additions & 8 deletions crates/matrix-sdk/src/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use std::{
use matrix_sdk_base::{
store::{
ChildTransactionId, DependentQueuedEvent, DependentQueuedEventKind, QueueWedgeError,
QueuedEvent, SerializableEventContent,
QueuedEvent, QueuedRequestKind, SerializableEventContent,
},
RoomState, StoreError,
};
Expand Down Expand Up @@ -452,7 +452,7 @@ impl RoomSendQueue {
continue;
};

let (event, event_type) = queued_event.event.raw();
let (event, event_type) = queued_event.as_event().unwrap().raw();
match room
.send_raw(&event_type.to_string(), event)
.with_transaction_id(&queued_event.transaction_id)
Expand Down Expand Up @@ -881,13 +881,15 @@ impl QueueStorage {
store.load_send_queue_events(&self.room_id).await?.into_iter().map(|queued| {
LocalEcho {
transaction_id: queued.transaction_id.clone(),
content: LocalEchoContent::Event {
serialized_event: queued.event,
send_handle: SendHandle {
room: room.clone(),
transaction_id: queued.transaction_id,
content: match queued.kind {
QueuedRequestKind::Event { content } => LocalEchoContent::Event {
serialized_event: content,
send_handle: SendHandle {
room: room.clone(),
transaction_id: queued.transaction_id,
},
send_error: queued.error,
},
send_error: queued.error,
},
}
});
Expand Down

0 comments on commit 7c314ef

Please sign in to comment.