Merge pull request #1442 from dusk-network/concurrent_voting
Send reduction concurrently without blocking votes re-propagation
goshawk-3 authored Jul 28, 2022
2 parents 3ebd6cb + d7d13ad commit 398f63f
Showing 19 changed files with 270 additions and 111 deletions.
51 changes: 23 additions & 28 deletions pkg/core/chain/chain.go
@@ -12,7 +12,6 @@ import (
"encoding/hex"
"errors"
"sync"
"time"

"github.com/dusk-network/dusk-blockchain/pkg/config"
"github.com/dusk-network/dusk-blockchain/pkg/core/consensus"
@@ -32,6 +31,7 @@ import (
"github.com/dusk-network/dusk-blockchain/pkg/util/diagnostics"
"github.com/dusk-network/dusk-blockchain/pkg/util/nativeutils/eventbus"
"github.com/dusk-network/dusk-blockchain/pkg/util/nativeutils/rpcbus"
"github.com/dusk-network/dusk-blockchain/pkg/util/nativeutils/sortedset"
"github.com/dusk-network/dusk-protobuf/autogen/go/node"
"github.com/sirupsen/logrus"
logger "github.com/sirupsen/logrus"
@@ -111,6 +111,7 @@ type Chain struct {
ctx context.Context

blacklisted dupemap.TmpMap
verified sortedset.SafeSet
}

// New returns a new chain object. It accepts the EventBus (for messages coming
@@ -129,6 +130,7 @@ func New(ctx context.Context, db database.DB, eventBus *eventbus.EventBus, rpcBu
loop: loop,
stopConsensusChan: make(chan struct{}),
blacklisted: *dupemap.NewTmpMap(1000, 120),
verified: sortedset.NewSafeSet(),
}

chain.synchronizer = newSynchronizer(db, chain)
@@ -384,11 +386,13 @@ func (c *Chain) ProcessSyncTimerExpired(strPeerAddr string) error {
// acceptSuccessiveBlock will accept a block which directly follows the chain
// tip, and advertises it to the node's peers.
func (c *Chain) acceptSuccessiveBlock(blk block.Block, kadcastHeight byte) error {
startTime := time.Now().UnixMilli()

log.WithField("height", blk.Header.Height).Trace("accepting succeeding block")

prevBlockTimestamp := c.tip.Header.Timestamp
// TODO: Verify Certificate
if err := c.propagateBlock(blk, kadcastHeight); err != nil {
log.WithError(err).Error("block propagation failed")
return err
}

if err := c.acceptBlock(blk, true); err != nil {
return err
@@ -398,28 +402,6 @@ func (c *Chain) acceptSuccessiveBlock(blk block.Block, kadcastHeight byte) error
c.highestSeen = blk.Header.Height
}

// Guarantee that the execution time of the entire process of accepting the
// next valid block (which includes rusk calls, block verification and
// persistence) will never be less than N milliseconds.
// This is not applied when:
// - the node is in out-of-sync mode.
// - the block time is higher than ConsensusTimeThreshold.
if prevBlockTimestamp+config.ConsensusTimeThreshold > blk.Header.Timestamp {
maxDelayMilli := config.Get().Consensus.ThrottleMilli
if maxDelayMilli == 0 {
maxDelayMilli = 2000
}

if d, err := util.Delay(startTime, maxDelayMilli); err == nil {
log.WithField("height", blk.Header.Height).WithField("sleep_for", d.String()).Trace("throttled")
}
}

if err := c.propagateBlock(blk, kadcastHeight); err != nil {
log.WithError(err).Error("block propagation failed")
return err
}

return nil
}
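The net effect of this hunk: the block is re-propagated to peers before local verification and persistence, and the artificial throttle delay (previously enforced via util.Delay) is gone. A condensed sketch of the method after the change, assuming the Chain fields and helpers visible in this file; not the literal post-merge source:

```go
// Sketch: acceptSuccessiveBlock after this change (condensed).
func (c *Chain) acceptSuccessiveBlock(blk block.Block, kadcastHeight byte) error {
	log.WithField("height", blk.Header.Height).Trace("accepting succeeding block")

	// Advertise the block to peers first, so network propagation is no
	// longer delayed by rusk calls, verification and persistence.
	if err := c.propagateBlock(blk, kadcastHeight); err != nil {
		log.WithError(err).Error("block propagation failed")
		return err
	}

	// Then verify and persist the block locally.
	if err := c.acceptBlock(blk, true); err != nil {
		return err
	}

	if blk.Header.Height > c.highestSeen {
		c.highestSeen = blk.Header.Height
	}

	return nil
}
```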

@@ -615,6 +597,7 @@ func (c *Chain) acceptBlock(blk block.Block, withSanityCheck bool) error {
}

c.tip = b
c.verified.Reset()

// 5. Perform all post-events on accepting a block
c.postAcceptBlock(*b, l)
@@ -692,7 +675,7 @@ func (c *Chain) postAcceptBlock(blk block.Block, l *logrus.Entry) {

// VerifyCandidateBlock can be used as a callback for the consensus in order to
// verify potential winning candidates.
func (c *Chain) VerifyCandidateBlock(candidate block.Block) error {
func (c *Chain) VerifyCandidateBlock(ctx context.Context, candidate block.Block) error {
var (
err error
chainTip block.Block
@@ -715,12 +698,24 @@ func (c *Chain) VerifyCandidateBlock(candidate block.Block) error {
return err
}

stateRoot, err = c.proxy.Executor().VerifyStateTransition(c.ctx, candidate.Txs, candidate.Header.GasLimit,
// Locking here enables Chain to perform VST calls back to back, checking
// the candidate hash against the cached hashes first.
c.verified.Lock()
defer c.verified.Unlock()

if c.verified.Contains(candidate.Header.Hash) {
// already verified
return nil
}

stateRoot, err = c.proxy.Executor().VerifyStateTransition(ctx, candidate.Txs, candidate.Header.GasLimit,
candidate.Header.Height, candidate.Header.GeneratorBlsPubkey)
if err != nil {
return err
}

c.verified.Insert(candidate.Header.Hash)

if !bytes.Equal(stateRoot, candidate.Header.StateHash) {
log.WithField("candidate_state_hash", hex.EncodeToString(candidate.Header.StateHash)).
WithField("vst_state_hash", hex.EncodeToString(stateRoot)).Error(errUnexpectedStateHash.Error())
2 changes: 1 addition & 1 deletion pkg/core/consensus/comms.go
@@ -39,7 +39,7 @@ type (

// CandidateVerificationFunc is a callback used to verify candidate blocks
// after the conclusion of the first reduction step.
CandidateVerificationFunc func(block.Block) error
CandidateVerificationFunc func(context.Context, block.Block) error

// ExecuteTxsFunc is a callback used to retrieve a valid set of txs.
ExecuteTxsFunc func(ctx context.Context, txs []transactions.ContractCall, blockHeight uint64, gasLimit uint64, generator []byte) ([]transactions.ContractCall, []byte, error)
4 changes: 2 additions & 2 deletions pkg/core/consensus/fixtures.go
@@ -120,7 +120,7 @@ func (m *mockPhase) String() string {
}

// nolint
func (m *mockPhase) Run(ctx context.Context, queue *Queue, evChan chan message.Message, r RoundUpdate, step uint8) PhaseFn {
func (m *mockPhase) Run(ctx context.Context, queue *Queue, _, _ chan message.Message, r RoundUpdate, step uint8) PhaseFn {
ctx = context.WithValue(ctx, "Packet", m.packet)
if stop := m.callback(ctx); stop {
return nil
@@ -179,7 +179,7 @@ func (t *TestPhase) Initialize(sv InternalPacket) PhaseFn {
}

// Run does nothing else than delegating to the specified callback.
func (t *TestPhase) Run(_ context.Context, queue *Queue, _ chan message.Message, _ RoundUpdate, step uint8) PhaseFn {
func (t *TestPhase) Run(_ context.Context, queue *Queue, _, _ chan message.Message, _ RoundUpdate, step uint8) PhaseFn {
t.callback(t.req, t.packet, t.streamer, t.aChan)
return nil
}
2 changes: 1 addition & 1 deletion pkg/core/consensus/phase.go
@@ -24,7 +24,7 @@ type (
// PhaseFn represents the recursive consensus state function.
PhaseFn interface {
// Run the phase function.
Run(context.Context, *Queue, chan message.Message, RoundUpdate, uint8) PhaseFn
Run(context.Context, *Queue, chan message.Message, chan message.Message, RoundUpdate, uint8) PhaseFn

// String returns the description of this phase function.
String() string
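PhaseFn implementations now receive two message channels instead of one; judging from the firststep changes below, the second is dedicated to reduction votes so they are consumed independently of other consensus traffic. A minimal sketch of a conforming phase, with hypothetical names (examplePhase, handleVote):

```go
// Sketch only: a phase satisfying the new two-channel Run contract.
func (p *examplePhase) Run(ctx context.Context, queue *consensus.Queue,
	evChan, reductionChan chan message.Message,
	r consensus.RoundUpdate, step uint8) consensus.PhaseFn {
	for {
		select {
		case ev := <-reductionChan:
			// Reduction votes arrive on their own channel and are never
			// queued behind unrelated consensus messages.
			handleVote(ev)
		case <-ctx.Done():
			return nil
		}
	}
}
```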
15 changes: 15 additions & 0 deletions pkg/core/consensus/reduction/aggregator.go
@@ -125,3 +125,18 @@ func (a *Aggregator) addBitSet(sv *message.StepVotes, cluster sortedset.Cluster,
committee := a.handler.Committee(round, step)
sv.BitSet = committee.Bits(cluster.Set)
}

// Log dumps the current state of the vote sets to the passed logrus entry.
func (a *Aggregator) Log(l *logrus.Entry, round uint64, step uint8) {
target := a.handler.Quorum(round)

for hash, sv := range a.voteSets {
total := sv.Cluster.TotalOccurrences()

l.WithField("hash", util.StringifyBytes([]byte(hash))).
WithField("total", total).
WithField("round", round).
WithField("step", step).
WithField("quorum_target", target).Info()
}
}
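For reference, the firststep phase below calls this on timeout, dumping the partial tally of each candidate against the quorum target:

```go
// On step timeout, log how far each candidate got towards quorum.
l := lg.WithField("event", "timeout").WithField("duration", p.TimeOut.String())
p.aggregator.Log(l, r.Round, step)
```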
80 changes: 80 additions & 0 deletions pkg/core/consensus/reduction/asyncsend.go
@@ -0,0 +1,80 @@
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT License was not distributed with this
// file, you can obtain one at https://opensource.org/licenses/MIT.
//
// Copyright (c) DUSK NETWORK. All rights reserved.

package reduction

import (
"context"
"fmt"

"github.com/dusk-network/dusk-blockchain/pkg/core/data/block"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message"
log "github.com/sirupsen/logrus"
)

const (
// Republish flag instructs AsyncSend to republish the output message.
Republish = iota
// ValidateOnly flag instructs AsyncSend to validate only.
ValidateOnly
)

// AsyncSend is a wrapper around SendReduction that calls it asynchronously.
type AsyncSend struct {
*Reduction

round uint64
step uint8
candidate *block.Block
}

// NewAsyncSend creates an AsyncSend for the given round, step and candidate.
func NewAsyncSend(r *Reduction, round uint64, step uint8, candidate *block.Block) *AsyncSend {
return &AsyncSend{
Reduction: r,
round: round,
step: step,
candidate: candidate,
}
}

// Go executes SendReduction in a separate goroutine.
// It returns a cancel func for canceling the started job.
func (a AsyncSend) Go(ctx context.Context, resp chan message.Message, flags int) context.CancelFunc {
ctx, cancel := context.WithCancel(ctx)

go func() {
defer a.recover()

m, _, err := a.SendReduction(ctx, a.round, a.step, a.candidate)
if err != nil {
return
}

if flags == Republish {
select {
case resp <- m:
default:
}

if err := a.Republish(m); err != nil {
panic(err)
}
}
}()

return cancel
}

func (a AsyncSend) recover() {
defer func() {
if r := recover(); r != nil {
log.WithField("round", a.round).WithField("step", a.step).
WithError(fmt.Errorf("%+v", r)).
Errorln("sending reduction err")
}
}()
}
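A usage sketch, mirroring how the firststep phase below invokes the wrapper (respChan and the surrounding variables are placeholders):

```go
// Kick off SendReduction without blocking the phase loop.
a := reduction.NewAsyncSend(p.Reduction, r.Round, step, &candidate)

// Republish pushes the resulting vote to respChan (non-blocking) and
// gossips it to the network; ValidateOnly does neither.
cancel := a.Go(ctx, respChan, reduction.Republish)
defer cancel() // stop the in-flight send if the step ends first
```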
4 changes: 2 additions & 2 deletions pkg/core/consensus/reduction/firststep/reduction_test.go
@@ -136,11 +136,11 @@ func TestFirstStepReduction(t *testing.T) {
Seed: hash,
}

runTestCallback := firstStepReduction.Run(ctx, queue, evChan, r, step)
runTestCallback := firstStepReduction.Run(ctx, queue, evChan, evChan, r, step)
// testing the status of the step
ttest.testStep(t, firstStepReduction)
// here the tests are performed on the result of the step
_ = runTestCallback.Run(ctx, queue, evChan, r, step+1)
_ = runTestCallback.Run(ctx, queue, evChan, evChan, r, step+1)
})
}
}
28 changes: 16 additions & 12 deletions pkg/core/consensus/reduction/firststep/step.go
@@ -74,13 +74,12 @@ func (p *Phase) String() string {
// Initialize passes to this reduction step the best score collected during selection.
func (p *Phase) Initialize(re consensus.InternalPacket) consensus.PhaseFn {
p.selectionResult = re.(message.NewBlock)
p.VerifiedHash = nil
return p
}

// Run the first reduction step until either there is a timeout, we reach 64%
// of votes, or we experience an unrecoverable error.
func (p *Phase) Run(ctx context.Context, queue *consensus.Queue, evChan chan message.Message, r consensus.RoundUpdate, step uint8) consensus.PhaseFn {
func (p *Phase) Run(ctx context.Context, queue *consensus.Queue, _, reductionChan chan message.Message, r consensus.RoundUpdate, step uint8) consensus.PhaseFn {
tlog := getLog(r.Round, step)

defer func() {
@@ -94,14 +93,19 @@ }
}

p.handler = reduction.NewHandler(p.Keys, r.P, r.Seed)
// first we send our own Selection
if p.handler.AmMember(r.Round, step) {
m, _ := p.SendReduction(r.Round, step, &p.selectionResult.Candidate)

// Queue my own vote to be registered locally
evChan <- m
// send our own Selection
a := reduction.NewAsyncSend(p.Reduction, r.Round, step, &p.selectionResult.Candidate)

if p.handler.AmMember(r.Round, step) {
_ = a.Go(ctx, reductionChan, reduction.Republish)
} else {
if p.handler.AmMember(r.Round, step+1) {
_ = a.Go(ctx, reductionChan, reduction.ValidateOnly)
}
}

// Process queued reduction messages
timeoutChan := time.After(p.TimeOut)
p.aggregator = reduction.NewAggregator(p.handler)

@@ -130,7 +134,7 @@

for {
select {
case ev := <-evChan:
case ev := <-reductionChan:
if reduction.ShouldProcess(ev, r.Round, step, queue) {
rMsg := ev.Payload().(message.Reduction)
if !p.handler.IsMember(rMsg.Sender(), r.Round, step) {
@@ -148,8 +152,12 @@ }
}

case <-timeoutChan:
l := lg.WithField("event", "timeout").WithField("duration", p.TimeOut.String())
p.aggregator.Log(l, r.Round, step)

// In case of a timeout, we proceed in the consensus with an empty hash.
sv := p.createStepVoteMessage(reduction.EmptyResult, r.Round, step, *block.NewBlock())

return p.gotoNextPhase(sv)

case <-ctx.Done():
@@ -163,10 +171,6 @@ func (p *Phase) Run(ctx context.Context, queue *consensus.Queue, evChan chan mes
}

func (p *Phase) gotoNextPhase(msg *message.StepVotesMsg) consensus.PhaseFn {
if msg != nil {
msg.VerifiedHash = p.VerifiedHash
}

return p.next.Initialize(*msg)
}
