Skip to content

Commit

Permalink
Add JS version of TestUnprocessedToDeviceMessagesArentLostOnRestart
Browse files Browse the repository at this point in the history
Also:
 - Fix a bug which caused mitm'd requests with no request body to fail
 - Fix a bug which meant you couldn't log '"' in JS log files.
  • Loading branch information
kegsay committed Mar 1, 2024
1 parent a21a4f0 commit 21d682d
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 107 deletions.
28 changes: 23 additions & 5 deletions internal/api/js/js.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ func NewJSClient(t ct.TestLike, opts api.ClientCreationOpts) (api.Client, error)
// "Uncaught (in promise) Error: createUser is undefined, it should be set with setUserCreator()!"
// https://github.com/matrix-org/matrix-js-sdk/blob/76b9c3950bfdfca922bec7f70502ff2da93bd731/src/store/indexeddb.ts#L143
chrome.RunAsyncFn[chrome.Void](t, browser.Ctx, fmt.Sprintf(`
// FIXME: this doesn't seem to work.
// JS SDK doesn't store this for us, so we need to. Do this before making the stores which can error out.
// window.__accessToken = window.localStorage.getItem("complement_crypto_access_token") || undefined;
// console.log("localStorage.getItem(complement_crypto_access_token) => " + window.__accessToken);
window.__store = new IndexedDBStore({
indexedDB: window.indexedDB,
dbName: "%s",
Expand Down Expand Up @@ -137,6 +142,7 @@ func NewJSClient(t ct.TestLike, opts api.ClientCreationOpts) (api.Client, error)
useAuthorizationHeader: %s,
userId: "%s",
deviceId: %s,
accessToken: window.__accessToken || undefined,
store: %s,
cryptoStore: %s,
cryptoCallbacks: {
Expand All @@ -163,8 +169,10 @@ func NewJSClient(t ct.TestLike, opts api.ClientCreationOpts) (api.Client, error)
return Promise.resolve(result);
},
}
});`, opts.BaseURL, "true", opts.UserID, deviceID, store, cryptoStore))

});
await window.__client.initRustCrypto();
`, opts.BaseURL, "true", opts.UserID, deviceID, store, cryptoStore))
jsc.Logf(t, "NewJSClient[%s,%s] created client storage=%v", opts.UserID, opts.DeviceID, opts.PersistentStorage)
return &api.LoggedClient{Client: jsc}, nil
}

Expand All @@ -179,8 +187,7 @@ func (c *JSClient) Login(t ct.TestLike, opts api.ClientCreationOpts) error {
user: "%s",
password: "%s",
device_id: %s,
});
await window.__client.initRustCrypto();`, opts.UserID, opts.Password, deviceID))
});`, opts.UserID, opts.Password, deviceID))

// any events need to log the control string so we get notified
chrome.MustRunAsyncFn[chrome.Void](t, c.browser.Ctx, fmt.Sprintf(`
Expand All @@ -191,6 +198,17 @@ func (c *JSClient) Login(t ct.TestLike, opts api.ClientCreationOpts) error {
console.log("%s"+event.getRoomId()+"||"+JSON.stringify(event.getEffectiveEvent()));
});`, CONSOLE_LOG_CONTROL_STRING, CONSOLE_LOG_CONTROL_STRING))

if c.opts.PersistentStorage {
/* FIXME: this doesn't work. It doesn't seem to remember across restarts.
chrome.MustRunAsyncFn[chrome.Void](t, c.browser.Ctx, `
const token = window.__client.getAccessToken();
if (token) {
window.localStorage.setItem("complement_crypto_access_token",token);
console.log("localStorage.setItem(complement_crypto_access_token) => " + token);
}
`) */
}

return nil
}

Expand Down Expand Up @@ -428,7 +446,7 @@ func (c *JSClient) Logf(t ct.TestLike, format string, args ...interface{}) {
t.Helper()
formatted := fmt.Sprintf(t.Name()+": "+format, args...)
if c.browser.Ctx.Err() == nil { // don't log on dead browsers
chrome.MustRunAsyncFn[chrome.Void](t, c.browser.Ctx, fmt.Sprintf(`console.log("%s");`, formatted))
chrome.MustRunAsyncFn[chrome.Void](t, c.browser.Ctx, fmt.Sprintf(`console.log("%s");`, strings.Replace(formatted, `"`, `\"`, -1)))
}
t.Logf(format, args...)
}
Expand Down
8 changes: 7 additions & 1 deletion tests/mitmproxy_addons/callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
# method: "GET|PUT|...",
# access_token: "syt_11...",
# url: "http://hs1/_matrix/client/...",
# request_body: { some json object or null if no body },
# response_body: { some json object },
# response_code: 200,
# }
# Currently this is a read-only callback. The response cannot be modified, but side-effects can be
Expand Down Expand Up @@ -61,12 +63,16 @@ def response(self, flow):
if self.config["callback_url"] == "":
return # ignore responses if we aren't told a url
if flowfilter.match(self.filter, flow):
try:
req_body = flow.request.json()
except:
req_body = None
data = json.dumps({
"method": flow.request.method,
"access_token": flow.request.headers.get("Authorization", "").removeprefix("Bearer "),
"url": flow.request.url,
"response_code": flow.response.status_code,
"request_body": flow.request.json(),
"request_body": req_body,
"response_body": flow.response.json(),
})
request = Request(
Expand Down
250 changes: 149 additions & 101 deletions tests/to_device_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/matrix-org/complement-crypto/internal/api"
"github.com/matrix-org/complement-crypto/internal/deploy"
templates "github.com/matrix-org/complement-crypto/tests/go_templates"
"github.com/matrix-org/complement/client"
"github.com/matrix-org/complement/helpers"
"github.com/matrix-org/complement/must"
"github.com/tidwall/gjson"
Expand Down Expand Up @@ -80,113 +79,162 @@ func TestUnprocessedToDeviceMessagesArentLostOnRestart(t *testing.T) {
DeviceID: "ALICE_TWO",
Password: "complement-crypto-password",
})
switch clientType.Lang {
case api.ClientTypeRust:
testUnprocessedToDeviceMessagesArentLostOnRestartRust(t, tc, roomID, alice2)
case api.ClientTypeJS:
testUnprocessedToDeviceMessagesArentLostOnRestartJS(t, tc, roomID, alice2)
default:
t.Fatalf("unknown lang: %s", clientType.Lang)
}
})
}

func testUnprocessedToDeviceMessagesArentLostOnRestartRust(t *testing.T, tc *TestContext, roomID string, alice2 *client.CSAPI) {
tc.WithAliceSyncing(t, func(alice api.Client) {
bob := tc.MustLoginClient(t, tc.Bob, tc.BobClientType, WithPersistentStorage())
// we will close this in the test, no defer
bobStopSyncing := bob.MustStartSyncing(t)
tc.WithClientSyncing(t, tc.AliceClientType, alice2, func(alice2 api.Client) { // sync to ensure alice2 has keys uploaded
// check the room works
alice.SendMessage(t, roomID, "Hello World!")
bob.WaitUntilEventInRoom(t, roomID, api.CheckEventHasBody("Hello World!")).Wait(t, 2*time.Second)
})
// stop bob's client
bobStopSyncing()
bob.Logf(t, "Bob is about to be Closed()")
bob.Close(t)

// send a lot of to-device messages to bob to increase the window in which to SIGKILL the client.
// It's unimportant what these are.
for i := 0; i < 60; i++ {
alice2.MustSendToDeviceMessages(t, "m.room_key_request", map[string]map[string]map[string]interface{}{
bob.UserID(): {
"*": {
"action": "request_cancellation",
"request_id": fmt.Sprintf("random_%d", i),
"requesting_device_id": "WHO_KNOWS",
},
},
// the initial setup for rust/js is the same.
tc.WithAliceSyncing(t, func(alice api.Client) {
bob := tc.MustLoginClient(t, tc.Bob, tc.BobClientType, WithPersistentStorage())
// we will close this in the test, no defer
bobStopSyncing := bob.MustStartSyncing(t)
tc.WithClientSyncing(t, tc.AliceClientType, alice2, func(alice2 api.Client) { // sync to ensure alice2 has keys uploaded
// check the room works
alice.SendMessage(t, roomID, "Hello World!")
bob.WaitUntilEventInRoom(t, roomID, api.CheckEventHasBody("Hello World!")).Wait(t, 2*time.Second)
})
}
t.Logf("to-device msgs sent")

// logout alice 2
alice2.MustDo(t, "POST", []string{"_matrix", "client", "v3", "logout"})

// send a message as alice to make a new room key (if we didn't already on the /logout above)
eventID := alice.SendMessage(t, roomID, "Kick to make a new room key!")

// sniff /sync traffic
waitForRoomKey := helpers.NewWaiter()
tc.Deployment.WithSniffedEndpoint(t, "/sync", func(cd deploy.CallbackData) {
// When /sync shows a to-device message from Alice (indicating the room key), sleep(1ms) then SIGKILL Bob.
body := gjson.ParseBytes(cd.ResponseBody)
toDeviceEvents := body.Get("extensions.to_device.events").Array()
if len(toDeviceEvents) > 0 {
for _, ev := range toDeviceEvents {
if ev.Get("type").Str == "m.room.encrypted" {
t.Logf("detected potential room key")
waitForRoomKey.Finish()
}
}
}
}, func() {
// bob comes back online, and will be killed a short while later.
bobOpts := bob.Opts()
t.Logf("recreating bob, base url %s", bobOpts.BaseURL)
cmd, close := templates.PrepareGoScript(t, "testUnprocessedToDeviceMessagesArentLostOnRestartRust/test.go",
struct {
UserID string
DeviceID string
Password string
BaseURL string
SSURL string
PersistentStorage bool
}{
UserID: bobOpts.UserID,
Password: bobOpts.Password,
DeviceID: bobOpts.DeviceID,
BaseURL: bobOpts.BaseURL,
PersistentStorage: bobOpts.PersistentStorage,
SSURL: tc.Deployment.SlidingSyncURL(t),
// stop bob's client
bobStopSyncing()
bob.Logf(t, "Bob is about to be Closed()")
bob.Close(t)

// send a lot of to-device messages to bob to increase the window in which to SIGKILL the client.
// It's unimportant what these are.
for i := 0; i < 60; i++ {
alice2.MustSendToDeviceMessages(t, "m.room_key_request", map[string]map[string]map[string]interface{}{
bob.UserID(): {
"*": {
"action": "request_cancellation",
"request_id": fmt.Sprintf("random_%d", i),
"requesting_device_id": "WHO_KNOWS",
},
},
})
cmd.WaitDelay = 3 * time.Second
defer close()
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Start()
waitForRoomKey.Wait(t, 10*time.Second)
t.Logf("killing external process")
must.NotError(t, "failed to kill process", cmd.Process.Kill())
}
t.Logf("to-device msgs sent")

// Ensure Bob can decrypt new messages sent from Alice.
bob = tc.MustLoginClient(t, tc.Bob, tc.BobClientType, WithPersistentStorage())
defer bob.Close(t)
bobStopSyncing := bob.MustStartSyncing(t)
defer bobStopSyncing()
// we can't rely on MustStartSyncing returning to know that the room key has been received, as
// in rust we just wait for RoomListLoadingStateLoaded which is a separate connection to the
// encryption loop.
// logout alice 2
alice2.MustDo(t, "POST", []string{"_matrix", "client", "v3", "logout"})

// if clients cycle room keys eagerly then the above logout will cause room keys to be sent.
// We want to wait for that to happen before sending the kick event. This is notable for JS.
time.Sleep(time.Second)
ev := bob.MustGetEvent(t, roomID, eventID)
must.Equal(t, ev.FailedToDecrypt, false, "unable to decrypt message")
must.Equal(t, ev.Text, "Kick to make a new room key!", "event text mismatch")
})

// send a message as alice to make a new room key (if we didn't already on the /logout above)
eventID := alice.SendMessage(t, roomID, "Kick to make a new room key!")

// client specific impls to handle restarts.
switch clientType.Lang {
case api.ClientTypeRust:
testUnprocessedToDeviceMessagesArentLostOnRestartRust(t, tc, bob.Opts(), roomID, eventID)
case api.ClientTypeJS:
testUnprocessedToDeviceMessagesArentLostOnRestartJS(t, tc, bob.Opts(), roomID, eventID)
default:
t.Fatalf("unknown lang: %s", clientType.Lang)
}
})
})
}

func testUnprocessedToDeviceMessagesArentLostOnRestartJS(t *testing.T, tc *TestContext, roomID string, alice2 *client.CSAPI) {
func testUnprocessedToDeviceMessagesArentLostOnRestartRust(t *testing.T, tc *TestContext, bobOpts api.ClientCreationOpts, roomID, eventID string) {
// sniff /sync traffic
waitForRoomKey := helpers.NewWaiter()
tc.Deployment.WithSniffedEndpoint(t, "/sync", func(cd deploy.CallbackData) {
// When /sync shows a to-device message from Alice (indicating the room key), then SIGKILL Bob.
body := gjson.ParseBytes(cd.ResponseBody)
toDeviceEvents := body.Get("extensions.to_device.events").Array() // Sliding Sync form
if len(toDeviceEvents) > 0 {
for _, ev := range toDeviceEvents {
if ev.Get("type").Str == "m.room.encrypted" {
t.Logf("detected potential room key")
waitForRoomKey.Finish()
}
}
}
}, func() {
// bob comes back online, and will be killed a short while later.
t.Logf("recreating bob")
cmd, close := templates.PrepareGoScript(t, "testUnprocessedToDeviceMessagesArentLostOnRestartRust/test.go",
struct {
UserID string
DeviceID string
Password string
BaseURL string
SSURL string
PersistentStorage bool
}{
UserID: bobOpts.UserID,
Password: bobOpts.Password,
DeviceID: bobOpts.DeviceID,
BaseURL: bobOpts.BaseURL,
PersistentStorage: bobOpts.PersistentStorage,
SSURL: tc.Deployment.SlidingSyncURL(t),
})
cmd.WaitDelay = 3 * time.Second
defer close()
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Start()
waitForRoomKey.Wait(t, 10*time.Second)
time.Sleep(time.Millisecond) // wait a bit to let the client be mid-processing
t.Logf("killing external process")
must.NotError(t, "failed to kill process", cmd.Process.Kill())

// Ensure Bob can decrypt new messages sent from Alice.
bob := tc.MustLoginClient(t, tc.Bob, tc.BobClientType, WithPersistentStorage())
defer bob.Close(t)
bobStopSyncing := bob.MustStartSyncing(t)
defer bobStopSyncing()
// we can't rely on MustStartSyncing returning to know that the room key has been received, as
// in rust we just wait for RoomListLoadingStateLoaded which is a separate connection to the
// encryption loop.
time.Sleep(time.Second)
ev := bob.MustGetEvent(t, roomID, eventID)
must.Equal(t, ev.FailedToDecrypt, false, "unable to decrypt message")
must.Equal(t, ev.Text, "Kick to make a new room key!", "event text mismatch")
})
}

func testUnprocessedToDeviceMessagesArentLostOnRestartJS(t *testing.T, tc *TestContext, bobOpts api.ClientCreationOpts, roomID, eventID string) {
// sniff /sync traffic
waitForRoomKey := helpers.NewWaiter()
tc.Deployment.WithSniffedEndpoint(t, "/sync", func(cd deploy.CallbackData) {
// When /sync shows a to-device message from Alice (indicating the room key) then SIGKILL Bob.
body := gjson.ParseBytes(cd.ResponseBody)
toDeviceEvents := body.Get("to_device.events").Array() // Sync v2 form
if len(toDeviceEvents) > 0 {
for _, ev := range toDeviceEvents {
if ev.Get("type").Str == "m.room.encrypted" {
t.Logf("detected potential room key")
waitForRoomKey.Finish()
}
}
}
}, func() {
bob := tc.MustLoginClient(t, tc.Bob, tc.BobClientType, WithPersistentStorage()) // no need to login as we have an account in storage already
// this is time-sensitive: start waiting for waitForRoomKey BEFORE we call MustStartSyncing
// which itself needs to be in a separate goroutine.
browserIsClosed := helpers.NewWaiter()
go func() {
waitForRoomKey.Wait(t, 10*time.Second)
t.Logf("killing bob as room key event received")
bob.Close(t) // close the browser
browserIsClosed.Finish()
}()
time.Sleep(100 * time.Millisecond)
go func() { // in a goroutine so we don't need this to return before closing the browser
t.Logf("bob starting to sync, expecting to be killed..")
bob.StartSyncing(t)
t.Logf("MustStartSyncing returned.")
}()

browserIsClosed.Wait(t, 10*time.Second)

// Ensure Bob can decrypt new messages sent from Alice.
bob = tc.MustLoginClient(t, tc.Bob, tc.BobClientType, WithPersistentStorage())
defer bob.Close(t)
bobStopSyncing := bob.MustStartSyncing(t)
defer bobStopSyncing()
// include a grace period like rust, no specific reason beyond consistency.
time.Sleep(time.Second)
ev := bob.MustGetEvent(t, roomID, eventID)
must.Equal(t, ev.FailedToDecrypt, false, "unable to decrypt message")
must.Equal(t, ev.Text, "Kick to make a new room key!", "event text mismatch")
})
}

0 comments on commit 21d682d

Please sign in to comment.