Skip to content

Commit

Permalink
refactor(base): rename all send-queue related "events" to "requests"
Browse files Browse the repository at this point in the history
Changelog: Renamed all the send-queue related "events" to "requests", so
  as to generalize usage of the send queue to not-events (e.g. medias,
  redactions, etc.).
  • Loading branch information
bnjbvr committed Oct 29, 2024
1 parent 58d46f0 commit 9c858c1
Show file tree
Hide file tree
Showing 11 changed files with 466 additions and 430 deletions.
2 changes: 1 addition & 1 deletion bindings/matrix-sdk-ffi/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ impl Client {
Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
// Respawn tasks for rooms that had unsent events. At this point we've just
// created the subscriber, so it'll be notified about errors.
q.respawn_tasks_for_rooms_with_unsent_events().await;
q.respawn_tasks_for_rooms_with_unsent_requests().await;

loop {
match subscriber.recv().await {
Expand Down
72 changes: 36 additions & 36 deletions crates/matrix-sdk-base/src/store/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use ruma::{
};
use serde_json::{json, value::Value as JsonValue};

use super::{DependentQueuedEventKind, DynStateStore, ServerCapabilities};
use super::{DependentQueuedRequestKind, DynStateStore, ServerCapabilities};
use crate::{
deserialized_responses::MemberEvent,
store::{ChildTransactionId, QueueWedgeError, Result, SerializableEventContent, StateStoreExt},
Expand Down Expand Up @@ -1202,18 +1202,18 @@ impl StateStoreIntegrationTests for DynStateStore {
let room_id = room_id!("!test_send_queue:localhost");

// No queued event in store at first.
let events = self.load_send_queue_events(room_id).await.unwrap();
let events = self.load_send_queue_requests(room_id).await.unwrap();
assert!(events.is_empty());

// Saving one thing should work.
let txn0 = TransactionId::new();
let event0 =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("msg0").into())
.unwrap();
self.save_send_queue_event(room_id, txn0.clone(), event0).await.unwrap();
self.save_send_queue_request(room_id, txn0.clone(), event0).await.unwrap();

// Reading it will work.
let pending = self.load_send_queue_events(room_id).await.unwrap();
let pending = self.load_send_queue_requests(room_id).await.unwrap();

assert_eq!(pending.len(), 1);
{
Expand All @@ -1234,11 +1234,11 @@ impl StateStoreIntegrationTests for DynStateStore {
)
.unwrap();

self.save_send_queue_event(room_id, txn, event).await.unwrap();
self.save_send_queue_request(room_id, txn, event).await.unwrap();
}

// Reading all the events should work.
let pending = self.load_send_queue_events(room_id).await.unwrap();
let pending = self.load_send_queue_requests(room_id).await.unwrap();

// All the events should be retrieved, in the same order.
assert_eq!(pending.len(), 4);
Expand All @@ -1254,7 +1254,7 @@ impl StateStoreIntegrationTests for DynStateStore {

// Marking an event as wedged works.
let txn2 = &pending[2].transaction_id;
self.update_send_queue_event_status(
self.update_send_queue_request_status(
room_id,
txn2,
Some(QueueWedgeError::GenericApiError { msg: "Oops".to_owned() }),
Expand All @@ -1263,7 +1263,7 @@ impl StateStoreIntegrationTests for DynStateStore {
.unwrap();

// And it is reflected.
let pending = self.load_send_queue_events(room_id).await.unwrap();
let pending = self.load_send_queue_requests(room_id).await.unwrap();

// All the events should be retrieved, in the same order.
assert_eq!(pending.len(), 4);
Expand All @@ -1284,10 +1284,10 @@ impl StateStoreIntegrationTests for DynStateStore {
&RoomMessageEventContent::text_plain("wow that's a cool test").into(),
)
.unwrap();
self.update_send_queue_event(room_id, txn2, event0).await.unwrap();
self.update_send_queue_request(room_id, txn2, event0).await.unwrap();

// And it is reflected.
let pending = self.load_send_queue_events(room_id).await.unwrap();
let pending = self.load_send_queue_requests(room_id).await.unwrap();

assert_eq!(pending.len(), 4);
{
Expand All @@ -1311,10 +1311,10 @@ impl StateStoreIntegrationTests for DynStateStore {
}

// Removing an event works.
self.remove_send_queue_event(room_id, &txn0).await.unwrap();
self.remove_send_queue_request(room_id, &txn0).await.unwrap();

// And it is reflected.
let pending = self.load_send_queue_events(room_id).await.unwrap();
let pending = self.load_send_queue_requests(room_id).await.unwrap();

assert_eq!(pending.len(), 3);
assert_eq!(pending[1].transaction_id, *txn2);
Expand All @@ -1332,7 +1332,7 @@ impl StateStoreIntegrationTests for DynStateStore {
let event =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("room2").into())
.unwrap();
self.save_send_queue_event(room_id2, txn.clone(), event).await.unwrap();
self.save_send_queue_request(room_id2, txn.clone(), event).await.unwrap();
}

// Add and remove one event for room3.
Expand All @@ -1342,14 +1342,14 @@ impl StateStoreIntegrationTests for DynStateStore {
let event =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("room3").into())
.unwrap();
self.save_send_queue_event(room_id3, txn.clone(), event).await.unwrap();
self.save_send_queue_request(room_id3, txn.clone(), event).await.unwrap();

self.remove_send_queue_event(room_id3, &txn).await.unwrap();
self.remove_send_queue_request(room_id3, &txn).await.unwrap();
}

// Query all the rooms which have unsent events. Per the previous steps,
// it should be room1 and room2, not room3.
let outstanding_rooms = self.load_rooms_with_unsent_events().await.unwrap();
let outstanding_rooms = self.load_rooms_with_unsent_requests().await.unwrap();
assert_eq!(outstanding_rooms.len(), 2);
assert!(outstanding_rooms.iter().any(|room| room == room_id));
assert!(outstanding_rooms.iter().any(|room| room == room_id2));
Expand All @@ -1363,77 +1363,77 @@ impl StateStoreIntegrationTests for DynStateStore {
let event0 =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey").into())
.unwrap();
self.save_send_queue_event(room_id, txn0.clone(), event0).await.unwrap();
self.save_send_queue_request(room_id, txn0.clone(), event0).await.unwrap();

// No dependents, to start with.
assert!(self.load_dependent_send_queue_events(room_id).await.unwrap().is_empty());
assert!(self.load_dependent_queued_requests(room_id).await.unwrap().is_empty());

// Save a redaction for that event.
let child_txn = ChildTransactionId::new();
self.save_dependent_send_queue_event(
self.save_dependent_queued_request(
room_id,
&txn0,
child_txn.clone(),
DependentQueuedEventKind::Redact,
DependentQueuedRequestKind::RedactEvent,
)
.await
.unwrap();

// It worked.
let dependents = self.load_dependent_send_queue_events(room_id).await.unwrap();
let dependents = self.load_dependent_queued_requests(room_id).await.unwrap();
assert_eq!(dependents.len(), 1);
assert_eq!(dependents[0].parent_transaction_id, txn0);
assert_eq!(dependents[0].own_transaction_id, child_txn);
assert!(dependents[0].event_id.is_none());
assert_matches!(dependents[0].kind, DependentQueuedEventKind::Redact);
assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);

// Update the event id.
let event_id = owned_event_id!("$1");
let num_updated =
self.update_dependent_send_queue_event(room_id, &txn0, event_id.clone()).await.unwrap();
self.update_dependent_queued_request(room_id, &txn0, event_id.clone()).await.unwrap();
assert_eq!(num_updated, 1);

// It worked.
let dependents = self.load_dependent_send_queue_events(room_id).await.unwrap();
let dependents = self.load_dependent_queued_requests(room_id).await.unwrap();
assert_eq!(dependents.len(), 1);
assert_eq!(dependents[0].parent_transaction_id, txn0);
assert_eq!(dependents[0].own_transaction_id, child_txn);
assert_eq!(dependents[0].event_id.as_ref(), Some(&event_id));
assert_matches!(dependents[0].kind, DependentQueuedEventKind::Redact);
assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);

// Now remove it.
let removed = self
.remove_dependent_send_queue_event(room_id, &dependents[0].own_transaction_id)
.remove_dependent_queued_request(room_id, &dependents[0].own_transaction_id)
.await
.unwrap();
assert!(removed);

// It worked.
assert!(self.load_dependent_send_queue_events(room_id).await.unwrap().is_empty());
assert!(self.load_dependent_queued_requests(room_id).await.unwrap().is_empty());

// Now, inserting a dependent event and removing the original send queue event
// will NOT remove the dependent event.
let txn1 = TransactionId::new();
let event1 =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey2").into())
.unwrap();
self.save_send_queue_event(room_id, txn1.clone(), event1).await.unwrap();
self.save_send_queue_request(room_id, txn1.clone(), event1).await.unwrap();

self.save_dependent_send_queue_event(
self.save_dependent_queued_request(
room_id,
&txn0,
ChildTransactionId::new(),
DependentQueuedEventKind::Redact,
DependentQueuedRequestKind::RedactEvent,
)
.await
.unwrap();
assert_eq!(self.load_dependent_send_queue_events(room_id).await.unwrap().len(), 1);
assert_eq!(self.load_dependent_queued_requests(room_id).await.unwrap().len(), 1);

self.save_dependent_send_queue_event(
self.save_dependent_queued_request(
room_id,
&txn1,
ChildTransactionId::new(),
DependentQueuedEventKind::Edit {
DependentQueuedRequestKind::EditEvent {
new_content: SerializableEventContent::new(
&RoomMessageEventContent::text_plain("edit").into(),
)
Expand All @@ -1442,14 +1442,14 @@ impl StateStoreIntegrationTests for DynStateStore {
)
.await
.unwrap();
assert_eq!(self.load_dependent_send_queue_events(room_id).await.unwrap().len(), 2);
assert_eq!(self.load_dependent_queued_requests(room_id).await.unwrap().len(), 2);

// Remove event0 / txn0.
let removed = self.remove_send_queue_event(room_id, &txn0).await.unwrap();
let removed = self.remove_send_queue_request(room_id, &txn0).await.unwrap();
assert!(removed);

// This has removed none of the dependent events.
let dependents = self.load_dependent_send_queue_events(room_id).await.unwrap();
let dependents = self.load_dependent_queued_requests(room_id).await.unwrap();
assert_eq!(dependents.len(), 2);
}
}
Expand Down
42 changes: 23 additions & 19 deletions crates/matrix-sdk-base/src/store/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ use ruma::{
use tracing::{debug, instrument, trace, warn};

use super::{
send_queue::{ChildTransactionId, QueuedEvent, SerializableEventContent},
send_queue::{ChildTransactionId, QueuedRequest, SerializableEventContent},
traits::{ComposerDraft, ServerCapabilities},
DependentQueuedEvent, DependentQueuedEventKind, QueuedRequestKind, Result, RoomInfo,
DependentQueuedRequest, DependentQueuedRequestKind, QueuedRequestKind, Result, RoomInfo,
StateChanges, StateStore, StoreError,
};
use crate::{
Expand Down Expand Up @@ -88,8 +88,8 @@ pub struct MemoryStore {
>,
>,
custom: StdRwLock<HashMap<Vec<u8>, Vec<u8>>>,
send_queue_events: StdRwLock<BTreeMap<OwnedRoomId, Vec<QueuedEvent>>>,
dependent_send_queue_events: StdRwLock<BTreeMap<OwnedRoomId, Vec<DependentQueuedEvent>>>,
send_queue_events: StdRwLock<BTreeMap<OwnedRoomId, Vec<QueuedRequest>>>,
dependent_send_queue_events: StdRwLock<BTreeMap<OwnedRoomId, Vec<DependentQueuedRequest>>>,
}

impl MemoryStore {
Expand Down Expand Up @@ -802,19 +802,23 @@ impl StateStore for MemoryStore {
Ok(())
}

async fn save_send_queue_event(
async fn save_send_queue_request(
&self,
room_id: &RoomId,
transaction_id: OwnedTransactionId,
content: SerializableEventContent,
) -> Result<(), Self::Error> {
self.send_queue_events.write().unwrap().entry(room_id.to_owned()).or_default().push(
QueuedEvent { kind: QueuedRequestKind::Event { content }, transaction_id, error: None },
QueuedRequest {
kind: QueuedRequestKind::Event { content },
transaction_id,
error: None,
},
);
Ok(())
}

async fn update_send_queue_event(
async fn update_send_queue_request(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
Expand All @@ -837,7 +841,7 @@ impl StateStore for MemoryStore {
}
}

async fn remove_send_queue_event(
async fn remove_send_queue_request(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
Expand All @@ -860,14 +864,14 @@ impl StateStore for MemoryStore {
Ok(false)
}

async fn load_send_queue_events(
async fn load_send_queue_requests(
&self,
room_id: &RoomId,
) -> Result<Vec<QueuedEvent>, Self::Error> {
) -> Result<Vec<QueuedRequest>, Self::Error> {
Ok(self.send_queue_events.write().unwrap().entry(room_id.to_owned()).or_default().clone())
}

async fn update_send_queue_event_status(
async fn update_send_queue_request_status(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
Expand All @@ -887,19 +891,19 @@ impl StateStore for MemoryStore {
Ok(())
}

async fn load_rooms_with_unsent_events(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
Ok(self.send_queue_events.read().unwrap().keys().cloned().collect())
}

async fn save_dependent_send_queue_event(
async fn save_dependent_queued_request(
&self,
room: &RoomId,
parent_transaction_id: &TransactionId,
own_transaction_id: ChildTransactionId,
content: DependentQueuedEventKind,
content: DependentQueuedRequestKind,
) -> Result<(), Self::Error> {
self.dependent_send_queue_events.write().unwrap().entry(room.to_owned()).or_default().push(
DependentQueuedEvent {
DependentQueuedRequest {
kind: content,
parent_transaction_id: parent_transaction_id.to_owned(),
own_transaction_id,
Expand All @@ -909,7 +913,7 @@ impl StateStore for MemoryStore {
Ok(())
}

async fn update_dependent_send_queue_event(
async fn update_dependent_queued_request(
&self,
room: &RoomId,
parent_txn_id: &TransactionId,
Expand All @@ -925,7 +929,7 @@ impl StateStore for MemoryStore {
Ok(num_updated)
}

async fn remove_dependent_send_queue_event(
async fn remove_dependent_queued_request(
&self,
room: &RoomId,
txn_id: &ChildTransactionId,
Expand All @@ -944,10 +948,10 @@ impl StateStore for MemoryStore {
///
/// This returns absolutely all the dependent send queue events, whether
/// they have an event id or not.
async fn load_dependent_send_queue_events(
async fn load_dependent_queued_requests(
&self,
room: &RoomId,
) -> Result<Vec<DependentQueuedEvent>, Self::Error> {
) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
Ok(self.dependent_send_queue_events.read().unwrap().get(room).cloned().unwrap_or_default())
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/matrix-sdk-base/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ pub use self::integration_tests::StateStoreIntegrationTests;
pub use self::{
memory_store::MemoryStore,
send_queue::{
ChildTransactionId, DependentQueuedEvent, DependentQueuedEventKind, QueueWedgeError,
QueuedEvent, QueuedRequestKind, SerializableEventContent,
ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError,
QueuedRequest, QueuedRequestKind, SerializableEventContent,
},
traits::{
ComposerDraft, ComposerDraftType, DynStateStore, IntoStateStore, ServerCapabilities,
Expand Down
Loading

0 comments on commit 9c858c1

Please sign in to comment.