Skip to content

Commit

Permalink
Restore centralized subscription handler (#361)
Browse files Browse the repository at this point in the history
  • Loading branch information
richardhuaaa authored Feb 27, 2024
1 parent 26a92f4 commit b49b8ef
Show file tree
Hide file tree
Showing 12 changed files with 498 additions and 182 deletions.
210 changes: 120 additions & 90 deletions pkg/api/message/v1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"hash/fnv"
"io"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -35,6 +34,20 @@ const (

// 1048576 - 300 - 62 = 1048214
MaxMessageSize = pubsub.DefaultMaxMessageSize - MaxContentTopicNameSize - 62

// maxQueriesPerBatch defines the maximum number of queries we can support per batch.
maxQueriesPerBatch = 50

// maxTopicsPerQueryRequest defines the maximum number of topics that can be queried in a single request.
// the number is likely to be more than we want it to be, but would be a safe place to put it -
// per Test_LargeQueryTesting, the request decoding already failing before it reaches th handler.
// maxTopicsPerQueryRequest = 157733

// maxTopicsPerBatchQueryRequest defines the maximum number of topics that can be queried in a batch query. This
// limit is imposed in additional to the per-query limit maxTopicsPerRequest.
// as a starting value, we've using the same value as above, since the entire request would be tossed
// away before this is reached.
// maxTopicsPerBatchQueryRequest = maxTopicsPerQueryRequest
)

type Service struct {
Expand All @@ -53,6 +66,8 @@ type Service struct {

ns *server.Server
nc *nats.Conn

subDispatcher *subscriptionDispatcher
}

func NewService(log *zap.Logger, store *store.Store, publishToWakuRelay func(context.Context, *wakupb.WakuMessage) error) (s *Service, err error) {
Expand All @@ -74,11 +89,16 @@ func NewService(log *zap.Logger, store *store.Store, publishToWakuRelay func(con
if !s.ns.ReadyForConnections(4 * time.Second) {
return nil, errors.New("nats not ready")
}

s.nc, err = nats.Connect(s.ns.ClientURL())
if err != nil {
return nil, err
}

s.subDispatcher, err = newSubscriptionDispatcher(s.nc, s.log)
if err != nil {
return nil, err
}
return s, nil
}

Expand All @@ -88,6 +108,7 @@ func (s *Service) Close() {
if s.ctxCancel != nil {
s.ctxCancel()
}
s.subDispatcher.Shutdown()

if s.nc != nil {
s.nc.Close()
Expand Down Expand Up @@ -161,50 +182,45 @@ func (s *Service) Subscribe(req *proto.SubscribeRequest, stream proto.MessageApi

metrics.EmitSubscribeTopics(stream.Context(), log, len(req.ContentTopics))

var streamLock sync.Mutex
// create a topics map.
topics := make(map[string]bool, len(req.ContentTopics))
for _, topic := range req.ContentTopics {
subject := topic
if subject != natsWildcardTopic {
subject = buildNatsSubject(topic)
topics[topic] = true
}
sub := s.subDispatcher.Subscribe(topics)
defer func() {
if sub != nil {
sub.Unsubscribe()
}
sub, err := s.nc.Subscribe(subject, func(msg *nats.Msg) {
var env proto.Envelope
err := pb.Unmarshal(msg.Data, &env)
if err != nil {
log.Error("parsing envelope from bytes", zap.Error(err))
return
}
if topic == natsWildcardTopic && !isValidSubscribeAllTopic(env.ContentTopic) {
return
metrics.EmitUnsubscribeTopics(stream.Context(), log, len(req.ContentTopics))
}()

var streamLock sync.Mutex
for exit := false; !exit; {
select {
case msg, open := <-sub.messagesCh:
if open {
func() {
streamLock.Lock()
defer streamLock.Unlock()
err := stream.Send(msg)
if err != nil {
log.Error("sending envelope to subscribe", zap.Error(err))
}
}()
} else {
// channel got closed; likely due to backpressure of the sending channel.
log.Info("stream closed due to backpressure")
exit = true
}
func() {
streamLock.Lock()
defer streamLock.Unlock()
err := stream.Send(&env)
if err != nil {
log.Error("sending envelope to subscribe", zap.Error(err))
}
}()
})
if err != nil {
log.Error("error subscribing", zap.Error(err), zap.Int("topics", len(req.ContentTopics)))
return err
case <-stream.Context().Done():
log.Debug("stream closed")
exit = true
case <-s.ctx.Done():
log.Info("service closed")
exit = true
}
defer func() {
_ = sub.Unsubscribe()
metrics.EmitUnsubscribeTopics(stream.Context(), log, 1)
}()
}

select {
case <-stream.Context().Done():
log.Debug("stream closed")
break
case <-s.ctx.Done():
log.Info("service closed")
break
}

return nil
}

Expand Down Expand Up @@ -243,14 +259,16 @@ func (s *Service) Subscribe2(stream proto.MessageApi_Subscribe2Server) error {
}
}()

subs := map[string]*nats.Subscription{}
var streamLock sync.Mutex
subscribedTopicCount := 0
var currentSubscription *subscription
defer func() {
for _, sub := range subs {
_ = sub.Unsubscribe()
if currentSubscription != nil {
currentSubscription.Unsubscribe()
metrics.EmitUnsubscribeTopics(stream.Context(), log, subscribedTopicCount)
}
metrics.EmitUnsubscribeTopics(stream.Context(), log, len(subs))
}()
var streamLock sync.Mutex
subscriptionChannel := make(chan *proto.Envelope, 1)
for {
select {
case <-stream.Context().Done():
Expand All @@ -263,59 +281,52 @@ func (s *Service) Subscribe2(stream proto.MessageApi_Subscribe2Server) error {
if req == nil {
continue
}

// unsubscribe first.
if currentSubscription != nil {
currentSubscription.Unsubscribe()
currentSubscription = nil
}
log.Info("updating subscription", zap.Int("num_content_topics", len(req.ContentTopics)))

topics := map[string]bool{}
numSubscribes := 0
for _, topic := range req.ContentTopics {
topics[topic] = true
}

// If topic not in existing subscriptions, then subscribe.
if _, ok := subs[topic]; !ok {
sub, err := s.nc.Subscribe(buildNatsSubject(topic), func(msg *nats.Msg) {
var env proto.Envelope
err := pb.Unmarshal(msg.Data, &env)
if err != nil {
log.Info("unmarshaling envelope", zap.Error(err))
return
}
func() {
streamLock.Lock()
defer streamLock.Unlock()

err = stream.Send(&env)
if err != nil {
log.Error("sending envelope to subscriber", zap.Error(err))
}
}()
})
nextSubscription := s.subDispatcher.Subscribe(topics)
if currentSubscription == nil {
// on the first time, emit subscription
metrics.EmitSubscribeTopics(stream.Context(), log, len(topics))
} else {
// otherwise, emit the change.
metrics.EmitSubscriptionChange(stream.Context(), log, len(topics)-subscribedTopicCount)
}
subscribedTopicCount = len(topics)
subscriptionChannel = nextSubscription.messagesCh
currentSubscription = nextSubscription
case msg, open := <-subscriptionChannel:
if open {
func() {
streamLock.Lock()
defer streamLock.Unlock()
err := stream.Send(msg)
if err != nil {
log.Error("error subscribing", zap.Error(err), zap.Int("topics", len(req.ContentTopics)))
return err
log.Error("sending envelope to subscribe", zap.Error(err))
}
subs[topic] = sub
numSubscribes++
}
}

// If subscription not in topic, then unsubscribe.
var numUnsubscribes int
for topic, sub := range subs {
if topics[topic] {
continue
}
_ = sub.Unsubscribe()
delete(subs, topic)
numUnsubscribes++
}()
} else {
// channel got closed; likely due to backpressure of the sending channel.
log.Debug("stream closed due to backpressure")
return nil
}
metrics.EmitSubscriptionChange(stream.Context(), log, numSubscribes-numUnsubscribes)
}
}
}

func (s *Service) SubscribeAll(req *proto.SubscribeAllRequest, stream proto.MessageApi_SubscribeAllServer) error {
log := s.log.Named("subscribeAll")
log.Debug("started")
log.Info("started")
defer log.Debug("stopped")

// Subscribe to all nats subjects via wildcard
Expand All @@ -334,6 +345,9 @@ func (s *Service) Query(ctx context.Context, req *proto.QueryRequest) (*proto.Qu
}

if len(req.ContentTopics) > 1 {
// if len(req.ContentTopics) > maxTopicsPerQueryRequest {
// return nil, status.Errorf(codes.InvalidArgument, "the number of content topics(%d) exceed the maximum topics per query request (%d)", len(req.ContentTopics), maxTopicsPerQueryRequest)
// }
ri := apicontext.NewRequesterInfo(ctx)
log.Info("query with multiple topics", ri.ZapFields()...)
} else {
Expand Down Expand Up @@ -366,13 +380,33 @@ func (s *Service) BatchQuery(ctx context.Context, req *proto.BatchQueryRequest)
logFunc = log.Info
}
logFunc("large batch query", zap.Int("num_queries", len(req.Requests)))
// NOTE: in our implementation, we implicitly limit batch size to 50 requests
if len(req.Requests) > 50 {
return nil, status.Errorf(codes.InvalidArgument, "cannot exceed 50 requests in single batch")

// NOTE: in our implementation, we implicitly limit batch size to 50 requests (maxQueriesPerBatch = 50)
if len(req.Requests) > maxQueriesPerBatch {
return nil, status.Errorf(codes.InvalidArgument, "cannot exceed %d requests in single batch", maxQueriesPerBatch)
}

// calculate the total number of topics being requested in this batch request.
// totalRequestedTopicsCount := 0
// for _, query := range req.Requests {
// totalRequestedTopicsCount += len(query.ContentTopics)
// }

// if totalRequestedTopicsCount == 0 {
// return nil, status.Errorf(codes.InvalidArgument, "content topics required")
// }

// // are we still within limits ?
// if totalRequestedTopicsCount > maxTopicsPerBatchQueryRequest {
// return nil, status.Errorf(codes.InvalidArgument, "the total number of content topics(%d) exceed the maximum topics per batch query request(%d)", totalRequestedTopicsCount, maxTopicsPerBatchQueryRequest)
// }

// Naive implementation, perform all sub query requests sequentially
responses := make([]*proto.QueryResponse, 0)
for _, query := range req.Requests {
// if len(query.ContentTopics) > maxTopicsPerQueryRequest {
// return nil, status.Errorf(codes.InvalidArgument, "the number of content topics(%d) exceed the maximum topics per query request (%d)", len(query.ContentTopics), maxTopicsPerQueryRequest)
// }
// We execute the query using the existing Query API
resp, err := s.Query(ctx, query)
if err != nil {
Expand All @@ -394,10 +428,6 @@ func buildEnvelope(msg *wakupb.WakuMessage) *proto.Envelope {
}
}

func isValidSubscribeAllTopic(contentTopic string) bool {
return strings.HasPrefix(contentTopic, validXMTPTopicPrefix) || topic.IsMLSV1(contentTopic)
}

func fromWakuTimestamp(ts int64) uint64 {
if ts < 0 {
return 0
Expand Down
Loading

0 comments on commit b49b8ef

Please sign in to comment.