Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
tsachiherman committed Feb 16, 2024
1 parent 0e22ae9 commit 8c3a231
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 36 deletions.
9 changes: 5 additions & 4 deletions pkg/api/message/v1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,19 +82,20 @@ func NewService(log *zap.Logger, store *store.Store, publishToWakuRelay func(con
if err != nil {
return nil, err
}
s.subDispatcher, err = newSubscriptionDispatcher(s.nc, s.log)
if err != nil {
return nil, err
}
go s.ns.Start()
if !s.ns.ReadyForConnections(4 * time.Second) {
return nil, errors.New("nats not ready")
}

s.nc, err = nats.Connect(s.ns.ClientURL())
if err != nil {
return nil, err
}

s.subDispatcher, err = newSubscriptionDispatcher(s.nc, s.log)
if err != nil {
return nil, err
}
return s, nil
}

Expand Down
74 changes: 55 additions & 19 deletions pkg/api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"io"
"math/rand"
"net/http"
"strings"
"testing"
Expand All @@ -16,6 +17,7 @@ import (
messageV1 "github.com/xmtp/xmtp-node-go/pkg/proto/message_api/v1"
"github.com/xmtp/xmtp-node-go/pkg/ratelimiter"
test "github.com/xmtp/xmtp-node-go/pkg/testing"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -65,7 +67,7 @@ func Test_SubscribePublishQuery(t *testing.T) {
testGRPCAndHTTP(t, ctx, func(t *testing.T, client messageclient.Client, _ *Server) {
// start subscribe stream
stream, err := client.Subscribe(ctx, &messageV1.SubscribeRequest{
ContentTopics: []string{"topic"},
ContentTopics: []string{"/xmtp/0/topic"},
})
require.NoError(t, err)
defer stream.Close()
Expand Down Expand Up @@ -843,32 +845,66 @@ func requireErrorEqual(t *testing.T, err error, code codes.Code, msg string, det
}

func Benchmark_SubscribePublishQuery(b *testing.B) {
server, cleanup := newTestServerWithLog(b, zap.NewNop())
defer cleanup()

ctx := withAuth(b, context.Background())
ctx, cancel := context.WithCancel(ctx)
defer cancel()
server, cleanup := newTestServer(b)
defer cleanup()

client, err := messageclient.NewGRPCClient(ctx, server.dialGRPC)
require.NoError(b, err)

// start subscribe stream
stream, err := client.Subscribe(ctx, &messageV1.SubscribeRequest{
ContentTopics: []string{"topic"},
})
require.NoError(b, err)
defer stream.Close()
time.Sleep(50 * time.Millisecond)
// create topics large topics for 10 streams. Topic should be interleaved.
const chunkSize = 1000
const streamsCount = 10
topics := [streamsCount][]string{}

maxTopic := (len(topics)-1)*chunkSize*3/4 + chunkSize
// create a random order of topics.
topicsOrder := rand.Perm(maxTopic)
envs := make([]*messageV1.Envelope, len(topicsOrder))
for i, topicID := range topicsOrder {
envs[i] = &messageV1.Envelope{
ContentTopic: fmt.Sprintf("/xmtp/0/topic/%d", topicID),
Message: []byte{1, 2, 3},
TimestampNs: uint64(time.Second),
}
}

// publish 10 messages
envs := makeEnvelopes(10)
publishRes, err := client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs})
require.NoError(b, err)
require.NotNil(b, publishRes)
for j := range topics {
topics[j] = make([]string, chunkSize)
for k := range topics[j] {
topics[j][k] = fmt.Sprintf("/xmtp/0/topic/%d", (j*chunkSize*3/4 + k))
}
}

// read subscription
subscribeExpect(b, stream, envs)
streams := [10]messageclient.Stream{}
b.ResetTimer()
for i := range streams {
// start subscribe streams
var err error
streams[i], err = client.Subscribe(ctx, &messageV1.SubscribeRequest{
ContentTopics: topics[i],
})
require.NoError(b, err)
defer streams[i].Close()
}

for n := 0; n < b.N; n++ {
// publish messages
publishRes, err := client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs})
require.NoError(b, err)
require.NotNil(b, publishRes)

// query for messages
requireEventuallyStored(b, ctx, client, envs)
readCtx, readCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer readCancel()
// read subscription
for _, stream := range streams {
for k := 0; k < chunkSize; k++ {
_, err := stream.Next(readCtx)
require.NoError(b, err)
}
}
}
}
12 changes: 8 additions & 4 deletions pkg/api/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ const (
testMaxMsgSize = 2 * 1024 * 1024
)

func newTestServer(t testing.TB) (*Server, func()) {
log := test.NewLog(t)
waku, wakuCleanup := test.NewNode(t)
func newTestServerWithLog(t testing.TB, log *zap.Logger) (*Server, func()) {
waku, wakuCleanup := test.NewNode(t, log)
store, storeCleanup := newTestStore(t, log)
authzDB, _, authzDBCleanup := test.NewAuthzDB(t)
allowLister := authz.NewDatabaseWalletAllowLister(authzDB, log)
Expand All @@ -38,7 +37,7 @@ func newTestServer(t testing.TB) (*Server, func()) {
MaxMsgSize: testMaxMsgSize,
},
Waku: waku,
Log: test.NewLog(t),
Log: log,
Store: store,
AllowLister: allowLister,
})
Expand All @@ -51,6 +50,11 @@ func newTestServer(t testing.TB) (*Server, func()) {
}
}

func newTestServer(t testing.TB) (*Server, func()) {
log := test.NewLog(t)
return newTestServerWithLog(t, log)
}

func newTestStore(t testing.TB, log *zap.Logger) (*store.Store, func()) {
db, _, dbCleanup := test.NewDB(t)
store, err := store.New(&store.Config{
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
func makeEnvelopes(count int) (envs []*messageV1.Envelope) {
for i := 0; i < count; i++ {
envs = append(envs, &messageV1.Envelope{
ContentTopic: "topic",
ContentTopic: "/xmtp/0/topic",
Message: []byte(fmt.Sprintf("msg %d", i)),
TimestampNs: uint64(i * 1000000000), // i seconds
})
Expand Down
8 changes: 4 additions & 4 deletions pkg/server/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,19 @@ func TestNodes_Deployment(t *testing.T) {
n2PrivKey := test.NewPrivateKey(t)

// Spin up initial instances of the nodes.
n1, cleanup := test.NewNode(t, wakunode.WithPrivateKey(n1PrivKey))
n1, cleanup := test.NewNode(t, test.NewLog(t), wakunode.WithPrivateKey(n1PrivKey))
defer cleanup()
n2, cleanup := test.NewNode(t, wakunode.WithPrivateKey(n2PrivKey))
n2, cleanup := test.NewNode(t, test.NewLog(t), wakunode.WithPrivateKey(n2PrivKey))
defer cleanup()

// Connect the nodes.
test.Connect(t, n1, n2)
test.Connect(t, n2, n1)

// Spin up new instances of the nodes.
newN1, cleanup := test.NewNode(t, wakunode.WithPrivateKey(n1PrivKey))
newN1, cleanup := test.NewNode(t, test.NewLog(t), wakunode.WithPrivateKey(n1PrivKey))
defer cleanup()
newN2, cleanup := test.NewNode(t, wakunode.WithPrivateKey(n2PrivKey))
newN2, cleanup := test.NewNode(t, test.NewLog(t), wakunode.WithPrivateKey(n2PrivKey))
defer cleanup()

// Expect matching peer IDs for new and old instances.
Expand Down
4 changes: 2 additions & 2 deletions pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ func TestServer_NewShutdown(t *testing.T) {
func TestServer_StaticNodesReconnect(t *testing.T) {
t.Parallel()

n1, cleanup := test.NewNode(t)
n1, cleanup := test.NewNode(t, test.NewLog(t))
defer cleanup()
n1ID := n1.Host().ID()

n2, cleanup := test.NewNode(t)
n2, cleanup := test.NewNode(t, test.NewLog(t))
defer cleanup()
n2ID := n2.Host().ID()

Expand Down
4 changes: 2 additions & 2 deletions pkg/testing/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/waku-org/go-waku/tests"
wakunode "github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"go.uber.org/zap"
)

func Connect(t *testing.T, n1 *wakunode.WakuNode, n2 *wakunode.WakuNode, protocols ...protocol.ID) {
Expand Down Expand Up @@ -65,11 +66,10 @@ func Disconnect(t *testing.T, n1 *wakunode.WakuNode, n2 *wakunode.WakuNode) {
}, 3*time.Second, 50*time.Millisecond)
}

func NewNode(t testing.TB, opts ...wakunode.WakuNodeOption) (*wakunode.WakuNode, func()) {
func NewNode(t testing.TB, log *zap.Logger, opts ...wakunode.WakuNodeOption) (*wakunode.WakuNode, func()) {
hostAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
prvKey := NewPrivateKey(t)
ctx := context.Background()
log := NewLog(t)
opts = append([]wakunode.WakuNodeOption{
wakunode.WithLogger(log),
wakunode.WithPrivateKey(prvKey),
Expand Down

0 comments on commit 8c3a231

Please sign in to comment.