diff --git a/dev/docker/docker-compose.yml b/dev/docker/docker-compose.yml index 6661f393..bb052823 100644 --- a/dev/docker/docker-compose.yml +++ b/dev/docker/docker-compose.yml @@ -11,6 +11,13 @@ services: POSTGRES_PASSWORD: xmtp ports: - 6543:5432 + mls-db: + image: postgres:13 + environment: + POSTGRES_PASSWORD: xmtp + ports: + - 7654:5432 + prometheus: image: prom/prometheus ports: diff --git a/go.mod b/go.mod index 412f191f..026e3958 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/uptrace/bun/driver/pgdriver v1.1.3 github.com/waku-org/go-waku v0.8.0 github.com/xmtp/go-msgio v0.2.1-0.20220510223757-25a701b79cd3 - github.com/xmtp/proto/v3 v3.27.0 + github.com/xmtp/proto/v3 v3.30.0 github.com/yoheimuta/protolint v0.39.0 go.uber.org/zap v1.24.0 golang.org/x/sync v0.3.0 diff --git a/go.sum b/go.sum index a977a834..165565fe 100644 --- a/go.sum +++ b/go.sum @@ -1142,6 +1142,14 @@ github.com/xmtp/go-msgio v0.2.1-0.20220510223757-25a701b79cd3 h1:wzUffJGCTBGXIDy github.com/xmtp/go-msgio v0.2.1-0.20220510223757-25a701b79cd3/go.mod h1:bJREWk+NDnZYjgLQdAi8SUWuq/5pkMme4GqiffEhUF4= github.com/xmtp/proto/v3 v3.27.0 h1:G70006UEffkCmWvp9G/7Dywosj1sLm9StR5HWEb891U= github.com/xmtp/proto/v3 v3.27.0/go.mod h1:NF2zAjtNpVIhS4tFG19g4L1tJcPZHm81oeDFXltmOiY= +github.com/xmtp/proto/v3 v3.29.1-0.20231019020501-b49bc6ffb5eb h1:q2lR64lGFehm8m0FtcdRDMeH8MlkMyU4sz235+Ufq9E= +github.com/xmtp/proto/v3 v3.29.1-0.20231019020501-b49bc6ffb5eb/go.mod h1:NF2zAjtNpVIhS4tFG19g4L1tJcPZHm81oeDFXltmOiY= +github.com/xmtp/proto/v3 v3.29.1-0.20231019022514-1cc4b0d5a51a h1:kDEPyzhqQO9YdRAfvl21ysitvzWjdu4Ai8YCvHwqqbY= +github.com/xmtp/proto/v3 v3.29.1-0.20231019022514-1cc4b0d5a51a/go.mod h1:NF2zAjtNpVIhS4tFG19g4L1tJcPZHm81oeDFXltmOiY= +github.com/xmtp/proto/v3 v3.29.1-0.20231019163152-2a17d00f45f4 h1:Mxnc833msN9gX8DJyELd+E7oUJNHlhbIsZlTd88kg5M= +github.com/xmtp/proto/v3 v3.29.1-0.20231019163152-2a17d00f45f4/go.mod h1:NF2zAjtNpVIhS4tFG19g4L1tJcPZHm81oeDFXltmOiY= +github.com/xmtp/proto/v3 v3.30.0 h1:x6LGCWpO2HTQNhUiTXfE0l+u2HSL3Z35p41xhgy6hlw= +github.com/xmtp/proto/v3 v3.30.0/go.mod h1:NF2zAjtNpVIhS4tFG19g4L1tJcPZHm81oeDFXltmOiY= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU= github.com/yoheimuta/go-protoparser/v4 v4.6.0 h1:uvz1e9/5Ihsm4Ku8AJeDImTpirKmIxubZdSn0QJNdnw= github.com/yoheimuta/go-protoparser/v4 v4.6.0/go.mod h1:AHNNnSWnb0UoL4QgHPiOAg2BniQceFscPI5X/BZNHl8= diff --git a/pkg/api/config.go b/pkg/api/config.go index 96cf8ea7..f61d8ff4 100644 --- a/pkg/api/config.go +++ b/pkg/api/config.go @@ -7,6 +7,7 @@ import ( "github.com/pkg/errors" wakunode "github.com/waku-org/go-waku/waku/v2/node" "github.com/xmtp/xmtp-node-go/pkg/authz" + "github.com/xmtp/xmtp-node-go/pkg/mlsstore" "github.com/xmtp/xmtp-node-go/pkg/ratelimiter" "github.com/xmtp/xmtp-node-go/pkg/store" "go.uber.org/zap" @@ -25,6 +26,7 @@ type Options struct { HTTPPort uint `long:"http-port" description:"API HTTP listening port" default:"5555"` Authn AuthnOptions `group:"API Authentication Options" namespace:"authn"` MaxMsgSize int `long:"max-msg-size" description:"Max message size in bytes (default 50MB)" default:"52428800"` + EnableMls bool `long:"enable-mls" description:"Enable the MLS server"` } type Config struct { @@ -33,6 +35,7 @@ type Config struct { Waku *wakunode.WakuNode Log *zap.Logger Store *store.Store + MlsStore mlsstore.MlsStore } // AuthnOptions bundle command line options associated with the authn package. diff --git a/pkg/api/message/v3/service.go b/pkg/api/message/v3/service.go new file mode 100644 index 00000000..54df462b --- /dev/null +++ b/pkg/api/message/v3/service.go @@ -0,0 +1,73 @@ +package api + +import ( + "context" + + wakunode "github.com/waku-org/go-waku/waku/v2/node" + proto "github.com/xmtp/proto/v3/go/message_api/v3" + "github.com/xmtp/xmtp-node-go/pkg/mlsstore" + "github.com/xmtp/xmtp-node-go/pkg/store" + "go.uber.org/zap" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + emptypb "google.golang.org/protobuf/types/known/emptypb" +) + +type Service struct { + proto.UnimplementedMlsApiServer + + log *zap.Logger + waku *wakunode.WakuNode + messageStore *store.Store + mlsStore mlsstore.MlsStore + + ctx context.Context + ctxCancel func() +} + +func NewService(node *wakunode.WakuNode, logger *zap.Logger, messageStore *store.Store, mlsStore mlsstore.MlsStore) (s *Service, err error) { + s = &Service{ + log: logger.Named("message/v3"), + waku: node, + messageStore: messageStore, + mlsStore: mlsStore, + } + + s.ctx, s.ctxCancel = context.WithCancel(context.Background()) + + return s, nil +} + +func (s *Service) Close() { + if s.ctxCancel != nil { + s.ctxCancel() + } +} + +func (s *Service) RegisterInstallation(ctx context.Context, req *proto.RegisterInstallationRequest) (*proto.RegisterInstallationResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "unimplemented") +} + +func (s *Service) ConsumeKeyPackages(ctx context.Context, req *proto.ConsumeKeyPackagesRequest) (*proto.ConsumeKeyPackagesResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "unimplemented") +} + +func (s *Service) PublishToGroup(ctx context.Context, req *proto.PublishToGroupRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "unimplemented") +} + +func (s *Service) PublishWelcomes(ctx context.Context, req *proto.PublishWelcomesRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "unimplemented") +} + +func (s *Service) UploadKeyPackages(ctx context.Context, req *proto.UploadKeyPackagesRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "unimplemented") +} + +func (s *Service) RevokeInstallation(ctx context.Context, req *proto.RevokeInstallationRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "unimplemented") +} + +func (s *Service) GetIdentityUpdates(ctx context.Context, req *proto.GetIdentityUpdatesRequest) (*proto.GetIdentityUpdatesResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "unimplemented") +} diff --git a/pkg/api/server.go b/pkg/api/server.go index 7c6e5990..e7f1b643 100644 --- a/pkg/api/server.go +++ b/pkg/api/server.go @@ -15,6 +15,7 @@ import ( "github.com/pkg/errors" swgui "github.com/swaggest/swgui/v3" proto "github.com/xmtp/proto/v3/go/message_api/v1" + v3Proto "github.com/xmtp/proto/v3/go/message_api/v3" messagev1openapi "github.com/xmtp/proto/v3/openapi/message_api/v1" "github.com/xmtp/xmtp-node-go/pkg/ratelimiter" "github.com/xmtp/xmtp-node-go/pkg/tracing" @@ -27,6 +28,7 @@ import ( messagev1 "github.com/xmtp/xmtp-node-go/pkg/api/message/v1" apicontext "github.com/xmtp/xmtp-node-go/pkg/api/message/v1/context" + messagev3 "github.com/xmtp/xmtp-node-go/pkg/api/message/v3" ) const ( @@ -43,6 +45,7 @@ type Server struct { grpcListener net.Listener httpListener net.Listener messagev1 *messagev1.Service + messagev3 *messagev3.Service wg sync.WaitGroup ctx context.Context @@ -125,6 +128,15 @@ func (s *Server) startGRPC() error { return errors.Wrap(err, "creating message service") } proto.RegisterMessageApiServer(grpcServer, s.messagev1) + + // Enable the MLS server if a store is provided + if s.Config.MlsStore != nil && s.Config.EnableMls { + s.messagev3, err = messagev3.NewService(s.Waku, s.Log, s.Store, s.Config.MlsStore) + if err != nil { + return errors.Wrap(err, "creating mls service") + } + v3Proto.RegisterMlsApiServer(grpcServer, s.messagev3) + } prometheus.Register(grpcServer) tracing.GoPanicWrap(s.ctx, &s.wg, "grpc", func(ctx context.Context) { @@ -171,6 +183,13 @@ func (s *Server) startHTTP() error { return errors.Wrap(err, "registering message handler") } + if s.Config.MlsStore != nil && s.Config.EnableMls { + err = v3Proto.RegisterMlsApiHandler(s.ctx, gwmux, conn) + if err != nil { + return errors.Wrap(err, "registering mls handler") + } + } + addr := addrString(s.HTTPAddress, s.HTTPPort) s.httpListener, err = net.Listen("tcp", addr) if err != nil { diff --git a/pkg/mlsstore/config.go b/pkg/mlsstore/config.go new file mode 100644 index 00000000..521ec282 --- /dev/null +++ b/pkg/mlsstore/config.go @@ -0,0 +1,20 @@ +package mlsstore + +import ( + "time" + + "github.com/uptrace/bun" + "go.uber.org/zap" +) + +type StoreOptions struct { + DbConnectionString string `long:"db-connection-string" description:"Connection string for MLS DB"` + ReadTimeout time.Duration `long:"read-timeout" description:"Timeout for reading from the database" default:"10s"` + WriteTimeout time.Duration `long:"write-timeout" description:"Timeout for writing to the database" default:"10s"` + MaxOpenConns int `long:"max-open-conns" description:"Maximum number of open connections" default:"80"` +} + +type Config struct { + Log *zap.Logger + DB *bun.DB +} diff --git a/pkg/mlsstore/store.go b/pkg/mlsstore/store.go new file mode 100644 index 00000000..4dcfd277 --- /dev/null +++ b/pkg/mlsstore/store.go @@ -0,0 +1,28 @@ +package mlsstore + +import ( + "context" + + "github.com/uptrace/bun" + "go.uber.org/zap" +) + +type Store struct { + ctx context.Context + cancel context.CancelFunc + log *zap.Logger + db *bun.DB +} + +type MlsStore interface { +} + +func New(config Config) (*Store, error) { + s := &Store{ + log: config.Log.Named("mlsstore"), + db: config.DB, + } + s.ctx, s.cancel = context.WithCancel(context.Background()) + + return s, nil +} diff --git a/pkg/server/options.go b/pkg/server/options.go index e9814617..40041f4d 100644 --- a/pkg/server/options.go +++ b/pkg/server/options.go @@ -4,6 +4,7 @@ import ( "time" "github.com/xmtp/xmtp-node-go/pkg/api" + "github.com/xmtp/xmtp-node-go/pkg/mlsstore" "github.com/xmtp/xmtp-node-go/pkg/store" ) @@ -66,11 +67,12 @@ type Options struct { GoProfiling bool `long:"go-profiling" description:"Enable Go profiling"` MetricsPeriod time.Duration `long:"metrics-period" description:"Polling period for server status metrics" default:"30s"` - API api.Options `group:"API Options" namespace:"api"` - Authz AuthzOptions `group:"Authz Options"` - Relay RelayOptions `group:"Relay Options"` - Store store.Options `group:"Store Options" namespace:"store"` - Metrics MetricsOptions `group:"Metrics Options"` - Tracing TracingOptions `group:"DD APM Tracing Options"` - Profiling ProfilingOptions `group:"DD APM Profiling Options" namespace:"profiling"` + API api.Options `group:"API Options" namespace:"api"` + Authz AuthzOptions `group:"Authz Options"` + Relay RelayOptions `group:"Relay Options"` + Store store.Options `group:"Store Options" namespace:"store"` + Metrics MetricsOptions `group:"Metrics Options"` + Tracing TracingOptions `group:"DD APM Tracing Options"` + Profiling ProfilingOptions `group:"DD APM Profiling Options" namespace:"profiling"` + MlsStore mlsstore.StoreOptions `group:"MLS Options" namespace:"mlsstore"` } diff --git a/pkg/server/server.go b/pkg/server/server.go index 3d9ed5f5..adf2ca53 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -37,6 +37,7 @@ import ( "github.com/xmtp/xmtp-node-go/pkg/metrics" authzmigrations "github.com/xmtp/xmtp-node-go/pkg/migrations/authz" messagemigrations "github.com/xmtp/xmtp-node-go/pkg/migrations/messages" + "github.com/xmtp/xmtp-node-go/pkg/mlsstore" xmtpstore "github.com/xmtp/xmtp-node-go/pkg/store" "github.com/xmtp/xmtp-node-go/pkg/tracing" "go.uber.org/zap" @@ -225,6 +226,23 @@ func New(ctx context.Context, log *zap.Logger, options Options) (*Server, error) } s.log.With(logging.MultiAddrs("listen", maddrs...)).Info("got server") + var mlsStore mlsstore.MlsStore + + if options.MlsStore.DbConnectionString != "" { + mlsDb, err := createBunDB(options.MlsStore.DbConnectionString, options.WaitForDB, options.MlsStore.ReadTimeout, options.MlsStore.WriteTimeout, options.MlsStore.MaxOpenConns) + if err != nil { + return nil, errors.Wrap(err, "creating mls db") + } + + mlsStore, err = mlsstore.New(mlsstore.Config{ + Log: s.log, + DB: mlsDb, + }) + if err != nil { + return nil, errors.Wrap(err, "creating mls store") + } + } + // Initialize gRPC server. s.grpc, err = api.New( &api.Config{ @@ -232,6 +250,7 @@ func New(ctx context.Context, log *zap.Logger, options Options) (*Server, error) Log: s.log.Named("api"), Waku: s.wakuNode, Store: s.store, + MlsStore: mlsStore, AllowLister: s.allowLister, }, )