Add RPC requests limits & implement centralized subscription handler #354

Merged (24 commits, Feb 22, 2024). Showing changes from 1 commit.
153 changes: 120 additions & 33 deletions pkg/api/message/v1/service.go
@@ -7,6 +7,7 @@ import (
"io"
"strings"
"sync"
"sync/atomic"
"time"

pubsub "github.com/libp2p/go-libp2p-pubsub"
@@ -35,6 +36,24 @@ const (

// 1048576 - 300 - 62 = 1048214
MaxMessageSize = pubsub.DefaultMaxMessageSize - MaxContentTopicNameSize - 62

// maxRequestLimitPerBatch defines the maximum number of requests we can support per batch.
maxRequestLimitPerBatch = 50

// maxTopicsPerRequest defines the maximum number of topics that can be queried in a single request.
maxTopicsPerRequest = 1024

// maxTopicsPerBatch defines the maximum number of topics that can be queried in a batch query. This
// limit is imposed in addition to the per-query limit maxTopicsPerRequest.
maxTopicsPerBatch = 4096
Contributor:

We have a metric that shows up to 90k topics in the same request, so dropping it down to 4k would break that usage, and I doubt those clients currently have anything in place to auto-adjust to a lower limit, because it'd be a new error.

Curious what @neekolas thinks about having a limit here too.

Contributor Author:

Could you tell me more about these clients? Are they the notification servers?

Contributor @snormore (Feb 14, 2024):

The notification servers tend to use the SubscribeAll endpoint, which just listens to the whole firehose rather than specific topics. These clients are more likely chatbots that listen to a lot of topics for their users. We could push them to use SubscribeAll, but today they're listening to individual topics via this endpoint. I had a WIP branch from a few weeks ago that updates this Subscribe endpoint to use the wildcard (listen to all topics) when the batch size exceeds a threshold, to optimize memory usage (reduce open goroutines, since there's currently a goroutine per topic subscription). That could be worth implementing here instead of a limit on the batch size.
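
A rough sketch of that wildcard idea in Go (this is not the linked branch; the function name and shape are assumptions): subscribe once to the NATS full wildcard and filter locally, so a large topic list costs one subscription and one goroutine instead of one per topic.

// Hypothetical sketch, not the code from the WIP branch. It would live alongside
// service.go, which already imports nats and defines buildNatsSubject.
func subscribeViaWildcard(nc *nats.Conn, topics []string, deliver func(*nats.Msg)) (*nats.Subscription, error) {
	wanted := make(map[string]bool, len(topics))
	for _, t := range topics {
		wanted[buildNatsSubject(t)] = true
	}
	// ">" matches every subject on the connection; filter to the requested topics only.
	return nc.Subscribe(">", func(msg *nats.Msg) {
		if wanted[msg.Subject] {
			deliver(msg)
		}
	})
}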

Contributor Author:

OK, so I think I now understand where the memory issue is coming from.

I was thinking of #topics ~= the number of concurrent conversations in an inbox, but I wasn't aware of chatbots.

That's a completely different use case, with (potentially) different attributes: we might want to favor slower operation at lower resource consumption there.

I'll spend some more time looking into that, but I do think your direction is the way to go.

Collaborator:

Agree with @snormore that a hard limit isn't an option. If we were to change the behaviour for existing apps, we'd want to give them a very generous warning and a reasonable alternative.

Contributor Author:

Would you be OK if we start off with a very high number and add configurable options later on? I.e. start with a 1M limit (far beyond what would crash the server).

Contributor Author:

The above metric mentioned by @snormore applies to subscribe, not to query, btw. We don't have a metric, AFAIK, that tracks query size.

Contributor Author:

I updated the numbers here based on a unit test I created that tests the maximum possible topic count.
This guarantees it won't break anything for the time being, and we'll be able to adjust it in the future.

Collaborator @neekolas (Feb 17, 2024):

Something like 1M per request would be fine. It's way above what any valid user needs.

If we were to enforce the limit per IP, we'd just have to be very sure the bookkeeping is correct. For example, that a client can't subscribe to 100,000 topics, disconnect, resubscribe to the same list, and repeat until they push themselves over the limit.
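
A minimal sketch of the kind of per-IP bookkeeping being described (hypothetical, not part of this PR): topics are acquired when a stream starts and released when it ends, so disconnect/resubscribe cycles don't accumulate phantom counts.

// Hypothetical helper; sync is already imported in service.go.
type perIPTopicLimiter struct {
	mu     sync.Mutex
	counts map[string]int // currently subscribed topics per IP
	limit  int
}

func newPerIPTopicLimiter(limit int) *perIPTopicLimiter {
	return &perIPTopicLimiter{counts: map[string]int{}, limit: limit}
}

func (l *perIPTopicLimiter) acquire(ip string, n int) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.counts[ip]+n > l.limit {
		return false
	}
	l.counts[ip] += n
	return true
}

// release must run (e.g. via defer) when the subscription stream ends; without it,
// disconnect/resubscribe cycles would count the same topics repeatedly and wrongly
// push an IP over the limit.
func (l *perIPTopicLimiter) release(ip string, n int) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.counts[ip] -= n
	if l.counts[ip] <= 0 {
		delete(l.counts, ip)
	}
}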

Contributor Author:

Agreed - IP filtering isn't trivial, but it's also a bit beyond the scope of this PR. The goal here was to apply some basic, immediate remedies to the existing (crashing) node on the "happy" path.
I can look into IP filtering next.


// maxConcurrentSubscribedTopics is the maximum total number of topics that can be concurrently subscribed across our node.
// Requests that would surpass that limit are rejected, as the server would not be able to scale beyond it.
maxConcurrentSubscribedTopics = 1024 * 1024
Contributor @snormore (Feb 14, 2024):

The upper bound for this depends on available memory in the node, which changes depending on the deployment environment (devnet vs prod). Currently in prod we've seen over 6M concurrent topic subscriptions before memory warnings start firing.

Alternatively, or in addition to this, I think it would be great to limit concurrent topic subscriptions by IP. Maybe even start with that instead of a global limit across all IPs, but I'm not against having a global limit either; it just might need to be configurable via a CLI option instead of a const.

Curious about @neekolas's thoughts on this too.

Contributor Author:

Yes, I agree - we need multiple tiers of limits to ensure the service won't be disrupted by 1% of the clients placing unrealistic load on the server. I'm very much open to any other proposed default, which we'd be able to dial in later on. Does 100M sound better as a starting point?

Contributor Author:

I.e. use a const for the default, and override it via .env / config file / CLI.
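
For instance (a sketch only; the environment variable name is made up and none of this is in the PR), the const could remain the default and be overridden at startup:

// Hypothetical startup override; uses os and strconv.
func concurrentTopicsLimit() uint64 {
	limit := uint64(maxConcurrentSubscribedTopics)
	if v := os.Getenv("XMTP_MAX_CONCURRENT_SUBSCRIBED_TOPICS"); v != "" {
		if parsed, err := strconv.ParseUint(v, 10, 64); err == nil && parsed > 0 {
			limit = parsed
		}
	}
	return limit
}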

Contributor @snormore (Feb 14, 2024):

Sounds reasonable to me. If we implement something like main...snor/sub-nats-wildcard-for-many-topics, then 6M will no longer be the current prod bound either, since memory usage won't grow linearly with the number of topic subscriptions, or at least not to the degree it does today with goroutines growing per topic subscription.

Contributor Author:

I was thinking of a much more fundamental change to address this level of scale. Having a goroutine per subscription is ridiculous and won't scale. Instead, I think we should do the following:

  1. Create a single subscription per node, which receives all the messages.
  2. For each client subscription (regardless of its size), create a matching topics map.
  3. When a message is received, unmarshal it once, compare it against all the maps, and dispatch it to all the matching recipients.

This approach is less efficient in quiescent scenarios, but it puts a bound on the dynamic load the system can generate.
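
A minimal sketch of that shape (hypothetical names; assumes Envelope's ContentTopic field and the nats/pb/proto packages already imported in this file - not the centralized handler as ultimately implemented in this PR):

// One node-wide subscription; per-client topic sets; unmarshal once; fan out.
type subscriber struct {
	topics map[string]bool      // topics this client cares about
	out    chan *proto.Envelope // drained by the client's stream goroutine
}

type dispatcher struct {
	mu          sync.RWMutex
	subscribers map[*subscriber]bool
}

func (d *dispatcher) run(nc *nats.Conn) (*nats.Subscription, error) {
	return nc.Subscribe(">", func(msg *nats.Msg) {
		var env proto.Envelope
		if err := pb.Unmarshal(msg.Data, &env); err != nil {
			return // unmarshal exactly once per message, regardless of subscriber count
		}
		d.mu.RLock()
		defer d.mu.RUnlock()
		for sub := range d.subscribers {
			if sub.topics[env.ContentTopic] {
				select {
				case sub.out <- &env:
				default: // drop rather than block the firehose on a slow client
				}
			}
		}
	})
}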

Collaborator:

It's going to be very difficult to determine what the right global limit is. Rejecting subscription requests might avoid one incident (nodes restarting), but it will create another incident (subscriptions mysteriously failing).

Per IP, we can at least get a sense of what is reasonable and put a cap in so that the 99th percentile user doesn't break the network for everyone.

Contributor Author:

The updated code doesn't reject any subscription requests anymore.


// maxSubscribedTopicsPerConnection is the maximum number of topics a single subscriber (RPC session)
// can subscribe to. A subscriber could bypass that limit by using multiple RPC sessions.
maxSubscribedTopicsPerConnection = 4096
)

type Service struct {
@@ -53,13 +72,19 @@ type Service struct {

ns *server.Server
nc *nats.Conn

// totalSubscribedTopics is the total number of subscribed topics across all the connections.
// This variable should be accessed via atomic calls only. It's a pointer to ensure
// it is 64-bit aligned on 32-bit platforms.
totalSubscribedTopics *uint64
}

func NewService(log *zap.Logger, store *store.Store, publishToWakuRelay func(context.Context, *wakupb.WakuMessage) error) (s *Service, err error) {
s = &Service{
log: log.Named("message/v1"),
store: store,
publishToWakuRelay: publishToWakuRelay,
log: log.Named("message/v1"),
store: store,
publishToWakuRelay: publishToWakuRelay,
totalSubscribedTopics: new(uint64),
}
s.ctx, s.ctxCancel = context.WithCancel(context.Background())

@@ -155,6 +180,17 @@ func (s *Service) Subscribe(req *proto.SubscribeRequest, stream proto.MessageApi
log.Debug("started")
defer log.Debug("stopped")

if len(req.ContentTopics) > maxSubscribedTopicsPerConnection {
log.Info("subscribe request exceeded topics count threshold", zap.Int("topics_count", len(req.ContentTopics)))
return status.Errorf(codes.InvalidArgument, "content topic count exceeded maximum topics per subscribe threshold of %d", maxSubscribedTopicsPerConnection)
}

// check the server limits
if len(req.ContentTopics)+int(atomic.LoadUint64(s.totalSubscribedTopics)) > maxConcurrentSubscribedTopics {
log.Info("subscribe request would exceeded concurrent topics count threshold", zap.Int("topics_count", len(req.ContentTopics)))
return status.Errorf(codes.InvalidArgument, "content topic count exceeded concurrent server maximum topics threshold of %d", maxConcurrentSubscribedTopics)
}

// Send a header (any header) to fix an issue with Tonic based GRPC clients.
// See: https://github.com/xmtp/libxmtp/pull/58
_ = stream.SendHeader(metadata.Pairs("subscribed", "true"))
@@ -190,9 +226,11 @@ func (s *Service) Subscribe(req *proto.SubscribeRequest, stream proto.MessageApi
log.Error("error subscribing", zap.Error(err), zap.Int("topics", len(req.ContentTopics)))
return err
}
atomic.AddUint64(s.totalSubscribedTopics, 1)
defer func() {
_ = sub.Unsubscribe()
metrics.EmitUnsubscribeTopics(stream.Context(), log, 1)
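// adding ^uint64(0) (all bits set) wraps the unsigned counter around, decrementing it by one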
atomic.AddUint64(s.totalSubscribedTopics, ^uint64(0))
}()
}

@@ -245,6 +283,7 @@ func (s *Service) Subscribe2(stream proto.MessageApi_Subscribe2Server) error {

subs := map[string]*nats.Subscription{}
defer func() {
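// ^uint64(n-1) is how sync/atomic subtracts n from an unsigned counter; this removes the
// remaining len(subs) subscriptions when the stream ends (and is a no-op when subs is empty)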
atomic.AddUint64(s.totalSubscribedTopics, ^uint64(len(subs)-1))
for _, sub := range subs {
_ = sub.Unsubscribe()
}
@@ -263,41 +302,21 @@ func (s *Service) Subscribe2(stream proto.MessageApi_Subscribe2Server) error {
if req == nil {
continue
}

if len(req.ContentTopics) > maxSubscribedTopicsPerConnection {
log.Info("subscribe2 request exceeded topics count threshold", zap.Int("topics_count", len(req.ContentTopics)))
return status.Errorf(codes.InvalidArgument, "content topic count exceeded maximum topics per subscribe threshold of %d", maxSubscribedTopicsPerConnection)
}

log.Info("updating subscription", zap.Int("num_content_topics", len(req.ContentTopics)))

topics := map[string]bool{}
numSubscribes := 0
for _, topic := range req.ContentTopics {
topics[topic] = true

// If topic not in existing subscriptions, then subscribe.
if _, ok := subs[topic]; !ok {
sub, err := s.nc.Subscribe(buildNatsSubject(topic), func(msg *nats.Msg) {
var env proto.Envelope
err := pb.Unmarshal(msg.Data, &env)
if err != nil {
log.Info("unmarshaling envelope", zap.Error(err))
return
}
func() {
streamLock.Lock()
defer streamLock.Unlock()

err = stream.Send(&env)
if err != nil {
log.Error("sending envelope to subscriber", zap.Error(err))
}
}()
})
if err != nil {
log.Error("error subscribing", zap.Error(err), zap.Int("topics", len(req.ContentTopics)))
return err
}
subs[topic] = sub
numSubscribes++
}
}

// Unsubscribe from all the topics we're no longer interested in.
// If subscription not in topic, then unsubscribe.
var numUnsubscribes int
for topic, sub := range subs {
@@ -308,6 +327,48 @@ func (s *Service) Subscribe2(stream proto.MessageApi_Subscribe2Server) error {
delete(subs, topic)
numUnsubscribes++
}
atomic.AddUint64(s.totalSubscribedTopics, ^uint64(numUnsubscribes-1))

// check the server limits
if int(atomic.LoadUint64(s.totalSubscribedTopics))+len(req.ContentTopics)-numUnsubscribes > maxConcurrentSubscribedTopics {
log.Info("subscribe2 request would exceeded concurrent topics count threshold", zap.Int("topics_count", len(req.ContentTopics)))
return status.Errorf(codes.InvalidArgument, "content topic count exceeded concurrent server maximum topics threshold of %d", maxConcurrentSubscribedTopics)
}

// subscribe to the remaining ones.
for _, topic := range req.ContentTopics {
// If the topic already has an existing subscription, skip it.
if _, ok := subs[topic]; ok {
continue
}

// The topic isn't an existing subscription
sub, err := s.nc.Subscribe(buildNatsSubject(topic), func(msg *nats.Msg) {
var env proto.Envelope
err := pb.Unmarshal(msg.Data, &env)
if err != nil {
log.Info("unmarshaling envelope", zap.Error(err))
return
}
// the inner func ensures streamLock is unlocked in all cases.
func() {
streamLock.Lock()
defer streamLock.Unlock()

err = stream.Send(&env)
if err != nil {
log.Error("sending envelope to subscriber", zap.Error(err))
}
}()
})
if err != nil {
log.Error("error subscribing", zap.Error(err), zap.Int("topics", len(req.ContentTopics)))
return err
}
subs[topic] = sub
numSubscribes++
}
atomic.AddUint64(s.totalSubscribedTopics, uint64(numSubscribes))
metrics.EmitSubscriptionChange(stream.Context(), log, numSubscribes-numUnsubscribes)
}
}
@@ -334,6 +395,10 @@ func (s *Service) Query(ctx context.Context, req *proto.QueryRequest) (*proto.Qu
}

if len(req.ContentTopics) > 1 {
if len(req.ContentTopics) > maxTopicsPerRequest {
log.Info("query exceeded topics count threshold", zap.Int("topics_count", len(req.ContentTopics)))
return nil, status.Errorf(codes.InvalidArgument, "content topic count exceeded maximum topics per request threshold of %d", maxTopicsPerRequest)
}
ri := apicontext.NewRequesterInfo(ctx)
log.Info("query with multiple topics", ri.ZapFields()...)
} else {
@@ -366,13 +431,35 @@ func (s *Service) BatchQuery(ctx context.Context, req *proto.BatchQueryRequest)
logFunc = log.Info
}
logFunc("large batch query", zap.Int("num_queries", len(req.Requests)))
// NOTE: in our implementation, we implicitly limit batch size to 50 requests
if len(req.Requests) > 50 {
return nil, status.Errorf(codes.InvalidArgument, "cannot exceed 50 requests in single batch")

// NOTE: we limit the batch size to maxRequestLimitPerBatch (50) requests per batch.
if len(req.Requests) > maxRequestLimitPerBatch {
return nil, status.Errorf(codes.InvalidArgument, "cannot exceed %d requests in single batch", maxRequestLimitPerBatch)
}

// calculate the total number of topics being requested in this batch request.
totalRequestedTopicsCount := 0
for _, query := range req.Requests {
totalRequestedTopicsCount += len(query.ContentTopics)
}

if totalRequestedTopicsCount == 0 {
return nil, status.Errorf(codes.InvalidArgument, "content topics required")
}

// are we still within limits?
if totalRequestedTopicsCount > maxTopicsPerBatch {
log.Info("batch query exceeded topics count threshold", zap.Int("topics_count", totalRequestedTopicsCount))
return nil, status.Errorf(codes.InvalidArgument, "batch content topics count exceeded maximum topics per batch threshold of %d", maxTopicsPerBatch)
}

// Naive implementation, perform all sub query requests sequentially
responses := make([]*proto.QueryResponse, 0)
for _, query := range req.Requests {
if len(query.ContentTopics) > maxTopicsPerRequest {
Contributor:

Should this be > maxTopicsPerBatch since we're comparing topics in the specific batch?

Contributor Author:

I think not; it should align with the same limits we have in Service.Query (where we have a single query).

Contributor:

OK, I think maybe I'm interpreting the terms differently; you're saying that the gRPC request is the batch and that there are multiple "requests" within it. That seems reasonable.

Contributor Author:

Exactly - we have:

  1. a gRPC request,
  2. containing one or more queries (i.e. either Query or BatchQuery),
  3. each containing one or more topics.
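
A hedged illustration of that hierarchy in Go (the topic strings are made up):

// One gRPC batch request, containing two queries, each with its own content topics.
req := &proto.BatchQueryRequest{
	Requests: []*proto.QueryRequest{
		{ContentTopics: []string{"/xmtp/0/dm-alice-bob/proto"}},
		{ContentTopics: []string{"/xmtp/0/dm-alice-carol/proto", "/xmtp/0/dm-alice-dave/proto"}},
	},
}
// maxRequestLimitPerBatch bounds len(req.Requests), maxTopicsPerRequest bounds each
// query's ContentTopics, and maxTopicsPerBatch bounds the total across the whole batch.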

log.Info("query exceeded topics count threshold", zap.Int("topics_count", len(query.ContentTopics)))
return nil, status.Errorf(codes.InvalidArgument, "content topic count exceeded maximum topics per request threshold of %d", maxTopicsPerRequest)
}
// We execute the query using the existing Query API
resp, err := s.Query(ctx, query)
if err != nil {