From ed0a9d89d3cc946fef40d1534977fc2bbae56b72 Mon Sep 17 00:00:00 2001 From: goshawk Date: Tue, 12 Jul 2022 12:19:42 +0300 Subject: [PATCH 1/8] Report as a warning an event of discarding newblock while in a reduction --- pkg/core/consensus/reduction/firststep/step.go | 1 - pkg/core/consensus/reduction/reduction.go | 16 ++++++++++++---- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/pkg/core/consensus/reduction/firststep/step.go b/pkg/core/consensus/reduction/firststep/step.go index 2afa31244..3e33b37b5 100644 --- a/pkg/core/consensus/reduction/firststep/step.go +++ b/pkg/core/consensus/reduction/firststep/step.go @@ -146,7 +146,6 @@ func (p *Phase) Run(ctx context.Context, queue *consensus.Queue, evChan chan mes return p.gotoNextPhase(sv) } } - case <-timeoutChan: // in case of timeout we proceed in the consensus with an empty hash sv := p.createStepVoteMessage(reduction.EmptyResult, r.Round, step, *block.NewBlock()) diff --git a/pkg/core/consensus/reduction/reduction.go b/pkg/core/consensus/reduction/reduction.go index 47757ce19..f96b0449c 100644 --- a/pkg/core/consensus/reduction/reduction.go +++ b/pkg/core/consensus/reduction/reduction.go @@ -179,14 +179,22 @@ func ShouldProcess(m message.Message, round uint64, step uint8, queue *consensus cmp := hdr.CompareRoundAndStep(round, step) if cmp == header.Before { - lg. + l := lg. WithFields(log.Fields{ - "topic": m.Category(), + "topic": m.Category().String(), "round": hdr.Round, "step": hdr.Step, "expected round": round, - }). - Debugln("discarding obsolete event") + }) + + // Report as a warning an event of discarding newblock while in a + // reduction phase. + if m.Category() == topics.NewBlock { + l.Warnln("discarding obsolete event") + } else { + l.Debugln("discarding obsolete event") + } + return false } From 6f21938e1a5b4527532c316790a8b19f0fe1886a Mon Sep 17 00:00:00 2001 From: goshawk Date: Thu, 14 Jul 2022 14:54:59 +0300 Subject: [PATCH 2/8] Implement candidate.Collector structure --- pkg/core/candidate/collector.go | 125 ++++++++++++++++++++++++++++++++ 1 file changed, 125 insertions(+) create mode 100644 pkg/core/candidate/collector.go diff --git a/pkg/core/candidate/collector.go b/pkg/core/candidate/collector.go new file mode 100644 index 000000000..95e7dd6c1 --- /dev/null +++ b/pkg/core/candidate/collector.go @@ -0,0 +1,125 @@ +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT License was not distributed with this +// file, you can obtain one at https://opensource.org/licenses/MIT. +// +// Copyright (c) DUSK NETWORK. All rights reserved. + +package candidate + +import ( + "bytes" + "errors" + + "github.com/dusk-network/dusk-blockchain/pkg/config" + "github.com/dusk-network/dusk-blockchain/pkg/core/consensus/committee" + "github.com/dusk-network/dusk-blockchain/pkg/core/consensus/header" + "github.com/dusk-network/dusk-blockchain/pkg/core/consensus/msg" + "github.com/dusk-network/dusk-blockchain/pkg/core/database" + "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message" + "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/topics" + "github.com/dusk-network/dusk-blockchain/pkg/util/nativeutils/eventbus" + "github.com/sirupsen/logrus" +) + +var ( + ErrInvalidNewBlock = errors.New("invalid newblock message") + ErrInvalidBlockHash = errors.New("invalid block hash") + ErrNotBlockGenerator = errors.New("message not signed by generator") + ErrInvalidMsgRound = errors.New("invalid message round") +) + +type Collector struct { + eventbus *eventbus.EventBus + handler *committee.Handler + db database.DB + + round uint64 + step uint8 + stepName string +} + +func NewCollector(e *eventbus.EventBus, h *committee.Handler, db database.DB, round uint64) *Collector { + return &Collector{ + eventbus: e, + handler: h, + db: db, + round: round, + } +} + +// UpdateStep set step/stepName +func (c *Collector) UpdateStep(step uint8, name string) { + c.step = step + c.stepName = name +} + +// Collect put the candidate block from message.Block to DB, if message is valid. +func (c *Collector) Collect(msg message.NewBlock, msgHeader []byte) error { + if msg.State().Round != c.round { + return ErrInvalidMsgRound + } + + // TODO: check msg.State().Step belongs to this Round and Iteration + log := logrus.WithField("process", c.stepName) + + if err := c.verify(msg); err != nil { + return ErrInvalidNewBlock + } + + // Persist Candidate Block on disk. + if err := c.db.Update(func(t database.Transaction) error { + // TODO: Check a candidate from this BLSKey for this iteration has been registered + return t.StoreCandidateMessage(msg.Candidate) + }); err != nil { + log.WithError(err).Errorln("could not store candidate") + } + + // Once the event is verified, and has passed all preliminary checks, + // we can republish it to the network. + m := message.NewWithHeader(topics.NewBlock, msg, msgHeader) + _ = c.eventbus.Publish(topics.Kadcast, m) + + return nil +} + +// verify executes a set of check points to ensure the hash of the candidate +// block has been signed by the single committee member of selection step of +// this iteration. +func (c *Collector) verify(msg message.NewBlock) error { + if !c.handler.IsMember(msg.State().PubKeyBLS, msg.State().Round, msg.State().Step, config.ConsensusSelectionMaxCommitteeSize) { + return ErrNotBlockGenerator + } + + // Verify message signagure + if err := verifySignature(msg); err != nil { + return err + } + + // Sanity-check the candidate block + if err := SanityCheckCandidate(msg.Candidate); err != nil { + return err + } + + // Ensure candidate block hash is equal to the BlockHash of the msg.header + hash, err := msg.Candidate.CalculateHash() + if err != nil { + return err + } + + if !bytes.Equal(msg.State().BlockHash, hash) { + return ErrInvalidBlockHash + } + + return nil +} + +func verifySignature(scr message.NewBlock) error { + packet := new(bytes.Buffer) + + hdr := scr.State() + if err := header.MarshalSignableVote(packet, hdr); err != nil { + return err + } + + return msg.VerifyBLSSignature(hdr.PubKeyBLS, scr.SignedHash, packet.Bytes()) +} From 2197216fa8eeee641778c936a210edfcbb687f2c Mon Sep 17 00:00:00 2001 From: goshawk Date: Thu, 14 Jul 2022 14:56:33 +0300 Subject: [PATCH 3/8] Drop candidate block if already added --- pkg/core/database/heavy/transactions.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/core/database/heavy/transactions.go b/pkg/core/database/heavy/transactions.go index d21adbf04..53680a738 100644 --- a/pkg/core/database/heavy/transactions.go +++ b/pkg/core/database/heavy/transactions.go @@ -497,12 +497,18 @@ func (t transaction) FetchBlockHeightSince(sinceUnixTime int64, offset uint64) ( } func (t transaction) StoreCandidateMessage(cm block.Block) error { + key := append(CandidatePrefix, cm.Header.Hash...) + + _, err := t.snapshot.Get(key, nil) + if err == nil { + return nil + } + buf := new(bytes.Buffer) if err := message.MarshalBlock(buf, &cm); err != nil { return err } - key := append(CandidatePrefix, cm.Header.Hash...) t.put(key, buf.Bytes()) return nil } From 64c7497a89c8aac11a12adb2d5fee88c166fdcfe Mon Sep 17 00:00:00 2001 From: goshawk Date: Thu, 14 Jul 2022 14:57:18 +0300 Subject: [PATCH 4/8] Use candidate.Collector in Selection step --- pkg/core/consensus/selection/step.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/core/consensus/selection/step.go b/pkg/core/consensus/selection/step.go index 47004a5e9..b24f6c0b5 100644 --- a/pkg/core/consensus/selection/step.go +++ b/pkg/core/consensus/selection/step.go @@ -108,6 +108,9 @@ func (p *Phase) Run(parentCtx context.Context, queue *consensus.Queue, evChan ch p.handler = NewHandler(p.Keys, r.P, r.Seed) + collector := candidate.NewCollector(p.EventBus, p.handler.Handler, p.db, r.Round) + collector.UpdateStep(step, "selection") + isMember := p.handler.AmMember(r.Round, step) if log.GetLevel() >= logrus.DebugLevel { @@ -144,7 +147,7 @@ func (p *Phase) Run(parentCtx context.Context, queue *consensus.Queue, evChan ch case ev := <-evChan: if shouldProcess(ev, r.Round, step, queue) { b := ev.Payload().(message.NewBlock) - if err := p.collectNewBlock(b, ev.Header()); err != nil { + if err := collector.Collect(b, ev.Header()); err != nil { continue } @@ -212,6 +215,7 @@ func (p *Phase) collectNewBlock(msg message.NewBlock, msgHeader []byte) error { // Persist Candidate Block on disk. if err := p.db.Update(func(t database.Transaction) error { + // TODO: return t.StoreCandidateMessage(msg.Candidate) }); err != nil { lg.WithError(err).Errorln("could not store candidate") From 916398a97211bbf41a49595dd5138c2b5e6d88f5 Mon Sep 17 00:00:00 2001 From: goshawk Date: Thu, 14 Jul 2022 14:57:54 +0300 Subject: [PATCH 5/8] Use candidate.Collector in firststep --- .../consensus/reduction/firststep/step.go | 40 +++++++++++++------ pkg/core/consensus/reduction/reduction.go | 9 +++-- 2 files changed, 32 insertions(+), 17 deletions(-) diff --git a/pkg/core/consensus/reduction/firststep/step.go b/pkg/core/consensus/reduction/firststep/step.go index 3e33b37b5..006b1defd 100644 --- a/pkg/core/consensus/reduction/firststep/step.go +++ b/pkg/core/consensus/reduction/firststep/step.go @@ -87,13 +87,17 @@ func (p *Phase) Run(ctx context.Context, queue *consensus.Queue, evChan chan mes tlog.Traceln("ending first reduction step") }() + p.handler = reduction.NewHandler(p.Keys, r.P, r.Seed) + + collector := candidate.NewCollector(p.EventBus, p.handler.Handler, p.db, r.Round) + collector.UpdateStep(step, "1st_reduction") + if log.GetLevel() >= logrus.DebugLevel { c := p.selectionResult.Candidate tlog.WithField("hash", util.StringifyBytes(c.Header.Hash)). Debug("initialized") } - p.handler = reduction.NewHandler(p.Keys, r.P, r.Seed) // first we send our own Selection if p.handler.AmMember(r.Round, step) { m, _ := p.SendReduction(r.Round, step, &p.selectionResult.Candidate) @@ -131,21 +135,31 @@ func (p *Phase) Run(ctx context.Context, queue *consensus.Queue, evChan chan mes for { select { case ev := <-evChan: - if reduction.ShouldProcess(ev, r.Round, step, queue) { - rMsg := ev.Payload().(message.Reduction) - if !p.handler.IsMember(rMsg.Sender(), r.Round, step) { - continue - } - sv := p.collectReduction(ctx, rMsg, r.Round, step, ev.Header()) - if sv != nil { - // preventing timeout leakage - go func() { - <-timeoutChan - }() - return p.gotoNextPhase(sv) + switch ev.Category() { + case topics.NewBlock: + // Collect and repropagate the candidate block of this iteration. + b := ev.Payload().(message.NewBlock) + _ = collector.Collect(b, ev.Header()) + + case topics.Reduction: + if reduction.ShouldProcess(ev, r.Round, step, queue) { + rMsg := ev.Payload().(message.Reduction) + if !p.handler.IsMember(rMsg.Sender(), r.Round, step) { + continue + } + + sv := p.collectReduction(ctx, rMsg, r.Round, step, ev.Header()) + if sv != nil { + // preventing timeout leakage + go func() { + <-timeoutChan + }() + return p.gotoNextPhase(sv) + } } } + case <-timeoutChan: // in case of timeout we proceed in the consensus with an empty hash sv := p.createStepVoteMessage(reduction.EmptyResult, r.Round, step, *block.NewBlock()) diff --git a/pkg/core/consensus/reduction/reduction.go b/pkg/core/consensus/reduction/reduction.go index f96b0449c..08fbcb31b 100644 --- a/pkg/core/consensus/reduction/reduction.go +++ b/pkg/core/consensus/reduction/reduction.go @@ -182,15 +182,16 @@ func ShouldProcess(m message.Message, round uint64, step uint8, queue *consensus l := lg. WithFields(log.Fields{ "topic": m.Category().String(), - "round": hdr.Round, - "step": hdr.Step, + "msg_round": hdr.Round, + "msg_step": hdr.Step, "expected round": round, + "expected step": step, }) // Report as a warning an event of discarding newblock while in a // reduction phase. - if m.Category() == topics.NewBlock { - l.Warnln("discarding obsolete event") + if m.Category() == topics.NewBlock && round == hdr.Round { + l.Warnln("discarding newblock event") } else { l.Debugln("discarding obsolete event") } From 49fa031dc1adaa4087f3f19b071ea57b6a0ac429 Mon Sep 17 00:00:00 2001 From: goshawk Date: Thu, 14 Jul 2022 15:36:33 +0300 Subject: [PATCH 6/8] Use candidate.Collector in 2nd_reduction --- pkg/core/candidate/collector.go | 8 +++++++- pkg/core/consensus/reduction/reduction.go | 3 +++ .../consensus/reduction/secondstep/reduction_test.go | 2 +- pkg/core/consensus/reduction/secondstep/step.go | 10 +++++++++- pkg/core/loop/consensus.go | 2 +- 5 files changed, 21 insertions(+), 4 deletions(-) diff --git a/pkg/core/candidate/collector.go b/pkg/core/candidate/collector.go index 95e7dd6c1..ed92ce9bf 100644 --- a/pkg/core/candidate/collector.go +++ b/pkg/core/candidate/collector.go @@ -77,7 +77,13 @@ func (c *Collector) Collect(msg message.NewBlock, msgHeader []byte) error { // Once the event is verified, and has passed all preliminary checks, // we can republish it to the network. m := message.NewWithHeader(topics.NewBlock, msg, msgHeader) - _ = c.eventbus.Publish(topics.Kadcast, m) + buf, err := message.Marshal(m) + if err != nil { + return err + } + + serialized := message.NewWithHeader(m.Category(), buf, m.Header()) + _ = c.eventbus.Publish(topics.Kadcast, serialized) return nil } diff --git a/pkg/core/consensus/reduction/reduction.go b/pkg/core/consensus/reduction/reduction.go index 08fbcb31b..533de3b9c 100644 --- a/pkg/core/consensus/reduction/reduction.go +++ b/pkg/core/consensus/reduction/reduction.go @@ -16,6 +16,7 @@ import ( "github.com/dusk-network/dusk-blockchain/pkg/core/consensus" "github.com/dusk-network/dusk-blockchain/pkg/core/consensus/header" "github.com/dusk-network/dusk-blockchain/pkg/core/data/block" + "github.com/dusk-network/dusk-blockchain/pkg/core/database" "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message" "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/topics" "github.com/dusk-network/dusk-blockchain/pkg/util" @@ -61,6 +62,8 @@ type Reduction struct { VerifyFn consensus.CandidateVerificationFunc VerifiedHash []byte + + db database.DB } // IncreaseTimeout is used when reduction does not reach the quorum or diff --git a/pkg/core/consensus/reduction/secondstep/reduction_test.go b/pkg/core/consensus/reduction/secondstep/reduction_test.go index 15d100a46..e3c21bf07 100644 --- a/pkg/core/consensus/reduction/secondstep/reduction_test.go +++ b/pkg/core/consensus/reduction/secondstep/reduction_test.go @@ -37,7 +37,7 @@ func TestSendReduction(t *testing.T) { timeout := time.Second hlp := reduction.NewHelper(messageToSpawn, timeout) - secondStep := New(hlp.Emitter, verifyFn, 10*time.Second) + secondStep := New(hlp.Emitter, verifyFn, 10*time.Second, db) // Generate second StepVotes svs := message.GenVotes(hash, []byte{0, 0, 0, 0}, 1, 2, hlp.ProvisionersKeys, hlp.P) diff --git a/pkg/core/consensus/reduction/secondstep/step.go b/pkg/core/consensus/reduction/secondstep/step.go index 13c3b6065..d49fc018d 100644 --- a/pkg/core/consensus/reduction/secondstep/step.go +++ b/pkg/core/consensus/reduction/secondstep/step.go @@ -13,10 +13,12 @@ import ( "time" "github.com/dusk-network/dusk-blockchain/pkg/config" + "github.com/dusk-network/dusk-blockchain/pkg/core/candidate" "github.com/dusk-network/dusk-blockchain/pkg/core/consensus" "github.com/dusk-network/dusk-blockchain/pkg/core/consensus/header" "github.com/dusk-network/dusk-blockchain/pkg/core/consensus/reduction" "github.com/dusk-network/dusk-blockchain/pkg/core/data/block" + "github.com/dusk-network/dusk-blockchain/pkg/core/database" "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message" "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/topics" "github.com/dusk-network/dusk-blockchain/pkg/util" @@ -39,6 +41,7 @@ type Phase struct { *reduction.Reduction handler *reduction.Handler aggregator *reduction.Aggregator + db database.DB firstStepVotesMsg message.StepVotesMsg @@ -51,13 +54,14 @@ type Phase struct { // NB: we cannot push the agreement directly within the agreementChannel // until we have a way to deduplicate it from the peer (the dupemap will not be // notified of duplicates). -func New(e *consensus.Emitter, verifyFn consensus.CandidateVerificationFunc, timeOut time.Duration) *Phase { +func New(e *consensus.Emitter, verifyFn consensus.CandidateVerificationFunc, timeOut time.Duration, db database.DB) *Phase { return &Phase{ Reduction: &reduction.Reduction{ Emitter: e, TimeOut: timeOut, VerifyFn: verifyFn, }, + db: db, } } @@ -94,6 +98,10 @@ func (p *Phase) Run(ctx context.Context, queue *consensus.Queue, evChan chan mes } p.handler = reduction.NewHandler(p.Keys, r.P, r.Seed) + + collector := candidate.NewCollector(p.EventBus, p.handler.Handler, p.db, r.Round) + collector.UpdateStep(step, "2nd_reduction") + // first we send our own Selection if p.handler.AmMember(r.Round, step) { diff --git a/pkg/core/loop/consensus.go b/pkg/core/loop/consensus.go index 597b6e9b5..ca93a1bc5 100644 --- a/pkg/core/loop/consensus.go +++ b/pkg/core/loop/consensus.go @@ -69,7 +69,7 @@ func CreateStateMachine(e *consensus.Emitter, db database.DB, consensusTimeOut t // CreateInitialStep creates the selection step by injecting a BlockGenerator // interface to it. func CreateInitialStep(e *consensus.Emitter, consensusTimeOut time.Duration, bg blockgenerator.BlockGenerator, verifyFn consensus.CandidateVerificationFunc, db database.DB, requestor *candidate.Requestor) consensus.Phase { - redu2 := secondstep.New(e, verifyFn, consensusTimeOut) + redu2 := secondstep.New(e, verifyFn, consensusTimeOut, db) redu1 := firststep.New(redu2, e, verifyFn, consensusTimeOut, db, requestor) selectionStep := selection.New(redu1, bg, e, consensusTimeOut, db) From fdaa434bcc3f7b4fb02f0119b3d98f050dce797b Mon Sep 17 00:00:00 2001 From: goshawk Date: Thu, 14 Jul 2022 15:46:23 +0300 Subject: [PATCH 7/8] Add database field in reduction_test units --- .../consensus/reduction/secondstep/reduction_test.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pkg/core/consensus/reduction/secondstep/reduction_test.go b/pkg/core/consensus/reduction/secondstep/reduction_test.go index e3c21bf07..545558b43 100644 --- a/pkg/core/consensus/reduction/secondstep/reduction_test.go +++ b/pkg/core/consensus/reduction/secondstep/reduction_test.go @@ -14,6 +14,7 @@ import ( "github.com/dusk-network/dusk-blockchain/pkg/core/consensus" "github.com/dusk-network/dusk-blockchain/pkg/core/consensus/reduction" "github.com/dusk-network/dusk-blockchain/pkg/core/data/block" + "github.com/dusk-network/dusk-blockchain/pkg/core/database/lite" "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message" "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/topics" "github.com/dusk-network/dusk-blockchain/pkg/util/nativeutils/eventbus" @@ -34,6 +35,9 @@ func TestSendReduction(t *testing.T) { hash, err := crypto.RandEntropy(32) require.NoError(t, err) + _, db := lite.CreateDBConnection() + defer db.Close() + timeout := time.Second hlp := reduction.NewHelper(messageToSpawn, timeout) @@ -140,6 +144,9 @@ func TestSecondStepReduction(t *testing.T) { hash, err := crypto.RandEntropy(32) require.NoError(t, err) + _, db := lite.CreateDBConnection() + defer db.Close() + timeout := time.Second table := initiateTableTest(timeout, hash, round, step) @@ -173,7 +180,7 @@ func TestSecondStepReduction(t *testing.T) { } // spin secondStepVotes - secondStepReduction := New(hlp.Emitter, verifyFn, timeout) + secondStepReduction := New(hlp.Emitter, verifyFn, timeout, db) // Generate second StepVotes svs := message.GenVotes(hash, []byte{0, 0, 0, 0}, 1, 2, hlp.ProvisionersKeys, hlp.P) From 6655b9e115120fb81d24242be408f728afa18b79 Mon Sep 17 00:00:00 2001 From: goshawk Date: Mon, 18 Jul 2022 08:31:33 +0300 Subject: [PATCH 8/8] Check msg.State().Step belongs to the current iteration --- pkg/core/candidate/collector.go | 34 +++++++-- .../consensus/reduction/firststep/step.go | 1 - pkg/core/consensus/reduction/reduction.go | 3 - pkg/core/consensus/selection/step.go | 75 ------------------- 4 files changed, 28 insertions(+), 85 deletions(-) diff --git a/pkg/core/candidate/collector.go b/pkg/core/candidate/collector.go index ed92ce9bf..f78452676 100644 --- a/pkg/core/candidate/collector.go +++ b/pkg/core/candidate/collector.go @@ -22,12 +22,21 @@ import ( ) var ( - ErrInvalidNewBlock = errors.New("invalid newblock message") - ErrInvalidBlockHash = errors.New("invalid block hash") + // ErrInvalidNewBlock error invalid newblock message. + ErrInvalidNewBlock = errors.New("invalid newblock message") + // ErrInvalidBlockHash ... + ErrInvalidBlockHash = errors.New("invalid block hash") + // ErrNotBlockGenerator ... ErrNotBlockGenerator = errors.New("message not signed by generator") - ErrInvalidMsgRound = errors.New("invalid message round") + // ErrInvalidMsgRound ... + ErrInvalidMsgRound = errors.New("invalid message round") + // ErrInvalidStep ... + ErrInvalidStep = errors.New("invalid newblock step") + // ErrMaxStepExceeded ... + ErrMaxStepExceeded = errors.New("max step exceeded") ) +// Collector implements a procedure of collecting candidate block from a wire message (newblock). type Collector struct { eventbus *eventbus.EventBus handler *committee.Handler @@ -38,6 +47,7 @@ type Collector struct { stepName string } +// NewCollector instantiates Collector. func NewCollector(e *eventbus.EventBus, h *committee.Handler, db database.DB, round uint64) *Collector { return &Collector{ eventbus: e, @@ -47,19 +57,30 @@ func NewCollector(e *eventbus.EventBus, h *committee.Handler, db database.DB, ro } } -// UpdateStep set step/stepName +// UpdateStep set step/stepName. func (c *Collector) UpdateStep(step uint8, name string) { c.step = step c.stepName = name } -// Collect put the candidate block from message.Block to DB, if message is valid. +// Collect put a candidate block from message.Block to DB, if message is valid. func (c *Collector) Collect(msg message.NewBlock, msgHeader []byte) error { if msg.State().Round != c.round { return ErrInvalidMsgRound } - // TODO: check msg.State().Step belongs to this Round and Iteration + msgStep := msg.State().Step + + if msgStep >= config.ConsensusMaxStep { + return ErrMaxStepExceeded + } + + // Check msg.State().Step belongs to current iteration + if msgStep == 0 || + (msgStep-1)/3 != (c.step-1)/3 { + return errors.New("invalid newblock step") + } + log := logrus.WithField("process", c.stepName) if err := c.verify(msg); err != nil { @@ -77,6 +98,7 @@ func (c *Collector) Collect(msg message.NewBlock, msgHeader []byte) error { // Once the event is verified, and has passed all preliminary checks, // we can republish it to the network. m := message.NewWithHeader(topics.NewBlock, msg, msgHeader) + buf, err := message.Marshal(m) if err != nil { return err diff --git a/pkg/core/consensus/reduction/firststep/step.go b/pkg/core/consensus/reduction/firststep/step.go index 006b1defd..60d95db2f 100644 --- a/pkg/core/consensus/reduction/firststep/step.go +++ b/pkg/core/consensus/reduction/firststep/step.go @@ -135,7 +135,6 @@ func (p *Phase) Run(ctx context.Context, queue *consensus.Queue, evChan chan mes for { select { case ev := <-evChan: - switch ev.Category() { case topics.NewBlock: // Collect and repropagate the candidate block of this iteration. diff --git a/pkg/core/consensus/reduction/reduction.go b/pkg/core/consensus/reduction/reduction.go index 533de3b9c..08fbcb31b 100644 --- a/pkg/core/consensus/reduction/reduction.go +++ b/pkg/core/consensus/reduction/reduction.go @@ -16,7 +16,6 @@ import ( "github.com/dusk-network/dusk-blockchain/pkg/core/consensus" "github.com/dusk-network/dusk-blockchain/pkg/core/consensus/header" "github.com/dusk-network/dusk-blockchain/pkg/core/data/block" - "github.com/dusk-network/dusk-blockchain/pkg/core/database" "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message" "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/topics" "github.com/dusk-network/dusk-blockchain/pkg/util" @@ -62,8 +61,6 @@ type Reduction struct { VerifyFn consensus.CandidateVerificationFunc VerifiedHash []byte - - db database.DB } // IncreaseTimeout is used when reduction does not reach the quorum or diff --git a/pkg/core/consensus/selection/step.go b/pkg/core/consensus/selection/step.go index b24f6c0b5..104c4eb57 100644 --- a/pkg/core/consensus/selection/step.go +++ b/pkg/core/consensus/selection/step.go @@ -7,10 +7,7 @@ package selection import ( - "bytes" "context" - "encoding/hex" - "errors" "os" "strconv" "time" @@ -20,7 +17,6 @@ import ( "github.com/dusk-network/dusk-blockchain/pkg/core/consensus/blockgenerator" "github.com/dusk-network/dusk-blockchain/pkg/core/consensus/header" "github.com/dusk-network/dusk-blockchain/pkg/core/database" - "github.com/dusk-network/dusk-blockchain/pkg/util" "github.com/dusk-network/dusk-blockchain/pkg/core/consensus" "github.com/dusk-network/dusk-blockchain/pkg/core/consensus/key" @@ -32,11 +28,6 @@ import ( var lg = log.WithField("process", "consensus").WithField("phase", "selector") -var ( - errInvalidHash = errors.New("candidate hash is different from message.header hash") - errNotBlockGenerator = errors.New("newblock msg is not signed by a block generator") -) - // Phase is the implementation of the Selection step component. type Phase struct { *consensus.Emitter @@ -173,72 +164,6 @@ func (p *Phase) endSelection(result message.NewBlock) consensus.PhaseFn { return p.next.Initialize(result) } -// verifyNewBlock executes a set of check points to ensure the hash of the -// candidate block has been signed by the single committee member of selection -// step of this iteration. -func (p *Phase) verifyNewBlock(msg message.NewBlock) error { - if !p.handler.IsMember(msg.State().PubKeyBLS, msg.State().Round, msg.State().Step) { - return errNotBlockGenerator - } - - // Verify message signagure - if err := p.handler.VerifySignature(msg); err != nil { - return err - } - - // Sanity-check the candidate block - if err := candidate.SanityCheckCandidate(msg.Candidate); err != nil { - return err - } - - // Ensure candidate block hash is equal to the BlockHash of the msg.header - hash, err := msg.Candidate.CalculateHash() - if err != nil { - return err - } - - if !bytes.Equal(msg.State().BlockHash, hash) { - return errInvalidHash - } - - return nil -} - -func (p *Phase) collectNewBlock(msg message.NewBlock, msgHeader []byte) error { - if err := p.verifyNewBlock(msg); err != nil { - msg.WithFields(lg). - WithField("seed", hex.EncodeToString(p.handler.Seed())). - WithError(err).Error("failed to verify newblock") - - return err - } - - // Persist Candidate Block on disk. - if err := p.db.Update(func(t database.Transaction) error { - // TODO: - return t.StoreCandidateMessage(msg.Candidate) - }); err != nil { - lg.WithError(err).Errorln("could not store candidate") - } - - if log.GetLevel() >= logrus.DebugLevel { - log := consensus.WithFields(msg.State().Round, msg.State().Step, "newblock_collected", - msg.Candidate.Header.Hash, p.Keys.BLSPubKey, nil, nil, nil) - - log.WithField("sender", util.StringifyBytes(msg.State().PubKeyBLS)).Debug() - } - - // Once the event is verified, and has passed all preliminary checks, - // we can republish it to the network. - m := message.NewWithHeader(topics.NewBlock, msg, msgHeader) - if err := p.Republish(m); err != nil { - lg.WithError(err). - Error("could not republish score event") - } - - return nil -} - // increaseTimeOut increases the timeout after a failed selection. func (p *Phase) increaseTimeOut() { p.timeout = p.timeout * 2