From 980a5142e7198b50dc5ad22c42db596f764440ff Mon Sep 17 00:00:00 2001 From: Steven Normore Date: Fri, 19 Jan 2024 18:57:09 -0500 Subject: [PATCH] fix: no need for full mls messages over pubsub --- pkg/mls/api/v1/service.go | 67 +++++++++------------------------- pkg/mls/api/v1/service_test.go | 17 ++------- 2 files changed, 21 insertions(+), 63 deletions(-) diff --git a/pkg/mls/api/v1/service.go b/pkg/mls/api/v1/service.go index 83359c65..7fcdcb68 100644 --- a/pkg/mls/api/v1/service.go +++ b/pkg/mls/api/v1/service.go @@ -20,7 +20,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" - pb "google.golang.org/protobuf/proto" emptypb "google.golang.org/protobuf/types/known/emptypb" ) @@ -88,28 +87,18 @@ func (s *Service) Close() { func (s *Service) HandleIncomingWakuRelayMessage(wakuMsg *wakupb.WakuMessage) error { if topic.IsMLSV1Group(wakuMsg.ContentTopic) { - var msg mlsv1.GroupMessage - err := pb.Unmarshal(wakuMsg.Payload, &msg) - if err != nil { - return err - } - if msg.GetV1() == nil { - return nil - } - err = s.nc.Publish(buildNatsSubjectForGroupMessages(msg.GetV1().GroupId), wakuMsg.Payload) + // The waku message payload is just the group ID as bytes since we only + // need to use it as a signal that a new message was published, without + // any other content. + err := s.nc.Publish(buildNatsSubjectForGroupMessages(wakuMsg.Payload), wakuMsg.Payload) if err != nil { return err } } else if topic.IsMLSV1Welcome(wakuMsg.ContentTopic) { - var msg mlsv1.WelcomeMessage - err := pb.Unmarshal(wakuMsg.Payload, &msg) - if err != nil { - return err - } - if msg.GetV1() == nil { - return nil - } - err = s.nc.Publish(buildNatsSubjectForWelcomeMessages(msg.GetV1().InstallationKey), wakuMsg.Payload) + // The waku message payload is just the installation key as bytes since + // we only need to use it as a signal that a new message was published, + // without any other content. + err := s.nc.Publish(buildNatsSubjectForWelcomeMessages(wakuMsg.Payload), wakuMsg.Payload) if err != nil { return err } @@ -261,24 +250,13 @@ func (s *Service) SendGroupMessages(ctx context.Context, req *mlsv1.SendGroupMes return nil, status.Errorf(codes.Internal, "failed to insert message: %s", err) } - msgB, err := pb.Marshal(&mlsv1.GroupMessage{ - Version: &mlsv1.GroupMessage_V1_{ - V1: &mlsv1.GroupMessage_V1{ - Id: msg.Id, - CreatedNs: uint64(msg.CreatedAt.UnixNano()), - GroupId: msg.GroupId, - Data: msg.Data, - }, - }, - }) - if err != nil { - return nil, err - } - err = s.publishToWakuRelay(ctx, &wakupb.WakuMessage{ ContentTopic: topic.BuildMLSV1GroupTopic(decodedGroupId), Timestamp: msg.CreatedAt.UnixNano(), - Payload: msgB, + // The waku message payload is just the group ID as bytes since we + // only need to use it as a signal that a new message was + // published, without any other content. + Payload: msg.GroupId, }) if err != nil { return nil, err @@ -303,24 +281,13 @@ func (s *Service) SendWelcomeMessages(ctx context.Context, req *mlsv1.SendWelcom return nil, status.Errorf(codes.Internal, "failed to insert message: %s", err) } - msgB, err := pb.Marshal(&mlsv1.WelcomeMessage{ - Version: &mlsv1.WelcomeMessage_V1_{ - V1: &mlsv1.WelcomeMessage_V1{ - Id: msg.Id, - CreatedNs: uint64(msg.CreatedAt.UnixNano()), - InstallationKey: msg.InstallationKey, - Data: msg.Data, - }, - }, - }) - if err != nil { - return nil, err - } - err = s.publishToWakuRelay(ctx, &wakupb.WakuMessage{ - ContentTopic: topic.BuildMLSV1WelcomeTopic(input.GetV1().InstallationKey), + ContentTopic: topic.BuildMLSV1WelcomeTopic(msg.InstallationKey), Timestamp: msg.CreatedAt.UnixNano(), - Payload: msgB, + // The waku message payload is just the installation key as bytes + // since we only need to use it as a signal that a new message was + // published, without any other content. + Payload: msg.InstallationKey, }) if err != nil { return nil, err diff --git a/pkg/mls/api/v1/service_test.go b/pkg/mls/api/v1/service_test.go index 0152a2ab..168c18ef 100644 --- a/pkg/mls/api/v1/service_test.go +++ b/pkg/mls/api/v1/service_test.go @@ -18,7 +18,6 @@ import ( test "github.com/xmtp/xmtp-node-go/pkg/testing" "github.com/xmtp/xmtp-node-go/pkg/topic" "go.uber.org/mock/gomock" - "google.golang.org/protobuf/proto" ) type mockedMLSValidationService struct { @@ -409,12 +408,10 @@ func TestSubscribeGroupMessages_WithoutCursor(t *testing.T) { }) require.NoError(t, err) - msgB, err := proto.Marshal(msg) - require.NoError(t, err) err = svc.HandleIncomingWakuRelayMessage(&wakupb.WakuMessage{ ContentTopic: topic.BuildMLSV1GroupTopic(msg.GetV1().GroupId), Timestamp: int64(msg.GetV1().CreatedNs), - Payload: msgB, + Payload: msg.GetV1().GroupId, }) require.NoError(t, err) @@ -524,12 +521,10 @@ func TestSubscribeGroupMessages_WithCursor(t *testing.T) { }) require.NoError(t, err) - msgB, err := proto.Marshal(msg) - require.NoError(t, err) err = svc.HandleIncomingWakuRelayMessage(&wakupb.WakuMessage{ ContentTopic: topic.BuildMLSV1GroupTopic(msg.GetV1().GroupId), Timestamp: int64(msg.GetV1().CreatedNs), - Payload: msgB, + Payload: msg.GetV1().GroupId, }) require.NoError(t, err) @@ -614,12 +609,10 @@ func TestSubscribeWelcomeMessages_WithoutCursor(t *testing.T) { }) require.NoError(t, err) - msgB, err := proto.Marshal(msg) - require.NoError(t, err) err = svc.HandleIncomingWakuRelayMessage(&wakupb.WakuMessage{ ContentTopic: topic.BuildMLSV1WelcomeTopic(msg.GetV1().InstallationKey), Timestamp: int64(msg.GetV1().CreatedNs), - Payload: msgB, + Payload: msg.GetV1().InstallationKey, }) require.NoError(t, err) @@ -731,12 +724,10 @@ func TestSubscribeWelcomeMessages_WithCursor(t *testing.T) { }) require.NoError(t, err) - msgB, err := proto.Marshal(msg) - require.NoError(t, err) err = svc.HandleIncomingWakuRelayMessage(&wakupb.WakuMessage{ ContentTopic: topic.BuildMLSV1WelcomeTopic(msg.GetV1().InstallationKey), Timestamp: int64(msg.GetV1().CreatedNs), - Payload: msgB, + Payload: msg.GetV1().InstallationKey, }) require.NoError(t, err)