From 76e9d830c1b568daa43a8197beb6cb57b58c915f Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Wed, 3 Aug 2022 17:01:26 +0200 Subject: [PATCH] Propagate message metadata - Replace the current `message.Header` bytes with a structured `message.Metadata` - Remove redundant usages of `config.KadcastInitHeader` - Remove unused `kadcastHeight` in synchronizer Resolves #1452 --- pkg/config/consts.go | 5 +- pkg/core/candidate/requestor.go | 4 +- pkg/core/chain/chain.go | 28 +++++------ pkg/core/chain/consensus.go | 2 +- pkg/core/chain/ledger.go | 4 +- pkg/core/chain/synchronizer.go | 18 ++++---- pkg/core/chain/synchronizer_test.go | 8 ++-- .../consensus/agreement/agreement_in_test.go | 3 +- pkg/core/consensus/agreement/step.go | 9 ++-- pkg/core/consensus/comms.go | 2 +- .../consensus/reduction/firststep/step.go | 8 ++-- pkg/core/consensus/reduction/reduction.go | 2 +- .../consensus/reduction/secondstep/step.go | 11 ++--- pkg/core/consensus/selection/step.go | 11 ++--- pkg/core/mempool/mempool.go | 10 ++-- pkg/p2p/kadcast/peer_test.go | 46 +++++++++++++++++-- pkg/p2p/kadcast/reader.go | 9 ++-- pkg/p2p/kadcast/writer/broadcast.go | 34 +++++++------- pkg/p2p/kadcast/writer/sendmany.go | 13 +++--- pkg/p2p/kadcast/writer/sendone.go | 13 +++--- pkg/p2p/peer/peer.go | 3 +- pkg/p2p/peer/processor.go | 4 +- pkg/p2p/wire/message/message.go | 22 ++++----- pkg/p2p/wire/message/metadata.go | 18 ++++++++ pkg/p2p/wire/message/transactions_test.go | 3 +- pkg/util/container/ring/buffer.go | 4 +- pkg/util/container/ring/consumer.go | 4 +- .../nativeutils/eventbus/eventbus_test.go | 2 +- pkg/util/nativeutils/eventbus/listener.go | 4 +- pkg/util/nativeutils/eventbus/mock.go | 2 +- 30 files changed, 179 insertions(+), 127 deletions(-) create mode 100644 pkg/p2p/wire/message/metadata.go diff --git a/pkg/config/consts.go b/pkg/config/consts.go index 83f705734..67fc7d213 100644 --- a/pkg/config/consts.go +++ b/pkg/config/consts.go @@ -25,7 +25,7 @@ const ( MaxBlockTime = 360 // maximum block time in seconds // KadcastInitialHeight sets the default initial height for Kadcast broadcast algorithm. - KadcastInitialHeight byte = 128 + 1 + KadcastInitialHeight byte = 128 // The dusk-blockchain executable version. NodeVersion = "0.6.0-rc" @@ -63,6 +63,3 @@ const ( // GetCandidateReceivers is a redundancy factor on retrieving a missing candidate block. GetCandidateReceivers = 7 ) - -// KadcastInitHeader is used as default initial kadcast message header. -var KadcastInitHeader = []byte{KadcastInitialHeight} diff --git a/pkg/core/candidate/requestor.go b/pkg/core/candidate/requestor.go index 0543b5569..008e4ca5f 100644 --- a/pkg/core/candidate/requestor.go +++ b/pkg/core/candidate/requestor.go @@ -87,7 +87,7 @@ func (r *Requestor) publishGetCandidate(hash []byte) error { return err } - m := message.NewWithHeader(topics.GetCandidate, *buf, config.KadcastInitHeader) + m := message.New(topics.GetCandidate, *buf) r.publisher.Publish(topics.Kadcast, m) return nil @@ -102,7 +102,7 @@ func (r *Requestor) sendGetCandidate(hash []byte) error { return err } - msg := message.NewWithHeader(topics.GetCandidate, buf, []byte{config.GetCandidateReceivers}) + msg := message.NewWithMetadata(topics.GetCandidate, buf, &message.Metadata{NumNodes: config.GetCandidateReceivers}) r.publisher.Publish(topics.KadcastSendToMany, msg) return nil } diff --git a/pkg/core/chain/chain.go b/pkg/core/chain/chain.go index 0f523b888..3b75e4dea 100644 --- a/pkg/core/chain/chain.go +++ b/pkg/core/chain/chain.go @@ -256,10 +256,8 @@ func (c *Chain) ProcessBlockFromNetwork(srcPeerID string, m message.Message) ([] l := log.WithField("recv_blk_h", blk.Header.Height). WithField("curr_h", c.tip.Header.Height) - var kh byte = 255 - if len(m.Header()) > 0 { - kh = m.Header()[0] - l = l.WithField("kad_h", kh) + if m.Metadata() != nil { + l = l.WithField("kad_h", m.Metadata().KadcastHeight) } l.Trace("block received") @@ -293,7 +291,7 @@ func (c *Chain) ProcessBlockFromNetwork(srcPeerID string, m message.Message) ([] // out if any other node propagates it back when this node is syncing up. c.blacklisted.Add(bytes.NewBuffer(hash)) - return c.synchronizer.processBlock(srcPeerID, c.tip.Header.Height, blk, kh) + return c.synchronizer.processBlock(srcPeerID, c.tip.Header.Height, blk) } case blk.Header.Height < c.tip.Header.Height: l.Debug("discard block") @@ -318,22 +316,22 @@ func (c *Chain) ProcessBlockFromNetwork(srcPeerID string, m message.Message) ([] c.highestSeen = blk.Header.Height } - return c.synchronizer.processBlock(srcPeerID, c.tip.Header.Height, blk, kh) + return c.synchronizer.processBlock(srcPeerID, c.tip.Header.Height, blk) } // TryNextConsecutiveBlockOutSync is the processing path for accepting a block // from the network during out-of-sync state. -func (c *Chain) TryNextConsecutiveBlockOutSync(blk block.Block, kadcastHeight byte) error { +func (c *Chain) TryNextConsecutiveBlockOutSync(blk block.Block) error { log.WithField("height", blk.Header.Height).Trace("accepting sync block") return c.acceptBlock(blk, true) } // TryNextConsecutiveBlockInSync is the processing path for accepting a block // from the network during in-sync state. Returns err if the block is not valid. -func (c *Chain) TryNextConsecutiveBlockInSync(blk block.Block, kadcastHeight byte) error { +func (c *Chain) TryNextConsecutiveBlockInSync(blk block.Block) error { // Make an attempt to accept a new block. If succeeds, we could safely restart the Consensus Loop. // If not, peer reputation score should be decreased. - if err := c.acceptSuccessiveBlock(blk, kadcastHeight); err != nil { + if err := c.acceptSuccessiveBlock(blk); err != nil { return err } @@ -385,7 +383,7 @@ func (c *Chain) ProcessSyncTimerExpired(strPeerAddr string) error { // acceptSuccessiveBlock will accept a block which directly follows the chain // tip, and advertises it to the node's peers. -func (c *Chain) acceptSuccessiveBlock(blk block.Block, kadcastHeight byte) error { +func (c *Chain) acceptSuccessiveBlock(blk block.Block) error { log.WithField("height", blk.Header.Height).Trace("accepting succeeding block") if err := c.isValidHeader(blk, *c.tip, *c.p, log, true); err != nil { @@ -393,7 +391,7 @@ func (c *Chain) acceptSuccessiveBlock(blk block.Block, kadcastHeight byte) error return err } - if err := c.kadcastBlock(blk, kadcastHeight); err != nil { + if err := c.kadcastBlock(blk); err != nil { log.WithError(err).Error("block propagation failed") return err } @@ -735,9 +733,8 @@ func (c *Chain) ExecuteStateTransition(ctx context.Context, txs []transactions.C return c.proxy.Executor().ExecuteStateTransition(c.ctx, txs, blockGasLimit, blockHeight, generator) } -func (c *Chain) kadcastBlock(blk block.Block, kadcastHeight byte) error { - log.WithField("blk_height", blk.Header.Height). - WithField("kadcast_h", kadcastHeight).Trace("propagate block") +func (c *Chain) kadcastBlock(blk block.Block) error { + log.WithField("blk_height", blk.Header.Height).Trace("propagate block") buf := new(bytes.Buffer) if err := message.MarshalBlock(buf, &blk); err != nil { @@ -748,8 +745,7 @@ func (c *Chain) kadcastBlock(blk block.Block, kadcastHeight byte) error { return err } - c.eventBus.Publish(topics.Kadcast, - message.NewWithHeader(topics.Block, *buf, []byte{kadcastHeight})) + c.eventBus.Publish(topics.Kadcast, message.New(topics.Block, *buf)) return nil } diff --git a/pkg/core/chain/consensus.go b/pkg/core/chain/consensus.go index 6fc9749be..403d0caea 100644 --- a/pkg/core/chain/consensus.go +++ b/pkg/core/chain/consensus.go @@ -70,7 +70,7 @@ func (c *Chain) acceptConsensusResults(ctx context.Context, winnerChan chan cons return } - if err = c.acceptSuccessiveBlock(block, config.KadcastInitialHeight); err != nil { + if err = c.acceptSuccessiveBlock(block); err != nil { log.WithError(err).Error("block acceptance failed") c.lock.Unlock() return diff --git a/pkg/core/chain/ledger.go b/pkg/core/chain/ledger.go index babf8e3a1..a74a200eb 100644 --- a/pkg/core/chain/ledger.go +++ b/pkg/core/chain/ledger.go @@ -10,8 +10,8 @@ import "github.com/dusk-network/dusk-blockchain/pkg/core/data/block" // Ledger is the Chain interface used in tests. type Ledger interface { - TryNextConsecutiveBlockInSync(blk block.Block, kadcastHeight byte) error - TryNextConsecutiveBlockOutSync(blk block.Block, kadcastHeight byte) error + TryNextConsecutiveBlockInSync(blk block.Block) error + TryNextConsecutiveBlockOutSync(blk block.Block) error TryNextConsecutiveBlockIsValid(blk block.Block) error // RestartConsensus Stop and Start Consensus. diff --git a/pkg/core/chain/synchronizer.go b/pkg/core/chain/synchronizer.go index f9b71d68a..55ba40ee7 100644 --- a/pkg/core/chain/synchronizer.go +++ b/pkg/core/chain/synchronizer.go @@ -26,9 +26,9 @@ const ( var slog = logrus.WithField("process", "sync") -type syncState func(srcPeerAddr string, currentHeight uint64, blk block.Block, kadcastHeight byte) ([]bytes.Buffer, error) +type syncState func(srcPeerAddr string, currentHeight uint64, blk block.Block) ([]bytes.Buffer, error) -func (s *synchronizer) inSync(srcPeerAddr string, currentHeight uint64, blk block.Block, kadcastHeight byte) ([]bytes.Buffer, error) { +func (s *synchronizer) inSync(srcPeerAddr string, currentHeight uint64, blk block.Block) ([]bytes.Buffer, error) { if blk.Header.Height > currentHeight+1 { s.sequencer.add(blk) @@ -43,12 +43,12 @@ func (s *synchronizer) inSync(srcPeerAddr string, currentHeight uint64, blk bloc s.timer.Start(srcPeerAddr) s.state = s.outSync - b, err := s.startSync(srcPeerAddr, blk.Header.Height, currentHeight, kadcastHeight) + b, err := s.startSync(srcPeerAddr, blk.Header.Height, currentHeight) return b, err } // Otherwise notify the chain (and the consensus loop). - if err := s.chain.TryNextConsecutiveBlockInSync(blk, kadcastHeight); err != nil { + if err := s.chain.TryNextConsecutiveBlockInSync(blk); err != nil { slog.WithField("blk_height", blk.Header.Height). WithField("blk_hash", hex.EncodeToString(blk.Header.Hash)). WithField("state", "insync"). @@ -60,7 +60,7 @@ func (s *synchronizer) inSync(srcPeerAddr string, currentHeight uint64, blk bloc return nil, nil } -func (s *synchronizer) outSync(srcPeerAddr string, currentHeight uint64, blk block.Block, kadcastHeight byte) ([]bytes.Buffer, error) { +func (s *synchronizer) outSync(srcPeerAddr string, currentHeight uint64, blk block.Block) ([]bytes.Buffer, error) { var err error // Once we validate successfully the next block from the syncing @@ -102,7 +102,7 @@ func (s *synchronizer) outSync(srcPeerAddr string, currentHeight uint64, blk blo for _, blk := range blks { // append them all to the ledger - if err = s.chain.TryNextConsecutiveBlockOutSync(blk, kadcastHeight); err != nil { + if err = s.chain.TryNextConsecutiveBlockOutSync(blk); err != nil { slog.WithError(err).WithField("state", "outsync"). Warn("could not accept block") @@ -171,17 +171,17 @@ func newSynchronizer(db database.DB, chain Ledger) *synchronizer { } // processBlock handles an incoming block from the network. -func (s *synchronizer) processBlock(srcPeerID string, currentHeight uint64, blk block.Block, kadcastHeight byte) (res []bytes.Buffer, err error) { +func (s *synchronizer) processBlock(srcPeerID string, currentHeight uint64, blk block.Block) (res []bytes.Buffer, err error) { // Clean up sequencer s.sequencer.cleanup(currentHeight) s.sequencer.dump() currState := s.state - res, err = currState(srcPeerID, currentHeight, blk, kadcastHeight) + res, err = currState(srcPeerID, currentHeight, blk) return } -func (s *synchronizer) startSync(strPeerAddr string, tipHeight, currentHeight uint64, _ byte) ([]bytes.Buffer, error) { +func (s *synchronizer) startSync(strPeerAddr string, tipHeight, currentHeight uint64) ([]bytes.Buffer, error) { s.hrange.from = currentHeight s.setSyncTarget(tipHeight, currentHeight+config.MaxInvBlocks) diff --git a/pkg/core/chain/synchronizer_test.go b/pkg/core/chain/synchronizer_test.go index e5af1a76c..e4ca78d7c 100644 --- a/pkg/core/chain/synchronizer_test.go +++ b/pkg/core/chain/synchronizer_test.go @@ -25,7 +25,7 @@ func TestSuccessiveBlocks(t *testing.T) { // tipHeight will be 0, so make the successive block blk := helper.RandomBlock(1, 1) - res, err := s.processBlock("", 0, *blk, 0) + res, err := s.processBlock("", 0, *blk) assert.NoError(err) assert.Nil(res) @@ -40,7 +40,7 @@ func TestFutureBlocks(t *testing.T) { height := uint64(10) blk := helper.RandomBlock(height, 1) - resp, err := s.processBlock("", 0, *blk, 0) + resp, err := s.processBlock("", 0, *blk) assert.NoError(err) // Response should be of the GetBlocks topic @@ -75,12 +75,12 @@ func (m *mockChain) CurrentHeight() uint64 { return m.tipHeight } -func (m *mockChain) TryNextConsecutiveBlockInSync(blk block.Block, _ byte) error { +func (m *mockChain) TryNextConsecutiveBlockInSync(blk block.Block) error { m.catchBlockChan <- consensus.Results{Blk: blk, Err: nil} return nil } -func (m *mockChain) TryNextConsecutiveBlockOutSync(_ block.Block, _ byte) error { +func (m *mockChain) TryNextConsecutiveBlockOutSync(_ block.Block) error { return nil } diff --git a/pkg/core/consensus/agreement/agreement_in_test.go b/pkg/core/consensus/agreement/agreement_in_test.go index 6e4162117..cc319d58c 100644 --- a/pkg/core/consensus/agreement/agreement_in_test.go +++ b/pkg/core/consensus/agreement/agreement_in_test.go @@ -11,7 +11,6 @@ import ( "testing" "github.com/dusk-network/bls12_381-sign/go/cgo/bls" - "github.com/dusk-network/dusk-blockchain/pkg/config" "github.com/dusk-network/dusk-blockchain/pkg/core/consensus/header" "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message" "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/topics" @@ -90,7 +89,7 @@ func TestAccumulatorProcessingAggregation(t *testing.T) { // Verify certificate comm = handler.Committee(hdr.Round, hdr.Step) - msg := message.NewWithHeader(topics.AggrAgreement, aggro, config.KadcastInitHeader) + msg := message.New(topics.AggrAgreement, aggro) buf, err := message.Marshal(msg) assert.Nil(t, err, "failed to marshal aggragreement") diff --git a/pkg/core/consensus/agreement/step.go b/pkg/core/consensus/agreement/step.go index 00b436f1b..b6c7abb0a 100644 --- a/pkg/core/consensus/agreement/step.go +++ b/pkg/core/consensus/agreement/step.go @@ -13,7 +13,6 @@ import ( "time" "github.com/dusk-network/bls12_381-sign/go/cgo/bls" - "github.com/dusk-network/dusk-blockchain/pkg/config" "github.com/dusk-network/dusk-blockchain/pkg/core/candidate" "github.com/dusk-network/dusk-blockchain/pkg/core/consensus" "github.com/dusk-network/dusk-blockchain/pkg/core/consensus/header" @@ -244,7 +243,7 @@ func collectAgreement(h *handler, accumulator *Accumulator, ev message.Message, return } - m := message.NewWithHeader(topics.Agreement, a.Copy().(message.Agreement), ev.Header()) + m := message.NewWithMetadata(topics.Agreement, a.Copy().(message.Agreement), ev.Metadata()) // Once the event is verified, we can republish it. if err := e.Republish(m); err != nil { @@ -288,9 +287,9 @@ func (s *Loop) processCollectedVotes(ctx context.Context, handler *handler, evs bits := comm.Bits(*pubs) agAgreement := message.NewAggrAgreement(evs[0], bits, sig) - m := message.NewWithHeader(topics.AggrAgreement, agAgreement, config.KadcastInitHeader) + m := message.New(topics.AggrAgreement, agAgreement) if err := s.Emitter.Republish(m); err != nil { - lg.WithError(err).Error("could not republish aggregated agreement event") + lg.WithError(err).Error("could broadcast aggregated agreement event") } else { lg. WithField("round", r.Round). @@ -380,7 +379,7 @@ func (s *Loop) processAggrAgreement(ctx context.Context, h *handler, msg message } // Broadcast - m := message.NewWithHeader(topics.AggrAgreement, aggro.Copy(), config.KadcastInitHeader) + m := message.NewWithMetadata(topics.AggrAgreement, aggro.Copy(), msg.Metadata()) if err = e.Republish(m); err != nil { lg.WithError(err).Errorln("could not republish aggragreement event") return nil, err diff --git a/pkg/core/consensus/comms.go b/pkg/core/consensus/comms.go index 4779c3ece..2f49ffef1 100644 --- a/pkg/core/consensus/comms.go +++ b/pkg/core/consensus/comms.go @@ -152,7 +152,7 @@ func (e *Emitter) Kadcast(msg message.Message) error { return err } - serialized := message.NewWithHeader(msg.Category(), buf, msg.Header()) + serialized := message.NewWithMetadata(msg.Category(), buf, msg.Metadata()) e.EventBus.Publish(topics.Kadcast, serialized) return nil } diff --git a/pkg/core/consensus/reduction/firststep/step.go b/pkg/core/consensus/reduction/firststep/step.go index 44eef0981..b33a458b9 100644 --- a/pkg/core/consensus/reduction/firststep/step.go +++ b/pkg/core/consensus/reduction/firststep/step.go @@ -123,7 +123,7 @@ func (p *Phase) Run(ctx context.Context, queue *consensus.Queue, _, reductionCha // if collectReduction returns a StepVote, it means we reached // consensus and can go to the next step - if sv := p.collectReduction(ctx, rMsg, r.Round, step, ev.Header()); sv != nil { + if sv := p.collectReduction(ctx, rMsg, r.Round, step, ev.Metadata()); sv != nil { go func() { <-timeoutChan }() @@ -141,7 +141,7 @@ func (p *Phase) Run(ctx context.Context, queue *consensus.Queue, _, reductionCha continue } - sv := p.collectReduction(ctx, rMsg, r.Round, step, ev.Header()) + sv := p.collectReduction(ctx, rMsg, r.Round, step, ev.Metadata()) if sv != nil { // preventing timeout leakage go func() { @@ -174,7 +174,7 @@ func (p *Phase) gotoNextPhase(msg *message.StepVotesMsg) consensus.PhaseFn { return p.next.Initialize(*msg) } -func (p *Phase) collectReduction(ctx context.Context, r message.Reduction, round uint64, step uint8, msgHeader []byte) *message.StepVotesMsg { +func (p *Phase) collectReduction(ctx context.Context, r message.Reduction, round uint64, step uint8, metadata *message.Metadata) *message.StepVotesMsg { if err := p.handler.VerifySignature(r.Copy().(message.Reduction)); err != nil { lg. WithError(err). @@ -194,7 +194,7 @@ func (p *Phase) collectReduction(ctx context.Context, r message.Reduction, round Debug("") } - m := message.NewWithHeader(topics.Reduction, r, msgHeader) + m := message.NewWithMetadata(topics.Reduction, r, metadata) // Once the event is verified, we can republish it. if err := p.Emitter.Republish(m); err != nil { diff --git a/pkg/core/consensus/reduction/reduction.go b/pkg/core/consensus/reduction/reduction.go index 3b332ac4a..8a503552f 100644 --- a/pkg/core/consensus/reduction/reduction.go +++ b/pkg/core/consensus/reduction/reduction.go @@ -152,7 +152,7 @@ func (r *Reduction) SendReduction(ctx context.Context, round uint64, step uint8, red := message.NewReduction(hdr) red.SignedHash = sig - m := message.NewWithHeader(topics.Reduction, *red, config.KadcastInitHeader) + m := message.New(topics.Reduction, *red) return m, voteHash, nil } diff --git a/pkg/core/consensus/reduction/secondstep/step.go b/pkg/core/consensus/reduction/secondstep/step.go index ae97fd1c3..5e5a9dcf3 100644 --- a/pkg/core/consensus/reduction/secondstep/step.go +++ b/pkg/core/consensus/reduction/secondstep/step.go @@ -12,7 +12,6 @@ import ( "encoding/hex" "time" - "github.com/dusk-network/dusk-blockchain/pkg/config" "github.com/dusk-network/dusk-blockchain/pkg/core/consensus" "github.com/dusk-network/dusk-blockchain/pkg/core/consensus/header" "github.com/dusk-network/dusk-blockchain/pkg/core/consensus/reduction" @@ -117,7 +116,7 @@ func (p *Phase) Run(ctx context.Context, queue *consensus.Queue, _, reductionCha // if collectReduction returns a StepVote, it means we reached // consensus and can go to the next step - svm := p.collectReduction(rMsg, r.Round, step, ev.Header()) + svm := p.collectReduction(rMsg, r.Round, step, ev.Metadata()) if svm == nil { continue } @@ -139,7 +138,7 @@ func (p *Phase) Run(ctx context.Context, queue *consensus.Queue, _, reductionCha continue } - svm := p.collectReduction(rMsg, r.Round, step, ev.Header()) + svm := p.collectReduction(rMsg, r.Round, step, ev.Metadata()) if svm == nil { continue } @@ -177,7 +176,7 @@ func (p *Phase) Run(ctx context.Context, queue *consensus.Queue, _, reductionCha } } -func (p *Phase) collectReduction(r message.Reduction, round uint64, step uint8, msgHeader []byte) *message.StepVotesMsg { +func (p *Phase) collectReduction(r message.Reduction, round uint64, step uint8, metadata *message.Metadata) *message.StepVotesMsg { hdr := r.State() if err := p.handler.VerifySignature(r.Copy().(message.Reduction)); err != nil { @@ -201,7 +200,7 @@ func (p *Phase) collectReduction(r message.Reduction, round uint64, step uint8, Debug("") } - m := message.NewWithHeader(topics.Reduction, r.Copy().(message.Reduction), msgHeader) + m := message.NewWithMetadata(topics.Reduction, r.Copy().(message.Reduction), metadata) // Once the event is verified, we can republish it. if err := p.Emitter.Republish(m); err != nil { @@ -279,7 +278,7 @@ func (p *Phase) sendAgreement(round uint64, step uint8, svm *message.StepVotesMs // Publishing Agreement internally so that it's the internal Agreement process(goroutine) // that should register it locally and only then broadcast it. - m := message.NewWithHeader(topics.Agreement, *ev, config.KadcastInitHeader) + m := message.New(topics.Agreement, *ev) p.EventBus.Publish(topics.Agreement, m) } diff --git a/pkg/core/consensus/selection/step.go b/pkg/core/consensus/selection/step.go index 41b47daad..1a7fdef88 100644 --- a/pkg/core/consensus/selection/step.go +++ b/pkg/core/consensus/selection/step.go @@ -15,7 +15,6 @@ import ( "strconv" "time" - "github.com/dusk-network/dusk-blockchain/pkg/config" "github.com/dusk-network/dusk-blockchain/pkg/core/candidate" "github.com/dusk-network/dusk-blockchain/pkg/core/consensus/blockgenerator" "github.com/dusk-network/dusk-blockchain/pkg/core/consensus/header" @@ -124,14 +123,14 @@ func (p *Phase) Run(parentCtx context.Context, queue *consensus.Queue, newBlockC logNewBlock(r.Round, step, scr.State().BlockHash, p.Keys.BLSPubKey) // Broadcast the candidate block for this round/iteration. - m := message.NewWithHeader(topics.NewBlock, *scr, []byte{config.KadcastInitialHeight}) + m := message.NewWithMetadata(topics.NewBlock, *scr, nil) if err := p.Republish(m); err != nil { lg.WithError(err). Error("could not republish new block") } // register new candidate in local state without propagating it. - m = message.NewWithHeader(topics.NewBlock, *scr, []byte{0}) + m = message.NewWithMetadata(topics.NewBlock, *scr, &message.Metadata{KadcastHeight: 0}) newBlockChan <- m } } @@ -149,7 +148,7 @@ func (p *Phase) Run(parentCtx context.Context, queue *consensus.Queue, newBlockC case ev := <-newBlockChan: if shouldProcess(ev, r.Round, step, queue) { b := ev.Payload().(message.NewBlock) - if err := p.collectNewBlock(b, ev.Header()); err != nil { + if err := p.collectNewBlock(b, ev.Metadata()); err != nil { continue } @@ -212,7 +211,7 @@ func (p *Phase) verifyNewBlock(msg message.NewBlock) error { return nil } -func (p *Phase) collectNewBlock(msg message.NewBlock, msgHeader []byte) error { +func (p *Phase) collectNewBlock(msg message.NewBlock, metadata *message.Metadata) error { if err := p.verifyNewBlock(msg); err != nil { msg.WithFields(lg). WithField("seed", hex.EncodeToString(p.handler.Seed())). @@ -237,7 +236,7 @@ func (p *Phase) collectNewBlock(msg message.NewBlock, msgHeader []byte) error { // Once the event is verified, and has passed all preliminary checks, // we can republish it to the network. - m := message.NewWithHeader(topics.NewBlock, msg, msgHeader) + m := message.NewWithMetadata(topics.NewBlock, msg, metadata) if err := p.Republish(m); err != nil { lg.WithError(err). Error("could not republish new block") diff --git a/pkg/core/mempool/mempool.go b/pkg/core/mempool/mempool.go index 6739cf9ba..76234c12e 100644 --- a/pkg/core/mempool/mempool.go +++ b/pkg/core/mempool/mempool.go @@ -214,8 +214,8 @@ func (m *Mempool) ProcessTx(srcPeerID string, msg message.Message) ([]bytes.Buff } var h byte - if len(msg.Header()) > 0 { - h = msg.Header()[0] + if msg.Metadata() != nil { + h = msg.Metadata().KadcastHeight } t := TxDesc{ @@ -508,7 +508,8 @@ func (m *Mempool) kadcastTx(t TxDesc) error { return err } - msg := message.NewWithHeader(topics.Tx, *buf, []byte{t.kadHeight}) + metadata := message.Metadata{KadcastHeight: t.kadHeight} + msg := message.NewWithMetadata(topics.Tx, *buf, &metadata) m.eventBus.Publish(topics.Kadcast, msg) return nil @@ -536,7 +537,8 @@ func (m *Mempool) RequestUpdates() { panic(err) } - msg := message.NewWithHeader(topics.MemPool, buf, []byte{numNodes}) + metadata := message.Metadata{NumNodes: numNodes} + msg := message.NewWithMetadata(topics.MemPool, buf, &metadata) m.eventBus.Publish(topics.KadcastSendToMany, msg) } diff --git a/pkg/p2p/kadcast/peer_test.go b/pkg/p2p/kadcast/peer_test.go index 2db08b18c..8b20c782b 100644 --- a/pkg/p2p/kadcast/peer_test.go +++ b/pkg/p2p/kadcast/peer_test.go @@ -13,6 +13,7 @@ import ( "fmt" "net" "testing" + "time" "google.golang.org/grpc" @@ -89,6 +90,43 @@ func TestListenStreamReader(t *testing.T) { srv.Stop() } +// TestNoBroadcastWriter tests the kadcli.Writer by broadcasting +// a block message that should not be repropagated. +func TestNoBroadcastWriter(t *testing.T) { + // assert := assert.New(t) + rcvChan := make(chan *rusk.BroadcastMessage) + + // Basic infrastructure + eb := eventbus.New() + g := protocol.NewGossip() + + // create a mock client + cli := NewMockNetworkClient(rcvChan) + + // create our kadcli Writer + _ = writer.NewBroadcast(context.Background(), eb, g, cli) + + // create a mock message + buf, err := createBlockMessage() + if err != nil { + t.Errorf("fail to create msg: %v", err) + } + + // prepare a message to not being repropagated + pubm := message.NewWithMetadata(topics.Block, *buf, &message.Metadata{KadcastHeight: 0}) + + errList := eb.Publish(topics.Kadcast, pubm) + if len(errList) > 0 { + t.Fatal("error publishing to evt bus") + } + + select { + case m := <-rcvChan: + t.Fatal("Received message with height ", m.KadcastHeight) + case <-time.After(3 * time.Second): + } +} + // TestBroadcastWriter tests the kadcli.Writer by broadcasting // a block message through a mocked rusk client. func TestBroadcastWriter(t *testing.T) { @@ -111,8 +149,10 @@ func TestBroadcastWriter(t *testing.T) { t.Errorf("fail to create msg: %v", err) } + var testBroadcastHeight uint32 = 5 + // send a broadcast message - pubm := message.NewWithHeader(topics.Block, *buf, []byte{127}) + pubm := message.NewWithMetadata(topics.Block, *buf, &message.Metadata{KadcastHeight: byte(testBroadcastHeight)}) errList := eb.Publish(topics.Kadcast, pubm) if len(errList) > 0 { @@ -121,7 +161,7 @@ func TestBroadcastWriter(t *testing.T) { // process status/output m := <-rcvChan - assert.True(m.KadcastHeight == 127-1) + assert.True(m.KadcastHeight == testBroadcastHeight-1) // attempt to read the message reader := bytes.NewReader(m.Message) @@ -149,7 +189,7 @@ func TestBroadcastWriter(t *testing.T) { topic := topics.Topic(rb.Bytes()[0]) assert.True(topic == topics.Block) // unmarshal message - res, err := message.Unmarshal(rb, []byte{}) + res, err := message.Unmarshal(rb, nil) if err != nil { t.Error("failed to unmarshal") } diff --git a/pkg/p2p/kadcast/reader.go b/pkg/p2p/kadcast/reader.go index 5b3bbe9b4..c6df1f2b4 100644 --- a/pkg/p2p/kadcast/reader.go +++ b/pkg/p2p/kadcast/reader.go @@ -97,10 +97,13 @@ func (r *Reader) processMessage(msg *rusk.Message) { return } - h := []byte{byte(msg.Metadata.KadcastHeight)} + metadata := message.Metadata{ + KadcastHeight: byte(msg.Metadata.KadcastHeight), + Source: msg.Metadata.SrcAddress, + } // collect (process) the message - respBufs, err := r.processor.Collect(msg.Metadata.SrcAddress, m, nil, protocol.FullNode, h) + respBufs, err := r.processor.Collect(msg.Metadata.SrcAddress, m, nil, protocol.FullNode, &metadata) if err != nil { var topic string if len(m) > 0 { @@ -117,7 +120,7 @@ func (r *Reader) processMessage(msg *rusk.Message) { for i := 0; i < len(respBufs); i++ { log.WithField("r_addr", msg.Metadata.SrcAddress).Trace("send point-to-point message") // send Kadcast point-to-point message with source address as destination - msg := message.NewWithHeader(topics.KadcastSendToOne, respBufs[i], []byte(msg.Metadata.SrcAddress)) + msg := message.NewWithMetadata(topics.KadcastSendToOne, respBufs[i], &metadata) r.publisher.Publish(topics.KadcastSendToOne, msg) } } diff --git a/pkg/p2p/kadcast/writer/broadcast.go b/pkg/p2p/kadcast/writer/broadcast.go index 410e01db7..d412ffb64 100644 --- a/pkg/p2p/kadcast/writer/broadcast.go +++ b/pkg/p2p/kadcast/writer/broadcast.go @@ -9,8 +9,9 @@ package writer import ( "bytes" "context" - "errors" + "github.com/dusk-network/dusk-blockchain/pkg/config" + "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message" "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/protocol" "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/topics" "github.com/dusk-network/dusk-blockchain/pkg/util/container/ring" @@ -64,8 +65,8 @@ func (w *Broadcast) Subscribe() { } // Write implements. ring.Writer. -func (w *Broadcast) Write(data, header []byte, priority byte) (int, error) { - if err := w.broadcast(data, header, priority); err != nil { +func (w *Broadcast) Write(data []byte, metadata *message.Metadata, priority byte) (int, error) { + if err := w.broadcast(data, metadata, priority); err != nil { // A returned error here is treated as unrecoverable err. log.WithError(err).WithField("handler", w.topic.String()).Warn("write failed") } @@ -74,24 +75,21 @@ func (w *Broadcast) Write(data, header []byte, priority byte) (int, error) { } // broadcast broadcasts message to the entire network. -// The kadcast height is read from message Header. -func (w *Broadcast) broadcast(data, header []byte, _ byte) error { - // check header - if len(header) == 0 { - return errors.New("empty message header") - } +// The kadcast height is read from message metadata. +func (w *Broadcast) broadcast(data []byte, metadata *message.Metadata, _ byte) error { + h := config.KadcastInitialHeight // extract kadcast height - h := uint32(header[0]) - if h == 0 { - // Apparently, this node is the last peer in a bucket of height 0. We - // should not repropagate. - return nil + if metadata != nil { + if metadata.KadcastHeight == 0 { + // Apparently, this node is the last peer in a bucket of height 0. We + // should not repropagate. + return nil + } + // Decrement kadcast height + h = metadata.KadcastHeight - 1 } - // Decrement kadcast height - h-- - // create the message b := bytes.NewBuffer(data) if err := w.gossip.Process(b); err != nil { @@ -100,7 +98,7 @@ func (w *Broadcast) broadcast(data, header []byte, _ byte) error { // prepare message m := &rusk.BroadcastMessage{ - KadcastHeight: h, + KadcastHeight: uint32(h), Message: b.Bytes(), } // broadcast message diff --git a/pkg/p2p/kadcast/writer/sendmany.go b/pkg/p2p/kadcast/writer/sendmany.go index 1ef1186ac..58262a3a3 100644 --- a/pkg/p2p/kadcast/writer/sendmany.go +++ b/pkg/p2p/kadcast/writer/sendmany.go @@ -10,6 +10,7 @@ import ( "context" "errors" + "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message" "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/protocol" "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/topics" "github.com/dusk-network/dusk-blockchain/pkg/util/container/ring" @@ -46,8 +47,8 @@ func (w *SendToMany) Subscribe() { } // Write ... -func (w *SendToMany) Write(data, header []byte, priority byte) (int, error) { - if err := w.sendToMany(data, header, priority); err != nil { +func (w *SendToMany) Write(data []byte, metadata *message.Metadata, priority byte) (int, error) { + if err := w.sendToMany(data, metadata, priority); err != nil { log.WithError(err).Warn("write failed") } @@ -55,13 +56,13 @@ func (w *SendToMany) Write(data, header []byte, priority byte) (int, error) { } // sendToMany sends a message to N random endpoints returned by AliveNodes. -func (w *SendToMany) sendToMany(data, header []byte, _ byte) error { - if len(header) == 0 || header[0] == 0 { - return errors.New("empty message header") +func (w *SendToMany) sendToMany(data []byte, metadata *message.Metadata, _ byte) error { + if metadata == nil { + return errors.New("empty message metadata") } // get N active nodes - req := &rusk.AliveNodesRequest{MaxNodes: uint32(header[0])} + req := &rusk.AliveNodesRequest{MaxNodes: uint32(metadata.NumNodes)} resp, err := w.client.AliveNodes(w.ctx, req) if err != nil { diff --git a/pkg/p2p/kadcast/writer/sendone.go b/pkg/p2p/kadcast/writer/sendone.go index 3ca40c6e7..495d0ee93 100644 --- a/pkg/p2p/kadcast/writer/sendone.go +++ b/pkg/p2p/kadcast/writer/sendone.go @@ -10,6 +10,7 @@ import ( "context" "errors" + "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message" "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/protocol" "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/topics" "github.com/dusk-network/dusk-blockchain/pkg/util/container/ring" @@ -45,18 +46,18 @@ func (w *SendToOne) Subscribe() { } // Write implements. ring.Writer. -func (w *SendToOne) Write(data, header []byte, priority byte) (int, error) { - if err := w.sendToOne(data, header, priority); err != nil { +func (w *SendToOne) Write(data []byte, metadata *message.Metadata, priority byte) (int, error) { + if err := w.sendToOne(data, metadata, priority); err != nil { log.WithError(err).Warn("write failed") } return 0, nil } -func (w *SendToOne) sendToOne(data, header []byte, _ byte) error { - if len(header) == 0 { - return errors.New("empty message header") +func (w *SendToOne) sendToOne(data []byte, metadata *message.Metadata, _ byte) error { + if metadata == nil { + return errors.New("empty message metadata") } - return w.Send(data, string(header)) + return w.Send(data, metadata.Source) } diff --git a/pkg/p2p/peer/peer.go b/pkg/p2p/peer/peer.go index 8675440a3..0af5bbed1 100755 --- a/pkg/p2p/peer/peer.go +++ b/pkg/p2p/peer/peer.go @@ -22,6 +22,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/checksum" + "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message" "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/protocol" "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/topics" "github.com/dusk-network/dusk-blockchain/pkg/util/container/ring" @@ -62,7 +63,7 @@ type GossipConnector struct { *Connection } -func (g *GossipConnector) Write(b, header []byte, priority byte) (int, error) { +func (g *GossipConnector) Write(b []byte, _ *message.Metadata, priority byte) (int, error) { if !canRoute(g.services, topics.Topic(b[0])) { return 0, nil } diff --git a/pkg/p2p/peer/processor.go b/pkg/p2p/peer/processor.go index 600f7bca0..51e023fe1 100644 --- a/pkg/p2p/peer/processor.go +++ b/pkg/p2p/peer/processor.go @@ -49,7 +49,7 @@ func (m *MessageProcessor) Register(topic topics.Topic, fn ProcessorFunc) { // Collect a message from the network. The message is unmarshaled and passed down // to the processing function. -func (m *MessageProcessor) Collect(srcPeerID string, packet []byte, respRingBuf *ring.Buffer, services protocol.ServiceFlag, header []byte) ([]bytes.Buffer, error) { +func (m *MessageProcessor) Collect(srcPeerID string, packet []byte, respRingBuf *ring.Buffer, services protocol.ServiceFlag, metadata *message.Metadata) ([]bytes.Buffer, error) { if len(packet) == 0 { return nil, errors.New("empty packet provided") } @@ -58,7 +58,7 @@ func (m *MessageProcessor) Collect(srcPeerID string, packet []byte, respRingBuf b := bytes.NewBuffer(packet) topic := topics.Topic(b.Bytes()[0]) - msg, err := message.Unmarshal(b, header) + msg, err := message.Unmarshal(b, metadata) if err != nil { return nil, fmt.Errorf("error while unmarshaling: %s - topic: %s", err, topic) } diff --git a/pkg/p2p/wire/message/message.go b/pkg/p2p/wire/message/message.go index 2961256fa..475f1bdab 100644 --- a/pkg/p2p/wire/message/message.go +++ b/pkg/p2p/wire/message/message.go @@ -53,7 +53,7 @@ type Message interface { // created internally and never serialized, this should return an empty buffer. CachedBinary() bytes.Buffer - Header() []byte + Metadata() *Metadata } // Serializable allows to set a payload. @@ -81,8 +81,8 @@ type simple struct { payload payload.Safe // cached marshaled form with Category. marshaled *bytes.Buffer - // header used as metadata (e.g kadcast.height). - header []byte + // metadata from kadcast network. + metadata *Metadata } // Clone creates a new Message which carries a copy of the payload. @@ -97,7 +97,7 @@ func Clone(m Message) (Message, error) { category: m.Category(), marshaled: &b, payload: m.Payload().Copy(), - header: m.Header(), + metadata: m.Metadata(), }, nil } @@ -135,10 +135,6 @@ func (m simple) String() string { return sb.String() } -func (m simple) Header() []byte { - return m.header -} - // Id is the Id the Message. // nolint:golint func (m simple) Id() []byte { @@ -215,10 +211,10 @@ func New(top topics.Topic, p interface{}) Message { return &simple{category: top, payload: safePayload} } -// NewWithHeader creates a new Message with non-nil header. -func NewWithHeader(t topics.Topic, payload interface{}, header []byte) Message { +// NewWithMetadata creates a new Message with non-nil metadata. +func NewWithMetadata(t topics.Topic, payload interface{}, metadata *Metadata) Message { safePayload := convertToSafePayload(payload) - return &simple{category: t, payload: safePayload, header: header} + return &simple{category: t, payload: safePayload, metadata: metadata} } func (m *simple) initPayloadBuffer(b bytes.Buffer) { @@ -230,10 +226,10 @@ func (m *simple) initPayloadBuffer(b bytes.Buffer) { // Unmarshal mutates the buffer by extracting the topic. It create the Message // by setting the topic and unmarshaling the payload into the proper structure // It also caches the serialized form within the message. -func Unmarshal(b *bytes.Buffer, h []byte) (Message, error) { +func Unmarshal(b *bytes.Buffer, h *Metadata) (Message, error) { var err error - msg := &simple{header: h} + msg := &simple{metadata: h} msg.initPayloadBuffer(*b) topic, err := topics.Extract(b) diff --git a/pkg/p2p/wire/message/metadata.go b/pkg/p2p/wire/message/metadata.go new file mode 100644 index 000000000..159300955 --- /dev/null +++ b/pkg/p2p/wire/message/metadata.go @@ -0,0 +1,18 @@ +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT License was not distributed with this +// file, you can obtain one at https://opensource.org/licenses/MIT. +// +// Copyright (c) DUSK NETWORK. All rights reserved. + +package message + +// Metadata is a struct containing messages metadata. +type Metadata struct { + KadcastHeight byte + Source string + NumNodes byte +} + +func (m simple) Metadata() *Metadata { + return m.metadata +} diff --git a/pkg/p2p/wire/message/transactions_test.go b/pkg/p2p/wire/message/transactions_test.go index d41488961..a76336e52 100644 --- a/pkg/p2p/wire/message/transactions_test.go +++ b/pkg/p2p/wire/message/transactions_test.go @@ -11,7 +11,6 @@ import ( "encoding/hex" "testing" - "github.com/dusk-network/dusk-blockchain/pkg/config" "github.com/dusk-network/dusk-blockchain/pkg/core/data/ipc/transactions" "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/checksum" "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message" @@ -63,7 +62,7 @@ func TestWireTransaction(t *testing.T) { buffer := bytes.NewBuffer(m) - message, err := message.Unmarshal(buffer, config.KadcastInitHeader) + message, err := message.Unmarshal(buffer, nil) if err != nil { t.Fatalf("Unable to unmarshal: %v", err) } diff --git a/pkg/util/container/ring/buffer.go b/pkg/util/container/ring/buffer.go index 1e3e4a066..b4a693855 100644 --- a/pkg/util/container/ring/buffer.go +++ b/pkg/util/container/ring/buffer.go @@ -10,12 +10,14 @@ import ( "bytes" "sync" "sync/atomic" + + "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message" ) // Elem single data unit of a ring buffer. type Elem struct { Data []byte - Header []byte + Metadata *message.Metadata Priority byte } diff --git a/pkg/util/container/ring/consumer.go b/pkg/util/container/ring/consumer.go index 305f04fe9..570a688f4 100644 --- a/pkg/util/container/ring/consumer.go +++ b/pkg/util/container/ring/consumer.go @@ -8,11 +8,13 @@ package ring import ( "sort" + + "github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message" ) // Writer defines a Writer interface compatible with ring.Elem. type Writer interface { - Write(data, header []byte, priority byte) (int, error) + Write(data []byte, metadata *message.Metadata, priority byte) (int, error) Close() error } diff --git a/pkg/util/nativeutils/eventbus/eventbus_test.go b/pkg/util/nativeutils/eventbus/eventbus_test.go index 3ae2cc8f4..1b4738081 100644 --- a/pkg/util/nativeutils/eventbus/eventbus_test.go +++ b/pkg/util/nativeutils/eventbus/eventbus_test.go @@ -185,7 +185,7 @@ func TestExitChan(t *testing.T) { type mockWriteCloser struct{} -func (m *mockWriteCloser) Write(data, header []byte, priority byte) (int, error) { +func (m *mockWriteCloser) Write(data []byte, _ *message.Metadata, priority byte) (int, error) { return 0, errors.New("failed") } diff --git a/pkg/util/nativeutils/eventbus/listener.go b/pkg/util/nativeutils/eventbus/listener.go index 461741b9c..a0e3764ba 100644 --- a/pkg/util/nativeutils/eventbus/listener.go +++ b/pkg/util/nativeutils/eventbus/listener.go @@ -123,7 +123,7 @@ func (s *StreamListener) Notify(m message.Message) error { e := ring.Elem{ Data: buf.Bytes(), - Header: m.Header(), + Metadata: m.Metadata(), Priority: 0, } @@ -153,7 +153,7 @@ func (s *StreamListener) Close() { // Consume an item by writing it to the specified WriteCloser. This is used in the StreamListener creation. func Consume(elems []ring.Elem, w ring.Writer) bool { for _, e := range elems { - if _, err := w.Write(e.Data, e.Header, e.Priority); err != nil { + if _, err := w.Write(e.Data, e.Metadata, e.Priority); err != nil { logEB.WithField("queue", "ringbuffer").WithError(err).Warnln("error in writing to WriteCloser") return false } diff --git a/pkg/util/nativeutils/eventbus/mock.go b/pkg/util/nativeutils/eventbus/mock.go index fdccc576d..a5250544e 100644 --- a/pkg/util/nativeutils/eventbus/mock.go +++ b/pkg/util/nativeutils/eventbus/mock.go @@ -149,7 +149,7 @@ func NewSimpleStreamer() *SimpleStreamer { // Write receives the packets from the ringbuffer and writes it on the internal // pipe immediately. -func (ms *SimpleStreamer) Write(data, header []byte, priority byte) (n int, err error) { +func (ms *SimpleStreamer) Write(data []byte, _ *message.Metadata, priority byte) (n int, err error) { b := bytes.NewBuffer(data) if e := ms.gossip.Process(b); e != nil { return 0, e