diff --git a/cmd/f3/run.go b/cmd/f3/run.go index 7353c3d0..0bb59e84 100644 --- a/cmd/f3/run.go +++ b/cmd/f3/run.go @@ -2,6 +2,7 @@ package main import ( "context" + "fmt" "math/big" "os" @@ -120,47 +121,46 @@ var runCmd = cli.Command{ return xerrors.Errorf("creating module: %w", err) } - go runMessageSubscription(ctx, module, gpbft.ActorID(id), signingBackend) + errCh := make(chan error, 1) + go func() { errCh <- runMessageSubscription(ctx, module, gpbft.ActorID(id), signingBackend) }() if err := module.Start(ctx); err != nil { return nil } - <-ctx.Done() + select { + case err := <-errCh: + if err != nil { + log.Error(err) + } + case <-ctx.Done(): + } return module.Stop(context.Background()) }, } -func runMessageSubscription(ctx context.Context, module *f3.F3, actorID gpbft.ActorID, signer gpbft.Signer) { +func runMessageSubscription(ctx context.Context, module *f3.F3, actorID gpbft.ActorID, signer gpbft.Signer) error { for ctx.Err() == nil { - - ch := make(chan *gpbft.MessageBuilder, 4) - module.SubscribeForMessagesToSign(ch) - inner: - for { - select { - case mb, ok := <-ch: - if !ok { - // the broadcast bus kicked us out - log.Infof("lost message bus subscription, retrying") - break inner - } - signatureBuilder, err := mb.PrepareSigningInputs(actorID) - if err != nil { - log.Errorf("preparing signing inputs: %+v", err) - } - // signatureBuilder can be sent over RPC - payloadSig, vrfSig, err := signatureBuilder.Sign(ctx, signer) - if err != nil { - log.Errorf("signing message: %+v", err) - } - // signatureBuilder and signatures can be returned back over RPC - module.Broadcast(ctx, signatureBuilder, payloadSig, vrfSig) - case <-ctx.Done(): - return + select { + case mb, ok := <-module.MessagesToSign(): + if !ok { + return nil + } + signatureBuilder, err := mb.PrepareSigningInputs(actorID) + if err != nil { + return fmt.Errorf("preparing signing inputs: %w", err) + } + // signatureBuilder can be sent over RPC + payloadSig, vrfSig, err := signatureBuilder.Sign(ctx, 
signer) + if err != nil { + return fmt.Errorf("signing message: %w", err) } + // signatureBuilder and signatures can be returned back over RPC + module.Broadcast(ctx, signatureBuilder, payloadSig, vrfSig) + case <-ctx.Done(): + return nil } - } + return nil } type discoveryNotifee struct { diff --git a/f3.go b/f3.go index c9933153..c7877058 100644 --- a/f3.go +++ b/f3.go @@ -17,7 +17,6 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" - "github.com/Kubuxu/go-broadcast" "github.com/ipfs/go-datastore" "go.uber.org/multierr" "golang.org/x/sync/errgroup" @@ -28,7 +27,7 @@ type F3 struct { verifier gpbft.Verifier manifestProvider manifest.ManifestProvider - busBroadcast broadcast.Channel[*gpbft.MessageBuilder] + outboundMessages chan *gpbft.MessageBuilder host host.Host ds datastore.Datastore @@ -58,6 +57,7 @@ func New(_ctx context.Context, manifest manifest.ManifestProvider, ds datastore. return &F3{ verifier: verif, manifestProvider: manifest, + outboundMessages: make(chan *gpbft.MessageBuilder, 128), host: h, ds: ds, ec: ec, @@ -68,15 +68,11 @@ func New(_ctx context.Context, manifest manifest.ManifestProvider, ds datastore. }, nil } -// SubscribeForMessagesToSign is used to subscribe to the message broadcast channel. -// After perparing inputs and signing over them, Broadcast should be called. -// -// If the passed channel is full at any point, it will be dropped from subscription and closed. -// To stop subscribing, either the closer function can be used, or the channel can be abandoned. -// Passing a channel multiple times to the Subscribe function will result in a panic. -func (m *F3) SubscribeForMessagesToSign(ch chan<- *gpbft.MessageBuilder) (closer func()) { - _, closer = m.busBroadcast.Subscribe(ch) - return closer +// MessagesToSign returns a channel of outbound messages that need to be signed by the client(s). +// - The same channel is shared between all callers and will never be closed. 
+// - GPBFT will block if this channel is not read from. +func (m *F3) MessagesToSign() <-chan *gpbft.MessageBuilder { + return m.outboundMessages } func (m *F3) Manifest() *manifest.Manifest { @@ -341,7 +337,7 @@ func (m *F3) resumeInternal(ctx context.Context) error { if runner, err := newRunner( ctx, m.cs, runnerEc, m.pubsub, m.verifier, - m.busBroadcast.Publish, m.manifest, + m.outboundMessages, m.manifest, ); err != nil { return err } else { diff --git a/host.go b/host.go index af4406e5..d1b7c0c4 100644 --- a/host.go +++ b/host.go @@ -29,7 +29,7 @@ type gpbftRunner struct { ec ec.Backend pubsub *pubsub.PubSub verifier gpbft.Verifier - broadcastCb BroadcastMessage + outMessages chan<- *gpbft.MessageBuilder participant *gpbft.Participant topic *pubsub.Topic @@ -47,7 +47,7 @@ func newRunner( ec ec.Backend, ps *pubsub.PubSub, verifier gpbft.Verifier, - broadcastCb BroadcastMessage, + out chan<- *gpbft.MessageBuilder, m *manifest.Manifest, ) (*gpbftRunner, error) { runningCtx, ctxCancel := context.WithCancel(context.Background()) @@ -59,7 +59,7 @@ func newRunner( ec: ec, pubsub: ps, verifier: verifier, - broadcastCb: broadcastCb, + outMessages: out, runningCtx: runningCtx, errgrp: errgrp, ctxCancel: ctxCancel, @@ -535,8 +535,12 @@ func (h *gpbftHost) NetworkName() gpbft.NetworkName { // Sends a message to all other participants. func (h *gpbftHost) RequestBroadcast(mb *gpbft.MessageBuilder) error { - (h.broadcastCb)(mb) - return nil + select { + case h.outMessages <- mb: + return nil + case <-h.runningCtx.Done(): + return h.runningCtx.Err() + } } // Returns the current network time. 
diff --git a/test/f3_test.go b/test/f3_test.go index 7cdb7108..d822e51b 100644 --- a/test/f3_test.go +++ b/test/f3_test.go @@ -2,6 +2,7 @@ package test import ( "context" + "fmt" "math/big" "os" "testing" @@ -14,7 +15,6 @@ import ( "github.com/filecoin-project/go-f3/manifest" "github.com/filecoin-project/go-f3/sim/signing" leveldb "github.com/ipfs/go-ds-leveldb" - logging "github.com/ipfs/go-log/v2" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" @@ -29,8 +29,6 @@ const ( logLevel = "info" ) -var log = logging.Logger("f3-testing") - func TestSimpleF3(t *testing.T) { env := newTestEnvironment(t, 2, false) @@ -323,12 +321,17 @@ func (e *testEnv) waitForManifestChange(prev *manifest.Manifest, timeout time.Du }, timeout, ManifestSenderTimeout) } -func newTestEnvironment(t *testing.T, n int, dynamicManifest bool) testEnv { +func newTestEnvironment(t *testing.T, n int, dynamicManifest bool) *testEnv { ctx, cancel := context.WithCancel(context.Background()) grp, ctx := errgroup.WithContext(ctx) - env := testEnv{t: t, errgrp: grp, testCtx: ctx, net: mocknet.New()} + env := &testEnv{t: t, errgrp: grp, testCtx: ctx, net: mocknet.New()} + + // Cleanup on exit. 
env.t.Cleanup(func() { cancel() + for _, n := range env.nodes { + require.NoError(env.t, n.f3.Stop(context.Background())) + } require.NoError(env.t, env.errgrp.Wait()) }) @@ -360,6 +363,7 @@ func newTestEnvironment(t *testing.T, n int, dynamicManifest bool) testEnv { for i := 0; i < n; i++ { env.initNode(i, manifestServer) } + return env } @@ -439,9 +443,6 @@ func (e *testEnv) resumeNode(i int) { func (e *testEnv) startNode(i int) { n := e.nodes[i] require.NoError(e.t, n.f3.Start(e.testCtx)) - e.t.Cleanup(func() { - require.NoError(e.t, n.f3.Stop(context.Background())) - }) } func (e *testEnv) connectAll() { @@ -495,11 +496,6 @@ func (e *testEnv) newF3Instance(id int, manifestServer peer.ID) (*testNode, erro return nil, xerrors.Errorf("creating temp dir: %w", err) } - err = logging.SetLogLevel("f3-testing", logLevel) - if err != nil { - return nil, xerrors.Errorf("setting log level: %w", err) - } - ds, err := leveldb.NewDatastore(tmpdir, nil) if err != nil { return nil, xerrors.Errorf("creating a datastore: %w", err) @@ -530,26 +526,20 @@ func (e *testEnv) newF3Instance(id int, manifestServer peer.ID) (*testNode, erro // TODO: This code is copy-pasta from cmd/f3/run.go, consider taking it out into a shared testing lib. 
// We could do the same to the F3 test instantiation func runMessageSubscription(ctx context.Context, module *f3.F3, actorID gpbft.ActorID, signer gpbft.Signer) error { - ch := make(chan *gpbft.MessageBuilder, 4) - module.SubscribeForMessagesToSign(ch) for ctx.Err() == nil { select { - case mb, ok := <-ch: + case mb, ok := <-module.MessagesToSign(): if !ok { - // the broadcast bus kicked us out - log.Infof("lost message bus subscription, retrying") - ch = make(chan *gpbft.MessageBuilder, 4) - module.SubscribeForMessagesToSign(ch) - continue + return nil } signatureBuilder, err := mb.PrepareSigningInputs(actorID) if err != nil { - return xerrors.Errorf("preparing signing inputs: %w", err) + return fmt.Errorf("preparing signing inputs: %w", err) } // signatureBuilder can be sent over RPC payloadSig, vrfSig, err := signatureBuilder.Sign(ctx, signer) if err != nil { - return xerrors.Errorf("signing message: %w", err) + return fmt.Errorf("signing message: %w", err) } // signatureBuilder and signatures can be returned back over RPC module.Broadcast(ctx, signatureBuilder, payloadSig, vrfSig)