Skip to content

Commit

Permalink
Only update delta in subscribe2
Browse files Browse the repository at this point in the history
  • Loading branch information
neekolas committed Jan 24, 2024
1 parent 987915d commit 34a1ef5
Showing 1 changed file with 5 additions and 5 deletions.
10 changes: 5 additions & 5 deletions pkg/api/message/v1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,8 @@ func (s *Service) Subscribe2(stream proto.MessageApi_Subscribe2Server) error {
}
log.Info("updating subscription", zap.Int("num_content_topics", len(req.ContentTopics)))

metrics.EmitSubscribeTopics(stream.Context(), log, len(req.ContentTopics))

topics := map[string]bool{}
numSubscribes := 0
for _, topic := range req.ContentTopics {
topics[topic] = true

Expand Down Expand Up @@ -295,20 +294,21 @@ func (s *Service) Subscribe2(stream proto.MessageApi_Subscribe2Server) error {
return err
}
subs[topic] = sub
numSubscribes++
}
}

// If subscription not in topic, then unsubscribe.
var count int
var numUnsubscribes int
for topic, sub := range subs {
if topics[topic] {
continue
}
_ = sub.Unsubscribe()
delete(subs, topic)
count++
numUnsubscribes++
}
metrics.EmitUnsubscribeTopics(stream.Context(), log, count)
metrics.EmitUnsubscribeTopics(stream.Context(), log, numSubscribes-numUnsubscribes)
}
}
}
Expand Down

0 comments on commit 34a1ef5

Please sign in to comment.