Skip to content

Commit

Permalink
test(send queue): add more tests for cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
bnjbvr committed Nov 14, 2024
1 parent b7d4be9 commit 9b6de4e
Showing 1 changed file with 243 additions and 12 deletions.
255 changes: 243 additions & 12 deletions crates/matrix-sdk/tests/integration/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use wiremock::{Request, ResponseTemplate};
/// Queues an attachment whenever the actual data/mime type etc. don't matter.
///
/// Returns the filename, for sanity check purposes.
async fn queue_attachment_no_thumbnail(q: &RoomSendQueue) -> &'static str {
async fn queue_attachment_no_thumbnail(q: &RoomSendQueue) -> (SendHandle, &'static str) {
let filename = "surprise.jpeg.exe";
let content_type = mime::IMAGE_JPEG;
let data = b"hello world".to_vec();
Expand All @@ -54,10 +54,11 @@ async fn queue_attachment_no_thumbnail(q: &RoomSendQueue) -> &'static str {
size: Some(uint!(42)),
blurhash: None,
}));
q.send_attachment(filename, content_type, data, config)
let handle = q
.send_attachment(filename, content_type, data, config)
.await
.expect("queuing the attachment works");
filename
(handle, filename)
}

/// Queues an attachment whenever the actual data/mime type etc. don't matter,
Expand Down Expand Up @@ -2006,7 +2007,7 @@ async fn test_media_upload_retry() {

// Send the media.
assert!(watch.is_empty());
let filename = queue_attachment_no_thumbnail(&q).await;
let (_handle, filename) = queue_attachment_no_thumbnail(&q).await;

// Observe the local echo.
let (event_txn, _send_handle, content) = assert_update!(watch => local echo event);
Expand Down Expand Up @@ -2075,7 +2076,7 @@ async fn test_unwedging_media_upload() {

// Send the media.
assert!(watch.is_empty());
let filename = queue_attachment_no_thumbnail(&q).await;
let (_handle, filename) = queue_attachment_no_thumbnail(&q).await;

// Observe the local echo.
let (event_txn, send_handle, content) = assert_update!(watch => local echo event);
Expand Down Expand Up @@ -2196,7 +2197,7 @@ async fn test_media_event_is_sent_in_order() {
mock.mock_room_send().ok(event_id!("$text")).mock_once().mount().await;

// 2. Queue the media.
let filename = queue_attachment_no_thumbnail(&q).await;
let (_handle, filename) = queue_attachment_no_thumbnail(&q).await;

// 3. Queue the message.
q.send(RoomMessageEventContent::text_plain("hello world").into()).await.unwrap();
Expand Down Expand Up @@ -2311,12 +2312,68 @@ async fn test_cancel_upload_with_thumbnail_active() {
// Have the thumbnail upload take forever and time out, if continued. This will
// be interrupted when aborting, so this will never have to complete.
mock.mock_upload()
.respond_with(ResponseTemplate::new(200).set_delay(Duration::from_secs(60)).set_body_json(
json!({
"mxc_id": "mxc://sdk.rs/unreachable"
}),
))
.expect(0)
.respond_with(ResponseTemplate::new(200).set_delay(Duration::from_secs(60)))
.expect(1)
.mount()
.await;

// Send the media.
assert!(watch.is_empty());

let (upload_handle, filename) = queue_attachment_with_thumbnail(&q).await;

let (upload_txn, _send_handle, content) = assert_update!(watch => local echo event);
assert_let!(MessageType::Image(img_content) = content.msgtype);
assert_eq!(img_content.filename(), filename);

// Let the upload request start.
sleep(Duration::from_millis(500)).await;

// Abort the upload.
abort_and_verify(&client, &mut watch, img_content, upload_handle, upload_txn).await;

// To prove we're not waiting for the upload to finish, send a message and
// observe it's immediately sent.
q.send(RoomMessageEventContent::text_plain("hi").into()).await.unwrap();
let (msg_txn, _handle) = assert_update!(watch => local echo { body = "hi" });
assert_update!(watch => sent { txn = msg_txn, });

// That's all, folks!
assert!(watch.is_empty());
}

#[async_test]
async fn test_cancel_upload_with_uploaded_thumbnail_and_file_active() {
let mock = MatrixMockServer::new().await;

// Mark the room as joined.
let room_id = room_id!("!a:b.c");
let client = mock.client_builder().build().await;
let room = mock.sync_joined_room(&client, room_id).await;

let q = room.send_queue();

let (local_echoes, mut watch) = q.subscribe().await.unwrap();
assert!(local_echoes.is_empty());

// Prepare endpoints.
mock.mock_room_state_encryption().plain().mount().await;
mock.mock_room_send().ok(event_id!("$msg")).mock_once().named("send event").mount().await;

// Have the thumbnail upload finish early.
mock.mock_upload()
.ok(mxc_uri!("mxc://sdk.rs/thumbnail"))
.mock_once()
.named("thumbnail upload")
.mount()
.await;

// Have the file upload take forever and time out, if continued. This will
// be interrupted when aborting, so this will never have to complete.
mock.mock_upload()
.respond_with(ResponseTemplate::new(200).set_delay(Duration::from_secs(60)))
.expect(1)
.named("file upload")
.mount()
.await;

Expand All @@ -2329,6 +2386,12 @@ async fn test_cancel_upload_with_thumbnail_active() {
assert_let!(MessageType::Image(img_content) = content.msgtype);
assert_eq!(img_content.filename(), filename);

// The thumbnail uploads just fine.
assert_update!(watch => uploaded { related_to = upload_txn, mxc = mxc_uri!("mxc://sdk.rs/thumbnail") });

// Let the file upload request start.
sleep(Duration::from_millis(500)).await;

// Abort the upload.
abort_and_verify(&client, &mut watch, img_content, upload_handle, upload_txn).await;

Expand All @@ -2341,3 +2404,171 @@ async fn test_cancel_upload_with_thumbnail_active() {
// That's all, folks!
assert!(watch.is_empty());
}

#[async_test]
async fn test_cancel_upload_only_file_with_file_active() {
let mock = MatrixMockServer::new().await;

// Mark the room as joined.
let room_id = room_id!("!a:b.c");
let client = mock.client_builder().build().await;
let room = mock.sync_joined_room(&client, room_id).await;

let q = room.send_queue();

let (local_echoes, mut watch) = q.subscribe().await.unwrap();
assert!(local_echoes.is_empty());

// Prepare endpoints.
mock.mock_room_state_encryption().plain().mount().await;
mock.mock_room_send().ok(event_id!("$msg")).mock_once().named("send event").mount().await;

// Have the file upload take forever and time out, if continued. This will
// be interrupted when aborting, so this will never have to complete.
mock.mock_upload()
.respond_with(ResponseTemplate::new(200).set_delay(Duration::from_secs(60)))
.expect(1)
.named("file upload")
.mount()
.await;

// Send the media.
assert!(watch.is_empty());

let (upload_handle, filename) = queue_attachment_no_thumbnail(&q).await;

let (upload_txn, _send_handle, content) = assert_update!(watch => local echo event);
assert_let!(MessageType::Image(img_content) = content.msgtype);
assert_eq!(img_content.filename(), filename);

// Let the upload request start.
sleep(Duration::from_millis(500)).await;

// Abort the upload.
let aborted = upload_handle.abort().await.unwrap();
assert!(aborted, "upload must have been aborted");

assert_update!(watch => cancelled { txn = upload_txn });

// The event cache doesn't contain the medias anymore.
client
.media()
.get_media_content(
&MediaRequestParameters { source: img_content.source, format: MediaFormat::File },
true,
)
.await
.unwrap_err();

// To prove we're not waiting for the upload to finish, send a message and
// observe it's immediately sent.
q.send(RoomMessageEventContent::text_plain("hi").into()).await.unwrap();
let (msg_txn, _handle) = assert_update!(watch => local echo { body = "hi" });
assert_update!(watch => sent { txn = msg_txn, });

// That's all, folks!
assert!(watch.is_empty());
}

#[async_test]
async fn test_cancel_upload_while_sending_event() {
let mock = MatrixMockServer::new().await;

// Mark the room as joined.
let room_id = room_id!("!a:b.c");
let client = mock.client_builder().build().await;
let room = mock.sync_joined_room(&client, room_id).await;

let q = room.send_queue();

let (local_echoes, mut watch) = q.subscribe().await.unwrap();
assert!(local_echoes.is_empty());

// Prepare endpoints.
mock.mock_room_state_encryption().plain().mount().await;

// File upload will succeed immediately.
mock.mock_upload()
.ok(mxc_uri!("mxc://sdk.rs/media"))
.mock_once()
.named("file upload")
.mount()
.await;

// Sending of the media event will take 1 second, so we can abort it while it's
// happening.
mock.mock_room_send()
.respond_with(ResponseTemplate::new(200).set_delay(Duration::from_secs(1)).set_body_json(
json!({
"event_id": "$media"
}),
))
.mock_once()
.named("send event")
.mock_once()
.mount()
.await;

// A redaction will happen because the abort happens after the event is getting
// sent.
mock.mock_room_redact().ok(event_id!("$redaction")).mock_once().mount().await;

// Send the media.
assert!(watch.is_empty());

let (upload_handle, filename) = queue_attachment_no_thumbnail(&q).await;

let (upload_txn, _send_handle, content) = assert_update!(watch => local echo event);
assert_let!(MessageType::Image(local_content) = content.msgtype);
assert_eq!(local_content.filename(), filename);

assert_update!(watch => uploaded { related_to = upload_txn, mxc = mxc_uri!("mxc://sdk.rs/media") });

let edit_msg = assert_update!(watch => edit local echo { txn = upload_txn });
assert_let!(MessageType::Image(remote_content) = edit_msg.msgtype);
assert_let!(MediaSource::Plain(new_uri) = &remote_content.source);
assert_eq!(new_uri, mxc_uri!("mxc://sdk.rs/media"));

// Let the upload request start.
sleep(Duration::from_millis(250)).await;

// Abort the upload.
let aborted = upload_handle.abort().await.unwrap();
assert!(aborted, "upload must have been aborted");

// We get a local echo for the cancelled media event…
assert_update!(watch => cancelled { txn = upload_txn });
// …But the event is still sent, before getting redacted.
assert_update!(watch => sent { txn = upload_txn, });

// The event cache doesn't contain the media with the local URI.
client
.media()
.get_media_content(
&MediaRequestParameters { source: local_content.source, format: MediaFormat::File },
true,
)
.await
.unwrap_err();

// But it does contain the media with the remote URI, which hasn't been removed
// from the remote server.
client
.media()
.get_media_content(
&MediaRequestParameters { source: remote_content.source, format: MediaFormat::File },
true,
)
.await
.unwrap();

// Let things settle (and the redaction endpoint be called).
sleep(Duration::from_secs(1)).await;

// Trying to abort after it's been sent/redacted is a no-op
let aborted = upload_handle.abort().await.unwrap();
assert!(aborted.not());

// That's all, folks!
assert!(watch.is_empty());
}

0 comments on commit 9b6de4e

Please sign in to comment.