From 7c6c19f9ca9d5e24a19a93e82d8637b1c1e5b1c2 Mon Sep 17 00:00:00 2001 From: Tsachi Herman <24438559+tsachiherman@users.noreply.github.com> Date: Wed, 14 Feb 2024 13:35:52 -0500 Subject: [PATCH 01/23] add reuqests limit. --- pkg/api/message/v1/service.go | 153 ++++++++++++++++++++++++++-------- 1 file changed, 120 insertions(+), 33 deletions(-) diff --git a/pkg/api/message/v1/service.go b/pkg/api/message/v1/service.go index 0b9d81fd..1e539888 100644 --- a/pkg/api/message/v1/service.go +++ b/pkg/api/message/v1/service.go @@ -7,6 +7,7 @@ import ( "io" "strings" "sync" + "sync/atomic" "time" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -35,6 +36,24 @@ const ( // 1048576 - 300 - 62 = 1048214 MaxMessageSize = pubsub.DefaultMaxMessageSize - MaxContentTopicNameSize - 62 + + // maxRequestLimitPerBatch defines the maximum number of request we can support per batch. + maxRequestLimitPerBatch = 50 + + // maxTopicsPerRequest defines the maximum number of topics that can be queried in a single request. + maxTopicsPerRequest = 1024 + + // maxTopicsPerBatch defines the maximum number of topics that can be queried in a batch query. This + // limit is imposed in additional to the per-query limit maxTopicsPerRequest. + maxTopicsPerBatch = 4096 + + // maxConcurrentSubscribedTopics is the total number of subscribed topics currently being registered by our node. + // request to surpass that limit would be rejected, as the server would not be able to scale to that limit. + maxConcurrentSubscribedTopics = 1024 * 1024 + + // maxSubscribedTopicsPerConnection is the total number of subscribed topics currently being registered by a + // single subscriber ( rpc session ). A subscriber could bypass that limit by using multiple rpc sessions. + maxSubscribedTopicsPerConnection = 4096 ) type Service struct { @@ -53,13 +72,19 @@ type Service struct { ns *server.Server nc *nats.Conn + + // totalSubscribedTopics is the total number of subscribed entries, across all the connections. + // this variable should be access via atomic calls only. It's a pointer variable to ensure + // that it would be 64-bit aligned on 32-bit platforms. 
+ totalSubscribedTopics *uint64 } func NewService(log *zap.Logger, store *store.Store, publishToWakuRelay func(context.Context, *wakupb.WakuMessage) error) (s *Service, err error) { s = &Service{ - log: log.Named("message/v1"), - store: store, - publishToWakuRelay: publishToWakuRelay, + log: log.Named("message/v1"), + store: store, + publishToWakuRelay: publishToWakuRelay, + totalSubscribedTopics: new(uint64), } s.ctx, s.ctxCancel = context.WithCancel(context.Background()) @@ -155,6 +180,17 @@ func (s *Service) Subscribe(req *proto.SubscribeRequest, stream proto.MessageApi log.Debug("started") defer log.Debug("stopped") + if len(req.ContentTopics) > maxSubscribedTopicsPerConnection { + log.Info("subscribe request exceeded topics count threshold", zap.Int("topics_count", len(req.ContentTopics))) + return status.Errorf(codes.InvalidArgument, "content topic count exceeded maximum topics per subscribe threshold of %d", maxSubscribedTopicsPerConnection) + } + + // check the server limits + if len(req.ContentTopics)+int(atomic.LoadUint64(s.totalSubscribedTopics)) > maxConcurrentSubscribedTopics { + log.Info("subscribe request would exceeded concurrent topics count threshold", zap.Int("topics_count", len(req.ContentTopics))) + return status.Errorf(codes.InvalidArgument, "content topic count exceeded concurrent server maximum topics threshold of %d", maxConcurrentSubscribedTopics) + } + // Send a header (any header) to fix an issue with Tonic based GRPC clients. // See: https://github.com/xmtp/libxmtp/pull/58 _ = stream.SendHeader(metadata.Pairs("subscribed", "true")) @@ -190,9 +226,11 @@ func (s *Service) Subscribe(req *proto.SubscribeRequest, stream proto.MessageApi log.Error("error subscribing", zap.Error(err), zap.Int("topics", len(req.ContentTopics))) return err } + atomic.AddUint64(s.totalSubscribedTopics, 1) defer func() { _ = sub.Unsubscribe() metrics.EmitUnsubscribeTopics(stream.Context(), log, 1) + atomic.AddUint64(s.totalSubscribedTopics, ^uint64(0)) }() } @@ -245,6 +283,7 @@ func (s *Service) Subscribe2(stream proto.MessageApi_Subscribe2Server) error { subs := map[string]*nats.Subscription{} defer func() { + atomic.AddUint64(s.totalSubscribedTopics, ^uint64(len(subs)-1)) for _, sub := range subs { _ = sub.Unsubscribe() } @@ -263,41 +302,21 @@ func (s *Service) Subscribe2(stream proto.MessageApi_Subscribe2Server) error { if req == nil { continue } + + if len(req.ContentTopics) > maxSubscribedTopicsPerConnection { + log.Info("subscribe2 request exceeded topics count threshold", zap.Int("topics_count", len(req.ContentTopics))) + return status.Errorf(codes.InvalidArgument, "content topic count exceeded maximum topics per subscribe threshold of %d", maxSubscribedTopicsPerConnection) + } + log.Info("updating subscription", zap.Int("num_content_topics", len(req.ContentTopics))) topics := map[string]bool{} numSubscribes := 0 for _, topic := range req.ContentTopics { topics[topic] = true - - // If topic not in existing subscriptions, then subscribe. 
- if _, ok := subs[topic]; !ok { - sub, err := s.nc.Subscribe(buildNatsSubject(topic), func(msg *nats.Msg) { - var env proto.Envelope - err := pb.Unmarshal(msg.Data, &env) - if err != nil { - log.Info("unmarshaling envelope", zap.Error(err)) - return - } - func() { - streamLock.Lock() - defer streamLock.Unlock() - - err = stream.Send(&env) - if err != nil { - log.Error("sending envelope to subscriber", zap.Error(err)) - } - }() - }) - if err != nil { - log.Error("error subscribing", zap.Error(err), zap.Int("topics", len(req.ContentTopics))) - return err - } - subs[topic] = sub - numSubscribes++ - } } + // unsubscribe from all the topics we're no longer intereseted in. // If subscription not in topic, then unsubscribe. var numUnsubscribes int for topic, sub := range subs { @@ -308,6 +327,48 @@ func (s *Service) Subscribe2(stream proto.MessageApi_Subscribe2Server) error { delete(subs, topic) numUnsubscribes++ } + atomic.AddUint64(s.totalSubscribedTopics, ^uint64(numUnsubscribes-1)) + + // check the server limits + if int(atomic.LoadUint64(s.totalSubscribedTopics))+len(req.ContentTopics)-numUnsubscribes > maxConcurrentSubscribedTopics { + log.Info("subscribe2 request would exceeded concurrent topics count threshold", zap.Int("topics_count", len(req.ContentTopics))) + return status.Errorf(codes.InvalidArgument, "content topic count exceeded concurrent server maximum topics threshold of %d", maxConcurrentSubscribedTopics) + } + + // subscribe to the remainder ones. + for _, topic := range req.ContentTopics { + // If topic is already an existing subscription, we can end here. + if _, ok := subs[topic]; ok { + continue + } + + // The topic isn't an existing subscription + sub, err := s.nc.Subscribe(buildNatsSubject(topic), func(msg *nats.Msg) { + var env proto.Envelope + err := pb.Unmarshal(msg.Data, &env) + if err != nil { + log.Info("unmarshaling envelope", zap.Error(err)) + return + } + // the func call here ensures that we call streamUnlock in all the cases. + func() { + streamLock.Lock() + defer streamLock.Unlock() + + err = stream.Send(&env) + if err != nil { + log.Error("sending envelope to subscriber", zap.Error(err)) + } + }() + }) + if err != nil { + log.Error("error subscribing", zap.Error(err), zap.Int("topics", len(req.ContentTopics))) + return err + } + subs[topic] = sub + numSubscribes++ + } + atomic.AddUint64(s.totalSubscribedTopics, uint64(numSubscribes)) metrics.EmitSubscriptionChange(stream.Context(), log, numSubscribes-numUnsubscribes) } } @@ -334,6 +395,10 @@ func (s *Service) Query(ctx context.Context, req *proto.QueryRequest) (*proto.Qu } if len(req.ContentTopics) > 1 { + if len(req.ContentTopics) > maxTopicsPerRequest { + log.Info("query exceeded topics count threshold", zap.Int("topics_count", len(req.ContentTopics))) + return nil, status.Errorf(codes.InvalidArgument, "content topic count exceeded maximum topics per request threshold of %d", maxTopicsPerRequest) + } ri := apicontext.NewRequesterInfo(ctx) log.Info("query with multiple topics", ri.ZapFields()...) 
} else { @@ -366,13 +431,35 @@ func (s *Service) BatchQuery(ctx context.Context, req *proto.BatchQueryRequest) logFunc = log.Info } logFunc("large batch query", zap.Int("num_queries", len(req.Requests))) - // NOTE: in our implementation, we implicitly limit batch size to 50 requests - if len(req.Requests) > 50 { - return nil, status.Errorf(codes.InvalidArgument, "cannot exceed 50 requests in single batch") + + // NOTE: in our implementation, we implicitly limit batch size to 50 requests (maxRequestLimitPerBatch = 50) + if len(req.Requests) > maxRequestLimitPerBatch { + return nil, status.Errorf(codes.InvalidArgument, "cannot exceed %d requests in single batch", maxRequestLimitPerBatch) + } + + // calculate the total number of topics being requested in this batch request. + totalRequestedTopicsCount := 0 + for _, query := range req.Requests { + totalRequestedTopicsCount += len(query.ContentTopics) } + + if totalRequestedTopicsCount == 0 { + return nil, status.Errorf(codes.InvalidArgument, "content topics required") + } + + // are we still within limits ? + if totalRequestedTopicsCount > maxRequestLimitPerBatch { + log.Info("batch query exceeded topics count threshold", zap.Int("topics_count", totalRequestedTopicsCount)) + return nil, status.Errorf(codes.InvalidArgument, "batch content topics count exceeded maximum topics per batch threshold of %d", maxRequestLimitPerBatch) + } + // Naive implementation, perform all sub query requests sequentially responses := make([]*proto.QueryResponse, 0) for _, query := range req.Requests { + if len(query.ContentTopics) > maxTopicsPerRequest { + log.Info("query exceeded topics count threshold", zap.Int("topics_count", len(query.ContentTopics))) + return nil, status.Errorf(codes.InvalidArgument, "content topic count exceeded maximum topics per request threshold of %d", maxTopicsPerRequest) + } // We execute the query using the existing Query API resp, err := s.Query(ctx, query) if err != nil { From 782d2bd811a99441a330c3b4de40ff74564a2ff3 Mon Sep 17 00:00:00 2001 From: Tsachi Herman <24438559+tsachiherman@users.noreply.github.com> Date: Wed, 14 Feb 2024 16:07:54 -0500 Subject: [PATCH 02/23] update per feedback 1. --- pkg/api/message/v1/service.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/api/message/v1/service.go b/pkg/api/message/v1/service.go index 1e539888..8b21137c 100644 --- a/pkg/api/message/v1/service.go +++ b/pkg/api/message/v1/service.go @@ -37,8 +37,8 @@ const ( // 1048576 - 300 - 62 = 1048214 MaxMessageSize = pubsub.DefaultMaxMessageSize - MaxContentTopicNameSize - 62 - // maxRequestLimitPerBatch defines the maximum number of request we can support per batch. - maxRequestLimitPerBatch = 50 + // maxSubscribeRequestLimitPerBatch defines the maximum number of request we can support per batch. + maxSubscribeRequestLimitPerBatch = 50 // maxTopicsPerRequest defines the maximum number of topics that can be queried in a single request. 
maxTopicsPerRequest = 1024 @@ -432,9 +432,9 @@ func (s *Service) BatchQuery(ctx context.Context, req *proto.BatchQueryRequest) } logFunc("large batch query", zap.Int("num_queries", len(req.Requests))) - // NOTE: in our implementation, we implicitly limit batch size to 50 requests (maxRequestLimitPerBatch = 50) - if len(req.Requests) > maxRequestLimitPerBatch { - return nil, status.Errorf(codes.InvalidArgument, "cannot exceed %d requests in single batch", maxRequestLimitPerBatch) + // NOTE: in our implementation, we implicitly limit batch size to 50 requests (maxSubscribeRequestLimitPerBatch = 50) + if len(req.Requests) > maxSubscribeRequestLimitPerBatch { + return nil, status.Errorf(codes.InvalidArgument, "cannot exceed %d requests in single batch", maxSubscribeRequestLimitPerBatch) } // calculate the total number of topics being requested in this batch request. @@ -448,9 +448,9 @@ func (s *Service) BatchQuery(ctx context.Context, req *proto.BatchQueryRequest) } // are we still within limits ? - if totalRequestedTopicsCount > maxRequestLimitPerBatch { + if totalRequestedTopicsCount > maxTopicsPerBatch { log.Info("batch query exceeded topics count threshold", zap.Int("topics_count", totalRequestedTopicsCount)) - return nil, status.Errorf(codes.InvalidArgument, "batch content topics count exceeded maximum topics per batch threshold of %d", maxRequestLimitPerBatch) + return nil, status.Errorf(codes.InvalidArgument, "batch content topics count exceeded maximum topics per batch threshold of %d", maxTopicsPerBatch) } // Naive implementation, perform all sub query requests sequentially From 368f7e002c408464ce1f00ff480e7b0303390fc1 Mon Sep 17 00:00:00 2001 From: Tsachi Herman <24438559+tsachiherman@users.noreply.github.com> Date: Thu, 15 Feb 2024 12:49:11 -0500 Subject: [PATCH 03/23] update per peer review. --- pkg/api/message/v1/service.go | 206 +++++++++++------------------ pkg/api/message/v1/subscription.go | 141 ++++++++++++++++++++ 2 files changed, 217 insertions(+), 130 deletions(-) create mode 100644 pkg/api/message/v1/subscription.go diff --git a/pkg/api/message/v1/service.go b/pkg/api/message/v1/service.go index 8b21137c..91a2a79b 100644 --- a/pkg/api/message/v1/service.go +++ b/pkg/api/message/v1/service.go @@ -7,7 +7,6 @@ import ( "io" "strings" "sync" - "sync/atomic" "time" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -46,14 +45,6 @@ const ( // maxTopicsPerBatch defines the maximum number of topics that can be queried in a batch query. This // limit is imposed in additional to the per-query limit maxTopicsPerRequest. maxTopicsPerBatch = 4096 - - // maxConcurrentSubscribedTopics is the total number of subscribed topics currently being registered by our node. - // request to surpass that limit would be rejected, as the server would not be able to scale to that limit. - maxConcurrentSubscribedTopics = 1024 * 1024 - - // maxSubscribedTopicsPerConnection is the total number of subscribed topics currently being registered by a - // single subscriber ( rpc session ). A subscriber could bypass that limit by using multiple rpc sessions. - maxSubscribedTopicsPerConnection = 4096 ) type Service struct { @@ -73,18 +64,14 @@ type Service struct { ns *server.Server nc *nats.Conn - // totalSubscribedTopics is the total number of subscribed entries, across all the connections. - // this variable should be access via atomic calls only. It's a pointer variable to ensure - // that it would be 64-bit aligned on 32-bit platforms. 
- totalSubscribedTopics *uint64 + subDispatcher *subscriptionDispatcher } func NewService(log *zap.Logger, store *store.Store, publishToWakuRelay func(context.Context, *wakupb.WakuMessage) error) (s *Service, err error) { s = &Service{ - log: log.Named("message/v1"), - store: store, - publishToWakuRelay: publishToWakuRelay, - totalSubscribedTopics: new(uint64), + log: log.Named("message/v1"), + store: store, + publishToWakuRelay: publishToWakuRelay, } s.ctx, s.ctxCancel = context.WithCancel(context.Background()) @@ -95,6 +82,10 @@ func NewService(log *zap.Logger, store *store.Store, publishToWakuRelay func(con if err != nil { return nil, err } + s.subDispatcher, err = newSubscriptionDispatcher(s.nc, s.log) + if err != nil { + return nil, err + } go s.ns.Start() if !s.ns.ReadyForConnections(4 * time.Second) { return nil, errors.New("nats not ready") @@ -113,6 +104,7 @@ func (s *Service) Close() { if s.ctxCancel != nil { s.ctxCancel() } + s.subDispatcher.Shutdown() if s.nc != nil { s.nc.Close() @@ -180,69 +172,51 @@ func (s *Service) Subscribe(req *proto.SubscribeRequest, stream proto.MessageApi log.Debug("started") defer log.Debug("stopped") - if len(req.ContentTopics) > maxSubscribedTopicsPerConnection { - log.Info("subscribe request exceeded topics count threshold", zap.Int("topics_count", len(req.ContentTopics))) - return status.Errorf(codes.InvalidArgument, "content topic count exceeded maximum topics per subscribe threshold of %d", maxSubscribedTopicsPerConnection) - } - - // check the server limits - if len(req.ContentTopics)+int(atomic.LoadUint64(s.totalSubscribedTopics)) > maxConcurrentSubscribedTopics { - log.Info("subscribe request would exceeded concurrent topics count threshold", zap.Int("topics_count", len(req.ContentTopics))) - return status.Errorf(codes.InvalidArgument, "content topic count exceeded concurrent server maximum topics threshold of %d", maxConcurrentSubscribedTopics) - } - // Send a header (any header) to fix an issue with Tonic based GRPC clients. // See: https://github.com/xmtp/libxmtp/pull/58 _ = stream.SendHeader(metadata.Pairs("subscribed", "true")) metrics.EmitSubscribeTopics(stream.Context(), log, len(req.ContentTopics)) - var streamLock sync.Mutex + // create a topics map. + topics := make(map[string]bool, len(req.ContentTopics)) for _, topic := range req.ContentTopics { - subject := topic - if subject != natsWildcardTopic { - subject = buildNatsSubject(topic) + topics[topic] = true + } + sub := s.subDispatcher.Subscribe(topics) + defer func() { + if sub != nil { + sub.Unsubscribe() } - sub, err := s.nc.Subscribe(subject, func(msg *nats.Msg) { - var env proto.Envelope - err := pb.Unmarshal(msg.Data, &env) - if err != nil { - log.Error("parsing envelope from bytes", zap.Error(err)) - return - } - if topic == natsWildcardTopic && !isValidSubscribeAllTopic(env.ContentTopic) { - return + metrics.EmitUnsubscribeTopics(stream.Context(), log, 1) + }() + + var streamLock sync.Mutex + for exit := false; exit == false; { + select { + case msg, open := <-sub.messagesCh: + if open { + func() { + streamLock.Lock() + defer streamLock.Unlock() + err := stream.Send(msg) + if err != nil { + log.Error("sending envelope to subscribe", zap.Error(err)) + } + }() + } else { + // channel got closed; likely due to backpressure of the sending channel. 
+ log.Debug("stream closed due to backpressure") + exit = true } - func() { - streamLock.Lock() - defer streamLock.Unlock() - err := stream.Send(&env) - if err != nil { - log.Error("sending envelope to subscribe", zap.Error(err)) - } - }() - }) - if err != nil { - log.Error("error subscribing", zap.Error(err), zap.Int("topics", len(req.ContentTopics))) - return err + case <-stream.Context().Done(): + log.Debug("stream closed") + exit = true + case <-s.ctx.Done(): + log.Info("service closed") + exit = true } - atomic.AddUint64(s.totalSubscribedTopics, 1) - defer func() { - _ = sub.Unsubscribe() - metrics.EmitUnsubscribeTopics(stream.Context(), log, 1) - atomic.AddUint64(s.totalSubscribedTopics, ^uint64(0)) - }() - } - - select { - case <-stream.Context().Done(): - log.Debug("stream closed") - break - case <-s.ctx.Done(): - log.Info("service closed") - break } - return nil } @@ -281,15 +255,16 @@ func (s *Service) Subscribe2(stream proto.MessageApi_Subscribe2Server) error { } }() - subs := map[string]*nats.Subscription{} + var streamLock sync.Mutex + subscribedTopicCount := 0 + var currentSubscription *subscription defer func() { - atomic.AddUint64(s.totalSubscribedTopics, ^uint64(len(subs)-1)) - for _, sub := range subs { - _ = sub.Unsubscribe() + if currentSubscription != nil { + currentSubscription.Unsubscribe() + metrics.EmitUnsubscribeTopics(stream.Context(), log, subscribedTopicCount) } - metrics.EmitUnsubscribeTopics(stream.Context(), log, len(subs)) }() - var streamLock sync.Mutex + subscriptionChannel := make(chan *proto.Envelope, 1) for { select { case <-stream.Context().Done(): @@ -303,73 +278,44 @@ func (s *Service) Subscribe2(stream proto.MessageApi_Subscribe2Server) error { continue } - if len(req.ContentTopics) > maxSubscribedTopicsPerConnection { - log.Info("subscribe2 request exceeded topics count threshold", zap.Int("topics_count", len(req.ContentTopics))) - return status.Errorf(codes.InvalidArgument, "content topic count exceeded maximum topics per subscribe threshold of %d", maxSubscribedTopicsPerConnection) + // unsubscribe first. + if currentSubscription != nil { + currentSubscription.Unsubscribe() + currentSubscription = nil } - log.Info("updating subscription", zap.Int("num_content_topics", len(req.ContentTopics))) topics := map[string]bool{} - numSubscribes := 0 for _, topic := range req.ContentTopics { topics[topic] = true } - // unsubscribe from all the topics we're no longer intereseted in. - // If subscription not in topic, then unsubscribe. - var numUnsubscribes int - for topic, sub := range subs { - if topics[topic] { - continue - } - _ = sub.Unsubscribe() - delete(subs, topic) - numUnsubscribes++ - } - atomic.AddUint64(s.totalSubscribedTopics, ^uint64(numUnsubscribes-1)) - - // check the server limits - if int(atomic.LoadUint64(s.totalSubscribedTopics))+len(req.ContentTopics)-numUnsubscribes > maxConcurrentSubscribedTopics { - log.Info("subscribe2 request would exceeded concurrent topics count threshold", zap.Int("topics_count", len(req.ContentTopics))) - return status.Errorf(codes.InvalidArgument, "content topic count exceeded concurrent server maximum topics threshold of %d", maxConcurrentSubscribedTopics) + nextSubscription := s.subDispatcher.Subscribe(topics) + if currentSubscription == nil { + // on the first time, emit subscription + metrics.EmitSubscribeTopics(stream.Context(), log, len(topics)) + } else { + // otherwise, emit the change. 
+ metrics.EmitSubscriptionChange(stream.Context(), log, len(topics)-subscribedTopicCount) } - - // subscribe to the remainder ones. - for _, topic := range req.ContentTopics { - // If topic is already an existing subscription, we can end here. - if _, ok := subs[topic]; ok { - continue - } - - // The topic isn't an existing subscription - sub, err := s.nc.Subscribe(buildNatsSubject(topic), func(msg *nats.Msg) { - var env proto.Envelope - err := pb.Unmarshal(msg.Data, &env) + subscribedTopicCount = len(topics) + subscriptionChannel = nextSubscription.messagesCh + currentSubscription = nextSubscription + case msg, open := <-subscriptionChannel: + if open { + func() { + streamLock.Lock() + defer streamLock.Unlock() + err := stream.Send(msg) if err != nil { - log.Info("unmarshaling envelope", zap.Error(err)) - return + log.Error("sending envelope to subscribe", zap.Error(err)) } - // the func call here ensures that we call streamUnlock in all the cases. - func() { - streamLock.Lock() - defer streamLock.Unlock() - - err = stream.Send(&env) - if err != nil { - log.Error("sending envelope to subscriber", zap.Error(err)) - } - }() - }) - if err != nil { - log.Error("error subscribing", zap.Error(err), zap.Int("topics", len(req.ContentTopics))) - return err - } - subs[topic] = sub - numSubscribes++ + }() + } else { + // channel got closed; likely due to backpressure of the sending channel. + log.Debug("stream closed due to backpressure") + return nil } - atomic.AddUint64(s.totalSubscribedTopics, uint64(numSubscribes)) - metrics.EmitSubscriptionChange(stream.Context(), log, numSubscribes-numUnsubscribes) } } } diff --git a/pkg/api/message/v1/subscription.go b/pkg/api/message/v1/subscription.go new file mode 100644 index 00000000..a9839d7d --- /dev/null +++ b/pkg/api/message/v1/subscription.go @@ -0,0 +1,141 @@ +package api + +import ( + "sync" + + "github.com/nats-io/nats.go" // NATS messaging system + proto "github.com/xmtp/xmtp-node-go/pkg/proto/message_api/v1" // Custom XMTP Protocol Buffers definition + "go.uber.org/zap" // Logging library + pb "google.golang.org/protobuf/proto" // Protocol Buffers for serialization +) + +// allTopicsBacklogLength defines the buffer size for subscriptions that listen to all topics. +const allTopicsBacklogLength = 1024 + +// subscriptionDispatcher manages subscriptions and message dispatching. +type subscriptionDispatcher struct { + conn *nats.Conn // Connection to NATS server + subscription *nats.Subscription // Subscription to NATS topics + log *zap.Logger // Logger instance + subscriptions map[*subscription]interface{} // Active subscriptions + mu sync.Mutex // Mutex for concurrency control +} + +// newSubscriptionDispatcher creates a new dispatcher for managing subscriptions. +func newSubscriptionDispatcher(conn *nats.Conn, log *zap.Logger) (*subscriptionDispatcher, error) { + dispatcher := &subscriptionDispatcher{ + conn: conn, + log: log, + subscriptions: make(map[*subscription]interface{}), + } + + // Subscribe to NATS wildcard topic and assign message handler + var err error + dispatcher.subscription, err = conn.Subscribe(natsWildcardTopic, dispatcher.MessageHandler) + if err != nil { + return nil, err + } + return dispatcher, nil +} + +// Shutdown gracefully shuts down the dispatcher, unsubscribing from all topics. +func (d *subscriptionDispatcher) Shutdown() { + _ = d.subscription.Unsubscribe() + // the lock/unlock ensures that there is no in-process dispatching. 
+ d.mu.Lock() + defer d.mu.Unlock() + d.subscription = nil + d.conn = nil + d.subscriptions = nil + +} + +// MessageHandler processes incoming messages, dispatching them to the correct subscription. +func (d *subscriptionDispatcher) MessageHandler(msg *nats.Msg) { + var env proto.Envelope + err := pb.Unmarshal(msg.Data, &env) + if err != nil { + d.log.Info("unmarshaling envelope", zap.Error(err)) + return + } + // ensure valid topic. + if !isValidSubscribeAllTopic(env.ContentTopic) { + return + } + + d.mu.Lock() + defer d.mu.Unlock() + for subscription := range d.subscriptions { + if subscription.all || subscription.topics[env.ContentTopic] { + select { + case subscription.messagesCh <- &env: + default: + // we got here since the message channel was full. This happens when the client cannot + // consume the data fast enough. In that case, we don't want to block further since it migth + // slow down other users. Instead, we're going to close the channel and let the + // consumer re-establish the connection if needed. + close(subscription.messagesCh) + delete(d.subscriptions, subscription) + } + continue + } + } +} + +// subscription represents a single subscription, including its message channel and topics. +type subscription struct { + messagesCh chan *proto.Envelope // Channel for receiving messages + topics map[string]bool // Map of topics to subscribe to + all bool // Flag indicating subscription to all topics + dispatcher *subscriptionDispatcher // Parent dispatcher +} + +// log2 calculates the base-2 logarithm of an integer using bitwise operations. +// It returns the floor of the actual base-2 logarithm. +func log2(n uint) (log2 uint) { + if n == 0 { + return 0 + } + + // Keep shifting n right until it becomes 0. + // The number of shifts needed is the floor of log2(n). + for n > 1 { + n >>= 1 + log2++ + } + return log2 +} + +// Subscribe creates a new subscription for the given topics. +func (d *subscriptionDispatcher) Subscribe(topics map[string]bool) *subscription { + sub := &subscription{ + dispatcher: d, + } + + // Determine if subscribing to all topics or specific ones + for topic := range topics { + if natsWildcardTopic == topic { + sub.all = true + break + } + } + if !sub.all { + sub.topics = topics + // use a log2(length) as a backbuffer + sub.messagesCh = make(chan *proto.Envelope, log2(uint(len(topics)))+1) + } else { + sub.messagesCh = make(chan *proto.Envelope, allTopicsBacklogLength) + } + + d.mu.Lock() + defer d.mu.Unlock() + d.subscriptions[sub] = true + return sub +} + +// Unsubscribe removes the subscription from its dispatcher. +func (sub *subscription) Unsubscribe() { + sub.dispatcher.mu.Lock() + defer sub.dispatcher.mu.Unlock() + delete(sub.dispatcher.subscriptions, sub) +} From 0e22ae9cfccb068bc08ed027228d8cb50576201a Mon Sep 17 00:00:00 2001 From: Tsachi Herman <24438559+tsachiherman@users.noreply.github.com> Date: Thu, 15 Feb 2024 15:33:28 -0500 Subject: [PATCH 04/23] update testing. 
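This change moves the shared test helpers from *testing.T to testing.TB so that the new Benchmark_SubscribePublishQuery can reuse them; testing.TB is the interface satisfied by both *testing.T and *testing.B. A minimal, self-contained sketch of the pattern (the helper and test names below are illustrative only, not part of this patch):

package example_test

import "testing"

// requireNonEmpty is a hypothetical helper. Because it accepts testing.TB,
// both tests (*testing.T) and benchmarks (*testing.B) can share it.
func requireNonEmpty(tb testing.TB, s string) {
	tb.Helper()
	if s == "" {
		tb.Fatal("expected a non-empty string")
	}
}

func TestExample(t *testing.T) {
	requireNonEmpty(t, "hello")
}

func BenchmarkExample(b *testing.B) {
	for i := 0; i < b.N; i++ {
		requireNonEmpty(b, "hello")
	}
}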
--- pkg/api/server_test.go | 31 +++++++++++++++++++++++++++++++ pkg/api/setup_test.go | 8 ++++---- pkg/api/utils_test.go | 8 ++++---- pkg/testing/log.go | 2 +- pkg/testing/node.go | 4 ++-- pkg/testing/store.go | 4 ++-- 6 files changed, 44 insertions(+), 13 deletions(-) diff --git a/pkg/api/server_test.go b/pkg/api/server_test.go index 5273ce1e..e91ec21a 100644 --- a/pkg/api/server_test.go +++ b/pkg/api/server_test.go @@ -841,3 +841,34 @@ func requireErrorEqual(t *testing.T, err error, code codes.Code, msg string, det require.ElementsMatch(t, details, httpErr["details"]) } } + +func Benchmark_SubscribePublishQuery(b *testing.B) { + ctx := withAuth(b, context.Background()) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + server, cleanup := newTestServer(b) + defer cleanup() + + client, err := messageclient.NewGRPCClient(ctx, server.dialGRPC) + require.NoError(b, err) + + // start subscribe stream + stream, err := client.Subscribe(ctx, &messageV1.SubscribeRequest{ + ContentTopics: []string{"topic"}, + }) + require.NoError(b, err) + defer stream.Close() + time.Sleep(50 * time.Millisecond) + + // publish 10 messages + envs := makeEnvelopes(10) + publishRes, err := client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs}) + require.NoError(b, err) + require.NotNil(b, publishRes) + + // read subscription + subscribeExpect(b, stream, envs) + + // query for messages + requireEventuallyStored(b, ctx, client, envs) +} diff --git a/pkg/api/setup_test.go b/pkg/api/setup_test.go index 806f9c1d..1505a637 100644 --- a/pkg/api/setup_test.go +++ b/pkg/api/setup_test.go @@ -19,7 +19,7 @@ const ( testMaxMsgSize = 2 * 1024 * 1024 ) -func newTestServer(t *testing.T) (*Server, func()) { +func newTestServer(t testing.TB) (*Server, func()) { log := test.NewLog(t) waku, wakuCleanup := test.NewNode(t) store, storeCleanup := newTestStore(t, log) @@ -51,7 +51,7 @@ func newTestServer(t *testing.T) (*Server, func()) { } } -func newTestStore(t *testing.T, log *zap.Logger) (*store.Store, func()) { +func newTestStore(t testing.TB, log *zap.Logger) (*store.Store, func()) { db, _, dbCleanup := test.NewDB(t) store, err := store.New(&store.Config{ Log: log, @@ -109,7 +109,7 @@ func testGRPC(t *testing.T, ctx context.Context, f func(*testing.T, messageclien f(t, c, server) } -func withAuth(t *testing.T, ctx context.Context) context.Context { +func withAuth(t testing.TB, ctx context.Context) context.Context { ctx, _ = withAuthWithDetails(t, ctx, time.Now()) return ctx } @@ -139,7 +139,7 @@ func withMissingIdentityKey(t *testing.T, ctx context.Context) context.Context { return metadata.AppendToOutgoingContext(ctx, authorizationMetadataKey, "Bearer "+et) } -func withAuthWithDetails(t *testing.T, ctx context.Context, when time.Time) (context.Context, *v1.AuthData) { +func withAuthWithDetails(t testing.TB, ctx context.Context, when time.Time) (context.Context, *v1.AuthData) { token, data, err := generateV2AuthToken(when) require.NoError(t, err) et, err := EncodeAuthToken(token) diff --git a/pkg/api/utils_test.go b/pkg/api/utils_test.go index 3cc3f9ad..b8108fbf 100644 --- a/pkg/api/utils_test.go +++ b/pkg/api/utils_test.go @@ -24,7 +24,7 @@ func makeEnvelopes(count int) (envs []*messageV1.Envelope) { return envs } -func subscribeExpect(t *testing.T, stream messageclient.Stream, expected []*messageV1.Envelope) { +func subscribeExpect(t testing.TB, stream messageclient.Stream, expected []*messageV1.Envelope) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() received := 
[]*messageV1.Envelope{} @@ -38,7 +38,7 @@ func subscribeExpect(t *testing.T, stream messageclient.Stream, expected []*mess requireEnvelopesEqual(t, expected, received) } -func requireEventuallyStored(t *testing.T, ctx context.Context, client messageclient.Client, expected []*messageV1.Envelope) { +func requireEventuallyStored(t testing.TB, ctx context.Context, client messageclient.Client, expected []*messageV1.Envelope) { var queryRes *messageV1.QueryResponse require.Eventually(t, func() bool { var err error @@ -54,14 +54,14 @@ func requireEventuallyStored(t *testing.T, ctx context.Context, client messagecl requireEnvelopesEqual(t, expected, queryRes.Envelopes) } -func requireEnvelopesEqual(t *testing.T, expected, received []*messageV1.Envelope) { +func requireEnvelopesEqual(t testing.TB, expected, received []*messageV1.Envelope) { require.Equal(t, len(expected), len(received), "length mismatch") for i, env := range received { requireEnvelopeEqual(t, expected[i], env, "mismatched message[%d]", i) } } -func requireEnvelopeEqual(t *testing.T, expected, actual *messageV1.Envelope, msgAndArgs ...interface{}) { +func requireEnvelopeEqual(t testing.TB, expected, actual *messageV1.Envelope, msgAndArgs ...interface{}) { require.Equal(t, expected.ContentTopic, actual.ContentTopic, msgAndArgs...) require.Equal(t, expected.Message, actual.Message, msgAndArgs...) if expected.TimestampNs != 0 { diff --git a/pkg/testing/log.go b/pkg/testing/log.go index ec885634..cbfd3fd0 100644 --- a/pkg/testing/log.go +++ b/pkg/testing/log.go @@ -14,7 +14,7 @@ func init() { flag.BoolVar(&debug, "debug", false, "debug level logging in tests") } -func NewLog(t *testing.T) *zap.Logger { +func NewLog(t testing.TB) *zap.Logger { cfg := zap.NewDevelopmentConfig() if !debug { cfg.Level = zap.NewAtomicLevelAt(zap.InfoLevel) diff --git a/pkg/testing/node.go b/pkg/testing/node.go index 68f425a0..1ad43f19 100644 --- a/pkg/testing/node.go +++ b/pkg/testing/node.go @@ -65,7 +65,7 @@ func Disconnect(t *testing.T, n1 *wakunode.WakuNode, n2 *wakunode.WakuNode) { }, 3*time.Second, 50*time.Millisecond) } -func NewNode(t *testing.T, opts ...wakunode.WakuNodeOption) (*wakunode.WakuNode, func()) { +func NewNode(t testing.TB, opts ...wakunode.WakuNodeOption) (*wakunode.WakuNode, func()) { hostAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0") prvKey := NewPrivateKey(t) ctx := context.Background() @@ -94,7 +94,7 @@ func NewPeer(t *testing.T) host.Host { return host } -func NewPrivateKey(t *testing.T) *ecdsa.PrivateKey { +func NewPrivateKey(t testing.TB) *ecdsa.PrivateKey { key, err := tests.RandomHex(32) require.NoError(t, err) prvKey, err := crypto.HexToECDSA(key) diff --git a/pkg/testing/store.go b/pkg/testing/store.go index 3727fe27..bb4c4aff 100644 --- a/pkg/testing/store.go +++ b/pkg/testing/store.go @@ -19,7 +19,7 @@ const ( localTestDBDSNSuffix = "?sslmode=disable" ) -func NewDB(t *testing.T) (*sql.DB, string, func()) { +func NewDB(t testing.TB) (*sql.DB, string, func()) { dsn := localTestDBDSNPrefix + localTestDBDSNSuffix ctlDB := sql.OpenDB(pgdriver.NewConnector(pgdriver.WithDSN(dsn))) dbName := "test_" + RandomStringLower(12) @@ -36,7 +36,7 @@ func NewDB(t *testing.T) (*sql.DB, string, func()) { } } -func NewAuthzDB(t *testing.T) (*bun.DB, string, func()) { +func NewAuthzDB(t testing.TB) (*bun.DB, string, func()) { db, dsn, cleanup := NewDB(t) bunDB := bun.NewDB(db, pgdialect.New()) From 8c3a2319817aed34c5482d173e00cd7953720215 Mon Sep 17 00:00:00 2001 From: Tsachi Herman <24438559+tsachiherman@users.noreply.github.com> Date: Fri, 
16 Feb 2024 08:43:08 -0500 Subject: [PATCH 05/23] update --- pkg/api/message/v1/service.go | 9 +++-- pkg/api/server_test.go | 74 ++++++++++++++++++++++++++--------- pkg/api/setup_test.go | 12 ++++-- pkg/api/utils_test.go | 2 +- pkg/server/node_test.go | 8 ++-- pkg/server/server_test.go | 4 +- pkg/testing/node.go | 4 +- 7 files changed, 77 insertions(+), 36 deletions(-) diff --git a/pkg/api/message/v1/service.go b/pkg/api/message/v1/service.go index 91a2a79b..9f37fab2 100644 --- a/pkg/api/message/v1/service.go +++ b/pkg/api/message/v1/service.go @@ -82,19 +82,20 @@ func NewService(log *zap.Logger, store *store.Store, publishToWakuRelay func(con if err != nil { return nil, err } - s.subDispatcher, err = newSubscriptionDispatcher(s.nc, s.log) - if err != nil { - return nil, err - } go s.ns.Start() if !s.ns.ReadyForConnections(4 * time.Second) { return nil, errors.New("nats not ready") } + s.nc, err = nats.Connect(s.ns.ClientURL()) if err != nil { return nil, err } + s.subDispatcher, err = newSubscriptionDispatcher(s.nc, s.log) + if err != nil { + return nil, err + } return s, nil } diff --git a/pkg/api/server_test.go b/pkg/api/server_test.go index e91ec21a..3d7d524c 100644 --- a/pkg/api/server_test.go +++ b/pkg/api/server_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "io" + "math/rand" "net/http" "strings" "testing" @@ -16,6 +17,7 @@ import ( messageV1 "github.com/xmtp/xmtp-node-go/pkg/proto/message_api/v1" "github.com/xmtp/xmtp-node-go/pkg/ratelimiter" test "github.com/xmtp/xmtp-node-go/pkg/testing" + "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -65,7 +67,7 @@ func Test_SubscribePublishQuery(t *testing.T) { testGRPCAndHTTP(t, ctx, func(t *testing.T, client messageclient.Client, _ *Server) { // start subscribe stream stream, err := client.Subscribe(ctx, &messageV1.SubscribeRequest{ - ContentTopics: []string{"topic"}, + ContentTopics: []string{"/xmtp/0/topic"}, }) require.NoError(t, err) defer stream.Close() @@ -843,32 +845,66 @@ func requireErrorEqual(t *testing.T, err error, code codes.Code, msg string, det } func Benchmark_SubscribePublishQuery(b *testing.B) { + server, cleanup := newTestServerWithLog(b, zap.NewNop()) + defer cleanup() + ctx := withAuth(b, context.Background()) ctx, cancel := context.WithCancel(ctx) defer cancel() - server, cleanup := newTestServer(b) - defer cleanup() client, err := messageclient.NewGRPCClient(ctx, server.dialGRPC) require.NoError(b, err) - // start subscribe stream - stream, err := client.Subscribe(ctx, &messageV1.SubscribeRequest{ - ContentTopics: []string{"topic"}, - }) - require.NoError(b, err) - defer stream.Close() - time.Sleep(50 * time.Millisecond) + // create topics large topics for 10 streams. Topic should be interleaved. + const chunkSize = 1000 + const streamsCount = 10 + topics := [streamsCount][]string{} + + maxTopic := (len(topics)-1)*chunkSize*3/4 + chunkSize + // create a random order of topics. 
+ topicsOrder := rand.Perm(maxTopic) + envs := make([]*messageV1.Envelope, len(topicsOrder)) + for i, topicID := range topicsOrder { + envs[i] = &messageV1.Envelope{ + ContentTopic: fmt.Sprintf("/xmtp/0/topic/%d", topicID), + Message: []byte{1, 2, 3}, + TimestampNs: uint64(time.Second), + } + } - // publish 10 messages - envs := makeEnvelopes(10) - publishRes, err := client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs}) - require.NoError(b, err) - require.NotNil(b, publishRes) + for j := range topics { + topics[j] = make([]string, chunkSize) + for k := range topics[j] { + topics[j][k] = fmt.Sprintf("/xmtp/0/topic/%d", (j*chunkSize*3/4 + k)) + } + } - // read subscription - subscribeExpect(b, stream, envs) + streams := [10]messageclient.Stream{} + b.ResetTimer() + for i := range streams { + // start subscribe streams + var err error + streams[i], err = client.Subscribe(ctx, &messageV1.SubscribeRequest{ + ContentTopics: topics[i], + }) + require.NoError(b, err) + defer streams[i].Close() + } + + for n := 0; n < b.N; n++ { + // publish messages + publishRes, err := client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs}) + require.NoError(b, err) + require.NotNil(b, publishRes) - // query for messages - requireEventuallyStored(b, ctx, client, envs) + readCtx, readCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer readCancel() + // read subscription + for _, stream := range streams { + for k := 0; k < chunkSize; k++ { + _, err := stream.Next(readCtx) + require.NoError(b, err) + } + } + } } diff --git a/pkg/api/setup_test.go b/pkg/api/setup_test.go index 1505a637..3a234202 100644 --- a/pkg/api/setup_test.go +++ b/pkg/api/setup_test.go @@ -19,9 +19,8 @@ const ( testMaxMsgSize = 2 * 1024 * 1024 ) -func newTestServer(t testing.TB) (*Server, func()) { - log := test.NewLog(t) - waku, wakuCleanup := test.NewNode(t) +func newTestServerWithLog(t testing.TB, log *zap.Logger) (*Server, func()) { + waku, wakuCleanup := test.NewNode(t, log) store, storeCleanup := newTestStore(t, log) authzDB, _, authzDBCleanup := test.NewAuthzDB(t) allowLister := authz.NewDatabaseWalletAllowLister(authzDB, log) @@ -38,7 +37,7 @@ func newTestServer(t testing.TB) (*Server, func()) { MaxMsgSize: testMaxMsgSize, }, Waku: waku, - Log: test.NewLog(t), + Log: log, Store: store, AllowLister: allowLister, }) @@ -51,6 +50,11 @@ func newTestServer(t testing.TB) (*Server, func()) { } } +func newTestServer(t testing.TB) (*Server, func()) { + log := test.NewLog(t) + return newTestServerWithLog(t, log) +} + func newTestStore(t testing.TB, log *zap.Logger) (*store.Store, func()) { db, _, dbCleanup := test.NewDB(t) store, err := store.New(&store.Config{ diff --git a/pkg/api/utils_test.go b/pkg/api/utils_test.go index b8108fbf..2b427078 100644 --- a/pkg/api/utils_test.go +++ b/pkg/api/utils_test.go @@ -16,7 +16,7 @@ import ( func makeEnvelopes(count int) (envs []*messageV1.Envelope) { for i := 0; i < count; i++ { envs = append(envs, &messageV1.Envelope{ - ContentTopic: "topic", + ContentTopic: "/xmtp/0/topic", Message: []byte(fmt.Sprintf("msg %d", i)), TimestampNs: uint64(i * 1000000000), // i seconds }) diff --git a/pkg/server/node_test.go b/pkg/server/node_test.go index 223918c0..14259b2f 100644 --- a/pkg/server/node_test.go +++ b/pkg/server/node_test.go @@ -91,9 +91,9 @@ func TestNodes_Deployment(t *testing.T) { n2PrivKey := test.NewPrivateKey(t) // Spin up initial instances of the nodes. 
- n1, cleanup := test.NewNode(t, wakunode.WithPrivateKey(n1PrivKey)) + n1, cleanup := test.NewNode(t, test.NewLog(t), wakunode.WithPrivateKey(n1PrivKey)) defer cleanup() - n2, cleanup := test.NewNode(t, wakunode.WithPrivateKey(n2PrivKey)) + n2, cleanup := test.NewNode(t, test.NewLog(t), wakunode.WithPrivateKey(n2PrivKey)) defer cleanup() // Connect the nodes. @@ -101,9 +101,9 @@ func TestNodes_Deployment(t *testing.T) { test.Connect(t, n2, n1) // Spin up new instances of the nodes. - newN1, cleanup := test.NewNode(t, wakunode.WithPrivateKey(n1PrivKey)) + newN1, cleanup := test.NewNode(t, test.NewLog(t), wakunode.WithPrivateKey(n1PrivKey)) defer cleanup() - newN2, cleanup := test.NewNode(t, wakunode.WithPrivateKey(n2PrivKey)) + newN2, cleanup := test.NewNode(t, test.NewLog(t), wakunode.WithPrivateKey(n2PrivKey)) defer cleanup() // Expect matching peer IDs for new and old instances. diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 63a1d137..aed1368f 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -21,11 +21,11 @@ func TestServer_NewShutdown(t *testing.T) { func TestServer_StaticNodesReconnect(t *testing.T) { t.Parallel() - n1, cleanup := test.NewNode(t) + n1, cleanup := test.NewNode(t, test.NewLog(t)) defer cleanup() n1ID := n1.Host().ID() - n2, cleanup := test.NewNode(t) + n2, cleanup := test.NewNode(t, test.NewLog(t)) defer cleanup() n2ID := n2.Host().ID() diff --git a/pkg/testing/node.go b/pkg/testing/node.go index 1ad43f19..acdce878 100644 --- a/pkg/testing/node.go +++ b/pkg/testing/node.go @@ -18,6 +18,7 @@ import ( "github.com/waku-org/go-waku/tests" wakunode "github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/peerstore" + "go.uber.org/zap" ) func Connect(t *testing.T, n1 *wakunode.WakuNode, n2 *wakunode.WakuNode, protocols ...protocol.ID) { @@ -65,11 +66,10 @@ func Disconnect(t *testing.T, n1 *wakunode.WakuNode, n2 *wakunode.WakuNode) { }, 3*time.Second, 50*time.Millisecond) } -func NewNode(t testing.TB, opts ...wakunode.WakuNodeOption) (*wakunode.WakuNode, func()) { +func NewNode(t testing.TB, log *zap.Logger, opts ...wakunode.WakuNodeOption) (*wakunode.WakuNode, func()) { hostAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0") prvKey := NewPrivateKey(t) ctx := context.Background() - log := NewLog(t) opts = append([]wakunode.WakuNodeOption{ wakunode.WithLogger(log), wakunode.WithPrivateKey(prvKey), From 11c24f6ce16a641d0a69900ced5da33c40bd91e2 Mon Sep 17 00:00:00 2001 From: Tsachi Herman <24438559+tsachiherman@users.noreply.github.com> Date: Fri, 16 Feb 2024 09:15:15 -0500 Subject: [PATCH 06/23] update --- pkg/api/message/v1/subscription.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/api/message/v1/subscription.go b/pkg/api/message/v1/subscription.go index a9839d7d..285309b3 100644 --- a/pkg/api/message/v1/subscription.go +++ b/pkg/api/message/v1/subscription.go @@ -58,14 +58,15 @@ func (d *subscriptionDispatcher) MessageHandler(msg *nats.Msg) { d.log.Info("unmarshaling envelope", zap.Error(err)) return } - // ensure valid topic. 
- if !isValidSubscribeAllTopic(env.ContentTopic) { - return - } + + xmtpTopic := isValidSubscribeAllTopic(env.ContentTopic) d.mu.Lock() defer d.mu.Unlock() for subscription := range d.subscriptions { + if subscription.all && !xmtpTopic { + continue + } if subscription.all || subscription.topics[env.ContentTopic] { select { case subscription.messagesCh <- &env: From 4908e719f67ba24f8a5af172dfbab69df95ea791 Mon Sep 17 00:00:00 2001 From: Tsachi Herman <24438559+tsachiherman@users.noreply.github.com> Date: Fri, 16 Feb 2024 11:51:56 -0500 Subject: [PATCH 07/23] rollback change. --- pkg/api/utils_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/api/utils_test.go b/pkg/api/utils_test.go index 2b427078..b8108fbf 100644 --- a/pkg/api/utils_test.go +++ b/pkg/api/utils_test.go @@ -16,7 +16,7 @@ import ( func makeEnvelopes(count int) (envs []*messageV1.Envelope) { for i := 0; i < count; i++ { envs = append(envs, &messageV1.Envelope{ - ContentTopic: "/xmtp/0/topic", + ContentTopic: "topic", Message: []byte(fmt.Sprintf("msg %d", i)), TimestampNs: uint64(i * 1000000000), // i seconds }) From 545d86dc0099c0432153b637d3a2692e0ad4b48c Mon Sep 17 00:00:00 2001 From: Tsachi Herman <24438559+tsachiherman@users.noreply.github.com> Date: Fri, 16 Feb 2024 11:57:28 -0500 Subject: [PATCH 08/23] additional rollback. --- pkg/api/server_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/api/server_test.go b/pkg/api/server_test.go index 3d7d524c..013780ce 100644 --- a/pkg/api/server_test.go +++ b/pkg/api/server_test.go @@ -67,7 +67,7 @@ func Test_SubscribePublishQuery(t *testing.T) { testGRPCAndHTTP(t, ctx, func(t *testing.T, client messageclient.Client, _ *Server) { // start subscribe stream stream, err := client.Subscribe(ctx, &messageV1.SubscribeRequest{ - ContentTopics: []string{"/xmtp/0/topic"}, + ContentTopics: []string{"topic"}, }) require.NoError(t, err) defer stream.Close() From 03b03043323cf89d744862e0949a5e4208661bc5 Mon Sep 17 00:00:00 2001 From: Tsachi Herman <24438559+tsachiherman@users.noreply.github.com> Date: Fri, 16 Feb 2024 13:05:45 -0500 Subject: [PATCH 09/23] few small tweaks. 
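Alongside moving isValidSubscribeAllTopic next to the dispatcher in subscription.go, this patch has the e2e listener goroutine receive stream and envC as arguments instead of closing over them in the loop body. Passing loop values as goroutine arguments gives each goroutine its own copy, which sidesteps the well-known loop-variable capture pitfall (before Go 1.22, a range loop variable was shared across iterations). A standalone illustration of that pattern, unrelated to the XMTP code itself:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	words := []string{"alpha", "beta", "gamma"}

	for _, w := range words {
		wg.Add(1)
		// w is passed as an argument, so each goroutine works on its own copy
		// rather than on the shared loop variable.
		go func(w string) {
			defer wg.Done()
			fmt.Println(w)
		}(w)
	}
	wg.Wait()
}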
--- pkg/api/message/v1/service.go | 5 ----- pkg/api/message/v1/subscription.go | 5 +++++ pkg/e2e/test_messagev1.go | 5 ++--- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/pkg/api/message/v1/service.go b/pkg/api/message/v1/service.go index 9f37fab2..052d31f1 100644 --- a/pkg/api/message/v1/service.go +++ b/pkg/api/message/v1/service.go @@ -5,7 +5,6 @@ import ( "fmt" "hash/fnv" "io" - "strings" "sync" "time" @@ -428,10 +427,6 @@ func buildEnvelope(msg *wakupb.WakuMessage) *proto.Envelope { } } -func isValidSubscribeAllTopic(topic string) bool { - return strings.HasPrefix(topic, validXMTPTopicPrefix) -} - func fromWakuTimestamp(ts int64) uint64 { if ts < 0 { return 0 diff --git a/pkg/api/message/v1/subscription.go b/pkg/api/message/v1/subscription.go index 285309b3..85c85a01 100644 --- a/pkg/api/message/v1/subscription.go +++ b/pkg/api/message/v1/subscription.go @@ -1,6 +1,7 @@ package api import ( + "strings" "sync" "github.com/nats-io/nats.go" // NATS messaging system @@ -140,3 +141,7 @@ func (sub *subscription) Unsubscribe() { defer sub.dispatcher.mu.Unlock() delete(sub.dispatcher.subscriptions, sub) } + +func isValidSubscribeAllTopic(topic string) bool { + return strings.HasPrefix(topic, validXMTPTopicPrefix) +} diff --git a/pkg/e2e/test_messagev1.go b/pkg/e2e/test_messagev1.go index 3fb71cc0..202d2f21 100644 --- a/pkg/e2e/test_messagev1.go +++ b/pkg/e2e/test_messagev1.go @@ -136,9 +136,8 @@ syncLoop: // Expect them to be relayed to each subscription. for i := 0; i < clientCount; i++ { - stream := streams[i] envC := make(chan *messagev1.Envelope, 100) - go func() { + go func(stream messageclient.Stream, envC chan *messagev1.Envelope) { for { env, err := stream.Next(ctx) if err != nil { @@ -153,7 +152,7 @@ syncLoop: } envC <- env } - }() + }(streams[i], envC) err = subscribeExpect(envC, envs) if err != nil { return err From 6799df3e4a3b3ef4f8fb00e31dc1d9883806d3a4 Mon Sep 17 00:00:00 2001 From: Tsachi Herman <24438559+tsachiherman@users.noreply.github.com> Date: Fri, 16 Feb 2024 13:27:57 -0500 Subject: [PATCH 10/23] update test --- pkg/api/server_test.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/pkg/api/server_test.go b/pkg/api/server_test.go index 013780ce..a6b61802 100644 --- a/pkg/api/server_test.go +++ b/pkg/api/server_test.go @@ -8,6 +8,7 @@ import ( "math/rand" "net/http" "strings" + "sync" "testing" "time" @@ -75,13 +76,22 @@ func Test_SubscribePublishQuery(t *testing.T) { // publish 10 messages envs := makeEnvelopes(10) - publishRes, err := client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs}) - require.NoError(t, err) - require.NotNil(t, publishRes) + var publishRes *messageV1.PublishResponse + var publishErr error + var publishComplete sync.WaitGroup + publishComplete.Add(1) + go func() { + defer publishComplete.Done() + publishRes, publishErr = client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs}) + }() // read subscription subscribeExpect(t, stream, envs) + publishComplete.Wait() + require.NoError(t, publishErr) + require.NotNil(t, publishRes) + // query for messages requireEventuallyStored(t, ctx, client, envs) }) From a5369794aa70f301be8a7fe42e05a4d5b13fcf99 Mon Sep 17 00:00:00 2001 From: Tsachi Herman <24438559+tsachiherman@users.noreply.github.com> Date: Fri, 16 Feb 2024 15:12:43 -0500 Subject: [PATCH 11/23] update tests --- pkg/api/server.go | 2 + pkg/api/server_test.go | 113 ++++++++++++++++++++--------------------- 2 files changed, 56 insertions(+), 59 deletions(-) diff --git 
a/pkg/api/server.go b/pkg/api/server.go index e4aaf3fe..7e438f6d 100644 --- a/pkg/api/server.go +++ b/pkg/api/server.go @@ -284,6 +284,7 @@ func (s *Server) Close() { if err != nil { s.Log.Error("closing http listener", zap.Error(err)) } + s.httpListener = nil } if s.grpcListener != nil { @@ -291,6 +292,7 @@ func (s *Server) Close() { if err != nil { s.Log.Error("closing grpc listener", zap.Error(err)) } + s.grpcListener = nil } s.wg.Wait() diff --git a/pkg/api/server_test.go b/pkg/api/server_test.go index a6b61802..60ba0827 100644 --- a/pkg/api/server_test.go +++ b/pkg/api/server_test.go @@ -63,6 +63,24 @@ func Test_HTTPRootPath(t *testing.T) { require.NotEmpty(t, body) } +type deferedPublishResponseFunc func(tb testing.TB) + +func publishTestEnvelopes(ctx context.Context, client messageclient.Client, msgs *messageV1.PublishRequest) deferedPublishResponseFunc { + var waitGroup sync.WaitGroup + waitGroup.Add(1) + var publishErr error + var publishRes *messageV1.PublishResponse + go func() { + defer waitGroup.Done() + publishRes, publishErr = client.Publish(ctx, msgs) + }() + return func(tb testing.TB) { + waitGroup.Wait() + require.NoError(tb, publishErr) + require.NotNil(tb, publishRes) + } +} + func Test_SubscribePublishQuery(t *testing.T) { ctx := withAuth(t, context.Background()) testGRPCAndHTTP(t, ctx, func(t *testing.T, client messageclient.Client, _ *Server) { @@ -76,22 +94,12 @@ func Test_SubscribePublishQuery(t *testing.T) { // publish 10 messages envs := makeEnvelopes(10) - var publishRes *messageV1.PublishResponse - var publishErr error - var publishComplete sync.WaitGroup - publishComplete.Add(1) - go func() { - defer publishComplete.Done() - publishRes, publishErr = client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs}) - }() + deferedPublishResult := publishTestEnvelopes(ctx, client, &messageV1.PublishRequest{Envelopes: envs}) + defer deferedPublishResult(t) // read subscription subscribeExpect(t, stream, envs) - publishComplete.Wait() - require.NoError(t, publishErr) - require.NotNil(t, publishRes) - // query for messages requireEventuallyStored(t, ctx, client, envs) }) @@ -259,9 +267,8 @@ func Test_GRPCMaxMessageSize(t *testing.T) { TimestampNs: 3, }, } - publishRes, err := client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs}) - require.NoError(t, err) - require.NotNil(t, publishRes) + deferedPublishResult := publishTestEnvelopes(ctx, client, &messageV1.PublishRequest{Envelopes: envs}) + defer deferedPublishResult(t) subscribeExpect(t, stream, envs) requireEventuallyStored(t, ctx, client, envs) @@ -336,9 +343,8 @@ func Test_SubscribeClientClose(t *testing.T) { // publish 5 messages envs := makeEnvelopes(10) - publishRes, err := client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs[:5]}) - require.NoError(t, err) - require.NotNil(t, publishRes) + deferedPublishResult := publishTestEnvelopes(ctx, client, &messageV1.PublishRequest{Envelopes: envs[:5]}) + defer deferedPublishResult(t) // receive 5 and close the stream subscribeExpect(t, stream, envs[:5]) @@ -346,9 +352,9 @@ func Test_SubscribeClientClose(t *testing.T) { require.NoError(t, err) // publish another 5 - publishRes, err = client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs[5:]}) - require.NoError(t, err) - require.NotNil(t, publishRes) + deferedPublishResult = publishTestEnvelopes(ctx, client, &messageV1.PublishRequest{Envelopes: envs[5:]}) + defer deferedPublishResult(t) + time.Sleep(50 * time.Millisecond) ctx, cancel := context.WithTimeout(ctx, 1*time.Second) @@ -371,9 +377,8 
@@ func Test_Subscribe2ClientClose(t *testing.T) { // publish 5 messages envs := makeEnvelopes(10) - publishRes, err := client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs[:5]}) - require.NoError(t, err) - require.NotNil(t, publishRes) + deferedPublishResult := publishTestEnvelopes(ctx, client, &messageV1.PublishRequest{Envelopes: envs[:5]}) + defer deferedPublishResult(t) // receive 5 and close the stream subscribeExpect(t, stream, envs[:5]) @@ -381,9 +386,8 @@ func Test_Subscribe2ClientClose(t *testing.T) { require.NoError(t, err) // publish another 5 - publishRes, err = client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs[5:]}) - require.NoError(t, err) - require.NotNil(t, publishRes) + deferedPublishResult = publishTestEnvelopes(ctx, client, &messageV1.PublishRequest{Envelopes: envs[5:]}) + defer deferedPublishResult(t) time.Sleep(50 * time.Millisecond) ctx, cancel := context.WithTimeout(ctx, 1*time.Second) @@ -406,9 +410,9 @@ func Test_Subscribe2UpdateTopics(t *testing.T) { // publish 5 messages envs := makeEnvelopes(10) - publishRes, err := client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs[:5]}) - require.NoError(t, err) - require.NotNil(t, publishRes) + deferedPublishResult := publishTestEnvelopes(ctx, client, &messageV1.PublishRequest{Envelopes: envs[:5]}) + defer deferedPublishResult(t) + // receive 5 and close the stream subscribeExpect(t, stream, envs[:5]) @@ -455,9 +459,8 @@ func Test_SubscribeAllClientClose(t *testing.T) { for i, env := range envs { envs[i].ContentTopic = "/xmtp/0/" + env.ContentTopic } - publishRes, err := client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs[:5]}) - require.NoError(t, err) - require.NotNil(t, publishRes) + deferedPublishResult := publishTestEnvelopes(ctx, client, &messageV1.PublishRequest{Envelopes: envs[:5]}) + defer deferedPublishResult(t) // receive 5 and close the stream subscribeExpect(t, stream, envs[:5]) @@ -465,9 +468,8 @@ func Test_SubscribeAllClientClose(t *testing.T) { require.NoError(t, err) // publish another 5 - publishRes, err = client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs[5:]}) - require.NoError(t, err) - require.NotNil(t, publishRes) + deferedPublishResult = publishTestEnvelopes(ctx, client, &messageV1.PublishRequest{Envelopes: envs[5:]}) + defer deferedPublishResult(t) time.Sleep(50 * time.Millisecond) ctx, cancel := context.WithTimeout(ctx, 1*time.Second) @@ -490,12 +492,11 @@ func Test_SubscribeServerClose(t *testing.T) { // Publish 5 messages. 
envs := makeEnvelopes(5) - publishRes, err := client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs}) - require.NoError(t, err) - require.NotNil(t, publishRes) + deferedPublishResult := publishTestEnvelopes(ctx, client, &messageV1.PublishRequest{Envelopes: envs}) + defer deferedPublishResult(t) // Receive 5 - subscribeExpect(t, stream, envs[:5]) + subscribeExpect(t, stream, envs) // stop Server server.Close() @@ -521,9 +522,8 @@ func Test_SubscribeAllServerClose(t *testing.T) { for i, env := range envs { envs[i].ContentTopic = "/xmtp/0/" + env.ContentTopic } - publishRes, err := client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs}) - require.NoError(t, err) - require.NotNil(t, publishRes) + deferedPublishResult := publishTestEnvelopes(ctx, client, &messageV1.PublishRequest{Envelopes: envs}) + defer deferedPublishResult(t) // Receive 5 subscribeExpect(t, stream, envs[:5]) @@ -593,9 +593,8 @@ func Test_MultipleSubscriptions(t *testing.T) { // publish 5 envelopes envs := makeEnvelopes(10) - publishRes, err := client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs[:5]}) - require.NoError(t, err) - require.NotNil(t, publishRes) + deferedPublishResult := publishTestEnvelopes(ctx, client, &messageV1.PublishRequest{Envelopes: envs[:5]}) + defer deferedPublishResult(t) // receive 5 envelopes on both streams subscribeExpect(t, stream1, envs[:5]) @@ -612,9 +611,8 @@ func Test_MultipleSubscriptions(t *testing.T) { time.Sleep(50 * time.Millisecond) // publish another 5 envelopes - publishRes, err = client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs[5:]}) - require.NoError(t, err) - require.NotNil(t, publishRes) + deferedPublishResult = publishTestEnvelopes(ctx, client, &messageV1.PublishRequest{Envelopes: envs[5:]}) + defer deferedPublishResult(t) // receive 5 on stream 2 and 3 subscribeExpect(t, stream2, envs[5:]) @@ -627,9 +625,8 @@ func Test_QueryPaging(t *testing.T) { testGRPCAndHTTP(t, ctx, func(t *testing.T, client messageclient.Client, _ *Server) { // Store 10 envelopes with increasing SenderTimestamp envs := makeEnvelopes(10) - publishRes, err := client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs}) - require.NoError(t, err) - require.NotNil(t, publishRes) + deferedPublishResult := publishTestEnvelopes(ctx, client, &messageV1.PublishRequest{Envelopes: envs}) + defer deferedPublishResult(t) time.Sleep(50 * time.Millisecond) requireEventuallyStored(t, ctx, client, envs) @@ -676,9 +673,8 @@ func Test_BatchQuery(t *testing.T) { testGRPCAndHTTP(t, ctx, func(t *testing.T, client messageclient.Client, _ *Server) { // Store 10 envelopes with increasing SenderTimestamp envs := makeEnvelopes(10) - publishRes, err := client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs}) - require.NoError(t, err) - require.NotNil(t, publishRes) + deferedPublishResult := publishTestEnvelopes(ctx, client, &messageV1.PublishRequest{Envelopes: envs}) + defer deferedPublishResult(t) requireEventuallyStored(t, ctx, client, envs) batchSize := 50 @@ -726,9 +722,8 @@ func Test_BatchQueryOverLimitError(t *testing.T) { testGRPCAndHTTP(t, ctx, func(t *testing.T, client messageclient.Client, _ *Server) { // Store 10 envelopes with increasing SenderTimestamp envs := makeEnvelopes(10) - publishRes, err := client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs}) - require.NoError(t, err) - require.NotNil(t, publishRes) + deferedPublishResult := publishTestEnvelopes(ctx, client, &messageV1.PublishRequest{Envelopes: envs}) + defer deferedPublishResult(t) 
requireEventuallyStored(t, ctx, client, envs) // Limit is 50 queries implicitly so 100 should result in an error @@ -743,7 +738,7 @@ func Test_BatchQueryOverLimitError(t *testing.T) { } repeatedQueries = append(repeatedQueries, query) } - _, err = client.BatchQuery(ctx, &messageV1.BatchQueryRequest{ + _, err := client.BatchQuery(ctx, &messageV1.BatchQueryRequest{ Requests: repeatedQueries, }) grpcErr, ok := status.FromError(err) From 06b76ffcc67d2e9c39e9f6aed7b60b063af36ca0 Mon Sep 17 00:00:00 2001 From: Tsachi Herman <24438559+tsachiherman@users.noreply.github.com> Date: Fri, 16 Feb 2024 15:25:22 -0500 Subject: [PATCH 12/23] update --- pkg/e2e/test_messagev1.go | 43 +++++++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/pkg/e2e/test_messagev1.go b/pkg/e2e/test_messagev1.go index 202d2f21..75d8760c 100644 --- a/pkg/e2e/test_messagev1.go +++ b/pkg/e2e/test_messagev1.go @@ -114,6 +114,29 @@ syncLoop: } } + // start listeners + streamsCh := make([]chan *messagev1.Envelope, len(clients)) + for i := 0; i < clientCount; i++ { + envC := make(chan *messagev1.Envelope, 100) + go func(stream messageclient.Stream, envC chan *messagev1.Envelope) { + for { + env, err := stream.Next(ctx) + if err != nil { + if isErrClosedConnection(err) || err.Error() == "context canceled" { + break + } + s.log.Error("getting next", zap.Error(err)) + break + } + if env == nil { + continue + } + envC <- env + } + }(streams[i], envC) + streamsCh[i] = envC + } + // Publish messages. envs := []*messagev1.Envelope{} for i, client := range clients { @@ -126,6 +149,7 @@ syncLoop: } } envs = append(envs, clientEnvs...) + _, err = client.Publish(ctx, &messagev1.PublishRequest{ Envelopes: clientEnvs, }) @@ -136,24 +160,7 @@ syncLoop: // Expect them to be relayed to each subscription. for i := 0; i < clientCount; i++ { - envC := make(chan *messagev1.Envelope, 100) - go func(stream messageclient.Stream, envC chan *messagev1.Envelope) { - for { - env, err := stream.Next(ctx) - if err != nil { - if isErrClosedConnection(err) || err.Error() == "context canceled" { - break - } - s.log.Error("getting next", zap.Error(err)) - break - } - if env == nil { - continue - } - envC <- env - } - }(streams[i], envC) - err = subscribeExpect(envC, envs) + err = subscribeExpect(streamsCh[i], envs) if err != nil { return err } From ff2eaed76c1aace3f94ed2e5bf351ad2535fd787 Mon Sep 17 00:00:00 2001 From: Tsachi Herman <24438559+tsachiherman@users.noreply.github.com> Date: Fri, 16 Feb 2024 15:57:15 -0500 Subject: [PATCH 13/23] add wait. --- pkg/e2e/test_messagev1.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/e2e/test_messagev1.go b/pkg/e2e/test_messagev1.go index 75d8760c..5a69f879 100644 --- a/pkg/e2e/test_messagev1.go +++ b/pkg/e2e/test_messagev1.go @@ -137,6 +137,9 @@ syncLoop: streamsCh[i] = envC } + // wait until all the listeners are up and ready. + time.Sleep(100 * time.Millisecond) + // Publish messages. 
envs := []*messagev1.Envelope{} for i, client := range clients { From 24bde2f56134cced10e0df23801b2b7aa05c49ab Mon Sep 17 00:00:00 2001 From: Tsachi Herman <24438559+tsachiherman@users.noreply.github.com> Date: Fri, 16 Feb 2024 16:01:29 -0500 Subject: [PATCH 14/23] add minBacklogLength --- pkg/api/message/v1/subscription.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/pkg/api/message/v1/subscription.go b/pkg/api/message/v1/subscription.go index 85c85a01..0654536c 100644 --- a/pkg/api/message/v1/subscription.go +++ b/pkg/api/message/v1/subscription.go @@ -10,8 +10,13 @@ import ( pb "google.golang.org/protobuf/proto" // Protocol Buffers for serialization ) -// allTopicsBacklogLength defines the buffer size for subscriptions that listen to all topics. -const allTopicsBacklogLength = 1024 +const ( + // allTopicsBacklogLength defines the buffer size for subscriptions that listen to all topics. + allTopicsBacklogLength = 1024 + + // minBacklogBufferLength defines the minimal length used for backlog buffer. + minBacklogBufferLength +) // subscriptionDispatcher manages subscriptions and message dispatching. type subscriptionDispatcher struct { @@ -124,7 +129,11 @@ func (d *subscriptionDispatcher) Subscribe(topics map[string]bool) *subscription if !sub.all { sub.topics = topics // use a log2(length) as a backbuffer - sub.messagesCh = make(chan *proto.Envelope, log2(uint(len(topics)))+1) + backlogBufferSize := log2(uint(len(topics))) + 1 + if backlogBufferSize < minBacklogBufferLength { + backlogBufferSize = minBacklogBufferLength + } + sub.messagesCh = make(chan *proto.Envelope, backlogBufferSize) } else { sub.messagesCh = make(chan *proto.Envelope, allTopicsBacklogLength) } From 3d5d07bb4d1c3a80bf2adb7a758a56ec62892d0f Mon Sep 17 00:00:00 2001 From: Tsachi Herman <24438559+tsachiherman@users.noreply.github.com> Date: Fri, 16 Feb 2024 19:26:55 -0500 Subject: [PATCH 15/23] update --- pkg/api/message/v1/service.go | 8 +++++-- pkg/api/server_test.go | 41 +++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/pkg/api/message/v1/service.go b/pkg/api/message/v1/service.go index 052d31f1..cab0e795 100644 --- a/pkg/api/message/v1/service.go +++ b/pkg/api/message/v1/service.go @@ -39,11 +39,15 @@ const ( maxSubscribeRequestLimitPerBatch = 50 // maxTopicsPerRequest defines the maximum number of topics that can be queried in a single request. - maxTopicsPerRequest = 1024 + // the number is likely to be more than we want it to be, but would be a safe place to put it - + // per Test_LargeQueryTesting, the request decoding already failing before it reaches th handler. + maxTopicsPerRequest = 157733 // maxTopicsPerBatch defines the maximum number of topics that can be queried in a batch query. This // limit is imposed in additional to the per-query limit maxTopicsPerRequest. - maxTopicsPerBatch = 4096 + // as a starting value, we've using the same value as above, since the entire request would be tossed + // away before this is reached. 
+ maxTopicsPerBatch = maxTopicsPerRequest ) type Service struct { diff --git a/pkg/api/server_test.go b/pkg/api/server_test.go index 60ba0827..abbd8ee4 100644 --- a/pkg/api/server_test.go +++ b/pkg/api/server_test.go @@ -913,3 +913,44 @@ func Benchmark_SubscribePublishQuery(b *testing.B) { } } } + +func Test_LargeQueryTesting(t *testing.T) { + ctx := withAuth(t, context.Background()) + testGRPCAndHTTP(t, ctx, func(t *testing.T, client messageclient.Client, _ *Server) { + // Store 10 envelopes with increasing SenderTimestamp + envs := makeEnvelopes(10) + deferedPublishResult := publishTestEnvelopes(ctx, client, &messageV1.PublishRequest{Envelopes: envs}) + defer deferedPublishResult(t) + time.Sleep(50 * time.Millisecond) + requireEventuallyStored(t, ctx, client, envs) + + // create a large set of query topics. + topics := make([]string, 512*1024) + for i := range topics { + topics[i] = fmt.Sprintf("topic/%d", i) + } + size := 16 + step := 16 + prevSize := 16 + for { + query := &messageV1.QueryRequest{ + ContentTopics: topics[:size], + } + _, err := client.Query(ctx, query) + if err != nil { + // go back, and cut the step by half. + size = prevSize + step /= 2 + size += step + if step == 0 { + break + } + continue + } + prevSize = size + step *= 2 + size += step + } + t.Logf("max number of topics without any error was %d", size) + }) +} From e703fbd171acb339ace2b6570e028ddbb8ec30de Mon Sep 17 00:00:00 2001 From: Tsachi Herman <24438559+tsachiherman@users.noreply.github.com> Date: Fri, 16 Feb 2024 19:38:27 -0500 Subject: [PATCH 16/23] update --- pkg/api/message/v1/service.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/api/message/v1/service.go b/pkg/api/message/v1/service.go index cab0e795..53f08ebd 100644 --- a/pkg/api/message/v1/service.go +++ b/pkg/api/message/v1/service.go @@ -35,8 +35,8 @@ const ( // 1048576 - 300 - 62 = 1048214 MaxMessageSize = pubsub.DefaultMaxMessageSize - MaxContentTopicNameSize - 62 - // maxSubscribeRequestLimitPerBatch defines the maximum number of request we can support per batch. - maxSubscribeRequestLimitPerBatch = 50 + // maxQueriesPerBatch defines the maximum number of queries we can support per batch. + maxQueriesPerBatch = 50 // maxTopicsPerRequest defines the maximum number of topics that can be queried in a single request. // the number is likely to be more than we want it to be, but would be a safe place to put it - @@ -382,9 +382,9 @@ func (s *Service) BatchQuery(ctx context.Context, req *proto.BatchQueryRequest) } logFunc("large batch query", zap.Int("num_queries", len(req.Requests))) - // NOTE: in our implementation, we implicitly limit batch size to 50 requests (maxSubscribeRequestLimitPerBatch = 50) - if len(req.Requests) > maxSubscribeRequestLimitPerBatch { - return nil, status.Errorf(codes.InvalidArgument, "cannot exceed %d requests in single batch", maxSubscribeRequestLimitPerBatch) + // NOTE: in our implementation, we implicitly limit batch size to 50 requests (maxQueriesPerBatch = 50) + if len(req.Requests) > maxQueriesPerBatch { + return nil, status.Errorf(codes.InvalidArgument, "cannot exceed %d requests in single batch", maxQueriesPerBatch) } // calculate the total number of topics being requested in this batch request. 
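The Test_LargeQueryTesting case added above finds the effective per-request topic ceiling empirically: it keeps enlarging the query by a growing step while calls succeed, and on the first failure it returns to the last accepted size and halves the step until the step reaches zero. Below is a minimal, self-contained sketch of that grow-and-halve search, shown outside the test harness; the probe function and its 157733 cutoff merely stand in for client.Query and the limit the test reported, and are not part of the patch itself.

package main

import "fmt"

// probe stands in for client.Query in the test above: it reports whether a
// request carrying this many topics would be accepted. The 157733 cutoff is
// only an illustration of the limit the test converged on.
func probe(topicCount int) bool {
	return topicCount <= 157733
}

func main() {
	size, step, prevSize := 16, 16, 16
	for {
		if !probe(size) {
			// Rejected: fall back to the last accepted size and halve the step.
			size = prevSize
			step /= 2
			size += step
			if step == 0 {
				break
			}
			continue
		}
		// Accepted: remember this size, double the step, and try a larger request.
		prevSize = size
		step *= 2
		size += step
	}
	fmt.Printf("max number of topics without any error was %d\n", size)
}

Because the step doubles while the server keeps answering, the search needs only on the order of log2 of the limit probes to bracket the ceiling, rather than growing the request one topic at a time.
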
From b129da482e7b72e3d56055afd48e5055f2c7cc84 Mon Sep 17 00:00:00 2001 From: Tsachi Herman <24438559+tsachiherman@users.noreply.github.com> Date: Tue, 20 Feb 2024 09:35:58 -0500 Subject: [PATCH 17/23] update per peer feedback --- pkg/api/message/v1/service.go | 23 ++++++++++------------- pkg/api/message/v1/subscription.go | 19 +++++++++---------- 2 files changed, 19 insertions(+), 23 deletions(-) diff --git a/pkg/api/message/v1/service.go b/pkg/api/message/v1/service.go index 53f08ebd..b09fa82d 100644 --- a/pkg/api/message/v1/service.go +++ b/pkg/api/message/v1/service.go @@ -38,16 +38,16 @@ const ( // maxQueriesPerBatch defines the maximum number of queries we can support per batch. maxQueriesPerBatch = 50 - // maxTopicsPerRequest defines the maximum number of topics that can be queried in a single request. + // maxTopicsPerQueryRequest defines the maximum number of topics that can be queried in a single request. // the number is likely to be more than we want it to be, but would be a safe place to put it - // per Test_LargeQueryTesting, the request decoding already failing before it reaches th handler. - maxTopicsPerRequest = 157733 + maxTopicsPerQueryRequest = 157733 - // maxTopicsPerBatch defines the maximum number of topics that can be queried in a batch query. This + // maxTopicsPerBatchQueryRequest defines the maximum number of topics that can be queried in a batch query. This // limit is imposed in additional to the per-query limit maxTopicsPerRequest. // as a starting value, we've using the same value as above, since the entire request would be tossed // away before this is reached. - maxTopicsPerBatch = maxTopicsPerRequest + maxTopicsPerBatchQueryRequest = maxTopicsPerQueryRequest ) type Service struct { @@ -345,9 +345,8 @@ func (s *Service) Query(ctx context.Context, req *proto.QueryRequest) (*proto.Qu } if len(req.ContentTopics) > 1 { - if len(req.ContentTopics) > maxTopicsPerRequest { - log.Info("query exceeded topics count threshold", zap.Int("topics_count", len(req.ContentTopics))) - return nil, status.Errorf(codes.InvalidArgument, "content topic count exceeded maximum topics per request threshold of %d", maxTopicsPerRequest) + if len(req.ContentTopics) > maxTopicsPerQueryRequest { + return nil, status.Errorf(codes.InvalidArgument, "the number of content topics(%d) exceed the maximum topics per query request (%d)", len(req.ContentTopics), maxTopicsPerQueryRequest) } ri := apicontext.NewRequesterInfo(ctx) log.Info("query with multiple topics", ri.ZapFields()...) @@ -398,17 +397,15 @@ func (s *Service) BatchQuery(ctx context.Context, req *proto.BatchQueryRequest) } // are we still within limits ? 
- if totalRequestedTopicsCount > maxTopicsPerBatch { - log.Info("batch query exceeded topics count threshold", zap.Int("topics_count", totalRequestedTopicsCount)) - return nil, status.Errorf(codes.InvalidArgument, "batch content topics count exceeded maximum topics per batch threshold of %d", maxTopicsPerBatch) + if totalRequestedTopicsCount > maxTopicsPerBatchQueryRequest { + return nil, status.Errorf(codes.InvalidArgument, "the total number of content topics(%d) exceed the maximum topics per batch query request(%d)", totalRequestedTopicsCount, maxTopicsPerBatchQueryRequest) } // Naive implementation, perform all sub query requests sequentially responses := make([]*proto.QueryResponse, 0) for _, query := range req.Requests { - if len(query.ContentTopics) > maxTopicsPerRequest { - log.Info("query exceeded topics count threshold", zap.Int("topics_count", len(query.ContentTopics))) - return nil, status.Errorf(codes.InvalidArgument, "content topic count exceeded maximum topics per request threshold of %d", maxTopicsPerRequest) + if len(query.ContentTopics) > maxTopicsPerQueryRequest { + return nil, status.Errorf(codes.InvalidArgument, "the number of content topics(%d) exceed the maximum topics per query request (%d)", len(query.ContentTopics), maxTopicsPerQueryRequest) } // We execute the query using the existing Query API resp, err := s.Query(ctx, query) diff --git a/pkg/api/message/v1/subscription.go b/pkg/api/message/v1/subscription.go index 0654536c..7fc276f1 100644 --- a/pkg/api/message/v1/subscription.go +++ b/pkg/api/message/v1/subscription.go @@ -20,8 +20,8 @@ const ( // subscriptionDispatcher manages subscriptions and message dispatching. type subscriptionDispatcher struct { - conn *nats.Conn // Connection to NATS server - subscription *nats.Subscription // Subscription to NATS topics + natsConn *nats.Conn // Connection to NATS server + natsSub *nats.Subscription // Subscription to NATS topics log *zap.Logger // Logger instance subscriptions map[*subscription]interface{} // Active subscriptions mu sync.Mutex // Mutex for concurrency control @@ -30,14 +30,14 @@ type subscriptionDispatcher struct { // newSubscriptionDispatcher creates a new dispatcher for managing subscriptions. func newSubscriptionDispatcher(conn *nats.Conn, log *zap.Logger) (*subscriptionDispatcher, error) { dispatcher := &subscriptionDispatcher{ - conn: conn, + natsConn: conn, log: log, subscriptions: make(map[*subscription]interface{}), } // Subscribe to NATS wildcard topic and assign message handler var err error - dispatcher.subscription, err = conn.Subscribe(natsWildcardTopic, dispatcher.MessageHandler) + dispatcher.natsSub, err = conn.Subscribe(natsWildcardTopic, dispatcher.messageHandler) if err != nil { return nil, err } @@ -46,18 +46,18 @@ func newSubscriptionDispatcher(conn *nats.Conn, log *zap.Logger) (*subscriptionD // Shutdown gracefully shuts down the dispatcher, unsubscribing from all topics. func (d *subscriptionDispatcher) Shutdown() { - _ = d.subscription.Unsubscribe() + _ = d.natsSub.Unsubscribe() // the lock/unlock ensures that there is no in-process dispatching. d.mu.Lock() defer d.mu.Unlock() - d.subscription = nil - d.conn = nil + d.natsSub = nil + d.natsConn = nil d.subscriptions = nil } -// MessageHandler processes incoming messages, dispatching them to the correct subscription. -func (d *subscriptionDispatcher) MessageHandler(msg *nats.Msg) { +// messageHandler processes incoming messages, dispatching them to the correct subscription. 
+func (d *subscriptionDispatcher) messageHandler(msg *nats.Msg) { var env proto.Envelope err := pb.Unmarshal(msg.Data, &env) if err != nil { @@ -84,7 +84,6 @@ func (d *subscriptionDispatcher) MessageHandler(msg *nats.Msg) { close(subscription.messagesCh) delete(d.subscriptions, subscription) } - continue } } } From 52391ba1da1b9caa13bbb5c6049bd800bab5151f Mon Sep 17 00:00:00 2001 From: Tsachi Herman <24438559+tsachiherman@users.noreply.github.com> Date: Tue, 20 Feb 2024 09:37:38 -0500 Subject: [PATCH 18/23] cherry pick Steven's fix --- dev/run | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/run b/dev/run index b108c144..a995db08 100755 --- a/dev/run +++ b/dev/run @@ -14,7 +14,7 @@ go run cmd/xmtpd/main.go \ --store.db-connection-string "${MESSAGE_DB_DSN}" \ --store.reader-db-connection-string "${MESSAGE_DB_DSN}" \ --store.metrics-period 5s \ - --mls-store.db-connection-string "${MESSAGE_DB_DSN}" \ + --mls-store.db-connection-string "${MLS_DB_DSN}" \ --authz-db-connection-string "${AUTHZ_DB_DSN}" \ --go-profiling \ "$@" From c7f317a45d91b45ba9be06ff90c12ef4fef5cfce Mon Sep 17 00:00:00 2001 From: Tsachi Herman <24438559+tsachiherman@users.noreply.github.com> Date: Tue, 20 Feb 2024 09:42:52 -0500 Subject: [PATCH 19/23] fix lint --- pkg/api/message/v1/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/api/message/v1/service.go b/pkg/api/message/v1/service.go index b09fa82d..b4292160 100644 --- a/pkg/api/message/v1/service.go +++ b/pkg/api/message/v1/service.go @@ -196,7 +196,7 @@ func (s *Service) Subscribe(req *proto.SubscribeRequest, stream proto.MessageApi }() var streamLock sync.Mutex - for exit := false; exit == false; { + for exit := false; !exit; { select { case msg, open := <-sub.messagesCh: if open { From 57b1ee0119b7b57279079575e5aa31e37f578fc8 Mon Sep 17 00:00:00 2001 From: Tsachi Herman <24438559+tsachiherman@users.noreply.github.com> Date: Tue, 20 Feb 2024 10:21:51 -0500 Subject: [PATCH 20/23] add docker hub deploy for branch. 
--- .github/workflows/push-subscribers.yml | 35 ++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 .github/workflows/push-subscribers.yml diff --git a/.github/workflows/push-subscribers.yml b/.github/workflows/push-subscribers.yml new file mode 100644 index 00000000..e31bc2c3 --- /dev/null +++ b/.github/workflows/push-subscribers.yml @@ -0,0 +1,35 @@ +name: Push Subscribers Container +on: + push: + branches: + # - mls + - tsachi/topics-count-limit +jobs: + deploy: + concurrency: main + runs-on: ubuntu-latest + steps: + - name: Set up Docker Buildx + id: buildx + uses: docker/setup-buildx-action@v1 + + - name: Login to Docker Hub + uses: docker/login-action@v1 + with: + username: xmtpeng + password: ${{ secrets.DOCKER_HUB_ACCESS_TOKEN }} + + - name: Git Checkout + uses: actions/checkout@v3 + + - uses: actions/setup-go@v3 + with: + go-version-file: go.mod + + - name: Push + id: push + run: | + export DOCKER_IMAGE_TAG=subscribers-dev + IMAGE_TO_DEPLOY=xmtp/node-go@$(dev/docker/build) + echo Successfully pushed $IMAGE_TO_DEPLOY + echo "docker_image=${IMAGE_TO_DEPLOY}" >> $GITHUB_OUTPUT From e0d9f5729004b0b2c2b7920534683b37083851a4 Mon Sep 17 00:00:00 2001 From: Steven Normore Date: Wed, 21 Feb 2024 16:28:23 -0500 Subject: [PATCH 21/23] Deploy this branch to dev for testing --- .github/workflows/deploy.yml | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index ec6c4676..9d8aead7 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -2,11 +2,13 @@ name: Deploy Nodes on: push: branches: - - main + # - main + - tsachi/topics-count-limit jobs: deploy: concurrency: main runs-on: ubuntu-latest + timeout-minutes: 60 steps: - name: Set up Docker Buildx id: buildx @@ -28,14 +30,13 @@ jobs: - name: Push id: push run: | - export DOCKER_IMAGE_TAG=latest + export DOCKER_IMAGE_TAG=dev IMAGE_TO_DEPLOY=xmtp/node-go@$(dev/docker/build) echo Successfully pushed $IMAGE_TO_DEPLOY echo "docker_image=${IMAGE_TO_DEPLOY}" >> $GITHUB_OUTPUT - name: Deploy (dev) uses: xmtp-labs/terraform-deployer@v1 - timeout-minutes: 60 with: terraform-token: ${{ secrets.TERRAFORM_TOKEN }} terraform-org: xmtp @@ -44,13 +45,12 @@ jobs: variable-value: ${{ steps.push.outputs.docker_image }} variable-value-required-prefix: "xmtp/node-go@sha256:" - - name: Deploy (production) - uses: xmtp-labs/terraform-deployer@v1 - timeout-minutes: 60 - with: - terraform-token: ${{ secrets.TERRAFORM_TOKEN }} - terraform-org: xmtp - terraform-workspace: production - variable-name: xmtp_node_image - variable-value: ${{ steps.push.outputs.docker_image }} - variable-value-required-prefix: "xmtp/node-go@sha256:" + # - name: Deploy (production) + # uses: xmtp-labs/terraform-deployer@v1 + # with: + # terraform-token: ${{ secrets.TERRAFORM_TOKEN }} + # terraform-org: xmtp + # terraform-workspace: production + # variable-name: xmtp_node_image + # variable-value: ${{ steps.push.outputs.docker_image }} + # variable-value-required-prefix: "xmtp/node-go@sha256:" From f06531a449720f4c87b7a00fc428ece191da5ac8 Mon Sep 17 00:00:00 2001 From: Tsachi Herman <24438559+tsachiherman@users.noreply.github.com> Date: Thu, 22 Feb 2024 09:11:05 -0500 Subject: [PATCH 22/23] fix bug in subscribed topic count --- pkg/api/message/v1/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/api/message/v1/service.go b/pkg/api/message/v1/service.go index b4292160..0f18504f 100644 --- 
a/pkg/api/message/v1/service.go +++ b/pkg/api/message/v1/service.go @@ -192,7 +192,7 @@ func (s *Service) Subscribe(req *proto.SubscribeRequest, stream proto.MessageApi if sub != nil { sub.Unsubscribe() } - metrics.EmitUnsubscribeTopics(stream.Context(), log, 1) + metrics.EmitUnsubscribeTopics(stream.Context(), log, len(req.ContentTopics)) }() var streamLock sync.Mutex From aded2d716bca5950de9491817e8a262c3d3af82f Mon Sep 17 00:00:00 2001 From: Tsachi Herman <24438559+tsachiherman@users.noreply.github.com> Date: Thu, 22 Feb 2024 14:30:29 -0500 Subject: [PATCH 23/23] remove temp deploy workflows. --- .github/workflows/deploy.yml | 26 +++++++++---------- .github/workflows/push-subscribers.yml | 35 -------------------------- 2 files changed, 13 insertions(+), 48 deletions(-) delete mode 100644 .github/workflows/push-subscribers.yml diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 9d8aead7..ec6c4676 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -2,13 +2,11 @@ name: Deploy Nodes on: push: branches: - # - main - - tsachi/topics-count-limit + - main jobs: deploy: concurrency: main runs-on: ubuntu-latest - timeout-minutes: 60 steps: - name: Set up Docker Buildx id: buildx @@ -30,13 +28,14 @@ jobs: - name: Push id: push run: | - export DOCKER_IMAGE_TAG=dev + export DOCKER_IMAGE_TAG=latest IMAGE_TO_DEPLOY=xmtp/node-go@$(dev/docker/build) echo Successfully pushed $IMAGE_TO_DEPLOY echo "docker_image=${IMAGE_TO_DEPLOY}" >> $GITHUB_OUTPUT - name: Deploy (dev) uses: xmtp-labs/terraform-deployer@v1 + timeout-minutes: 60 with: terraform-token: ${{ secrets.TERRAFORM_TOKEN }} terraform-org: xmtp @@ -45,12 +44,13 @@ jobs: variable-value: ${{ steps.push.outputs.docker_image }} variable-value-required-prefix: "xmtp/node-go@sha256:" - # - name: Deploy (production) - # uses: xmtp-labs/terraform-deployer@v1 - # with: - # terraform-token: ${{ secrets.TERRAFORM_TOKEN }} - # terraform-org: xmtp - # terraform-workspace: production - # variable-name: xmtp_node_image - # variable-value: ${{ steps.push.outputs.docker_image }} - # variable-value-required-prefix: "xmtp/node-go@sha256:" + - name: Deploy (production) + uses: xmtp-labs/terraform-deployer@v1 + timeout-minutes: 60 + with: + terraform-token: ${{ secrets.TERRAFORM_TOKEN }} + terraform-org: xmtp + terraform-workspace: production + variable-name: xmtp_node_image + variable-value: ${{ steps.push.outputs.docker_image }} + variable-value-required-prefix: "xmtp/node-go@sha256:" diff --git a/.github/workflows/push-subscribers.yml b/.github/workflows/push-subscribers.yml deleted file mode 100644 index e31bc2c3..00000000 --- a/.github/workflows/push-subscribers.yml +++ /dev/null @@ -1,35 +0,0 @@ -name: Push Subscribers Container -on: - push: - branches: - # - mls - - tsachi/topics-count-limit -jobs: - deploy: - concurrency: main - runs-on: ubuntu-latest - steps: - - name: Set up Docker Buildx - id: buildx - uses: docker/setup-buildx-action@v1 - - - name: Login to Docker Hub - uses: docker/login-action@v1 - with: - username: xmtpeng - password: ${{ secrets.DOCKER_HUB_ACCESS_TOKEN }} - - - name: Git Checkout - uses: actions/checkout@v3 - - - uses: actions/setup-go@v3 - with: - go-version-file: go.mod - - - name: Push - id: push - run: | - export DOCKER_IMAGE_TAG=subscribers-dev - IMAGE_TO_DEPLOY=xmtp/node-go@$(dev/docker/build) - echo Successfully pushed $IMAGE_TO_DEPLOY - echo "docker_image=${IMAGE_TO_DEPLOY}" >> $GITHUB_OUTPUT
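
With the series applied, oversized requests are rejected with codes.InvalidArgument instead of being served: a batch may carry at most maxQueriesPerBatch queries, and each query and batch is also bounded by maxTopicsPerQueryRequest and maxTopicsPerBatchQueryRequest topics. Callers that need more than a limit allows therefore have to split the work themselves. The helper below is a dependency-free sketch of that client-side splitting; chunkTopics and the hard-coded limit of 50 are illustrative and are not part of the node's API.

package main

import "fmt"

// chunkTopics splits a long list of content topics (or, equally, a list of
// queries) into groups no larger than limit, so each group can be sent as a
// separate request without exceeding the server-side thresholds.
func chunkTopics(topics []string, limit int) [][]string {
	if limit <= 0 {
		return nil
	}
	var chunks [][]string
	for start := 0; start < len(topics); start += limit {
		end := start + limit
		if end > len(topics) {
			end = len(topics)
		}
		chunks = append(chunks, topics[start:end])
	}
	return chunks
}

func main() {
	topics := make([]string, 120)
	for i := range topics {
		topics[i] = fmt.Sprintf("topic/%d", i)
	}
	// 50 mirrors maxQueriesPerBatch; a real caller would take the limit from
	// its own configuration rather than hard-coding the server constant.
	for i, chunk := range chunkTopics(topics, 50) {
		fmt.Printf("request %d carries %d topics\n", i, len(chunk))
	}
}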