Skip to content

Commit

Permalink
Revert "Add RPC requests limits & implement centralized subscription …
Browse files Browse the repository at this point in the history
…handler …" (#360)

This reverts commit 1481d1d.
  • Loading branch information
richardhuaaa authored Feb 26, 2024
1 parent bc77427 commit 26a92f4
Show file tree
Hide file tree
Showing 12 changed files with 181 additions and 494 deletions.
208 changes: 89 additions & 119 deletions pkg/api/message/v1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"hash/fnv"
"io"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -34,20 +35,6 @@ const (

// 1048576 - 300 - 62 = 1048214
MaxMessageSize = pubsub.DefaultMaxMessageSize - MaxContentTopicNameSize - 62

// maxQueriesPerBatch defines the maximum number of queries we can support per batch.
maxQueriesPerBatch = 50

// maxTopicsPerQueryRequest defines the maximum number of topics that can be queried in a single request.
// the number is likely to be more than we want it to be, but would be a safe place to put it -
// per Test_LargeQueryTesting, the request decoding already failing before it reaches th handler.
maxTopicsPerQueryRequest = 157733

// maxTopicsPerBatchQueryRequest defines the maximum number of topics that can be queried in a batch query. This
// limit is imposed in additional to the per-query limit maxTopicsPerRequest.
// as a starting value, we've using the same value as above, since the entire request would be tossed
// away before this is reached.
maxTopicsPerBatchQueryRequest = maxTopicsPerQueryRequest
)

type Service struct {
Expand All @@ -66,8 +53,6 @@ type Service struct {

ns *server.Server
nc *nats.Conn

subDispatcher *subscriptionDispatcher
}

func NewService(log *zap.Logger, store *store.Store, publishToWakuRelay func(context.Context, *wakupb.WakuMessage) error) (s *Service, err error) {
Expand All @@ -89,16 +74,11 @@ func NewService(log *zap.Logger, store *store.Store, publishToWakuRelay func(con
if !s.ns.ReadyForConnections(4 * time.Second) {
return nil, errors.New("nats not ready")
}

s.nc, err = nats.Connect(s.ns.ClientURL())
if err != nil {
return nil, err
}

s.subDispatcher, err = newSubscriptionDispatcher(s.nc, s.log)
if err != nil {
return nil, err
}
return s, nil
}

Expand All @@ -108,7 +88,6 @@ func (s *Service) Close() {
if s.ctxCancel != nil {
s.ctxCancel()
}
s.subDispatcher.Shutdown()

if s.nc != nil {
s.nc.Close()
Expand Down Expand Up @@ -182,45 +161,50 @@ func (s *Service) Subscribe(req *proto.SubscribeRequest, stream proto.MessageApi

metrics.EmitSubscribeTopics(stream.Context(), log, len(req.ContentTopics))

// create a topics map.
topics := make(map[string]bool, len(req.ContentTopics))
var streamLock sync.Mutex
for _, topic := range req.ContentTopics {
topics[topic] = true
}
sub := s.subDispatcher.Subscribe(topics)
defer func() {
if sub != nil {
sub.Unsubscribe()
subject := topic
if subject != natsWildcardTopic {
subject = buildNatsSubject(topic)
}
metrics.EmitUnsubscribeTopics(stream.Context(), log, len(req.ContentTopics))
}()

var streamLock sync.Mutex
for exit := false; !exit; {
select {
case msg, open := <-sub.messagesCh:
if open {
func() {
streamLock.Lock()
defer streamLock.Unlock()
err := stream.Send(msg)
if err != nil {
log.Error("sending envelope to subscribe", zap.Error(err))
}
}()
} else {
// channel got closed; likely due to backpressure of the sending channel.
log.Debug("stream closed due to backpressure")
exit = true
sub, err := s.nc.Subscribe(subject, func(msg *nats.Msg) {
var env proto.Envelope
err := pb.Unmarshal(msg.Data, &env)
if err != nil {
log.Error("parsing envelope from bytes", zap.Error(err))
return
}
case <-stream.Context().Done():
log.Debug("stream closed")
exit = true
case <-s.ctx.Done():
log.Info("service closed")
exit = true
if topic == natsWildcardTopic && !isValidSubscribeAllTopic(env.ContentTopic) {
return
}
func() {
streamLock.Lock()
defer streamLock.Unlock()
err := stream.Send(&env)
if err != nil {
log.Error("sending envelope to subscribe", zap.Error(err))
}
}()
})
if err != nil {
log.Error("error subscribing", zap.Error(err), zap.Int("topics", len(req.ContentTopics)))
return err
}
defer func() {
_ = sub.Unsubscribe()
metrics.EmitUnsubscribeTopics(stream.Context(), log, 1)
}()
}

select {
case <-stream.Context().Done():
log.Debug("stream closed")
break
case <-s.ctx.Done():
log.Info("service closed")
break
}

return nil
}

Expand Down Expand Up @@ -259,16 +243,14 @@ func (s *Service) Subscribe2(stream proto.MessageApi_Subscribe2Server) error {
}
}()

var streamLock sync.Mutex
subscribedTopicCount := 0
var currentSubscription *subscription
subs := map[string]*nats.Subscription{}
defer func() {
if currentSubscription != nil {
currentSubscription.Unsubscribe()
metrics.EmitUnsubscribeTopics(stream.Context(), log, subscribedTopicCount)
for _, sub := range subs {
_ = sub.Unsubscribe()
}
metrics.EmitUnsubscribeTopics(stream.Context(), log, len(subs))
}()
subscriptionChannel := make(chan *proto.Envelope, 1)
var streamLock sync.Mutex
for {
select {
case <-stream.Context().Done():
Expand All @@ -281,45 +263,52 @@ func (s *Service) Subscribe2(stream proto.MessageApi_Subscribe2Server) error {
if req == nil {
continue
}

// unsubscribe first.
if currentSubscription != nil {
currentSubscription.Unsubscribe()
currentSubscription = nil
}
log.Info("updating subscription", zap.Int("num_content_topics", len(req.ContentTopics)))

topics := map[string]bool{}
numSubscribes := 0
for _, topic := range req.ContentTopics {
topics[topic] = true
}

nextSubscription := s.subDispatcher.Subscribe(topics)
if currentSubscription == nil {
// on the first time, emit subscription
metrics.EmitSubscribeTopics(stream.Context(), log, len(topics))
} else {
// otherwise, emit the change.
metrics.EmitSubscriptionChange(stream.Context(), log, len(topics)-subscribedTopicCount)
}
subscribedTopicCount = len(topics)
subscriptionChannel = nextSubscription.messagesCh
currentSubscription = nextSubscription
case msg, open := <-subscriptionChannel:
if open {
func() {
streamLock.Lock()
defer streamLock.Unlock()
err := stream.Send(msg)
// If topic not in existing subscriptions, then subscribe.
if _, ok := subs[topic]; !ok {
sub, err := s.nc.Subscribe(buildNatsSubject(topic), func(msg *nats.Msg) {
var env proto.Envelope
err := pb.Unmarshal(msg.Data, &env)
if err != nil {
log.Info("unmarshaling envelope", zap.Error(err))
return
}
func() {
streamLock.Lock()
defer streamLock.Unlock()

err = stream.Send(&env)
if err != nil {
log.Error("sending envelope to subscriber", zap.Error(err))
}
}()
})
if err != nil {
log.Error("sending envelope to subscribe", zap.Error(err))
log.Error("error subscribing", zap.Error(err), zap.Int("topics", len(req.ContentTopics)))
return err
}
}()
} else {
// channel got closed; likely due to backpressure of the sending channel.
log.Debug("stream closed due to backpressure")
return nil
subs[topic] = sub
numSubscribes++
}
}

// If subscription not in topic, then unsubscribe.
var numUnsubscribes int
for topic, sub := range subs {
if topics[topic] {
continue
}
_ = sub.Unsubscribe()
delete(subs, topic)
numUnsubscribes++
}
metrics.EmitSubscriptionChange(stream.Context(), log, numSubscribes-numUnsubscribes)
}
}
}
Expand All @@ -345,9 +334,6 @@ func (s *Service) Query(ctx context.Context, req *proto.QueryRequest) (*proto.Qu
}

if len(req.ContentTopics) > 1 {
if len(req.ContentTopics) > maxTopicsPerQueryRequest {
return nil, status.Errorf(codes.InvalidArgument, "the number of content topics(%d) exceed the maximum topics per query request (%d)", len(req.ContentTopics), maxTopicsPerQueryRequest)
}
ri := apicontext.NewRequesterInfo(ctx)
log.Info("query with multiple topics", ri.ZapFields()...)
} else {
Expand Down Expand Up @@ -380,33 +366,13 @@ func (s *Service) BatchQuery(ctx context.Context, req *proto.BatchQueryRequest)
logFunc = log.Info
}
logFunc("large batch query", zap.Int("num_queries", len(req.Requests)))

// NOTE: in our implementation, we implicitly limit batch size to 50 requests (maxQueriesPerBatch = 50)
if len(req.Requests) > maxQueriesPerBatch {
return nil, status.Errorf(codes.InvalidArgument, "cannot exceed %d requests in single batch", maxQueriesPerBatch)
}

// calculate the total number of topics being requested in this batch request.
totalRequestedTopicsCount := 0
for _, query := range req.Requests {
totalRequestedTopicsCount += len(query.ContentTopics)
}

if totalRequestedTopicsCount == 0 {
return nil, status.Errorf(codes.InvalidArgument, "content topics required")
}

// are we still within limits ?
if totalRequestedTopicsCount > maxTopicsPerBatchQueryRequest {
return nil, status.Errorf(codes.InvalidArgument, "the total number of content topics(%d) exceed the maximum topics per batch query request(%d)", totalRequestedTopicsCount, maxTopicsPerBatchQueryRequest)
// NOTE: in our implementation, we implicitly limit batch size to 50 requests
if len(req.Requests) > 50 {
return nil, status.Errorf(codes.InvalidArgument, "cannot exceed 50 requests in single batch")
}

// Naive implementation, perform all sub query requests sequentially
responses := make([]*proto.QueryResponse, 0)
for _, query := range req.Requests {
if len(query.ContentTopics) > maxTopicsPerQueryRequest {
return nil, status.Errorf(codes.InvalidArgument, "the number of content topics(%d) exceed the maximum topics per query request (%d)", len(query.ContentTopics), maxTopicsPerQueryRequest)
}
// We execute the query using the existing Query API
resp, err := s.Query(ctx, query)
if err != nil {
Expand All @@ -428,6 +394,10 @@ func buildEnvelope(msg *wakupb.WakuMessage) *proto.Envelope {
}
}

func isValidSubscribeAllTopic(contentTopic string) bool {
return strings.HasPrefix(contentTopic, validXMTPTopicPrefix) || topic.IsMLSV1(contentTopic)
}

func fromWakuTimestamp(ts int64) uint64 {
if ts < 0 {
return 0
Expand Down
Loading

0 comments on commit 26a92f4

Please sign in to comment.