Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate message metadata #1453

Merged
merged 2 commits into from
Aug 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions pkg/config/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const (
MaxBlockTime = 360 // maximum block time in seconds

// KadcastInitialHeight sets the default initial height for Kadcast broadcast algorithm.
KadcastInitialHeight byte = 128 + 1
KadcastInitialHeight byte = 128

// The dusk-blockchain executable version.
NodeVersion = "0.6.0-rc"
Expand Down Expand Up @@ -63,6 +63,3 @@ const (
// GetCandidateReceivers is a redundancy factor on retrieving a missing candidate block.
GetCandidateReceivers = 7
)

// KadcastInitHeader is used as default initial kadcast message header.
var KadcastInitHeader = []byte{KadcastInitialHeight}
4 changes: 2 additions & 2 deletions pkg/core/candidate/requestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (r *Requestor) publishGetCandidate(hash []byte) error {
return err
}

m := message.NewWithHeader(topics.GetCandidate, *buf, config.KadcastInitHeader)
m := message.New(topics.GetCandidate, *buf)

r.publisher.Publish(topics.Kadcast, m)
return nil
Expand All @@ -102,7 +102,7 @@ func (r *Requestor) sendGetCandidate(hash []byte) error {
return err
}

msg := message.NewWithHeader(topics.GetCandidate, buf, []byte{config.GetCandidateReceivers})
msg := message.NewWithMetadata(topics.GetCandidate, buf, &message.Metadata{NumNodes: config.GetCandidateReceivers})
r.publisher.Publish(topics.KadcastSendToMany, msg)
return nil
}
Expand Down
28 changes: 12 additions & 16 deletions pkg/core/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,8 @@ func (c *Chain) ProcessBlockFromNetwork(srcPeerID string, m message.Message) ([]
l := log.WithField("recv_blk_h", blk.Header.Height).
WithField("curr_h", c.tip.Header.Height)

var kh byte = 255
if len(m.Header()) > 0 {
kh = m.Header()[0]
l = l.WithField("kad_h", kh)
if m.Metadata() != nil {
l = l.WithField("kad_h", m.Metadata().KadcastHeight)
}

l.Trace("block received")
Expand Down Expand Up @@ -293,7 +291,7 @@ func (c *Chain) ProcessBlockFromNetwork(srcPeerID string, m message.Message) ([]
// out if any other node propagates it back when this node is syncing up.
c.blacklisted.Add(bytes.NewBuffer(hash))

return c.synchronizer.processBlock(srcPeerID, c.tip.Header.Height, blk, kh)
return c.synchronizer.processBlock(srcPeerID, c.tip.Header.Height, blk, m.Metadata())
}
case blk.Header.Height < c.tip.Header.Height:
l.Debug("discard block")
Expand All @@ -318,22 +316,22 @@ func (c *Chain) ProcessBlockFromNetwork(srcPeerID string, m message.Message) ([]
c.highestSeen = blk.Header.Height
}

return c.synchronizer.processBlock(srcPeerID, c.tip.Header.Height, blk, kh)
return c.synchronizer.processBlock(srcPeerID, c.tip.Header.Height, blk, m.Metadata())
}

// TryNextConsecutiveBlockOutSync is the processing path for accepting a block
// from the network during out-of-sync state.
func (c *Chain) TryNextConsecutiveBlockOutSync(blk block.Block, kadcastHeight byte) error {
func (c *Chain) TryNextConsecutiveBlockOutSync(blk block.Block, metadata *message.Metadata) error {
log.WithField("height", blk.Header.Height).Trace("accepting sync block")
return c.acceptBlock(blk, true)
}

// TryNextConsecutiveBlockInSync is the processing path for accepting a block
// from the network during in-sync state. Returns err if the block is not valid.
func (c *Chain) TryNextConsecutiveBlockInSync(blk block.Block, kadcastHeight byte) error {
func (c *Chain) TryNextConsecutiveBlockInSync(blk block.Block, metadata *message.Metadata) error {
// Make an attempt to accept a new block. If succeeds, we could safely restart the Consensus Loop.
// If not, peer reputation score should be decreased.
if err := c.acceptSuccessiveBlock(blk, kadcastHeight); err != nil {
if err := c.acceptSuccessiveBlock(blk, metadata); err != nil {
return err
}

Expand Down Expand Up @@ -385,15 +383,15 @@ func (c *Chain) ProcessSyncTimerExpired(strPeerAddr string) error {

// acceptSuccessiveBlock will accept a block which directly follows the chain
// tip, and advertises it to the node's peers.
func (c *Chain) acceptSuccessiveBlock(blk block.Block, kadcastHeight byte) error {
func (c *Chain) acceptSuccessiveBlock(blk block.Block, metadata *message.Metadata) error {
log.WithField("height", blk.Header.Height).Trace("accepting succeeding block")

if err := c.isValidHeader(blk, *c.tip, *c.p, log, true); err != nil {
log.WithError(err).Error("invalid block")
return err
}

if err := c.kadcastBlock(blk, kadcastHeight); err != nil {
if err := c.kadcastBlock(blk, metadata); err != nil {
log.WithError(err).Error("block propagation failed")
return err
}
Expand Down Expand Up @@ -735,9 +733,8 @@ func (c *Chain) ExecuteStateTransition(ctx context.Context, txs []transactions.C
return c.proxy.Executor().ExecuteStateTransition(c.ctx, txs, blockGasLimit, blockHeight, generator)
}

func (c *Chain) kadcastBlock(blk block.Block, kadcastHeight byte) error {
log.WithField("blk_height", blk.Header.Height).
WithField("kadcast_h", kadcastHeight).Trace("propagate block")
func (c *Chain) kadcastBlock(blk block.Block, metadata *message.Metadata) error {
log.WithField("blk_height", blk.Header.Height).Trace("propagate block")

buf := new(bytes.Buffer)
if err := message.MarshalBlock(buf, &blk); err != nil {
Expand All @@ -748,8 +745,7 @@ func (c *Chain) kadcastBlock(blk block.Block, kadcastHeight byte) error {
return err
}

c.eventBus.Publish(topics.Kadcast,
message.NewWithHeader(topics.Block, *buf, []byte{kadcastHeight}))
c.eventBus.Publish(topics.Kadcast, message.NewWithMetadata(topics.Block, *buf, metadata))
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/core/chain/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (c *Chain) acceptConsensusResults(ctx context.Context, winnerChan chan cons
return
}

if err = c.acceptSuccessiveBlock(block, config.KadcastInitialHeight); err != nil {
if err = c.acceptSuccessiveBlock(block, nil); err != nil {
log.WithError(err).Error("block acceptance failed")
c.lock.Unlock()
return
Expand Down
9 changes: 6 additions & 3 deletions pkg/core/chain/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@

package chain

import "github.com/dusk-network/dusk-blockchain/pkg/core/data/block"
import (
"github.com/dusk-network/dusk-blockchain/pkg/core/data/block"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message"
)

// Ledger is the Chain interface used in tests.
type Ledger interface {
TryNextConsecutiveBlockInSync(blk block.Block, kadcastHeight byte) error
TryNextConsecutiveBlockOutSync(blk block.Block, kadcastHeight byte) error
TryNextConsecutiveBlockInSync(blk block.Block, metadata *message.Metadata) error
TryNextConsecutiveBlockOutSync(blk block.Block, metadata *message.Metadata) error
TryNextConsecutiveBlockIsValid(blk block.Block) error

// RestartConsensus Stop and Start Consensus.
Expand Down
18 changes: 9 additions & 9 deletions pkg/core/chain/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ const (

var slog = logrus.WithField("process", "sync")

type syncState func(srcPeerAddr string, currentHeight uint64, blk block.Block, kadcastHeight byte) ([]bytes.Buffer, error)
type syncState func(srcPeerAddr string, currentHeight uint64, blk block.Block, metadata *message.Metadata) ([]bytes.Buffer, error)

func (s *synchronizer) inSync(srcPeerAddr string, currentHeight uint64, blk block.Block, kadcastHeight byte) ([]bytes.Buffer, error) {
func (s *synchronizer) inSync(srcPeerAddr string, currentHeight uint64, blk block.Block, metadata *message.Metadata) ([]bytes.Buffer, error) {
if blk.Header.Height > currentHeight+1 {
s.sequencer.add(blk)

Expand All @@ -43,12 +43,12 @@ func (s *synchronizer) inSync(srcPeerAddr string, currentHeight uint64, blk bloc
s.timer.Start(srcPeerAddr)

s.state = s.outSync
b, err := s.startSync(srcPeerAddr, blk.Header.Height, currentHeight, kadcastHeight)
b, err := s.startSync(srcPeerAddr, blk.Header.Height, currentHeight, metadata)
return b, err
}

// Otherwise notify the chain (and the consensus loop).
if err := s.chain.TryNextConsecutiveBlockInSync(blk, kadcastHeight); err != nil {
if err := s.chain.TryNextConsecutiveBlockInSync(blk, metadata); err != nil {
slog.WithField("blk_height", blk.Header.Height).
WithField("blk_hash", hex.EncodeToString(blk.Header.Hash)).
WithField("state", "insync").
Expand All @@ -60,7 +60,7 @@ func (s *synchronizer) inSync(srcPeerAddr string, currentHeight uint64, blk bloc
return nil, nil
}

func (s *synchronizer) outSync(srcPeerAddr string, currentHeight uint64, blk block.Block, kadcastHeight byte) ([]bytes.Buffer, error) {
func (s *synchronizer) outSync(srcPeerAddr string, currentHeight uint64, blk block.Block, metadata *message.Metadata) ([]bytes.Buffer, error) {
var err error

// Once we validate successfully the next block from the syncing
Expand Down Expand Up @@ -102,7 +102,7 @@ func (s *synchronizer) outSync(srcPeerAddr string, currentHeight uint64, blk blo

for _, blk := range blks {
// append them all to the ledger
if err = s.chain.TryNextConsecutiveBlockOutSync(blk, kadcastHeight); err != nil {
if err = s.chain.TryNextConsecutiveBlockOutSync(blk, metadata); err != nil {
slog.WithError(err).WithField("state", "outsync").
Warn("could not accept block")

Expand Down Expand Up @@ -171,17 +171,17 @@ func newSynchronizer(db database.DB, chain Ledger) *synchronizer {
}

// processBlock handles an incoming block from the network.
func (s *synchronizer) processBlock(srcPeerID string, currentHeight uint64, blk block.Block, kadcastHeight byte) (res []bytes.Buffer, err error) {
func (s *synchronizer) processBlock(srcPeerID string, currentHeight uint64, blk block.Block, metadata *message.Metadata) (res []bytes.Buffer, err error) {
// Clean up sequencer
s.sequencer.cleanup(currentHeight)
s.sequencer.dump()

currState := s.state
res, err = currState(srcPeerID, currentHeight, blk, kadcastHeight)
res, err = currState(srcPeerID, currentHeight, blk, metadata)
return
}

func (s *synchronizer) startSync(strPeerAddr string, tipHeight, currentHeight uint64, _ byte) ([]bytes.Buffer, error) {
func (s *synchronizer) startSync(strPeerAddr string, tipHeight, currentHeight uint64, _ *message.Metadata) ([]bytes.Buffer, error) {
s.hrange.from = currentHeight
s.setSyncTarget(tipHeight, currentHeight+config.MaxInvBlocks)

Expand Down
9 changes: 5 additions & 4 deletions pkg/core/chain/synchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/dusk-network/dusk-blockchain/pkg/core/database"
"github.com/dusk-network/dusk-blockchain/pkg/core/database/lite"
"github.com/dusk-network/dusk-blockchain/pkg/core/tests/helper"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/topics"
assert "github.com/stretchr/testify/require"
)
Expand All @@ -25,7 +26,7 @@ func TestSuccessiveBlocks(t *testing.T) {

// tipHeight will be 0, so make the successive block
blk := helper.RandomBlock(1, 1)
res, err := s.processBlock("", 0, *blk, 0)
res, err := s.processBlock("", 0, *blk, nil)
assert.NoError(err)
assert.Nil(res)

Expand All @@ -40,7 +41,7 @@ func TestFutureBlocks(t *testing.T) {

height := uint64(10)
blk := helper.RandomBlock(height, 1)
resp, err := s.processBlock("", 0, *blk, 0)
resp, err := s.processBlock("", 0, *blk, nil)
assert.NoError(err)

// Response should be of the GetBlocks topic
Expand Down Expand Up @@ -75,12 +76,12 @@ func (m *mockChain) CurrentHeight() uint64 {
return m.tipHeight
}

func (m *mockChain) TryNextConsecutiveBlockInSync(blk block.Block, _ byte) error {
func (m *mockChain) TryNextConsecutiveBlockInSync(blk block.Block, _ *message.Metadata) error {
m.catchBlockChan <- consensus.Results{Blk: blk, Err: nil}
return nil
}

func (m *mockChain) TryNextConsecutiveBlockOutSync(_ block.Block, _ byte) error {
func (m *mockChain) TryNextConsecutiveBlockOutSync(_ block.Block, _ *message.Metadata) error {
return nil
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/core/consensus/agreement/agreement_in_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"testing"

"github.com/dusk-network/bls12_381-sign/go/cgo/bls"
"github.com/dusk-network/dusk-blockchain/pkg/config"
"github.com/dusk-network/dusk-blockchain/pkg/core/consensus/header"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/topics"
Expand Down Expand Up @@ -90,7 +89,7 @@ func TestAccumulatorProcessingAggregation(t *testing.T) {
// Verify certificate
comm = handler.Committee(hdr.Round, hdr.Step)

msg := message.NewWithHeader(topics.AggrAgreement, aggro, config.KadcastInitHeader)
msg := message.New(topics.AggrAgreement, aggro)
buf, err := message.Marshal(msg)
assert.Nil(t, err, "failed to marshal aggragreement")

Expand Down
9 changes: 4 additions & 5 deletions pkg/core/consensus/agreement/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"time"

"github.com/dusk-network/bls12_381-sign/go/cgo/bls"
"github.com/dusk-network/dusk-blockchain/pkg/config"
"github.com/dusk-network/dusk-blockchain/pkg/core/candidate"
"github.com/dusk-network/dusk-blockchain/pkg/core/consensus"
"github.com/dusk-network/dusk-blockchain/pkg/core/consensus/header"
Expand Down Expand Up @@ -244,7 +243,7 @@ func collectAgreement(h *handler, accumulator *Accumulator, ev message.Message,
return
}

m := message.NewWithHeader(topics.Agreement, a.Copy().(message.Agreement), ev.Header())
m := message.NewWithMetadata(topics.Agreement, a.Copy().(message.Agreement), ev.Metadata())

// Once the event is verified, we can republish it.
if err := e.Republish(m); err != nil {
Expand Down Expand Up @@ -288,9 +287,9 @@ func (s *Loop) processCollectedVotes(ctx context.Context, handler *handler, evs
bits := comm.Bits(*pubs)
agAgreement := message.NewAggrAgreement(evs[0], bits, sig)

m := message.NewWithHeader(topics.AggrAgreement, agAgreement, config.KadcastInitHeader)
m := message.New(topics.AggrAgreement, agAgreement)
if err := s.Emitter.Republish(m); err != nil {
lg.WithError(err).Error("could not republish aggregated agreement event")
lg.WithError(err).Error("could broadcast aggregated agreement event")
} else {
lg.
WithField("round", r.Round).
Expand Down Expand Up @@ -380,7 +379,7 @@ func (s *Loop) processAggrAgreement(ctx context.Context, h *handler, msg message
}

// Broadcast
m := message.NewWithHeader(topics.AggrAgreement, aggro.Copy(), config.KadcastInitHeader)
m := message.NewWithMetadata(topics.AggrAgreement, aggro.Copy(), msg.Metadata())
if err = e.Republish(m); err != nil {
lg.WithError(err).Errorln("could not republish aggragreement event")
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/consensus/comms.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (e *Emitter) Kadcast(msg message.Message) error {
return err
}

serialized := message.NewWithHeader(msg.Category(), buf, msg.Header())
serialized := message.NewWithMetadata(msg.Category(), buf, msg.Metadata())
e.EventBus.Publish(topics.Kadcast, serialized)
return nil
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/core/consensus/reduction/firststep/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (p *Phase) Run(ctx context.Context, queue *consensus.Queue, _, reductionCha

// if collectReduction returns a StepVote, it means we reached
// consensus and can go to the next step
if sv := p.collectReduction(ctx, rMsg, r.Round, step, ev.Header()); sv != nil {
if sv := p.collectReduction(ctx, rMsg, r.Round, step, ev.Metadata()); sv != nil {
go func() {
<-timeoutChan
}()
Expand All @@ -141,7 +141,7 @@ func (p *Phase) Run(ctx context.Context, queue *consensus.Queue, _, reductionCha
continue
}

sv := p.collectReduction(ctx, rMsg, r.Round, step, ev.Header())
sv := p.collectReduction(ctx, rMsg, r.Round, step, ev.Metadata())
if sv != nil {
// preventing timeout leakage
go func() {
Expand Down Expand Up @@ -174,7 +174,7 @@ func (p *Phase) gotoNextPhase(msg *message.StepVotesMsg) consensus.PhaseFn {
return p.next.Initialize(*msg)
}

func (p *Phase) collectReduction(ctx context.Context, r message.Reduction, round uint64, step uint8, msgHeader []byte) *message.StepVotesMsg {
func (p *Phase) collectReduction(ctx context.Context, r message.Reduction, round uint64, step uint8, metadata *message.Metadata) *message.StepVotesMsg {
if err := p.handler.VerifySignature(r.Copy().(message.Reduction)); err != nil {
lg.
WithError(err).
Expand All @@ -194,7 +194,7 @@ func (p *Phase) collectReduction(ctx context.Context, r message.Reduction, round
Debug("")
}

m := message.NewWithHeader(topics.Reduction, r, msgHeader)
m := message.NewWithMetadata(topics.Reduction, r, metadata)

// Once the event is verified, we can republish it.
if err := p.Emitter.Republish(m); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/consensus/reduction/reduction.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (r *Reduction) SendReduction(ctx context.Context, round uint64, step uint8,
red := message.NewReduction(hdr)
red.SignedHash = sig

m := message.NewWithHeader(topics.Reduction, *red, config.KadcastInitHeader)
m := message.New(topics.Reduction, *red)
return m, voteHash, nil
}

Expand Down
Loading