Skip to content

Commit

Permalink
Handle failure to receive/validate a decision (#436)
Browse files Browse the repository at this point in the history
Previously we'd panic. Now, we "handle" this by stopping. We will
automatically restart if/when we receive a _valid_ finality certificate
over the certificate exchange.
  • Loading branch information
Stebalien authored Jul 11, 2024
1 parent d0f5a05 commit d1b048a
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 40 deletions.
4 changes: 2 additions & 2 deletions emulator/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ func (h *driverHost) RequestBroadcast(mb *gpbft.MessageBuilder) error {
return nil
}

func (h *driverHost) ReceiveDecision(decision *gpbft.Justification) time.Time {
func (h *driverHost) ReceiveDecision(decision *gpbft.Justification) (time.Time, error) {
require.NoError(h.t, h.maybeReceiveDecision(decision))
return h.now
return h.now, nil
}

func (h *driverHost) maybeReceiveDecision(decision *gpbft.Justification) error {
Expand Down
2 changes: 1 addition & 1 deletion gpbft/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ type DecisionReceiver interface {
// The notification must return the timestamp at which the next instance should begin,
// based on the decision received (which may be in the past).
// E.g. this might be: finalised tipset timestamp + epoch duration + stabilisation delay.
ReceiveDecision(decision *Justification) time.Time
ReceiveDecision(decision *Justification) (time.Time, error)
}

// Tracer collects trace logs that capture logical state changes.
Expand Down
20 changes: 15 additions & 5 deletions gpbft/mock_host_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 18 additions & 9 deletions gpbft/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ type Participant struct {
gpbft *instance
// Messages queued for future instances.
mqueue *messageQueue
// The output from the last terminated Granite instance.
finalised *Justification
// The round number during which the last instance was terminated.
// This is for informational purposes only. It does not necessarily correspond to the
// protocol round for which a strong quorum of COMMIT messages was observed,
Expand Down Expand Up @@ -95,7 +93,8 @@ func (p *Participant) StartInstanceAt(instance uint64, when time.Time) (err erro

// Finish current instance to clean old committees and old messages queued
// and prepare to begin a new instance.
p.finishCurrentInstance(instance)
_ = p.finishCurrentInstance()
p.beginNextInstance(instance)

// Set the alarm to begin a new instance at the specified time.
p.host.SetAlarm(when)
Expand Down Expand Up @@ -266,21 +265,31 @@ func (p *Participant) fetchCommittee(instance uint64) (*committee, error) {
}

func (p *Participant) handleDecision() {
if p.terminated() {
p.finishCurrentInstance(p.currentInstance + 1)
nextStart := p.host.ReceiveDecision(p.finalised)
// Set an alarm at which to fetch the next chain and begin a new instance.
if !p.terminated() {
return
}
decision := p.finishCurrentInstance()
nextStart, err := p.host.ReceiveDecision(decision)
if err != nil {
p.tracer.Log("failed to receive decision: %+v", err)
p.host.SetAlarm(time.Time{})
} else {
p.beginNextInstance(p.currentInstance + 1)
p.host.SetAlarm(nextStart)
}
}

func (p *Participant) finishCurrentInstance(nextInstance uint64) {
func (p *Participant) finishCurrentInstance() *Justification {
var decision *Justification
if p.gpbft != nil {
p.finalised = p.gpbft.terminationValue
decision = p.gpbft.terminationValue
p.terminatedDuringRound = p.gpbft.round
}
p.gpbft = nil
return decision
}

func (p *Participant) beginNextInstance(nextInstance uint64) {
p.instanceMutex.Lock()
defer p.instanceMutex.Unlock()
// Clean all messages queued and old committees for instances below the next one.
Expand Down
20 changes: 16 additions & 4 deletions host.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"errors"
"fmt"
"slices"
"time"

Expand Down Expand Up @@ -557,7 +558,16 @@ func (h *gpbftHost) SetAlarm(at time.Time) {
log.Debugf("set alarm for %v", at)
// we cannot reuse the timer because we don't know if it was read or not
h.alertTimer.Stop()
h.alertTimer = time.NewTimer(time.Until(at))
if at.IsZero() {
// If "at" is zero, we cancel the timer entirely. Unfortunately, we still have to
// replace it for the reason stated above.
h.alertTimer = time.NewTimer(0)
if !h.alertTimer.Stop() {
<-h.alertTimer.C
}
} else {
h.alertTimer = time.NewTimer(max(0, time.Until(at)))
}
}

// Receives a finality decision from the instance, with signatures from a strong quorum
Expand All @@ -566,14 +576,16 @@ func (h *gpbftHost) SetAlarm(at time.Time) {
// The notification must return the timestamp at which the next instance should begin,
// based on the decision received (which may be in the past).
// E.g. this might be: finalised tipset timestamp + epoch duration + stabilisation delay.
func (h *gpbftHost) ReceiveDecision(decision *gpbft.Justification) time.Time {
func (h *gpbftHost) ReceiveDecision(decision *gpbft.Justification) (time.Time, error) {
log.Infof("got decision at instance %d, finalized head at epoch: %d",
decision.Vote.Instance, decision.Vote.Value.Head().Epoch)
cert, err := h.saveDecision(decision)
if err != nil {
log.Errorf("error while saving decision: %+v", err)
err := fmt.Errorf("error while saving decision: %+v", err)
log.Error(err)
return time.Time{}, err
}
return (*gpbftRunner)(h).computeNextInstanceStart(cert)
return (*gpbftRunner)(h).computeNextInstanceStart(cert), nil
}

func (h *gpbftHost) saveDecision(decision *gpbft.Justification) (*certs.FinalityCertificate, error) {
Expand Down
4 changes: 2 additions & 2 deletions sim/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ func (v *simHost) Time() time.Time {
return v.sim.network.Time()
}

func (v *simHost) ReceiveDecision(decision *gpbft.Justification) time.Time {
func (v *simHost) ReceiveDecision(decision *gpbft.Justification) (time.Time, error) {
v.sim.ec.NotifyDecision(v.id, decision)
v.ecChain = decision.Vote.Value
return v.Time().Add(v.sim.ecEpochDuration).Add(v.sim.ecStabilisationDelay)
return v.Time().Add(v.sim.ecEpochDuration).Add(v.sim.ecStabilisationDelay), nil
}

func (v *simHost) StoragePower(instance uint64) *gpbft.StoragePower {
Expand Down
72 changes: 55 additions & 17 deletions test/f3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"fmt"
"math/big"
"os"
"sync/atomic"
"testing"
"time"

Expand All @@ -14,7 +14,10 @@ import (
"github.com/filecoin-project/go-f3/gpbft"
"github.com/filecoin-project/go-f3/manifest"
"github.com/filecoin-project/go-f3/sim/signing"
leveldb "github.com/ipfs/go-ds-leveldb"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/failstore"
ds_sync "github.com/ipfs/go-datastore/sync"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -79,6 +82,37 @@ func TestPauseResumeCatchup(t *testing.T) {
env.waitForInstanceNumber(node0failInstance+3, 30*time.Second, false)
}

// TestFailRecover checks that the network keeps advancing instances after a
// single datastore write on node 0 is made to fail, which prevents that node
// from storing one decision.
func TestFailRecover(t *testing.T) {
	env := newTestEnvironment(t, 2, false)

	// Make it possible to fail a single write for node 0.
	var failDsWrite atomic.Bool
	dsFailureFunc := func(op string) error {
		switch op {
		case "put", "batch-put":
			// CompareAndSwap consumes the armed flag atomically, so exactly one
			// write fails even if several writes observe the flag concurrently.
			if failDsWrite.CompareAndSwap(true, false) {
				return xerrors.Errorf("FAILURE!")
			}
		}
		return nil
	}

	env.injectDatastoreFailures(0, dsFailureFunc)

	env.connectAll()
	env.start()
	env.waitForInstanceNumber(1, 10*time.Second, true)

	// Inject a single write failure. This should prevent us from storing a single
	// decision.
	failDsWrite.Store(true)

	// We should proceed anyways (catching up via the certificate exchange protocol).
	oldInstance := env.nodes[0].currentGpbftInstance()
	env.waitForInstanceNumber(oldInstance+3, 10*time.Second, true)
}

func TestDynamicManifest_WithoutChanges(t *testing.T) {
env := newTestEnvironment(t, 2, true)

Expand Down Expand Up @@ -182,9 +216,10 @@ var base manifest.Manifest = manifest.Manifest{
}

type testNode struct {
e *testEnv
h host.Host
f3 *f3.F3
e *testEnv
h host.Host
f3 *f3.F3
dsErrFunc func(string) error
}

func (n *testNode) currentGpbftInstance() uint64 {
Expand Down Expand Up @@ -486,20 +521,19 @@ func (e *testEnv) newF3Instance(id int, manifestServer peer.ID) (*testNode, erro
return nil, xerrors.Errorf("creating libp2p host: %w", err)
}

n := &testNode{e: e, h: h}

ps, err := pubsub.NewGossipSub(e.testCtx, h)
if err != nil {
return nil, xerrors.Errorf("creating gossipsub: %w", err)
}

tmpdir, err := os.MkdirTemp("", "f3-*")
if err != nil {
return nil, xerrors.Errorf("creating temp dir: %w", err)
}

ds, err := leveldb.NewDatastore(tmpdir, nil)
if err != nil {
return nil, xerrors.Errorf("creating a datastore: %w", err)
}
ds := ds_sync.MutexWrap(failstore.NewFailstore(datastore.NewMapDatastore(), func(s string) error {
if n.dsErrFunc != nil {
return (n.dsErrFunc)(s)
}
return nil
}))

m := e.manifest // copy because we mutate this
var mprovider manifest.ManifestProvider
Expand All @@ -511,16 +545,20 @@ func (e *testEnv) newF3Instance(id int, manifestServer peer.ID) (*testNode, erro

e.signingBackend.Allow(int(id))

module, err := f3.New(e.testCtx, mprovider, ds, h, ps, e.signingBackend, e.ec)
n.f3, err = f3.New(e.testCtx, mprovider, ds, h, ps, e.signingBackend, e.ec)
if err != nil {
return nil, xerrors.Errorf("creating module: %w", err)
}

e.errgrp.Go(func() error {
return runMessageSubscription(e.testCtx, module, gpbft.ActorID(id), e.signingBackend)
return runMessageSubscription(e.testCtx, n.f3, gpbft.ActorID(id), e.signingBackend)
})

return &testNode{e: e, h: h, f3: module}, nil
return n, nil
}

// injectDatastoreFailures installs fn as the datastore failure hook
// (dsErrFunc) of the i-th test node. The node's datastore wrapper built in
// newF3Instance calls this hook, when it is non-nil, with a string naming the
// operation (e.g. "put" or "batch-put") and returns whatever error the hook
// yields. Passing a nil fn clears the hook. No bounds check is performed, so
// i must be a valid index into e.nodes.
func (e *testEnv) injectDatastoreFailures(i int, fn func(op string) error) {
e.nodes[i].dsErrFunc = fn
}

// TODO: This code is copy-pasta from cmd/f3/run.go, consider taking it out into a shared testing lib.
Expand Down

0 comments on commit d1b048a

Please sign in to comment.