feat(block manager): graceful role rotation #1154

Draft
wants to merge 15 commits into base: main
17 changes: 11 additions & 6 deletions block/block.go
@@ -72,8 +72,8 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
// Prune old heights, if requested by ABCI app.
// retainHeight is determined by currentHeight - min-retain-blocks (app.toml config).
// Unless max_age_num_blocks in consensus params is higher than min-retain-block, then max_age_num_blocks will be used instead of min-retain-blocks.

if 0 < retainHeight {
// TODO: can be called in intervals rather than every block (https://github.com/dymensionxyz/dymint/issues/334)
select {
case m.pruningC <- retainHeight:
default:
@@ -86,8 +86,12 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
m.Executor.UpdateStateAfterCommit(m.State, responses, appHash, block.Header.Height)
}

// check if the proposer needs to be changed
switchRole := m.Executor.UpdateProposerFromBlock(m.State, block)
// update proposer from block header if needed
// if new proposer is set, we become the proposer
isNewProposer, err := m.Executor.UpdateProposerFromBlock(m.State, block)
if err != nil {
return fmt.Errorf("update proposer from block: %w", err)
}

// save sequencers to store to be queried over RPC
batch := m.Store.NewBatch()
@@ -110,10 +114,11 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta

m.blockCache.Delete(block.Header.Height)

if switchRole {
// TODO: graceful role change (https://github.com/dymensionxyz/dymint/issues/1008)
// signal the role switch, in case where this node is the new proposer
// the other direction is handled elsewhere
if isNewProposer && block.Header.Height == m.TargetHeight.Load() {
Comment on lines +117 to +119 (Contributor):

Let's add a comment (or maybe even a condition) making it clear that the code in this if block can only run in full-node mode; meaning, if the node is already the proposer, it can't switch to proposer mode again.
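
For illustration only, a rough sketch of the suggested guard (not part of the PR; whether m.isProposer can be read here without extra synchronization is an assumption):

    // Sketch: only a node currently running as a full node should react to
    // becoming the proposer; a node that is already the proposer cannot
    // switch to the proposer role again.
    if !m.isProposer && isNewProposer && block.Header.Height == m.TargetHeight.Load() {
        m.logger.Info("Node changing to proposer role")
        m.roleSwitchC <- true
    }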

m.roleSwitchC <- true
m.logger.Info("Node changing to proposer role")
panic("sequencer is no longer the proposer")
}

// validate whether configuration params and rollapp consensus params keep in line, after rollapp params are updated from the responses received in the block execution
4 changes: 4 additions & 0 deletions block/initchain.go
@@ -24,5 +24,9 @@ func (m *Manager) RunInitChain(ctx context.Context) error {
if _, err := m.Store.SaveState(m.State, nil); err != nil {
return err
}

targetHeight := uint64(m.Genesis.InitialHeight - 1)
m.UpdateLastSubmittedHeight(targetHeight)
m.UpdateTargetHeight(targetHeight)
return nil
}
239 changes: 152 additions & 87 deletions block/manager.go
@@ -7,7 +7,6 @@ import (
"sync"
"sync/atomic"

"github.com/dymensionxyz/gerr-cosmos/gerrc"
"golang.org/x/sync/errgroup"

"github.com/dymensionxyz/dymint/da/registry"
@@ -16,6 +15,7 @@ import (
uerrors "github.com/dymensionxyz/dymint/utils/errors"
uevent "github.com/dymensionxyz/dymint/utils/event"
"github.com/dymensionxyz/dymint/version"
"github.com/dymensionxyz/gerr-cosmos/gerrc"

"github.com/libp2p/go-libp2p/core/crypto"
tmcrypto "github.com/tendermint/tendermint/crypto"
@@ -54,6 +54,8 @@ type Manager struct {
DAClient da.DataAvailabilityLayerClient
SLClient settlement.ClientI

isProposer bool // is the local node the proposer
roleSwitchC chan bool // channel to receive role switch signal
Contributor:

So true on this chan means proposer and false means not a proposer, right? Seems a bit opaque to me; I'd use an enum here, or at least add a comment with an explanation:

type Role bool

const (
    Proposer Role = true
    FullNode Role = false
)
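
For illustration only (not part of the PR), the field and the send sites would then read along these lines:

    roleSwitchC chan Role // Role instead of a bare bool

    m.roleSwitchC <- Proposer // e.g. in applyBlock, when this node becomes the proposer
    m.roleSwitchC <- FullNode // e.g. after handing the proposer role over to the next sequencer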

/*
Submission
*/
@@ -133,7 +135,8 @@ func NewManager(
blockCache: &Cache{
cache: make(map[uint64]types.CachedBlock),
},
pruningC: make(chan int64, 10), // use of buffered channel to avoid blocking applyBlock thread. In case channel is full, pruning will be skipped, but the retain height can be pruned in the next iteration.
pruningC: make(chan int64, 10), // use of buffered channel to avoid blocking applyBlock thread. In case channel is full, pruning will be skipped, but the retain height can be pruned in the next iteration.
roleSwitchC: make(chan bool, 1), // channel to be used to signal role switch
Contributor:

why is it buffered?

}

err = m.LoadStateOnInit(store, genesis, logger)
@@ -155,6 +158,93 @@ func NewManager(
return m, nil
}

// runNonProducerLoops runs the loops that are common to all nodes, but not the proposer.
// This includes syncing from the DA and SL, and listening to new blocks from P2P.
func (m *Manager) runNonProducerLoops(ctx context.Context) {
// P2P Sync. Subscribe to P2P received blocks events
go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewGossipedBlock, m.onReceivedBlock, m.logger)
go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockSyncBlocksLoop", p2p.EventQueryNewBlockSyncBlock, m.onReceivedBlock, m.logger)
// SL Sync. Subscribe to SL state update events
go uevent.MustSubscribe(ctx, m.Pubsub, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted, m.onNewStateUpdate, m.logger)
}

func (m *Manager) runProducerLoops(ctx context.Context) {
Contributor:

Maybe it's better to call these runProposerLoops and runNonProposerLoops respectively, so as not to confuse the terminology?

eg, ctx := errgroup.WithContext(ctx)

// populate the bytes produced channel
bytesProducedC := make(chan int)
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.SubmitLoop(ctx, bytesProducedC)
})
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
bytesProducedC <- m.GetUnsubmittedBytes() // load unsubmitted bytes from previous run
return m.ProduceBlockLoop(ctx, bytesProducedC)
})

// channel to signal sequencer rotation started
rotateSequencerC := make(chan string, 1)
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.MonitorSequencerRotation(ctx, rotateSequencerC)
})

_ = eg.Wait()
// Check if exited due to sequencer rotation signal
select {
case nextSeqAddr := <-rotateSequencerC:
m.handleRotationReq(ctx, nextSeqAddr)
m.roleSwitchC <- false
default:
m.logger.Info("producer err group finished.")
}
}

func (m *Manager) RunLoops(ctx context.Context) {
/* --------------------------------- common --------------------------------- */
// listen to new bonded sequencers events to add them in the sequencer set
go uevent.MustSubscribe(ctx, m.Pubsub, "newBondedSequencer", settlement.EventQueryNewBondedSequencer, m.UpdateSequencerSet, m.logger)
// run pruning loop
go m.PruningLoop(ctx)

// run loops initially (producer or non-producer)
cancel := m.runLoopsWithCancelFunc(ctx)

// listen to role switch trigger
go func() {
for {
select {
// ctx cancelled, shutdown
case <-ctx.Done():
cancel()
return
case proposer := <-m.roleSwitchC:
if proposer == m.isProposer {
m.logger.Error("Role switch signal received, but already in the same role", "proposer", proposer)
continue
}
m.logger.Info("Role switch signal received", "from", m.isProposer, "to", proposer)
m.isProposer = proposer

// shutdown all active loops and run loops again with new role
cancel()
cancel = m.runLoopsWithCancelFunc(ctx)
}
}
}()
}

func (m *Manager) runLoopsWithCancelFunc(ctx context.Context) context.CancelFunc {
loopCtx, cancel := context.WithCancel(ctx)
if m.isProposer {
// we processed the last block, so it will be the committed height (submitted by the previous proposer)
m.UpdateLastSubmittedHeight(m.State.Height())

go m.runProducerLoops(loopCtx)
} else {
m.runNonProducerLoops(loopCtx)
}
return cancel
}

// Start starts the block manager.
func (m *Manager) Start(ctx context.Context) error {
// Check if InitChain flow is needed
@@ -173,83 +263,45 @@ func (m *Manager) Start(ctx context.Context) error {
return err
}

isProposer := m.IsProposer()
m.logger.Info("starting block manager", "proposer", isProposer)

eg, ctx := errgroup.WithContext(ctx)
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.PruningLoop(ctx)
})

// listen to new bonded sequencers events to add them in the sequencer set
go uevent.MustSubscribe(ctx, m.Pubsub, "newBondedSequencer", settlement.EventQueryNewBondedSequencer, m.UpdateSequencerSet, m.logger)

/* ----------------------------- full node mode ----------------------------- */
if !isProposer {
err = m.syncMetadataFromSettlement()
if err != nil {
return fmt.Errorf("sync block manager from settlement: %w", err)
}
targetHeight := m.TargetHeight.Load()

m.logger.Info("starting block manager", "proposer", m.isProposer, "targetHeight", targetHeight, "height", m.State.Height())
/* -------------------------------------------------------------------------- */
/* sync section */
/* -------------------------------------------------------------------------- */
if !m.isProposer {
/* ----------------------------- full node mode ----------------------------- */
// Full-nodes can sync from DA but it is not necessary to wait for it, since it can sync from P2P as well in parallel.
go func() {
err := m.syncFromSettlement()
err = m.syncToTargetHeight(targetHeight)
if err != nil {
m.logger.Error("sync block manager from settlement", "err", err)
m.logger.Error("sync to target height", "error", err)
}
// DA Sync. Subscribe to SL next batch events
go uevent.MustSubscribe(ctx, m.Pubsub, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted, m.onNewStateUpdate, m.logger)
}()
} else {
/* ----------------------------- sequencer mode ----------------------------- */
// Sequencer must wait till DA is synced to start submitting blobs
<-m.DAClient.Synced()
err = m.syncToTargetHeight(targetHeight)
if err != nil {
return fmt.Errorf("sync block manager from settlement: %w", err)
}

// P2P Sync. Subscribe to P2P received blocks events
go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewGossipedBlock, m.onReceivedBlock, m.logger)
go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockSyncBlocksLoop", p2p.EventQueryNewBlockSyncBlock, m.onReceivedBlock, m.logger)
return nil
}

/* ----------------------------- sequencer mode ----------------------------- */
// Subscribe to batch events, to update last submitted height in case batch confirmation was lost. This could happen if the sequencer crash/restarted just after submitting a batch to the settlement and by the time we query the last batch, this batch wasn't accepted yet.
go uevent.MustSubscribe(ctx, m.Pubsub, "updateSubmittedHeightLoop", settlement.EventQueryNewSettlementBatchAccepted, m.UpdateLastSubmittedHeight, m.logger)

// Sequencer must wait till DA is synced to start submitting blobs
<-m.DAClient.Synced()
err = m.syncFromSettlement()
if err != nil {
return fmt.Errorf("sync block manager from settlement: %w", err)
}
// check if sequencer in the middle of rotation
nextSeqAddr, missing, err := m.MissingLastBatch()
if err != nil {
return fmt.Errorf("checking if missing last batch: %w", err)
}
// if sequencer is in the middle of rotation, complete rotation instead of running the main loop
if missing {
m.handleRotationReq(ctx, nextSeqAddr)
return nil
// check if sequencer in the middle of rotation
err := m.CompleteRotationIfNeeded(ctx)
if err != nil {
return fmt.Errorf("checking if missing last batch: %w", err)
}
}

// populate the bytes produced channel
bytesProducedC := make(chan int)

// channel to signal sequencer rotation started
rotateSequencerC := make(chan string, 1)

uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.SubmitLoop(ctx, bytesProducedC)
})
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
bytesProducedC <- m.GetUnsubmittedBytes() // load unsubmitted bytes from previous run
return m.ProduceBlockLoop(ctx, bytesProducedC)
})
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.MonitorSequencerRotation(ctx, rotateSequencerC)
})

go func() {
_ = eg.Wait()
// Check if exited due to sequencer rotation signal
select {
case nextSeqAddr := <-rotateSequencerC:
m.handleRotationReq(ctx, nextSeqAddr)
default:
m.logger.Info("Block manager err group finished.")
}
}()
/* -------------------------------------------------------------------------- */
/* loops section */
/* -------------------------------------------------------------------------- */
m.RunLoops(ctx)

return nil
}
@@ -272,40 +324,43 @@ func (m *Manager) NextHeightToSubmit() uint64 {
return m.LastSubmittedHeight.Load() + 1
}

// syncFromSettlement enforces the node to be synced on initial run from SL and DA.
func (m *Manager) syncFromSettlement() error {
// syncMetadataFromSettlement gets the latest height and sequencer set from the settlement layer.
func (m *Manager) syncMetadataFromSettlement() error {
m.isProposer = m.IsProposer()

err := m.UpdateSequencerSetFromSL()
if err != nil {
return fmt.Errorf("update bonded sequencer set: %w", err)
}

err = m.syncLastCommittedHeight()
if err != nil {
return fmt.Errorf("sync last committed height: %w", err)
}

return nil
}

func (m *Manager) syncLastCommittedHeight() error {
res, err := m.SLClient.GetLatestBatch()
// TODO: separate between fresh rollapp and non-registered rollapp
// The SL hasn't got any batches for this chain yet.
if errors.Is(err, gerrc.ErrNotFound) {
// The SL hasn't got any batches for this chain yet.
m.logger.Info("No batches for chain found in SL.")
m.LastSubmittedHeight.Store(uint64(m.Genesis.InitialHeight - 1))
return nil
}

if err != nil {
// TODO: separate between fresh rollapp and non-registered rollapp
return err
}
m.LastSubmittedHeight.Store(res.EndHeight)
err = m.syncToTargetHeight(res.EndHeight)
m.UpdateTargetHeight(res.EndHeight)
if err != nil {
} else if err != nil {
return err
}

m.logger.Info("Synced.", "current height", m.State.Height(), "last submitted height", m.LastSubmittedHeight.Load())
m.UpdateLastSubmittedHeight(res.EndHeight)
m.UpdateTargetHeight(res.EndHeight)
return nil
}

func (m *Manager) GetProposerPubKey() tmcrypto.PubKey {
return m.State.Sequencers.GetProposerPubKey()
}

// UpdateTargetHeight will update the highest height seen from either P2P or DA.
func (m *Manager) UpdateTargetHeight(h uint64) {
for {
currentHeight := m.TargetHeight.Load()
@@ -315,6 +370,16 @@ func (m *Manager) UpdateTargetHeight(h uint64) {
}
}

// UpdateLastSubmittedHeight will update last height seen on the settlement layer.
func (m *Manager) UpdateLastSubmittedHeight(h uint64) {
for {
curr := m.LastSubmittedHeight.Load()
if m.LastSubmittedHeight.CompareAndSwap(curr, max(curr, h)) {
break
}
}
}

// ValidateConfigWithRollappParams checks the configuration params are consistent with the params in the dymint state (e.g. DA and version)
func (m *Manager) ValidateConfigWithRollappParams() error {
if version.Commit != m.State.RollappParams.Version {
6 changes: 6 additions & 0 deletions block/manager_test.go
@@ -561,6 +561,12 @@ func TestManager_updateTargetHeight(t *testing.T) {
h: 101,
expTargetHeight: 101,
},
{
name: "same height",
TargetHeight: 100,
h: 100,
expTargetHeight: 100,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {