From ad1fdaaf2c8cbaa6b6cd1e7be155dcfdcf267c52 Mon Sep 17 00:00:00 2001 From: Vitaliy Vlasov Date: Tue, 25 Jul 2023 13:54:08 +0300 Subject: [PATCH] Filter v2 test updates --- tests/utils.go | 10 +- waku/v2/node/wakunode2.go | 18 +- waku/v2/node/wakuoptions.go | 4 +- waku/v2/protocol/filter/client.go | 44 +-- waku/v2/protocol/filter/filter_test.go | 418 ++++++++++++++----------- waku/v2/protocol/filter/server.go | 8 +- waku/v2/protocol/relay/waku_relay.go | 4 + 7 files changed, 288 insertions(+), 218 deletions(-) diff --git a/tests/utils.go b/tests/utils.go index 26350b543..dff08becd 100644 --- a/tests/utils.go +++ b/tests/utils.go @@ -122,8 +122,14 @@ func MakeHost(ctx context.Context, port int, randomness io.Reader) (host.Host, e } // CreateWakuMessage creates a WakuMessage protobuffer with default values and a custom contenttopic and timestamp -func CreateWakuMessage(contentTopic string, timestamp int64) *pb.WakuMessage { - return &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: contentTopic, Version: 0, Timestamp: timestamp} +func CreateWakuMessage(contentTopic string, timestamp int64, optionalPayload ...string) *pb.WakuMessage { + var payload []byte + if len(optionalPayload) > 0 { + payload = []byte(optionalPayload[0]) + } else { + payload = []byte{1, 2, 3} + } + return &pb.WakuMessage{Payload: payload, ContentTopic: contentTopic, Version: 0, Timestamp: timestamp} } // RandomHex returns a random hex string of n bytes diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index db73c680c..b2855eb1a 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -90,7 +90,7 @@ type WakuNode struct { peerExchange Service rendezvous Service legacyFilter ReceptorService - filterFullnode ReceptorService + filterFullNode ReceptorService filterLightnode Service store ReceptorService rlnRelay RLNRelay @@ -273,8 +273,8 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { w.rendezvous = rendezvous.NewRendezvous(w.opts.rendezvousDB, w.peerConnector, w.log) w.relay = relay.NewWakuRelay(w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...) w.legacyFilter = legacy_filter.NewWakuFilter(w.bcaster, w.opts.isLegacyFilterFullnode, w.timesource, w.log, w.opts.legacyFilterOpts...) - w.filterFullnode = filter.NewWakuFilterFullnode(w.timesource, w.log, w.opts.filterOpts...) - w.filterLightnode = filter.NewWakuFilterLightnode(w.bcaster, w.peermanager, w.timesource, w.log) + w.filterFullnode = filter.NewWakuFilterFullNode(w.timesource, w.log, w.opts.filterOpts...) + w.filterLightnode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.log) w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.log) if params.storeFactory != nil { @@ -434,10 +434,10 @@ func (w *WakuNode) Start(ctx context.Context) error { w.log.Info("Subscribing filter to broadcaster") } - w.filterFullnode.SetHost(host) + w.filterFullNode.SetHost(host) if w.opts.enableFilterFullNode { sub := w.bcaster.RegisterForAll() - err := w.filterFullnode.Start(ctx, sub) + err := w.filterFullNode.Start(ctx, sub) if err != nil { return err } @@ -501,7 +501,7 @@ func (w *WakuNode) Stop() { w.lightPush.Stop() w.store.Stop() w.legacyFilter.Stop() - w.filterFullnode.Stop() + w.filterFullNode.Stop() if w.opts.enableDiscV5 { w.discoveryV5.Stop() @@ -598,14 +598,14 @@ func (w *WakuNode) LegacyFilter() *legacy_filter.WakuFilter { } // FilterLightnode is used to access any operation related to Waku Filter protocol Full node feature -func (w *WakuNode) FilterFullnode() *filter.WakuFilterFullNode { - if result, ok := w.filterFullnode.(*filter.WakuFilterFullNode); ok { +func (w *WakuNode) FilterFullNode() *filter.WakuFilterFullNode { + if result, ok := w.filterFullNode.(*filter.WakuFilterFullNode); ok { return result } return nil } -// FilterFullnode is used to access any operation related to Waku Filter protocol Light node feature +// FilterFullNode is used to access any operation related to Waku Filter protocol Light node feature func (w *WakuNode) FilterLightnode() *filter.WakuFilterLightnode { if result, ok := w.filterLightnode.(*filter.WakuFilterLightnode); ok { return result diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 2c90f7026..52c6c964d 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -68,7 +68,7 @@ type WakuNodeParameters struct { enableRelay bool enableLegacyFilter bool - isLegacyFilterFullnode bool + isLegacyFilterFullNode bool enableFilterLightNode bool enableFilterFullNode bool legacyFilterOpts []legacy_filter.Option @@ -363,7 +363,7 @@ func WithPeerExchange() WakuNodeOption { func WithLegacyWakuFilter(fullnode bool, filterOpts ...legacy_filter.Option) WakuNodeOption { return func(params *WakuNodeParameters) error { params.enableLegacyFilter = true - params.isLegacyFilterFullnode = fullnode + params.isLegacyFilterFullNode = fullnode params.legacyFilterOpts = filterOpts return nil } diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index da0ab8b6d..d72deed34 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -34,7 +34,7 @@ var ( ErrNoPeersAvailable = errors.New("no suitable remote peers") ) -type WakuFilterLightnode struct { +type WakuFilterLightNode struct { cancel context.CancelFunc ctx context.Context h host.Host @@ -59,9 +59,9 @@ type WakuFilterPushResult struct { // NewWakuFilterLightnode returns a new instance of Waku Filter struct setup according to the chosen parameter and options // Takes an optional peermanager if WakuFilterLightnode is being created along with WakuNode. // If using libp2p host, then pass peermanager as nil -func NewWakuFilterLightnode(broadcaster relay.Broadcaster, pm *peermanager.PeerManager, - timesource timesource.Timesource, log *zap.Logger) *WakuFilterLightnode { - wf := new(WakuFilterLightnode) +func NewWakuFilterLightNode(broadcaster relay.Broadcaster, pm *peermanager.PeerManager, + timesource timesource.Timesource, log *zap.Logger) *WakuFilterLightNode { + wf := new(WakuFilterLightNode) wf.log = log.Named("filterv2-lightnode") wf.broadcaster = broadcaster wf.timesource = timesource @@ -72,11 +72,15 @@ func NewWakuFilterLightnode(broadcaster relay.Broadcaster, pm *peermanager.PeerM } // Sets the host to be able to mount or consume a protocol -func (wf *WakuFilterLightnode) SetHost(h host.Host) { +func (wf *WakuFilterLightNode) SetHost(h host.Host) { wf.h = h } -func (wf *WakuFilterLightnode) Start(ctx context.Context) error { +func (wf *WakuFilterLightNode) Host() host.Host { + return wf.h +} + +func (wf *WakuFilterLightNode) Start(ctx context.Context) error { wf.wg.Wait() // Wait for any goroutines to stop ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter")) @@ -98,7 +102,7 @@ func (wf *WakuFilterLightnode) Start(ctx context.Context) error { } // Stop unmounts the filter protocol -func (wf *WakuFilterLightnode) Stop() { +func (wf *WakuFilterLightNode) Stop() { if wf.cancel == nil { return } @@ -114,7 +118,7 @@ func (wf *WakuFilterLightnode) Stop() { wf.wg.Wait() } -func (wf *WakuFilterLightnode) onRequest(ctx context.Context) func(s network.Stream) { +func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Stream) { return func(s network.Stream) { defer s.Close() logger := wf.log.With(logging.HostID("peer", s.Conn().RemotePeer())) @@ -149,7 +153,7 @@ func (wf *WakuFilterLightnode) onRequest(ctx context.Context) func(s network.Str } } -func (wf *WakuFilterLightnode) notify(remotePeerID peer.ID, pubsubTopic string, msg *wpb.WakuMessage) { +func (wf *WakuFilterLightNode) notify(remotePeerID peer.ID, pubsubTopic string, msg *wpb.WakuMessage) { envelope := protocol.NewEnvelope(msg, wf.timesource.Now().UnixNano(), pubsubTopic) // Broadcasting message so it's stored @@ -159,7 +163,7 @@ func (wf *WakuFilterLightnode) notify(remotePeerID peer.ID, pubsubTopic string, wf.subscriptions.Notify(remotePeerID, envelope) } -func (wf *WakuFilterLightnode) request(ctx context.Context, params *FilterSubscribeParameters, reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter ContentFilter) error { +func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscribeParameters, reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter ContentFilter) error { conn, err := wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_v20beta1) if err != nil { metrics.RecordFilterError(ctx, "dial_failure") @@ -210,7 +214,7 @@ func (wf *WakuFilterLightnode) request(ctx context.Context, params *FilterSubscr } // Subscribe setups a subscription to receive messages that match a specific content filter -func (wf *WakuFilterLightnode) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) (*SubscriptionDetails, error) { +func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) (*SubscriptionDetails, error) { if contentFilter.Topic == "" { return nil, errors.New("topic is required") } @@ -248,7 +252,7 @@ func (wf *WakuFilterLightnode) Subscribe(ctx context.Context, contentFilter Cont } // FilterSubscription is used to obtain an object from which you could receive messages received via filter protocol -func (wf *WakuFilterLightnode) FilterSubscription(peerID peer.ID, contentFilter ContentFilter) (*SubscriptionDetails, error) { +func (wf *WakuFilterLightNode) FilterSubscription(peerID peer.ID, contentFilter ContentFilter) (*SubscriptionDetails, error) { if !wf.subscriptions.Has(peerID, contentFilter.Topic, contentFilter.ContentTopics...) { return nil, errors.New("subscription does not exist") } @@ -256,7 +260,7 @@ func (wf *WakuFilterLightnode) FilterSubscription(peerID peer.ID, contentFilter return wf.subscriptions.NewSubscription(peerID, contentFilter.Topic, contentFilter.ContentTopics), nil } -func (wf *WakuFilterLightnode) getUnsubscribeParameters(opts ...FilterUnsubscribeOption) (*FilterUnsubscribeParameters, error) { +func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterUnsubscribeOption) (*FilterUnsubscribeParameters, error) { params := new(FilterUnsubscribeParameters) params.log = wf.log for _, opt := range opts { @@ -266,7 +270,7 @@ func (wf *WakuFilterLightnode) getUnsubscribeParameters(opts ...FilterUnsubscrib return params, nil } -func (wf *WakuFilterLightnode) Ping(ctx context.Context, peerID peer.ID) error { +func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID) error { return wf.request( ctx, &FilterSubscribeParameters{selectedPeer: peerID}, @@ -274,11 +278,11 @@ func (wf *WakuFilterLightnode) Ping(ctx context.Context, peerID peer.ID) error { ContentFilter{}) } -func (wf *WakuFilterLightnode) IsSubscriptionAlive(ctx context.Context, subscription *SubscriptionDetails) error { +func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscription *SubscriptionDetails) error { return wf.Ping(ctx, subscription.PeerID) } -func (wf *WakuFilterLightnode) Subscriptions() []*SubscriptionDetails { +func (wf *WakuFilterLightNode) Subscriptions() []*SubscriptionDetails { wf.subscriptions.RLock() defer wf.subscriptions.RUnlock() @@ -295,7 +299,7 @@ func (wf *WakuFilterLightnode) Subscriptions() []*SubscriptionDetails { return output } -func (wf *WakuFilterLightnode) cleanupSubscriptions(peerID peer.ID, contentFilter ContentFilter) { +func (wf *WakuFilterLightNode) cleanupSubscriptions(peerID peer.ID, contentFilter ContentFilter) { wf.subscriptions.Lock() defer wf.subscriptions.Unlock() @@ -327,7 +331,7 @@ func (wf *WakuFilterLightnode) cleanupSubscriptions(peerID peer.ID, contentFilte } // Unsubscribe is used to stop receiving messages from a peer that match a content filter -func (wf *WakuFilterLightnode) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { +func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { if contentFilter.Topic == "" { return nil, errors.New("topic is required") } @@ -391,7 +395,7 @@ func (wf *WakuFilterLightnode) Unsubscribe(ctx context.Context, contentFilter Co } // Unsubscribe is used to stop receiving messages from a peer that match a content filter -func (wf *WakuFilterLightnode) UnsubscribeWithSubscription(ctx context.Context, sub *SubscriptionDetails, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { +func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *SubscriptionDetails, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { var contentTopics []string for k := range sub.ContentTopics { contentTopics = append(contentTopics, k) @@ -403,7 +407,7 @@ func (wf *WakuFilterLightnode) UnsubscribeWithSubscription(ctx context.Context, } // UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions -func (wf *WakuFilterLightnode) UnsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { +func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { params, err := wf.getUnsubscribeParameters(opts...) if err != nil { return nil, err diff --git a/waku/v2/protocol/filter/filter_test.go b/waku/v2/protocol/filter/filter_test.go index be9d1382e..9741de2e0 100644 --- a/waku/v2/protocol/filter/filter_test.go +++ b/waku/v2/protocol/filter/filter_test.go @@ -9,270 +9,322 @@ import ( "time" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" - "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" + "go.uber.org/zap" ) -func makeWakuRelay(t *testing.T, topic string, broadcaster relay.Broadcaster) (*relay.WakuRelay, *relay.Subscription, host.Host) { - port, err := tests.FindFreePort(t, "", 5) - require.NoError(t, err) +func TestFilterSuite(t *testing.T) { + suite.Run(t, new(FilterTestSuite)) +} + +type FilterTestSuite struct { + suite.Suite + + testTopic string + testContentTopic string + ctx context.Context + ctxCancel context.CancelFunc + lightNode *WakuFilterLightNode + relayNode *relay.WakuRelay + relaySub *relay.Subscription + fullNode *WakuFilterFullNode + wg *sync.WaitGroup + contentFilter ContentFilter + subDetails *SubscriptionDetails + log *zap.Logger +} + +func (s *FilterTestSuite) makeWakuRelay(topic string) (*relay.WakuRelay, *relay.Subscription, host.Host, relay.Broadcaster) { + + broadcaster := relay.NewBroadcaster(10) + s.Require().NoError(broadcaster.Start(context.Background())) + + port, err := tests.FindFreePort(s.T(), "", 5) + s.Require().NoError(err) host, err := tests.MakeHost(context.Background(), port, rand.Reader) - require.NoError(t, err) + s.Require().NoError(err) - relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), utils.Logger()) + relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), s.log) relay.SetHost(host) err = relay.Start(context.Background()) - require.NoError(t, err) + s.Require().NoError(err) sub, err := relay.SubscribeToTopic(context.Background(), topic) - require.NoError(t, err) + s.Require().NoError(err) - return relay, sub, host + return relay, sub, host, broadcaster } -func makeWakuFilterLightNode(t *testing.T) (*WakuFilterLightnode, host.Host) { - port, err := tests.FindFreePort(t, "", 5) - require.NoError(t, err) +func (s *FilterTestSuite) makeWakuFilterLightNode() *WakuFilterLightNode { + port, err := tests.FindFreePort(s.T(), "", 5) + s.Require().NoError(err) host, err := tests.MakeHost(context.Background(), port, rand.Reader) - require.NoError(t, err) + s.Require().NoError(err) b := relay.NewBroadcaster(10) - require.NoError(t, b.Start(context.Background())) - filterPush := NewWakuFilterLightnode(b, nil, timesource.NewDefaultClock(), utils.Logger()) + s.Require().NoError(b.Start(context.Background())) + filterPush := NewWakuFilterLightNode(b, nil, timesource.NewDefaultClock(), s.log) filterPush.SetHost(host) err = filterPush.Start(context.Background()) - require.NoError(t, err) + s.Require().NoError(err) - return filterPush, host + return filterPush } -// Node1: Filter subscribed to content topic A -// Node2: Relay + Filter -// -// # Node1 and Node2 are peers -// -// Node2 send a successful message with topic A -// Node1 receive the message -// -// Node2 send a successful message with topic B -// Node1 doesn't receive the message -func TestWakuFilter(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds - defer cancel() - - testTopic := "/waku/2/go/filter/test" - testContentTopic := "TopicA" +func (s *FilterTestSuite) makeWakuFilterFullNode(topic string) (*relay.WakuRelay, *WakuFilterFullNode) { + node, relaySub, host, broadcaster := s.makeWakuRelay(topic) + s.relaySub = relaySub - node1, host1 := makeWakuFilterLightNode(t) - defer node1.Stop() + node2Filter := NewWakuFilterFullNode(timesource.NewDefaultClock(), s.log) + node2Filter.SetHost(host) + sub := broadcaster.Register(topic) + err := node2Filter.Start(s.ctx, sub) + s.Require().NoError(err) - broadcaster := relay.NewBroadcaster(10) - require.NoError(t, broadcaster.Start(context.Background())) - node2, sub2, host2 := makeWakuRelay(t, testTopic, broadcaster) - defer node2.Stop() - defer sub2.Unsubscribe() - - node2Filter := NewWakuFilterFullnode(timesource.NewDefaultClock(), utils.Logger()) - node2Filter.SetHost(host2) - sub := broadcaster.Register(testTopic) - err := node2Filter.Start(ctx, sub) - require.NoError(t, err) - - host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) - err = host1.Peerstore().AddProtocols(host2.ID(), FilterSubscribeID_v20beta1) - require.NoError(t, err) - - contentFilter := ContentFilter{ - Topic: string(testTopic), - ContentTopics: []string{testContentTopic}, - } - - subscriptionChannel, err := node1.Subscribe(ctx, contentFilter, WithPeer(node2Filter.h.ID())) - require.NoError(t, err) - - // Sleep to make sure the filter is subscribed - time.Sleep(2 * time.Second) - - var wg sync.WaitGroup + return node, node2Filter +} - wg.Add(1) +func (s *FilterTestSuite) waitForMsg(fn func(), ch chan *protocol.Envelope) { + s.wg.Add(1) go func() { - defer wg.Done() - env := <-subscriptionChannel.C - require.Equal(t, contentFilter.ContentTopics[0], env.Message().GetContentTopic()) + defer s.wg.Done() + select { + case env := <-ch: + s.Require().Equal(s.contentFilter.ContentTopics[0], env.Message().GetContentTopic()) + case <-time.After(5 * time.Second): + s.Require().Fail("Message timeout") + case <-s.ctx.Done(): + s.Require().Fail("test exceeded allocated time") + } }() - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic) - require.NoError(t, err) + fn() - wg.Wait() + s.wg.Wait() +} - wg.Add(1) +func (s *FilterTestSuite) waitForTimeout(fn func(), ch chan *protocol.Envelope) { + s.wg.Add(1) go func() { + defer s.wg.Done() select { - case <-subscriptionChannel.C: - require.Fail(t, "should not receive another message") + case <-ch: + s.Require().Fail("should not receive another message") case <-time.After(1 * time.Second): - defer wg.Done() - case <-ctx.Done(): - require.Fail(t, "test exceeded allocated time") + // Timeout elapsed, all good + case <-s.ctx.Done(): + s.Require().Fail("test exceeded allocated time") } }() - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage("TopicB", utils.GetUnixEpoch()), testTopic) - require.NoError(t, err) + fn() - wg.Wait() + s.wg.Wait() +} - wg.Add(1) - go func() { - select { - case <-subscriptionChannel.C: - require.Fail(t, "should not receive another message") - case <-time.After(1 * time.Second): - defer wg.Done() - case <-ctx.Done(): - require.Fail(t, "test exceeded allocated time") - } - }() +func (s *FilterTestSuite) subscribe(topic string, contentTopic string, peer peer.ID) *SubscriptionDetails { + s.contentFilter = ContentFilter{ + Topic: string(topic), + ContentTopics: []string{contentTopic}, + } - _, err = node1.Unsubscribe(ctx, contentFilter, Peer(node2Filter.h.ID())) - require.NoError(t, err) + subDetails, err := s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(peer)) + s.Require().NoError(err) - time.Sleep(1 * time.Second) + // Sleep to make sure the filter is subscribed + time.Sleep(2 * time.Second) + + return subDetails +} + +func (s *FilterTestSuite) publishMsg(topic, contentTopic string, optionalPayload ...string) { + var payload string + if len(optionalPayload) > 0 { + payload = optionalPayload[0] + } else { + payload = "123" + } - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic) - require.NoError(t, err) - wg.Wait() + _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(contentTopic, utils.GetUnixEpoch(), payload), topic) + s.Require().NoError(err) } -func TestSubscriptionPing(t *testing.T) { +func (s *FilterTestSuite) SetupTest() { + log := utils.Logger() //.Named("filterv2-test") + s.log = log + // Use a pointer to WaitGroup so that to avoid copying + // https://pkg.go.dev/sync#WaitGroup + s.wg = &sync.WaitGroup{} + + // Create test context ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds - defer cancel() + s.ctx = ctx + s.ctxCancel = cancel - testTopic := "/waku/2/go/filter/test" + s.testTopic = "/waku/2/go/filter/test" + s.testContentTopic = "TopicA" - node1, host1 := makeWakuFilterLightNode(t) - defer node1.Stop() + s.lightNode = s.makeWakuFilterLightNode() - broadcaster := relay.NewBroadcaster(10) - require.NoError(t, broadcaster.Start(context.Background())) - node2, sub2, host2 := makeWakuRelay(t, testTopic, broadcaster) - defer node2.Stop() - defer sub2.Unsubscribe() - - node2Filter := NewWakuFilterFullnode(timesource.NewDefaultClock(), utils.Logger()) - node2Filter.SetHost(host2) - err := node2Filter.Start(ctx, relay.NoopSubscription()) - require.NoError(t, err) - - host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) - err = host1.Peerstore().AddProtocols(host2.ID(), FilterSubscribeID_v20beta1) - require.NoError(t, err) - - err = node1.Ping(context.Background(), host2.ID()) - require.Error(t, err) - filterErr, ok := err.(*FilterError) - require.True(t, ok) - require.Equal(t, filterErr.Code, http.StatusNotFound) + s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic) - contentFilter := ContentFilter{ - Topic: string(testTopic), - ContentTopics: []string{"abc"}, - } - _, err = node1.Subscribe(ctx, contentFilter, WithPeer(node2Filter.h.ID())) - require.NoError(t, err) + // Connect nodes + s.lightNode.Host().Peerstore().AddAddr(s.fullNode.Host().ID(), tests.GetHostAddress(s.fullNode.Host()), peerstore.PermanentAddrTTL) + err := s.lightNode.Host().Peerstore().AddProtocols(s.fullNode.Host().ID(), FilterSubscribeID_v20beta1) + s.Require().NoError(err) - err = node1.Ping(context.Background(), host2.ID()) - require.NoError(t, err) } -func TestWakuFilterPeerFailure(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds - defer cancel() +func (s *FilterTestSuite) TearDownTest() { + s.fullNode.Stop() + s.relayNode.Stop() + s.relaySub.Unsubscribe() + s.lightNode.Stop() + s.ctxCancel() +} - testTopic := "/waku/2/go/filter/test" - testContentTopic := "TopicA" +func (s *FilterTestSuite) TestWakuFilter() { - node1, host1 := makeWakuFilterLightNode(t) + // Initial subscribe + s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNode.Host().ID()) - broadcaster := relay.NewBroadcaster(10) - require.NoError(t, broadcaster.Start(context.Background())) - node2, sub2, host2 := makeWakuRelay(t, testTopic, broadcaster) - defer node2.Stop() - defer sub2.Unsubscribe() + // Should be received + s.waitForMsg(func() { + s.publishMsg(s.testTopic, s.testContentTopic, "first") + }, s.subDetails.C) - broadcaster2 := relay.NewBroadcaster(10) - require.NoError(t, broadcaster2.Start(context.Background())) - node2Filter := NewWakuFilterFullnode(timesource.NewDefaultClock(), utils.Logger(), WithTimeout(5*time.Second)) - node2Filter.SetHost(host2) - sub := broadcaster.Register(testTopic) - err := node2Filter.Start(ctx, sub) - require.NoError(t, err) - - host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) - err = host1.Peerstore().AddProtocols(host2.ID(), FilterPushID_v20beta1) - require.NoError(t, err) - - contentFilter := &ContentFilter{ - Topic: string(testTopic), - ContentTopics: []string{testContentTopic}, - } + // Wrong content topic + s.waitForTimeout(func() { + s.publishMsg(s.testTopic, "TopicB", "second") + }, s.subDetails.C) - f, err := node1.Subscribe(ctx, *contentFilter, WithPeer(node2Filter.h.ID())) - require.NoError(t, err) + _, err := s.lightNode.Unsubscribe(s.ctx, s.contentFilter, Peer(s.fullNode.Host().ID())) + s.Require().NoError(err) - // Simulate there's been a failure before - node2Filter.subscriptions.FlagAsFailure(host1.ID()) + time.Sleep(1 * time.Second) - // Sleep to make sure the filter is subscribed - time.Sleep(2 * time.Second) + // Should not receive after unsubscribe + s.waitForTimeout(func() { + s.publishMsg(s.testTopic, s.testContentTopic, "third") + }, s.subDetails.C) +} - require.True(t, node2Filter.subscriptions.IsFailedPeer(host1.ID())) +func (s *FilterTestSuite) TestSubscriptionPing() { - var wg sync.WaitGroup + err := s.lightNode.Ping(context.Background(), s.fullNode.Host().ID()) + s.Require().Error(err) + filterErr, ok := err.(*FilterError) + s.Require().True(ok) + s.Require().Equal(filterErr.Code, http.StatusNotFound) - wg.Add(1) - go func() { - defer wg.Done() - env := <-f.C - require.Equal(t, contentFilter.ContentTopics[0], env.Message().GetContentTopic()) + contentTopic := "abc" + s.subDetails = s.subscribe(s.testTopic, contentTopic, s.fullNode.Host().ID()) + + err = s.lightNode.Ping(context.Background(), s.fullNode.Host().ID()) + s.Require().NoError(err) +} - // Failure is removed - require.False(t, node2Filter.subscriptions.IsFailedPeer(host1.ID())) +func (s *FilterTestSuite) TestPeerFailure() { - }() + broadcaster2 := relay.NewBroadcaster(10) + s.Require().NoError(broadcaster2.Start(context.Background())) + + // Initial subscribe + s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNode.Host().ID()) + + // Simulate there's been a failure before + s.fullNode.subscriptions.FlagAsFailure(s.lightNode.Host().ID()) + + // Sleep to make sure the filter is subscribed + time.Sleep(2 * time.Second) + + s.Require().True(s.fullNode.subscriptions.IsFailedPeer(s.lightNode.Host().ID())) - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic) - require.NoError(t, err) + s.waitForMsg(func() { + s.publishMsg(s.testTopic, s.testContentTopic) + }, s.subDetails.C) - wg.Wait() + // Failure is removed + s.Require().False(s.fullNode.subscriptions.IsFailedPeer(s.lightNode.Host().ID())) // Kill the subscriber - host1.Close() + s.lightNode.Host().Close() time.Sleep(1 * time.Second) - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic) - require.NoError(t, err) + s.publishMsg(s.testTopic, s.testContentTopic) // TODO: find out how to eliminate this sleep time.Sleep(1 * time.Second) - require.True(t, node2Filter.subscriptions.IsFailedPeer(host1.ID())) + s.Require().True(s.fullNode.subscriptions.IsFailedPeer(s.lightNode.Host().ID())) time.Sleep(2 * time.Second) - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic) - require.NoError(t, err) + s.publishMsg(s.testTopic, s.testContentTopic) time.Sleep(2 * time.Second) - require.True(t, node2Filter.subscriptions.IsFailedPeer(host1.ID())) // Failed peer has been removed - require.False(t, node2Filter.subscriptions.Has(host1.ID())) // Failed peer has been removed + s.Require().True(s.fullNode.subscriptions.IsFailedPeer(s.lightNode.Host().ID())) // Failed peer has been removed + s.Require().False(s.fullNode.subscriptions.Has(s.lightNode.Host().ID())) // Failed peer has been removed +} + +func (s *FilterTestSuite) TestCreateSubscription() { + + // Initial subscribe + s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNode.Host().ID()) + + s.waitForMsg(func() { + _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic) + s.Require().NoError(err) + + }, s.subDetails.C) +} + +func (s *FilterTestSuite) TestModifySubscription() { + + // Initial subscribe + s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNode.Host().ID()) + + s.waitForMsg(func() { + _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic) + s.Require().NoError(err) + + }, s.subDetails.C) + + // Subscribe to another content_topic + newContentTopic := "Topic_modified" + s.subDetails = s.subscribe(s.testTopic, newContentTopic, s.fullNode.Host().ID()) + + s.waitForMsg(func() { + _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), s.testTopic) + s.Require().NoError(err) + + }, s.subDetails.C) +} + +func (s *FilterTestSuite) TestMultipleMessages() { + + // Initial subscribe + s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNode.Host().ID()) + + s.waitForMsg(func() { + _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "first"), s.testTopic) + s.Require().NoError(err) + + }, s.subDetails.C) + + s.waitForMsg(func() { + _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), s.testTopic) + s.Require().NoError(err) + + }, s.subDetails.C) } diff --git a/waku/v2/protocol/filter/server.go b/waku/v2/protocol/filter/server.go index 1e8fd50f8..b3298158d 100644 --- a/waku/v2/protocol/filter/server.go +++ b/waku/v2/protocol/filter/server.go @@ -45,8 +45,8 @@ type ( } ) -// NewWakuFilterFullnode returns a new instance of Waku Filter struct setup according to the chosen parameter and options -func NewWakuFilterFullnode(timesource timesource.Timesource, log *zap.Logger, opts ...Option) *WakuFilterFullNode { +// NewWakuFilterFullNode returns a new instance of Waku Filter struct setup according to the chosen parameter and options +func NewWakuFilterFullNode(timesource timesource.Timesource, log *zap.Logger, opts ...Option) *WakuFilterFullNode { wf := new(WakuFilterFullNode) wf.log = log.Named("filterv2-fullnode") @@ -69,6 +69,10 @@ func (wf *WakuFilterFullNode) SetHost(h host.Host) { wf.h = h } +func (wf *WakuFilterFullNode) Host() host.Host { + return wf.h +} + func (wf *WakuFilterFullNode) Start(ctx context.Context, sub relay.Subscription) error { wf.wg.Wait() // Wait for any goroutines to stop diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 5377e06d4..9f5832c36 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -210,6 +210,10 @@ func (w *WakuRelay) SetHost(h host.Host) { w.host = h } +func (w *WakuRelay) Host() *host.Host { + return &w.host +} + // Start initiates the WakuRelay protocol func (w *WakuRelay) Start(ctx context.Context) error { w.wg.Wait()