Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update lightpush API for autosharding #774

Merged
merged 9 commits into from
Sep 29, 2023
2 changes: 1 addition & 1 deletion library/c/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1083,7 +1083,7 @@ Publish a message using Waku Lightpush.

1. `char* messageJson`: JSON string containing the [Waku Message](https://rfc.vac.dev/spec/14/) as [`JsonMessage`](#jsonmessage-type).
2. `char* pubsubTopic`: pubsub topic on which to publish the message.
If `NULL`, it uses the default pubsub topic.
If `NULL`, it derives the pubsub topic from content-topic based on autosharding.
3. `char* peerID`: Peer ID supporting the lightpush protocol.
The peer must be already known.
It must have been added before with [`waku_add_peer`](#extern-char-waku_add_peerchar-address-char-protocolid)
Expand Down
2 changes: 1 addition & 1 deletion library/c/api_lightpush.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package main
import "C"
import "github.com/waku-org/go-waku/library"

// Publish a message using waku lightpush. Use NULL for topic to use the default pubsub topic..
// Publish a message using waku lightpush. Use NULL for topic to derive the pubsub topic from the contentTopic.
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
// peerID should contain the ID of a peer supporting the lightpush protocol. Use NULL to automatically select a node
// If ms is greater than 0, the broadcast of the message must happen before the timeout
// (in milliseconds) is reached, or an error will be returned
Expand Down
4 changes: 2 additions & 2 deletions library/lightpush.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ func lightpushPublish(msg *pb.WakuMessage, pubsubTopic string, peerID string, ms
}

// LightpushPublish is used to publish a WakuMessage in a pubsub topic using Lightpush protocol
func LightpushPublish(messageJSON string, topic string, peerID string, ms int) (string, error) {
func LightpushPublish(messageJSON string, pubsubTopic string, peerID string, ms int) (string, error) {
msg, err := wakuMessage(messageJSON)
if err != nil {
return "", err
}

return lightpushPublish(msg, getTopic(topic), peerID, ms)
return lightpushPublish(msg, getTopic(pubsubTopic), peerID, ms)
}
14 changes: 2 additions & 12 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Str
pubSubTopic := ""
//For now returning failure, this will get addressed with autosharding changes for filter.
if messagePush.PubsubTopic == nil {
pubSubTopic, err = getPubSubTopicFromContentTopic(messagePush.WakuMessage.ContentTopic)
pubSubTopic, err = protocol.GetPubSubTopicFromContentTopic(messagePush.WakuMessage.ContentTopic)
if err != nil {
logger.Error("could not derive pubSubTopic from contentTopic", zap.Error(err))
wf.metrics.RecordError(decodeRPCFailure)
Expand Down Expand Up @@ -230,16 +230,6 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr
return nil
}

func getPubSubTopicFromContentTopic(cTopicString string) (string, error) {
cTopic, err := protocol.StringToContentTopic(cTopicString)
if err != nil {
return "", fmt.Errorf("%s : %s", err.Error(), cTopicString)
}
pTopic := protocol.GetShardFromContentTopic(cTopic, protocol.GenerationZeroShardsCount)

return pTopic.String(), nil
}

// This function converts a contentFilter into a map of pubSubTopics and corresponding contentTopics
func contentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[string][]string, error) {
pubSubTopicMap := make(map[string][]string)
Expand All @@ -249,7 +239,7 @@ func contentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[string][]st
} else {
//Parse the content-Topics to figure out shards.
for _, cTopicString := range contentFilter.ContentTopicsList() {
pTopicStr, err := getPubSubTopicFromContentTopic(cTopicString)
pTopicStr, err := protocol.GetPubSubTopicFromContentTopic(cTopicString)
if err != nil {
return nil, err
}
Expand Down
21 changes: 14 additions & 7 deletions waku/v2/protocol/lightpush/waku_lightpush.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,31 +215,38 @@ func (wakuLP *WakuLightPush) Stop() {
wakuLP.h.RemoveStreamHandler(LightPushID_v20beta1)
}

// PublishToTopic is used to broadcast a WakuMessage to a pubsub topic via lightpush protocol
func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.WakuMessage, topic string, opts ...Option) ([]byte, error) {
// Optional PublishToTopic is used to broadcast a WakuMessage to a pubsub topic via lightpush protocol
// If pubSubTopic is not provided, then contentTopic is use to derive the relevant pubSubTopic via autosharding.
func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.WakuMessage, pubsubTopic string, opts ...Option) ([]byte, error) {
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
if message == nil {
return nil, errors.New("message can't be null")
}

if pubsubTopic == "" {
var err error
pubsubTopic, err = protocol.GetPubSubTopicFromContentTopic(message.ContentTopic)
if err != nil {
return nil, err
}
}
req := new(pb.PushRequest)
req.Message = message
req.PubsubTopic = topic
req.PubsubTopic = pubsubTopic

response, err := wakuLP.request(ctx, req, opts...)
if err != nil {
return nil, err
}

if response.IsSuccess {
hash := message.Hash(topic)
hash := message.Hash(pubsubTopic)
wakuLP.log.Info("waku.lightpush published", logging.HexString("hash", hash))
return hash, nil
}

return nil, errors.New(response.Info)
}

// Publish is used to broadcast a WakuMessage to the default waku pubsub topic via lightpush protocol
// Publish is used to broadcast a WakuMessage to the pubSubTopic (which is derived from the contentTopic) via lightpush protocol
func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessage, opts ...Option) ([]byte, error) {
return wakuLP.PublishToTopic(ctx, message, relay.DefaultWakuTopic, opts...)
return wakuLP.PublishToTopic(ctx, message, "", opts...)
}
79 changes: 77 additions & 2 deletions waku/v2/protocol/lightpush/waku_lightpush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/utils"
)

func makeWakuRelay(t *testing.T, topic string) (*relay.WakuRelay, *relay.Subscription, host.Host) {
func makeWakuRelay(t *testing.T, pusubTopic string) (*relay.WakuRelay, *relay.Subscription, host.Host) {
port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)

Expand All @@ -34,7 +34,7 @@ func makeWakuRelay(t *testing.T, topic string) (*relay.WakuRelay, *relay.Subscri
err = relay.Start(context.Background())
require.NoError(t, err)

sub, err := relay.SubscribeToTopic(context.Background(), topic)
sub, err := relay.SubscribeToTopic(context.Background(), pusubTopic)
require.NoError(t, err)

return relay, sub, host
Expand Down Expand Up @@ -150,3 +150,78 @@ func TestWakuLightPushNoPeers(t *testing.T) {
_, err = client.PublishToTopic(ctx, tests.CreateWakuMessage("test", utils.GetUnixEpoch()), testTopic)
require.Errorf(t, err, "no suitable remote peers")
}

func TestWakuLightPushAutoSharding(t *testing.T) {
contentTopic := "0/test/1/testTopic/proto"
cTopic1, err := protocol.StringToContentTopic(contentTopic)
require.NoError(t, err)
//Computing pubSubTopic only for filterFullNode.
pubSubTopicInst := protocol.GetShardFromContentTopic(cTopic1, protocol.GenerationZeroShardsCount)
pubSubTopic := pubSubTopicInst.String()
node1, sub1, host1 := makeWakuRelay(t, pubSubTopic)
defer node1.Stop()
defer sub1.Unsubscribe()

node2, sub2, host2 := makeWakuRelay(t, pubSubTopic)
defer node2.Stop()
defer sub2.Unsubscribe()

ctx := context.Background()
lightPushNode2 := NewWakuLightPush(node2, nil, prometheus.DefaultRegisterer, utils.Logger())
lightPushNode2.SetHost(host2)
err = lightPushNode2.Start(ctx)
require.NoError(t, err)
defer lightPushNode2.Stop()

port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)

clientHost, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
client := NewWakuLightPush(nil, nil, prometheus.DefaultRegisterer, utils.Logger())
client.SetHost(clientHost)

host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL)
err = host2.Peerstore().AddProtocols(host1.ID(), relay.WakuRelayID_v200)
require.NoError(t, err)

err = host2.Connect(ctx, host2.Peerstore().PeerInfo(host1.ID()))
require.NoError(t, err)

clientHost.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
err = clientHost.Peerstore().AddProtocols(host2.ID(), LightPushID_v20beta1)
require.NoError(t, err)

msg1 := tests.CreateWakuMessage(contentTopic, utils.GetUnixEpoch())
msg2 := tests.CreateWakuMessage(contentTopic, utils.GetUnixEpoch())

// Wait for the mesh connection to happen between node1 and node2
time.Sleep(2 * time.Second)
var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()
<-sub1.Ch
<-sub1.Ch
}()

wg.Add(1)
go func() {
defer wg.Done()
<-sub2.Ch
<-sub2.Ch
}()

// Verifying successful request
hash1, err := client.Publish(ctx, msg1)
require.NoError(t, err)
require.Equal(t, protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), string(pubSubTopic)).Hash(), hash1)

// Checking that msg hash is correct
hash, err := client.PublishToTopic(ctx, msg2, "")
require.NoError(t, err)
require.Equal(t, protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), string(pubSubTopic)).Hash(), hash)
wg.Wait()

}
10 changes: 10 additions & 0 deletions waku/v2/protocol/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,3 +230,13 @@ func GetShardFromContentTopic(topic ContentTopic, shardCount int) StaticSharding

return NewStaticShardingPubsubTopic(ClusterIndex, uint16(shard))
}

func GetPubSubTopicFromContentTopic(cTopicString string) (string, error) {
cTopic, err := StringToContentTopic(cTopicString)
if err != nil {
return "", fmt.Errorf("%s : %s", err.Error(), cTopicString)
}
pTopic := GetShardFromContentTopic(cTopic, GenerationZeroShardsCount)

return pTopic.String(), nil
}