
Add RPC requests limits & implement centralized subscription handler #354

Merged Feb 22, 2024 (24 commits)

Changes from 16 commits
211 changes: 122 additions & 89 deletions pkg/api/message/v1/service.go
@@ -5,7 +5,6 @@ import (
"fmt"
"hash/fnv"
"io"
-	"strings"
"sync"
"time"

@@ -35,6 +34,20 @@ const (

// 1048576 - 300 - 62 = 1048214
MaxMessageSize = pubsub.DefaultMaxMessageSize - MaxContentTopicNameSize - 62

+	// maxQueriesPerBatch defines the maximum number of queries we can support per batch.
+	maxQueriesPerBatch = 50
+
+	// maxTopicsPerRequest defines the maximum number of topics that can be queried in a single request.
+	// The number is likely higher than we want it to be, but it is a safe place to put it -
+	// per Test_LargeQueryTesting, request decoding already fails before it reaches the handler.
+	maxTopicsPerRequest = 157733
Contributor:
Want to rename this to maxTopicsPerQueryBatch to make it clear it's for queries?

Contributor Author:
But it's not per batch - it's used both in the Query endpoint as well as in the BatchQuery.
The subsequent one, maxTopicsPerBatch, is the one that is being used per batch.

Contributor (@snormore, Feb 20, 2024):
I see, the double use of the term request (as in grpc request and query request) was confusing me. Maybe we can prefix these request references with query to make it clearer, like QueryRequest?

EDIT: I see you already did exactly that 🙏

Contributor Author:
I already modified it to maxTopicsPerQueryRequest and maxTopicsPerBatchQueryRequest. Do you think it's clear enough?

Contributor:
Yep I think that works 👍


+	// maxTopicsPerBatch defines the maximum number of topics that can be queried in a batch query. This
+	// limit is imposed in addition to the per-query limit maxTopicsPerRequest.
+	// As a starting value, we're using the same value as above, since the entire request would be tossed
+	// away before this is reached.
+	maxTopicsPerBatch = maxTopicsPerRequest
)
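Taken together, the constants above cap a gRPC request three ways: at most maxQueriesPerBatch queries per batch, at most maxTopicsPerRequest topics per individual query, and at most maxTopicsPerBatch topics across the whole batch. A minimal standalone sketch of that validation - the constant values are taken from this diff, while queryRequest and validateBatch are hypothetical names invented for illustration (the real handlers return gRPC status errors instead of plain errors):

```go
package main

import "fmt"

// Constant values mirror the diff; everything else here is illustrative.
const (
	maxQueriesPerBatch  = 50
	maxTopicsPerRequest = 157733
	maxTopicsPerBatch   = maxTopicsPerRequest
)

// queryRequest stands in for the proto QueryRequest message.
type queryRequest struct {
	contentTopics []string
}

// validateBatch applies the same limits the handler enforces
// (ordering simplified): query count, per-query topic count,
// and batch-wide topic count.
func validateBatch(queries []queryRequest) error {
	if len(queries) > maxQueriesPerBatch {
		return fmt.Errorf("cannot exceed %d requests in single batch", maxQueriesPerBatch)
	}
	totalTopics := 0
	for _, q := range queries {
		if len(q.contentTopics) > maxTopicsPerRequest {
			return fmt.Errorf("content topic count exceeded maximum topics per request threshold of %d", maxTopicsPerRequest)
		}
		totalTopics += len(q.contentTopics)
	}
	if totalTopics == 0 {
		return fmt.Errorf("content topics required")
	}
	if totalTopics > maxTopicsPerBatch {
		return fmt.Errorf("batch content topics count exceeded maximum topics per batch threshold of %d", maxTopicsPerBatch)
	}
	return nil
}
```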

type Service struct {
@@ -53,6 +66,8 @@ type Service struct {

ns *server.Server
nc *nats.Conn

+	subDispatcher *subscriptionDispatcher
}

func NewService(log *zap.Logger, store *store.Store, publishToWakuRelay func(context.Context, *wakupb.WakuMessage) error) (s *Service, err error) {
@@ -74,11 +89,16 @@ func NewService(log *zap.Logger, store *store.Store, publishToWakuRelay func(context.Context, *wakupb.WakuMessage) error) (s *Service, err error) {
if !s.ns.ReadyForConnections(4 * time.Second) {
return nil, errors.New("nats not ready")
}

s.nc, err = nats.Connect(s.ns.ClientURL())
if err != nil {
return nil, err
}

+	s.subDispatcher, err = newSubscriptionDispatcher(s.nc, s.log)
+	if err != nil {
+		return nil, err
+	}
return s, nil
}

@@ -88,6 +108,7 @@ func (s *Service) Close() {
if s.ctxCancel != nil {
s.ctxCancel()
}
+	s.subDispatcher.Shutdown()

if s.nc != nil {
s.nc.Close()
@@ -161,50 +182,45 @@ func (s *Service) Subscribe(req *proto.SubscribeRequest, stream proto.MessageApi

metrics.EmitSubscribeTopics(stream.Context(), log, len(req.ContentTopics))

-	var streamLock sync.Mutex
-	for _, topic := range req.ContentTopics {
-		subject := topic
-		if subject != natsWildcardTopic {
-			subject = buildNatsSubject(topic)
-		}
-		sub, err := s.nc.Subscribe(subject, func(msg *nats.Msg) {
-			var env proto.Envelope
-			err := pb.Unmarshal(msg.Data, &env)
-			if err != nil {
-				log.Error("parsing envelope from bytes", zap.Error(err))
-				return
-			}
-			if topic == natsWildcardTopic && !isValidSubscribeAllTopic(env.ContentTopic) {
-				return
-			}
-			func() {
-				streamLock.Lock()
-				defer streamLock.Unlock()
-				err := stream.Send(&env)
-				if err != nil {
-					log.Error("sending envelope to subscribe", zap.Error(err))
-				}
-			}()
-		})
-		if err != nil {
-			log.Error("error subscribing", zap.Error(err), zap.Int("topics", len(req.ContentTopics)))
-			return err
-		}
-		defer func() {
-			_ = sub.Unsubscribe()
-			metrics.EmitUnsubscribeTopics(stream.Context(), log, 1)
-		}()
-	}
-
-	select {
-	case <-stream.Context().Done():
-		log.Debug("stream closed")
-		break
-	case <-s.ctx.Done():
-		log.Info("service closed")
-		break
-	}
-
+	// create a topics map.
+	topics := make(map[string]bool, len(req.ContentTopics))
+	for _, topic := range req.ContentTopics {
+		topics[topic] = true
+	}
+	sub := s.subDispatcher.Subscribe(topics)
+	defer func() {
+		if sub != nil {
+			sub.Unsubscribe()
+		}
+		metrics.EmitUnsubscribeTopics(stream.Context(), log, 1)
+	}()
+
+	var streamLock sync.Mutex
+	for exit := false; exit == false; {
+		select {
+		case msg, open := <-sub.messagesCh:
+			if open {
+				func() {
+					streamLock.Lock()
+					defer streamLock.Unlock()
+					err := stream.Send(msg)
+					if err != nil {
+						log.Error("sending envelope to subscribe", zap.Error(err))
+					}
+				}()
+			} else {
+				// channel got closed; likely due to backpressure of the sending channel.
+				log.Debug("stream closed due to backpressure")
+				exit = true
+			}
+		case <-stream.Context().Done():
+			log.Debug("stream closed")
+			exit = true
+		case <-s.ctx.Done():
+			log.Info("service closed")
+			exit = true
+		}
+	}
+
 	return nil
}
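The rewritten Subscribe relies on the centralized dispatcher introduced in this PR: subDispatcher.Subscribe returns a subscription whose messagesCh the stream loop drains, and the dispatcher closes that channel when a consumer falls behind instead of blocking every other stream. The real implementation lives in other files of this PR; below is a minimal, NATS-free sketch of the idea, where the envelope type, the buffer size, and all internals are assumptions:

```go
package main

import "sync"

// envelope stands in for the decoded proto.Envelope.
type envelope struct {
	contentTopic string
}

// subscription mirrors how the diff uses it: a messagesCh the stream
// loop reads from, and Unsubscribe to detach from the dispatcher.
type subscription struct {
	dispatcher *subscriptionDispatcher
	topics     map[string]bool
	messagesCh chan *envelope
	closed     bool
}

type subscriptionDispatcher struct {
	mu   sync.Mutex
	subs map[*subscription]struct{}
}

func newSubscriptionDispatcher() *subscriptionDispatcher {
	return &subscriptionDispatcher{subs: map[*subscription]struct{}{}}
}

// Subscribe registers interest in a set of topics and returns a
// subscription backed by a bounded channel.
func (d *subscriptionDispatcher) Subscribe(topics map[string]bool) *subscription {
	sub := &subscription{
		dispatcher: d,
		topics:     topics,
		messagesCh: make(chan *envelope, 64), // buffer size is an assumption
	}
	d.mu.Lock()
	d.subs[sub] = struct{}{}
	d.mu.Unlock()
	return sub
}

func (sub *subscription) Unsubscribe() {
	d := sub.dispatcher
	d.mu.Lock()
	defer d.mu.Unlock()
	delete(d.subs, sub)
}

// dispatch fans one envelope out to every interested subscriber. A full
// buffer closes the channel instead of blocking: the slow consumer is
// dropped rather than stalling the rest.
func (d *subscriptionDispatcher) dispatch(env *envelope) {
	d.mu.Lock()
	defer d.mu.Unlock()
	for sub := range d.subs {
		if !sub.topics[env.contentTopic] {
			continue
		}
		select {
		case sub.messagesCh <- env:
		default:
			if !sub.closed {
				close(sub.messagesCh)
				sub.closed = true
			}
			delete(d.subs, sub)
		}
	}
}
```

Closing the channel on overflow is what the stream loop observes as open == false, logging "stream closed due to backpressure".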

@@ -243,14 +259,16 @@ func (s *Service) Subscribe2(stream proto.MessageApi_Subscribe2Server) error {
}
}()

-	subs := map[string]*nats.Subscription{}
-	var streamLock sync.Mutex
+	subscribedTopicCount := 0
+	var currentSubscription *subscription
 	defer func() {
-		for _, sub := range subs {
-			_ = sub.Unsubscribe()
+		if currentSubscription != nil {
+			currentSubscription.Unsubscribe()
+			metrics.EmitUnsubscribeTopics(stream.Context(), log, subscribedTopicCount)
 		}
-		metrics.EmitUnsubscribeTopics(stream.Context(), log, len(subs))
 	}()
+	var streamLock sync.Mutex
+	subscriptionChannel := make(chan *proto.Envelope, 1)
for {
select {
case <-stream.Context().Done():
@@ -263,52 +281,45 @@
if req == nil {
continue
}

+			// unsubscribe first.
+			if currentSubscription != nil {
+				currentSubscription.Unsubscribe()
+				currentSubscription = nil
+			}
 			log.Info("updating subscription", zap.Int("num_content_topics", len(req.ContentTopics)))
 
 			topics := map[string]bool{}
-			numSubscribes := 0
 			for _, topic := range req.ContentTopics {
 				topics[topic] = true
+			}
 
-				// If topic not in existing subscriptions, then subscribe.
-				if _, ok := subs[topic]; !ok {
-					sub, err := s.nc.Subscribe(buildNatsSubject(topic), func(msg *nats.Msg) {
-						var env proto.Envelope
-						err := pb.Unmarshal(msg.Data, &env)
-						if err != nil {
-							log.Info("unmarshaling envelope", zap.Error(err))
-							return
-						}
-						func() {
-							streamLock.Lock()
-							defer streamLock.Unlock()
-
-							err = stream.Send(&env)
-							if err != nil {
-								log.Error("sending envelope to subscriber", zap.Error(err))
-							}
-						}()
-					})
-					if err != nil {
-						log.Error("error subscribing", zap.Error(err), zap.Int("topics", len(req.ContentTopics)))
-						return err
-					}
-					subs[topic] = sub
-					numSubscribes++
-				}
-			}
-
-			// If subscription not in topic, then unsubscribe.
-			var numUnsubscribes int
-			for topic, sub := range subs {
-				if topics[topic] {
-					continue
-				}
-				_ = sub.Unsubscribe()
-				delete(subs, topic)
-				numUnsubscribes++
-			}
-			metrics.EmitSubscriptionChange(stream.Context(), log, numSubscribes-numUnsubscribes)
+			nextSubscription := s.subDispatcher.Subscribe(topics)
+			if currentSubscription == nil {
+				// on the first time, emit subscription
+				metrics.EmitSubscribeTopics(stream.Context(), log, len(topics))
+			} else {
+				// otherwise, emit the change.
+				metrics.EmitSubscriptionChange(stream.Context(), log, len(topics)-subscribedTopicCount)
+			}
+			subscribedTopicCount = len(topics)
+			subscriptionChannel = nextSubscription.messagesCh
+			currentSubscription = nextSubscription
+		case msg, open := <-subscriptionChannel:
+			if open {
+				func() {
+					streamLock.Lock()
+					defer streamLock.Unlock()
+					err := stream.Send(msg)
+					if err != nil {
+						log.Error("sending envelope to subscribe", zap.Error(err))
+					}
+				}()
+			} else {
+				// channel got closed; likely due to backpressure of the sending channel.
+				log.Debug("stream closed due to backpressure")
+				return nil
+			}
 		}
 	}
}
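Subscribe2 handles both subscription updates and message delivery in one select by reading from a channel variable (subscriptionChannel) that is reassigned whenever the client sends a new topic list; the replaced channel simply stops being selected on the next loop iteration, and a nil channel never fires. The pattern reduced to its essentials, with all names illustrative:

```go
package main

// run sketches the Subscribe2 loop shape: one select that both consumes
// messages from the current subscription channel and swaps that channel
// when an update arrives. Reassigning `current` works because select
// re-evaluates its channel expressions on every iteration.
func run(updates <-chan chan string, done <-chan struct{}, out chan<- string) {
	var current chan string // nil until the first update; a nil channel never fires
	for {
		select {
		case <-done:
			return
		case next := <-updates:
			current = next // swap: later iterations read from the new channel
		case msg, open := <-current:
			if !open {
				return // channel closed, e.g. under backpressure
			}
			out <- msg
		}
	}
}
```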
@@ -334,6 +345,10 @@ func (s *Service) Query(ctx context.Context, req *proto.QueryRequest) (*proto.Qu
}

if len(req.ContentTopics) > 1 {
+		if len(req.ContentTopics) > maxTopicsPerRequest {
+			log.Info("query exceeded topics count threshold", zap.Int("topics_count", len(req.ContentTopics)))
+			return nil, status.Errorf(codes.InvalidArgument, "content topic count exceeded maximum topics per request threshold of %d", maxTopicsPerRequest)
+		}
ri := apicontext.NewRequesterInfo(ctx)
log.Info("query with multiple topics", ri.ZapFields()...)
} else {
@@ -366,13 +381,35 @@ func (s *Service) BatchQuery(ctx context.Context, req *proto.BatchQueryRequest)
logFunc = log.Info
}
logFunc("large batch query", zap.Int("num_queries", len(req.Requests)))
-	// NOTE: in our implementation, we implicitly limit batch size to 50 requests
-	if len(req.Requests) > 50 {
-		return nil, status.Errorf(codes.InvalidArgument, "cannot exceed 50 requests in single batch")
+
+	// NOTE: in our implementation, we implicitly limit batch size to 50 requests (maxQueriesPerBatch = 50)
+	if len(req.Requests) > maxQueriesPerBatch {
+		return nil, status.Errorf(codes.InvalidArgument, "cannot exceed %d requests in single batch", maxQueriesPerBatch)
 	}
+
+	// calculate the total number of topics being requested in this batch request.
+	totalRequestedTopicsCount := 0
+	for _, query := range req.Requests {
+		totalRequestedTopicsCount += len(query.ContentTopics)
+	}
+
+	if totalRequestedTopicsCount == 0 {
+		return nil, status.Errorf(codes.InvalidArgument, "content topics required")
+	}
+
+	// are we still within limits?
+	if totalRequestedTopicsCount > maxTopicsPerBatch {
Contributor:
Should this be > maxTopicsPerRequest since we're comparing the total used topics in the request?

Contributor Author:
Maybe the variable name maxTopicsPerBatch wasn't clear enough: it should be the limit for the total number of topics per batch query. I'll update the variable name accordingly.

+		log.Info("batch query exceeded topics count threshold", zap.Int("topics_count", totalRequestedTopicsCount))
+		return nil, status.Errorf(codes.InvalidArgument, "batch content topics count exceeded maximum topics per batch threshold of %d", maxTopicsPerBatch)
+	}


 	// Naive implementation, perform all sub query requests sequentially
 	responses := make([]*proto.QueryResponse, 0)
 	for _, query := range req.Requests {
+		if len(query.ContentTopics) > maxTopicsPerRequest {
Contributor:
Should this be > maxTopicsPerBatch since we're comparing topics in the specific batch?

Contributor Author:
I think not - it should align with the same limits we have in Service.Query (where we had a single query).

Contributor:
Ok I think maybe I'm interpreting the terms differently; you're saying that the grpc request is the batch and that there are multiple "requests" within that. That seems reasonable.

Contributor Author:
Exactly - we have:

  1. a grpc request
  2. containing one or more queries (i.e. either Query or BatchQuery)
  3. each containing one or more topics.
+			log.Info("query exceeded topics count threshold", zap.Int("topics_count", len(query.ContentTopics)))
+			return nil, status.Errorf(codes.InvalidArgument, "content topic count exceeded maximum topics per request threshold of %d", maxTopicsPerRequest)
+		}
// We execute the query using the existing Query API
resp, err := s.Query(ctx, query)
if err != nil {
@@ -394,10 +431,6 @@ func buildEnvelope(msg *wakupb.WakuMessage) *proto.Envelope {
}
}

-func isValidSubscribeAllTopic(topic string) bool {
-	return strings.HasPrefix(topic, validXMTPTopicPrefix)
-}

func fromWakuTimestamp(ts int64) uint64 {
if ts < 0 {
return 0