Restore centralized subscription handler #361

Merged · 9 commits · Feb 27, 2024
210 changes: 120 additions & 90 deletions in pkg/api/message/v1/service.go
@@ -5,7 +5,6 @@ import (
"fmt"
"hash/fnv"
"io"
"strings"
"sync"
"time"

@@ -35,6 +34,20 @@ const (

// 1048576 - 300 - 62 = 1048214
MaxMessageSize = pubsub.DefaultMaxMessageSize - MaxContentTopicNameSize - 62

// maxQueriesPerBatch defines the maximum number of queries we can support per batch.
maxQueriesPerBatch = 50

// maxTopicsPerQueryRequest defines the maximum number of topics that can be queried in a single request.
// The number is likely higher than we want it to be, but it is a safe place to put it -
// per Test_LargeQueryTesting, request decoding already fails before it reaches the handler.
// maxTopicsPerQueryRequest = 157733

// maxTopicsPerBatchQueryRequest defines the maximum number of topics that can be queried in a batch query. This
// limit is imposed in addition to the per-query limit maxTopicsPerQueryRequest.
// As a starting value, we're using the same value as above, since the entire request would be tossed
// away before this limit is reached.
// maxTopicsPerBatchQueryRequest = maxTopicsPerQueryRequest
)
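For context, the arithmetic in the MaxMessageSize comment assumes go-libp2p-pubsub's default maximum message size of 1 MiB (1,048,576 bytes), a MaxContentTopicNameSize of 300 bytes defined elsewhere in this package, and 62 bytes of envelope overhead. A quick sanity check of that subtraction, with the values hard-coded as assumptions:

package main

import "fmt"

func main() {
    const (
        defaultMaxMessageSize   = 1 << 20 // 1,048,576; assumed value of pubsub.DefaultMaxMessageSize
        maxContentTopicNameSize = 300     // assumed value of MaxContentTopicNameSize in this package
        envelopeOverhead        = 62      // per the comment above
    )
    fmt.Println(defaultMaxMessageSize - maxContentTopicNameSize - envelopeOverhead) // 1048214
}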

type Service struct {
@@ -53,6 +66,8 @@ type Service struct {

ns *server.Server
nc *nats.Conn

subDispatcher *subscriptionDispatcher
}
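The subscriptionDispatcher type referenced here is defined outside this file. Judging only by how the diff uses it (newSubscriptionDispatcher, Subscribe with a topic set, messagesCh, Unsubscribe, Shutdown, plus the natsWildcardTopic constant), its surface looks roughly like the sketch below. It is assumed to live in the same package, which already imports nats, zap, sync and the message API proto; the mu, subscriptions, all and dispatcher fields, the buffer size, and the registration logic are assumptions, not the PR's actual implementation:

// Sketch of the dispatcher API this file relies on.
type subscriptionDispatcher struct {
    conn          *nats.Conn
    log           *zap.Logger
    mu            sync.Mutex
    subscriptions map[*subscription]bool
}

// newSubscriptionDispatcher presumably also opens a single wildcard NATS
// subscription whose handler decodes each message into a proto.Envelope and
// fans it out to the registered subscriptions.
func newSubscriptionDispatcher(conn *nats.Conn, log *zap.Logger) (*subscriptionDispatcher, error) {
    return &subscriptionDispatcher{
        conn:          conn,
        log:           log,
        subscriptions: map[*subscription]bool{},
    }, nil
}

// Shutdown would drain the shared NATS subscription and close remaining channels.
func (d *subscriptionDispatcher) Shutdown() {}

// subscription is the handle each streaming RPC holds on to.
type subscription struct {
    // messagesCh delivers envelopes for the subscribed topics; the dispatcher
    // closes it if this subscriber falls too far behind (backpressure).
    messagesCh chan *proto.Envelope
    topics     map[string]bool
    all        bool // true for a wildcard ("subscribe all") subscription
    dispatcher *subscriptionDispatcher
}

// Subscribe registers interest in a set of content topics.
func (d *subscriptionDispatcher) Subscribe(topics map[string]bool) *subscription {
    sub := &subscription{
        messagesCh: make(chan *proto.Envelope, 100), // buffer size is an arbitrary placeholder
        topics:     topics,
        all:        topics[natsWildcardTopic],
        dispatcher: d,
    }
    d.mu.Lock()
    defer d.mu.Unlock()
    d.subscriptions[sub] = true
    return sub
}

// Unsubscribe detaches this subscription from the dispatcher.
func (sub *subscription) Unsubscribe() {
    sub.dispatcher.mu.Lock()
    defer sub.dispatcher.mu.Unlock()
    delete(sub.dispatcher.subscriptions, sub)
}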

func NewService(log *zap.Logger, store *store.Store, publishToWakuRelay func(context.Context, *wakupb.WakuMessage) error) (s *Service, err error) {
@@ -74,11 +89,16 @@ func NewService(log *zap.Logger, store *store.Store, publishToWakuRelay func(con
if !s.ns.ReadyForConnections(4 * time.Second) {
return nil, errors.New("nats not ready")
}

s.nc, err = nats.Connect(s.ns.ClientURL())
if err != nil {
return nil, err
}

s.subDispatcher, err = newSubscriptionDispatcher(s.nc, s.log)
if err != nil {
return nil, err
}
return s, nil
}

@@ -88,6 +108,7 @@ func (s *Service) Close() {
if s.ctxCancel != nil {
s.ctxCancel()
}
s.subDispatcher.Shutdown()

if s.nc != nil {
s.nc.Close()
@@ -161,50 +182,45 @@ func (s *Service) Subscribe(req *proto.SubscribeRequest, stream proto.MessageApi

metrics.EmitSubscribeTopics(stream.Context(), log, len(req.ContentTopics))

var streamLock sync.Mutex
// create a topics map.
topics := make(map[string]bool, len(req.ContentTopics))
for _, topic := range req.ContentTopics {
subject := topic
if subject != natsWildcardTopic {
subject = buildNatsSubject(topic)
topics[topic] = true
}
sub := s.subDispatcher.Subscribe(topics)
defer func() {
if sub != nil {
sub.Unsubscribe()
}
sub, err := s.nc.Subscribe(subject, func(msg *nats.Msg) {
var env proto.Envelope
err := pb.Unmarshal(msg.Data, &env)
if err != nil {
log.Error("parsing envelope from bytes", zap.Error(err))
return
}
if topic == natsWildcardTopic && !isValidSubscribeAllTopic(env.ContentTopic) {
return
metrics.EmitUnsubscribeTopics(stream.Context(), log, len(req.ContentTopics))
}()

var streamLock sync.Mutex
for exit := false; !exit; {
select {
case msg, open := <-sub.messagesCh:
if open {
func() {
streamLock.Lock()
defer streamLock.Unlock()
err := stream.Send(msg)
if err != nil {
log.Error("sending envelope to subscribe", zap.Error(err))
}
}()
} else {
// channel got closed; likely due to backpressure of the sending channel.
log.Info("stream closed due to backpressure")
exit = true
}
func() {
streamLock.Lock()
defer streamLock.Unlock()
err := stream.Send(&env)
if err != nil {
log.Error("sending envelope to subscribe", zap.Error(err))
}
}()
})
if err != nil {
log.Error("error subscribing", zap.Error(err), zap.Int("topics", len(req.ContentTopics)))
return err
case <-stream.Context().Done():
log.Debug("stream closed")
exit = true
case <-s.ctx.Done():
log.Info("service closed")
exit = true
}
defer func() {
_ = sub.Unsubscribe()
metrics.EmitUnsubscribeTopics(stream.Context(), log, 1)
}()
}

select {
case <-stream.Context().Done():
log.Debug("stream closed")
break
case <-s.ctx.Done():
log.Info("service closed")
break
}

return nil
}
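The "closed due to backpressure" branch above implies the dispatcher never blocks on a slow subscriber: it attempts a non-blocking send into each subscription's buffered channel and closes the channel when the buffer is full, which is what makes sub.messagesCh report open == false here. A minimal sketch of that fan-out pattern, continuing the assumed dispatcher sketch above (the mu, subscriptions and all fields are assumptions, not the PR's code):

// dispatch fans one decoded envelope out to every matching subscription.
// A subscriber that cannot keep up has its channel closed rather than
// stalling delivery to everyone else.
func (d *subscriptionDispatcher) dispatch(env *proto.Envelope) {
    d.mu.Lock()
    defer d.mu.Unlock()
    for sub := range d.subscriptions {
        if !sub.all && !sub.topics[env.ContentTopic] {
            continue
        }
        select {
        case sub.messagesCh <- env:
            // delivered
        default:
            // buffer full: drop this subscriber instead of blocking the fan-out;
            // the streaming handler sees the closed channel and exits.
            close(sub.messagesCh)
            delete(d.subscriptions, sub)
        }
    }
}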

@@ -243,14 +259,16 @@ func (s *Service) Subscribe2(stream proto.MessageApi_Subscribe2Server) error {
}
}()

subs := map[string]*nats.Subscription{}
var streamLock sync.Mutex
subscribedTopicCount := 0
var currentSubscription *subscription
defer func() {
for _, sub := range subs {
_ = sub.Unsubscribe()
if currentSubscription != nil {
currentSubscription.Unsubscribe()
metrics.EmitUnsubscribeTopics(stream.Context(), log, subscribedTopicCount)
}
metrics.EmitUnsubscribeTopics(stream.Context(), log, len(subs))
}()
var streamLock sync.Mutex
subscriptionChannel := make(chan *proto.Envelope, 1)
for {
select {
case <-stream.Context().Done():
@@ -263,59 +281,52 @@ func (s *Service) Subscribe2(stream proto.MessageApi_Subscribe2Server) error {
if req == nil {
continue
}

// unsubscribe first.
if currentSubscription != nil {
currentSubscription.Unsubscribe()
currentSubscription = nil
}
log.Info("updating subscription", zap.Int("num_content_topics", len(req.ContentTopics)))

topics := map[string]bool{}
numSubscribes := 0
for _, topic := range req.ContentTopics {
topics[topic] = true
}

// If topic not in existing subscriptions, then subscribe.
if _, ok := subs[topic]; !ok {
sub, err := s.nc.Subscribe(buildNatsSubject(topic), func(msg *nats.Msg) {
var env proto.Envelope
err := pb.Unmarshal(msg.Data, &env)
if err != nil {
log.Info("unmarshaling envelope", zap.Error(err))
return
}
func() {
streamLock.Lock()
defer streamLock.Unlock()

err = stream.Send(&env)
if err != nil {
log.Error("sending envelope to subscriber", zap.Error(err))
}
}()
})
nextSubscription := s.subDispatcher.Subscribe(topics)
if currentSubscription == nil {
// on the first subscription, emit the subscribe metric
metrics.EmitSubscribeTopics(stream.Context(), log, len(topics))
} else {
// otherwise, emit the change.
metrics.EmitSubscriptionChange(stream.Context(), log, len(topics)-subscribedTopicCount)
}
subscribedTopicCount = len(topics)
subscriptionChannel = nextSubscription.messagesCh
currentSubscription = nextSubscription
case msg, open := <-subscriptionChannel:
if open {
func() {
streamLock.Lock()
defer streamLock.Unlock()
err := stream.Send(msg)
if err != nil {
log.Error("error subscribing", zap.Error(err), zap.Int("topics", len(req.ContentTopics)))
return err
log.Error("sending envelope to subscribe", zap.Error(err))
}
subs[topic] = sub
numSubscribes++
}
}

// If subscription not in topic, then unsubscribe.
var numUnsubscribes int
for topic, sub := range subs {
if topics[topic] {
continue
}
_ = sub.Unsubscribe()
delete(subs, topic)
numUnsubscribes++
}()
} else {
// channel got closed; likely due to backpressure of the sending channel.
log.Debug("stream closed due to backpressure")
return nil
}
metrics.EmitSubscriptionChange(stream.Context(), log, numSubscribes-numUnsubscribes)
}
}
}
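From a client's perspective, Subscribe2 is a bidirectional stream: the caller sends a new SubscribeRequest whenever its topic set changes and keeps reading envelopes from the same stream, while the server swaps subscriptions underneath as shown above. A rough usage sketch against the generated gRPC client; the MessageApiClient name follows standard protoc-gen-go-grpc output for this service, and the example topic names, the assumption that the generated package is imported as proto, and the context/fmt wiring are illustrative:

func subscribeAndUpdate(ctx context.Context, client proto.MessageApiClient) error {
    stream, err := client.Subscribe2(ctx)
    if err != nil {
        return err
    }
    // Initial subscription.
    err = stream.Send(&proto.SubscribeRequest{ContentTopics: []string{"/xmtp/0/topic-a/proto"}})
    if err != nil {
        return err
    }
    // Later, replace the topic set in place; the server unsubscribes the old
    // set, subscribes the new one, and emits a subscription-change metric.
    err = stream.Send(&proto.SubscribeRequest{ContentTopics: []string{"/xmtp/0/topic-a/proto", "/xmtp/0/topic-b/proto"}})
    if err != nil {
        return err
    }
    for {
        env, err := stream.Recv()
        if err != nil {
            return err // io.EOF when the server closes the stream
        }
        fmt.Printf("received envelope for %s\n", env.ContentTopic)
    }
}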

func (s *Service) SubscribeAll(req *proto.SubscribeAllRequest, stream proto.MessageApi_SubscribeAllServer) error {
log := s.log.Named("subscribeAll")
log.Debug("started")
log.Info("started")
defer log.Debug("stopped")

// Subscribe to all nats subjects via wildcard
Expand All @@ -334,6 +345,9 @@ func (s *Service) Query(ctx context.Context, req *proto.QueryRequest) (*proto.Qu
}

if len(req.ContentTopics) > 1 {
// if len(req.ContentTopics) > maxTopicsPerQueryRequest {
// return nil, status.Errorf(codes.InvalidArgument, "the number of content topics(%d) exceed the maximum topics per query request (%d)", len(req.ContentTopics), maxTopicsPerQueryRequest)
// }
ri := apicontext.NewRequesterInfo(ctx)
log.Info("query with multiple topics", ri.ZapFields()...)
} else {
@@ -366,13 +380,33 @@ func (s *Service) BatchQuery(ctx context.Context, req *proto.BatchQueryRequest)
logFunc = log.Info
}
logFunc("large batch query", zap.Int("num_queries", len(req.Requests)))
// NOTE: in our implementation, we implicitly limit batch size to 50 requests
if len(req.Requests) > 50 {
return nil, status.Errorf(codes.InvalidArgument, "cannot exceed 50 requests in single batch")

// NOTE: in our implementation, we implicitly limit batch size to 50 requests (maxQueriesPerBatch = 50)
if len(req.Requests) > maxQueriesPerBatch {
return nil, status.Errorf(codes.InvalidArgument, "cannot exceed %d requests in single batch", maxQueriesPerBatch)
}

// calculate the total number of topics being requested in this batch request.
// totalRequestedTopicsCount := 0
// for _, query := range req.Requests {
// totalRequestedTopicsCount += len(query.ContentTopics)
// }

// if totalRequestedTopicsCount == 0 {
// return nil, status.Errorf(codes.InvalidArgument, "content topics required")
// }

// // are we still within limits ?
// if totalRequestedTopicsCount > maxTopicsPerBatchQueryRequest {
// return nil, status.Errorf(codes.InvalidArgument, "the total number of content topics(%d) exceed the maximum topics per batch query request(%d)", totalRequestedTopicsCount, maxTopicsPerBatchQueryRequest)
// }

// Naive implementation: perform all sub-query requests sequentially
responses := make([]*proto.QueryResponse, 0)
for _, query := range req.Requests {
// if len(query.ContentTopics) > maxTopicsPerQueryRequest {
// return nil, status.Errorf(codes.InvalidArgument, "the number of content topics(%d) exceed the maximum topics per query request (%d)", len(query.ContentTopics), maxTopicsPerQueryRequest)
// }
// We execute the query using the existing Query API
resp, err := s.Query(ctx, query)
if err != nil {
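Because BatchQuery rejects anything above maxQueriesPerBatch (50) requests, callers with larger workloads have to chunk on their side. A minimal client-side sketch of that chunking, using the same assumed generated client as above; the helper name, the client-side constant, and the Responses field name on the batch response are assumptions:

const clientMaxQueriesPerBatch = 50 // mirrors the server-side maxQueriesPerBatch limit

// batchQueryAll splits queries into server-acceptable batches and concatenates the responses.
func batchQueryAll(ctx context.Context, client proto.MessageApiClient, queries []*proto.QueryRequest) ([]*proto.QueryResponse, error) {
    var responses []*proto.QueryResponse
    for start := 0; start < len(queries); start += clientMaxQueriesPerBatch {
        end := start + clientMaxQueriesPerBatch
        if end > len(queries) {
            end = len(queries)
        }
        resp, err := client.BatchQuery(ctx, &proto.BatchQueryRequest{Requests: queries[start:end]})
        if err != nil {
            return nil, err
        }
        responses = append(responses, resp.Responses...)
    }
    return responses, nil
}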
@@ -394,10 +428,6 @@ func buildEnvelope(msg *wakupb.WakuMessage) *proto.Envelope {
}
}

func isValidSubscribeAllTopic(contentTopic string) bool {
return strings.HasPrefix(contentTopic, validXMTPTopicPrefix) || topic.IsMLSV1(contentTopic)
}

func fromWakuTimestamp(ts int64) uint64 {
if ts < 0 {
return 0