From 9bfd0b4295f7d51dbe306c5d20a0502705918056 Mon Sep 17 00:00:00 2001 From: "Masih H. Derkani" Date: Tue, 10 Dec 2024 12:56:04 +0000 Subject: [PATCH] chore: remove IPNI advertisement relay over pubsub via Lotus node (#12768) The initial implementation of IPNI used GossipSub as a way to propagate IPNI advertisement chain. To do this the propagation had to be relayed through the Lotus node due to strict Filecoin GossipSub validation rules. Since then IPNI has moved on to roll out its own sync protocol that works over HTTP, and HTTP-over-libp2p. This move has been the official way of advertising content to IPNI federation over a year now. Therefore, remove the ad relay over pubsub via Lotus node as it is now considered to have reached its EOL as a mechanism for advertising to IPNI. --- CHANGELOG.md | 1 + build/params_shared_funcs.go | 11 -- chain/sub/incoming.go | 170 ------------------------------- chain/sub/incoming_test.go | 191 ----------------------------------- go.mod | 2 +- go.sum | 2 - node/builder.go | 7 ++ node/builder_chain.go | 2 - node/modules/lp2p/pubsub.go | 20 ---- node/modules/services.go | 31 ------ 10 files changed, 9 insertions(+), 428 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 33244cff958..e26d46f4a2b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ # UNRELEASED - Add json output of tipsets to `louts chain list`. ([filecoin-project/lotus#12691](https://github.com/filecoin-project/lotus/pull/12691)) +- Remove IPNI advertisement relay over pubsub via Lotus node as it now has been deprecated. ([filecoin-project/lotus#12768](https://github.com/filecoin-project/lotus/pull/12768) # UNRELEASED v.1.32.0 diff --git a/build/params_shared_funcs.go b/build/params_shared_funcs.go index 05dbe7817fa..8fe0ee07e36 100644 --- a/build/params_shared_funcs.go +++ b/build/params_shared_funcs.go @@ -14,17 +14,6 @@ import ( func BlocksTopic(netName dtypes.NetworkName) string { return "/fil/blocks/" + string(netName) } func MessagesTopic(netName dtypes.NetworkName) string { return "/fil/msgs/" + string(netName) } -func IndexerIngestTopic(netName dtypes.NetworkName) string { - - nn := string(netName) - // The network name testnetnet is here for historical reasons. - // Going forward we aim to use the name `mainnet` where possible. - if nn == "testnetnet" { - nn = "mainnet" - } - - return "/indexer/ingest/" + nn -} func DhtProtocolName(netName dtypes.NetworkName) protocol.ID { return protocol.ID("/fil/kad/" + string(netName)) } diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index bc5b09842bd..f21af601eb1 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -1,11 +1,8 @@ package sub import ( - "bytes" "context" - "encoding/binary" "errors" - "sync" "time" lru "github.com/hashicorp/golang-lru/v2" @@ -13,7 +10,6 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" - "github.com/ipni/go-libipni/announce/message" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/connmgr" "github.com/libp2p/go-libp2p/core/peer" @@ -30,10 +26,8 @@ import ( "github.com/filecoin-project/lotus/chain/consensus" "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/store" - "github.com/filecoin-project/lotus/chain/sub/ratelimit" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/metrics" - "github.com/filecoin-project/lotus/node/impl/full" ) var log = logging.Logger("sub") @@ -470,167 +464,3 @@ func recordFailure(ctx context.Context, metric *stats.Int64Measure, failureType ) stats.Record(ctx, metric.M(1)) } - -type peerMsgInfo struct { - peerID peer.ID - lastCid cid.Cid - lastSeqno uint64 - rateLimit *ratelimit.Window - mutex sync.Mutex -} - -type IndexerMessageValidator struct { - self peer.ID - - peerCache *lru.TwoQueueCache[address.Address, *peerMsgInfo] - chainApi full.ChainModuleAPI - stateApi full.StateModuleAPI -} - -func NewIndexerMessageValidator(self peer.ID, chainApi full.ChainModuleAPI, stateApi full.StateModuleAPI) *IndexerMessageValidator { - peerCache, _ := lru.New2Q[address.Address, *peerMsgInfo](8192) - - return &IndexerMessageValidator{ - self: self, - peerCache: peerCache, - chainApi: chainApi, - stateApi: stateApi, - } -} - -func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult { - // This chain-node should not be publishing its own messages. These are - // relayed from market-nodes. If a node appears to be local, reject it. - if pid == v.self { - log.Debug("ignoring indexer message from self") - stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1)) - return pubsub.ValidationIgnore - } - originPeer := msg.GetFrom() - if originPeer == v.self { - log.Debug("ignoring indexer message originating from self") - stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1)) - return pubsub.ValidationIgnore - } - - idxrMsg := message.Message{} - err := idxrMsg.UnmarshalCBOR(bytes.NewBuffer(msg.Data)) - if err != nil { - log.Errorw("Could not decode indexer pubsub message", "err", err) - return pubsub.ValidationReject - } - if len(idxrMsg.ExtraData) == 0 { - log.Debugw("ignoring message missing miner id", "peer", originPeer) - return pubsub.ValidationIgnore - } - - // Get miner info from lotus - minerAddr, err := address.NewFromBytes(idxrMsg.ExtraData) - if err != nil { - log.Warnw("cannot parse extra data as miner address", "err", err, "extraData", idxrMsg.ExtraData) - return pubsub.ValidationReject - } - - msgCid := idxrMsg.Cid - - msgInfo, cached := v.peerCache.Get(minerAddr) - if !cached { - msgInfo = &peerMsgInfo{} - } - - // Lock this peer's message info. - msgInfo.mutex.Lock() - defer msgInfo.mutex.Unlock() - - var seqno uint64 - if cached { - // Reject replayed messages. - seqno = binary.BigEndian.Uint64(msg.Message.GetSeqno()) - if seqno <= msgInfo.lastSeqno { - log.Debugf("ignoring replayed indexer message") - return pubsub.ValidationIgnore - } - } - - if !cached || originPeer != msgInfo.peerID { - // Check that the miner ID maps to the peer that sent the message. - err = v.authenticateMessage(ctx, minerAddr, originPeer) - if err != nil { - log.Warnw("cannot authenticate message", "err", err, "peer", originPeer, "minerID", minerAddr) - stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1)) - return pubsub.ValidationReject - } - msgInfo.peerID = originPeer - if !cached { - // Add msgInfo to cache only after being authenticated. If two - // messages from the same peer are handled concurrently, there is a - // small chance that one msgInfo could replace the other here when - // the info is first cached. This is OK, so no need to prevent it. - v.peerCache.Add(minerAddr, msgInfo) - } - } - - // Update message info cache with the latest message's sequence number. - msgInfo.lastSeqno = seqno - - // See if message needs to be ignored due to rate limiting. - if v.rateLimitPeer(msgInfo, msgCid) { - return pubsub.ValidationIgnore - } - - stats.Record(ctx, metrics.IndexerMessageValidationSuccess.M(1)) - return pubsub.ValidationAccept -} - -func (v *IndexerMessageValidator) rateLimitPeer(msgInfo *peerMsgInfo, msgCid cid.Cid) bool { - const ( - msgLimit = 5 - msgTimeLimit = 10 * time.Second - repeatTimeLimit = 2 * time.Hour - ) - - timeWindow := msgInfo.rateLimit - - // Check overall message rate. - if timeWindow == nil { - timeWindow = ratelimit.NewWindow(msgLimit, msgTimeLimit) - msgInfo.rateLimit = timeWindow - } else if msgInfo.lastCid == msgCid { - // Check if this is a repeat of the previous message data. - if time.Since(timeWindow.Newest()) < repeatTimeLimit { - log.Warnw("ignoring repeated indexer message", "sender", msgInfo.peerID) - return true - } - } - - err := timeWindow.Add() - if err != nil { - log.Warnw("ignoring indexer message", "sender", msgInfo.peerID, "err", err) - return true - } - - msgInfo.lastCid = msgCid - - return false -} - -func (v *IndexerMessageValidator) authenticateMessage(ctx context.Context, minerAddress address.Address, peerID peer.ID) error { - ts, err := v.chainApi.ChainHead(ctx) - if err != nil { - return err - } - - minerInfo, err := v.stateApi.StateMinerInfo(ctx, minerAddress, ts.Key()) - if err != nil { - return err - } - - if minerInfo.PeerId == nil { - return xerrors.New("no peer id for miner") - } - if *minerInfo.PeerId != peerID { - return xerrors.New("miner id does not map to peer that sent message") - } - - return nil -} diff --git a/chain/sub/incoming_test.go b/chain/sub/incoming_test.go index dbb0c398b22..d172b1bb83d 100644 --- a/chain/sub/incoming_test.go +++ b/chain/sub/incoming_test.go @@ -1,23 +1,14 @@ package sub import ( - "bytes" "context" "testing" - "github.com/golang/mock/gomock" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" - "github.com/ipni/go-libipni/announce/message" - pubsub "github.com/libp2p/go-libp2p-pubsub" - pb "github.com/libp2p/go-libp2p-pubsub/pb" - "github.com/libp2p/go-libp2p/core/crypto" - "github.com/libp2p/go-libp2p/core/peer" "github.com/filecoin-project/go-address" - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/api/mocks" "github.com/filecoin-project/lotus/chain/types" ) @@ -72,185 +63,3 @@ func TestFetchCidsWithDedup(t *testing.T) { t.Fatalf("there is a nil message: first %p, last %p", res[0], res[len(res)-1]) } } - -func TestIndexerMessageValidator_Validate(t *testing.T) { - validCid, err := cid.Decode("QmbpDgg5kRLDgMxS8vPKNFXEcA6D5MC4CkuUdSWDVtHPGK") - if err != nil { - t.Fatal(err) - } - tests := []struct { - name string - selfPID string - senderPID string - extraData []byte - wantValidation pubsub.ValidationResult - }{ - { - name: "invalid extra data is rejected", - selfPID: "12D3KooWQiCbqEStCkdqUvr69gQsrp9urYJZUCkzsQXia7mbqbFW", - senderPID: "12D3KooWE8yt84RVwW3sFcd6WMjbUdWrZer2YtT4dmtj3dHdahSZ", - extraData: []byte("f0127896"), // note, casting encoded address to byte is invalid. - wantValidation: pubsub.ValidationReject, - }, - { - name: "same sender and receiver is ignored", - selfPID: "12D3KooWQiCbqEStCkdqUvr69gQsrp9urYJZUCkzsQXia7mbqbFW", - senderPID: "12D3KooWQiCbqEStCkdqUvr69gQsrp9urYJZUCkzsQXia7mbqbFW", - wantValidation: pubsub.ValidationIgnore, - }, - } - for _, tc := range tests { - tc := tc - t.Run(tc.name, func(t *testing.T) { - mc := gomock.NewController(t) - node := mocks.NewMockFullNode(mc) - subject := NewIndexerMessageValidator(peer.ID(tc.selfPID), node, node) - message := message.Message{ - Cid: validCid, - Addrs: nil, - ExtraData: tc.extraData, - } - buf := bytes.NewBuffer(nil) - if err := message.MarshalCBOR(buf); err != nil { - t.Fatal(err) - } - - topic := "topic" - pbm := &pb.Message{ - Data: buf.Bytes(), - Topic: &topic, - From: nil, - Seqno: nil, - } - validate := subject.Validate(context.Background(), peer.ID(tc.senderPID), &pubsub.Message{ - Message: pbm, - ReceivedFrom: peer.ID(tc.senderPID), - ValidatorData: nil, - }) - - if validate != tc.wantValidation { - t.Fatalf("expected %v but got %v", tc.wantValidation, validate) - } - }) - } -} - -func TestIdxValidator(t *testing.T) { - validCid, err := cid.Decode("QmbpDgg5kRLDgMxS8vPKNFXEcA6D5MC4CkuUdSWDVtHPGK") - if err != nil { - t.Fatal(err) - } - - addr, err := address.NewFromString("f01024") - if err != nil { - t.Fatal(err) - } - - buf1, err := addr.MarshalBinary() - if err != nil { - t.Fatal(err) - } - - selfPID := "12D3KooWQiCbqEStCkdqUvr69gQsrp9urYJZUCkzsQXia7mbqbFW" - senderPID := "12D3KooWE8yt84RVwW3sFcd6WMjbUdWrZer2YtT4dmtj3dHdahSZ" - extraData := buf1 - - mc := gomock.NewController(t) - node := mocks.NewMockFullNode(mc) - node.EXPECT().ChainHead(gomock.Any()).Return(nil, nil).AnyTimes() - - subject := NewIndexerMessageValidator(peer.ID(selfPID), node, node) - message := message.Message{ - Cid: validCid, - Addrs: nil, - ExtraData: extraData, - } - buf := bytes.NewBuffer(nil) - if err := message.MarshalCBOR(buf); err != nil { - t.Fatal(err) - } - - topic := "topic" - - privk, _, err := crypto.GenerateKeyPair(crypto.RSA, 2048) - if err != nil { - t.Fatal(err) - } - id, err := peer.IDFromPublicKey(privk.GetPublic()) - if err != nil { - t.Fatal(err) - } - - node.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(api.MinerInfo{PeerId: &id}, nil).AnyTimes() - - pbm := &pb.Message{ - Data: buf.Bytes(), - Topic: &topic, - From: []byte(id), - Seqno: []byte{1, 1, 1, 1, 2, 2, 2, 2}, - } - validate := subject.Validate(context.Background(), peer.ID(senderPID), &pubsub.Message{ - Message: pbm, - ReceivedFrom: peer.ID("f01024"), // peer.ID(senderPID), - ValidatorData: nil, - }) - if validate != pubsub.ValidationAccept { - t.Error("Expected to receive ValidationAccept") - } - msgInfo, cached := subject.peerCache.Get(addr) - if !cached { - t.Fatal("Message info should be in cache") - } - seqno := msgInfo.lastSeqno - msgInfo.rateLimit = nil // prevent interference from rate limiting - - t.Log("Sending DoS msg") - privk, _, err = crypto.GenerateKeyPair(crypto.RSA, 2048) - if err != nil { - t.Fatal(err) - } - id2, err := peer.IDFromPublicKey(privk.GetPublic()) - if err != nil { - t.Fatal(err) - } - pbm = &pb.Message{ - Data: buf.Bytes(), - Topic: &topic, - From: []byte(id2), - Seqno: []byte{255, 255, 255, 255, 255, 255, 255, 255}, - } - validate = subject.Validate(context.Background(), peer.ID(senderPID), &pubsub.Message{ - Message: pbm, - ReceivedFrom: peer.ID(senderPID), - ValidatorData: nil, - }) - if validate != pubsub.ValidationReject { - t.Error("Expected to get ValidationReject") - } - msgInfo, cached = subject.peerCache.Get(addr) - if !cached { - t.Fatal("Message info should be in cache") - } - msgInfo.rateLimit = nil // prevent interference from rate limiting - - // Check if DoS is possible. - if msgInfo.lastSeqno != seqno { - t.Fatal("Sequence number should not have been updated") - } - - t.Log("Sending another valid message from miner...") - pbm = &pb.Message{ - Data: buf.Bytes(), - Topic: &topic, - From: []byte(id), - Seqno: []byte{1, 1, 1, 1, 2, 2, 2, 3}, - } - validate = subject.Validate(context.Background(), peer.ID(senderPID), &pubsub.Message{ - Message: pbm, - ReceivedFrom: peer.ID("f01024"), // peer.ID(senderPID), - ValidatorData: nil, - }) - if validate != pubsub.ValidationAccept { - t.Fatal("Did not receive ValidationAccept") - } -} diff --git a/go.mod b/go.mod index edace0bfb97..16551239caf 100644 --- a/go.mod +++ b/go.mod @@ -102,7 +102,6 @@ require ( github.com/ipld/go-car v0.6.2 github.com/ipld/go-car/v2 v2.13.1 github.com/ipld/go-ipld-prime v0.21.0 - github.com/ipni/go-libipni v0.0.8 github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 github.com/jpillora/backoff v1.0.0 github.com/kelseyhightower/envconfig v1.4.0 @@ -321,6 +320,7 @@ require ( github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/shirou/gopsutil v2.18.12+incompatible // indirect github.com/shopspring/decimal v1.4.0 // indirect + github.com/smartystreets/assertions v1.13.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/cast v1.7.0 // indirect github.com/tidwall/gjson v1.14.4 // indirect diff --git a/go.sum b/go.sum index 4ac23ae9d06..95caaa4d37c 100644 --- a/go.sum +++ b/go.sum @@ -715,8 +715,6 @@ github.com/ipld/go-ipld-prime v0.21.0/go.mod h1:3RLqy//ERg/y5oShXXdx5YIp50cFGOan github.com/ipld/go-ipld-prime-proto v0.0.0-20191113031812-e32bd156a1e5/go.mod h1:gcvzoEDBjwycpXt3LBE061wT9f46szXGHAmj9uoP6fU= github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230102063945-1a409dc236dd h1:gMlw/MhNr2Wtp5RwGdsW23cs+yCuj9k2ON7i9MiJlRo= github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230102063945-1a409dc236dd/go.mod h1:wZ8hH8UxeryOs4kJEJaiui/s00hDSbE37OKsL47g+Sw= -github.com/ipni/go-libipni v0.0.8 h1:0wLfZRSBG84swmZwmaLKul/iB/FlBkkl9ZcR1ub+Z+w= -github.com/ipni/go-libipni v0.0.8/go.mod h1:paYP9U4N3/vOzGCuN9kU972vtvw9JUcQjOKyiCFGwRk= github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c= github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4= github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 h1:Dj0L5fhJ9F82ZJyVOmBx6msDp/kfd1t9GRfny/mfJA0= diff --git a/node/builder.go b/node/builder.go index 7d03e9593a4..98b40355dfa 100644 --- a/node/builder.go +++ b/node/builder.go @@ -104,6 +104,13 @@ const ( HandleIncomingMessagesKey HandlePaymentChannelManagerKey + // Deprecated: RelayIndexerMessagesKey is no longer used, since IPNI has + // deprecated the use of GossipSub for propagating advertisements. Use IPNI Sync + // protocol instead. + // + // See: + // - https://github.com/ipni/specs/blob/main/IPNI_HTTP_PROVIDER.md + // - https://github.com/ipni/go-libipni/tree/main/dagsync/ipnisync RelayIndexerMessagesKey // miner diff --git a/node/builder_chain.go b/node/builder_chain.go index 72d6f2ee7f1..1293dcd0c76 100644 --- a/node/builder_chain.go +++ b/node/builder_chain.go @@ -129,8 +129,6 @@ var ChainNode = Options( Override(new(*full.GasPriceCache), full.NewGasPriceCache), - Override(RelayIndexerMessagesKey, modules.RelayIndexerMessages), - // Lite node API ApplyIf(isLiteNode, Override(new(messagepool.Provider), messagepool.NewProviderLite), diff --git a/node/modules/lp2p/pubsub.go b/node/modules/lp2p/pubsub.go index 20a222cd21c..a8560a539ea 100644 --- a/node/modules/lp2p/pubsub.go +++ b/node/modules/lp2p/pubsub.go @@ -145,22 +145,6 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour), } - ingestTopicParams := &pubsub.TopicScoreParams{ - // expected ~0.5 confirmed deals / min. sampled - TopicWeight: 0.1, - - TimeInMeshWeight: 0.00027, // ~1/3600 - TimeInMeshQuantum: time.Second, - TimeInMeshCap: 1, - - FirstMessageDeliveriesWeight: 0.5, - FirstMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour), - FirstMessageDeliveriesCap: 100, // allowing for burstiness - - InvalidMessageDeliveriesWeight: -1000, - InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour), - } - topicParams := map[string]*pubsub.TopicScoreParams{ build.BlocksTopic(in.Nn): { // expected 10 blocks/min @@ -255,9 +239,6 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { drandTopics = append(drandTopics, topic) } - // Index ingestion whitelist - topicParams[build.IndexerIngestTopic(in.Nn)] = ingestTopicParams - // IP colocation whitelist var ipcoloWhitelist []*net.IPNet for _, cidr := range in.Cfg.IPColocationWhitelist { @@ -382,7 +363,6 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { allowTopics := []string{ build.BlocksTopic(in.Nn), build.MessagesTopic(in.Nn), - build.IndexerIngestTopic(in.Nn), } allowTopics = append(allowTopics, drandTopics...) diff --git a/node/modules/services.go b/node/modules/services.go index 37ae325d3ec..267bbe0b81d 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -31,7 +31,6 @@ import ( "github.com/filecoin-project/lotus/journal/fsjournal" "github.com/filecoin-project/lotus/lib/peermgr" "github.com/filecoin-project/lotus/node/hello" - "github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/node/repo" @@ -189,36 +188,6 @@ func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub waitForSync(stmgr, pubsubMsgsSyncEpochs, subscribe) } -func RelayIndexerMessages(lc fx.Lifecycle, ps *pubsub.PubSub, nn dtypes.NetworkName, h host.Host, chainModule full.ChainModuleAPI, stateModule full.StateModuleAPI) error { - topicName := build.IndexerIngestTopic(nn) - - v := sub.NewIndexerMessageValidator(h.ID(), chainModule, stateModule) - - if err := ps.RegisterTopicValidator(topicName, v.Validate); err != nil { - return xerrors.Errorf("failed to register validator for topic %s, err: %w", topicName, err) - } - - topicHandle, err := ps.Join(topicName) - if err != nil { - return xerrors.Errorf("failed to join pubsub topic %s: %w", topicName, err) - } - cancelFunc, err := topicHandle.Relay() - if err != nil { - return xerrors.Errorf("failed to relay to pubsub messages for topic %s: %w", topicName, err) - } - - // Cancel message relay on shutdown. - lc.Append(fx.Hook{ - OnStop: func(_ context.Context) error { - cancelFunc() - return nil - }, - }) - - log.Infof("relaying messages for pubsub topic %s", topicName) - return nil -} - type RandomBeaconParams struct { fx.In