Skip to content

Commit

Permalink
Scaffold MLS server
Browse files Browse the repository at this point in the history
  • Loading branch information
neekolas committed Oct 19, 2023
1 parent c025a6f commit 94460bc
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 8 deletions.
7 changes: 7 additions & 0 deletions dev/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ services:
POSTGRES_PASSWORD: xmtp
ports:
- 6543:5432
mls-db:
image: postgres:13
environment:
POSTGRES_PASSWORD: xmtp
ports:
- 7654:5432

prometheus:
image: prom/prometheus
ports:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/uptrace/bun/driver/pgdriver v1.1.3
github.com/waku-org/go-waku v0.8.0
github.com/xmtp/go-msgio v0.2.1-0.20220510223757-25a701b79cd3
github.com/xmtp/proto/v3 v3.27.0
github.com/xmtp/proto/v3 v3.29.1-0.20231019163152-2a17d00f45f4
github.com/yoheimuta/protolint v0.39.0
go.uber.org/zap v1.24.0
golang.org/x/sync v0.3.0
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1142,6 +1142,12 @@ github.com/xmtp/go-msgio v0.2.1-0.20220510223757-25a701b79cd3 h1:wzUffJGCTBGXIDy
github.com/xmtp/go-msgio v0.2.1-0.20220510223757-25a701b79cd3/go.mod h1:bJREWk+NDnZYjgLQdAi8SUWuq/5pkMme4GqiffEhUF4=
github.com/xmtp/proto/v3 v3.27.0 h1:G70006UEffkCmWvp9G/7Dywosj1sLm9StR5HWEb891U=
github.com/xmtp/proto/v3 v3.27.0/go.mod h1:NF2zAjtNpVIhS4tFG19g4L1tJcPZHm81oeDFXltmOiY=
github.com/xmtp/proto/v3 v3.29.1-0.20231019020501-b49bc6ffb5eb h1:q2lR64lGFehm8m0FtcdRDMeH8MlkMyU4sz235+Ufq9E=
github.com/xmtp/proto/v3 v3.29.1-0.20231019020501-b49bc6ffb5eb/go.mod h1:NF2zAjtNpVIhS4tFG19g4L1tJcPZHm81oeDFXltmOiY=
github.com/xmtp/proto/v3 v3.29.1-0.20231019022514-1cc4b0d5a51a h1:kDEPyzhqQO9YdRAfvl21ysitvzWjdu4Ai8YCvHwqqbY=
github.com/xmtp/proto/v3 v3.29.1-0.20231019022514-1cc4b0d5a51a/go.mod h1:NF2zAjtNpVIhS4tFG19g4L1tJcPZHm81oeDFXltmOiY=
github.com/xmtp/proto/v3 v3.29.1-0.20231019163152-2a17d00f45f4 h1:Mxnc833msN9gX8DJyELd+E7oUJNHlhbIsZlTd88kg5M=
github.com/xmtp/proto/v3 v3.29.1-0.20231019163152-2a17d00f45f4/go.mod h1:NF2zAjtNpVIhS4tFG19g4L1tJcPZHm81oeDFXltmOiY=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU=
github.com/yoheimuta/go-protoparser/v4 v4.6.0 h1:uvz1e9/5Ihsm4Ku8AJeDImTpirKmIxubZdSn0QJNdnw=
github.com/yoheimuta/go-protoparser/v4 v4.6.0/go.mod h1:AHNNnSWnb0UoL4QgHPiOAg2BniQceFscPI5X/BZNHl8=
Expand Down
3 changes: 3 additions & 0 deletions pkg/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/pkg/errors"
wakunode "github.com/waku-org/go-waku/waku/v2/node"
"github.com/xmtp/xmtp-node-go/pkg/authz"
"github.com/xmtp/xmtp-node-go/pkg/mlsstore"
"github.com/xmtp/xmtp-node-go/pkg/ratelimiter"
"github.com/xmtp/xmtp-node-go/pkg/store"
"go.uber.org/zap"
Expand All @@ -25,6 +26,7 @@ type Options struct {
HTTPPort uint `long:"http-port" description:"API HTTP listening port" default:"5555"`
Authn AuthnOptions `group:"API Authentication Options" namespace:"authn"`
MaxMsgSize int `long:"max-msg-size" description:"Max message size in bytes (default 50MB)" default:"52428800"`
EnableMls bool `long:"enable-mls" description:"Enable the MLS server"`
}

type Config struct {
Expand All @@ -33,6 +35,7 @@ type Config struct {
Waku *wakunode.WakuNode
Log *zap.Logger
Store *store.Store
MlsStore mlsstore.MlsStore
}

// AuthnOptions bundle command line options associated with the authn package.
Expand Down
73 changes: 73 additions & 0 deletions pkg/api/message/v3/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package api

import (
"context"

wakunode "github.com/waku-org/go-waku/waku/v2/node"
proto "github.com/xmtp/proto/v3/go/message_api/v3"
"github.com/xmtp/xmtp-node-go/pkg/mlsstore"
"github.com/xmtp/xmtp-node-go/pkg/store"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
emptypb "google.golang.org/protobuf/types/known/emptypb"
)

type Service struct {
proto.UnimplementedMlsApiServer

log *zap.Logger
waku *wakunode.WakuNode
messageStore *store.Store
mlsStore mlsstore.MlsStore

ctx context.Context
ctxCancel func()
}

func NewService(node *wakunode.WakuNode, logger *zap.Logger, messageStore *store.Store, mlsStore mlsstore.MlsStore) (s *Service, err error) {
s = &Service{
log: logger.Named("message/v3"),
waku: node,
messageStore: messageStore,
mlsStore: mlsStore,
}

s.ctx, s.ctxCancel = context.WithCancel(context.Background())

return s, nil
}

func (s *Service) Close() {
if s.ctxCancel != nil {
s.ctxCancel()
}
}

func (s *Service) RegisterInstallation(ctx context.Context, req *proto.RegisterInstallationRequest) (*proto.RegisterInstallationResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "unimplemented")
}

func (s *Service) ConsumeKeyPackages(ctx context.Context, req *proto.ConsumeKeyPackagesRequest) (*proto.ConsumeKeyPackagesResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "unimplemented")
}

func (s *Service) PublishToGroup(ctx context.Context, req *proto.PublishToGroupRequest) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "unimplemented")
}

func (s *Service) PublishWelcomes(ctx context.Context, req *proto.PublishWelcomesRequest) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "unimplemented")
}

func (s *Service) UploadKeyPackages(ctx context.Context, req *proto.UploadKeyPackagesRequest) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "unimplemented")
}

func (s *Service) RevokeInstallation(ctx context.Context, req *proto.RevokeInstallationRequest) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "unimplemented")
}

func (s *Service) GetIdentityUpdates(ctx context.Context, req *proto.GetIdentityUpdatesRequest) (*proto.GetIdentityUpdatesResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "unimplemented")
}
19 changes: 19 additions & 0 deletions pkg/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/pkg/errors"
swgui "github.com/swaggest/swgui/v3"
proto "github.com/xmtp/proto/v3/go/message_api/v1"
v3Proto "github.com/xmtp/proto/v3/go/message_api/v3"
messagev1openapi "github.com/xmtp/proto/v3/openapi/message_api/v1"
"github.com/xmtp/xmtp-node-go/pkg/ratelimiter"
"github.com/xmtp/xmtp-node-go/pkg/tracing"
Expand All @@ -27,6 +28,7 @@ import (

messagev1 "github.com/xmtp/xmtp-node-go/pkg/api/message/v1"
apicontext "github.com/xmtp/xmtp-node-go/pkg/api/message/v1/context"
messagev3 "github.com/xmtp/xmtp-node-go/pkg/api/message/v3"
)

const (
Expand All @@ -43,6 +45,7 @@ type Server struct {
grpcListener net.Listener
httpListener net.Listener
messagev1 *messagev1.Service
messagev3 *messagev3.Service
wg sync.WaitGroup
ctx context.Context

Expand Down Expand Up @@ -125,6 +128,15 @@ func (s *Server) startGRPC() error {
return errors.Wrap(err, "creating message service")
}
proto.RegisterMessageApiServer(grpcServer, s.messagev1)

// Enable the MLS server if a store is provided
if s.Config.MlsStore != nil && s.Config.EnableMls {
s.messagev3, err = messagev3.NewService(s.Waku, s.Log, s.Store)
if err != nil {
return errors.Wrap(err, "creating mls service")
}
v3Proto.RegisterMlsApiServer(grpcServer, s.messagev3)
}
prometheus.Register(grpcServer)

tracing.GoPanicWrap(s.ctx, &s.wg, "grpc", func(ctx context.Context) {
Expand Down Expand Up @@ -171,6 +183,13 @@ func (s *Server) startHTTP() error {
return errors.Wrap(err, "registering message handler")
}

if s.Config.MlsStore != nil && s.Config.EnableMls {
err = v3Proto.RegisterMlsApiHandler(s.ctx, gwmux, conn)
if err != nil {
return errors.Wrap(err, "registering mls handler")
}
}

addr := addrString(s.HTTPAddress, s.HTTPPort)
s.httpListener, err = net.Listen("tcp", addr)
if err != nil {
Expand Down
20 changes: 20 additions & 0 deletions pkg/mlsstore/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package mlsstore

import (
"time"

"github.com/uptrace/bun"
"go.uber.org/zap"
)

type MlsOptions struct {
DbConnectionString string `long:"db-connection-string" description:"Connection string for MLS DB"`
ReadTimeout time.Duration `long:"read-timeout" description:"Timeout for reading from the database" default:"10s"`
WriteTimeout time.Duration `long:"write-timeout" description:"Timeout for writing to the database" default:"10s"`
MaxOpenConns int `long:"max-open-conns" description:"Maximum number of open connections" default:"80"`
}

type Config struct {
Log *zap.Logger
DB *bun.DB
}
28 changes: 28 additions & 0 deletions pkg/mlsstore/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package mlsstore

import (
"context"

"github.com/uptrace/bun"
"go.uber.org/zap"
)

type Store struct {
ctx context.Context
cancel context.CancelFunc
log *zap.Logger
db *bun.DB
}

type MlsStore interface {
}

func New(config Config) (*Store, error) {
s := &Store{
log: config.Log.Named("mlsstore"),
db: config.DB,
}
s.ctx, s.cancel = context.WithCancel(context.Background())

return s, nil
}
16 changes: 9 additions & 7 deletions pkg/server/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"time"

"github.com/xmtp/xmtp-node-go/pkg/api"
"github.com/xmtp/xmtp-node-go/pkg/mlsstore"
"github.com/xmtp/xmtp-node-go/pkg/store"
)

Expand Down Expand Up @@ -66,11 +67,12 @@ type Options struct {
GoProfiling bool `long:"go-profiling" description:"Enable Go profiling"`
MetricsPeriod time.Duration `long:"metrics-period" description:"Polling period for server status metrics" default:"30s"`

API api.Options `group:"API Options" namespace:"api"`
Authz AuthzOptions `group:"Authz Options"`
Relay RelayOptions `group:"Relay Options"`
Store store.Options `group:"Store Options" namespace:"store"`
Metrics MetricsOptions `group:"Metrics Options"`
Tracing TracingOptions `group:"DD APM Tracing Options"`
Profiling ProfilingOptions `group:"DD APM Profiling Options" namespace:"profiling"`
API api.Options `group:"API Options" namespace:"api"`
Authz AuthzOptions `group:"Authz Options"`
Relay RelayOptions `group:"Relay Options"`
Store store.Options `group:"Store Options" namespace:"store"`
Metrics MetricsOptions `group:"Metrics Options"`
Tracing TracingOptions `group:"DD APM Tracing Options"`
Profiling ProfilingOptions `group:"DD APM Profiling Options" namespace:"profiling"`
MlsStore mlsstore.StoreOptions `group:"MLS Options" namespace:"mlsstore"`
}
15 changes: 15 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/xmtp/xmtp-node-go/pkg/metrics"
authzmigrations "github.com/xmtp/xmtp-node-go/pkg/migrations/authz"
messagemigrations "github.com/xmtp/xmtp-node-go/pkg/migrations/messages"
"github.com/xmtp/xmtp-node-go/pkg/mlsstore"
xmtpstore "github.com/xmtp/xmtp-node-go/pkg/store"
"github.com/xmtp/xmtp-node-go/pkg/tracing"
"go.uber.org/zap"
Expand Down Expand Up @@ -225,13 +226,27 @@ func New(ctx context.Context, log *zap.Logger, options Options) (*Server, error)
}
s.log.With(logging.MultiAddrs("listen", maddrs...)).Info("got server")

var mlsStore mlsstore.MlsStore

if options.MlsStore.DbConnectionString != "" {
mlsDb, err := createBunDB(options.MlsStore.DbConnectionString, options.WaitForDB, options.MlsStore.ReadTimeout, options.MlsStore.WriteTimeout, options.MlsStore.MaxOpenConns)
if err != nil {
return nil, errors.Wrap(err, "creating mls db")
}
mlsStore, err = mlsstore.New(mlsstore.Config{
Log: s.log,
DB: mlsDb,
})
}

// Initialize gRPC server.
s.grpc, err = api.New(
&api.Config{
Options: options.API,
Log: s.log.Named("api"),
Waku: s.wakuNode,
Store: s.store,
MlsStore: mlsStore,
AllowLister: s.allowLister,
},
)
Expand Down

0 comments on commit 94460bc

Please sign in to comment.