Skip to content

Commit

Permalink
feat: implement mls subscribe group/welcome messages
Browse files Browse the repository at this point in the history
  • Loading branch information
Steven Normore committed Jan 12, 2024
1 parent 85f5f24 commit b3c3840
Show file tree
Hide file tree
Showing 8 changed files with 365 additions and 109 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/uptrace/bun/driver/pgdriver v1.1.16
github.com/waku-org/go-waku v0.8.0
github.com/xmtp/go-msgio v0.2.1-0.20220510223757-25a701b79cd3
github.com/xmtp/proto/v3 v3.37.1-0.20240112031043-fd75b4bf81f8
github.com/xmtp/proto/v3 v3.37.1-0.20240112125235-f02fe8d0f1a0
github.com/yoheimuta/protolint v0.39.0
go.uber.org/zap v1.24.0
golang.org/x/sync v0.3.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1146,8 +1146,8 @@ github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0
github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg=
github.com/xmtp/go-msgio v0.2.1-0.20220510223757-25a701b79cd3 h1:wzUffJGCTBGXIDyNU+1UBu1fn2Nzo+OQzM1pLrheh58=
github.com/xmtp/go-msgio v0.2.1-0.20220510223757-25a701b79cd3/go.mod h1:bJREWk+NDnZYjgLQdAi8SUWuq/5pkMme4GqiffEhUF4=
github.com/xmtp/proto/v3 v3.37.1-0.20240112031043-fd75b4bf81f8 h1:r7KYIg8OtDLDHGwlEHo5SiOkEM86C33DHtZsQ9B0pKM=
github.com/xmtp/proto/v3 v3.37.1-0.20240112031043-fd75b4bf81f8/go.mod h1:NF2zAjtNpVIhS4tFG19g4L1tJcPZHm81oeDFXltmOiY=
github.com/xmtp/proto/v3 v3.37.1-0.20240112125235-f02fe8d0f1a0 h1:eGNiXDTiXcXTf5ne4HACbqbHaQrVlRz2hwcn05E7v8U=
github.com/xmtp/proto/v3 v3.37.1-0.20240112125235-f02fe8d0f1a0/go.mod h1:NF2zAjtNpVIhS4tFG19g4L1tJcPZHm81oeDFXltmOiY=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU=
github.com/yoheimuta/go-protoparser/v4 v4.6.0 h1:uvz1e9/5Ihsm4Ku8AJeDImTpirKmIxubZdSn0QJNdnw=
github.com/yoheimuta/go-protoparser/v4 v4.6.0/go.mod h1:AHNNnSWnb0UoL4QgHPiOAg2BniQceFscPI5X/BZNHl8=
Expand Down
67 changes: 24 additions & 43 deletions pkg/api/message/v1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,13 @@ import (
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
"github.com/pkg/errors"
wakunode "github.com/waku-org/go-waku/waku/v2/node"
wakupb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
wakurelay "github.com/waku-org/go-waku/waku/v2/protocol/relay"
proto "github.com/xmtp/proto/v3/go/message_api/v1"
apicontext "github.com/xmtp/xmtp-node-go/pkg/api/message/v1/context"
"github.com/xmtp/xmtp-node-go/pkg/logging"
"github.com/xmtp/xmtp-node-go/pkg/metrics"
"github.com/xmtp/xmtp-node-go/pkg/store"
"github.com/xmtp/xmtp-node-go/pkg/topic"
"github.com/xmtp/xmtp-node-go/pkg/tracing"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
Expand All @@ -45,24 +42,24 @@ type Service struct {

// Configured as constructor options.
log *zap.Logger
waku *wakunode.WakuNode
store *store.Store

publishToWakuRelay func(context.Context, *wakupb.WakuMessage) error

// Configured internally.
ctx context.Context
ctxCancel func()
wg sync.WaitGroup
relaySub *wakurelay.Subscription

ns *server.Server
nc *nats.Conn
}

func NewService(node *wakunode.WakuNode, logger *zap.Logger, store *store.Store) (s *Service, err error) {
func NewService(log *zap.Logger, store *store.Store, publishToWakuRelay func(context.Context, *wakupb.WakuMessage) error) (s *Service, err error) {
s = &Service{
waku: node,
log: logger.Named("message/v1"),
store: store,
log: log.Named("message/v1"),
store: store,
publishToWakuRelay: publishToWakuRelay,
}
s.ctx, s.ctxCancel = context.WithCancel(context.Background())

Expand All @@ -82,44 +79,11 @@ func NewService(node *wakunode.WakuNode, logger *zap.Logger, store *store.Store)
return nil, err
}

// Initialize waku relay subscription.
s.relaySub, err = s.waku.Relay().Subscribe(s.ctx)
if err != nil {
return nil, errors.Wrap(err, "subscribing to relay")
}
tracing.GoPanicWrap(s.ctx, &s.wg, "broadcast", func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case wakuEnv := <-s.relaySub.Ch:
if wakuEnv == nil {
continue
}
env := buildEnvelope(wakuEnv.Message())

envB, err := pb.Marshal(env)
if err != nil {
s.log.Error("marshalling envelope", zap.Error(err))
continue
}
err = s.nc.Publish(buildNatsSubject(env.ContentTopic), envB)
if err != nil {
s.log.Error("publishing envelope to local nats", zap.Error(err))
continue
}
}
}
})

return s, nil
}

func (s *Service) Close() {
s.log.Info("closing")
if s.relaySub != nil {
s.relaySub.Unsubscribe()
}

if s.ctxCancel != nil {
s.ctxCancel()
Expand All @@ -136,6 +100,22 @@ func (s *Service) Close() {
s.log.Info("closed")
}

func (s *Service) HandleIncomingWakuRelayMessage(msg *wakupb.WakuMessage) error {
env := buildEnvelope(msg)

envB, err := pb.Marshal(env)
if err != nil {
return err
}

err = s.nc.Publish(buildNatsSubject(env.ContentTopic), envB)
if err != nil {
return err
}

return nil
}

func (s *Service) Publish(ctx context.Context, req *proto.PublishRequest) (*proto.PublishResponse, error) {
for _, env := range req.Envelopes {
log := s.log.Named("publish").With(zap.String("content_topic", env.ContentTopic))
Expand All @@ -156,14 +136,15 @@ func (s *Service) Publish(ctx context.Context, req *proto.PublishRequest) (*prot
}
}

_, err := s.waku.Relay().Publish(ctx, &wakupb.WakuMessage{
err := s.publishToWakuRelay(ctx, &wakupb.WakuMessage{
ContentTopic: env.ContentTopic,
Timestamp: int64(env.TimestampNs),
Payload: env.Message,
})
if err != nil {
return nil, status.Errorf(codes.Internal, err.Error())
}

metrics.EmitPublishedEnvelope(ctx, log, env)
}
return &proto.PublishResponse{}, nil
Expand Down
59 changes: 56 additions & 3 deletions pkg/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ import (
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/pkg/errors"
swgui "github.com/swaggest/swgui/v3"
wakupb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
wakurelay "github.com/waku-org/go-waku/waku/v2/protocol/relay"
proto "github.com/xmtp/proto/v3/go/message_api/v1"
mlsv1pb "github.com/xmtp/proto/v3/go/mls/api/v1"
messagev1openapi "github.com/xmtp/proto/v3/openapi/message_api/v1"
"github.com/xmtp/xmtp-node-go/pkg/ratelimiter"
"github.com/xmtp/xmtp-node-go/pkg/topic"
"github.com/xmtp/xmtp-node-go/pkg/tracing"
"google.golang.org/grpc/health"
healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
Expand Down Expand Up @@ -48,6 +51,8 @@ type Server struct {
mlsv1 *mlsv1.Service
wg sync.WaitGroup
ctx context.Context
ctxCancel func()
wakuRelaySub *wakurelay.Subscription

authorizer *WalletAuthorizer
}
Expand All @@ -61,7 +66,7 @@ func New(config *Config) (*Server, error) {
Config: config,
}

s.ctx = context.Background()
s.ctx, s.ctxCancel = context.WithCancel(context.Background())

// Start gRPC services.
err := s.startGRPC()
Expand Down Expand Up @@ -123,20 +128,56 @@ func (s *Server) startGRPC() error {
healthcheck := health.NewServer()
healthgrpc.RegisterHealthServer(grpcServer, healthcheck)

s.messagev1, err = messagev1.NewService(s.Waku, s.Log, s.Store)
publishToWakuRelay := func(ctx context.Context, msg *wakupb.WakuMessage) error {
_, err := s.Waku.Relay().Publish(ctx, msg)
return err
}

s.messagev1, err = messagev1.NewService(s.Log, s.Store, publishToWakuRelay)
if err != nil {
return errors.Wrap(err, "creating message service")
}
proto.RegisterMessageApiServer(grpcServer, s.messagev1)

// Enable the MLS server if a store is provided
if s.Config.MLSStore != nil && s.Config.MLSValidator != nil && s.Config.EnableMls {
s.mlsv1, err = mlsv1.NewService(s.Waku, s.Log, s.Config.MLSStore, s.Config.MLSValidator)
s.mlsv1, err = mlsv1.NewService(s.Log, s.Config.MLSStore, s.Config.MLSValidator, publishToWakuRelay)
if err != nil {
return errors.Wrap(err, "creating mls service")
}
mlsv1pb.RegisterMlsApiServer(grpcServer, s.mlsv1)
}

// Initialize waku relay subscription.
s.wakuRelaySub, err = s.Waku.Relay().Subscribe(s.ctx)
if err != nil {
return errors.Wrap(err, "subscribing to relay")
}
tracing.GoPanicWrap(s.ctx, &s.wg, "broadcast", func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case wakuEnv := <-s.wakuRelaySub.Ch:
if wakuEnv == nil || wakuEnv.Message() == nil {
continue
}
wakuMsg := wakuEnv.Message()

if topic.IsMLSV1(wakuMsg.ContentTopic) {
if s.mlsv1 != nil {
s.mlsv1.HandleIncomingWakuRelayMessage(wakuEnv.Message())

Check failure on line 169 in pkg/api/server.go

View workflow job for this annotation

GitHub Actions / Lint

Error return value of `s.mlsv1.HandleIncomingWakuRelayMessage` is not checked (errcheck)
}
} else {
if s.messagev1 != nil {
s.messagev1.HandleIncomingWakuRelayMessage(wakuEnv.Message())

Check failure on line 173 in pkg/api/server.go

View workflow job for this annotation

GitHub Actions / Lint

Error return value of `s.messagev1.HandleIncomingWakuRelayMessage` is not checked (errcheck)
}
}

}
}
})

prometheus.Register(grpcServer)

tracing.GoPanicWrap(s.ctx, &s.wg, "grpc", func(ctx context.Context) {
Expand Down Expand Up @@ -215,9 +256,21 @@ func (s *Server) startHTTP() error {

func (s *Server) Close() {
s.Log.Info("closing")

if s.ctxCancel != nil {
s.ctxCancel()
}

if s.wakuRelaySub != nil {
s.wakuRelaySub.Unsubscribe()
}

if s.messagev1 != nil {
s.messagev1.Close()
}
if s.mlsv1 != nil {
s.mlsv1.Close()
}

if s.httpListener != nil {
err := s.httpListener.Close()
Expand Down
Loading

0 comments on commit b3c3840

Please sign in to comment.