Skip to content

Commit

Permalink
Broadcast messages via a blocking channel (#445)
Browse files Browse the repository at this point in the history
Otherwise rebroadcast will end up dropping the last and most important
messages when the message bus blocks.

fixes #443
  • Loading branch information
Stebalien authored Jul 11, 2024
1 parent 18fa7de commit 0304756
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 69 deletions.
58 changes: 29 additions & 29 deletions cmd/f3/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"fmt"
"math/big"
"os"

Expand Down Expand Up @@ -120,47 +121,46 @@ var runCmd = cli.Command{
return xerrors.Errorf("creating module: %w", err)
}

go runMessageSubscription(ctx, module, gpbft.ActorID(id), signingBackend)
errCh := make(chan error, 1)
go func() { errCh <- runMessageSubscription(ctx, module, gpbft.ActorID(id), signingBackend) }()

if err := module.Start(ctx); err != nil {
return nil
}
<-ctx.Done()
select {
case err := <-errCh:
if err != nil {
log.Error(err)
}
case <-ctx.Done():
}
return module.Stop(context.Background())
},
}

func runMessageSubscription(ctx context.Context, module *f3.F3, actorID gpbft.ActorID, signer gpbft.Signer) {
func runMessageSubscription(ctx context.Context, module *f3.F3, actorID gpbft.ActorID, signer gpbft.Signer) error {
for ctx.Err() == nil {

ch := make(chan *gpbft.MessageBuilder, 4)
module.SubscribeForMessagesToSign(ch)
inner:
for {
select {
case mb, ok := <-ch:
if !ok {
// the broadcast bus kicked us out
log.Infof("lost message bus subscription, retrying")
break inner
}
signatureBuilder, err := mb.PrepareSigningInputs(actorID)
if err != nil {
log.Errorf("preparing signing inputs: %+v", err)
}
// signatureBuilder can be sent over RPC
payloadSig, vrfSig, err := signatureBuilder.Sign(ctx, signer)
if err != nil {
log.Errorf("signing message: %+v", err)
}
// signatureBuilder and signatures can be returned back over RPC
module.Broadcast(ctx, signatureBuilder, payloadSig, vrfSig)
case <-ctx.Done():
return
select {
case mb, ok := <-module.MessagesToSign():
if !ok {
return nil
}
signatureBuilder, err := mb.PrepareSigningInputs(actorID)
if err != nil {
return fmt.Errorf("preparing signing inputs: %w", err)
}
// signatureBuilder can be sent over RPC
payloadSig, vrfSig, err := signatureBuilder.Sign(ctx, signer)
if err != nil {
return fmt.Errorf("signing message: %w", err)
}
// signatureBuilder and signatures can be returned back over RPC
module.Broadcast(ctx, signatureBuilder, payloadSig, vrfSig)
case <-ctx.Done():
return nil
}

}
return nil
}

type discoveryNotifee struct {
Expand Down
20 changes: 8 additions & 12 deletions f3.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"

"github.com/Kubuxu/go-broadcast"
"github.com/ipfs/go-datastore"
"go.uber.org/multierr"
"golang.org/x/sync/errgroup"
Expand All @@ -28,7 +27,7 @@ type F3 struct {
verifier gpbft.Verifier
manifestProvider manifest.ManifestProvider

busBroadcast broadcast.Channel[*gpbft.MessageBuilder]
outboundMessages chan *gpbft.MessageBuilder

host host.Host
ds datastore.Datastore
Expand Down Expand Up @@ -58,6 +57,7 @@ func New(_ctx context.Context, manifest manifest.ManifestProvider, ds datastore.
return &F3{
verifier: verif,
manifestProvider: manifest,
outboundMessages: make(chan *gpbft.MessageBuilder, 128),
host: h,
ds: ds,
ec: ec,
Expand All @@ -68,15 +68,11 @@ func New(_ctx context.Context, manifest manifest.ManifestProvider, ds datastore.
}, nil
}

// SubscribeForMessagesToSign is used to subscribe to the message broadcast channel.
// After preparing inputs and signing over them, Broadcast should be called.
//
// If the passed channel is full at any point, it will be dropped from subscription and closed.
// To stop subscribing, either the closer function can be used, or the channel can be abandoned.
// Passing a channel multiple times to the Subscribe function will result in a panic.
func (m *F3) SubscribeForMessagesToSign(ch chan<- *gpbft.MessageBuilder) (closer func()) {
_, closer = m.busBroadcast.Subscribe(ch)
return closer
// MessagesToSign returns a channel of outbound messages that need to be signed by the client(s).
//   - The same channel is shared between all callers and will never be closed.
//   - GPBFT will block if this channel is not read from.
//
// Because every caller receives the same underlying channel (the F3 instance's
// outboundMessages field), each message is delivered to exactly one receiver;
// it is not fanned out to every caller.
func (m *F3) MessagesToSign() <-chan *gpbft.MessageBuilder {
return m.outboundMessages
}

func (m *F3) Manifest() *manifest.Manifest {
Expand Down Expand Up @@ -341,7 +337,7 @@ func (m *F3) resumeInternal(ctx context.Context) error {

if runner, err := newRunner(
ctx, m.cs, runnerEc, m.pubsub, m.verifier,
m.busBroadcast.Publish, m.manifest,
m.outboundMessages, m.manifest,
); err != nil {
return err
} else {
Expand Down
14 changes: 9 additions & 5 deletions host.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type gpbftRunner struct {
ec ec.Backend
pubsub *pubsub.PubSub
verifier gpbft.Verifier
broadcastCb BroadcastMessage
outMessages chan<- *gpbft.MessageBuilder

participant *gpbft.Participant
topic *pubsub.Topic
Expand All @@ -47,7 +47,7 @@ func newRunner(
ec ec.Backend,
ps *pubsub.PubSub,
verifier gpbft.Verifier,
broadcastCb BroadcastMessage,
out chan<- *gpbft.MessageBuilder,
m *manifest.Manifest,
) (*gpbftRunner, error) {
runningCtx, ctxCancel := context.WithCancel(context.Background())
Expand All @@ -59,7 +59,7 @@ func newRunner(
ec: ec,
pubsub: ps,
verifier: verifier,
broadcastCb: broadcastCb,
outMessages: out,
runningCtx: runningCtx,
errgrp: errgrp,
ctxCancel: ctxCancel,
Expand Down Expand Up @@ -535,8 +535,12 @@ func (h *gpbftHost) NetworkName() gpbft.NetworkName {

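// RequestBroadcast queues a message on the outbound channel to be sent to all other participants.
// It blocks until the message is accepted or the runner's context is cancelled.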
func (h *gpbftHost) RequestBroadcast(mb *gpbft.MessageBuilder) error {
(h.broadcastCb)(mb)
return nil
select {
case h.outMessages <- mb:
return nil
case <-h.runningCtx.Done():
return h.runningCtx.Err()
}
}

// Returns the current network time.
Expand Down
36 changes: 13 additions & 23 deletions test/f3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package test

import (
"context"
"fmt"
"math/big"
"os"
"testing"
Expand All @@ -14,7 +15,6 @@ import (
"github.com/filecoin-project/go-f3/manifest"
"github.com/filecoin-project/go-f3/sim/signing"
leveldb "github.com/ipfs/go-ds-leveldb"
logging "github.com/ipfs/go-log/v2"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -29,8 +29,6 @@ const (
logLevel = "info"
)

var log = logging.Logger("f3-testing")

func TestSimpleF3(t *testing.T) {
env := newTestEnvironment(t, 2, false)

Expand Down Expand Up @@ -323,12 +321,17 @@ func (e *testEnv) waitForManifestChange(prev *manifest.Manifest, timeout time.Du
}, timeout, ManifestSenderTimeout)
}

func newTestEnvironment(t *testing.T, n int, dynamicManifest bool) testEnv {
func newTestEnvironment(t *testing.T, n int, dynamicManifest bool) *testEnv {
ctx, cancel := context.WithCancel(context.Background())
grp, ctx := errgroup.WithContext(ctx)
env := testEnv{t: t, errgrp: grp, testCtx: ctx, net: mocknet.New()}
env := &testEnv{t: t, errgrp: grp, testCtx: ctx, net: mocknet.New()}

// Cleanup on exit.
env.t.Cleanup(func() {
cancel()
for _, n := range env.nodes {
require.NoError(env.t, n.f3.Stop(context.Background()))
}
require.NoError(env.t, env.errgrp.Wait())
})

Expand Down Expand Up @@ -360,6 +363,7 @@ func newTestEnvironment(t *testing.T, n int, dynamicManifest bool) testEnv {
for i := 0; i < n; i++ {
env.initNode(i, manifestServer)
}

return env
}

Expand Down Expand Up @@ -439,9 +443,6 @@ func (e *testEnv) resumeNode(i int) {
func (e *testEnv) startNode(i int) {
n := e.nodes[i]
require.NoError(e.t, n.f3.Start(e.testCtx))
e.t.Cleanup(func() {
require.NoError(e.t, n.f3.Stop(context.Background()))
})
}

func (e *testEnv) connectAll() {
Expand Down Expand Up @@ -495,11 +496,6 @@ func (e *testEnv) newF3Instance(id int, manifestServer peer.ID) (*testNode, erro
return nil, xerrors.Errorf("creating temp dir: %w", err)
}

err = logging.SetLogLevel("f3-testing", logLevel)
if err != nil {
return nil, xerrors.Errorf("setting log level: %w", err)
}

ds, err := leveldb.NewDatastore(tmpdir, nil)
if err != nil {
return nil, xerrors.Errorf("creating a datastore: %w", err)
Expand Down Expand Up @@ -530,26 +526,20 @@ func (e *testEnv) newF3Instance(id int, manifestServer peer.ID) (*testNode, erro
// TODO: This code is copy-pasted from cmd/f3/run.go; consider extracting it into a shared testing lib.
// We could do the same for the F3 test instantiation.
func runMessageSubscription(ctx context.Context, module *f3.F3, actorID gpbft.ActorID, signer gpbft.Signer) error {
ch := make(chan *gpbft.MessageBuilder, 4)
module.SubscribeForMessagesToSign(ch)
for ctx.Err() == nil {
select {
case mb, ok := <-ch:
case mb, ok := <-module.MessagesToSign():
if !ok {
// the broadcast bus kicked us out
log.Infof("lost message bus subscription, retrying")
ch = make(chan *gpbft.MessageBuilder, 4)
module.SubscribeForMessagesToSign(ch)
continue
return nil
}
signatureBuilder, err := mb.PrepareSigningInputs(actorID)
if err != nil {
return xerrors.Errorf("preparing signing inputs: %w", err)
return fmt.Errorf("preparing signing inputs: %w", err)
}
// signatureBuilder can be sent over RPC
payloadSig, vrfSig, err := signatureBuilder.Sign(ctx, signer)
if err != nil {
return xerrors.Errorf("signing message: %w", err)
return fmt.Errorf("signing message: %w", err)
}
// signatureBuilder and signatures can be returned back over RPC
module.Broadcast(ctx, signatureBuilder, payloadSig, vrfSig)
Expand Down

0 comments on commit 0304756

Please sign in to comment.