diff --git a/internal/api/rust/dynamic_slice.go b/internal/api/rust/dynamic_slice.go
new file mode 100644
index 0000000..45e9fae
--- /dev/null
+++ b/internal/api/rust/dynamic_slice.go
@@ -0,0 +1,66 @@
+package rust
+
+import (
+	"fmt"
+
+	"golang.org/x/exp/slices"
+)
+
+type DynamicSlice[T any] struct {
+	Slice []T
+}
+
+// Insert the item, shifting anything in this place up by one.
+func (s *DynamicSlice[T]) Insert(i int, val T) {
+	s.Slice = slices.Insert(s.Slice, i, val)
+}
+
+// Remove the item, shifting anything above this down by one.
+func (s *DynamicSlice[T]) Remove(i int) {
+	s.Slice = slices.Delete(s.Slice, i, i+1)
+}
+
+// Append items to the end of the slice.
+func (s *DynamicSlice[T]) Append(vals ...T) {
+	s.Slice = append(s.Slice, vals...)
+}
+
+// Set the item at i to val; does not shift anything.
+func (s *DynamicSlice[T]) Set(i int, val T) {
+	if i > len(s.Slice) {
+		panic(fmt.Sprintf("DynamicSlice.Set[%d, %+v] out of bounds of size %d", i, val, len(s.Slice)))
+	} else if i == len(s.Slice) {
+		s.Slice = append(s.Slice, val)
+	} else {
+		s.Slice[i] = val
+	}
+}
+
+func (s *DynamicSlice[T]) PushBack(val T) {
+	s.Append(val)
+}
+
+func (s *DynamicSlice[T]) PushFront(val T) {
+	s.Insert(0, val)
+}
+
+func (s *DynamicSlice[T]) PopBack() {
+	s.Remove(len(s.Slice) - 1)
+}
+
+func (s *DynamicSlice[T]) PopFront() {
+	s.Remove(0)
+}
+
+func (s *DynamicSlice[T]) Clear() {
+	s.Slice = s.Slice[:0]
+}
+
+func (s *DynamicSlice[T]) Reset(vals []T) {
+	s.Clear()
+	s.Append(vals...)
+}
+
+func (s *DynamicSlice[T]) Truncate(length int) {
+	s.Slice = s.Slice[0:length]
+}
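Reviewer note, not part of the patch: a minimal sketch of the intended DynamicSlice semantics (it would run inside package rust), matching how the RoomListEntriesUpdate variants are mapped onto it in rust.go below. The values are illustrative.

	var s DynamicSlice[string]
	s.Append("a", "c")          // [a c]
	s.Insert(1, "b")            // [a b c]   shifts index 1 and above up by one
	s.Set(3, "d")               // [a b c d] Set at exactly len() appends rather than panicking
	s.Remove(0)                 // [b c d]
	s.PopBack()                 // [b c]
	s.Truncate(1)               // [b]
	s.Reset([]string{"x", "y"}) // [x y]
	s.Clear()                   // []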
diff --git a/internal/api/rust/room_listener.go b/internal/api/rust/room_listener.go
new file mode 100644
index 0000000..5a8ac31
--- /dev/null
+++ b/internal/api/rust/room_listener.go
@@ -0,0 +1,55 @@
+package rust
+
+import (
+	"sync"
+	"sync/atomic"
+)
+
+type RoomsListener struct {
+	listeners  map[int32]func(roomID string) (cancel bool)
+	listenerID atomic.Int32
+	mu         *sync.RWMutex
+}
+
+func NewRoomsListener() *RoomsListener {
+	return &RoomsListener{
+		listeners: make(map[int32]func(roomID string) (cancel bool)),
+		mu:        &sync.RWMutex{},
+	}
+}
+
+// AddListener registers the given callback, which will be invoked for every call to BroadcastUpdateForRoom.
+func (l *RoomsListener) AddListener(callback func(broadcastRoomID string) (cancel bool)) (cancel func()) {
+	id := l.listenerID.Add(1)
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	l.listeners[id] = callback
+	return func() {
+		l.removeListener(id)
+	}
+}
+
+func (l *RoomsListener) removeListener(id int32) {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	delete(l.listeners, id)
+}
+
+// BroadcastUpdateForRoom informs all attached listeners that something has happened in relation
+// to this room ID. This could be a new event, or the room appearing in all_rooms, or something else entirely.
+// It is up to the listener to decide what to do upon receipt of the poke.
+func (l *RoomsListener) BroadcastUpdateForRoom(roomID string) {
+	// take a snapshot of callbacks so callbacks can unregister themselves without causing deadlocks due to
+	// .Lock within .RLock.
+	callbacks := map[int32]func(roomID string) (cancel bool){}
+	l.mu.RLock()
+	for id, cb := range l.listeners {
+		callbacks[id] = cb
+	}
+	l.mu.RUnlock()
+	for id, cb := range callbacks {
+		if cb(roomID) {
+			l.removeListener(id)
+		}
+	}
+}
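Reviewer note, not part of the patch: a sketch of a one-shot listener built on RoomsListener. Returning true unregisters the callback; the snapshot taken in BroadcastUpdateForRoom is what makes removeListener (which takes .Lock) safe to reach from a callback dispatched under .RLock. Calling the returned cancel func after self-cancellation is also safe, since deleting a missing map key is a no-op. The room ID is illustrative.

	l := NewRoomsListener()
	done := make(chan struct{})
	cancel := l.AddListener(func(roomID string) bool {
		if roomID != "!target:hs1" {
			return false // keep listening
		}
		close(done)
		return true // unregister this listener
	})
	defer cancel()
	l.BroadcastUpdateForRoom("!target:hs1") // invokes the callback synchronously
	<-done                                  // returns immediately; done was closed above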
diff --git a/internal/api/rust/rust.go b/internal/api/rust/rust.go
index fa6eb59..2f7526e 100644
--- a/internal/api/rust/rust.go
+++ b/internal/api/rust/rust.go
@@ -36,8 +36,7 @@ type RustRoomInfo struct {
 
 type RustClient struct {
 	FFIClient *matrix_sdk_ffi.Client
-	listeners  map[int32]func(roomID string)
-	listenerID atomic.Int32
+	roomsListener *RoomsListener
 	allRooms *matrix_sdk_ffi.RoomList
 	rooms    map[string]*RustRoomInfo
 	roomsMu  *sync.RWMutex
@@ -61,12 +60,12 @@ func NewRustClient(t ct.TestLike, opts api.ClientCreationOpts, ssURL string) (ap
 		return nil, fmt.Errorf("ClientBuilder.Build failed: %s", err)
 	}
 	c := &RustClient{
-		userID:    opts.UserID,
-		FFIClient: client,
-		rooms:     make(map[string]*RustRoomInfo),
-		listeners: make(map[int32]func(roomID string)),
-		roomsMu:   &sync.RWMutex{},
-		opts:      opts,
+		userID:        opts.UserID,
+		FFIClient:     client,
+		roomsListener: NewRoomsListener(),
+		rooms:         make(map[string]*RustRoomInfo),
+		roomsMu:       &sync.RWMutex{},
+		opts:          opts,
 	}
 	if opts.PersistentStorage {
 		c.persistentStoragePath = "./rust_storage/" + username
@@ -159,6 +158,7 @@ func (c *RustClient) StartSyncing(t ct.TestLike) (stopSyncing func(), err error)
 	if err != nil {
 		return nil, fmt.Errorf("[%s]failed to call SyncService.RoomListService.AllRooms: %s", c.userID, err)
 	}
+	must.NotEqual(t, roomList, nil, "AllRooms room list must not be nil")
 	genericListener := newGenericStateListener[matrix_sdk_ffi.RoomListLoadingState]()
 	result, err := roomList.LoadingState(genericListener)
 	if err != nil {
@@ -166,6 +166,57 @@ func (c *RustClient) StartSyncing(t ct.TestLike) (stopSyncing func(), err error)
 	}
 	go syncService.Start()
 	c.allRooms = roomList
+	// track new rooms when they are made
+	allRoomsListener := newGenericStateListener[[]matrix_sdk_ffi.RoomListEntriesUpdate]()
+	go func() {
+		var allRoomIds DynamicSlice[matrix_sdk_ffi.RoomListEntry]
+		for !allRoomsListener.isClosed {
+			updates := <-allRoomsListener.ch
+			var newEntries []matrix_sdk_ffi.RoomListEntry
+			for _, update := range updates {
+				switch x := update.(type) {
+				case matrix_sdk_ffi.RoomListEntriesUpdateAppend:
+					allRoomIds.Append(x.Values...)
+					newEntries = append(newEntries, x.Values...)
+				case matrix_sdk_ffi.RoomListEntriesUpdateInsert:
+					allRoomIds.Insert(int(x.Index), x.Value)
+					newEntries = append(newEntries, x.Value)
+				case matrix_sdk_ffi.RoomListEntriesUpdatePushBack:
+					allRoomIds.PushBack(x.Value)
+					newEntries = append(newEntries, x.Value)
+				case matrix_sdk_ffi.RoomListEntriesUpdatePushFront:
+					allRoomIds.PushFront(x.Value)
+					newEntries = append(newEntries, x.Value)
+				case matrix_sdk_ffi.RoomListEntriesUpdateSet:
+					allRoomIds.Set(int(x.Index), x.Value)
+					newEntries = append(newEntries, x.Value)
+				case matrix_sdk_ffi.RoomListEntriesUpdateClear:
+					allRoomIds.Clear()
+				case matrix_sdk_ffi.RoomListEntriesUpdatePopBack:
+					allRoomIds.PopBack()
+				case matrix_sdk_ffi.RoomListEntriesUpdatePopFront:
+					allRoomIds.PopFront()
+				case matrix_sdk_ffi.RoomListEntriesUpdateRemove:
+					allRoomIds.Remove(int(x.Index))
+				case matrix_sdk_ffi.RoomListEntriesUpdateReset:
+					allRoomIds.Reset(x.Values)
+					newEntries = append(newEntries, x.Values...)
+				case matrix_sdk_ffi.RoomListEntriesUpdateTruncate:
+					allRoomIds.Truncate(int(x.Length))
+				default:
+					c.Logf(t, "unhandled all rooms update: %+v", update)
+				}
+			}
+			// inform anything waiting on this room that it exists
+			for _, entry := range newEntries {
+				switch x := entry.(type) {
+				case matrix_sdk_ffi.RoomListEntryFilled:
+					c.roomsListener.BroadcastUpdateForRoom(x.RoomId)
+				}
+			}
+		}
+	}()
+	c.allRooms.Entries(allRoomsListener)
 
 	isSyncing := false
@@ -278,11 +329,15 @@ func (c *RustClient) TrySendMessage(t ct.TestLike, roomID, text string) (eventID
 	ch := make(chan bool)
 	// we need a timeline listener before we can send messages, AND that listener must be attached to the
 	// same *Room you call .Send on :S
-	r := c.ensureListening(t, roomID)
-	cancel := c.listenForUpdates(func(roomID string) {
+	c.ensureListening(t, roomID)
+	r := c.findRoom(t, roomID)
+	cancel := c.roomsListener.AddListener(func(broadcastRoomID string) bool {
+		if roomID != broadcastRoomID {
+			return false
+		}
 		info := c.rooms[roomID]
 		if info == nil {
-			return
+			return false
 		}
 		for _, ev := range info.timeline {
 			if ev == nil {
@@ -293,6 +348,7 @@
 				close(ch)
 			}
 		}
+		return false
 	})
 	defer cancel()
 	timeline, err := r.Timeline()
@@ -323,7 +379,7 @@ func (c *RustClient) UserID() string {
 	return c.userID
 }
 
-func (c *RustClient) findRoomInMap(roomID string) *matrix_sdk_ffi.Room {
+func (c *RustClient) findRoomInCache(roomID string) *matrix_sdk_ffi.Room {
 	c.roomsMu.RLock()
 	defer c.roomsMu.RUnlock()
 	// do we have a reference to it already?
@@ -334,10 +390,11 @@
 	return nil
 }
 
-// findRoom returns the room, waiting up to 5s for it to appear
+// findRoom tries to find the room in the FFI client. Has a cache of already found rooms to ensure
+// the same pointer is always returned for the same room.
 func (c *RustClient) findRoom(t ct.TestLike, roomID string) *matrix_sdk_ffi.Room {
 	t.Helper()
-	room := c.findRoomInMap(roomID)
+	room := c.findRoomInCache(roomID)
 	if room != nil {
 		return room
 	}
@@ -365,7 +422,7 @@ func (c *RustClient) findRoom(t ct.TestLike, roomID string) *matrix_sdk_ffi.Room
 			}
 		}
 	}
-	// try to find it from cache?
+	// try to find it from FFI
 	rooms := c.FFIClient.Rooms()
 	for i, r := range rooms {
 		rid := r.Id()
@@ -382,6 +439,7 @@ func (c *RustClient) findRoom(t ct.TestLike, roomID string) *matrix_sdk_ffi.Room
 			return c.rooms[rid].room
 		}
 	}
+	// we really don't know about this room yet
 	return nil
 }
 
@@ -395,14 +453,28 @@ func (c *RustClient) logToFile(t ct.TestLike, format string, args ...interface{}
 	matrix_sdk_ffi.LogEvent("rust.go", &zero, matrix_sdk_ffi.LogLevelInfo, t.Name(), fmt.Sprintf(format, args...))
 }
 
-func (c *RustClient) ensureListening(t ct.TestLike, roomID string) *matrix_sdk_ffi.Room {
+func (c *RustClient) ensureListening(t ct.TestLike, roomID string) {
 	t.Helper()
 	r := c.findRoom(t, roomID)
+	if r == nil {
+		// we allow the room to not exist yet. If this happens, wait until we see the room before continuing
+		c.roomsListener.AddListener(func(broadcastRoomID string) bool {
+			if broadcastRoomID != roomID {
+				return false
+			}
+			if room := c.findRoom(t, roomID); room != nil {
+				c.ensureListening(t, roomID) // this should work now
+				return true
+			}
+			return false
+		})
+		return
+	}
 	must.NotEqual(t, r, nil, fmt.Sprintf("room %s does not exist", roomID))
 	info := c.rooms[roomID]
-	if info.stream != nil {
-		return r
+	if info != nil && info.stream != nil {
+		return
 	}
 
 	c.Logf(t, "[%s]AddTimelineListener[%s]", c.userID, roomID)
@@ -421,6 +493,11 @@
 			i := int(insertData.Index)
 			if i >= len(timeline) {
 				t.Logf("TimelineListener[%s] INSERT %d out of bounds of events timeline of size %d", roomID, i, len(timeline))
+				if i == len(timeline) {
+					t.Logf("TimelineListener[%s] treating as append", roomID)
+					timeline = append(timeline, timelineItemToEvent(insertData.Item))
+					newEvents = append(newEvents, timeline[i])
+				}
 				continue
 			}
 			timeline = slices.Insert(timeline, i, timelineItemToEvent(insertData.Item))
@@ -470,14 +547,20 @@
 				timeline[i] = timelineItemToEvent(setData.Item)
 				c.logToFile(t, "[%s]_______ SET %+v\n", c.userID, timeline[i])
 				newEvents = append(newEvents, timeline[i])
+			case matrix_sdk_ffi.TimelineChangePushFront:
+				pushFrontData := d.PushFront()
+				if pushFrontData == nil {
+					continue
+				}
+				ev := timelineItemToEvent(*pushFrontData)
+				timeline = slices.Insert(timeline, 0, ev)
+				newEvents = append(newEvents, ev)
 			default:
 				t.Logf("Unhandled TimelineDiff change %v", d.Change())
 			}
 		}
 		c.rooms[roomID].timeline = timeline
-		for _, l := range c.listeners {
-			l(roomID)
-		}
+		c.roomsListener.BroadcastUpdateForRoom(roomID)
 		for _, e := range newEvents {
 			c.Logf(t, "TimelineDiff change: %+v", e)
 		}
@@ -490,18 +573,7 @@
 	c.rooms[roomID].timeline = events
 	c.Logf(t, "[%s]AddTimelineListener[%s] result.Items len=%d", c.userID, roomID, len(result.Items))
 	if len(events) > 0 {
-		for _, l := range c.listeners {
-			l(roomID)
-		}
-	}
-	return r
-}
-
-func (c *RustClient) listenForUpdates(callback func(roomID string)) (cancel func()) {
-	id := c.listenerID.Add(1)
-	c.listeners[id] = callback
-	return func() {
-		delete(c.listeners, id)
+		c.roomsListener.BroadcastUpdateForRoom(roomID)
 	}
 }
@@ -540,17 +612,28 @@ func (w *timelineWaiter) Wait(t ct.TestLike, s time.Duration) {
 	}
 
 	updates := make(chan bool, 3)
-	cancel := w.client.listenForUpdates(func(roomID string) {
+	var isClosed atomic.Bool
+	cancel := w.client.roomsListener.AddListener(func(roomID string) bool {
+		if isClosed.Load() {
+			return true
+		}
 		if w.roomID != roomID {
-			return
+			return false
 		}
 		if !checkForEvent() {
-			return
+			return false
 		}
+		isClosed.Store(true)
 		close(updates)
+		return true
 	})
 	defer cancel()
 
+	// check again in case it was added after the previous checkForEvent but before AddListener
+	if checkForEvent() {
+		return
+	}
+
 	// either no timeline or doesn't exist yet, start blocking
 	start := time.Now()
 	for {
@@ -568,6 +651,9 @@
 }
 
 func mustGetTimeline(t ct.TestLike, room *matrix_sdk_ffi.Room) *matrix_sdk_ffi.Timeline {
+	if room == nil {
+		ct.Fatalf(t, "mustGetTimeline: room does not exist")
+	}
 	timeline, err := room.Timeline()
 	must.NotError(t, "failed to get room timeline", err)
 	return timeline
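Reviewer note, not part of the patch: the timelineWaiter.Wait changes above follow a subscribe-then-recheck pattern. A distilled sketch of the ordering (names are illustrative, not the real API; only the standard time package is assumed):

	func waitUntil(check func() bool, subscribe func(onPoke func()) (cancel func()), timeout time.Duration) bool {
		if check() { // fast path: already true
			return true
		}
		poked := make(chan struct{}, 1)
		cancel := subscribe(func() {
			select {
			case poked <- struct{}{}:
			default: // a poke is already pending; dropping this one is fine
			}
		})
		defer cancel()
		// re-check: the condition may have become true between the first check
		// and subscribe, in which case no poke will ever be delivered.
		if check() {
			return true
		}
		deadline := time.After(timeout)
		for {
			select {
			case <-poked:
				if check() {
					return true
				}
			case <-deadline:
				return false
			}
		}
	}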
diff --git a/internal/tests/client_test.go b/internal/tests/client_test.go
new file mode 100644
index 0000000..43b2697
--- /dev/null
+++ b/internal/tests/client_test.go
@@ -0,0 +1,189 @@
+// package tests contains sanity checks that any client implementation can run to ensure their concrete implementation will work
+// correctly with complement-crypto. Writing code to interact with your concrete client SDK is error-prone. The purpose of these
+// tests is to ensure that the code that implements api.Client is correct.
+package tests
+
+import (
+	"fmt"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/matrix-org/complement"
+	"github.com/matrix-org/complement-crypto/internal/api"
+	"github.com/matrix-org/complement-crypto/internal/api/js"
+	"github.com/matrix-org/complement-crypto/internal/api/rust"
+	"github.com/matrix-org/complement-crypto/internal/deploy"
+	"github.com/matrix-org/complement/b"
+	"github.com/matrix-org/complement/client"
+	"github.com/matrix-org/complement/helpers"
+	"github.com/matrix-org/complement/must"
+)
+
+var (
+	ssMutex      *sync.Mutex = &sync.Mutex{}
+	ssDeployment *deploy.SlidingSyncDeployment
+	// Functions which make clients; we don't care about the language.
+	// Tests just loop through this array for each client impl.
+	clientFactories []func(t *testing.T, cfg api.ClientCreationOpts, deployment *deploy.SlidingSyncDeployment) api.Client
+)
+
+func Deploy(t *testing.T) *deploy.SlidingSyncDeployment {
+	ssMutex.Lock()
+	defer ssMutex.Unlock()
+	if ssDeployment != nil {
+		return ssDeployment
+	}
+	ssDeployment = deploy.RunNewDeployment(t, false)
+	return ssDeployment
+}
+
+func TestMain(m *testing.M) {
+	rustClientCreator := func(t *testing.T, cfg api.ClientCreationOpts, deployment *deploy.SlidingSyncDeployment) api.Client {
+		client, err := rust.NewRustClient(t, cfg, deployment.SlidingSyncURL(t))
+		if err != nil {
+			t.Fatalf("NewRustClient: %s", err)
+		}
+		return client
+	}
+	jsClientCreator := func(t *testing.T, cfg api.ClientCreationOpts, deployment *deploy.SlidingSyncDeployment) api.Client {
+		client, err := js.NewJSClient(t, cfg)
+		if err != nil {
+			t.Fatalf("NewJSClient: %s", err)
+		}
+		return client
+	}
+	clientFactories = append(clientFactories, rustClientCreator, jsClientCreator)
+	js.SetupJSLogs("./logs/js_sdk.log")                       // rust sdk logs on its own
+	complement.TestMainWithCleanup(m, "clienttests", func() { // always teardown even if panicking
+		ssMutex.Lock()
+		if ssDeployment != nil {
+			ssDeployment.Teardown(false)
+		}
+		ssMutex.Unlock()
+		js.WriteJSLogs()
+	})
+}
+
+// Test that the client can receive live messages as well as return events that have already been received.
+func TestReceiveTimeline(t *testing.T) {
+	deployment := Deploy(t)
+
+	createAndSendEvents := func(t *testing.T, csapi *client.CSAPI) (roomID string, eventIDs []string) {
+		roomID = csapi.MustCreateRoom(t, map[string]interface{}{})
+		for i := 0; i < 10; i++ {
+			eventIDs = append(eventIDs, csapi.SendEventSynced(t, roomID, b.Event{
+				Type: "m.room.message",
+				Content: map[string]interface{}{
+					"msgtype": "m.text",
+					"body":    fmt.Sprintf("Test Message %d", i+1),
+				},
+			}))
+		}
+		return
+	}
+
+	// test that if we start syncing with a room full of events, we see those events.
+	ForEachClient(t, "existing_events", deployment, func(t *testing.T, client api.Client, csapi *client.CSAPI) {
+		must.NotError(t, "Failed to login", client.Login(t, client.Opts()))
+		roomID, eventIDs := createAndSendEvents(t, csapi)
+		time.Sleep(time.Second) // give time for everything to settle server-side e.g. the sliding sync proxy
+		stopSyncing := client.MustStartSyncing(t)
+		defer stopSyncing()
+		// wait until we see the latest event
+		client.WaitUntilEventInRoom(t, roomID, api.CheckEventHasEventID(eventIDs[len(eventIDs)-1])).Wait(t, 5*time.Second)
+		// ensure we have backpaginated if we need to. It is valid for a client to only sync the latest
+		// event in the room, so we have to backpaginate here.
+		client.MustBackpaginate(t, roomID, len(eventIDs))
+		// ensure we see all the events
+		for _, eventID := range eventIDs {
+			client.WaitUntilEventInRoom(t, roomID, api.CheckEventHasEventID(eventID)).Wait(t, 5*time.Second)
+		}
+		// check event content is correct
+		for i, eventID := range eventIDs {
+			ev := client.MustGetEvent(t, roomID, eventID)
+			must.Equal(t, ev.FailedToDecrypt, false, "FailedToDecrypt")
+			must.Equal(t, ev.ID, eventID, "ID")
+			must.Equal(t, ev.Membership, "", "Membership")
+			must.Equal(t, ev.Sender, csapi.UserID, "Sender")
+			must.Equal(t, ev.Target, "", "Target")
+			must.Equal(t, ev.Text, fmt.Sprintf("Test Message %d", i+1), "Text")
+		}
+	})
+
+	// test that if we are already syncing and then see a room live stream full of events, we see those events.
+	ForEachClient(t, "live_events", deployment, func(t *testing.T, client api.Client, csapi *client.CSAPI) {
+		must.NotError(t, "Failed to login", client.Login(t, client.Opts()))
+		stopSyncing := client.MustStartSyncing(t)
+		defer stopSyncing()
+		time.Sleep(time.Second) // give time for syncing to be well established.
+		// send the messages whilst syncing.
+		roomID, eventIDs := createAndSendEvents(t, csapi)
+		// ensure we see all the events
+		for i, eventID := range eventIDs {
+			t.Logf("waiting for event %d : %s", i, eventID)
+			client.WaitUntilEventInRoom(t, roomID, api.CheckEventHasEventID(eventID)).Wait(t, 5*time.Second)
+		}
+		// now send another live event and ensure we see it. This ensures we can still wait for events after
+		// having previously waited for events.
+		waiter := client.WaitUntilEventInRoom(t, roomID, api.CheckEventHasBody("Final"))
+		csapi.SendEventSynced(t, roomID, b.Event{
+			Type: "m.room.message",
+			Content: map[string]interface{}{
+				"msgtype": "m.text",
+				"body":    "Final",
+			},
+		})
+		waiter.Wait(t, 5*time.Second)
+
+		// check event content is correct
+		for i, eventID := range eventIDs {
+			ev := client.MustGetEvent(t, roomID, eventID)
+			must.Equal(t, ev.FailedToDecrypt, false, "FailedToDecrypt")
+			must.Equal(t, ev.ID, eventID, "ID")
+			must.Equal(t, ev.Membership, "", "Membership")
+			must.Equal(t, ev.Sender, csapi.UserID, "Sender")
+			must.Equal(t, ev.Target, "", "Target")
+			must.Equal(t, ev.Text, fmt.Sprintf("Test Message %d", i+1), "Text")
+		}
+	})
+}
+
+func TestCanWaitUntilEventInRoomBeforeRoomIsKnown(t *testing.T) {
+	deployment := Deploy(t)
+	ForEachClient(t, "", deployment, func(t *testing.T, client api.Client, csapi *client.CSAPI) {
+		roomID := csapi.MustCreateRoom(t, map[string]interface{}{})
+		eventID := csapi.SendEventSynced(t, roomID, b.Event{
+			Type: "m.room.message",
+			Content: map[string]interface{}{
+				"msgtype": "m.text",
+				"body":    "Test Message",
+			},
+		})
+		must.NotError(t, "Failed to login", client.Login(t, client.Opts()))
+		completed := helpers.NewWaiter()
+		waiter := client.WaitUntilEventInRoom(t, roomID, api.CheckEventHasEventID(eventID))
+		go func() {
+			waiter.Wait(t, 5*time.Second)
+			completed.Finish()
+		}()
+		t.Logf("waiting for event %s", eventID)
+		stopSyncing := client.MustStartSyncing(t)
+		defer stopSyncing()
+		completed.Wait(t, 5*time.Second)
+	})
+}
+
+// ForEachClient runs a subtest for each client factory.
+func ForEachClient(t *testing.T, name string, deployment *deploy.SlidingSyncDeployment, fn func(t *testing.T, client api.Client, csapi *client.CSAPI)) {
+	for _, createClient := range clientFactories {
+		csapiAlice := deployment.Register(t, "hs1", helpers.RegistrationOpts{
+			LocalpartSuffix: "client",
+			Password:        "complement-crypto-password",
+		})
+		client := createClient(t, api.NewClientCreationOpts(csapiAlice), deployment)
+		t.Run(name+" "+string(client.Type()), func(t *testing.T) {
+			fn(t, client, csapiAlice)
+		})
+	}
+}
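Reviewer note, not part of the patch: the clientFactories slice in TestMain is the extension point for these sanity checks. A third SDK binding would register the same way; the package and constructor below are placeholders, not real code.

	otherClientCreator := func(t *testing.T, cfg api.ClientCreationOpts, deployment *deploy.SlidingSyncDeployment) api.Client {
		client, err := other.NewOtherClient(t, cfg) // hypothetical binding
		if err != nil {
			t.Fatalf("NewOtherClient: %s", err)
		}
		return client
	}
	clientFactories = append(clientFactories, otherClientCreator)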