Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Propagate message metadata
Browse files Browse the repository at this point in the history
- Replace the current `message.Header` bytes with a structured `message.Metadata`
- Remove redundant usages of `config.KadcastInitHeader`

Resolves #1452
herr-seppia committed Aug 8, 2022

Verified

This commit was signed with the committer’s verified signature.
oesteban Oscar Esteban
1 parent a01ed51 commit bbc400e
Showing 30 changed files with 189 additions and 129 deletions.
5 changes: 1 addition & 4 deletions pkg/config/consts.go
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@ const (
MaxBlockTime = 360 // maximum block time in seconds

// KadcastInitialHeight sets the default initial height for Kadcast broadcast algorithm.
KadcastInitialHeight byte = 128 + 1
KadcastInitialHeight byte = 128

// The dusk-blockchain executable version.
NodeVersion = "0.6.0-rc"
@@ -63,6 +63,3 @@ const (
// GetCandidateReceivers is a redundancy factor on retrieving a missing candidate block.
GetCandidateReceivers = 7
)

// KadcastInitHeader is used as default initial kadcast message header.
var KadcastInitHeader = []byte{KadcastInitialHeight}
4 changes: 2 additions & 2 deletions pkg/core/candidate/requestor.go
Original file line number Diff line number Diff line change
@@ -87,7 +87,7 @@ func (r *Requestor) publishGetCandidate(hash []byte) error {
return err
}

m := message.NewWithHeader(topics.GetCandidate, *buf, config.KadcastInitHeader)
m := message.New(topics.GetCandidate, *buf)

r.publisher.Publish(topics.Kadcast, m)
return nil
@@ -102,7 +102,7 @@ func (r *Requestor) sendGetCandidate(hash []byte) error {
return err
}

msg := message.NewWithHeader(topics.GetCandidate, buf, []byte{config.GetCandidateReceivers})
msg := message.NewWithMetadata(topics.GetCandidate, buf, &message.Metadata{NumNodes: config.GetCandidateReceivers})
r.publisher.Publish(topics.KadcastSendToMany, msg)
return nil
}
28 changes: 12 additions & 16 deletions pkg/core/chain/chain.go
Original file line number Diff line number Diff line change
@@ -256,10 +256,8 @@ func (c *Chain) ProcessBlockFromNetwork(srcPeerID string, m message.Message) ([]
l := log.WithField("recv_blk_h", blk.Header.Height).
WithField("curr_h", c.tip.Header.Height)

var kh byte = 255
if len(m.Header()) > 0 {
kh = m.Header()[0]
l = l.WithField("kad_h", kh)
if m.Metadata() != nil {
l = l.WithField("kad_h", m.Metadata().KadcastHeight)
}

l.Trace("block received")
@@ -293,7 +291,7 @@ func (c *Chain) ProcessBlockFromNetwork(srcPeerID string, m message.Message) ([]
// out if any other node propagates it back when this node is syncing up.
c.blacklisted.Add(bytes.NewBuffer(hash))

return c.synchronizer.processBlock(srcPeerID, c.tip.Header.Height, blk, kh)
return c.synchronizer.processBlock(srcPeerID, c.tip.Header.Height, blk, m.Metadata())
}
case blk.Header.Height < c.tip.Header.Height:
l.Debug("discard block")
@@ -318,22 +316,22 @@ func (c *Chain) ProcessBlockFromNetwork(srcPeerID string, m message.Message) ([]
c.highestSeen = blk.Header.Height
}

return c.synchronizer.processBlock(srcPeerID, c.tip.Header.Height, blk, kh)
return c.synchronizer.processBlock(srcPeerID, c.tip.Header.Height, blk, m.Metadata())
}

// TryNextConsecutiveBlockOutSync is the processing path for accepting a block
// from the network during out-of-sync state.
func (c *Chain) TryNextConsecutiveBlockOutSync(blk block.Block, kadcastHeight byte) error {
func (c *Chain) TryNextConsecutiveBlockOutSync(blk block.Block, metadata *message.Metadata) error {
log.WithField("height", blk.Header.Height).Trace("accepting sync block")
return c.acceptBlock(blk, true)
}

// TryNextConsecutiveBlockInSync is the processing path for accepting a block
// from the network during in-sync state. Returns err if the block is not valid.
func (c *Chain) TryNextConsecutiveBlockInSync(blk block.Block, kadcastHeight byte) error {
func (c *Chain) TryNextConsecutiveBlockInSync(blk block.Block, metadata *message.Metadata) error {
// Make an attempt to accept a new block. If succeeds, we could safely restart the Consensus Loop.
// If not, peer reputation score should be decreased.
if err := c.acceptSuccessiveBlock(blk, kadcastHeight); err != nil {
if err := c.acceptSuccessiveBlock(blk, metadata); err != nil {
return err
}

@@ -385,15 +383,15 @@ func (c *Chain) ProcessSyncTimerExpired(strPeerAddr string) error {

// acceptSuccessiveBlock will accept a block which directly follows the chain
// tip, and advertises it to the node's peers.
func (c *Chain) acceptSuccessiveBlock(blk block.Block, kadcastHeight byte) error {
func (c *Chain) acceptSuccessiveBlock(blk block.Block, metadata *message.Metadata) error {
log.WithField("height", blk.Header.Height).Trace("accepting succeeding block")

if err := c.isValidHeader(blk, *c.tip, *c.p, log, true); err != nil {
log.WithError(err).Error("invalid block")
return err
}

if err := c.kadcastBlock(blk, kadcastHeight); err != nil {
if err := c.kadcastBlock(blk, metadata); err != nil {
log.WithError(err).Error("block propagation failed")
return err
}
@@ -735,9 +733,8 @@ func (c *Chain) ExecuteStateTransition(ctx context.Context, txs []transactions.C
return c.proxy.Executor().ExecuteStateTransition(c.ctx, txs, blockGasLimit, blockHeight, generator)
}

func (c *Chain) kadcastBlock(blk block.Block, kadcastHeight byte) error {
log.WithField("blk_height", blk.Header.Height).
WithField("kadcast_h", kadcastHeight).Trace("propagate block")
func (c *Chain) kadcastBlock(blk block.Block, metadata *message.Metadata) error {
log.WithField("blk_height", blk.Header.Height).Trace("propagate block")

buf := new(bytes.Buffer)
if err := message.MarshalBlock(buf, &blk); err != nil {
@@ -748,8 +745,7 @@ func (c *Chain) kadcastBlock(blk block.Block, kadcastHeight byte) error {
return err
}

c.eventBus.Publish(topics.Kadcast,
message.NewWithHeader(topics.Block, *buf, []byte{kadcastHeight}))
c.eventBus.Publish(topics.Kadcast, message.NewWithMetadata(topics.Block, *buf, metadata))
return nil
}

2 changes: 1 addition & 1 deletion pkg/core/chain/consensus.go
Original file line number Diff line number Diff line change
@@ -70,7 +70,7 @@ func (c *Chain) acceptConsensusResults(ctx context.Context, winnerChan chan cons
return
}

if err = c.acceptSuccessiveBlock(block, config.KadcastInitialHeight); err != nil {
if err = c.acceptSuccessiveBlock(block, nil); err != nil {
log.WithError(err).Error("block acceptance failed")
c.lock.Unlock()
return
9 changes: 6 additions & 3 deletions pkg/core/chain/ledger.go
Original file line number Diff line number Diff line change
@@ -6,12 +6,15 @@

package chain

import "github.com/dusk-network/dusk-blockchain/pkg/core/data/block"
import (
"github.com/dusk-network/dusk-blockchain/pkg/core/data/block"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message"
)

// Ledger is the Chain interface used in tests.
type Ledger interface {
TryNextConsecutiveBlockInSync(blk block.Block, kadcastHeight byte) error
TryNextConsecutiveBlockOutSync(blk block.Block, kadcastHeight byte) error
TryNextConsecutiveBlockInSync(blk block.Block, metadata *message.Metadata) error
TryNextConsecutiveBlockOutSync(blk block.Block, metadata *message.Metadata) error
TryNextConsecutiveBlockIsValid(blk block.Block) error

// RestartConsensus Stop and Start Consensus.
18 changes: 9 additions & 9 deletions pkg/core/chain/synchronizer.go
Original file line number Diff line number Diff line change
@@ -26,9 +26,9 @@ const (

var slog = logrus.WithField("process", "sync")

type syncState func(srcPeerAddr string, currentHeight uint64, blk block.Block, kadcastHeight byte) ([]bytes.Buffer, error)
type syncState func(srcPeerAddr string, currentHeight uint64, blk block.Block, metadata *message.Metadata) ([]bytes.Buffer, error)

func (s *synchronizer) inSync(srcPeerAddr string, currentHeight uint64, blk block.Block, kadcastHeight byte) ([]bytes.Buffer, error) {
func (s *synchronizer) inSync(srcPeerAddr string, currentHeight uint64, blk block.Block, metadata *message.Metadata) ([]bytes.Buffer, error) {
if blk.Header.Height > currentHeight+1 {
s.sequencer.add(blk)

@@ -43,12 +43,12 @@ func (s *synchronizer) inSync(srcPeerAddr string, currentHeight uint64, blk bloc
s.timer.Start(srcPeerAddr)

s.state = s.outSync
b, err := s.startSync(srcPeerAddr, blk.Header.Height, currentHeight, kadcastHeight)
b, err := s.startSync(srcPeerAddr, blk.Header.Height, currentHeight, metadata)
return b, err
}

// Otherwise notify the chain (and the consensus loop).
if err := s.chain.TryNextConsecutiveBlockInSync(blk, kadcastHeight); err != nil {
if err := s.chain.TryNextConsecutiveBlockInSync(blk, metadata); err != nil {
slog.WithField("blk_height", blk.Header.Height).
WithField("blk_hash", hex.EncodeToString(blk.Header.Hash)).
WithField("state", "insync").
@@ -60,7 +60,7 @@ func (s *synchronizer) inSync(srcPeerAddr string, currentHeight uint64, blk bloc
return nil, nil
}

func (s *synchronizer) outSync(srcPeerAddr string, currentHeight uint64, blk block.Block, kadcastHeight byte) ([]bytes.Buffer, error) {
func (s *synchronizer) outSync(srcPeerAddr string, currentHeight uint64, blk block.Block, metadata *message.Metadata) ([]bytes.Buffer, error) {
var err error

// Once we validate successfully the next block from the syncing
@@ -102,7 +102,7 @@ func (s *synchronizer) outSync(srcPeerAddr string, currentHeight uint64, blk blo

for _, blk := range blks {
// append them all to the ledger
if err = s.chain.TryNextConsecutiveBlockOutSync(blk, kadcastHeight); err != nil {
if err = s.chain.TryNextConsecutiveBlockOutSync(blk, metadata); err != nil {
slog.WithError(err).WithField("state", "outsync").
Warn("could not accept block")

@@ -171,17 +171,17 @@ func newSynchronizer(db database.DB, chain Ledger) *synchronizer {
}

// processBlock handles an incoming block from the network.
func (s *synchronizer) processBlock(srcPeerID string, currentHeight uint64, blk block.Block, kadcastHeight byte) (res []bytes.Buffer, err error) {
func (s *synchronizer) processBlock(srcPeerID string, currentHeight uint64, blk block.Block, metadata *message.Metadata) (res []bytes.Buffer, err error) {
// Clean up sequencer
s.sequencer.cleanup(currentHeight)
s.sequencer.dump()

currState := s.state
res, err = currState(srcPeerID, currentHeight, blk, kadcastHeight)
res, err = currState(srcPeerID, currentHeight, blk, metadata)
return
}

func (s *synchronizer) startSync(strPeerAddr string, tipHeight, currentHeight uint64, _ byte) ([]bytes.Buffer, error) {
func (s *synchronizer) startSync(strPeerAddr string, tipHeight, currentHeight uint64, _ *message.Metadata) ([]bytes.Buffer, error) {
s.hrange.from = currentHeight
s.setSyncTarget(tipHeight, currentHeight+config.MaxInvBlocks)

9 changes: 5 additions & 4 deletions pkg/core/chain/synchronizer_test.go
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@ import (
"github.com/dusk-network/dusk-blockchain/pkg/core/database"
"github.com/dusk-network/dusk-blockchain/pkg/core/database/lite"
"github.com/dusk-network/dusk-blockchain/pkg/core/tests/helper"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/topics"
assert "github.com/stretchr/testify/require"
)
@@ -25,7 +26,7 @@ func TestSuccessiveBlocks(t *testing.T) {

// tipHeight will be 0, so make the successive block
blk := helper.RandomBlock(1, 1)
res, err := s.processBlock("", 0, *blk, 0)
res, err := s.processBlock("", 0, *blk, nil)
assert.NoError(err)
assert.Nil(res)

@@ -40,7 +41,7 @@ func TestFutureBlocks(t *testing.T) {

height := uint64(10)
blk := helper.RandomBlock(height, 1)
resp, err := s.processBlock("", 0, *blk, 0)
resp, err := s.processBlock("", 0, *blk, nil)
assert.NoError(err)

// Response should be of the GetBlocks topic
@@ -75,12 +76,12 @@ func (m *mockChain) CurrentHeight() uint64 {
return m.tipHeight
}

func (m *mockChain) TryNextConsecutiveBlockInSync(blk block.Block, _ byte) error {
func (m *mockChain) TryNextConsecutiveBlockInSync(blk block.Block, _ *message.Metadata) error {
m.catchBlockChan <- consensus.Results{Blk: blk, Err: nil}
return nil
}

func (m *mockChain) TryNextConsecutiveBlockOutSync(_ block.Block, _ byte) error {
func (m *mockChain) TryNextConsecutiveBlockOutSync(_ block.Block, _ *message.Metadata) error {
return nil
}

3 changes: 1 addition & 2 deletions pkg/core/consensus/agreement/agreement_in_test.go
Original file line number Diff line number Diff line change
@@ -11,7 +11,6 @@ import (
"testing"

"github.com/dusk-network/bls12_381-sign/go/cgo/bls"
"github.com/dusk-network/dusk-blockchain/pkg/config"
"github.com/dusk-network/dusk-blockchain/pkg/core/consensus/header"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/topics"
@@ -90,7 +89,7 @@ func TestAccumulatorProcessingAggregation(t *testing.T) {
// Verify certificate
comm = handler.Committee(hdr.Round, hdr.Step)

msg := message.NewWithHeader(topics.AggrAgreement, aggro, config.KadcastInitHeader)
msg := message.New(topics.AggrAgreement, aggro)
buf, err := message.Marshal(msg)
assert.Nil(t, err, "failed to marshal aggragreement")

9 changes: 4 additions & 5 deletions pkg/core/consensus/agreement/step.go
Original file line number Diff line number Diff line change
@@ -13,7 +13,6 @@ import (
"time"

"github.com/dusk-network/bls12_381-sign/go/cgo/bls"
"github.com/dusk-network/dusk-blockchain/pkg/config"
"github.com/dusk-network/dusk-blockchain/pkg/core/candidate"
"github.com/dusk-network/dusk-blockchain/pkg/core/consensus"
"github.com/dusk-network/dusk-blockchain/pkg/core/consensus/header"
@@ -244,7 +243,7 @@ func collectAgreement(h *handler, accumulator *Accumulator, ev message.Message,
return
}

m := message.NewWithHeader(topics.Agreement, a.Copy().(message.Agreement), ev.Header())
m := message.NewWithMetadata(topics.Agreement, a.Copy().(message.Agreement), ev.Metadata())

// Once the event is verified, we can republish it.
if err := e.Republish(m); err != nil {
@@ -288,9 +287,9 @@ func (s *Loop) processCollectedVotes(ctx context.Context, handler *handler, evs
bits := comm.Bits(*pubs)
agAgreement := message.NewAggrAgreement(evs[0], bits, sig)

m := message.NewWithHeader(topics.AggrAgreement, agAgreement, config.KadcastInitHeader)
m := message.New(topics.AggrAgreement, agAgreement)
if err := s.Emitter.Republish(m); err != nil {
lg.WithError(err).Error("could not republish aggregated agreement event")
lg.WithError(err).Error("could broadcast aggregated agreement event")
} else {
lg.
WithField("round", r.Round).
@@ -380,7 +379,7 @@ func (s *Loop) processAggrAgreement(ctx context.Context, h *handler, msg message
}

// Broadcast
m := message.NewWithHeader(topics.AggrAgreement, aggro.Copy(), config.KadcastInitHeader)
m := message.NewWithMetadata(topics.AggrAgreement, aggro.Copy(), msg.Metadata())
if err = e.Republish(m); err != nil {
lg.WithError(err).Errorln("could not republish aggragreement event")
return nil, err
2 changes: 1 addition & 1 deletion pkg/core/consensus/comms.go
Original file line number Diff line number Diff line change
@@ -152,7 +152,7 @@ func (e *Emitter) Kadcast(msg message.Message) error {
return err
}

serialized := message.NewWithHeader(msg.Category(), buf, msg.Header())
serialized := message.NewWithMetadata(msg.Category(), buf, msg.Metadata())
e.EventBus.Publish(topics.Kadcast, serialized)
return nil
}
8 changes: 4 additions & 4 deletions pkg/core/consensus/reduction/firststep/step.go
Original file line number Diff line number Diff line change
@@ -123,7 +123,7 @@ func (p *Phase) Run(ctx context.Context, queue *consensus.Queue, _, reductionCha

// if collectReduction returns a StepVote, it means we reached
// consensus and can go to the next step
if sv := p.collectReduction(ctx, rMsg, r.Round, step, ev.Header()); sv != nil {
if sv := p.collectReduction(ctx, rMsg, r.Round, step, ev.Metadata()); sv != nil {
go func() {
<-timeoutChan
}()
@@ -141,7 +141,7 @@ func (p *Phase) Run(ctx context.Context, queue *consensus.Queue, _, reductionCha
continue
}

sv := p.collectReduction(ctx, rMsg, r.Round, step, ev.Header())
sv := p.collectReduction(ctx, rMsg, r.Round, step, ev.Metadata())
if sv != nil {
// preventing timeout leakage
go func() {
@@ -174,7 +174,7 @@ func (p *Phase) gotoNextPhase(msg *message.StepVotesMsg) consensus.PhaseFn {
return p.next.Initialize(*msg)
}

func (p *Phase) collectReduction(ctx context.Context, r message.Reduction, round uint64, step uint8, msgHeader []byte) *message.StepVotesMsg {
func (p *Phase) collectReduction(ctx context.Context, r message.Reduction, round uint64, step uint8, metadata *message.Metadata) *message.StepVotesMsg {
if err := p.handler.VerifySignature(r.Copy().(message.Reduction)); err != nil {
lg.
WithError(err).
@@ -194,7 +194,7 @@ func (p *Phase) collectReduction(ctx context.Context, r message.Reduction, round
Debug("")
}

m := message.NewWithHeader(topics.Reduction, r, msgHeader)
m := message.NewWithMetadata(topics.Reduction, r, metadata)

// Once the event is verified, we can republish it.
if err := p.Emitter.Republish(m); err != nil {
2 changes: 1 addition & 1 deletion pkg/core/consensus/reduction/reduction.go
Original file line number Diff line number Diff line change
@@ -152,7 +152,7 @@ func (r *Reduction) SendReduction(ctx context.Context, round uint64, step uint8,
red := message.NewReduction(hdr)
red.SignedHash = sig

m := message.NewWithHeader(topics.Reduction, *red, config.KadcastInitHeader)
m := message.New(topics.Reduction, *red)
return m, voteHash, nil
}

11 changes: 5 additions & 6 deletions pkg/core/consensus/reduction/secondstep/step.go
Original file line number Diff line number Diff line change
@@ -12,7 +12,6 @@ import (
"encoding/hex"
"time"

"github.com/dusk-network/dusk-blockchain/pkg/config"
"github.com/dusk-network/dusk-blockchain/pkg/core/consensus"
"github.com/dusk-network/dusk-blockchain/pkg/core/consensus/header"
"github.com/dusk-network/dusk-blockchain/pkg/core/consensus/reduction"
@@ -117,7 +116,7 @@ func (p *Phase) Run(ctx context.Context, queue *consensus.Queue, _, reductionCha

// if collectReduction returns a StepVote, it means we reached
// consensus and can go to the next step
svm := p.collectReduction(rMsg, r.Round, step, ev.Header())
svm := p.collectReduction(rMsg, r.Round, step, ev.Metadata())
if svm == nil {
continue
}
@@ -139,7 +138,7 @@ func (p *Phase) Run(ctx context.Context, queue *consensus.Queue, _, reductionCha
continue
}

svm := p.collectReduction(rMsg, r.Round, step, ev.Header())
svm := p.collectReduction(rMsg, r.Round, step, ev.Metadata())
if svm == nil {
continue
}
@@ -177,7 +176,7 @@ func (p *Phase) Run(ctx context.Context, queue *consensus.Queue, _, reductionCha
}
}

func (p *Phase) collectReduction(r message.Reduction, round uint64, step uint8, msgHeader []byte) *message.StepVotesMsg {
func (p *Phase) collectReduction(r message.Reduction, round uint64, step uint8, metadata *message.Metadata) *message.StepVotesMsg {
hdr := r.State()

if err := p.handler.VerifySignature(r.Copy().(message.Reduction)); err != nil {
@@ -201,7 +200,7 @@ func (p *Phase) collectReduction(r message.Reduction, round uint64, step uint8,
Debug("")
}

m := message.NewWithHeader(topics.Reduction, r.Copy().(message.Reduction), msgHeader)
m := message.NewWithMetadata(topics.Reduction, r.Copy().(message.Reduction), metadata)

// Once the event is verified, we can republish it.
if err := p.Emitter.Republish(m); err != nil {
@@ -279,7 +278,7 @@ func (p *Phase) sendAgreement(round uint64, step uint8, svm *message.StepVotesMs

// Publishing Agreement internally so that it's the internal Agreement process(goroutine)
// that should register it locally and only then broadcast it.
m := message.NewWithHeader(topics.Agreement, *ev, config.KadcastInitHeader)
m := message.New(topics.Agreement, *ev)
p.EventBus.Publish(topics.Agreement, m)
}

11 changes: 5 additions & 6 deletions pkg/core/consensus/selection/step.go
Original file line number Diff line number Diff line change
@@ -15,7 +15,6 @@ import (
"strconv"
"time"

"github.com/dusk-network/dusk-blockchain/pkg/config"
"github.com/dusk-network/dusk-blockchain/pkg/core/candidate"
"github.com/dusk-network/dusk-blockchain/pkg/core/consensus/blockgenerator"
"github.com/dusk-network/dusk-blockchain/pkg/core/consensus/header"
@@ -124,14 +123,14 @@ func (p *Phase) Run(parentCtx context.Context, queue *consensus.Queue, newBlockC
logNewBlock(r.Round, step, scr.State().BlockHash, p.Keys.BLSPubKey)

// Broadcast the candidate block for this round/iteration.
m := message.NewWithHeader(topics.NewBlock, *scr, []byte{config.KadcastInitialHeight})
m := message.New(topics.NewBlock, *scr)
if err := p.Republish(m); err != nil {
lg.WithError(err).
Error("could not republish new block")
}

// register new candidate in local state without propagating it.
m = message.NewWithHeader(topics.NewBlock, *scr, []byte{0})
m = message.NewWithMetadata(topics.NewBlock, *scr, &message.Metadata{KadcastHeight: 0})
newBlockChan <- m
}
}
@@ -149,7 +148,7 @@ func (p *Phase) Run(parentCtx context.Context, queue *consensus.Queue, newBlockC
case ev := <-newBlockChan:
if shouldProcess(ev, r.Round, step, queue) {
b := ev.Payload().(message.NewBlock)
if err := p.collectNewBlock(b, ev.Header()); err != nil {
if err := p.collectNewBlock(b, ev.Metadata()); err != nil {
continue
}

@@ -212,7 +211,7 @@ func (p *Phase) verifyNewBlock(msg message.NewBlock) error {
return nil
}

func (p *Phase) collectNewBlock(msg message.NewBlock, msgHeader []byte) error {
func (p *Phase) collectNewBlock(msg message.NewBlock, metadata *message.Metadata) error {
if err := p.verifyNewBlock(msg); err != nil {
msg.WithFields(lg).
WithField("seed", hex.EncodeToString(p.handler.Seed())).
@@ -237,7 +236,7 @@ func (p *Phase) collectNewBlock(msg message.NewBlock, msgHeader []byte) error {

// Once the event is verified, and has passed all preliminary checks,
// we can republish it to the network.
m := message.NewWithHeader(topics.NewBlock, msg, msgHeader)
m := message.NewWithMetadata(topics.NewBlock, msg, metadata)
if err := p.Republish(m); err != nil {
lg.WithError(err).
Error("could not republish new block")
16 changes: 11 additions & 5 deletions pkg/core/mempool/mempool.go
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"math"
"sync"
"time"

@@ -213,9 +214,12 @@ func (m *Mempool) ProcessTx(srcPeerID string, msg message.Message) ([]bytes.Buff
return nil, errors.New("mempool is full, dropping transaction")
}

var h byte
if len(msg.Header()) > 0 {
h = msg.Header()[0]
// Initializing with MaxUint8 as KadcastInitialHeight will not work.
// This because `h` will be decremented by the kadcast writer as per
// it's interpreted as "the kadcast height at which it's been received"
var h byte = math.MaxUint8
if msg.Metadata() != nil {
h = msg.Metadata().KadcastHeight
}

t := TxDesc{
@@ -508,7 +512,8 @@ func (m *Mempool) kadcastTx(t TxDesc) error {
return err
}

msg := message.NewWithHeader(topics.Tx, *buf, []byte{t.kadHeight})
metadata := message.Metadata{KadcastHeight: t.kadHeight}
msg := message.NewWithMetadata(topics.Tx, *buf, &metadata)

m.eventBus.Publish(topics.Kadcast, msg)
return nil
@@ -536,7 +541,8 @@ func (m *Mempool) RequestUpdates() {
panic(err)
}

msg := message.NewWithHeader(topics.MemPool, buf, []byte{numNodes})
metadata := message.Metadata{NumNodes: numNodes}
msg := message.NewWithMetadata(topics.MemPool, buf, &metadata)
m.eventBus.Publish(topics.KadcastSendToMany, msg)
}

46 changes: 43 additions & 3 deletions pkg/p2p/kadcast/peer_test.go
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@ import (
"fmt"
"net"
"testing"
"time"

"google.golang.org/grpc"

@@ -89,6 +90,43 @@ func TestListenStreamReader(t *testing.T) {
srv.Stop()
}

// TestNoBroadcastWriter tests the kadcli.Writer by broadcasting
// a block message that should not be repropagated.
func TestNoBroadcastWriter(t *testing.T) {
// assert := assert.New(t)
rcvChan := make(chan *rusk.BroadcastMessage)

// Basic infrastructure
eb := eventbus.New()
g := protocol.NewGossip()

// create a mock client
cli := NewMockNetworkClient(rcvChan)

// create our kadcli Writer
_ = writer.NewBroadcast(context.Background(), eb, g, cli)

// create a mock message
buf, err := createBlockMessage()
if err != nil {
t.Errorf("fail to create msg: %v", err)
}

// prepare a message to not being repropagated
pubm := message.NewWithMetadata(topics.Block, *buf, &message.Metadata{KadcastHeight: 0})

errList := eb.Publish(topics.Kadcast, pubm)
if len(errList) > 0 {
t.Fatal("error publishing to evt bus")
}

select {
case m := <-rcvChan:
t.Fatal("Received message with height ", m.KadcastHeight)
case <-time.After(3 * time.Second):
}
}

// TestBroadcastWriter tests the kadcli.Writer by broadcasting
// a block message through a mocked rusk client.
func TestBroadcastWriter(t *testing.T) {
@@ -111,8 +149,10 @@ func TestBroadcastWriter(t *testing.T) {
t.Errorf("fail to create msg: %v", err)
}

var testBroadcastHeight uint32 = 5

// send a broadcast message
pubm := message.NewWithHeader(topics.Block, *buf, []byte{127})
pubm := message.NewWithMetadata(topics.Block, *buf, &message.Metadata{KadcastHeight: byte(testBroadcastHeight)})

errList := eb.Publish(topics.Kadcast, pubm)
if len(errList) > 0 {
@@ -121,7 +161,7 @@ func TestBroadcastWriter(t *testing.T) {

// process status/output
m := <-rcvChan
assert.True(m.KadcastHeight == 127-1)
assert.True(m.KadcastHeight == testBroadcastHeight-1)

// attempt to read the message
reader := bytes.NewReader(m.Message)
@@ -149,7 +189,7 @@ func TestBroadcastWriter(t *testing.T) {
topic := topics.Topic(rb.Bytes()[0])
assert.True(topic == topics.Block)
// unmarshal message
res, err := message.Unmarshal(rb, []byte{})
res, err := message.Unmarshal(rb, nil)
if err != nil {
t.Error("failed to unmarshal")
}
9 changes: 6 additions & 3 deletions pkg/p2p/kadcast/reader.go
Original file line number Diff line number Diff line change
@@ -97,10 +97,13 @@ func (r *Reader) processMessage(msg *rusk.Message) {
return
}

h := []byte{byte(msg.Metadata.KadcastHeight)}
metadata := message.Metadata{
KadcastHeight: byte(msg.Metadata.KadcastHeight),
Source: msg.Metadata.SrcAddress,
}

// collect (process) the message
respBufs, err := r.processor.Collect(msg.Metadata.SrcAddress, m, nil, protocol.FullNode, h)
respBufs, err := r.processor.Collect(msg.Metadata.SrcAddress, m, nil, protocol.FullNode, &metadata)
if err != nil {
var topic string
if len(m) > 0 {
@@ -117,7 +120,7 @@ func (r *Reader) processMessage(msg *rusk.Message) {
for i := 0; i < len(respBufs); i++ {
log.WithField("r_addr", msg.Metadata.SrcAddress).Trace("send point-to-point message")
// send Kadcast point-to-point message with source address as destination
msg := message.NewWithHeader(topics.KadcastSendToOne, respBufs[i], []byte(msg.Metadata.SrcAddress))
msg := message.NewWithMetadata(topics.KadcastSendToOne, respBufs[i], &metadata)
r.publisher.Publish(topics.KadcastSendToOne, msg)
}
}
34 changes: 16 additions & 18 deletions pkg/p2p/kadcast/writer/broadcast.go
Original file line number Diff line number Diff line change
@@ -9,8 +9,9 @@ package writer
import (
"bytes"
"context"
"errors"

"github.com/dusk-network/dusk-blockchain/pkg/config"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/protocol"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/topics"
"github.com/dusk-network/dusk-blockchain/pkg/util/container/ring"
@@ -64,8 +65,8 @@ func (w *Broadcast) Subscribe() {
}

// Write implements. ring.Writer.
func (w *Broadcast) Write(data, header []byte, priority byte) (int, error) {
if err := w.broadcast(data, header, priority); err != nil {
func (w *Broadcast) Write(data []byte, metadata *message.Metadata, priority byte) (int, error) {
if err := w.broadcast(data, metadata, priority); err != nil {
// A returned error here is treated as unrecoverable err.
log.WithError(err).WithField("handler", w.topic.String()).Warn("write failed")
}
@@ -74,24 +75,21 @@ func (w *Broadcast) Write(data, header []byte, priority byte) (int, error) {
}

// broadcast broadcasts message to the entire network.
// The kadcast height is read from message Header.
func (w *Broadcast) broadcast(data, header []byte, _ byte) error {
// check header
if len(header) == 0 {
return errors.New("empty message header")
}
// The kadcast height is read from message metadata.
func (w *Broadcast) broadcast(data []byte, metadata *message.Metadata, _ byte) error {
h := config.KadcastInitialHeight

// extract kadcast height
h := uint32(header[0])
if h == 0 {
// Apparently, this node is the last peer in a bucket of height 0. We
// should not repropagate.
return nil
if metadata != nil {
if metadata.KadcastHeight == 0 {
// Apparently, this node is the last peer in a bucket of height 0. We
// should not repropagate.
return nil
}
// Decrement kadcast height
h = metadata.KadcastHeight - 1
}

// Decrement kadcast height
h--

// create the message
b := bytes.NewBuffer(data)
if err := w.gossip.Process(b); err != nil {
@@ -100,7 +98,7 @@ func (w *Broadcast) broadcast(data, header []byte, _ byte) error {

// prepare message
m := &rusk.BroadcastMessage{
KadcastHeight: h,
KadcastHeight: uint32(h),
Message: b.Bytes(),
}
// broadcast message
13 changes: 7 additions & 6 deletions pkg/p2p/kadcast/writer/sendmany.go
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@ import (
"context"
"errors"

"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/protocol"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/topics"
"github.com/dusk-network/dusk-blockchain/pkg/util/container/ring"
@@ -46,22 +47,22 @@ func (w *SendToMany) Subscribe() {
}

// Write ...
func (w *SendToMany) Write(data, header []byte, priority byte) (int, error) {
if err := w.sendToMany(data, header, priority); err != nil {
func (w *SendToMany) Write(data []byte, metadata *message.Metadata, priority byte) (int, error) {
if err := w.sendToMany(data, metadata, priority); err != nil {
log.WithError(err).Warn("write failed")
}

return 0, nil
}

// sendToMany sends a message to N random endpoints returned by AliveNodes.
func (w *SendToMany) sendToMany(data, header []byte, _ byte) error {
if len(header) == 0 || header[0] == 0 {
return errors.New("empty message header")
func (w *SendToMany) sendToMany(data []byte, metadata *message.Metadata, _ byte) error {
if metadata == nil {
return errors.New("empty message metadata")
}

// get N active nodes
req := &rusk.AliveNodesRequest{MaxNodes: uint32(header[0])}
req := &rusk.AliveNodesRequest{MaxNodes: uint32(metadata.NumNodes)}

resp, err := w.client.AliveNodes(w.ctx, req)
if err != nil {
13 changes: 7 additions & 6 deletions pkg/p2p/kadcast/writer/sendone.go
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@ import (
"context"
"errors"

"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/protocol"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/topics"
"github.com/dusk-network/dusk-blockchain/pkg/util/container/ring"
@@ -45,18 +46,18 @@ func (w *SendToOne) Subscribe() {
}

// Write implements. ring.Writer.
func (w *SendToOne) Write(data, header []byte, priority byte) (int, error) {
if err := w.sendToOne(data, header, priority); err != nil {
func (w *SendToOne) Write(data []byte, metadata *message.Metadata, priority byte) (int, error) {
if err := w.sendToOne(data, metadata, priority); err != nil {
log.WithError(err).Warn("write failed")
}

return 0, nil
}

func (w *SendToOne) sendToOne(data, header []byte, _ byte) error {
if len(header) == 0 {
return errors.New("empty message header")
func (w *SendToOne) sendToOne(data []byte, metadata *message.Metadata, _ byte) error {
if metadata == nil {
return errors.New("empty message metadata")
}

return w.Send(data, string(header))
return w.Send(data, metadata.Source)
}
3 changes: 2 additions & 1 deletion pkg/p2p/peer/peer.go
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@ import (
log "github.com/sirupsen/logrus"

"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/checksum"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/protocol"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/topics"
"github.com/dusk-network/dusk-blockchain/pkg/util/container/ring"
@@ -62,7 +63,7 @@ type GossipConnector struct {
*Connection
}

func (g *GossipConnector) Write(b, header []byte, priority byte) (int, error) {
func (g *GossipConnector) Write(b []byte, _ *message.Metadata, priority byte) (int, error) {
if !canRoute(g.services, topics.Topic(b[0])) {
return 0, nil
}
4 changes: 2 additions & 2 deletions pkg/p2p/peer/processor.go
Original file line number Diff line number Diff line change
@@ -49,7 +49,7 @@ func (m *MessageProcessor) Register(topic topics.Topic, fn ProcessorFunc) {

// Collect a message from the network. The message is unmarshaled and passed down
// to the processing function.
func (m *MessageProcessor) Collect(srcPeerID string, packet []byte, respRingBuf *ring.Buffer, services protocol.ServiceFlag, header []byte) ([]bytes.Buffer, error) {
func (m *MessageProcessor) Collect(srcPeerID string, packet []byte, respRingBuf *ring.Buffer, services protocol.ServiceFlag, metadata *message.Metadata) ([]bytes.Buffer, error) {
if len(packet) == 0 {
return nil, errors.New("empty packet provided")
}
@@ -58,7 +58,7 @@ func (m *MessageProcessor) Collect(srcPeerID string, packet []byte, respRingBuf
b := bytes.NewBuffer(packet)
topic := topics.Topic(b.Bytes()[0])

msg, err := message.Unmarshal(b, header)
msg, err := message.Unmarshal(b, metadata)
if err != nil {
return nil, fmt.Errorf("error while unmarshaling: %s - topic: %s", err, topic)
}
22 changes: 9 additions & 13 deletions pkg/p2p/wire/message/message.go
Original file line number Diff line number Diff line change
@@ -53,7 +53,7 @@ type Message interface {
// created internally and never serialized, this should return an empty buffer.
CachedBinary() bytes.Buffer

Header() []byte
Metadata() *Metadata
}

// Serializable allows to set a payload.
@@ -81,8 +81,8 @@ type simple struct {
payload payload.Safe
// cached marshaled form with Category.
marshaled *bytes.Buffer
// header used as metadata (e.g kadcast.height).
header []byte
// metadata from kadcast network.
metadata *Metadata
}

// Clone creates a new Message which carries a copy of the payload.
@@ -97,7 +97,7 @@ func Clone(m Message) (Message, error) {
category: m.Category(),
marshaled: &b,
payload: m.Payload().Copy(),
header: m.Header(),
metadata: m.Metadata(),
}, nil
}

@@ -135,10 +135,6 @@ func (m simple) String() string {
return sb.String()
}

func (m simple) Header() []byte {
return m.header
}

// Id is the Id the Message.
// nolint:golint
func (m simple) Id() []byte {
@@ -215,10 +211,10 @@ func New(top topics.Topic, p interface{}) Message {
return &simple{category: top, payload: safePayload}
}

// NewWithHeader creates a new Message with non-nil header.
func NewWithHeader(t topics.Topic, payload interface{}, header []byte) Message {
// NewWithMetadata creates a new Message with non-nil metadata.
func NewWithMetadata(t topics.Topic, payload interface{}, metadata *Metadata) Message {
safePayload := convertToSafePayload(payload)
return &simple{category: t, payload: safePayload, header: header}
return &simple{category: t, payload: safePayload, metadata: metadata}
}

func (m *simple) initPayloadBuffer(b bytes.Buffer) {
@@ -230,10 +226,10 @@ func (m *simple) initPayloadBuffer(b bytes.Buffer) {
// Unmarshal mutates the buffer by extracting the topic. It create the Message
// by setting the topic and unmarshaling the payload into the proper structure
// It also caches the serialized form within the message.
func Unmarshal(b *bytes.Buffer, h []byte) (Message, error) {
func Unmarshal(b *bytes.Buffer, h *Metadata) (Message, error) {
var err error

msg := &simple{header: h}
msg := &simple{metadata: h}
msg.initPayloadBuffer(*b)

topic, err := topics.Extract(b)
18 changes: 18 additions & 0 deletions pkg/p2p/wire/message/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT License was not distributed with this
// file, you can obtain one at https://opensource.org/licenses/MIT.
//
// Copyright (c) DUSK NETWORK. All rights reserved.

package message

// Metadata is a struct containing messages metadata.
type Metadata struct {
KadcastHeight byte
Source string
NumNodes byte
}

func (m simple) Metadata() *Metadata {
return m.metadata
}
3 changes: 1 addition & 2 deletions pkg/p2p/wire/message/transactions_test.go
Original file line number Diff line number Diff line change
@@ -11,7 +11,6 @@ import (
"encoding/hex"
"testing"

"github.com/dusk-network/dusk-blockchain/pkg/config"
"github.com/dusk-network/dusk-blockchain/pkg/core/data/ipc/transactions"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/checksum"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message"
@@ -63,7 +62,7 @@ func TestWireTransaction(t *testing.T) {

buffer := bytes.NewBuffer(m)

message, err := message.Unmarshal(buffer, config.KadcastInitHeader)
message, err := message.Unmarshal(buffer, nil)
if err != nil {
t.Fatalf("Unable to unmarshal: %v", err)
}
4 changes: 3 additions & 1 deletion pkg/util/container/ring/buffer.go
Original file line number Diff line number Diff line change
@@ -10,12 +10,14 @@ import (
"bytes"
"sync"
"sync/atomic"

"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message"
)

// Elem single data unit of a ring buffer.
type Elem struct {
Data []byte
Header []byte
Metadata *message.Metadata
Priority byte
}

4 changes: 3 additions & 1 deletion pkg/util/container/ring/consumer.go
Original file line number Diff line number Diff line change
@@ -8,11 +8,13 @@ package ring

import (
"sort"

"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message"
)

// Writer defines a Writer interface compatible with ring.Elem.
type Writer interface {
Write(data, header []byte, priority byte) (int, error)
Write(data []byte, metadata *message.Metadata, priority byte) (int, error)
Close() error
}

2 changes: 1 addition & 1 deletion pkg/util/nativeutils/eventbus/eventbus_test.go
Original file line number Diff line number Diff line change
@@ -185,7 +185,7 @@ func TestExitChan(t *testing.T) {

type mockWriteCloser struct{}

func (m *mockWriteCloser) Write(data, header []byte, priority byte) (int, error) {
func (m *mockWriteCloser) Write(data []byte, _ *message.Metadata, priority byte) (int, error) {
return 0, errors.New("failed")
}

4 changes: 2 additions & 2 deletions pkg/util/nativeutils/eventbus/listener.go
Original file line number Diff line number Diff line change
@@ -123,7 +123,7 @@ func (s *StreamListener) Notify(m message.Message) error {

e := ring.Elem{
Data: buf.Bytes(),
Header: m.Header(),
Metadata: m.Metadata(),
Priority: 0,
}

@@ -153,7 +153,7 @@ func (s *StreamListener) Close() {
// Consume an item by writing it to the specified WriteCloser. This is used in the StreamListener creation.
func Consume(elems []ring.Elem, w ring.Writer) bool {
for _, e := range elems {
if _, err := w.Write(e.Data, e.Header, e.Priority); err != nil {
if _, err := w.Write(e.Data, e.Metadata, e.Priority); err != nil {
logEB.WithField("queue", "ringbuffer").WithError(err).Warnln("error in writing to WriteCloser")
return false
}
2 changes: 1 addition & 1 deletion pkg/util/nativeutils/eventbus/mock.go
Original file line number Diff line number Diff line change
@@ -149,7 +149,7 @@ func NewSimpleStreamer() *SimpleStreamer {

// Write receives the packets from the ringbuffer and writes it on the internal
// pipe immediately.
func (ms *SimpleStreamer) Write(data, header []byte, priority byte) (n int, err error) {
func (ms *SimpleStreamer) Write(data []byte, _ *message.Metadata, priority byte) (n int, err error) {
b := bytes.NewBuffer(data)
if e := ms.gossip.Process(b); e != nil {
return 0, e

0 comments on commit bbc400e

Please sign in to comment.