Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: clean up p2p & implement bootnode (seed peers) support #2852

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions gno.land/cmd/gnoland/secrets_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,5 @@ func generateLastSignValidatorState() *privval.FilePVLastSignState {

// generateNodeKey generates the p2p node key
func generateNodeKey() *p2p.NodeKey {
privKey := ed25519.GenPrivKey()

return &p2p.NodeKey{
PrivKey: privKey,
}
return p2p.GenerateNodeKey()
}
40 changes: 20 additions & 20 deletions tm2/pkg/bft/blockchain/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/gnolang/gno/tm2/pkg/bft/types"
"github.com/gnolang/gno/tm2/pkg/flow"
"github.com/gnolang/gno/tm2/pkg/log"
"github.com/gnolang/gno/tm2/pkg/p2p"
types2 "github.com/gnolang/gno/tm2/pkg/p2p"
"github.com/gnolang/gno/tm2/pkg/service"
)

Expand Down Expand Up @@ -69,7 +69,7 @@ type BlockPool struct {
requesters map[int64]*bpRequester
height int64 // the lowest key in requesters.
// peers
peers map[p2p.ID]*bpPeer
peers map[types2.ID]*bpPeer
maxPeerHeight int64 // the biggest reported height

// atomic
Expand All @@ -83,7 +83,7 @@ type BlockPool struct {
// requests and errors will be sent to requestsCh and errorsCh accordingly.
func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- peerError) *BlockPool {
bp := &BlockPool{
peers: make(map[p2p.ID]*bpPeer),
peers: make(map[types2.ID]*bpPeer),

requesters: make(map[int64]*bpRequester),
height: start,
Expand Down Expand Up @@ -226,13 +226,13 @@ func (pool *BlockPool) PopRequest() {
// RedoRequest invalidates the block at pool.height,
// Remove the peer and redo request from others.
// Returns the ID of the removed peer.
func (pool *BlockPool) RedoRequest(height int64) p2p.ID {
func (pool *BlockPool) RedoRequest(height int64) types2.ID {
pool.mtx.Lock()
defer pool.mtx.Unlock()

request := pool.requesters[height]
peerID := request.getPeerID()
if peerID != p2p.ID("") {
if peerID != types2.ID("") {
// RemovePeer will redo all requesters associated with this peer.
pool.removePeer(peerID)
}
Expand All @@ -241,7 +241,7 @@ func (pool *BlockPool) RedoRequest(height int64) p2p.ID {

// AddBlock validates that the block comes from the peer it was expected from and calls the requester to store it.
// TODO: ensure that blocks come in order for each peer.
func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int) {
func (pool *BlockPool) AddBlock(peerID types2.ID, block *types.Block, blockSize int) {
pool.mtx.Lock()
defer pool.mtx.Unlock()

Expand Down Expand Up @@ -278,7 +278,7 @@ func (pool *BlockPool) MaxPeerHeight() int64 {
}

// SetPeerHeight sets the peer's alleged blockchain height.
func (pool *BlockPool) SetPeerHeight(peerID p2p.ID, height int64) {
func (pool *BlockPool) SetPeerHeight(peerID types2.ID, height int64) {
pool.mtx.Lock()
defer pool.mtx.Unlock()

Expand All @@ -298,14 +298,14 @@ func (pool *BlockPool) SetPeerHeight(peerID p2p.ID, height int64) {

// RemovePeer removes the peer with peerID from the pool. If there's no peer
// with peerID, function is a no-op.
func (pool *BlockPool) RemovePeer(peerID p2p.ID) {
func (pool *BlockPool) RemovePeer(peerID types2.ID) {
pool.mtx.Lock()
defer pool.mtx.Unlock()

pool.removePeer(peerID)
}

func (pool *BlockPool) removePeer(peerID p2p.ID) {
func (pool *BlockPool) removePeer(peerID types2.ID) {
for _, requester := range pool.requesters {
if requester.getPeerID() == peerID {
requester.redo(peerID)
Expand Down Expand Up @@ -386,14 +386,14 @@ func (pool *BlockPool) requestersLen() int64 {
return int64(len(pool.requesters))
}

func (pool *BlockPool) sendRequest(height int64, peerID p2p.ID) {
func (pool *BlockPool) sendRequest(height int64, peerID types2.ID) {
if !pool.IsRunning() {
return
}
pool.requestsCh <- BlockRequest{height, peerID}
}

func (pool *BlockPool) sendError(err error, peerID p2p.ID) {
func (pool *BlockPool) sendError(err error, peerID types2.ID) {
if !pool.IsRunning() {
return
}
Expand Down Expand Up @@ -424,7 +424,7 @@ func (pool *BlockPool) debug() string {

type bpPeer struct {
pool *BlockPool
id p2p.ID
id types2.ID
recvMonitor *flow.Monitor

height int64
Expand All @@ -435,7 +435,7 @@ type bpPeer struct {
logger *slog.Logger
}

func newBPPeer(pool *BlockPool, peerID p2p.ID, height int64) *bpPeer {
func newBPPeer(pool *BlockPool, peerID types2.ID, height int64) *bpPeer {
peer := &bpPeer{
pool: pool,
id: peerID,
Expand Down Expand Up @@ -499,10 +499,10 @@ type bpRequester struct {
pool *BlockPool
height int64
gotBlockCh chan struct{}
redoCh chan p2p.ID // redo may send multitime, add peerId to identify repeat
redoCh chan types2.ID // redo may send multitime, add peerId to identify repeat

mtx sync.Mutex
peerID p2p.ID
peerID types2.ID
block *types.Block
}

Expand All @@ -511,7 +511,7 @@ func newBPRequester(pool *BlockPool, height int64) *bpRequester {
pool: pool,
height: height,
gotBlockCh: make(chan struct{}, 1),
redoCh: make(chan p2p.ID, 1),
redoCh: make(chan types2.ID, 1),

peerID: "",
block: nil,
Expand All @@ -526,7 +526,7 @@ func (bpr *bpRequester) OnStart() error {
}

// Returns true if the peer matches and block doesn't already exist.
func (bpr *bpRequester) setBlock(block *types.Block, peerID p2p.ID) bool {
func (bpr *bpRequester) setBlock(block *types.Block, peerID types2.ID) bool {
bpr.mtx.Lock()
if bpr.block != nil || bpr.peerID != peerID {
bpr.mtx.Unlock()
Expand All @@ -548,7 +548,7 @@ func (bpr *bpRequester) getBlock() *types.Block {
return bpr.block
}

func (bpr *bpRequester) getPeerID() p2p.ID {
func (bpr *bpRequester) getPeerID() types2.ID {
bpr.mtx.Lock()
defer bpr.mtx.Unlock()
return bpr.peerID
Expand All @@ -570,7 +570,7 @@ func (bpr *bpRequester) reset() {
// Tells bpRequester to pick another peer and try again.
// NOTE: Nonblocking, and does nothing if another redo
// was already requested.
func (bpr *bpRequester) redo(peerID p2p.ID) {
func (bpr *bpRequester) redo(peerID types2.ID) {
select {
case bpr.redoCh <- peerID:
default:
Expand Down Expand Up @@ -631,5 +631,5 @@ OUTER_LOOP:
// delivering the block
type BlockRequest struct {
Height int64
PeerID p2p.ID
PeerID types2.ID
}
16 changes: 8 additions & 8 deletions tm2/pkg/bft/blockchain/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import (
"testing"
"time"

types2 "github.com/gnolang/gno/tm2/pkg/p2p"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/gnolang/gno/tm2/pkg/bft/types"
"github.com/gnolang/gno/tm2/pkg/log"
"github.com/gnolang/gno/tm2/pkg/p2p"
"github.com/gnolang/gno/tm2/pkg/random"
)

Expand All @@ -19,7 +19,7 @@ func init() {
}

type testPeer struct {
id p2p.ID
id types2.ID
height int64
inputChan chan inputData // make sure each peer's data is sequential
}
Expand Down Expand Up @@ -47,7 +47,7 @@ func (p testPeer) simulateInput(input inputData) {
// input.t.Logf("Added block from peer %v (height: %v)", input.request.PeerID, input.request.Height)
}

type testPeers map[p2p.ID]testPeer
type testPeers map[types2.ID]testPeer

func (ps testPeers) start() {
for _, v := range ps {
Expand All @@ -64,7 +64,7 @@ func (ps testPeers) stop() {
func makePeers(numPeers int, minHeight, maxHeight int64) testPeers {
peers := make(testPeers, numPeers)
for i := 0; i < numPeers; i++ {
peerID := p2p.ID(random.RandStr(12))
peerID := types2.ID(random.RandStr(12))
height := minHeight + random.RandInt63n(maxHeight-minHeight)
peers[peerID] = testPeer{peerID, height, make(chan inputData, 10)}
}
Expand Down Expand Up @@ -172,7 +172,7 @@ func TestBlockPoolTimeout(t *testing.T) {

// Pull from channels
counter := 0
timedOut := map[p2p.ID]struct{}{}
timedOut := map[types2.ID]struct{}{}
for {
select {
case err := <-errorsCh:
Expand All @@ -195,7 +195,7 @@ func TestBlockPoolRemovePeer(t *testing.T) {

peers := make(testPeers, 10)
for i := 0; i < 10; i++ {
peerID := p2p.ID(fmt.Sprintf("%d", i+1))
peerID := types2.ID(fmt.Sprintf("%d", i+1))
height := int64(i + 1)
peers[peerID] = testPeer{peerID, height, make(chan inputData)}
}
Expand All @@ -215,10 +215,10 @@ func TestBlockPoolRemovePeer(t *testing.T) {
assert.EqualValues(t, 10, pool.MaxPeerHeight())

// remove not-existing peer
assert.NotPanics(t, func() { pool.RemovePeer(p2p.ID("Superman")) })
assert.NotPanics(t, func() { pool.RemovePeer(types2.ID("Superman")) })

// remove peer with biggest height
pool.RemovePeer(p2p.ID("10"))
pool.RemovePeer(types2.ID("10"))
assert.EqualValues(t, 9, pool.MaxPeerHeight())

// remove all peers
Expand Down
5 changes: 2 additions & 3 deletions tm2/pkg/bft/blockchain/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@

// SetLogger implements cmn.Service by setting the logger on reactor and pool.
func (bcR *BlockchainReactor) SetLogger(l *slog.Logger) {
bcR.BaseService.Logger = l

Check failure on line 106 in tm2/pkg/bft/blockchain/reactor.go

View workflow job for this annotation

GitHub Actions / Run Main / Go Linter / lint

bcR.BaseService undefined (type *BlockchainReactor has no field or method BaseService) (typecheck)

Check failure on line 106 in tm2/pkg/bft/blockchain/reactor.go

View workflow job for this annotation

GitHub Actions / Run Main / Go Linter / lint

bcR.BaseService undefined (type *BlockchainReactor has no field or method BaseService) (typecheck)

Check failure on line 106 in tm2/pkg/bft/blockchain/reactor.go

View workflow job for this annotation

GitHub Actions / Run Main / Go Linter / lint

bcR.BaseService undefined (type *BlockchainReactor has no field or method BaseService) (typecheck)

Check failure on line 106 in tm2/pkg/bft/blockchain/reactor.go

View workflow job for this annotation

GitHub Actions / Run Main / Go Linter / lint

bcR.BaseService undefined (type *BlockchainReactor has no field or method BaseService) (typecheck)

Check failure on line 106 in tm2/pkg/bft/blockchain/reactor.go

View workflow job for this annotation

GitHub Actions / Run Main / Go Linter / lint

bcR.BaseService undefined (type *BlockchainReactor has no field or method BaseService) (typecheck)

Check failure on line 106 in tm2/pkg/bft/blockchain/reactor.go

View workflow job for this annotation

GitHub Actions / Run Main / Go Linter / lint

bcR.BaseService undefined (type *BlockchainReactor has no field or method BaseService) (typecheck)

Check failure on line 106 in tm2/pkg/bft/blockchain/reactor.go

View workflow job for this annotation

GitHub Actions / Run Main / Go Linter / lint

bcR.BaseService undefined (type *BlockchainReactor has no field or method BaseService) (typecheck)

Check failure on line 106 in tm2/pkg/bft/blockchain/reactor.go

View workflow job for this annotation

GitHub Actions / Run Main / Go Linter / lint

bcR.BaseService undefined (type *BlockchainReactor has no field or method BaseService) (typecheck)

Check failure on line 106 in tm2/pkg/bft/blockchain/reactor.go

View workflow job for this annotation

GitHub Actions / Run Main / Go Linter / lint

bcR.BaseService undefined (type *BlockchainReactor has no field or method BaseService) (typecheck)

Check failure on line 106 in tm2/pkg/bft/blockchain/reactor.go

View workflow job for this annotation

GitHub Actions / Run Main / Go Linter / lint

bcR.BaseService undefined (type *BlockchainReactor has no field or method BaseService) (typecheck)
bcR.pool.Logger = l
}

Expand Down Expand Up @@ -257,9 +257,8 @@
select {
case <-switchToConsensusTicker.C:
height, numPending, lenRequesters := bcR.pool.GetStatus()
outbound, inbound, _ := bcR.Switch.NumPeers()
bcR.Logger.Debug("Consensus ticker", "numPending", numPending, "total", lenRequesters,
"outbound", outbound, "inbound", inbound)

bcR.Logger.Debug("Consensus ticker", "numPending", numPending, "total", lenRequesters)
if bcR.pool.IsCaughtUp() {
bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
bcR.pool.Stop()
Expand Down
18 changes: 9 additions & 9 deletions tm2/pkg/bft/consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/gnolang/gno/tm2/pkg/errors"
"github.com/gnolang/gno/tm2/pkg/events"
osm "github.com/gnolang/gno/tm2/pkg/os"
"github.com/gnolang/gno/tm2/pkg/p2p"
types2 "github.com/gnolang/gno/tm2/pkg/p2p"
"github.com/gnolang/gno/tm2/pkg/service"
"github.com/gnolang/gno/tm2/pkg/telemetry"
"github.com/gnolang/gno/tm2/pkg/telemetry/metrics"
Expand Down Expand Up @@ -53,7 +53,7 @@ type newRoundStepInfo struct {
// msgs from the reactor which may update the state
type msgInfo struct {
Msg ConsensusMessage `json:"msg"`
PeerID p2p.ID `json:"peer_key"`
PeerID types2.ID `json:"peer_key"`
}

// WAL message.
Expand Down Expand Up @@ -399,7 +399,7 @@ func (cs *ConsensusState) OpenWAL(walFile string) (walm.WAL, error) {
// TODO: should these return anything or let callers just use events?

// AddVote inputs a vote.
func (cs *ConsensusState) AddVote(vote *types.Vote, peerID p2p.ID) (added bool, err error) {
func (cs *ConsensusState) AddVote(vote *types.Vote, peerID types2.ID) (added bool, err error) {
if peerID == "" {
cs.internalMsgQueue <- msgInfo{&VoteMessage{vote}, ""}
} else {
Expand All @@ -411,7 +411,7 @@ func (cs *ConsensusState) AddVote(vote *types.Vote, peerID p2p.ID) (added bool,
}

// SetProposal inputs a proposal.
func (cs *ConsensusState) SetProposal(proposal *types.Proposal, peerID p2p.ID) error {
func (cs *ConsensusState) SetProposal(proposal *types.Proposal, peerID types2.ID) error {
if peerID == "" {
cs.internalMsgQueue <- msgInfo{&ProposalMessage{proposal}, ""}
} else {
Expand All @@ -423,7 +423,7 @@ func (cs *ConsensusState) SetProposal(proposal *types.Proposal, peerID p2p.ID) e
}

// AddProposalBlockPart inputs a part of the proposal block.
func (cs *ConsensusState) AddProposalBlockPart(height int64, round int, part *types.Part, peerID p2p.ID) error {
func (cs *ConsensusState) AddProposalBlockPart(height int64, round int, part *types.Part, peerID types2.ID) error {
if peerID == "" {
cs.internalMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, ""}
} else {
Expand All @@ -435,7 +435,7 @@ func (cs *ConsensusState) AddProposalBlockPart(height int64, round int, part *ty
}

// SetProposalAndBlock inputs the proposal and all block parts.
func (cs *ConsensusState) SetProposalAndBlock(proposal *types.Proposal, block *types.Block, parts *types.PartSet, peerID p2p.ID) error {
func (cs *ConsensusState) SetProposalAndBlock(proposal *types.Proposal, block *types.Block, parts *types.PartSet, peerID types2.ID) error {
if err := cs.SetProposal(proposal, peerID); err != nil {
return err
}
Expand Down Expand Up @@ -1444,7 +1444,7 @@ func (cs *ConsensusState) defaultSetProposal(proposal *types.Proposal) error {

// NOTE: block is not necessarily valid.
// Asynchronously triggers either enterPrevote (before we timeout of propose) or tryFinalizeCommit, once we have the full block.
func (cs *ConsensusState) addProposalBlockPart(msg *BlockPartMessage, peerID p2p.ID) (added bool, err error) {
func (cs *ConsensusState) addProposalBlockPart(msg *BlockPartMessage, peerID types2.ID) (added bool, err error) {
height, round, part := msg.Height, msg.Round, msg.Part

// Blocks might be reused, so round mismatch is OK
Expand Down Expand Up @@ -1514,7 +1514,7 @@ func (cs *ConsensusState) addProposalBlockPart(msg *BlockPartMessage, peerID p2p
}

// Attempt to add the vote. if its a duplicate signature, dupeout the validator
func (cs *ConsensusState) tryAddVote(vote *types.Vote, peerID p2p.ID) (bool, error) {
func (cs *ConsensusState) tryAddVote(vote *types.Vote, peerID types2.ID) (bool, error) {
added, err := cs.addVote(vote, peerID)
if err != nil {
// If the vote height is off, we'll just ignore it,
Expand Down Expand Up @@ -1547,7 +1547,7 @@ func (cs *ConsensusState) tryAddVote(vote *types.Vote, peerID p2p.ID) (bool, err

// -----------------------------------------------------------------------------

func (cs *ConsensusState) addVote(vote *types.Vote, peerID p2p.ID) (added bool, err error) {
func (cs *ConsensusState) addVote(vote *types.Vote, peerID types2.ID) (added bool, err error) {
cs.Logger.Debug("addVote", "voteHeight", vote.Height, "voteType", vote.Type, "valIndex", vote.ValidatorIndex, "csHeight", cs.Height)

// A precommit for the previous height?
Expand Down
10 changes: 5 additions & 5 deletions tm2/pkg/bft/consensus/types/height_vote_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

"github.com/gnolang/gno/tm2/pkg/amino"
"github.com/gnolang/gno/tm2/pkg/bft/types"
"github.com/gnolang/gno/tm2/pkg/p2p"
types2 "github.com/gnolang/gno/tm2/pkg/p2p"
)

type RoundVoteSet struct {
Expand Down Expand Up @@ -41,7 +41,7 @@ type HeightVoteSet struct {
mtx sync.Mutex
round int // max tracked round
roundVoteSets map[int]RoundVoteSet // keys: [0...round]
peerCatchupRounds map[p2p.ID][]int // keys: peer.ID; values: at most 2 rounds
peerCatchupRounds map[types2.ID][]int // keys: peer.ID; values: at most 2 rounds
}

func NewHeightVoteSet(chainID string, height int64, valSet *types.ValidatorSet) *HeightVoteSet {
Expand All @@ -59,7 +59,7 @@ func (hvs *HeightVoteSet) Reset(height int64, valSet *types.ValidatorSet) {
hvs.height = height
hvs.valSet = valSet
hvs.roundVoteSets = make(map[int]RoundVoteSet)
hvs.peerCatchupRounds = make(map[p2p.ID][]int)
hvs.peerCatchupRounds = make(map[types2.ID][]int)

hvs.addRound(0)
hvs.round = 0
Expand Down Expand Up @@ -108,7 +108,7 @@ func (hvs *HeightVoteSet) addRound(round int) {

// Duplicate votes return added=false, err=nil.
// By convention, peerID is "" if origin is self.
func (hvs *HeightVoteSet) AddVote(vote *types.Vote, peerID p2p.ID) (added bool, err error) {
func (hvs *HeightVoteSet) AddVote(vote *types.Vote, peerID types2.ID) (added bool, err error) {
hvs.mtx.Lock()
defer hvs.mtx.Unlock()
if !types.IsVoteTypeValid(vote.Type) {
Expand Down Expand Up @@ -176,7 +176,7 @@ func (hvs *HeightVoteSet) getVoteSet(round int, type_ types.SignedMsgType) *type
// NOTE: if there are too many peers, or too much peer churn,
// this can cause memory issues.
// TODO: implement ability to remove peers too
func (hvs *HeightVoteSet) SetPeerMaj23(round int, type_ types.SignedMsgType, peerID p2p.ID, blockID types.BlockID) error {
func (hvs *HeightVoteSet) SetPeerMaj23(round int, type_ types.SignedMsgType, peerID types2.ID, blockID types.BlockID) error {
hvs.mtx.Lock()
defer hvs.mtx.Unlock()
if !types.IsVoteTypeValid(type_) {
Expand Down
Loading
Loading