Skip to content

Commit

Permalink
Rebroadcast already signed messages from WAL
Browse files Browse the repository at this point in the history
Previously, rebroadcast functioned by keeping track of message templates
and calling `RequestBroadcast` again to re-broadcast them. This approach
can cause erroneous behaviour when messages are re-signed in conjunction
with participant churn.

To avoid this category of errors, treat broadcasts as final: Every
broadcasted message is recorded in WAL and once recorded the same
message is used for re-broadcast; no re-signing. A dedicated
re-broadcast API is introduced to take instance, round and phase and use
it to then look up broadcast messages.

Note that as a result of the changes above, failures are also sticky. That
is, if signing of a message fails at the time of original broadcast,
future requests to rebroadcast it will have no effect.

The changes here also include a series of WAL functionality refactors to
make it easier to traverse write-ahead log entries.

Fixes #474
  • Loading branch information
masih committed Oct 3, 2024
1 parent 5453867 commit cfa2e0d
Show file tree
Hide file tree
Showing 15 changed files with 363 additions and 159 deletions.
19 changes: 16 additions & 3 deletions emulator/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ type driverHost struct {
pendingAlarm *time.Time
receivedBroadcasts []*gpbft.GMessage
chain map[uint64]*Instance
messages MessageCache
}

// newHost constructs a test driverHost backed by the given signing
// implementation, with an empty instance chain and a fresh message cache.
// The cache records every broadcasted message so that RequestRebroadcast can
// re-deliver it verbatim without re-signing.
func newHost(t *testing.T, signing Signing) *driverHost {
	return &driverHost{
		Signing:  signing,
		t:        t,
		chain:    make(map[uint64]*Instance),
		messages: NewMessageCache(),
	}
}

Expand All @@ -48,6 +50,17 @@ func (h *driverHost) RequestBroadcast(mb *gpbft.MessageBuilder) error {
return err
}
h.receivedBroadcasts = append(h.receivedBroadcasts, msg)
require.True(h.t, h.messages.PutIfAbsent(msg))
return nil
}

// RequestRebroadcast re-delivers the message previously broadcasted at the
// given instance, round and phase, if one was recorded via RequestBroadcast.
// A missing message is deliberately a silent no-op: if signing failed at the
// time of original broadcast the message was never cached, and rebroadcast of
// it must have no effect ("sticky" failure semantics).
func (h *driverHost) RequestRebroadcast(instance, round uint64, phase gpbft.Phase) error {
	// Removed leftover debug output (`fmt.Printf("asdasd")`) from the
	// not-found branch; absence of a cached message is tolerated silently.
	if message, found := h.messages.Get(instance, round, phase); found {
		h.receivedBroadcasts = append(h.receivedBroadcasts, message)
	}
	return nil
}

Expand Down
39 changes: 39 additions & 0 deletions emulator/message_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package emulator

import "github.com/filecoin-project/go-f3/gpbft"

// MessageKey uniquely identifies a broadcasted message by the instance, round
// and phase it was sent for. It is used as the lookup key in MessageCache.
type MessageKey struct {
	Instance, Round uint64
	Phase           gpbft.Phase
}

// MessageCache is a repository of messages keyed by their instance, round and
// phase. This cache is used for testing purposes only and has no eviction
// strategy. It is primarily used to store messages from self for rebroadcast.
// Note that, being a plain map, it is not safe for concurrent use.
type MessageCache map[MessageKey]*gpbft.GMessage

// NewMessageCache instantiates an empty MessageCache, ready for use.
func NewMessageCache() MessageCache {
	return MessageCache{}
}

// Get looks up the cached message broadcasted at the given instance, round
// and phase. The second return value reports whether such a message exists.
func (mc MessageCache) Get(instance, round uint64, phase gpbft.Phase) (*gpbft.GMessage, bool) {
	key := MessageKey{
		Instance: instance,
		Round:    round,
		Phase:    phase,
	}
	msg, found := mc[key]
	return msg, found
}

// PutIfAbsent stores msg keyed by its vote's instance, round and phase,
// unless a message is already cached under that key. It returns true if the
// message was stored, and false if an existing entry was left untouched.
func (mc MessageCache) PutIfAbsent(msg *gpbft.GMessage) bool {
	key := MessageKey{
		Instance: msg.Vote.Instance,
		Round:    msg.Vote.Round,
		Phase:    msg.Vote.Phase,
	}
	_, exists := mc[key]
	if exists {
		return false
	}
	mc[key] = msg
	return true
}
2 changes: 1 addition & 1 deletion f3.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func (m *F3) Resume(ctx context.Context) error {
cleanName = strings.ReplaceAll(cleanName, "\u0000", "")

walPath := filepath.Join(m.diskPath, "wal", cleanName)
wal, err := writeaheadlog.Open[walEntry](walPath)
wal, err := writeaheadlog.OpenMessageWriteAheadLog(walPath)
if err != nil {
return fmt.Errorf("opening WAL: %w", err)
}
Expand Down
3 changes: 3 additions & 0 deletions gpbft/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ type Network interface {
NetworkName() NetworkName
// Requests that the message is signed and broadcasted, it should also be delivered locally
RequestBroadcast(mb *MessageBuilder) error
// RequestRebroadcast requests that a message at given instance, round and phase
// previously broadcasted via RequestBroadcast be rebroadcasted.
RequestRebroadcast(instance, round uint64, phase Phase) error
}

type Clock interface {
Expand Down
66 changes: 40 additions & 26 deletions gpbft/gpbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -959,27 +959,27 @@ func (i *instance) rebroadcast() error {
// Note that the implementation here rebroadcasts more messages than FIP-0086
// strictly requires. Because, the cost of rebroadcasting additional messages is
// small compared to the reduction in need for rebroadcast.
var msgs []*MessageBuilder
var requests []*rebroadcastRequest
if i.broadcasted.decide != nil {
msgs = append(msgs, i.broadcasted.decide)
requests = append(requests, i.broadcasted.decide)
} else {
// We rebroadcast messages newest to oldest, except quality which we always
// rebroadcast first. That way, if we try to rebroadcast too many at once and some
// get dropped, chances are we've already rebroadcast the most useful messages.
msgs = append(msgs, i.broadcasted.previousRound...)
msgs = append(msgs, i.broadcasted.currentRound...)
requests = append(requests, i.broadcasted.previousRound...)
requests = append(requests, i.broadcasted.currentRound...)
if i.broadcasted.quality != nil {
msgs = append(msgs, i.broadcasted.quality)
requests = append(requests, i.broadcasted.quality)
}
slices.Reverse(msgs)
slices.Reverse(requests)
}
for _, mb := range msgs {
if err := i.participant.host.RequestBroadcast(mb); err != nil {
for _, request := range requests {
if err := i.participant.host.RequestRebroadcast(i.instanceID, request.round, request.phase); err != nil {
// Silently log the error and proceed. This is consistent with the behaviour of
// instance for regular broadcasts.
i.log("failed to request rebroadcast %s at round %d: %v", mb.Payload.Phase, mb.Payload.Round, err)
i.log("failed to request rebroadcast %s at round %d: %v", request.phase, request.round, err)
} else {
i.log("rebroadcasting %s at round %d for value %s", mb.Payload.Phase.String(), mb.Payload.Round, mb.Payload.Value)
i.log("rebroadcasting %s at round %d", request.phase, request.round)
metrics.reBroadcastCounter.Add(context.TODO(), 1)
}
}
Expand Down Expand Up @@ -1402,21 +1402,35 @@ func (c *convergeState) FindProposalFor(chain ECChain) ConvergeValue {
}

type broadcastState struct {
// Our quality message, always rebroadcast until we reach a decision.
quality *MessageBuilder
// Our decide message. If set, this is the only message we rebroadcast (and all other fields
// should be nil).
decide *MessageBuilder
// The messages we sent during the previous round (`round - 1`), except quality/decide.
// Rebroadcast unless `decide` is non-nil.
previousRound []*MessageBuilder
// The messages we sent this round (`round`), except quality/decide. Rebroadcast unless
// `decide` is non-nil.
currentRound []*MessageBuilder
// quality captures the broadcasted QUALITY message. This message is always
// rebroadcasted until we reach a decision.
quality *rebroadcastRequest
// decide captures the broadcasted DECIDE message. If set, this is the only
// message that is rebroadcasted and all other fields are cleared.
decide *rebroadcastRequest
// previousRound contains list of rebroadcast requests that correspond to the
// messages we sent during the previous round (`round - 1`), except
// quality/decide. Rebroadcast unless `decide` is non-nil.
previousRound []*rebroadcastRequest
// currentRound contains list of rebroadcast requests that correspond to the
// messages we sent this round (`round`), except quality/decide. Rebroadcast
// unless `decide` is non-nil.
currentRound []*rebroadcastRequest
// The latest round in which we broadcast a message.
round uint64
}

// rebroadcastRequest captures just enough metadata for making a rebroadcast
// request.
type rebroadcastRequest struct {
	round uint64 // round of the originally broadcasted message
	phase Phase  // phase of the originally broadcasted message
}

// newRebroadcastRequest derives a rebroadcast request from the round and
// phase carried by the given message builder's payload.
func newRebroadcastRequest(mb *MessageBuilder) *rebroadcastRequest {
	return &rebroadcastRequest{
		round: mb.Payload.Round,
		phase: mb.Payload.Phase,
	}
}

// record stores messages that are required should rebroadcast become necessary.
// The messages stored depend on the current progress of instance. If the
// instance progresses to DECIDE then only the decide message will be recorded.
Expand All @@ -1439,28 +1453,28 @@ func (bs *broadcastState) record(mb *MessageBuilder) {
}
switch mb.Payload.Phase {
case QUALITY_PHASE:
bs.quality = mb
bs.quality = newRebroadcastRequest(mb)
case DECIDE_PHASE:
// Clear all previous messages, as only DECIDE message need to be rebroadcasted.
// Note that DECIDE message is not associated to any round.
bs.currentRound = nil
bs.previousRound = nil
bs.quality = nil
bs.decide = mb
bs.decide = newRebroadcastRequest(mb)
case CONVERGE_PHASE, PREPARE_PHASE, COMMIT_PHASE:
switch {
case mb.Payload.Round < bs.round:
log.Warnw("Unexpected broadcast of a message for a prior round",
"latestRecordedRound", bs.round, "messageRound", mb.Payload.Round)
return
case mb.Payload.Round == bs.round:
bs.currentRound = append(bs.currentRound, mb)
bs.currentRound = append(bs.currentRound, newRebroadcastRequest(mb))
case mb.Payload.Round == bs.round+1:
bs.previousRound = bs.currentRound
bs.currentRound = []*MessageBuilder{mb}
bs.currentRound = []*rebroadcastRequest{newRebroadcastRequest(mb)}
default:
bs.previousRound = nil
bs.currentRound = []*MessageBuilder{mb}
bs.currentRound = []*rebroadcastRequest{newRebroadcastRequest(mb)}
}
bs.round = mb.Payload.Round
default:
Expand Down
48 changes: 48 additions & 0 deletions gpbft/mock_host_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions gpbft/participant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1153,7 +1153,7 @@ func TestParticipant_WithMisbehavingSigner(t *testing.T) {
// Panic at every signing operation.
driver.SetSigning(emulator.PanicSigning())

// Expect that instance does not start, capturs panic and returns an error.
// Expect that instance does not start, captures panic and returns an error.
require.ErrorContains(t, driver.StartInstance(instance.ID()), "participant panicked")
// Expect no broadcast as the instance is not started
driver.RequireNoBroadcast()
Expand Down Expand Up @@ -1182,12 +1182,12 @@ func TestParticipant_WithMisbehavingSigner(t *testing.T) {
driver.SetSigning(emulator.AdhocSigning())
// Trigger timeout at PREPARE to schedule re-broadcast.
driver.RequireDeliverAlarm()
// Trigger re-broadcast timeout to force broadcast attempt for QUALITY and
// PREPARE from round 0, which should have been scheduled regardless of signing
// panic.
// Trigger re-broadcast timeout to force a broadcast attempt for QUALITY but
// not PREPARE, because signing of PREPARE from round 0 never actually
// happened due to panic and is missing from the state (i.e. WAL in production code).
driver.RequireDeliverAlarm()
driver.RequireQuality()
driver.RequirePrepare(instance.Proposal().BaseChain())
driver.RequireNoBroadcast()
})
}

Expand Down
Loading

0 comments on commit cfa2e0d

Please sign in to comment.