Skip to content

Commit

Permalink
fix: no need for full mls messages over pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
Steven Normore committed Jan 19, 2024
1 parent 4e32c31 commit 980a514
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 63 deletions.
67 changes: 17 additions & 50 deletions pkg/mls/api/v1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
pb "google.golang.org/protobuf/proto"
emptypb "google.golang.org/protobuf/types/known/emptypb"
)

Expand Down Expand Up @@ -88,28 +87,18 @@ func (s *Service) Close() {

func (s *Service) HandleIncomingWakuRelayMessage(wakuMsg *wakupb.WakuMessage) error {
if topic.IsMLSV1Group(wakuMsg.ContentTopic) {
var msg mlsv1.GroupMessage
err := pb.Unmarshal(wakuMsg.Payload, &msg)
if err != nil {
return err
}
if msg.GetV1() == nil {
return nil
}
err = s.nc.Publish(buildNatsSubjectForGroupMessages(msg.GetV1().GroupId), wakuMsg.Payload)
// The waku message payload is just the group ID as bytes since we only
// need to use it as a signal that a new message was published, without
// any other content.
err := s.nc.Publish(buildNatsSubjectForGroupMessages(wakuMsg.Payload), wakuMsg.Payload)
if err != nil {
return err
}
} else if topic.IsMLSV1Welcome(wakuMsg.ContentTopic) {
var msg mlsv1.WelcomeMessage
err := pb.Unmarshal(wakuMsg.Payload, &msg)
if err != nil {
return err
}
if msg.GetV1() == nil {
return nil
}
err = s.nc.Publish(buildNatsSubjectForWelcomeMessages(msg.GetV1().InstallationKey), wakuMsg.Payload)
// The waku message payload is just the installation key as bytes since
// we only need to use it as a signal that a new message was published,
// without any other content.
err := s.nc.Publish(buildNatsSubjectForWelcomeMessages(wakuMsg.Payload), wakuMsg.Payload)
if err != nil {
return err
}
Expand Down Expand Up @@ -261,24 +250,13 @@ func (s *Service) SendGroupMessages(ctx context.Context, req *mlsv1.SendGroupMes
return nil, status.Errorf(codes.Internal, "failed to insert message: %s", err)
}

msgB, err := pb.Marshal(&mlsv1.GroupMessage{
Version: &mlsv1.GroupMessage_V1_{
V1: &mlsv1.GroupMessage_V1{
Id: msg.Id,
CreatedNs: uint64(msg.CreatedAt.UnixNano()),
GroupId: msg.GroupId,
Data: msg.Data,
},
},
})
if err != nil {
return nil, err
}

err = s.publishToWakuRelay(ctx, &wakupb.WakuMessage{
ContentTopic: topic.BuildMLSV1GroupTopic(decodedGroupId),
Timestamp: msg.CreatedAt.UnixNano(),
Payload: msgB,
// The waku message payload is just the group ID as bytes since we
// only need to use it as a signal that a new message was
// published, without any other content.
Payload: msg.GroupId,
})
if err != nil {
return nil, err
Expand All @@ -303,24 +281,13 @@ func (s *Service) SendWelcomeMessages(ctx context.Context, req *mlsv1.SendWelcom
return nil, status.Errorf(codes.Internal, "failed to insert message: %s", err)
}

msgB, err := pb.Marshal(&mlsv1.WelcomeMessage{
Version: &mlsv1.WelcomeMessage_V1_{
V1: &mlsv1.WelcomeMessage_V1{
Id: msg.Id,
CreatedNs: uint64(msg.CreatedAt.UnixNano()),
InstallationKey: msg.InstallationKey,
Data: msg.Data,
},
},
})
if err != nil {
return nil, err
}

err = s.publishToWakuRelay(ctx, &wakupb.WakuMessage{
ContentTopic: topic.BuildMLSV1WelcomeTopic(input.GetV1().InstallationKey),
ContentTopic: topic.BuildMLSV1WelcomeTopic(msg.InstallationKey),
Timestamp: msg.CreatedAt.UnixNano(),
Payload: msgB,
// The waku message payload is just the installation key as bytes
// since we only need to use it as a signal that a new message was
// published, without any other content.
Payload: msg.InstallationKey,
})
if err != nil {
return nil, err
Expand Down
17 changes: 4 additions & 13 deletions pkg/mls/api/v1/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
test "github.com/xmtp/xmtp-node-go/pkg/testing"
"github.com/xmtp/xmtp-node-go/pkg/topic"
"go.uber.org/mock/gomock"
"google.golang.org/protobuf/proto"
)

type mockedMLSValidationService struct {
Expand Down Expand Up @@ -409,12 +408,10 @@ func TestSubscribeGroupMessages_WithoutCursor(t *testing.T) {
})
require.NoError(t, err)

msgB, err := proto.Marshal(msg)
require.NoError(t, err)
err = svc.HandleIncomingWakuRelayMessage(&wakupb.WakuMessage{
ContentTopic: topic.BuildMLSV1GroupTopic(msg.GetV1().GroupId),
Timestamp: int64(msg.GetV1().CreatedNs),
Payload: msgB,
Payload: msg.GetV1().GroupId,
})
require.NoError(t, err)

Expand Down Expand Up @@ -524,12 +521,10 @@ func TestSubscribeGroupMessages_WithCursor(t *testing.T) {
})
require.NoError(t, err)

msgB, err := proto.Marshal(msg)
require.NoError(t, err)
err = svc.HandleIncomingWakuRelayMessage(&wakupb.WakuMessage{
ContentTopic: topic.BuildMLSV1GroupTopic(msg.GetV1().GroupId),
Timestamp: int64(msg.GetV1().CreatedNs),
Payload: msgB,
Payload: msg.GetV1().GroupId,
})
require.NoError(t, err)

Expand Down Expand Up @@ -614,12 +609,10 @@ func TestSubscribeWelcomeMessages_WithoutCursor(t *testing.T) {
})
require.NoError(t, err)

msgB, err := proto.Marshal(msg)
require.NoError(t, err)
err = svc.HandleIncomingWakuRelayMessage(&wakupb.WakuMessage{
ContentTopic: topic.BuildMLSV1WelcomeTopic(msg.GetV1().InstallationKey),
Timestamp: int64(msg.GetV1().CreatedNs),
Payload: msgB,
Payload: msg.GetV1().InstallationKey,
})
require.NoError(t, err)

Expand Down Expand Up @@ -731,12 +724,10 @@ func TestSubscribeWelcomeMessages_WithCursor(t *testing.T) {
})
require.NoError(t, err)

msgB, err := proto.Marshal(msg)
require.NoError(t, err)
err = svc.HandleIncomingWakuRelayMessage(&wakupb.WakuMessage{
ContentTopic: topic.BuildMLSV1WelcomeTopic(msg.GetV1().InstallationKey),
Timestamp: int64(msg.GetV1().CreatedNs),
Payload: msgB,
Payload: msg.GetV1().InstallationKey,
})
require.NoError(t, err)

Expand Down

0 comments on commit 980a514

Please sign in to comment.