Skip to content

Commit

Permalink
rust: allow waiting when the room doesn't exist yet, factor out some …
Browse files Browse the repository at this point in the history
…code

Add client tests which test client impls are correct.
  • Loading branch information
kegsay committed Feb 14, 2024
1 parent e61f8b3 commit 5ad6847
Show file tree
Hide file tree
Showing 4 changed files with 432 additions and 36 deletions.
66 changes: 66 additions & 0 deletions internal/api/rust/dynamic_slice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package rust

import (
"fmt"

"golang.org/x/exp/slices"
)

type DynamicSlice[T any] struct {
Slice []T
}

// Insert the item, shifting anything in this place up by one.
func (s *DynamicSlice[T]) Insert(i int, val T) {
s.Slice = slices.Insert(s.Slice, i, val)
}

// Remove the item, shifting anything above this down by one.
func (s *DynamicSlice[T]) Remove(i int) {
s.Slice = slices.Delete(s.Slice, i, i+1)
}

// Append items to the end of the slice.
func (s *DynamicSlice[T]) Append(vals ...T) {
s.Slice = append(s.Slice, vals...)
}

// Set the item at i to val, does not shift anything.
func (s *DynamicSlice[T]) Set(i int, val T) {
if i > len(s.Slice) {
panic(fmt.Sprintf("DynamicSlice.Set[%d, %+v] out of bounds of size %d", i, val, len(s.Slice)))
} else if i == len(s.Slice) {
s.Slice = append(s.Slice, val)
} else {
s.Slice[i] = val
}
}

func (s *DynamicSlice[T]) PushBack(val T) {
s.Append(val)
}

func (s *DynamicSlice[T]) PushFront(val T) {
s.Insert(0, val)
}

func (s *DynamicSlice[T]) PopBack() {
s.Remove(len(s.Slice) - 1)
}

func (s *DynamicSlice[T]) PopFront() {
s.Remove(0)
}

func (s *DynamicSlice[T]) Clear() {
s.Slice = s.Slice[:0]
}

func (s *DynamicSlice[T]) Reset(vals []T) {
s.Clear()
s.Append(vals...)
}

func (s *DynamicSlice[T]) Truncate(length int) {
s.Slice = s.Slice[0:length]
}
55 changes: 55 additions & 0 deletions internal/api/rust/room_listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package rust

import (
"sync"
"sync/atomic"
)

type RoomsListener struct {
listeners map[int32]func(roomID string) (cancel bool)
listenerID atomic.Int32
mu *sync.RWMutex
}

func NewRoomsListener() *RoomsListener {
return &RoomsListener{
listeners: make(map[int32]func(roomID string) (cancel bool)),
mu: &sync.RWMutex{},
}
}

// AddListener registers the given callback, which will be invoked for every call to BroadcastUpdateForRoom.
func (l *RoomsListener) AddListener(callback func(broadcastRoomID string) (cancel bool)) (cancel func()) {
id := l.listenerID.Add(1)
l.mu.Lock()
defer l.mu.Unlock()
l.listeners[id] = callback
return func() {
l.removeListener(id)
}
}

func (l *RoomsListener) removeListener(id int32) {
l.mu.Lock()
defer l.mu.Unlock()
delete(l.listeners, id)
}

// BroadcastUpdateForRoom informs all attached listeners that something has happened in relation
// to this room ID. This could be a new event, or the room appearing in all_rooms, or something else entirely.
// It is up to the listener to decide what to do upon receipt of the poke.
func (l *RoomsListener) BroadcastUpdateForRoom(roomID string) {
// take a snapshot of callbacks so callbacks can unregister themselves without causing deadlocks due to
// .Lock within .RLock.
callbacks := map[int32]func(roomID string) (cancel bool){}
l.mu.RLock()
for id, cb := range l.listeners {
callbacks[id] = cb
}
l.mu.RUnlock()
for id, cb := range callbacks {
if cb(roomID) {
l.removeListener(id)
}
}
}
Loading

0 comments on commit 5ad6847

Please sign in to comment.