From fad7a173c9bdfba33c7dce4c66436460efeaf26e Mon Sep 17 00:00:00 2001 From: Steven Normore Date: Wed, 21 Feb 2024 10:28:04 -0500 Subject: [PATCH 1/4] Emit metrics for MLS sent messages (#355) * feat: emit metrics for mls sent messages * Fix lint errors :cop: --- dev/run | 2 +- pkg/metrics/metrics.go | 4 +++ pkg/metrics/mls.go | 55 +++++++++++++++++++++++++++++++++++++++ pkg/mls/api/v1/service.go | 6 +++++ 4 files changed, 66 insertions(+), 1 deletion(-) create mode 100644 pkg/metrics/mls.go diff --git a/dev/run b/dev/run index b108c144..a995db08 100755 --- a/dev/run +++ b/dev/run @@ -14,7 +14,7 @@ go run cmd/xmtpd/main.go \ --store.db-connection-string "${MESSAGE_DB_DSN}" \ --store.reader-db-connection-string "${MESSAGE_DB_DSN}" \ --store.metrics-period 5s \ - --mls-store.db-connection-string "${MESSAGE_DB_DSN}" \ + --mls-store.db-connection-string "${MLS_DB_DSN}" \ --authz-db-connection-string "${AUTHZ_DB_DSN}" \ --go-profiling \ "$@" diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 3b0cd0e8..e7b2771a 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -66,6 +66,10 @@ func registerCollectors(reg prometheus.Registerer) { queryResultLength, ratelimiterBuckets, ratelimiterBucketsDeleted, + mlsSentGroupMessageSize, + mlsSentGroupMessageCount, + mlsSentWelcomeMessageSize, + mlsSentWelcomeMessageCount, } for _, col := range cols { reg.MustRegister(col) diff --git a/pkg/metrics/mls.go b/pkg/metrics/mls.go new file mode 100644 index 00000000..82acbbf2 --- /dev/null +++ b/pkg/metrics/mls.go @@ -0,0 +1,55 @@ +package metrics + +import ( + "context" + + "github.com/prometheus/client_golang/prometheus" + mlsstore "github.com/xmtp/xmtp-node-go/pkg/mls/store" + "go.uber.org/zap" +) + +var mlsSentGroupMessageSize = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "mls_sent_group_message_size", + Help: "Size of a sent group message in bytes", + Buckets: []float64{100, 1000, 10000, 100000}, + }, + appClientVersionTagKeys, +) + +var mlsSentGroupMessageCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "mls_sent_group_messages", + Help: "Count of sent group messages", + }, + appClientVersionTagKeys, +) + +func EmitMLSSentGroupMessage(ctx context.Context, log *zap.Logger, msg *mlsstore.GroupMessage) { + labels := contextLabels(ctx) + mlsSentGroupMessageSize.With(labels).Observe(float64(len(msg.Data))) + mlsSentGroupMessageCount.With(labels).Inc() +} + +var mlsSentWelcomeMessageSize = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "mls_sent_welcome_message_size", + Help: "Size of a sent welcome message in bytes", + Buckets: []float64{100, 1000, 10000, 100000}, + }, + appClientVersionTagKeys, +) + +var mlsSentWelcomeMessageCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "mls_sent_welcome_messages", + Help: "Count of sent welcome messages", + }, + appClientVersionTagKeys, +) + +func EmitMLSSentWelcomeMessage(ctx context.Context, log *zap.Logger, msg *mlsstore.WelcomeMessage) { + labels := contextLabels(ctx) + mlsSentWelcomeMessageSize.With(labels).Observe(float64(len(msg.Data))) + mlsSentWelcomeMessageCount.With(labels).Inc() +} diff --git a/pkg/mls/api/v1/service.go b/pkg/mls/api/v1/service.go index e0648371..81ec3b23 100644 --- a/pkg/mls/api/v1/service.go +++ b/pkg/mls/api/v1/service.go @@ -12,6 +12,7 @@ import ( "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" wakupb "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/xmtp/xmtp-node-go/pkg/metrics" mlsstore "github.com/xmtp/xmtp-node-go/pkg/mls/store" "github.com/xmtp/xmtp-node-go/pkg/mlsvalidate" mlsv1 "github.com/xmtp/xmtp-node-go/pkg/proto/mls/api/v1" @@ -299,12 +300,15 @@ func (s *Service) SendGroupMessages(ctx context.Context, req *mlsv1.SendGroupMes return nil, err } log.Info("published to waku relay", zap.String("contentTopic", contentTopic)) + + metrics.EmitMLSSentGroupMessage(ctx, log, msg) } return &emptypb.Empty{}, nil } func (s *Service) SendWelcomeMessages(ctx context.Context, req *mlsv1.SendWelcomeMessagesRequest) (res *emptypb.Empty, err error) { + log := s.log.Named("send-welcome-messages") if err = validateSendWelcomeMessagesRequest(req); err != nil { return nil, err } @@ -342,6 +346,8 @@ func (s *Service) SendWelcomeMessages(ctx context.Context, req *mlsv1.SendWelcom if err != nil { return nil, err } + + metrics.EmitMLSSentWelcomeMessage(ctx, log, msg) } return &emptypb.Empty{}, nil } From dcd9bebc1cb473a13a593bab272226694dcdb39d Mon Sep 17 00:00:00 2001 From: Nicholas Molnar <65710+neekolas@users.noreply.github.com> Date: Wed, 21 Feb 2024 09:01:51 -0800 Subject: [PATCH 2/4] Allow MLS topics in subscribeall (#356) * Allow MLS topics in subscribeall * Remove noisy log --- pkg/api/message/v1/service.go | 4 ++-- pkg/api/server.go | 1 - pkg/mls/api/v1/service.go | 4 ---- 3 files changed, 2 insertions(+), 7 deletions(-) diff --git a/pkg/api/message/v1/service.go b/pkg/api/message/v1/service.go index 0b9d81fd..0f92ae3f 100644 --- a/pkg/api/message/v1/service.go +++ b/pkg/api/message/v1/service.go @@ -394,8 +394,8 @@ func buildEnvelope(msg *wakupb.WakuMessage) *proto.Envelope { } } -func isValidSubscribeAllTopic(topic string) bool { - return strings.HasPrefix(topic, validXMTPTopicPrefix) +func isValidSubscribeAllTopic(contentTopic string) bool { + return strings.HasPrefix(contentTopic, validXMTPTopicPrefix) || topic.IsMLSV1(contentTopic) } func fromWakuTimestamp(ts int64) uint64 { diff --git a/pkg/api/server.go b/pkg/api/server.go index e4aaf3fe..267804a2 100644 --- a/pkg/api/server.go +++ b/pkg/api/server.go @@ -165,7 +165,6 @@ func (s *Server) startGRPC() error { wakuMsg := wakuEnv.Message() if topic.IsMLSV1(wakuMsg.ContentTopic) { - s.Log.Info("handling waku relay message in mlsv1 service", zap.String("topic", wakuMsg.ContentTopic)) if s.mlsv1 != nil { err := s.mlsv1.HandleIncomingWakuRelayMessage(wakuEnv.Message()) if err != nil { diff --git a/pkg/mls/api/v1/service.go b/pkg/mls/api/v1/service.go index 81ec3b23..c6779139 100644 --- a/pkg/mls/api/v1/service.go +++ b/pkg/mls/api/v1/service.go @@ -363,7 +363,6 @@ func (s *Service) QueryWelcomeMessages(ctx context.Context, req *mlsv1.QueryWelc func (s *Service) SubscribeGroupMessages(req *mlsv1.SubscribeGroupMessagesRequest, stream mlsv1.MlsApi_SubscribeGroupMessagesServer) error { log := s.log.Named("subscribe-group-messages").With(zap.Int("filters", len(req.Filters))) log.Info("subscription started") - defer log.Info("subscription ended") // Send a header (any header) to fix an issue with Tonic based GRPC clients. // See: https://github.com/xmtp/libxmtp/pull/58 _ = stream.SendHeader(metadata.Pairs("subscribed", "true")) @@ -371,9 +370,7 @@ func (s *Service) SubscribeGroupMessages(req *mlsv1.SubscribeGroupMessagesReques var streamLock sync.Mutex for _, filter := range req.Filters { natsSubject := buildNatsSubjectForGroupMessages(filter.GroupId) - log.Info("subscribing to nats subject", zap.String("subject", natsSubject), zap.String("group_id", hex.EncodeToString(filter.GroupId))) sub, err := s.nc.Subscribe(natsSubject, func(natsMsg *nats.Msg) { - log.Info("received message from nats") var msg mlsv1.GroupMessage err := pb.Unmarshal(natsMsg.Data, &msg) if err != nil { @@ -418,7 +415,6 @@ func (s *Service) SubscribeWelcomeMessages(req *mlsv1.SubscribeWelcomeMessagesRe for _, filter := range req.Filters { natsSubject := buildNatsSubjectForWelcomeMessages(filter.InstallationKey) sub, err := s.nc.Subscribe(natsSubject, func(natsMsg *nats.Msg) { - log.Info("received message from nats") var msg mlsv1.WelcomeMessage err := pb.Unmarshal(natsMsg.Data, &msg) if err != nil { From db7c40fddd40fd89d2209d33c4e261730ca384c2 Mon Sep 17 00:00:00 2001 From: Steven Normore Date: Wed, 21 Feb 2024 12:24:50 -0500 Subject: [PATCH 3/4] MLS subscribe from cursor (#351) * feat: mls subscribe from cursor * Fix lint errors :cop: --- pkg/mls/api/v1/service.go | 144 ++++++++++++++++--- pkg/mls/api/v1/service_test.go | 252 ++++++++++++++++++++++++++++++++- pkg/mls/store/store.go | 9 +- 3 files changed, 381 insertions(+), 24 deletions(-) diff --git a/pkg/mls/api/v1/service.go b/pkg/mls/api/v1/service.go index c6779139..dba96243 100644 --- a/pkg/mls/api/v1/service.go +++ b/pkg/mls/api/v1/service.go @@ -367,8 +367,32 @@ func (s *Service) SubscribeGroupMessages(req *mlsv1.SubscribeGroupMessagesReques // See: https://github.com/xmtp/libxmtp/pull/58 _ = stream.SendHeader(metadata.Pairs("subscribed", "true")) - var streamLock sync.Mutex + streamed := map[string]*mlsv1.GroupMessage{} + var streamingLock sync.Mutex + streamMessages := func(msgs []*mlsv1.GroupMessage) { + streamingLock.Lock() + defer streamingLock.Unlock() + + for _, msg := range msgs { + if msg.GetV1() == nil { + continue + } + encodedId := fmt.Sprintf("%x", msg.GetV1().Id) + if _, ok := streamed[encodedId]; ok { + log.Debug("skipping already streamed message", zap.String("id", encodedId)) + continue + } + err := stream.Send(msg) + if err != nil { + log.Error("error streaming group message", zap.Error(err)) + } + streamed[encodedId] = msg + } + } + for _, filter := range req.Filters { + filter := filter + natsSubject := buildNatsSubjectForGroupMessages(filter.GroupId) sub, err := s.nc.Subscribe(natsSubject, func(natsMsg *nats.Msg) { var msg mlsv1.GroupMessage @@ -377,14 +401,7 @@ func (s *Service) SubscribeGroupMessages(req *mlsv1.SubscribeGroupMessagesReques log.Error("parsing group message from bytes", zap.Error(err)) return } - func() { - streamLock.Lock() - defer streamLock.Unlock() - err := stream.Send(&msg) - if err != nil { - log.Error("sending group message to subscribe", zap.Error(err)) - } - }() + streamMessages([]*mlsv1.GroupMessage{&msg}) }) if err != nil { log.Error("error subscribing to group messages", zap.Error(err)) @@ -393,6 +410,43 @@ func (s *Service) SubscribeGroupMessages(req *mlsv1.SubscribeGroupMessagesReques defer func() { _ = sub.Unsubscribe() }() + + if filter.IdCursor > 0 { + go func() { + pagingInfo := &mlsv1.PagingInfo{ + IdCursor: filter.IdCursor, + Direction: mlsv1.SortDirection_SORT_DIRECTION_ASCENDING, + } + for { + select { + case <-stream.Context().Done(): + return + case <-s.ctx.Done(): + return + default: + } + + resp, err := s.store.QueryGroupMessagesV1(stream.Context(), &mlsv1.QueryGroupMessagesRequest{ + GroupId: filter.GroupId, + PagingInfo: pagingInfo, + }) + if err != nil { + if err == context.Canceled { + return + } + log.Error("error querying for subscription cursor messages", zap.Error(err)) + return + } + + streamMessages(resp.Messages) + + if len(resp.Messages) == 0 || resp.PagingInfo == nil || resp.PagingInfo.IdCursor == 0 { + break + } + pagingInfo = resp.PagingInfo + } + }() + } } select { @@ -411,8 +465,32 @@ func (s *Service) SubscribeWelcomeMessages(req *mlsv1.SubscribeWelcomeMessagesRe // See: https://github.com/xmtp/libxmtp/pull/58 _ = stream.SendHeader(metadata.Pairs("subscribed", "true")) - var streamLock sync.Mutex + streamed := map[string]*mlsv1.WelcomeMessage{} + var streamingLock sync.Mutex + streamMessages := func(msgs []*mlsv1.WelcomeMessage) { + streamingLock.Lock() + defer streamingLock.Unlock() + + for _, msg := range msgs { + if msg.GetV1() == nil { + continue + } + encodedId := fmt.Sprintf("%x", msg.GetV1().Id) + if _, ok := streamed[encodedId]; ok { + log.Debug("skipping already streamed message", zap.String("id", encodedId)) + continue + } + err := stream.Send(msg) + if err != nil { + log.Error("error streaming welcome message", zap.Error(err)) + } + streamed[encodedId] = msg + } + } + for _, filter := range req.Filters { + filter := filter + natsSubject := buildNatsSubjectForWelcomeMessages(filter.InstallationKey) sub, err := s.nc.Subscribe(natsSubject, func(natsMsg *nats.Msg) { var msg mlsv1.WelcomeMessage @@ -421,14 +499,7 @@ func (s *Service) SubscribeWelcomeMessages(req *mlsv1.SubscribeWelcomeMessagesRe log.Error("parsing welcome message from bytes", zap.Error(err)) return } - func() { - streamLock.Lock() - defer streamLock.Unlock() - err := stream.Send(&msg) - if err != nil { - log.Error("sending welcome message to subscribe", zap.Error(err)) - } - }() + streamMessages([]*mlsv1.WelcomeMessage{&msg}) }) if err != nil { log.Error("error subscribing to welcome messages", zap.Error(err)) @@ -437,6 +508,43 @@ func (s *Service) SubscribeWelcomeMessages(req *mlsv1.SubscribeWelcomeMessagesRe defer func() { _ = sub.Unsubscribe() }() + + if filter.IdCursor > 0 { + go func() { + pagingInfo := &mlsv1.PagingInfo{ + IdCursor: filter.IdCursor, + Direction: mlsv1.SortDirection_SORT_DIRECTION_ASCENDING, + } + for { + select { + case <-stream.Context().Done(): + return + case <-s.ctx.Done(): + return + default: + } + + resp, err := s.store.QueryWelcomeMessagesV1(stream.Context(), &mlsv1.QueryWelcomeMessagesRequest{ + InstallationKey: filter.InstallationKey, + PagingInfo: pagingInfo, + }) + if err != nil { + if err == context.Canceled { + return + } + log.Error("error querying for subscription cursor messages", zap.Error(err)) + return + } + + streamMessages(resp.Messages) + + if len(resp.Messages) == 0 || resp.PagingInfo == nil || resp.PagingInfo.IdCursor == 0 { + break + } + pagingInfo = resp.PagingInfo + } + }() + } } select { diff --git a/pkg/mls/api/v1/service_test.go b/pkg/mls/api/v1/service_test.go index 4d4565ad..d098af6c 100644 --- a/pkg/mls/api/v1/service_test.go +++ b/pkg/mls/api/v1/service_test.go @@ -1,6 +1,7 @@ package api import ( + "bytes" "context" "errors" "fmt" @@ -337,7 +338,7 @@ func TestGetIdentityUpdates(t *testing.T) { require.Len(t, identityUpdates.Updates[0].Updates, 2) } -func TestSubscribeGroupMessages(t *testing.T) { +func TestSubscribeGroupMessages_WithoutCursor(t *testing.T) { ctx := context.Background() svc, _, _, cleanup := newTestService(t, ctx) defer cleanup() @@ -393,7 +394,108 @@ func TestSubscribeGroupMessages(t *testing.T) { require.Eventually(t, ctrl.Satisfied, 5*time.Second, 100*time.Millisecond) } -func TestSubscribeWelcomeMessages(t *testing.T) { +func TestSubscribeGroupMessages_WithCursor(t *testing.T) { + ctx := context.Background() + svc, _, mlsValidationService, cleanup := newTestService(t, ctx) + defer cleanup() + + groupId := []byte(test.RandomString(32)) + + // Initial message before stream starts. + mlsValidationService.mockValidateGroupMessages(groupId) + initialMsgs := []*mlsv1.GroupMessageInput{ + { + Version: &mlsv1.GroupMessageInput_V1_{ + V1: &mlsv1.GroupMessageInput_V1{ + Data: []byte("data1"), + }, + }, + }, + { + Version: &mlsv1.GroupMessageInput_V1_{ + V1: &mlsv1.GroupMessageInput_V1{ + Data: []byte("data2"), + }, + }, + }, + { + Version: &mlsv1.GroupMessageInput_V1_{ + V1: &mlsv1.GroupMessageInput_V1{ + Data: []byte("data3"), + }, + }, + }, + } + for _, msg := range initialMsgs { + _, err := svc.SendGroupMessages(ctx, &mlsv1.SendGroupMessagesRequest{ + Messages: []*mlsv1.GroupMessageInput{msg}, + }) + require.NoError(t, err) + } + + // Set of 10 messages that are included in the stream. + msgs := make([]*mlsv1.GroupMessage, 10) + for i := 0; i < 10; i++ { + msgs[i] = &mlsv1.GroupMessage{ + Version: &mlsv1.GroupMessage_V1_{ + V1: &mlsv1.GroupMessage_V1{ + Id: uint64(i + 4), + CreatedNs: uint64(i + 4), + GroupId: groupId, + Data: []byte(fmt.Sprintf("data%d", i+4)), + }, + }, + } + } + + // Set up expectations of streaming the 11 messages from cursor. + ctrl := gomock.NewController(t) + stream := NewMockMlsApi_SubscribeGroupMessagesServer(ctrl) + stream.EXPECT().SendHeader(map[string][]string{"subscribed": {"true"}}) + stream.EXPECT().Send(newGroupMessageIdAndDataEqualsMatcher(&mlsv1.GroupMessage{ + Version: &mlsv1.GroupMessage_V1_{ + V1: &mlsv1.GroupMessage_V1{ + Id: 3, + Data: []byte("data3"), + }, + }, + })).Return(nil).Times(1) + for _, msg := range msgs { + stream.EXPECT().Send(newGroupMessageEqualsMatcher(msg)).Return(nil).Times(1) + } + stream.EXPECT().Context().Return(ctx).AnyTimes() + + go func() { + err := svc.SubscribeGroupMessages(&mlsv1.SubscribeGroupMessagesRequest{ + Filters: []*mlsv1.SubscribeGroupMessagesRequest_Filter{ + { + GroupId: groupId, + IdCursor: 2, + }, + }, + }, stream) + require.NoError(t, err) + }() + time.Sleep(50 * time.Millisecond) + + // Send the 10 real-time messages. + for _, msg := range msgs { + msgB, err := proto.Marshal(msg) + require.NoError(t, err) + + err = svc.HandleIncomingWakuRelayMessage(&wakupb.WakuMessage{ + ContentTopic: topic.BuildMLSV1GroupTopic(msg.GetV1().GroupId), + Timestamp: int64(msg.GetV1().CreatedNs), + Payload: msgB, + }) + require.NoError(t, err) + } + + // Expectations should eventually be satisfied. + require.Eventually(t, ctrl.Satisfied, 5*time.Second, 100*time.Millisecond) +} + +func TestSubscribeWelcomeMessages_WithoutCursor(t *testing.T) { ctx := context.Background() svc, _, _, cleanup := newTestService(t, ctx) defer cleanup() @@ -450,6 +552,116 @@ func TestSubscribeWelcomeMessages(t *testing.T) { require.Eventually(t, ctrl.Satisfied, 5*time.Second, 100*time.Millisecond) } +func TestSubscribeWelcomeMessages_WithCursor(t *testing.T) { + ctx := context.Background() + svc, _, _, cleanup := newTestService(t, ctx) + defer cleanup() + + installationKey := []byte(test.RandomString(32)) + hpkePublicKey := []byte(test.RandomString(32)) + + // Initial message before stream starts. + initialMsgs := []*mlsv1.WelcomeMessageInput{ + { + Version: &mlsv1.WelcomeMessageInput_V1_{ + V1: &mlsv1.WelcomeMessageInput_V1{ + InstallationKey: installationKey, + HpkePublicKey: hpkePublicKey, + Data: []byte("data1"), + }, + }, + }, + { + Version: &mlsv1.WelcomeMessageInput_V1_{ + V1: &mlsv1.WelcomeMessageInput_V1{ + InstallationKey: installationKey, + HpkePublicKey: hpkePublicKey, + Data: []byte("data2"), + }, + }, + }, + { + Version: &mlsv1.WelcomeMessageInput_V1_{ + V1: &mlsv1.WelcomeMessageInput_V1{ + InstallationKey: installationKey, + HpkePublicKey: hpkePublicKey, + Data: []byte("data3"), + }, + }, + }, + } + for _, msg := range initialMsgs { + _, err := svc.SendWelcomeMessages(ctx, &mlsv1.SendWelcomeMessagesRequest{ + Messages: []*mlsv1.WelcomeMessageInput{msg}, + }) + require.NoError(t, err) + } + + // Set of 10 messages that are included in the stream. + msgs := make([]*mlsv1.WelcomeMessage, 10) + for i := 0; i < 10; i++ { + msgs[i] = &mlsv1.WelcomeMessage{ + Version: &mlsv1.WelcomeMessage_V1_{ + V1: &mlsv1.WelcomeMessage_V1{ + Id: uint64(i + 4), + CreatedNs: uint64(i + 4), + InstallationKey: installationKey, + HpkePublicKey: hpkePublicKey, + Data: []byte(fmt.Sprintf("data%d", i+4)), + }, + }, + } + } + + // Set up expectations of streaming the 11 messages from cursor. + ctrl := gomock.NewController(t) + stream := NewMockMlsApi_SubscribeWelcomeMessagesServer(ctrl) + stream.EXPECT().SendHeader(map[string][]string{"subscribed": {"true"}}) + stream.EXPECT().Send(newWelcomeMessageEqualsMatcherWithoutTimestamp(&mlsv1.WelcomeMessage{ + Version: &mlsv1.WelcomeMessage_V1_{ + V1: &mlsv1.WelcomeMessage_V1{ + Id: 3, + InstallationKey: installationKey, + HpkePublicKey: hpkePublicKey, + Data: []byte("data3"), + }, + }, + })).Return(nil).Times(1) + for _, msg := range msgs { + stream.EXPECT().Send(newWelcomeMessageEqualsMatcher(msg)).Return(nil).Times(1) + } + stream.EXPECT().Context().Return(ctx).AnyTimes() + + go func() { + err := svc.SubscribeWelcomeMessages(&mlsv1.SubscribeWelcomeMessagesRequest{ + Filters: []*mlsv1.SubscribeWelcomeMessagesRequest_Filter{ + { + InstallationKey: installationKey, + IdCursor: 2, + }, + }, + }, stream) + require.NoError(t, err) + }() + time.Sleep(50 * time.Millisecond) + + // Send the 10 real-time messages. + for _, msg := range msgs { + msgB, err := proto.Marshal(msg) + require.NoError(t, err) + + err = svc.HandleIncomingWakuRelayMessage(&wakupb.WakuMessage{ + ContentTopic: topic.BuildMLSV1WelcomeTopic(msg.GetV1().InstallationKey), + Timestamp: int64(msg.GetV1().CreatedNs), + Payload: msgB, + }) + require.NoError(t, err) + } + + // Expectations should eventually be satisfied. + require.Eventually(t, ctrl.Satisfied, 5*time.Second, 100*time.Millisecond) +} + type groupMessageEqualsMatcher struct { obj *mlsv1.GroupMessage } @@ -466,6 +678,23 @@ func (m *groupMessageEqualsMatcher) String() string { return m.obj.String() } +type groupMessageIdAndDataEqualsMatcher struct { + obj *mlsv1.GroupMessage +} + +func newGroupMessageIdAndDataEqualsMatcher(obj *mlsv1.GroupMessage) *groupMessageIdAndDataEqualsMatcher { + return &groupMessageIdAndDataEqualsMatcher{obj} +} + +func (m *groupMessageIdAndDataEqualsMatcher) Matches(obj interface{}) bool { + return m.obj.GetV1().Id == obj.(*mlsv1.GroupMessage).GetV1().Id && + bytes.Equal(m.obj.GetV1().Data, obj.(*mlsv1.GroupMessage).GetV1().Data) +} + +func (m *groupMessageIdAndDataEqualsMatcher) String() string { + return m.obj.String() +} + type welcomeMessageEqualsMatcher struct { obj *mlsv1.WelcomeMessage } @@ -481,3 +710,22 @@ func (m *welcomeMessageEqualsMatcher) Matches(obj interface{}) bool { func (m *welcomeMessageEqualsMatcher) String() string { return m.obj.String() } + +type welcomeMessageEqualsMatcherWithoutTimestamp struct { + obj *mlsv1.WelcomeMessage +} + +func newWelcomeMessageEqualsMatcherWithoutTimestamp(obj *mlsv1.WelcomeMessage) *welcomeMessageEqualsMatcherWithoutTimestamp { + return &welcomeMessageEqualsMatcherWithoutTimestamp{obj} +} + +func (m *welcomeMessageEqualsMatcherWithoutTimestamp) Matches(obj interface{}) bool { + return m.obj.GetV1().Id == obj.(*mlsv1.WelcomeMessage).GetV1().Id && + bytes.Equal(m.obj.GetV1().InstallationKey, obj.(*mlsv1.WelcomeMessage).GetV1().InstallationKey) && + bytes.Equal(m.obj.GetV1().HpkePublicKey, obj.(*mlsv1.WelcomeMessage).GetV1().HpkePublicKey) && + bytes.Equal(m.obj.GetV1().Data, obj.(*mlsv1.WelcomeMessage).GetV1().Data) +} + +func (m *welcomeMessageEqualsMatcherWithoutTimestamp) String() string { + return m.obj.String() +} diff --git a/pkg/mls/store/store.go b/pkg/mls/store/store.go index 8810f703..64c94b32 100644 --- a/pkg/mls/store/store.go +++ b/pkg/mls/store/store.go @@ -328,10 +328,11 @@ func (s *Store) QueryWelcomeMessagesV1(ctx context.Context, req *mlsv1.QueryWelc messages = append(messages, &mlsv1.WelcomeMessage{ Version: &mlsv1.WelcomeMessage_V1_{ V1: &mlsv1.WelcomeMessage_V1{ - Id: msg.Id, - CreatedNs: uint64(msg.CreatedAt.UnixNano()), - Data: msg.Data, - HpkePublicKey: msg.HpkePublicKey, + Id: msg.Id, + CreatedNs: uint64(msg.CreatedAt.UnixNano()), + Data: msg.Data, + InstallationKey: msg.InstallationKey, + HpkePublicKey: msg.HpkePublicKey, }, }, }) From f91a285bdf19dae62551cbe065dc200055e82517 Mon Sep 17 00:00:00 2001 From: Nicholas Molnar <65710+neekolas@users.noreply.github.com> Date: Wed, 21 Feb 2024 09:35:27 -0800 Subject: [PATCH 4/4] Extend deployment timeouts (#357) * Extend deployment timeouts * Remove from e2e --- .github/workflows/deploy.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index c97b1d19..ec6c4676 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -35,6 +35,7 @@ jobs: - name: Deploy (dev) uses: xmtp-labs/terraform-deployer@v1 + timeout-minutes: 60 with: terraform-token: ${{ secrets.TERRAFORM_TOKEN }} terraform-org: xmtp @@ -45,6 +46,7 @@ jobs: - name: Deploy (production) uses: xmtp-labs/terraform-deployer@v1 + timeout-minutes: 60 with: terraform-token: ${{ secrets.TERRAFORM_TOKEN }} terraform-org: xmtp