Skip to content

Commit

Permalink
Decouple message store from waku
Browse files Browse the repository at this point in the history
  • Loading branch information
snormore committed Sep 2, 2023
1 parent d3e9121 commit f8a8b6f
Show file tree
Hide file tree
Showing 30 changed files with 599 additions and 2,400 deletions.
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ LICENSE*
tests
examples
*.db
bin
2 changes: 1 addition & 1 deletion dev/docker/up
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ set -e
. dev/docker/env

docker_compose build
docker_compose up -d
docker_compose up -d --remove-orphans
8 changes: 4 additions & 4 deletions dev/e2e/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ services:
- 18008:8008
restart: always
command:
- --store
- --store.enable
- --metrics
- --metrics-address=0.0.0.0
- --ws
Expand Down Expand Up @@ -58,7 +58,7 @@ services:
- 25555:5555
restart: always
command:
- --store
- --store.enable
- --metrics
- --metrics-address=0.0.0.0
- --ws
Expand Down Expand Up @@ -90,7 +90,7 @@ services:
- 35555:5555
restart: always
command:
- --store
- --store.enable
- --metrics
- --metrics-address=0.0.0.0
- --ws
Expand Down Expand Up @@ -122,7 +122,7 @@ services:
- 45555:5555
restart: always
command:
- --store
- --store.enable
- --metrics
- --metrics-address=0.0.0.0
- --ws
Expand Down
2 changes: 1 addition & 1 deletion dev/e2e/docker/up
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ set -e
export GO_VERSION="$(go list -f "{{.GoVersion}}" -m)"

docker_compose build
docker_compose up -d
docker_compose up -d --remove-orphans
8 changes: 5 additions & 3 deletions dev/run
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ AUTHZ_DB_DSN="postgres://postgres:xmtp@localhost:15432/postgres?sslmode=disable"
NODE_KEY="8a30dcb604b0b53627a5adc054dbf434b446628d4bd1eccc681d223f0550ce67"

go run cmd/xmtpd/main.go \
--message-db-connection-string "${MESSAGE_DB_DSN}" \
--message-db-reader-connection-string "${MESSAGE_DB_DSN}" \
--authz-db-connection-string "${AUTHZ_DB_DSN}" \
--nodekey "${NODE_KEY}" \
--metrics \
--metrics-period 5s \
--store.enable \
--store.db-connection-string "${MESSAGE_DB_DSN}" \
--store.reader-db-connection-string "${MESSAGE_DB_DSN}" \
--store.metrics-period 5s \
--authz-db-connection-string "${AUTHZ_DB_DSN}" \
--go-profiling \
"$@"
1 change: 0 additions & 1 deletion dev/start
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ set -e
PORT="${PORT:-9002}"

dev/run \
--store \
--filter \
--lightpush \
--ws \
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ require (
github.com/libp2p/go-libp2p v0.20.2
github.com/libp2p/go-libp2p-core v0.16.1
github.com/libp2p/go-libp2p-pubsub v0.6.1
github.com/libp2p/go-msgio v0.2.0
github.com/mattn/go-sqlite3 v1.14.13
github.com/multiformats/go-multiaddr v0.5.0
github.com/nats-io/nats-server/v2 v2.1.2
Expand Down Expand Up @@ -114,6 +113,7 @@ require (
github.com/libp2p/go-libp2p-peerstore v0.6.0 // indirect
github.com/libp2p/go-libp2p-resource-manager v0.3.0 // indirect
github.com/libp2p/go-mplex v0.7.0 // indirect
github.com/libp2p/go-msgio v0.2.0 // indirect
github.com/libp2p/go-nat v0.1.0 // indirect
github.com/libp2p/go-netroute v0.2.0 // indirect
github.com/libp2p/go-openssl v0.0.7 // indirect
Expand Down
12 changes: 9 additions & 3 deletions pkg/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
wakunode "github.com/status-im/go-waku/waku/v2/node"
"github.com/xmtp/xmtp-node-go/pkg/authz"
"github.com/xmtp/xmtp-node-go/pkg/ratelimiter"
"github.com/xmtp/xmtp-node-go/pkg/store"
"go.uber.org/zap"
)

var (
ErrMissingLog = errors.New("missing log config")
ErrMissingWaku = errors.New("missing waku config")
ErrMissingLog = errors.New("missing log config")
ErrMissingWaku = errors.New("missing waku config")
ErrMissingStore = errors.New("missing store config")
)

type Options struct {
Expand All @@ -30,6 +32,7 @@ type Config struct {
AllowLister authz.WalletAllowLister
Waku *wakunode.WakuNode
Log *zap.Logger
Store *store.Store
}

// Options bundle command line options associated with the authn package.
Expand All @@ -49,13 +52,16 @@ type AuthnConfig struct {
Log *zap.Logger
}

func (params *Config) check() error {
func (params *Config) validate() error {
if params.Log == nil {
return ErrMissingLog
}
if params.Waku == nil {
return ErrMissingWaku
}
if params.Store == nil {
return ErrMissingStore
}
if err := validateAddr(params.HTTPAddress, params.HTTPPort); err != nil {
return errors.Wrap(err, "Invalid HTTP Address")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (wa *WalletAuthorizer) Stream() grpc.StreamServerInterceptor {

func (wa *WalletAuthorizer) isProtocolVersion3(request *messagev1.PublishRequest) bool {
envelopes := request.Envelopes
if envelopes == nil || len(envelopes) == 0 {
if len(envelopes) == 0 {
return false
}
// If any of the envelopes are not for a v3 topic, then we treat the request as non-v3
Expand Down
124 changes: 14 additions & 110 deletions pkg/api/message/v1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ type Service struct {
proto.UnimplementedMessageApiServer

// Configured as constructor options.
log *zap.Logger
waku *wakunode.WakuNode
log *zap.Logger
waku *wakunode.WakuNode
store *store.Store

// Configured internally.
ctx context.Context
Expand All @@ -55,10 +56,11 @@ type Service struct {
nc *nats.Conn
}

func NewService(node *wakunode.WakuNode, logger *zap.Logger) (s *Service, err error) {
func NewService(node *wakunode.WakuNode, logger *zap.Logger, store *store.Store) (s *Service, err error) {
s = &Service{
waku: node,
log: logger.Named("message/v1"),
waku: node,
log: logger.Named("message/v1"),
store: store,
}
s.ctx, s.ctxCancel = context.WithCancel(context.Background())

Expand Down Expand Up @@ -141,29 +143,22 @@ func (s *Service) Publish(ctx context.Context, req *proto.PublishRequest) (*prot
return nil, status.Errorf(codes.InvalidArgument, "topic length too big")
}

wakuMsg := &wakupb.WakuMessage{
ContentTopic: env.ContentTopic,
Timestamp: toWakuTimestamp(env.TimestampNs),
Payload: env.Message,
}

if len(env.Message) > MaxMessageSize {
return nil, status.Errorf(codes.InvalidArgument, "message too big")
}

store, ok := s.waku.Store().(*store.XmtpStore)
if !ok {
return nil, status.Errorf(codes.Internal, "waku store not xmtp store")
}

if !topic.IsEphemeral(env.ContentTopic) {
_, err := store.InsertMessage(wakuMsg)
_, err := s.store.InsertMessage(env)
if err != nil {
return nil, status.Errorf(codes.Internal, err.Error())
}
}

_, err := s.waku.Relay().Publish(ctx, wakuMsg)
_, err := s.waku.Relay().Publish(ctx, &wakupb.WakuMessage{
ContentTopic: env.ContentTopic,
Timestamp: int64(env.TimestampNs),
Payload: env.Message,
})
if err != nil {
return nil, status.Errorf(codes.Internal, err.Error())
}
Expand Down Expand Up @@ -361,31 +356,7 @@ func (s *Service) Query(ctx context.Context, req *proto.QueryRequest) (*proto.Qu
}
}

store, ok := s.waku.Store().(*store.XmtpStore)
if !ok {
return nil, status.Errorf(codes.Internal, "waku store not xmtp store")
}
start := time.Now()
res, err := store.FindMessages(buildWakuQuery(req))
duration := time.Since(start)
if err != nil {
metrics.EmitQuery(ctx, req, 0, err, duration)
return nil, status.Errorf(codes.Internal, err.Error())
}
metrics.EmitQuery(ctx, req, len(res.Messages), nil, duration)
if duration > 10*time.Millisecond {
log.With(zap.Duration("duration", duration), zap.Int("results", len(res.Messages))).Info("slow query")
}

envs := make([]*proto.Envelope, 0, len(res.Messages))
for _, msg := range res.Messages {
envs = append(envs, buildEnvelope(msg))
}

return &proto.QueryResponse{
Envelopes: envs,
PagingInfo: buildPagingInfo(res.PagingInfo),
}, nil
return s.store.Query(req)
}

func (s *Service) BatchQuery(ctx context.Context, req *proto.BatchQueryRequest) (*proto.BatchQueryResponse, error) {
Expand Down Expand Up @@ -423,69 +394,6 @@ func buildEnvelope(msg *wakupb.WakuMessage) *proto.Envelope {
}
}

func buildWakuQuery(req *proto.QueryRequest) *wakupb.HistoryQuery {
contentFilters := []*wakupb.ContentFilter{}
for _, contentTopic := range req.ContentTopics {
if contentTopic != "" {
contentFilters = append(contentFilters, &wakupb.ContentFilter{
ContentTopic: contentTopic,
})
}
}

return &wakupb.HistoryQuery{
ContentFilters: contentFilters,
StartTime: toWakuTimestamp(req.StartTimeNs),
EndTime: toWakuTimestamp(req.EndTimeNs),
PagingInfo: buildWakuPagingInfo(req.PagingInfo),
}
}

func buildPagingInfo(pi *wakupb.PagingInfo) *proto.PagingInfo {
if pi == nil {
return nil
}
var pagingInfo proto.PagingInfo
pagingInfo.Limit = uint32(pi.PageSize)
switch pi.Direction {
case wakupb.PagingInfo_BACKWARD:
pagingInfo.Direction = proto.SortDirection_SORT_DIRECTION_DESCENDING
case wakupb.PagingInfo_FORWARD:
pagingInfo.Direction = proto.SortDirection_SORT_DIRECTION_ASCENDING
}
if index := pi.Cursor; index != nil {
pagingInfo.Cursor = &proto.Cursor{
Cursor: &proto.Cursor_Index{
Index: &proto.IndexCursor{
Digest: index.Digest,
SenderTimeNs: uint64(index.SenderTime),
}}}
}
return &pagingInfo
}

func buildWakuPagingInfo(pi *proto.PagingInfo) *wakupb.PagingInfo {
if pi == nil {
return nil
}
pagingInfo := &wakupb.PagingInfo{
PageSize: uint64(pi.Limit),
}
switch pi.Direction {
case proto.SortDirection_SORT_DIRECTION_ASCENDING:
pagingInfo.Direction = wakupb.PagingInfo_FORWARD
case proto.SortDirection_SORT_DIRECTION_DESCENDING:
pagingInfo.Direction = wakupb.PagingInfo_BACKWARD
}
if ic := pi.Cursor.GetIndex(); ic != nil {
pagingInfo.Cursor = &wakupb.Index{
Digest: ic.Digest,
SenderTime: toWakuTimestamp(ic.SenderTimeNs),
}
}
return pagingInfo
}

func isValidSubscribeAllTopic(topic string) bool {
return strings.HasPrefix(topic, validXMTPTopicPrefix)
}
Expand All @@ -496,7 +404,3 @@ func fromWakuTimestamp(ts int64) uint64 {
}
return uint64(ts)
}

func toWakuTimestamp(ts uint64) int64 {
return int64(ts)
}
4 changes: 2 additions & 2 deletions pkg/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type Server struct {
}

func New(config *Config) (*Server, error) {
if err := config.check(); err != nil {
if err := config.validate(); err != nil {
return nil, err
}

Expand Down Expand Up @@ -107,7 +107,7 @@ func (s *Server) startGRPC() error {
healthcheck := health.NewServer()
healthgrpc.RegisterHealthServer(grpcServer, healthcheck)

s.messagev1, err = messagev1.NewService(s.Waku, s.Log)
s.messagev1, err = messagev1.NewService(s.Waku, s.Log, s.Store)
if err != nil {
return errors.Wrap(err, "creating message service")
}
Expand Down
Loading

0 comments on commit f8a8b6f

Please sign in to comment.