Commit
Merge branch 'main' into tsachi/topics-count-limit
tsachiherman committed Feb 21, 2024
2 parents 57b1ee0 + f91a285 commit 9b33511
Showing 7 changed files with 448 additions and 29 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/deploy.yml
@@ -35,6 +35,7 @@ jobs:
- name: Deploy (dev)
uses: xmtp-labs/terraform-deployer@v1
timeout-minutes: 60
with:
terraform-token: ${{ secrets.TERRAFORM_TOKEN }}
terraform-org: xmtp
@@ -45,6 +46,7 @@

- name: Deploy (production)
uses: xmtp-labs/terraform-deployer@v1
timeout-minutes: 60
with:
terraform-token: ${{ secrets.TERRAFORM_TOKEN }}
terraform-org: xmtp
1 change: 0 additions & 1 deletion pkg/api/server.go
@@ -165,7 +165,6 @@ func (s *Server) startGRPC() error {
wakuMsg := wakuEnv.Message()

if topic.IsMLSV1(wakuMsg.ContentTopic) {
s.Log.Info("handling waku relay message in mlsv1 service", zap.String("topic", wakuMsg.ContentTopic))
if s.mlsv1 != nil {
err := s.mlsv1.HandleIncomingWakuRelayMessage(wakuEnv.Message())
if err != nil {
4 changes: 4 additions & 0 deletions pkg/metrics/metrics.go
@@ -66,6 +66,10 @@ func registerCollectors(reg prometheus.Registerer) {
queryResultLength,
ratelimiterBuckets,
ratelimiterBucketsDeleted,
mlsSentGroupMessageSize,
mlsSentGroupMessageCount,
mlsSentWelcomeMessageSize,
mlsSentWelcomeMessageCount,
}
for _, col := range cols {
reg.MustRegister(col)
55 changes: 55 additions & 0 deletions pkg/metrics/mls.go
@@ -0,0 +1,55 @@
package metrics

import (
"context"

"github.com/prometheus/client_golang/prometheus"
mlsstore "github.com/xmtp/xmtp-node-go/pkg/mls/store"
"go.uber.org/zap"
)

var mlsSentGroupMessageSize = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "mls_sent_group_message_size",
Help: "Size of a sent group message in bytes",
Buckets: []float64{100, 1000, 10000, 100000},
},
appClientVersionTagKeys,
)

var mlsSentGroupMessageCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "mls_sent_group_messages",
Help: "Count of sent group messages",
},
appClientVersionTagKeys,
)

func EmitMLSSentGroupMessage(ctx context.Context, log *zap.Logger, msg *mlsstore.GroupMessage) {
labels := contextLabels(ctx)
mlsSentGroupMessageSize.With(labels).Observe(float64(len(msg.Data)))
mlsSentGroupMessageCount.With(labels).Inc()
}

var mlsSentWelcomeMessageSize = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "mls_sent_welcome_message_size",
Help: "Size of a sent welcome message in bytes",
Buckets: []float64{100, 1000, 10000, 100000},
},
appClientVersionTagKeys,
)

var mlsSentWelcomeMessageCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "mls_sent_welcome_messages",
Help: "Count of sent welcome messages",
},
appClientVersionTagKeys,
)

func EmitMLSSentWelcomeMessage(ctx context.Context, log *zap.Logger, msg *mlsstore.WelcomeMessage) {
labels := contextLabels(ctx)
mlsSentWelcomeMessageSize.With(labels).Observe(float64(len(msg.Data)))
mlsSentWelcomeMessageCount.With(labels).Inc()
}
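The new emitters above follow the standard client_golang pattern: a HistogramVec for payload size and a CounterVec for message count, sharing one label set, registered once and then updated per message with labels built from the request context (via the existing contextLabels / appClientVersionTagKeys helpers). A minimal, self-contained sketch of that pattern, assuming illustrative metric names and label keys rather than the repository's real ones:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// Illustrative label keys; the real code derives its label set from the
// request context via appClientVersionTagKeys / contextLabels.
var labelKeys = []string{"client", "client_version"}

var sentSize = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name:    "example_sent_message_size",
		Help:    "Size of a sent message in bytes",
		Buckets: []float64{100, 1000, 10000, 100000},
	},
	labelKeys,
)

var sentCount = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "example_sent_messages",
		Help: "Count of sent messages",
	},
	labelKeys,
)

func main() {
	// Register both collectors once, then emit per message.
	reg := prometheus.NewRegistry()
	reg.MustRegister(sentSize, sentCount)

	labels := prometheus.Labels{"client": "example-client", "client_version": "1.0.0"}
	payload := []byte("hello")
	sentSize.With(labels).Observe(float64(len(payload)))
	sentCount.With(labels).Inc()

	fmt.Println("emitted one size observation and one count increment")
}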
154 changes: 132 additions & 22 deletions pkg/mls/api/v1/service.go
@@ -12,6 +12,7 @@ import (
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
wakupb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/xmtp/xmtp-node-go/pkg/metrics"
mlsstore "github.com/xmtp/xmtp-node-go/pkg/mls/store"
"github.com/xmtp/xmtp-node-go/pkg/mlsvalidate"
mlsv1 "github.com/xmtp/xmtp-node-go/pkg/proto/mls/api/v1"
@@ -299,12 +300,15 @@ func (s *Service) SendGroupMessages(ctx context.Context, req *mlsv1.SendGroupMes
return nil, err
}
log.Info("published to waku relay", zap.String("contentTopic", contentTopic))

metrics.EmitMLSSentGroupMessage(ctx, log, msg)
}

return &emptypb.Empty{}, nil
}

func (s *Service) SendWelcomeMessages(ctx context.Context, req *mlsv1.SendWelcomeMessagesRequest) (res *emptypb.Empty, err error) {
log := s.log.Named("send-welcome-messages")
if err = validateSendWelcomeMessagesRequest(req); err != nil {
return nil, err
}
Expand Down Expand Up @@ -342,6 +346,8 @@ func (s *Service) SendWelcomeMessages(ctx context.Context, req *mlsv1.SendWelcom
if err != nil {
return nil, err
}

metrics.EmitMLSSentWelcomeMessage(ctx, log, msg)
}
return &emptypb.Empty{}, nil
}
@@ -357,31 +363,45 @@ func (s *Service) QueryWelcomeMessages(ctx context.Context, req *mlsv1.QueryWelc
func (s *Service) SubscribeGroupMessages(req *mlsv1.SubscribeGroupMessagesRequest, stream mlsv1.MlsApi_SubscribeGroupMessagesServer) error {
log := s.log.Named("subscribe-group-messages").With(zap.Int("filters", len(req.Filters)))
log.Info("subscription started")
defer log.Info("subscription ended")
// Send a header (any header) to fix an issue with Tonic based GRPC clients.
// See: https://github.com/xmtp/libxmtp/pull/58
_ = stream.SendHeader(metadata.Pairs("subscribed", "true"))

var streamLock sync.Mutex
streamed := map[string]*mlsv1.GroupMessage{}
var streamingLock sync.Mutex
streamMessages := func(msgs []*mlsv1.GroupMessage) {
streamingLock.Lock()
defer streamingLock.Unlock()

for _, msg := range msgs {
if msg.GetV1() == nil {
continue
}
encodedId := fmt.Sprintf("%x", msg.GetV1().Id)
if _, ok := streamed[encodedId]; ok {
log.Debug("skipping already streamed message", zap.String("id", encodedId))
continue
}
err := stream.Send(msg)
if err != nil {
log.Error("error streaming group message", zap.Error(err))
}
streamed[encodedId] = msg
}
}

for _, filter := range req.Filters {
filter := filter

natsSubject := buildNatsSubjectForGroupMessages(filter.GroupId)
log.Info("subscribing to nats subject", zap.String("subject", natsSubject), zap.String("group_id", hex.EncodeToString(filter.GroupId)))
sub, err := s.nc.Subscribe(natsSubject, func(natsMsg *nats.Msg) {
log.Info("received message from nats")
var msg mlsv1.GroupMessage
err := pb.Unmarshal(natsMsg.Data, &msg)
if err != nil {
log.Error("parsing group message from bytes", zap.Error(err))
return
}
func() {
streamLock.Lock()
defer streamLock.Unlock()
err := stream.Send(&msg)
if err != nil {
log.Error("sending group message to subscribe", zap.Error(err))
}
}()
streamMessages([]*mlsv1.GroupMessage{&msg})
})
if err != nil {
log.Error("error subscribing to group messages", zap.Error(err))
@@ -390,6 +410,43 @@ func (s *Service) SubscribeGroupMessages(req *mlsv1.SubscribeGroupMessagesReques
defer func() {
_ = sub.Unsubscribe()
}()

if filter.IdCursor > 0 {
go func() {
pagingInfo := &mlsv1.PagingInfo{
IdCursor: filter.IdCursor,
Direction: mlsv1.SortDirection_SORT_DIRECTION_ASCENDING,
}
for {
select {
case <-stream.Context().Done():
return
case <-s.ctx.Done():
return
default:
}

resp, err := s.store.QueryGroupMessagesV1(stream.Context(), &mlsv1.QueryGroupMessagesRequest{
GroupId: filter.GroupId,
PagingInfo: pagingInfo,
})
if err != nil {
if err == context.Canceled {
return
}
log.Error("error querying for subscription cursor messages", zap.Error(err))
return
}

streamMessages(resp.Messages)

if len(resp.Messages) == 0 || resp.PagingInfo == nil || resp.PagingInfo.IdCursor == 0 {
break
}
pagingInfo = resp.PagingInfo
}
}()
}
}

select {
@@ -408,25 +465,41 @@ func (s *Service) SubscribeWelcomeMessages(req *mlsv1.SubscribeWelcomeMessagesRe
// See: https://github.com/xmtp/libxmtp/pull/58
_ = stream.SendHeader(metadata.Pairs("subscribed", "true"))

var streamLock sync.Mutex
streamed := map[string]*mlsv1.WelcomeMessage{}
var streamingLock sync.Mutex
streamMessages := func(msgs []*mlsv1.WelcomeMessage) {
streamingLock.Lock()
defer streamingLock.Unlock()

for _, msg := range msgs {
if msg.GetV1() == nil {
continue
}
encodedId := fmt.Sprintf("%x", msg.GetV1().Id)
if _, ok := streamed[encodedId]; ok {
log.Debug("skipping already streamed message", zap.String("id", encodedId))
continue
}
err := stream.Send(msg)
if err != nil {
log.Error("error streaming welcome message", zap.Error(err))
}
streamed[encodedId] = msg
}
}

for _, filter := range req.Filters {
filter := filter

natsSubject := buildNatsSubjectForWelcomeMessages(filter.InstallationKey)
sub, err := s.nc.Subscribe(natsSubject, func(natsMsg *nats.Msg) {
log.Info("received message from nats")
var msg mlsv1.WelcomeMessage
err := pb.Unmarshal(natsMsg.Data, &msg)
if err != nil {
log.Error("parsing welcome message from bytes", zap.Error(err))
return
}
func() {
streamLock.Lock()
defer streamLock.Unlock()
err := stream.Send(&msg)
if err != nil {
log.Error("sending welcome message to subscribe", zap.Error(err))
}
}()
streamMessages([]*mlsv1.WelcomeMessage{&msg})
})
if err != nil {
log.Error("error subscribing to welcome messages", zap.Error(err))
@@ -435,6 +508,43 @@ func (s *Service) SubscribeWelcomeMessages(req *mlsv1.SubscribeWelcomeMessagesRe
defer func() {
_ = sub.Unsubscribe()
}()

if filter.IdCursor > 0 {
go func() {
pagingInfo := &mlsv1.PagingInfo{
IdCursor: filter.IdCursor,
Direction: mlsv1.SortDirection_SORT_DIRECTION_ASCENDING,
}
for {
select {
case <-stream.Context().Done():
return
case <-s.ctx.Done():
return
default:
}

resp, err := s.store.QueryWelcomeMessagesV1(stream.Context(), &mlsv1.QueryWelcomeMessagesRequest{
InstallationKey: filter.InstallationKey,
PagingInfo: pagingInfo,
})
if err != nil {
if err == context.Canceled {
return
}
log.Error("error querying for subscription cursor messages", zap.Error(err))
return
}

streamMessages(resp.Messages)

if len(resp.Messages) == 0 || resp.PagingInfo == nil || resp.PagingInfo.IdCursor == 0 {
break
}
pagingInfo = resp.PagingInfo
}
}()
}
}

select {
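Both subscription handlers in this diff now feed a shared streamMessages helper from two sources: live NATS deliveries and a cursor-driven catch-up goroutine that pages through the store with QueryGroupMessagesV1 / QueryWelcomeMessagesV1 until it receives no messages or a zero IdCursor. The helper serializes sends with a mutex and deduplicates by the hex-encoded message ID, so a message arriving on both paths is streamed only once. A minimal, self-contained sketch of that dedupe pattern, using a placeholder Message type and a print-based send instead of the mlsv1 types and the gRPC stream:

package main

import (
	"encoding/hex"
	"fmt"
	"sync"
)

// Placeholder message type; the real code streams *mlsv1.GroupMessage /
// *mlsv1.WelcomeMessage values over a gRPC server stream.
type Message struct {
	ID   []byte
	Data []byte
}

func main() {
	var mu sync.Mutex
	streamed := map[string]*Message{}

	// send stands in for stream.Send; here it just prints.
	send := func(msg *Message) error {
		fmt.Printf("streamed message %x\n", msg.ID)
		return nil
	}

	// streamMessages is called from both the live-delivery callback and
	// the catch-up goroutine, so it serializes on the mutex and skips
	// anything already delivered.
	streamMessages := func(msgs []*Message) {
		mu.Lock()
		defer mu.Unlock()
		for _, msg := range msgs {
			id := hex.EncodeToString(msg.ID)
			if _, ok := streamed[id]; ok {
				continue // already streamed via the other path
			}
			if err := send(msg); err != nil {
				fmt.Println("error streaming message:", err)
			}
			streamed[id] = msg
		}
	}

	live := []*Message{{ID: []byte{0x01}, Data: []byte("a")}}
	backfill := []*Message{{ID: []byte{0x01}, Data: []byte("a")}, {ID: []byte{0x02}, Data: []byte("b")}}
	streamMessages(live)
	streamMessages(backfill) // message 0x01 is skipped, 0x02 is streamed
}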