Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more MLS methods #321

Merged
merged 37 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
94460bc
Scaffold MLS server
neekolas Oct 19, 2023
ae8023f
Update go.mod
neekolas Oct 19, 2023
0fccac9
Fix missing argument
neekolas Oct 19, 2023
9f98795
Add unsaved file
neekolas Oct 19, 2023
d496271
Lint
neekolas Oct 19, 2023
0eda254
Working end-to-end
neekolas Oct 23, 2023
2db5d9d
Merge branch 'mls' into validation-service-scaffold
neekolas Oct 23, 2023
a3f7352
Lint
neekolas Oct 23, 2023
b833901
Add new push action
neekolas Oct 23, 2023
f9f1c4e
Add a bunch of new endpoints
neekolas Oct 24, 2023
8c94283
Address review comments
neekolas Oct 24, 2023
da11034
Change method casing
neekolas Oct 24, 2023
e09fca1
Change casing of server options
neekolas Oct 24, 2023
293053a
Change casing of validation options
neekolas Oct 24, 2023
8849ad5
Remove unused function
neekolas Oct 24, 2023
c00c75b
Merge branch 'validation-service-scaffold' into nmolnar/message-publi…
neekolas Oct 24, 2023
17ea986
Remove double pointer
neekolas Oct 24, 2023
b8b8b4d
Make private again
neekolas Oct 24, 2023
dd63aba
Merge branch 'validation-service-scaffold' into nmolnar/message-publi…
neekolas Oct 24, 2023
08266da
Fix pointer to key package
neekolas Oct 24, 2023
0ba5291
Capitalize more things
neekolas Oct 24, 2023
1e68170
Update server fields
neekolas Oct 24, 2023
e3ab161
Merge branch 'mls' into nmolnar/message-publishing-services
neekolas Oct 24, 2023
911a0db
Add test for sort methods
neekolas Oct 24, 2023
d0422ba
Save change to capitalization
neekolas Oct 24, 2023
a50e30d
Fix lint warnings
neekolas Oct 24, 2023
24c8c2a
Fix problem with mocks
neekolas Oct 24, 2023
bd821d9
Fix index name
neekolas Oct 24, 2023
6b57691
Move sorting to the store
neekolas Oct 24, 2023
cd38308
Fix ciphertext validation
neekolas Oct 24, 2023
cfffd9b
Make installation_id bytes
neekolas Oct 25, 2023
70d7c7d
Add missing credential identity
neekolas Oct 25, 2023
168b78a
Hack sql in query
neekolas Oct 26, 2023
730065d
Revert "Hack sql in query"
neekolas Oct 26, 2023
e7a4f18
Remove custom type
neekolas Oct 26, 2023
cdde011
Update to latest protos
neekolas Oct 26, 2023
2a798ec
Add CredentialIdentity
neekolas Oct 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 185 additions & 21 deletions pkg/api/message/v3/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ import (
"context"

wakunode "github.com/waku-org/go-waku/waku/v2/node"
wakupb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
proto "github.com/xmtp/proto/v3/go/message_api/v3"
"github.com/xmtp/xmtp-node-go/pkg/metrics"
"github.com/xmtp/xmtp-node-go/pkg/mlsstore"
"github.com/xmtp/xmtp-node-go/pkg/mlsvalidate"
"github.com/xmtp/xmtp-node-go/pkg/store"
"github.com/xmtp/xmtp-node-go/pkg/topic"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -22,9 +25,6 @@ type Service struct {
messageStore *store.Store
mlsStore mlsstore.MlsStore
validationService mlsvalidate.MLSValidationService

ctx context.Context
ctxCancel func()
}

func NewService(node *wakunode.WakuNode, logger *zap.Logger, messageStore *store.Store, mlsStore mlsstore.MlsStore, validationService mlsvalidate.MLSValidationService) (s *Service, err error) {
Expand All @@ -36,18 +36,15 @@ func NewService(node *wakunode.WakuNode, logger *zap.Logger, messageStore *store
validationService: validationService,
}

s.ctx, s.ctxCancel = context.WithCancel(context.Background())

s.log.Info("Starting MLS service")
return s, nil
}

func (s *Service) Close() {
if s.ctxCancel != nil {
s.ctxCancel()
func (s *Service) RegisterInstallation(ctx context.Context, req *proto.RegisterInstallationRequest) (*proto.RegisterInstallationResponse, error) {
if err := validateRegisterInstallationRequest(req); err != nil {
return nil, err
}
}

func (s *Service) RegisterInstallation(ctx context.Context, req *proto.RegisterInstallationRequest) (*proto.RegisterInstallationResponse, error) {
results, err := s.validationService.ValidateKeyPackages(ctx, [][]byte{req.LastResortKeyPackage.KeyPackageTlsSerialized})
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid identity: %s", err)
Expand All @@ -59,8 +56,7 @@ func (s *Service) RegisterInstallation(ctx context.Context, req *proto.RegisterI
installationId := results[0].InstallationId
walletAddress := results[0].WalletAddress

err = s.mlsStore.CreateInstallation(ctx, installationId, walletAddress, req.LastResortKeyPackage.KeyPackageTlsSerialized)
if err != nil {
if err = s.mlsStore.CreateInstallation(ctx, installationId, walletAddress, req.LastResortKeyPackage.KeyPackageTlsSerialized); err != nil {
return nil, err
}

Expand Down Expand Up @@ -98,15 +94,82 @@ func (s *Service) ConsumeKeyPackages(ctx context.Context, req *proto.ConsumeKeyP
}, nil
}

func (s *Service) PublishToGroup(ctx context.Context, req *proto.PublishToGroupRequest) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "unimplemented")
func (s *Service) PublishToGroup(ctx context.Context, req *proto.PublishToGroupRequest) (res *emptypb.Empty, err error) {
if err = validatePublishToGroupRequest(req); err != nil {
return nil, err
}

messages := make([][]byte, len(req.Messages))
for i, message := range req.Messages {
v1 := message.GetV1()
if v1 == nil {
return nil, status.Errorf(codes.InvalidArgument, "message must be v1")
}
messages[i] = v1.MlsMessageTlsSerialized
}

validationResults, err := s.validationService.ValidateGroupMessages(ctx, messages)
if err != nil {
// TODO: Separate validation errors from internal errors
return nil, status.Errorf(codes.InvalidArgument, "invalid group message: %s", err)
}

for i, result := range validationResults {
message := messages[i]

if err = isReadyToSend(result.GroupId, message); err != nil {
return nil, err
}

// TODO: Wrap this in a transaction so publishing is all or nothing
if err = s.publishMessage(ctx, topic.BuildGroupTopic(result.GroupId), message); err != nil {
return nil, status.Errorf(codes.Internal, "failed to publish message: %s", err)
}
}

return &emptypb.Empty{}, nil
}

func (s *Service) PublishWelcomes(ctx context.Context, req *proto.PublishWelcomesRequest) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "unimplemented")
func (s *Service) publishMessage(ctx context.Context, contentTopic string, message []byte) error {
log := s.log.Named("publish-mls").With(zap.String("content_topic", contentTopic))
env, err := s.messageStore.InsertMlsMessage(ctx, contentTopic, message)
if err != nil {
return status.Errorf(codes.Internal, "failed to insert message: %s", err)
}

if _, err = s.waku.Relay().Publish(ctx, &wakupb.WakuMessage{
ContentTopic: contentTopic,
Timestamp: int64(env.TimestampNs),
Payload: message,
}); err != nil {
return status.Errorf(codes.Internal, "failed to publish message: %s", err)
}

metrics.EmitPublishedEnvelope(ctx, log, env)

return nil
}

func (s *Service) UploadKeyPackages(ctx context.Context, req *proto.UploadKeyPackagesRequest) (*emptypb.Empty, error) {
func (s *Service) PublishWelcomes(ctx context.Context, req *proto.PublishWelcomesRequest) (res *emptypb.Empty, err error) {
if err = validatePublishWelcomesRequest(req); err != nil {
return nil, err
}

// TODO: Wrap this in a transaction so publishing is all or nothing
for _, welcome := range req.WelcomeMessages {
contentTopic := topic.BuildWelcomeTopic(welcome.InstallationId)
if err = s.publishMessage(ctx, contentTopic, welcome.WelcomeMessage.GetV1().Ciphertext); err != nil {
return nil, status.Errorf(codes.Internal, "failed to publish welcome message: %s", err)
}
}
return &emptypb.Empty{}, nil
}

func (s *Service) UploadKeyPackages(ctx context.Context, req *proto.UploadKeyPackagesRequest) (res *emptypb.Empty, err error) {
if err = validateUploadKeyPackagesRequest(req); err != nil {
return nil, err
}
// Extract the key packages from the request
keyPackageBytes := make([][]byte, len(req.KeyPackages))
for i, keyPackage := range req.KeyPackages {
keyPackageBytes[i] = keyPackage.KeyPackageTlsSerialized
Expand All @@ -122,8 +185,8 @@ func (s *Service) UploadKeyPackages(ctx context.Context, req *proto.UploadKeyPac
kp := mlsstore.NewKeyPackage(validationResult.InstallationId, keyPackageBytes[i], false)
keyPackageModels[i] = kp
}
err = s.mlsStore.InsertKeyPackages(ctx, keyPackageModels)
if err != nil {

if err = s.mlsStore.InsertKeyPackages(ctx, keyPackageModels); err != nil {
return nil, status.Errorf(codes.Internal, "failed to insert key packages: %s", err)
}

Expand All @@ -134,6 +197,107 @@ func (s *Service) RevokeInstallation(ctx context.Context, req *proto.RevokeInsta
return nil, status.Errorf(codes.Unimplemented, "unimplemented")
}

func (s *Service) GetIdentityUpdates(ctx context.Context, req *proto.GetIdentityUpdatesRequest) (*proto.GetIdentityUpdatesResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "unimplemented")
func (s *Service) GetIdentityUpdates(ctx context.Context, req *proto.GetIdentityUpdatesRequest) (res *proto.GetIdentityUpdatesResponse, err error) {
if err = validateGetIdentityUpdatesRequest(req); err != nil {
return nil, err
}

walletAddresses := req.WalletAddresses
updates, err := s.mlsStore.GetIdentityUpdates(ctx, req.WalletAddresses, int64(req.StartTimeNs))
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get identity updates: %s", err)
}

resUpdates := make([]*proto.GetIdentityUpdatesResponse_WalletUpdates, len(walletAddresses))
for i, walletAddress := range walletAddresses {
walletUpdates := updates[walletAddress]

resUpdates[i] = &proto.GetIdentityUpdatesResponse_WalletUpdates{
Updates: []*proto.GetIdentityUpdatesResponse_Update{},
}

for _, walletUpdate := range walletUpdates {
resUpdates[i].Updates = append(resUpdates[i].Updates, buildIdentityUpdate(walletUpdate))
}
}

return &proto.GetIdentityUpdatesResponse{
Updates: resUpdates,
}, nil
}

func buildIdentityUpdate(update mlsstore.IdentityUpdate) *proto.GetIdentityUpdatesResponse_Update {
base := proto.GetIdentityUpdatesResponse_Update{
TimestampNs: update.TimestampNs,
}
switch update.Kind {
case mlsstore.Create:
base.Kind = &proto.GetIdentityUpdatesResponse_Update_NewInstallation{
NewInstallation: &proto.GetIdentityUpdatesResponse_NewInstallationUpdate{
InstallationId: update.InstallationId,
},
}
case mlsstore.Revoke:
base.Kind = &proto.GetIdentityUpdatesResponse_Update_RevokedInstallation{
RevokedInstallation: &proto.GetIdentityUpdatesResponse_RevokedInstallationUpdate{
InstallationId: update.InstallationId,
},
}
}

return &base
}

func validatePublishToGroupRequest(req *proto.PublishToGroupRequest) error {
if req == nil || len(req.Messages) == 0 {
return status.Errorf(codes.InvalidArgument, "no messages to publish")
}
return nil
}

func validatePublishWelcomesRequest(req *proto.PublishWelcomesRequest) error {
if req == nil || len(req.WelcomeMessages) == 0 {
return status.Errorf(codes.InvalidArgument, "no welcome messages to publish")
}
for _, welcome := range req.WelcomeMessages {
if welcome == nil || welcome.WelcomeMessage == nil {
return status.Errorf(codes.InvalidArgument, "invalid welcome message")
}
ciphertext := welcome.WelcomeMessage.GetV1().Ciphertext
if len(ciphertext) == 0 {
return status.Errorf(codes.InvalidArgument, "invalid welcome message")
}
}
return nil
}

func validateRegisterInstallationRequest(req *proto.RegisterInstallationRequest) error {
if req == nil || req.LastResortKeyPackage == nil {
return status.Errorf(codes.InvalidArgument, "no last resort key package")
}
return nil
}

func validateUploadKeyPackagesRequest(req *proto.UploadKeyPackagesRequest) error {
if req == nil || len(req.KeyPackages) == 0 {
return status.Errorf(codes.InvalidArgument, "no key packages to upload")
}
return nil
}

func validateGetIdentityUpdatesRequest(req *proto.GetIdentityUpdatesRequest) error {
if req == nil || len(req.WalletAddresses) == 0 {
return status.Errorf(codes.InvalidArgument, "no wallet addresses to get updates for")
}
return nil
}

func isReadyToSend(groupId string, message []byte) error {
neekolas marked this conversation as resolved.
Show resolved Hide resolved
if groupId == "" {
return status.Errorf(codes.InvalidArgument, "group id is empty")
}
if len(message) == 0 {
return status.Errorf(codes.InvalidArgument, "message is empty")
}
return nil
}
Loading