Skip to content

Commit

Permalink
Filter v2 test updates
Browse files Browse the repository at this point in the history
  • Loading branch information
Vitaliy Vlasov committed Aug 10, 2023
1 parent 9f45d27 commit ad1fdaa
Show file tree
Hide file tree
Showing 7 changed files with 288 additions and 218 deletions.
10 changes: 8 additions & 2 deletions tests/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,14 @@ func MakeHost(ctx context.Context, port int, randomness io.Reader) (host.Host, e
}

// CreateWakuMessage creates a WakuMessage protobuffer with default values and a custom contenttopic and timestamp
func CreateWakuMessage(contentTopic string, timestamp int64) *pb.WakuMessage {
return &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: contentTopic, Version: 0, Timestamp: timestamp}
func CreateWakuMessage(contentTopic string, timestamp int64, optionalPayload ...string) *pb.WakuMessage {
var payload []byte
if len(optionalPayload) > 0 {
payload = []byte(optionalPayload[0])
} else {
payload = []byte{1, 2, 3}
}
return &pb.WakuMessage{Payload: payload, ContentTopic: contentTopic, Version: 0, Timestamp: timestamp}
}

// RandomHex returns a random hex string of n bytes
Expand Down
18 changes: 9 additions & 9 deletions waku/v2/node/wakunode2.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ type WakuNode struct {
peerExchange Service
rendezvous Service
legacyFilter ReceptorService
filterFullnode ReceptorService
filterFullNode ReceptorService
filterLightnode Service
store ReceptorService
rlnRelay RLNRelay
Expand Down Expand Up @@ -273,8 +273,8 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
w.rendezvous = rendezvous.NewRendezvous(w.opts.rendezvousDB, w.peerConnector, w.log)
w.relay = relay.NewWakuRelay(w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...)
w.legacyFilter = legacy_filter.NewWakuFilter(w.bcaster, w.opts.isLegacyFilterFullnode, w.timesource, w.log, w.opts.legacyFilterOpts...)
w.filterFullnode = filter.NewWakuFilterFullnode(w.timesource, w.log, w.opts.filterOpts...)
w.filterLightnode = filter.NewWakuFilterLightnode(w.bcaster, w.peermanager, w.timesource, w.log)
w.filterFullnode = filter.NewWakuFilterFullNode(w.timesource, w.log, w.opts.filterOpts...)
w.filterLightnode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.log)
w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.log)

if params.storeFactory != nil {
Expand Down Expand Up @@ -434,10 +434,10 @@ func (w *WakuNode) Start(ctx context.Context) error {
w.log.Info("Subscribing filter to broadcaster")
}

w.filterFullnode.SetHost(host)
w.filterFullNode.SetHost(host)
if w.opts.enableFilterFullNode {
sub := w.bcaster.RegisterForAll()
err := w.filterFullnode.Start(ctx, sub)
err := w.filterFullNode.Start(ctx, sub)
if err != nil {
return err
}
Expand Down Expand Up @@ -501,7 +501,7 @@ func (w *WakuNode) Stop() {
w.lightPush.Stop()
w.store.Stop()
w.legacyFilter.Stop()
w.filterFullnode.Stop()
w.filterFullNode.Stop()

if w.opts.enableDiscV5 {
w.discoveryV5.Stop()
Expand Down Expand Up @@ -598,14 +598,14 @@ func (w *WakuNode) LegacyFilter() *legacy_filter.WakuFilter {
}

// FilterLightnode is used to access any operation related to Waku Filter protocol Full node feature
func (w *WakuNode) FilterFullnode() *filter.WakuFilterFullNode {
if result, ok := w.filterFullnode.(*filter.WakuFilterFullNode); ok {
func (w *WakuNode) FilterFullNode() *filter.WakuFilterFullNode {
if result, ok := w.filterFullNode.(*filter.WakuFilterFullNode); ok {
return result
}
return nil
}

// FilterFullnode is used to access any operation related to Waku Filter protocol Light node feature
// FilterFullNode is used to access any operation related to Waku Filter protocol Light node feature
func (w *WakuNode) FilterLightnode() *filter.WakuFilterLightnode {
if result, ok := w.filterLightnode.(*filter.WakuFilterLightnode); ok {
return result
Expand Down
4 changes: 2 additions & 2 deletions waku/v2/node/wakuoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type WakuNodeParameters struct {

enableRelay bool
enableLegacyFilter bool
isLegacyFilterFullnode bool
isLegacyFilterFullNode bool
enableFilterLightNode bool
enableFilterFullNode bool
legacyFilterOpts []legacy_filter.Option
Expand Down Expand Up @@ -363,7 +363,7 @@ func WithPeerExchange() WakuNodeOption {
func WithLegacyWakuFilter(fullnode bool, filterOpts ...legacy_filter.Option) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableLegacyFilter = true
params.isLegacyFilterFullnode = fullnode
params.isLegacyFilterFullNode = fullnode
params.legacyFilterOpts = filterOpts
return nil
}
Expand Down
44 changes: 24 additions & 20 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var (
ErrNoPeersAvailable = errors.New("no suitable remote peers")
)

type WakuFilterLightnode struct {
type WakuFilterLightNode struct {
cancel context.CancelFunc
ctx context.Context
h host.Host
Expand All @@ -59,9 +59,9 @@ type WakuFilterPushResult struct {
// NewWakuFilterLightnode returns a new instance of Waku Filter struct setup according to the chosen parameter and options
// Takes an optional peermanager if WakuFilterLightnode is being created along with WakuNode.
// If using libp2p host, then pass peermanager as nil
func NewWakuFilterLightnode(broadcaster relay.Broadcaster, pm *peermanager.PeerManager,
timesource timesource.Timesource, log *zap.Logger) *WakuFilterLightnode {
wf := new(WakuFilterLightnode)
func NewWakuFilterLightNode(broadcaster relay.Broadcaster, pm *peermanager.PeerManager,
timesource timesource.Timesource, log *zap.Logger) *WakuFilterLightNode {
wf := new(WakuFilterLightNode)
wf.log = log.Named("filterv2-lightnode")
wf.broadcaster = broadcaster
wf.timesource = timesource
Expand All @@ -72,11 +72,15 @@ func NewWakuFilterLightnode(broadcaster relay.Broadcaster, pm *peermanager.PeerM
}

// Sets the host to be able to mount or consume a protocol
func (wf *WakuFilterLightnode) SetHost(h host.Host) {
func (wf *WakuFilterLightNode) SetHost(h host.Host) {
wf.h = h
}

func (wf *WakuFilterLightnode) Start(ctx context.Context) error {
func (wf *WakuFilterLightNode) Host() host.Host {
return wf.h
}

func (wf *WakuFilterLightNode) Start(ctx context.Context) error {
wf.wg.Wait() // Wait for any goroutines to stop

ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter"))
Expand All @@ -98,7 +102,7 @@ func (wf *WakuFilterLightnode) Start(ctx context.Context) error {
}

// Stop unmounts the filter protocol
func (wf *WakuFilterLightnode) Stop() {
func (wf *WakuFilterLightNode) Stop() {
if wf.cancel == nil {
return
}
Expand All @@ -114,7 +118,7 @@ func (wf *WakuFilterLightnode) Stop() {
wf.wg.Wait()
}

func (wf *WakuFilterLightnode) onRequest(ctx context.Context) func(s network.Stream) {
func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Stream) {
return func(s network.Stream) {
defer s.Close()
logger := wf.log.With(logging.HostID("peer", s.Conn().RemotePeer()))
Expand Down Expand Up @@ -149,7 +153,7 @@ func (wf *WakuFilterLightnode) onRequest(ctx context.Context) func(s network.Str
}
}

func (wf *WakuFilterLightnode) notify(remotePeerID peer.ID, pubsubTopic string, msg *wpb.WakuMessage) {
func (wf *WakuFilterLightNode) notify(remotePeerID peer.ID, pubsubTopic string, msg *wpb.WakuMessage) {
envelope := protocol.NewEnvelope(msg, wf.timesource.Now().UnixNano(), pubsubTopic)

// Broadcasting message so it's stored
Expand All @@ -159,7 +163,7 @@ func (wf *WakuFilterLightnode) notify(remotePeerID peer.ID, pubsubTopic string,
wf.subscriptions.Notify(remotePeerID, envelope)
}

func (wf *WakuFilterLightnode) request(ctx context.Context, params *FilterSubscribeParameters, reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter ContentFilter) error {
func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscribeParameters, reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter ContentFilter) error {
conn, err := wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_v20beta1)
if err != nil {
metrics.RecordFilterError(ctx, "dial_failure")
Expand Down Expand Up @@ -210,7 +214,7 @@ func (wf *WakuFilterLightnode) request(ctx context.Context, params *FilterSubscr
}

// Subscribe setups a subscription to receive messages that match a specific content filter
func (wf *WakuFilterLightnode) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) (*SubscriptionDetails, error) {
func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) (*SubscriptionDetails, error) {
if contentFilter.Topic == "" {
return nil, errors.New("topic is required")
}
Expand Down Expand Up @@ -248,15 +252,15 @@ func (wf *WakuFilterLightnode) Subscribe(ctx context.Context, contentFilter Cont
}

// FilterSubscription is used to obtain an object from which you could receive messages received via filter protocol
func (wf *WakuFilterLightnode) FilterSubscription(peerID peer.ID, contentFilter ContentFilter) (*SubscriptionDetails, error) {
func (wf *WakuFilterLightNode) FilterSubscription(peerID peer.ID, contentFilter ContentFilter) (*SubscriptionDetails, error) {
if !wf.subscriptions.Has(peerID, contentFilter.Topic, contentFilter.ContentTopics...) {
return nil, errors.New("subscription does not exist")
}

return wf.subscriptions.NewSubscription(peerID, contentFilter.Topic, contentFilter.ContentTopics), nil
}

func (wf *WakuFilterLightnode) getUnsubscribeParameters(opts ...FilterUnsubscribeOption) (*FilterUnsubscribeParameters, error) {
func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterUnsubscribeOption) (*FilterUnsubscribeParameters, error) {
params := new(FilterUnsubscribeParameters)
params.log = wf.log
for _, opt := range opts {
Expand All @@ -266,19 +270,19 @@ func (wf *WakuFilterLightnode) getUnsubscribeParameters(opts ...FilterUnsubscrib
return params, nil
}

func (wf *WakuFilterLightnode) Ping(ctx context.Context, peerID peer.ID) error {
func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID) error {
return wf.request(
ctx,
&FilterSubscribeParameters{selectedPeer: peerID},
pb.FilterSubscribeRequest_SUBSCRIBER_PING,
ContentFilter{})
}

func (wf *WakuFilterLightnode) IsSubscriptionAlive(ctx context.Context, subscription *SubscriptionDetails) error {
func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscription *SubscriptionDetails) error {
return wf.Ping(ctx, subscription.PeerID)
}

func (wf *WakuFilterLightnode) Subscriptions() []*SubscriptionDetails {
func (wf *WakuFilterLightNode) Subscriptions() []*SubscriptionDetails {
wf.subscriptions.RLock()
defer wf.subscriptions.RUnlock()

Expand All @@ -295,7 +299,7 @@ func (wf *WakuFilterLightnode) Subscriptions() []*SubscriptionDetails {
return output
}

func (wf *WakuFilterLightnode) cleanupSubscriptions(peerID peer.ID, contentFilter ContentFilter) {
func (wf *WakuFilterLightNode) cleanupSubscriptions(peerID peer.ID, contentFilter ContentFilter) {
wf.subscriptions.Lock()
defer wf.subscriptions.Unlock()

Expand Down Expand Up @@ -327,7 +331,7 @@ func (wf *WakuFilterLightnode) cleanupSubscriptions(peerID peer.ID, contentFilte
}

// Unsubscribe is used to stop receiving messages from a peer that match a content filter
func (wf *WakuFilterLightnode) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
if contentFilter.Topic == "" {
return nil, errors.New("topic is required")
}
Expand Down Expand Up @@ -391,7 +395,7 @@ func (wf *WakuFilterLightnode) Unsubscribe(ctx context.Context, contentFilter Co
}

// Unsubscribe is used to stop receiving messages from a peer that match a content filter
func (wf *WakuFilterLightnode) UnsubscribeWithSubscription(ctx context.Context, sub *SubscriptionDetails, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *SubscriptionDetails, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
var contentTopics []string
for k := range sub.ContentTopics {
contentTopics = append(contentTopics, k)
Expand All @@ -403,7 +407,7 @@ func (wf *WakuFilterLightnode) UnsubscribeWithSubscription(ctx context.Context,
}

// UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions
func (wf *WakuFilterLightnode) UnsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
params, err := wf.getUnsubscribeParameters(opts...)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit ad1fdaa

Please sign in to comment.