Skip to content

Commit

Permalink
Merge pull request #82 from matrix-org/kegan/ref-TestUnprocessedToDev…
Browse files Browse the repository at this point in the history
…iceMessagesArentLostOnRestart

Refactor TestUnprocessedToDeviceMessagesArentLostOnRestart
  • Loading branch information
kegsay authored Jun 13, 2024
2 parents d0f10b2 + f528a0e commit d6f68e0
Showing 1 changed file with 21 additions and 46 deletions.
67 changes: 21 additions & 46 deletions tests/to_device_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ func TestClientRetriesSendToDevice(t *testing.T) {
// Regression test for https://github.com/vector-im/element-web/issues/23113
// "If you restart (e.g. upgrade) Element while it's waiting to process a m.room_key, it'll drop it and you'll get UISIs"
//
// - Alice (2 devices) and Bob are in an encrypted room.
// - Alice and Bob are in an encrypted room with rotation period msgs = 1
// - Bob's client is shut down temporarily.
// - Alice's 2nd device logs out, which will cause Alice's 1st device to cycle room keys.
// - Alice sends a message, this will cause a new room key to be sent.
// - Start sniffing /sync traffic. Bob's client comes back.
// - When /sync shows a to-device message from Alice (indicating the room key), sleep(1ms) then SIGKILL Bob.
// - Restart Bob's client.
Expand All @@ -71,32 +71,29 @@ func TestUnprocessedToDeviceMessagesArentLostOnRestart(t *testing.T) {
ForEachClientType(t, func(t *testing.T, clientType api.ClientType) {
// prepare for the test: register all 3 clients and create the room
tc := CreateTestContext(t, clientType, clientType)
roomID := tc.CreateNewEncryptedRoom(t, tc.Alice, EncRoomOptions.Invite([]string{tc.Bob.UserID}))
roomID := tc.CreateNewEncryptedRoom(t, tc.Alice,
EncRoomOptions.Invite([]string{tc.Bob.UserID}), EncRoomOptions.RotationPeriodMsgs(1),
)
tc.Bob.MustJoinRoom(t, roomID, []string{clientType.HS})
alice2 := tc.Deployment.Login(t, clientType.HS, tc.Alice, helpers.LoginOpts{
DeviceID: "ALICE_TWO",
Password: "complement-crypto-password",
})
// the initial setup for rust/js is the same.
// login bob first so we have OTKs
bob := tc.MustLoginClient(t, tc.Bob, tc.BobClientType, WithPersistentStorage())
tc.WithAliceSyncing(t, func(alice api.Client) {
// we will close this in the test, no defer
bobStopSyncing := bob.MustStartSyncing(t)
tc.WithClientSyncing(t, tc.AliceClientType, alice2, func(alice2 api.Client) { // sync to ensure alice2 has keys uploaded
// check the room works
alice.SendMessage(t, roomID, "Hello World!")
bob.WaitUntilEventInRoom(t, roomID, api.CheckEventHasBody("Hello World!")).Waitf(t, 2*time.Second, "bob did not see event with body 'Hello World!'")
})
// stop bob's client
// check the room works
alice.SendMessage(t, roomID, "Hello World!")
bob.WaitUntilEventInRoom(t, roomID, api.CheckEventHasBody("Hello World!")).Waitf(t, 2*time.Second, "bob did not see event with body 'Hello World!'")
// stop bob's client, but grab the access token first so we can re-use it
bobOpts := bob.Opts()
bobStopSyncing()
bob.Logf(t, "Bob is about to be Closed()")
bob.Close(t)

// send a lot of to-device messages to bob to increase the window in which to SIGKILL the client.
// It's unimportant what these are.
for i := 0; i < 60; i++ {
alice2.MustSendToDeviceMessages(t, "m.room_key_request", map[string]map[string]map[string]interface{}{
tc.Alice.MustSendToDeviceMessages(t, "m.room_key_request", map[string]map[string]map[string]interface{}{
bob.UserID(): {
"*": {
"action": "request_cancellation",
Expand All @@ -108,34 +105,25 @@ func TestUnprocessedToDeviceMessagesArentLostOnRestart(t *testing.T) {
}
t.Logf("to-device msgs sent")

// logout alice 2
alice2.MustDo(t, "POST", []string{"_matrix", "client", "v3", "logout"})

// if clients cycle room keys eagerly then the above logout will cause room keys to be sent.
// We want to wait for that to happen before sending the kick event. This is notable for JS.
time.Sleep(time.Second)

// send a message as alice to make a new room key (if we didn't already on the /logout above)
// send a message as alice to make a new room key
eventID := alice.SendMessage(t, roomID, "Kick to make a new room key!")

// client specific impls to handle restarts.
switch clientType.Lang {
case api.ClientTypeRust:
testUnprocessedToDeviceMessagesArentLostOnRestartRust(t, tc, alice.UserID(), bob.Opts(), roomID, eventID)
testUnprocessedToDeviceMessagesArentLostOnRestartRust(t, tc, bobOpts, roomID, eventID)
case api.ClientTypeJS:
testUnprocessedToDeviceMessagesArentLostOnRestartJS(t, tc, bob.Opts(), roomID, eventID)
testUnprocessedToDeviceMessagesArentLostOnRestartJS(t, tc, bobOpts, roomID, eventID)
default:
t.Fatalf("unknown lang: %s", clientType.Lang)
}
})
})
}

// TODO: unsure if this is actually testing the regression now.
func testUnprocessedToDeviceMessagesArentLostOnRestartRust(t *testing.T, tc *TestContext, aliceUserID string, bobOpts api.ClientCreationOpts, roomID, eventID string) {
func testUnprocessedToDeviceMessagesArentLostOnRestartRust(t *testing.T, tc *TestContext, bobOpts api.ClientCreationOpts, roomID, eventID string) {
// sniff /sync traffic
waitForRoomKey := helpers.NewWaiter()
waitForChangedDeviceList := helpers.NewWaiter()
tc.Deployment.WithSniffedEndpoint(t, "/sync", func(cd deploy.CallbackData) {
// When /sync shows a to-device message from Alice (indicating the room key), then SIGKILL Bob.
t.Logf("/sync => %v", string(cd.ResponseBody))
Expand All @@ -149,33 +137,20 @@ func testUnprocessedToDeviceMessagesArentLostOnRestartRust(t *testing.T, tc *Tes
}
}
}
for _, changed := range body.Get("extensions.e2ee.device_lists.changed").Array() {
if changed.Str == aliceUserID {
t.Logf("detected alice in device_lists.changed")
waitForChangedDeviceList.Finish()
}
}
}, func() {
// bob comes back online, and will be killed a short while later.
// No need to login as we will reuse the session from before.
// This is critical to ensure we get the room key update as it would have been sent
// to bob's logged in device, not any new logins.
remoteClient := tc.MustCreateMultiprocessClient(t, api.ClientTypeRust, bobOpts)
must.NotError(t, "failed to login", remoteClient.Login(t, remoteClient.Opts()))

// start syncing but don't wait, we wait for the to device event
go func() {
_, err := remoteClient.StartSyncing(t)
if err != nil {
t.Errorf("bob failed to start syncing: %s", err)
}
// we purposefully ignore the error here because we expect the RPC client
// to return an error when the RPC server is sigkilled.
remoteClient.StartSyncing(t)
}()

// send a message which should cycle the room key. Whilst bob's client will be told
// that alice is in device_lists.changed, it won't eagerly cycle the room key when
// that happens, instead waiting for a message send.
waitForChangedDeviceList.Waitf(t, 5*time.Second, "did not see alice in device_lists.changed")
// now send a message after a brief pause to let the client process the device_list
time.Sleep(time.Second)
remoteClient.SendMessage(t, roomID, "kick to make a room key be sent")

waitForRoomKey.Waitf(t, 10*time.Second, "did not see room key")
t.Logf("killing remote bob client")
remoteClient.ForceClose(t)
Expand Down

0 comments on commit d6f68e0

Please sign in to comment.