Skip to content

Commit

Permalink
Merge pull request #878 from tinode/session-sub-race-dev
Browse files Browse the repository at this point in the history
Fix a race condition around Session.subs.
  • Loading branch information
aforge authored Jul 7, 2023
2 parents e4cde15 + e776c4d commit f89ca30
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 7 deletions.
2 changes: 1 addition & 1 deletion loadtest/tinode.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class TinodeBase extends Simulation {
val hello = exitBlockOnFail {
exec {
ws("hi").sendText(
"""{"hi":{"id":"afabb3","ver":"0.18.1","ua":"Gatling-Loadtest/1.0; gatling/1.7.0"}}"""
"""{"hi":{"id":"afabb3","ver":"0.22.8","ua":"Gatling-Loadtest/1.0; gatling/1.7.0"}}"""
)
.await(15 seconds)(
ws.checkTextMessage("hi")
Expand Down
14 changes: 9 additions & 5 deletions server/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ type Session struct {
bkgTimer *time.Timer

// Number of subscribe/unsubscribe requests in flight.
inflightReqs *sync.WaitGroup
inflightReqs *boundedWaitGroup
// Synchronizes access to session store in cluster mode:
// subscribe/unsubscribe replies are asynchronous.
sessionStoreLock sync.Mutex
Expand Down Expand Up @@ -628,11 +628,12 @@ func (s *Session) subscribe(msg *ClientComMessage) {
}
}

s.inflightReqs.Add(1)
// Session can subscribe to topic on behalf of a single user at a time.
if sub := s.getSub(msg.RcptTo); sub != nil {
s.queueOut(InfoAlreadySubscribed(msg.Id, msg.Original, msg.Timestamp))
s.inflightReqs.Done()
} else {
s.inflightReqs.Add(1)
select {
case globals.hub.join <- msg:
default:
Expand All @@ -655,18 +656,21 @@ func (s *Session) leave(msg *ClientComMessage) {
return
}

s.inflightReqs.Add(1)
if sub := s.getSub(msg.RcptTo); sub != nil {
// Session is attached to the topic.
if (msg.Original == "me" || msg.Original == "fnd") && msg.Leave.Unsub {
// User should not unsubscribe from 'me' or 'find'. Just leaving is fine.
s.queueOut(ErrPermissionDeniedReply(msg, msg.Timestamp))
s.inflightReqs.Done()
} else {
// Unlink from topic, topic will send a reply.
s.delSub(msg.RcptTo)
s.inflightReqs.Add(1)
sub.done <- msg
}
} else if !msg.Leave.Unsub {
return
}
s.inflightReqs.Done()
if !msg.Leave.Unsub {
// Session is not attached to the topic, wants to leave - fine, no change
s.queueOut(InfoNotJoined(msg.Id, msg.Original, msg.Timestamp))
} else {
Expand Down
41 changes: 40 additions & 1 deletion server/sessionstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,43 @@ import (
"github.com/tinode/chat/server/store/types"
)

// boundedWaitGroup is a WaitGroup with a semaphore functionality
// (limiting number of threads/goroutines accessing the guarded resource simultaneously).
//
// Add acquires semaphore slots before registering work with the WaitGroup,
// Done releases one slot and marks one unit of work as finished,
// Wait blocks until all registered work is finished.
type boundedWaitGroup struct {
// wg counts the units of work registered through Add and not yet finished through Done.
wg sync.WaitGroup
// sem is a buffered channel used as a counting semaphore: Add sends one token
// per unit of work, Done receives one token. Its capacity is set in newBoundedWaitGroup.
sem chan struct{}
}

// newBoundedWaitGroup creates a boundedWaitGroup whose semaphore channel
// is buffered to hold up to capacity tokens, i.e. at most capacity units
// of work can be registered through Add without blocking.
func newBoundedWaitGroup(capacity int) *boundedWaitGroup {
	bwg := new(boundedWaitGroup)
	bwg.sem = make(chan struct{}, capacity)
	return bwg
}

// Add registers delta units of work.
//
// A non-positive delta is ignored. Otherwise one token per unit is pushed
// into the semaphore channel first; each send blocks while the channel is
// full, so the call does not return until all delta slots were acquired.
// Only after that the WaitGroup counter is increased by delta.
func (w *boundedWaitGroup) Add(delta int) {
	if delta > 0 {
		// Acquire the semaphore slots one by one.
		for remaining := delta; remaining > 0; remaining-- {
			w.sem <- struct{}{}
		}
		w.wg.Add(delta)
	}
}

// Done marks one unit of work as finished.
//
// It makes a single non-blocking attempt to take a token from the semaphore
// channel. If no token is available (Done without a matching Add) or the
// channel turns out to be closed, the problem is reported through
// logs.Err.Panicln. Afterwards the WaitGroup counter is decremented.
func (w *boundedWaitGroup) Done() {
	received, open := false, false
	// Non-blocking receive: never wait for a token to appear.
	select {
	case _, open = <-w.sem:
		received = true
	default:
	}

	switch {
	case !received:
		logs.Err.Panicln("boundedWaitGroup.Done() called before Add().")
	case !open:
		logs.Err.Panicln("boundedWaitGroup.sem closed.")
	}
	w.wg.Done()
}

// Wait blocks until the counter of the underlying sync.WaitGroup drops to zero,
// i.e. until every unit of work registered through Add has been matched by Done.
// It does not touch the semaphore channel.
func (w *boundedWaitGroup) Wait() {
w.wg.Wait()
}

// SessionStore holds live sessions. Long polling sessions are stored in a linked list with
// most recent sessions on top. In addition all sessions are stored in a map indexed by session ID.
type SessionStore struct {
Expand Down Expand Up @@ -76,7 +113,9 @@ func (ss *SessionStore) NewSession(conn any, sid string) (*Session, int) {
s.bkgTimer = time.NewTimer(time.Hour)
s.bkgTimer.Stop()

s.inflightReqs = &sync.WaitGroup{}
// Make sure at most 1 request is modifying session/topic state at any time.
// TODO: use Mutex & CondVar?
s.inflightReqs = newBoundedWaitGroup(1)

s.lastTouched = time.Now()

Expand Down
3 changes: 3 additions & 0 deletions server/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,9 @@ func (t *Topic) handleLeaveRequest(msg *ClientComMessage, sess *Session) {

// User wants to leave without unsubscribing.
if pssd, _ := t.remSession(sess, asUid); pssd != nil {
if !sess.isProxy() {
sess.delSub(t.name)
}
if pssd.isChanSub != asChan {
// Cannot address non-channel subscription as channel and vice versa.
if msg.init {
Expand Down
3 changes: 3 additions & 0 deletions server/topic_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ func (t *Topic) handleProxyLeaveRequest(msg *ClientComMessage, killTimer *time.T
// because by the time the response arrives this session may be already gone from the session store
// and we won't be able to find and remove it by its sid.
pssd, result := t.remSession(msg.sess, asUid)
if result {
msg.sess.delSub(t.name)
}
if !msg.init {
// Explicitly specify the uid because the master multiplex session needs to know which
// of its multiple hosted sessions to delete.
Expand Down

0 comments on commit f89ca30

Please sign in to comment.