Skip to content

Commit

Permalink
Clean up match join handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
zyro committed May 3, 2019
1 parent 12d0521 commit c393152
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 98 deletions.
49 changes: 32 additions & 17 deletions server/match_presence.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,20 @@ func (m *MatchJoinMarkerList) ClearExpired(tick int64) []*MatchPresence {
type MatchPresenceList struct {
sync.RWMutex
size *atomic.Int32
presences []*PresenceID
presenceMap map[uuid.UUID]struct{}
presences []*MatchPresenceListItem
presenceMap map[uuid.UUID]string
}

type MatchPresenceListItem struct {
PresenceID *PresenceID
Presence *MatchPresence
}

func NewMatchPresenceList() *MatchPresenceList {
return &MatchPresenceList{
size: atomic.NewInt32(0),
presences: make([]*PresenceID, 0, 10),
presenceMap: make(map[uuid.UUID]struct{}, 10),
presences: make([]*MatchPresenceListItem, 0, 10),
presenceMap: make(map[uuid.UUID]string, 10),
}
}

Expand All @@ -120,11 +125,14 @@ func (m *MatchPresenceList) Join(joins []*MatchPresence) []*MatchPresence {
m.Lock()
for _, join := range joins {
if _, ok := m.presenceMap[join.SessionID]; !ok {
m.presences = append(m.presences, &PresenceID{
Node: join.Node,
SessionID: join.SessionID,
m.presences = append(m.presences, &MatchPresenceListItem{
PresenceID: &PresenceID{
Node: join.Node,
SessionID: join.SessionID,
},
Presence: join,
})
m.presenceMap[join.SessionID] = struct{}{}
m.presenceMap[join.SessionID] = join.Node
processed = append(processed, join)
}
}
Expand All @@ -140,8 +148,8 @@ func (m *MatchPresenceList) Leave(leaves []*MatchPresence) []*MatchPresence {
m.Lock()
for _, leave := range leaves {
if _, ok := m.presenceMap[leave.SessionID]; ok {
for i, presenceID := range m.presences {
if presenceID.SessionID == leave.SessionID && presenceID.Node == leave.Node {
for i, presence := range m.presences {
if presence.PresenceID.SessionID == leave.SessionID && presence.PresenceID.Node == leave.Node {
m.presences = append(m.presences[:i], m.presences[i+1:]...)
break
}
Expand All @@ -160,21 +168,28 @@ func (m *MatchPresenceList) Leave(leaves []*MatchPresence) []*MatchPresence {
func (m *MatchPresenceList) Contains(presence *PresenceID) bool {
var found bool
m.RLock()
for _, p := range m.presences {
if p.SessionID == presence.SessionID && p.Node == p.Node {
found = true
break
}
if node, ok := m.presenceMap[presence.SessionID]; ok {
found = node == presence.Node
}
m.RUnlock()
return found
}

func (m *MatchPresenceList) List() []*PresenceID {
func (m *MatchPresenceList) ListPresenceIDs() []*PresenceID {
m.RLock()
list := make([]*PresenceID, 0, len(m.presences))
for _, presence := range m.presences {
list = append(list, presence)
list = append(list, presence.PresenceID)
}
m.RUnlock()
return list
}

func (m *MatchPresenceList) ListPresences() []*MatchPresence {
m.RLock()
list := make([]*MatchPresence, 0, len(m.presences))
for _, presence := range m.presences {
list = append(list, presence.Presence)
}
m.RUnlock()
return list
Expand Down
22 changes: 13 additions & 9 deletions server/match_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ type MatchRegistry interface {
// Returns the total number of currently active authoritative matches.
Count() int

// Pass a user join attempt to a match handler. Returns if the match was found, if the join was accepted, a reason for any rejection, and the match label.
JoinAttempt(ctx context.Context, id uuid.UUID, node string, userID, sessionID uuid.UUID, username, fromNode string, metadata map[string]string) (bool, bool, string, string)
// Pass a user join attempt to a match handler. Returns if the match was found, if the join was accepted, if it's a new user for this match, a reason for any rejection, the match label, and the list of existing match participants.
JoinAttempt(ctx context.Context, id uuid.UUID, node string, userID, sessionID uuid.UUID, username, fromNode string, metadata map[string]string) (bool, bool, bool, string, string, []*MatchPresence)
// Notify a match handler that one or more users have successfully joined the match.
// Expects that the caller has already determined the match is hosted on the current node.
Join(id uuid.UUID, presences []*MatchPresence)
Expand All @@ -94,7 +94,6 @@ type MatchRegistry interface {
}

type LocalMatchRegistry struct {
sync.RWMutex
logger *zap.Logger
config Config
tracker Tracker
Expand Down Expand Up @@ -455,34 +454,39 @@ func (r *LocalMatchRegistry) Count() int {
return int(r.matchCount.Load())
}

func (r *LocalMatchRegistry) JoinAttempt(ctx context.Context, id uuid.UUID, node string, userID, sessionID uuid.UUID, username, fromNode string, metadata map[string]string) (bool, bool, string, string) {
func (r *LocalMatchRegistry) JoinAttempt(ctx context.Context, id uuid.UUID, node string, userID, sessionID uuid.UUID, username, fromNode string, metadata map[string]string) (bool, bool, bool, string, string, []*MatchPresence) {
if node != r.node {
return false, false, "", ""
return false, false, false, "", "", nil
}

m, ok := r.matches.Load(id)
if !ok {
return false, false, "", ""
return false, false, false, "", "", nil
}
mh := m.(*MatchHandler)

if mh.PresenceList.Contains(&PresenceID{Node: fromNode, SessionID: sessionID}) {
// The user is already part of this match.
return true, true, false, "", mh.Label(), mh.PresenceList.ListPresences()
}

resultCh := make(chan *MatchJoinResult, 1)
if !mh.QueueJoinAttempt(ctx, resultCh, userID, sessionID, username, fromNode, metadata) {
// The match call queue was full, so will be closed and therefore can't be joined.
return true, false, "Match is not currently accepting join requests", ""
return true, false, false, "Match is not currently accepting join requests", "", nil
}

// Set up a limit to how long the call will wait, default is 10 seconds.
timer := time.NewTimer(time.Second * 10)
select {
case <-timer.C:
// The join attempt has timed out, join is assumed to be rejected.
return true, false, "", ""
return true, false, false, "", "", nil
case r := <-resultCh:
// Doesn't matter if the timer has fired concurrently, we're in the desired case anyway.
timer.Stop()
// The join attempt has returned a result.
return true, r.Allow, r.Reason, r.Label
return true, r.Allow, true, r.Reason, r.Label, mh.PresenceList.ListPresences()
}
}

Expand Down
135 changes: 67 additions & 68 deletions server/pipeline_match.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,37 +145,55 @@ func (p *Pipeline) matchJoin(logger *zap.Logger, session Session, envelope *rtap
return
}

// Decide if it's an authoritative or relayed match.
mode := StreamModeMatchRelayed
if node != "" {
mode = StreamModeMatchAuthoritative
}
var mode uint8
var label *wrappers.StringValue
var presences []*rtapi.UserPresence
username := session.Username()
if node == "" {
// Relayed match.
mode = StreamModeMatchRelayed
stream := PresenceStream{Mode: mode, Subject: matchID, Label: node}

stream := PresenceStream{Mode: mode, Subject: matchID, Label: node}
if !allowEmpty && !p.tracker.StreamExists(stream) {
session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
Code: int32(rtapi.Error_MATCH_NOT_FOUND),
Message: "Match not found",
}}})
return
}

// Relayed matches must 'exist' by already having some members, unless they're being joined via a token.
if mode == StreamModeMatchRelayed && !allowEmpty && !p.tracker.StreamExists(stream) {
session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
Code: int32(rtapi.Error_MATCH_NOT_FOUND),
Message: "Match not found",
}}})
return
}
isNew := p.tracker.GetLocalBySessionIDStreamUserID(session.ID(), stream, session.UserID()) == nil
if isNew {
m := PresenceMeta{
Username: username,
Format: session.Format(),
}
if success, _ := p.tracker.Track(session.ID(), stream, session.UserID(), m, false); !success {
// Presence creation was rejected due to `allowIfFirstForSession` flag, session is gone so no need to reply.
return
}
}

var label *wrappers.StringValue
meta := p.tracker.GetLocalBySessionIDStreamUserID(session.ID(), stream, session.UserID())
isNew := meta == nil
if isNew {
username := session.Username()
found := true
allow := true
var reason string
var l string
// The user is not yet part of the match, attempt to join.
if mode == StreamModeMatchAuthoritative {
// If it's an authoritative match, ask the match handler if it will allow the join.
found, allow, reason, l = p.matchRegistry.JoinAttempt(session.Context(), matchID, node, session.UserID(), session.ID(), username, p.node, incoming.Metadata)
// Whether the user has just (successfully) joined the match or was already a member, return the match info anyway.
ps := p.tracker.ListByStream(stream, false, true)
presences = make([]*rtapi.UserPresence, 0, len(ps))
for _, p := range ps {
if isNew && p.UserID == session.UserID() && p.ID.SessionID == session.ID() {
// Ensure the user themselves does not appear in the list of existing match presences.
// Only for new joins, not if the user is joining a match they're already part of.
continue
}
presences = append(presences, &rtapi.UserPresence{
UserId: p.UserID.String(),
SessionId: p.ID.SessionID.String(),
Username: p.Meta.Username,
})
}
} else {
// Authoritative match.
mode = StreamModeMatchAuthoritative

found, allow, isNew, reason, l, ps := p.matchRegistry.JoinAttempt(session.Context(), matchID, node, session.UserID(), session.ID(), username, p.node, incoming.Metadata)
if !found {
// Match did not exist.
session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
Expand All @@ -196,49 +214,30 @@ func (p *Pipeline) matchJoin(logger *zap.Logger, session Session, envelope *rtap
}}})
return
}
m := PresenceMeta{
Username: username,
Format: session.Format(),
}
if success, _ := p.tracker.Track(session.ID(), stream, session.UserID(), m, false); !success {
// Presence creation was rejected due to `allowIfFirstForSession` flag, session is gone so no need to reply.
return
}
if mode == StreamModeMatchAuthoritative {
// If we've reached here, it was an accepted authoritative join.
label = &wrappers.StringValue{Value: l}
}
meta = &m
} else if mode == StreamModeMatchAuthoritative {
// The user was already in the match, and it's an authoritative match.
// Look up the match label to return it anyway.
l, err := p.matchRegistry.GetMatchLabel(session.Context(), matchID, node)
if err != nil {
// There was a problem looking up the label.
logger.Error("Error looking up match label", zap.String("match_id", matchIDString), zap.String("node", node), zap.Error(err))
session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
Code: int32(rtapi.Error_RUNTIME_EXCEPTION),
Message: "Match label lookup failed.",
}}})
return

if isNew {
stream := PresenceStream{Mode: mode, Subject: matchID, Label: node}
m := PresenceMeta{
Username: session.Username(),
Format: session.Format(),
}
p.tracker.Track(session.ID(), stream, session.UserID(), m, false)
}
label = &wrappers.StringValue{Value: l}
}

// Whether the user has just (successfully) joined the match or was already a member, return the match info anyway.
ps := p.tracker.ListByStream(stream, false, true)
presences := make([]*rtapi.UserPresence, 0, len(ps))
for _, p := range ps {
if isNew && p.UserID == session.UserID() && p.ID.SessionID == session.ID() {
// Ensure the user themselves does not appear in the list of existing match presences.
// Only for new joins, not if the user is joining a match they're already part of.
continue
label = &wrappers.StringValue{Value: l}
presences = make([]*rtapi.UserPresence, 0, len(ps))
for _, p := range ps {
if isNew && p.UserID == session.UserID() && p.SessionID == session.ID() {
// Ensure the user themselves does not appear in the list of existing match presences.
// Only for new joins, not if the user is joining a match they're already part of.
continue
}
presences = append(presences, &rtapi.UserPresence{
UserId: p.UserID.String(),
SessionId: p.SessionID.String(),
Username: p.Username,
})
}
presences = append(presences, &rtapi.UserPresence{
UserId: p.UserID.String(),
SessionId: p.ID.SessionID.String(),
Username: p.Meta.Username,
})
}

session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Match{Match: &rtapi.Match{
Expand All @@ -250,7 +249,7 @@ func (p *Pipeline) matchJoin(logger *zap.Logger, session Session, envelope *rtap
Self: &rtapi.UserPresence{
UserId: session.UserID().String(),
SessionId: session.ID().String(),
Username: meta.Username,
Username: username,
},
}}})
}
Expand Down
4 changes: 2 additions & 2 deletions server/runtime_go_match_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (r *RuntimeGoMatchCore) validateBroadcast(opCode int64, data []byte, presen
}
} else {
// Validate multiple filtered recipients.
actualPresenceIDs := r.presenceList.List()
actualPresenceIDs := r.presenceList.ListPresenceIDs()
for i := 0; i < len(presenceIDs); i++ {
found := false
presenceID := presenceIDs[i]
Expand Down Expand Up @@ -288,7 +288,7 @@ func (r *RuntimeGoMatchCore) validateBroadcast(opCode int64, data []byte, presen
}}}

if presenceIDs == nil {
presenceIDs = r.presenceList.List()
presenceIDs = r.presenceList.ListPresenceIDs()
}

return presenceIDs, msg, nil
Expand Down
4 changes: 2 additions & 2 deletions server/runtime_lua_match_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ func (r *RuntimeLuaMatchCore) validateBroadcast(l *lua.LState) ([]*PresenceID, *
return nil, nil
}
} else {
actualPresenceIDs := r.presenceList.List()
actualPresenceIDs := r.presenceList.ListPresenceIDs()
for i := 0; i < len(presenceIDs); i++ {
found := false
presenceID := presenceIDs[i]
Expand Down Expand Up @@ -709,7 +709,7 @@ func (r *RuntimeLuaMatchCore) validateBroadcast(l *lua.LState) ([]*PresenceID, *
}}}

if presenceIDs == nil {
presenceIDs = r.presenceList.List()
presenceIDs = r.presenceList.ListPresenceIDs()
}

return presenceIDs, msg
Expand Down

0 comments on commit c393152

Please sign in to comment.