diff --git a/server/match_presence.go b/server/match_presence.go index fc4bafb7dc..de02c99d2c 100644 --- a/server/match_presence.go +++ b/server/match_presence.go @@ -103,15 +103,20 @@ func (m *MatchJoinMarkerList) ClearExpired(tick int64) []*MatchPresence { type MatchPresenceList struct { sync.RWMutex size *atomic.Int32 - presences []*PresenceID - presenceMap map[uuid.UUID]struct{} + presences []*MatchPresenceListItem + presenceMap map[uuid.UUID]string +} + +type MatchPresenceListItem struct { + PresenceID *PresenceID + Presence *MatchPresence } func NewMatchPresenceList() *MatchPresenceList { return &MatchPresenceList{ size: atomic.NewInt32(0), - presences: make([]*PresenceID, 0, 10), - presenceMap: make(map[uuid.UUID]struct{}, 10), + presences: make([]*MatchPresenceListItem, 0, 10), + presenceMap: make(map[uuid.UUID]string, 10), } } @@ -120,11 +125,14 @@ func (m *MatchPresenceList) Join(joins []*MatchPresence) []*MatchPresence { m.Lock() for _, join := range joins { if _, ok := m.presenceMap[join.SessionID]; !ok { - m.presences = append(m.presences, &PresenceID{ - Node: join.Node, - SessionID: join.SessionID, + m.presences = append(m.presences, &MatchPresenceListItem{ + PresenceID: &PresenceID{ + Node: join.Node, + SessionID: join.SessionID, + }, + Presence: join, }) - m.presenceMap[join.SessionID] = struct{}{} + m.presenceMap[join.SessionID] = join.Node processed = append(processed, join) } } @@ -140,8 +148,8 @@ func (m *MatchPresenceList) Leave(leaves []*MatchPresence) []*MatchPresence { m.Lock() for _, leave := range leaves { if _, ok := m.presenceMap[leave.SessionID]; ok { - for i, presenceID := range m.presences { - if presenceID.SessionID == leave.SessionID && presenceID.Node == leave.Node { + for i, presence := range m.presences { + if presence.PresenceID.SessionID == leave.SessionID && presence.PresenceID.Node == leave.Node { m.presences = append(m.presences[:i], m.presences[i+1:]...) break } @@ -160,21 +168,28 @@ func (m *MatchPresenceList) Leave(leaves []*MatchPresence) []*MatchPresence { func (m *MatchPresenceList) Contains(presence *PresenceID) bool { var found bool m.RLock() - for _, p := range m.presences { - if p.SessionID == presence.SessionID && p.Node == p.Node { - found = true - break - } + if node, ok := m.presenceMap[presence.SessionID]; ok { + found = node == presence.Node } m.RUnlock() return found } -func (m *MatchPresenceList) List() []*PresenceID { +func (m *MatchPresenceList) ListPresenceIDs() []*PresenceID { m.RLock() list := make([]*PresenceID, 0, len(m.presences)) for _, presence := range m.presences { - list = append(list, presence) + list = append(list, presence.PresenceID) + } + m.RUnlock() + return list +} + +func (m *MatchPresenceList) ListPresences() []*MatchPresence { + m.RLock() + list := make([]*MatchPresence, 0, len(m.presences)) + for _, presence := range m.presences { + list = append(list, presence.Presence) } m.RUnlock() return list diff --git a/server/match_registry.go b/server/match_registry.go index 0f961079d1..41bd4eedc4 100644 --- a/server/match_registry.go +++ b/server/match_registry.go @@ -78,8 +78,8 @@ type MatchRegistry interface { // Returns the total number of currently active authoritative matches. Count() int - // Pass a user join attempt to a match handler. Returns if the match was found, if the join was accepted, a reason for any rejection, and the match label. - JoinAttempt(ctx context.Context, id uuid.UUID, node string, userID, sessionID uuid.UUID, username, fromNode string, metadata map[string]string) (bool, bool, string, string) + // Pass a user join attempt to a match handler. Returns if the match was found, if the join was accepted, if it's a new user for this match, a reason for any rejection, the match label, and the list of existing match participants. + JoinAttempt(ctx context.Context, id uuid.UUID, node string, userID, sessionID uuid.UUID, username, fromNode string, metadata map[string]string) (bool, bool, bool, string, string, []*MatchPresence) // Notify a match handler that one or more users have successfully joined the match. // Expects that the caller has already determined the match is hosted on the current node. Join(id uuid.UUID, presences []*MatchPresence) @@ -94,7 +94,6 @@ type MatchRegistry interface { } type LocalMatchRegistry struct { - sync.RWMutex logger *zap.Logger config Config tracker Tracker @@ -455,21 +454,26 @@ func (r *LocalMatchRegistry) Count() int { return int(r.matchCount.Load()) } -func (r *LocalMatchRegistry) JoinAttempt(ctx context.Context, id uuid.UUID, node string, userID, sessionID uuid.UUID, username, fromNode string, metadata map[string]string) (bool, bool, string, string) { +func (r *LocalMatchRegistry) JoinAttempt(ctx context.Context, id uuid.UUID, node string, userID, sessionID uuid.UUID, username, fromNode string, metadata map[string]string) (bool, bool, bool, string, string, []*MatchPresence) { if node != r.node { - return false, false, "", "" + return false, false, false, "", "", nil } m, ok := r.matches.Load(id) if !ok { - return false, false, "", "" + return false, false, false, "", "", nil } mh := m.(*MatchHandler) + if mh.PresenceList.Contains(&PresenceID{Node: fromNode, SessionID: sessionID}) { + // The user is already part of this match. + return true, true, false, "", mh.Label(), mh.PresenceList.ListPresences() + } + resultCh := make(chan *MatchJoinResult, 1) if !mh.QueueJoinAttempt(ctx, resultCh, userID, sessionID, username, fromNode, metadata) { // The match call queue was full, so will be closed and therefore can't be joined. - return true, false, "Match is not currently accepting join requests", "" + return true, false, false, "Match is not currently accepting join requests", "", nil } // Set up a limit to how long the call will wait, default is 10 seconds. @@ -477,12 +481,12 @@ func (r *LocalMatchRegistry) JoinAttempt(ctx context.Context, id uuid.UUID, node select { case <-timer.C: // The join attempt has timed out, join is assumed to be rejected. - return true, false, "", "" + return true, false, false, "", "", nil case r := <-resultCh: // Doesn't matter if the timer has fired concurrently, we're in the desired case anyway. timer.Stop() // The join attempt has returned a result. - return true, r.Allow, r.Reason, r.Label + return true, r.Allow, true, r.Reason, r.Label, mh.PresenceList.ListPresences() } } diff --git a/server/pipeline_match.go b/server/pipeline_match.go index c856c6709f..9d44170a6c 100644 --- a/server/pipeline_match.go +++ b/server/pipeline_match.go @@ -145,37 +145,55 @@ func (p *Pipeline) matchJoin(logger *zap.Logger, session Session, envelope *rtap return } - // Decide if it's an authoritative or relayed match. - mode := StreamModeMatchRelayed - if node != "" { - mode = StreamModeMatchAuthoritative - } + var mode uint8 + var label *wrappers.StringValue + var presences []*rtapi.UserPresence + username := session.Username() + if node == "" { + // Relayed match. + mode = StreamModeMatchRelayed + stream := PresenceStream{Mode: mode, Subject: matchID, Label: node} - stream := PresenceStream{Mode: mode, Subject: matchID, Label: node} + if !allowEmpty && !p.tracker.StreamExists(stream) { + session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{ + Code: int32(rtapi.Error_MATCH_NOT_FOUND), + Message: "Match not found", + }}}) + return + } - // Relayed matches must 'exist' by already having some members, unless they're being joined via a token. - if mode == StreamModeMatchRelayed && !allowEmpty && !p.tracker.StreamExists(stream) { - session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{ - Code: int32(rtapi.Error_MATCH_NOT_FOUND), - Message: "Match not found", - }}}) - return - } + isNew := p.tracker.GetLocalBySessionIDStreamUserID(session.ID(), stream, session.UserID()) == nil + if isNew { + m := PresenceMeta{ + Username: username, + Format: session.Format(), + } + if success, _ := p.tracker.Track(session.ID(), stream, session.UserID(), m, false); !success { + // Presence creation was rejected due to `allowIfFirstForSession` flag, session is gone so no need to reply. + return + } + } - var label *wrappers.StringValue - meta := p.tracker.GetLocalBySessionIDStreamUserID(session.ID(), stream, session.UserID()) - isNew := meta == nil - if isNew { - username := session.Username() - found := true - allow := true - var reason string - var l string - // The user is not yet part of the match, attempt to join. - if mode == StreamModeMatchAuthoritative { - // If it's an authoritative match, ask the match handler if it will allow the join. - found, allow, reason, l = p.matchRegistry.JoinAttempt(session.Context(), matchID, node, session.UserID(), session.ID(), username, p.node, incoming.Metadata) + // Whether the user has just (successfully) joined the match or was already a member, return the match info anyway. + ps := p.tracker.ListByStream(stream, false, true) + presences = make([]*rtapi.UserPresence, 0, len(ps)) + for _, p := range ps { + if isNew && p.UserID == session.UserID() && p.ID.SessionID == session.ID() { + // Ensure the user themselves does not appear in the list of existing match presences. + // Only for new joins, not if the user is joining a match they're already part of. + continue + } + presences = append(presences, &rtapi.UserPresence{ + UserId: p.UserID.String(), + SessionId: p.ID.SessionID.String(), + Username: p.Meta.Username, + }) } + } else { + // Authoritative match. + mode = StreamModeMatchAuthoritative + + found, allow, isNew, reason, l, ps := p.matchRegistry.JoinAttempt(session.Context(), matchID, node, session.UserID(), session.ID(), username, p.node, incoming.Metadata) if !found { // Match did not exist. session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{ @@ -196,49 +214,30 @@ func (p *Pipeline) matchJoin(logger *zap.Logger, session Session, envelope *rtap }}}) return } - m := PresenceMeta{ - Username: username, - Format: session.Format(), - } - if success, _ := p.tracker.Track(session.ID(), stream, session.UserID(), m, false); !success { - // Presence creation was rejected due to `allowIfFirstForSession` flag, session is gone so no need to reply. - return - } - if mode == StreamModeMatchAuthoritative { - // If we've reached here, it was an accepted authoritative join. - label = &wrappers.StringValue{Value: l} - } - meta = &m - } else if mode == StreamModeMatchAuthoritative { - // The user was already in the match, and it's an authoritative match. - // Look up the match label to return it anyway. - l, err := p.matchRegistry.GetMatchLabel(session.Context(), matchID, node) - if err != nil { - // There was a problem looking up the label. - logger.Error("Error looking up match label", zap.String("match_id", matchIDString), zap.String("node", node), zap.Error(err)) - session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{ - Code: int32(rtapi.Error_RUNTIME_EXCEPTION), - Message: "Match label lookup failed.", - }}}) - return + + if isNew { + stream := PresenceStream{Mode: mode, Subject: matchID, Label: node} + m := PresenceMeta{ + Username: session.Username(), + Format: session.Format(), + } + p.tracker.Track(session.ID(), stream, session.UserID(), m, false) } - label = &wrappers.StringValue{Value: l} - } - // Whether the user has just (successfully) joined the match or was already a member, return the match info anyway. - ps := p.tracker.ListByStream(stream, false, true) - presences := make([]*rtapi.UserPresence, 0, len(ps)) - for _, p := range ps { - if isNew && p.UserID == session.UserID() && p.ID.SessionID == session.ID() { - // Ensure the user themselves does not appear in the list of existing match presences. - // Only for new joins, not if the user is joining a match they're already part of. - continue + label = &wrappers.StringValue{Value: l} + presences = make([]*rtapi.UserPresence, 0, len(ps)) + for _, p := range ps { + if isNew && p.UserID == session.UserID() && p.SessionID == session.ID() { + // Ensure the user themselves does not appear in the list of existing match presences. + // Only for new joins, not if the user is joining a match they're already part of. + continue + } + presences = append(presences, &rtapi.UserPresence{ + UserId: p.UserID.String(), + SessionId: p.SessionID.String(), + Username: p.Username, + }) } - presences = append(presences, &rtapi.UserPresence{ - UserId: p.UserID.String(), - SessionId: p.ID.SessionID.String(), - Username: p.Meta.Username, - }) } session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Match{Match: &rtapi.Match{ @@ -250,7 +249,7 @@ func (p *Pipeline) matchJoin(logger *zap.Logger, session Session, envelope *rtap Self: &rtapi.UserPresence{ UserId: session.UserID().String(), SessionId: session.ID().String(), - Username: meta.Username, + Username: username, }, }}}) } diff --git a/server/runtime_go_match_core.go b/server/runtime_go_match_core.go index 7358518bb4..edd0506e10 100644 --- a/server/runtime_go_match_core.go +++ b/server/runtime_go_match_core.go @@ -253,7 +253,7 @@ func (r *RuntimeGoMatchCore) validateBroadcast(opCode int64, data []byte, presen } } else { // Validate multiple filtered recipients. - actualPresenceIDs := r.presenceList.List() + actualPresenceIDs := r.presenceList.ListPresenceIDs() for i := 0; i < len(presenceIDs); i++ { found := false presenceID := presenceIDs[i] @@ -288,7 +288,7 @@ func (r *RuntimeGoMatchCore) validateBroadcast(opCode int64, data []byte, presen }}} if presenceIDs == nil { - presenceIDs = r.presenceList.List() + presenceIDs = r.presenceList.ListPresenceIDs() } return presenceIDs, msg, nil diff --git a/server/runtime_lua_match_core.go b/server/runtime_lua_match_core.go index fb23e2600e..f41e96ed1f 100644 --- a/server/runtime_lua_match_core.go +++ b/server/runtime_lua_match_core.go @@ -674,7 +674,7 @@ func (r *RuntimeLuaMatchCore) validateBroadcast(l *lua.LState) ([]*PresenceID, * return nil, nil } } else { - actualPresenceIDs := r.presenceList.List() + actualPresenceIDs := r.presenceList.ListPresenceIDs() for i := 0; i < len(presenceIDs); i++ { found := false presenceID := presenceIDs[i] @@ -709,7 +709,7 @@ func (r *RuntimeLuaMatchCore) validateBroadcast(l *lua.LState) ([]*PresenceID, * }}} if presenceIDs == nil { - presenceIDs = r.presenceList.List() + presenceIDs = r.presenceList.ListPresenceIDs() } return presenceIDs, msg