Skip to content

Commit

Permalink
Merge pull request #1453 from dusk-network/metadata-1452
Browse files Browse the repository at this point in the history
Propagate message metadata
  • Loading branch information
goshawk-3 authored Aug 12, 2022
2 parents 95a5d5c + c6688ac commit d3f3348
Show file tree
Hide file tree
Showing 30 changed files with 191 additions and 133 deletions.
5 changes: 1 addition & 4 deletions pkg/config/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const (
MaxBlockTime = 360 // maximum block time in seconds

// KadcastInitialHeight sets the default initial height for Kadcast broadcast algorithm.
KadcastInitialHeight byte = 128 + 1
KadcastInitialHeight byte = 128

// The dusk-blockchain executable version.
NodeVersion = "0.6.0-rc"
Expand Down Expand Up @@ -63,6 +63,3 @@ const (
// GetCandidateReceivers is a redundancy factor on retrieving a missing candidate block.
GetCandidateReceivers = 7
)

// KadcastInitHeader is used as default initial kadcast message header.
var KadcastInitHeader = []byte{KadcastInitialHeight}
4 changes: 2 additions & 2 deletions pkg/core/candidate/requestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (r *Requestor) publishGetCandidate(hash []byte) error {
return err
}

m := message.NewWithHeader(topics.GetCandidate, *buf, config.KadcastInitHeader)
m := message.New(topics.GetCandidate, *buf)

r.publisher.Publish(topics.Kadcast, m)
return nil
Expand All @@ -102,7 +102,7 @@ func (r *Requestor) sendGetCandidate(hash []byte) error {
return err
}

msg := message.NewWithHeader(topics.GetCandidate, buf, []byte{config.GetCandidateReceivers})
msg := message.NewWithMetadata(topics.GetCandidate, buf, &message.Metadata{NumNodes: config.GetCandidateReceivers})
r.publisher.Publish(topics.KadcastSendToMany, msg)
return nil
}
Expand Down
28 changes: 12 additions & 16 deletions pkg/core/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,8 @@ func (c *Chain) ProcessBlockFromNetwork(srcPeerID string, m message.Message) ([]
l := log.WithField("recv_blk_h", blk.Header.Height).
WithField("curr_h", c.tip.Header.Height)

var kh byte = 255
if len(m.Header()) > 0 {
kh = m.Header()[0]
l = l.WithField("kad_h", kh)
if m.Metadata() != nil {
l = l.WithField("kad_h", m.Metadata().KadcastHeight)
}

l.Trace("block received")
Expand Down Expand Up @@ -293,7 +291,7 @@ func (c *Chain) ProcessBlockFromNetwork(srcPeerID string, m message.Message) ([]
// out if any other node propagates it back when this node is syncing up.
c.blacklisted.Add(bytes.NewBuffer(hash))

return c.synchronizer.processBlock(srcPeerID, c.tip.Header.Height, blk, kh)
return c.synchronizer.processBlock(srcPeerID, c.tip.Header.Height, blk, m.Metadata())
}
case blk.Header.Height < c.tip.Header.Height:
l.Debug("discard block")
Expand All @@ -318,22 +316,22 @@ func (c *Chain) ProcessBlockFromNetwork(srcPeerID string, m message.Message) ([]
c.highestSeen = blk.Header.Height
}

return c.synchronizer.processBlock(srcPeerID, c.tip.Header.Height, blk, kh)
return c.synchronizer.processBlock(srcPeerID, c.tip.Header.Height, blk, m.Metadata())
}

// TryNextConsecutiveBlockOutSync is the processing path for accepting a block
// from the network during out-of-sync state.
func (c *Chain) TryNextConsecutiveBlockOutSync(blk block.Block, kadcastHeight byte) error {
func (c *Chain) TryNextConsecutiveBlockOutSync(blk block.Block, metadata *message.Metadata) error {
log.WithField("height", blk.Header.Height).Trace("accepting sync block")
return c.acceptBlock(blk, true)
}

// TryNextConsecutiveBlockInSync is the processing path for accepting a block
// from the network during in-sync state. Returns err if the block is not valid.
func (c *Chain) TryNextConsecutiveBlockInSync(blk block.Block, kadcastHeight byte) error {
func (c *Chain) TryNextConsecutiveBlockInSync(blk block.Block, metadata *message.Metadata) error {
// Make an attempt to accept a new block. If succeeds, we could safely restart the Consensus Loop.
// If not, peer reputation score should be decreased.
if err := c.acceptSuccessiveBlock(blk, kadcastHeight); err != nil {
if err := c.acceptSuccessiveBlock(blk, metadata); err != nil {
return err
}

Expand Down Expand Up @@ -385,15 +383,15 @@ func (c *Chain) ProcessSyncTimerExpired(strPeerAddr string) error {

// acceptSuccessiveBlock will accept a block which directly follows the chain
// tip, and advertises it to the node's peers.
func (c *Chain) acceptSuccessiveBlock(blk block.Block, kadcastHeight byte) error {
func (c *Chain) acceptSuccessiveBlock(blk block.Block, metadata *message.Metadata) error {
log.WithField("height", blk.Header.Height).Trace("accepting succeeding block")

if err := c.isValidHeader(blk, *c.tip, *c.p, log, true); err != nil {
log.WithError(err).Error("invalid block")
return err
}

if err := c.kadcastBlock(blk, kadcastHeight); err != nil {
if err := c.kadcastBlock(blk, metadata); err != nil {
log.WithError(err).Error("block propagation failed")
return err
}
Expand Down Expand Up @@ -735,9 +733,8 @@ func (c *Chain) ExecuteStateTransition(ctx context.Context, txs []transactions.C
return c.proxy.Executor().ExecuteStateTransition(c.ctx, txs, blockGasLimit, blockHeight, generator)
}

func (c *Chain) kadcastBlock(blk block.Block, kadcastHeight byte) error {
log.WithField("blk_height", blk.Header.Height).
WithField("kadcast_h", kadcastHeight).Trace("propagate block")
func (c *Chain) kadcastBlock(blk block.Block, metadata *message.Metadata) error {
log.WithField("blk_height", blk.Header.Height).Trace("propagate block")

buf := new(bytes.Buffer)
if err := message.MarshalBlock(buf, &blk); err != nil {
Expand All @@ -748,8 +745,7 @@ func (c *Chain) kadcastBlock(blk block.Block, kadcastHeight byte) error {
return err
}

c.eventBus.Publish(topics.Kadcast,
message.NewWithHeader(topics.Block, *buf, []byte{kadcastHeight}))
c.eventBus.Publish(topics.Kadcast, message.NewWithMetadata(topics.Block, *buf, metadata))
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/core/chain/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (c *Chain) acceptConsensusResults(ctx context.Context, winnerChan chan cons
return
}

if err = c.acceptSuccessiveBlock(block, config.KadcastInitialHeight); err != nil {
if err = c.acceptSuccessiveBlock(block, nil); err != nil {
log.WithError(err).Error("block acceptance failed")
c.lock.Unlock()
return
Expand Down
9 changes: 6 additions & 3 deletions pkg/core/chain/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@

package chain

import "github.com/dusk-network/dusk-blockchain/pkg/core/data/block"
import (
"github.com/dusk-network/dusk-blockchain/pkg/core/data/block"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message"
)

// Ledger is the Chain interface used in tests.
type Ledger interface {
TryNextConsecutiveBlockInSync(blk block.Block, kadcastHeight byte) error
TryNextConsecutiveBlockOutSync(blk block.Block, kadcastHeight byte) error
TryNextConsecutiveBlockInSync(blk block.Block, metadata *message.Metadata) error
TryNextConsecutiveBlockOutSync(blk block.Block, metadata *message.Metadata) error
TryNextConsecutiveBlockIsValid(blk block.Block) error

// RestartConsensus Stop and Start Consensus.
Expand Down
18 changes: 9 additions & 9 deletions pkg/core/chain/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ const (

var slog = logrus.WithField("process", "sync")

type syncState func(srcPeerAddr string, currentHeight uint64, blk block.Block, kadcastHeight byte) ([]bytes.Buffer, error)
type syncState func(srcPeerAddr string, currentHeight uint64, blk block.Block, metadata *message.Metadata) ([]bytes.Buffer, error)

func (s *synchronizer) inSync(srcPeerAddr string, currentHeight uint64, blk block.Block, kadcastHeight byte) ([]bytes.Buffer, error) {
func (s *synchronizer) inSync(srcPeerAddr string, currentHeight uint64, blk block.Block, metadata *message.Metadata) ([]bytes.Buffer, error) {
if blk.Header.Height > currentHeight+1 {
s.sequencer.add(blk)

Expand All @@ -43,12 +43,12 @@ func (s *synchronizer) inSync(srcPeerAddr string, currentHeight uint64, blk bloc
s.timer.Start(srcPeerAddr)

s.state = s.outSync
b, err := s.startSync(srcPeerAddr, blk.Header.Height, currentHeight, kadcastHeight)
b, err := s.startSync(srcPeerAddr, blk.Header.Height, currentHeight, metadata)
return b, err
}

// Otherwise notify the chain (and the consensus loop).
if err := s.chain.TryNextConsecutiveBlockInSync(blk, kadcastHeight); err != nil {
if err := s.chain.TryNextConsecutiveBlockInSync(blk, metadata); err != nil {
slog.WithField("blk_height", blk.Header.Height).
WithField("blk_hash", hex.EncodeToString(blk.Header.Hash)).
WithField("state", "insync").
Expand All @@ -60,7 +60,7 @@ func (s *synchronizer) inSync(srcPeerAddr string, currentHeight uint64, blk bloc
return nil, nil
}

func (s *synchronizer) outSync(srcPeerAddr string, currentHeight uint64, blk block.Block, kadcastHeight byte) ([]bytes.Buffer, error) {
func (s *synchronizer) outSync(srcPeerAddr string, currentHeight uint64, blk block.Block, metadata *message.Metadata) ([]bytes.Buffer, error) {
var err error

// Once we validate successfully the next block from the syncing
Expand Down Expand Up @@ -102,7 +102,7 @@ func (s *synchronizer) outSync(srcPeerAddr string, currentHeight uint64, blk blo

for _, blk := range blks {
// append them all to the ledger
if err = s.chain.TryNextConsecutiveBlockOutSync(blk, kadcastHeight); err != nil {
if err = s.chain.TryNextConsecutiveBlockOutSync(blk, metadata); err != nil {
slog.WithError(err).WithField("state", "outsync").
Warn("could not accept block")

Expand Down Expand Up @@ -171,17 +171,17 @@ func newSynchronizer(db database.DB, chain Ledger) *synchronizer {
}

// processBlock handles an incoming block from the network.
func (s *synchronizer) processBlock(srcPeerID string, currentHeight uint64, blk block.Block, kadcastHeight byte) (res []bytes.Buffer, err error) {
func (s *synchronizer) processBlock(srcPeerID string, currentHeight uint64, blk block.Block, metadata *message.Metadata) (res []bytes.Buffer, err error) {
// Clean up sequencer
s.sequencer.cleanup(currentHeight)
s.sequencer.dump()

currState := s.state
res, err = currState(srcPeerID, currentHeight, blk, kadcastHeight)
res, err = currState(srcPeerID, currentHeight, blk, metadata)
return
}

func (s *synchronizer) startSync(strPeerAddr string, tipHeight, currentHeight uint64, _ byte) ([]bytes.Buffer, error) {
func (s *synchronizer) startSync(strPeerAddr string, tipHeight, currentHeight uint64, _ *message.Metadata) ([]bytes.Buffer, error) {
s.hrange.from = currentHeight
s.setSyncTarget(tipHeight, currentHeight+config.MaxInvBlocks)

Expand Down
9 changes: 5 additions & 4 deletions pkg/core/chain/synchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/dusk-network/dusk-blockchain/pkg/core/database"
"github.com/dusk-network/dusk-blockchain/pkg/core/database/lite"
"github.com/dusk-network/dusk-blockchain/pkg/core/tests/helper"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/topics"
assert "github.com/stretchr/testify/require"
)
Expand All @@ -25,7 +26,7 @@ func TestSuccessiveBlocks(t *testing.T) {

// tipHeight will be 0, so make the successive block
blk := helper.RandomBlock(1, 1)
res, err := s.processBlock("", 0, *blk, 0)
res, err := s.processBlock("", 0, *blk, nil)
assert.NoError(err)
assert.Nil(res)

Expand All @@ -40,7 +41,7 @@ func TestFutureBlocks(t *testing.T) {

height := uint64(10)
blk := helper.RandomBlock(height, 1)
resp, err := s.processBlock("", 0, *blk, 0)
resp, err := s.processBlock("", 0, *blk, nil)
assert.NoError(err)

// Response should be of the GetBlocks topic
Expand Down Expand Up @@ -75,12 +76,12 @@ func (m *mockChain) CurrentHeight() uint64 {
return m.tipHeight
}

func (m *mockChain) TryNextConsecutiveBlockInSync(blk block.Block, _ byte) error {
func (m *mockChain) TryNextConsecutiveBlockInSync(blk block.Block, _ *message.Metadata) error {
m.catchBlockChan <- consensus.Results{Blk: blk, Err: nil}
return nil
}

func (m *mockChain) TryNextConsecutiveBlockOutSync(_ block.Block, _ byte) error {
func (m *mockChain) TryNextConsecutiveBlockOutSync(_ block.Block, _ *message.Metadata) error {
return nil
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/core/consensus/agreement/agreement_in_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"testing"

"github.com/dusk-network/bls12_381-sign/go/cgo/bls"
"github.com/dusk-network/dusk-blockchain/pkg/config"
"github.com/dusk-network/dusk-blockchain/pkg/core/consensus/header"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/topics"
Expand Down Expand Up @@ -90,7 +89,7 @@ func TestAccumulatorProcessingAggregation(t *testing.T) {
// Verify certificate
comm = handler.Committee(hdr.Round, hdr.Step)

msg := message.NewWithHeader(topics.AggrAgreement, aggro, config.KadcastInitHeader)
msg := message.New(topics.AggrAgreement, aggro)
buf, err := message.Marshal(msg)
assert.Nil(t, err, "failed to marshal aggragreement")

Expand Down
9 changes: 4 additions & 5 deletions pkg/core/consensus/agreement/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"time"

"github.com/dusk-network/bls12_381-sign/go/cgo/bls"
"github.com/dusk-network/dusk-blockchain/pkg/config"
"github.com/dusk-network/dusk-blockchain/pkg/core/candidate"
"github.com/dusk-network/dusk-blockchain/pkg/core/consensus"
"github.com/dusk-network/dusk-blockchain/pkg/core/consensus/header"
Expand Down Expand Up @@ -244,7 +243,7 @@ func collectAgreement(h *handler, accumulator *Accumulator, ev message.Message,
return
}

m := message.NewWithHeader(topics.Agreement, a.Copy().(message.Agreement), ev.Header())
m := message.NewWithMetadata(topics.Agreement, a.Copy().(message.Agreement), ev.Metadata())

// Once the event is verified, we can republish it.
if err := e.Republish(m); err != nil {
Expand Down Expand Up @@ -288,9 +287,9 @@ func (s *Loop) processCollectedVotes(ctx context.Context, handler *handler, evs
bits := comm.Bits(*pubs)
agAgreement := message.NewAggrAgreement(evs[0], bits, sig)

m := message.NewWithHeader(topics.AggrAgreement, agAgreement, config.KadcastInitHeader)
m := message.New(topics.AggrAgreement, agAgreement)
if err := s.Emitter.Republish(m); err != nil {
lg.WithError(err).Error("could not republish aggregated agreement event")
lg.WithError(err).Error("could broadcast aggregated agreement event")
} else {
lg.
WithField("round", r.Round).
Expand Down Expand Up @@ -380,7 +379,7 @@ func (s *Loop) processAggrAgreement(ctx context.Context, h *handler, msg message
}

// Broadcast
m := message.NewWithHeader(topics.AggrAgreement, aggro.Copy(), config.KadcastInitHeader)
m := message.NewWithMetadata(topics.AggrAgreement, aggro.Copy(), msg.Metadata())
if err = e.Republish(m); err != nil {
lg.WithError(err).Errorln("could not republish aggragreement event")
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/consensus/comms.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (e *Emitter) Kadcast(msg message.Message) error {
return err
}

serialized := message.NewWithHeader(msg.Category(), buf, msg.Header())
serialized := message.NewWithMetadata(msg.Category(), buf, msg.Metadata())
e.EventBus.Publish(topics.Kadcast, serialized)
return nil
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/core/consensus/reduction/firststep/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (p *Phase) Run(ctx context.Context, queue *consensus.Queue, _, reductionCha

// if collectReduction returns a StepVote, it means we reached
// consensus and can go to the next step
if sv := p.collectReduction(ctx, rMsg, r.Round, step, ev.Header()); sv != nil {
if sv := p.collectReduction(ctx, rMsg, r.Round, step, ev.Metadata()); sv != nil {
go func() {
<-timeoutChan
}()
Expand All @@ -141,7 +141,7 @@ func (p *Phase) Run(ctx context.Context, queue *consensus.Queue, _, reductionCha
continue
}

sv := p.collectReduction(ctx, rMsg, r.Round, step, ev.Header())
sv := p.collectReduction(ctx, rMsg, r.Round, step, ev.Metadata())
if sv != nil {
// preventing timeout leakage
go func() {
Expand Down Expand Up @@ -174,7 +174,7 @@ func (p *Phase) gotoNextPhase(msg *message.StepVotesMsg) consensus.PhaseFn {
return p.next.Initialize(*msg)
}

func (p *Phase) collectReduction(ctx context.Context, r message.Reduction, round uint64, step uint8, msgHeader []byte) *message.StepVotesMsg {
func (p *Phase) collectReduction(ctx context.Context, r message.Reduction, round uint64, step uint8, metadata *message.Metadata) *message.StepVotesMsg {
if err := p.handler.VerifySignature(r.Copy().(message.Reduction)); err != nil {
lg.
WithError(err).
Expand All @@ -194,7 +194,7 @@ func (p *Phase) collectReduction(ctx context.Context, r message.Reduction, round
Debug("")
}

m := message.NewWithHeader(topics.Reduction, r, msgHeader)
m := message.NewWithMetadata(topics.Reduction, r, metadata)

// Once the event is verified, we can republish it.
if err := p.Emitter.Republish(m); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/consensus/reduction/reduction.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (r *Reduction) SendReduction(ctx context.Context, round uint64, step uint8,
red := message.NewReduction(hdr)
red.SignedHash = sig

m := message.NewWithHeader(topics.Reduction, *red, config.KadcastInitHeader)
m := message.New(topics.Reduction, *red)
return m, voteHash, nil
}

Expand Down
Loading

0 comments on commit d3f3348

Please sign in to comment.