From afa9f97ef07b05c88f8c875296fac514dd66ae52 Mon Sep 17 00:00:00 2001 From: Michael Tsitrin <michael@dymension.xyz> Date: Mon, 14 Oct 2024 10:48:27 +0300 Subject: [PATCH 01/13] WIP --- block/manager.go | 179 +++++++++++++++++++++++++++++-------------- block/sequencers.go | 1 + go.mod | 1 - node/node.go | 2 + utils/event/funcs.go | 4 + 5 files changed, 128 insertions(+), 59 deletions(-) diff --git a/block/manager.go b/block/manager.go index 89ee16df0..175301aef 100644 --- a/block/manager.go +++ b/block/manager.go @@ -155,6 +155,100 @@ func NewManager( return m, nil } +// runNonProducerLoops runs the loops that are common to all nodes, but not the proposer. +func (m *Manager) runNonProducerLoops(ctx context.Context) { + // P2P Sync. Subscribe to P2P received blocks events + go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewGossipedBlock, m.onReceivedBlock, m.logger) + go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockSyncBlocksLoop", p2p.EventQueryNewBlockSyncBlock, m.onReceivedBlock, m.logger) + + // FIXME: we used to call this in full node, ONLY after DA sync. isn't this the same as the below?? + // FIXME: probably need to have some mutex and maintaining the last height synced from DA + go uevent.MustSubscribe(ctx, m.Pubsub, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted, m.onNewStateUpdate, m.logger) +} + +func (m *Manager) runProducerLoops(ctx context.Context, eg *errgroup.Group) { + // populate the bytes produced channel + bytesProducedC := make(chan int) + uerrors.ErrGroupGoLog(eg, m.logger, func() error { + return m.SubmitLoop(ctx, bytesProducedC) + }) + uerrors.ErrGroupGoLog(eg, m.logger, func() error { + bytesProducedC <- m.GetUnsubmittedBytes() // load unsubmitted bytes from previous run + return m.ProduceBlockLoop(ctx, bytesProducedC) + }) + + // channel to signal sequencer rotation started + rotateSequencerC := make(chan string, 1) + uerrors.ErrGroupGoLog(eg, m.logger, func() error { + return m.MonitorSequencerRotation(ctx, rotateSequencerC) + }) + + _ = eg.Wait() + // Check if exited due to sequencer rotation signal + select { + case nextSeqAddr := <-rotateSequencerC: + m.handleRotationReq(ctx, nextSeqAddr) + // FIXME: produce roleSwitch event + default: + m.logger.Info("Block manager err group finished.") + } +} + +func (m *Manager) RunLoops(ctx context.Context, roleSwitchC chan bool) error { + ctx, cancel := context.WithCancel(ctx) + eg, ctx := errgroup.WithContext(ctx) + + /* --------------------------------- common --------------------------------- */ + // listen to new bonded sequencers events to add them in the sequencer set + go uevent.MustSubscribe(ctx, m.Pubsub, "newBondedSequencer", settlement.EventQueryNewBondedSequencer, m.UpdateSequencerSet, m.logger) + uerrors.ErrGroupGoLog(eg, m.logger, func() error { + return m.PruningLoop(ctx) + }) + + // run loops initially if needed + if m.Proposer { + m.runProducerLoops() + } else { + m.runNonProducerLoops(ctx) + } + + // listen to role switch trigger + newRoleC := make(chan bool) // channel to receive new role (true for proposer, false for non-producer) + for { + select { + case <-ctx.Done(): + break + case proposer := <-newRoleC: + if proposer == m.Proposer { + //log error + continue + } + m.Proposer = proposer + + // if false->true, start producing + if proposer { + m.runProducerLoops() + } + + // if true->false, stop producing + if !proposer { + cancel() // shutdown all loops + _ = eg.Wait() + // Check if exited due to sequencer rotation signal + select { + case 
nextSeqAddr := <-rotateSequencerC: + m.handleRotationReq(ctx, nextSeqAddr) + default: + m.logger.Info("Block manager err group finished.") + } + } + + } + _ = eg.Wait() + return nil + } +} + // Start starts the block manager. func (m *Manager) Start(ctx context.Context) error { // Check if InitChain flow is needed @@ -176,14 +270,12 @@ func (m *Manager) Start(ctx context.Context) error { isProposer := m.IsProposer() m.logger.Info("starting block manager", "proposer", isProposer) + /* ---------------------------- common goroutines --------------------------- */ eg, ctx := errgroup.WithContext(ctx) - uerrors.ErrGroupGoLog(eg, m.logger, func() error { - return m.PruningLoop(ctx) - }) - - // listen to new bonded sequencers events to add them in the sequencer set - go uevent.MustSubscribe(ctx, m.Pubsub, "newBondedSequencer", settlement.EventQueryNewBondedSequencer, m.UpdateSequencerSet, m.logger) + /* -------------------------------------------------------------------------- */ + /* sync section */ + /* -------------------------------------------------------------------------- */ /* ----------------------------- full node mode ----------------------------- */ if !isProposer { // Full-nodes can sync from DA but it is not necessary to wait for it, since it can sync from P2P as well in parallel. @@ -195,61 +287,30 @@ func (m *Manager) Start(ctx context.Context) error { // DA Sync. Subscribe to SL next batch events go uevent.MustSubscribe(ctx, m.Pubsub, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted, m.onNewStateUpdate, m.logger) }() + } else { - // P2P Sync. Subscribe to P2P received blocks events - go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewGossipedBlock, m.onReceivedBlock, m.logger) - go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockSyncBlocksLoop", p2p.EventQueryNewBlockSyncBlock, m.onReceivedBlock, m.logger) - return nil - } - - /* ----------------------------- sequencer mode ----------------------------- */ - // Subscribe to batch events, to update last submitted height in case batch confirmation was lost. This could happen if the sequencer crash/restarted just after submitting a batch to the settlement and by the time we query the last batch, this batch wasn't accepted yet. 
- go uevent.MustSubscribe(ctx, m.Pubsub, "updateSubmittedHeightLoop", settlement.EventQueryNewSettlementBatchAccepted, m.UpdateLastSubmittedHeight, m.logger) - - // Sequencer must wait till DA is synced to start submitting blobs - <-m.DAClient.Synced() - err = m.syncFromSettlement() - if err != nil { - return fmt.Errorf("sync block manager from settlement: %w", err) - } - // check if sequencer in the middle of rotation - nextSeqAddr, missing, err := m.MissingLastBatch() - if err != nil { - return fmt.Errorf("checking if missing last batch: %w", err) - } - // if sequencer is in the middle of rotation, complete rotation instead of running the main loop - if missing { - m.handleRotationReq(ctx, nextSeqAddr) - return nil - } - - // populate the bytes produced channel - bytesProducedC := make(chan int) - - // channel to signal sequencer rotation started - rotateSequencerC := make(chan string, 1) - - uerrors.ErrGroupGoLog(eg, m.logger, func() error { - return m.SubmitLoop(ctx, bytesProducedC) - }) - uerrors.ErrGroupGoLog(eg, m.logger, func() error { - bytesProducedC <- m.GetUnsubmittedBytes() // load unsubmitted bytes from previous run - return m.ProduceBlockLoop(ctx, bytesProducedC) - }) - uerrors.ErrGroupGoLog(eg, m.logger, func() error { - return m.MonitorSequencerRotation(ctx, rotateSequencerC) - }) - - go func() { - _ = eg.Wait() - // Check if exited due to sequencer rotation signal - select { - case nextSeqAddr := <-rotateSequencerC: + /* ----------------------------- sequencer mode ----------------------------- */ + // Sequencer must wait till DA is synced to start submitting blobs + <-m.DAClient.Synced() + err = m.syncFromSettlement() + if err != nil { + return fmt.Errorf("sync block manager from settlement: %w", err) + } + // check if sequencer in the middle of rotation + nextSeqAddr, missing, err := m.MissingLastBatch() + if err != nil { + return fmt.Errorf("checking if missing last batch: %w", err) + } + // if sequencer is in the middle of rotation, complete rotation instead of running the main loop + if missing { m.handleRotationReq(ctx, nextSeqAddr) - default: - m.logger.Info("Block manager err group finished.") + return nil } - }() + } + + /* -------------------------------------------------------------------------- */ + /* loops section */ + /* -------------------------------------------------------------------------- */ return nil } @@ -340,6 +401,8 @@ func (m *Manager) setDA(daconfig string, dalcKV store.KV, logger log.Logger) err return fmt.Errorf("get data availability client named '%s'", daLayer) } + // FIXME: have each client expected config struct. 
try to unmarshal + err := dalc.Init([]byte(daconfig), m.Pubsub, dalcKV, logger.With("module", string(dalc.GetClientType()))) if err != nil { return fmt.Errorf("data availability layer client initialization: %w", err) diff --git a/block/sequencers.go b/block/sequencers.go index 6c72d4d22..9add5c98d 100644 --- a/block/sequencers.go +++ b/block/sequencers.go @@ -73,6 +73,7 @@ func (m *Manager) IsProposer() bool { return bytes.Equal(l2Proposer, localProposerKey) || bytes.Equal(expectedHubProposer, localProposerKey) } +// FIXME: rename // MissingLastBatch checks if the sequencer is in the middle of rotation (I'm the proposer, but needs to complete rotation) // returns the next sequencer address and a flag if the sequencer is the old proposer and the hub waits for the last batch func (m *Manager) MissingLastBatch() (string, bool, error) { diff --git a/go.mod b/go.mod index c342ffd4b..a082aff5a 100644 --- a/go.mod +++ b/go.mod @@ -297,7 +297,6 @@ require ( replace ( github.com/centrifuge/go-substrate-rpc-client/v4 => github.com/availproject/go-substrate-rpc-client/v4 v4.0.12-avail-1.4.0-rc1-5e286e3 - github.com/dymensionxyz/dymension-rdk => github.com/dymensionxyz/dymension-rdk v1.6.1-0.20240827102903-08636e7ab3f8 github.com/evmos/evmos/v12 => github.com/dymensionxyz/evmos/v12 v12.1.6-dymension-v0.3 github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.2-alpha.regen.4 github.com/gorilla/rpc => github.com/dymensionxyz/rpc v1.3.1 diff --git a/node/node.go b/node/node.go index 1dea5dcf0..8824ef472 100644 --- a/node/node.go +++ b/node/node.go @@ -267,6 +267,8 @@ func (n *Node) OnStop() { n.Logger.Error("close store", "error", err) } + // FIXME: stop the pubsub????? + n.cancel() } diff --git a/utils/event/funcs.go b/utils/event/funcs.go index 8b76b7ce0..af634a6a3 100644 --- a/utils/event/funcs.go +++ b/utils/event/funcs.go @@ -33,8 +33,12 @@ func MustSubscribe( return } + // FIXME: validate it's still be unsubscribed if the context is cancelled + defer pubsubServer.UnsubscribeAll(ctx, clientID) //nolint:errcheck + for { select { + // FIXME: maybe this needs to be removed? as the ctx used for subscribing. 
case <-ctx.Done(): return case event := <-subscription.Out(): From 8f6ae0578f4ed8736571306a3ea858278a0ec2bb Mon Sep 17 00:00:00 2001 From: Michael Tsitrin <michael@dymension.xyz> Date: Mon, 21 Oct 2024 12:02:47 +0300 Subject: [PATCH 02/13] finsihd loops refactor --- block/block.go | 6 +-- block/manager.go | 107 +++++++++++++++++++++---------------------- block/pruning.go | 6 +-- block/retriever.go | 3 ++ block/sequencers.go | 8 +--- utils/event/funcs.go | 1 + 6 files changed, 63 insertions(+), 68 deletions(-) diff --git a/block/block.go b/block/block.go index ecd544146..c092f09d0 100644 --- a/block/block.go +++ b/block/block.go @@ -110,10 +110,10 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta m.blockCache.Delete(block.Header.Height) - if switchRole { - // TODO: graceful role change (https://github.com/dymensionxyz/dymint/issues/1008) + // TODO: graceful role change (https://github.com/dymensionxyz/dymint/issues/1008) + if switchRole && m.isProposer { + m.roleSwitchC <- true m.logger.Info("Node changing to proposer role") - panic("sequencer is no longer the proposer") } // validate whether configuration params and rollapp consensus params keep in line, after rollapp params are updated from the responses received in the block execution diff --git a/block/manager.go b/block/manager.go index 175301aef..27da98e8d 100644 --- a/block/manager.go +++ b/block/manager.go @@ -54,6 +54,8 @@ type Manager struct { DAClient da.DataAvailabilityLayerClient SLClient settlement.ClientI + isProposer bool // is the local node the proposer + roleSwitchC chan bool // channel to receive role switch signal /* Submission */ @@ -134,6 +136,7 @@ func NewManager( cache: make(map[uint64]types.CachedBlock), }, pruningC: make(chan int64, 10), // use of buffered channel to avoid blocking applyBlock thread. In case channel is full, pruning will be skipped, but the retain height can be pruned in the next iteration. + // todo: make roleSwitchC buffered } err = m.LoadStateOnInit(store, genesis, logger) @@ -156,17 +159,20 @@ func NewManager( } // runNonProducerLoops runs the loops that are common to all nodes, but not the proposer. +// This includes syncing from the DA and SL, and listening to new blocks from P2P. +// when ctx is cancelled, all loops will be unsubscribed. func (m *Manager) runNonProducerLoops(ctx context.Context) { // P2P Sync. Subscribe to P2P received blocks events go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewGossipedBlock, m.onReceivedBlock, m.logger) go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockSyncBlocksLoop", p2p.EventQueryNewBlockSyncBlock, m.onReceivedBlock, m.logger) - - // FIXME: we used to call this in full node, ONLY after DA sync. isn't this the same as the below?? - // FIXME: probably need to have some mutex and maintaining the last height synced from DA + // SL Sync. 
Subscribe to SL state update events go uevent.MustSubscribe(ctx, m.Pubsub, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted, m.onNewStateUpdate, m.logger) + } -func (m *Manager) runProducerLoops(ctx context.Context, eg *errgroup.Group) { +func (m *Manager) runProducerLoops(ctx context.Context) { + eg, ctx := errgroup.WithContext(ctx) + // populate the bytes produced channel bytesProducedC := make(chan int) uerrors.ErrGroupGoLog(eg, m.logger, func() error { @@ -188,67 +194,56 @@ func (m *Manager) runProducerLoops(ctx context.Context, eg *errgroup.Group) { select { case nextSeqAddr := <-rotateSequencerC: m.handleRotationReq(ctx, nextSeqAddr) - // FIXME: produce roleSwitch event + m.roleSwitchC <- false default: - m.logger.Info("Block manager err group finished.") + m.logger.Info("producer err group finished.") } } -func (m *Manager) RunLoops(ctx context.Context, roleSwitchC chan bool) error { - ctx, cancel := context.WithCancel(ctx) - eg, ctx := errgroup.WithContext(ctx) - +func (m *Manager) RunLoops(ctx context.Context) error { /* --------------------------------- common --------------------------------- */ // listen to new bonded sequencers events to add them in the sequencer set go uevent.MustSubscribe(ctx, m.Pubsub, "newBondedSequencer", settlement.EventQueryNewBondedSequencer, m.UpdateSequencerSet, m.logger) - uerrors.ErrGroupGoLog(eg, m.logger, func() error { - return m.PruningLoop(ctx) - }) + // run pruning loop + go m.PruningLoop(ctx) - // run loops initially if needed - if m.Proposer { - m.runProducerLoops() - } else { - m.runNonProducerLoops(ctx) - } + // run loops initially, by role + cancel := m.runLoopsWithCancelFunc(ctx) // listen to role switch trigger - newRoleC := make(chan bool) // channel to receive new role (true for proposer, false for non-producer) for { select { + // ctx cancelled, shutdown case <-ctx.Done(): - break - case proposer := <-newRoleC: - if proposer == m.Proposer { - //log error + cancel() + return nil + case proposer := <-m.roleSwitchC: + if proposer == m.isProposer { + m.logger.Error("Role switch signal received, but already in the same role", "proposer", proposer) continue } - m.Proposer = proposer - - // if false->true, start producing - if proposer { - m.runProducerLoops() - } - - // if true->false, stop producing - if !proposer { - cancel() // shutdown all loops - _ = eg.Wait() - // Check if exited due to sequencer rotation signal - select { - case nextSeqAddr := <-rotateSequencerC: - m.handleRotationReq(ctx, nextSeqAddr) - default: - m.logger.Info("Block manager err group finished.") - } - } - + m.isProposer = proposer + cancel() // shutdown all active loops + // FIXME: need to wait? + //(producer -> non-producer: guaranteed to be stopped) + //(non-producer -> producer: need to wait for all non-producer loops to stop) + // _ = eg.Wait() + + cancel = m.runLoopsWithCancelFunc(ctx) } - _ = eg.Wait() - return nil } } +func (m *Manager) runLoopsWithCancelFunc(ctx context.Context) context.CancelFunc { + loopCtx, cancel := context.WithCancel(ctx) + if m.isProposer { + go m.runProducerLoops(loopCtx) + } else { + m.runNonProducerLoops(loopCtx) + } + return cancel +} + // Start starts the block manager. 
func (m *Manager) Start(ctx context.Context) error { // Check if InitChain flow is needed @@ -267,17 +262,14 @@ func (m *Manager) Start(ctx context.Context) error { return err } - isProposer := m.IsProposer() - m.logger.Info("starting block manager", "proposer", isProposer) - - /* ---------------------------- common goroutines --------------------------- */ - eg, ctx := errgroup.WithContext(ctx) + m.isProposer = m.IsProposer() + m.logger.Info("starting block manager", "proposer", m.isProposer) /* -------------------------------------------------------------------------- */ /* sync section */ /* -------------------------------------------------------------------------- */ - /* ----------------------------- full node mode ----------------------------- */ - if !isProposer { + if !m.isProposer { + /* ----------------------------- full node mode ----------------------------- */ // Full-nodes can sync from DA but it is not necessary to wait for it, since it can sync from P2P as well in parallel. go func() { err := m.syncFromSettlement() @@ -288,7 +280,6 @@ func (m *Manager) Start(ctx context.Context) error { go uevent.MustSubscribe(ctx, m.Pubsub, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted, m.onNewStateUpdate, m.logger) }() } else { - /* ----------------------------- sequencer mode ----------------------------- */ // Sequencer must wait till DA is synced to start submitting blobs <-m.DAClient.Synced() @@ -304,7 +295,8 @@ func (m *Manager) Start(ctx context.Context) error { // if sequencer is in the middle of rotation, complete rotation instead of running the main loop if missing { m.handleRotationReq(ctx, nextSeqAddr) - return nil + m.isProposer = false + m.logger.Info("Sequencer is no longer the proposer") } } @@ -312,6 +304,11 @@ func (m *Manager) Start(ctx context.Context) error { /* loops section */ /* -------------------------------------------------------------------------- */ + err = m.RunLoops(ctx) + if err != nil { + return fmt.Errorf("run loops: %w", err) + } + return nil } diff --git a/block/pruning.go b/block/pruning.go index d4ab18cc8..74df8066f 100644 --- a/block/pruning.go +++ b/block/pruning.go @@ -42,17 +42,17 @@ func (m *Manager) PruneBlocks(retainHeight uint64) (uint64, error) { return pruned, nil } -func (m *Manager) PruningLoop(ctx context.Context) error { +// TODO: no need to return error here, just log it +func (m *Manager) PruningLoop(ctx context.Context) { for { select { case <-ctx.Done(): - return ctx.Err() + return case retainHeight := <-m.pruningC: _, err := m.PruneBlocks(uint64(retainHeight)) if err != nil { m.logger.Error("pruning blocks", "retainHeight", retainHeight, "err", err) } - } } } diff --git a/block/retriever.go b/block/retriever.go index 96c288c2a..57a1ce3c8 100644 --- a/block/retriever.go +++ b/block/retriever.go @@ -22,6 +22,9 @@ func (m *Manager) onNewStateUpdate(event pubsub.Message) { m.logger.Error("onReceivedBatch", "err", "wrong event data received") return } + + // FIXME: probably need to have some mutex over syncing with DA proccess + h := eventData.EndHeight m.UpdateTargetHeight(h) err := m.syncToTargetHeight(h) diff --git a/block/sequencers.go b/block/sequencers.go index 9add5c98d..121572280 100644 --- a/block/sequencers.go +++ b/block/sequencers.go @@ -45,9 +45,7 @@ func (m *Manager) MonitorSequencerRotation(ctx context.Context, rotateC chan str } // we get here once a sequencer rotation signal is received m.logger.Info("Sequencer rotation started.", "next_seq", nextSeqAddr) - go func() { - rotateC <- nextSeqAddr - }() + 
rotateC <- nextSeqAddr return fmt.Errorf("sequencer rotation started. signal to stop production") } @@ -100,10 +98,6 @@ func (m *Manager) handleRotationReq(ctx context.Context, nextSeqAddr string) { if err != nil { panic(err) } - - // TODO: graceful fallback to full node (https://github.com/dymensionxyz/dymint/issues/1008) - m.logger.Info("Sequencer is no longer the proposer") - panic("sequencer is no longer the proposer") } // CompleteRotation completes the sequencer rotation flow diff --git a/utils/event/funcs.go b/utils/event/funcs.go index af634a6a3..00b7bd281 100644 --- a/utils/event/funcs.go +++ b/utils/event/funcs.go @@ -34,6 +34,7 @@ func MustSubscribe( } // FIXME: validate it's still be unsubscribed if the context is cancelled + // FIXME: another alternative is to just have no-op in the event callbacks if the node is not the proposer defer pubsubServer.UnsubscribeAll(ctx, clientID) //nolint:errcheck for { From 031f33db20e52f414a482485656a57bb4a04ee88 Mon Sep 17 00:00:00 2001 From: Michael Tsitrin <michael@dymension.xyz> Date: Mon, 21 Oct 2024 12:23:58 +0300 Subject: [PATCH 03/13] error handling --- block/block.go | 11 ++++++++--- block/manager.go | 4 ++-- block/state.go | 15 ++++++--------- 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/block/block.go b/block/block.go index c092f09d0..b2e13b25d 100644 --- a/block/block.go +++ b/block/block.go @@ -87,7 +87,10 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta } // check if the proposer needs to be changed - switchRole := m.Executor.UpdateProposerFromBlock(m.State, block) + switchRole, err := m.Executor.UpdateProposerFromBlock(m.State, block) + if err != nil { + return fmt.Errorf("update proposer from block: %w", err) + } // save sequencers to store to be queried over RPC batch := m.Store.NewBatch() @@ -110,12 +113,14 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta m.blockCache.Delete(block.Header.Height) - // TODO: graceful role change (https://github.com/dymensionxyz/dymint/issues/1008) - if switchRole && m.isProposer { + // signal the role switch, in case where this node is the new proposer + // the other direction is handled elsewhere + if switchRole && !m.isProposer { m.roleSwitchC <- true m.logger.Info("Node changing to proposer role") } + // FIXME: isn't this supposed to be checked before committing the state? // validate whether configuration params and rollapp consensus params keep in line, after rollapp params are updated from the responses received in the block execution err = m.ValidateConfigWithRollappParams() if err != nil { diff --git a/block/manager.go b/block/manager.go index 27da98e8d..f3c3d47bd 100644 --- a/block/manager.go +++ b/block/manager.go @@ -135,8 +135,8 @@ func NewManager( blockCache: &Cache{ cache: make(map[uint64]types.CachedBlock), }, - pruningC: make(chan int64, 10), // use of buffered channel to avoid blocking applyBlock thread. In case channel is full, pruning will be skipped, but the retain height can be pruned in the next iteration. - // todo: make roleSwitchC buffered + pruningC: make(chan int64, 10), // use of buffered channel to avoid blocking applyBlock thread. In case channel is full, pruning will be skipped, but the retain height can be pruned in the next iteration. 
+ roleSwitchC: make(chan bool, 1), // channel to be used to signal role switch } err = m.LoadStateOnInit(store, genesis, logger) diff --git a/block/state.go b/block/state.go index f003f525c..82e561b33 100644 --- a/block/state.go +++ b/block/state.go @@ -137,12 +137,10 @@ func (e *Executor) UpdateStateAfterCommit(s *types.State, resp *tmstate.ABCIResp // UpdateProposerFromBlock updates the proposer from the block // The next proposer is defined in the block header (NextSequencersHash) // In case of a node that a becomes the proposer, we return true to mark the role change -// currently the node will rebooted to apply the new role -// TODO: (https://github.com/dymensionxyz/dymint/issues/1008) -func (e *Executor) UpdateProposerFromBlock(s *types.State, block *types.Block) bool { +func (e *Executor) UpdateProposerFromBlock(s *types.State, block *types.Block) (bool, error) { // no sequencer change if bytes.Equal(block.Header.SequencerHash[:], block.Header.NextSequencersHash[:]) { - return false + return false, nil } if block.Header.NextSequencersHash == [32]byte{} { @@ -150,20 +148,19 @@ func (e *Executor) UpdateProposerFromBlock(s *types.State, block *types.Block) b // TODO: recover from halt (https://github.com/dymensionxyz/dymint/issues/1021) e.logger.Info("rollapp left with no proposer. chain is halted") s.Sequencers.SetProposer(nil) - return false + return false, nil } // if hash changed, update the active sequencer err := s.Sequencers.SetProposerByHash(block.Header.NextSequencersHash[:]) if err != nil { - e.logger.Error("update new proposer", "err", err) - panic(fmt.Sprintf("failed to update new proposer: %v", err)) + return false, err } localSeq := s.Sequencers.GetByConsAddress(e.localAddress) if localSeq != nil && bytes.Equal(localSeq.Hash(), block.Header.NextSequencersHash[:]) { - return true + return true, nil } - return false + return false, nil } From 166b21d2e0b5e7c748555a722e27d84110549b4f Mon Sep 17 00:00:00 2001 From: Michael Tsitrin <michael@dymension.xyz> Date: Mon, 21 Oct 2024 12:45:55 +0300 Subject: [PATCH 04/13] cleanup --- block/block.go | 2 +- block/manager.go | 9 ++------- block/pruning.go | 1 - block/sequencers.go | 1 + node/node.go | 5 ++++- utils/event/funcs.go | 5 +---- 6 files changed, 9 insertions(+), 14 deletions(-) diff --git a/block/block.go b/block/block.go index b2e13b25d..941ca09df 100644 --- a/block/block.go +++ b/block/block.go @@ -72,8 +72,8 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta // Prune old heights, if requested by ABCI app. // retainHeight is determined by currentHeight - min-retain-blocks (app.toml config). // Unless max_age_num_blocks in consensus params is higher than min-retain-block, then max_age_num_blocks will be used instead of min-retain-blocks. - if 0 < retainHeight { + // TODO: can be called in intervals rather than every block (https://github.com/dymensionxyz/dymint/issues/334) select { case m.pruningC <- retainHeight: default: diff --git a/block/manager.go b/block/manager.go index f3c3d47bd..9d83320b6 100644 --- a/block/manager.go +++ b/block/manager.go @@ -160,7 +160,6 @@ func NewManager( // runNonProducerLoops runs the loops that are common to all nodes, but not the proposer. // This includes syncing from the DA and SL, and listening to new blocks from P2P. -// when ctx is cancelled, all loops will be unsubscribed. func (m *Manager) runNonProducerLoops(ctx context.Context) { // P2P Sync. 
Subscribe to P2P received blocks events go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewGossipedBlock, m.onReceivedBlock, m.logger) @@ -207,7 +206,7 @@ func (m *Manager) RunLoops(ctx context.Context) error { // run pruning loop go m.PruningLoop(ctx) - // run loops initially, by role + // run loops initially (producer or non-producer) cancel := m.runLoopsWithCancelFunc(ctx) // listen to role switch trigger @@ -224,11 +223,7 @@ func (m *Manager) RunLoops(ctx context.Context) error { } m.isProposer = proposer cancel() // shutdown all active loops - // FIXME: need to wait? - //(producer -> non-producer: guaranteed to be stopped) - //(non-producer -> producer: need to wait for all non-producer loops to stop) - // _ = eg.Wait() - + // run loops again with new role cancel = m.runLoopsWithCancelFunc(ctx) } } diff --git a/block/pruning.go b/block/pruning.go index 74df8066f..5f3b25d2c 100644 --- a/block/pruning.go +++ b/block/pruning.go @@ -42,7 +42,6 @@ func (m *Manager) PruneBlocks(retainHeight uint64) (uint64, error) { return pruned, nil } -// TODO: no need to return error here, just log it func (m *Manager) PruningLoop(ctx context.Context) { for { select { diff --git a/block/sequencers.go b/block/sequencers.go index 121572280..9b4a9fa61 100644 --- a/block/sequencers.go +++ b/block/sequencers.go @@ -96,6 +96,7 @@ func (m *Manager) handleRotationReq(ctx context.Context, nextSeqAddr string) { m.logger.Info("Sequencer rotation started. Production stopped on this sequencer", "nextSeqAddr", nextSeqAddr) err := m.CompleteRotation(ctx, nextSeqAddr) if err != nil { + m.logger.Error("Complete rotation", "err", err) panic(err) } } diff --git a/node/node.go b/node/node.go index 8824ef472..1d2a3f985 100644 --- a/node/node.go +++ b/node/node.go @@ -267,7 +267,10 @@ func (n *Node) OnStop() { n.Logger.Error("close store", "error", err) } - // FIXME: stop the pubsub????? + err = n.PubsubServer.Stop() + if err != nil { + n.Logger.Error("stop pubsub server", "error", err) + } n.cancel() } diff --git a/utils/event/funcs.go b/utils/event/funcs.go index 00b7bd281..57b546f3b 100644 --- a/utils/event/funcs.go +++ b/utils/event/funcs.go @@ -33,13 +33,10 @@ func MustSubscribe( return } - // FIXME: validate it's still be unsubscribed if the context is cancelled - // FIXME: another alternative is to just have no-op in the event callbacks if the node is not the proposer - defer pubsubServer.UnsubscribeAll(ctx, clientID) //nolint:errcheck + defer pubsubServer.UnsubscribeAll(context.Background(), clientID) //nolint:errcheck for { select { - // FIXME: maybe this needs to be removed? as the ctx used for subscribing. 
case <-ctx.Done(): return case event := <-subscription.Out(): From 8c2c934f108baa3d7540f932eb9684e700e350a7 Mon Sep 17 00:00:00 2001 From: Michael Tsitrin <michael@dymension.xyz> Date: Mon, 21 Oct 2024 13:37:22 +0300 Subject: [PATCH 05/13] update LastSubmitted height where needed --- block/manager.go | 62 ++++++++++++++++++++++++++-------------------- block/retriever.go | 4 +-- block/submit.go | 20 --------------- 3 files changed, 37 insertions(+), 49 deletions(-) diff --git a/block/manager.go b/block/manager.go index 9d83320b6..42efc2939 100644 --- a/block/manager.go +++ b/block/manager.go @@ -199,7 +199,7 @@ func (m *Manager) runProducerLoops(ctx context.Context) { } } -func (m *Manager) RunLoops(ctx context.Context) error { +func (m *Manager) RunLoops(ctx context.Context) { /* --------------------------------- common --------------------------------- */ // listen to new bonded sequencers events to add them in the sequencer set go uevent.MustSubscribe(ctx, m.Pubsub, "newBondedSequencer", settlement.EventQueryNewBondedSequencer, m.UpdateSequencerSet, m.logger) @@ -210,23 +210,26 @@ func (m *Manager) RunLoops(ctx context.Context) error { cancel := m.runLoopsWithCancelFunc(ctx) // listen to role switch trigger - for { - select { - // ctx cancelled, shutdown - case <-ctx.Done(): - cancel() - return nil - case proposer := <-m.roleSwitchC: - if proposer == m.isProposer { - m.logger.Error("Role switch signal received, but already in the same role", "proposer", proposer) - continue + go func() { + for { + select { + // ctx cancelled, shutdown + case <-ctx.Done(): + cancel() + return + case proposer := <-m.roleSwitchC: + if proposer == m.isProposer { + m.logger.Error("Role switch signal received, but already in the same role", "proposer", proposer) + continue + } + m.isProposer = proposer + + // shutdown all active loops and run loops again with new role + cancel() + cancel = m.runLoopsWithCancelFunc(ctx) } - m.isProposer = proposer - cancel() // shutdown all active loops - // run loops again with new role - cancel = m.runLoopsWithCancelFunc(ctx) } - } + }() } func (m *Manager) runLoopsWithCancelFunc(ctx context.Context) context.CancelFunc { @@ -271,8 +274,6 @@ func (m *Manager) Start(ctx context.Context) error { if err != nil { m.logger.Error("sync block manager from settlement", "err", err) } - // DA Sync. 
Subscribe to SL next batch events - go uevent.MustSubscribe(ctx, m.Pubsub, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted, m.onNewStateUpdate, m.logger) }() } else { /* ----------------------------- sequencer mode ----------------------------- */ @@ -298,11 +299,7 @@ func (m *Manager) Start(ctx context.Context) error { /* -------------------------------------------------------------------------- */ /* loops section */ /* -------------------------------------------------------------------------- */ - - err = m.RunLoops(ctx) - if err != nil { - return fmt.Errorf("run loops: %w", err) - } + m.RunLoops(ctx) return nil } @@ -344,9 +341,11 @@ func (m *Manager) syncFromSettlement() error { // TODO: separate between fresh rollapp and non-registered rollapp return err } - m.LastSubmittedHeight.Store(res.EndHeight) - err = m.syncToTargetHeight(res.EndHeight) + + m.UpdateLastSubmittedHeight(res.EndHeight) m.UpdateTargetHeight(res.EndHeight) + + err = m.syncToTargetHeight(res.EndHeight) if err != nil { return err } @@ -359,6 +358,7 @@ func (m *Manager) GetProposerPubKey() tmcrypto.PubKey { return m.State.Sequencers.GetProposerPubKey() } +// UpdateTargetHeight will update the highest height seen from either P2P or DA. func (m *Manager) UpdateTargetHeight(h uint64) { for { currentHeight := m.TargetHeight.Load() @@ -368,6 +368,16 @@ func (m *Manager) UpdateTargetHeight(h uint64) { } } +// UpdateLastSubmittedHeight will update last height seen on the settlement layer. +func (m *Manager) UpdateLastSubmittedHeight(h uint64) { + for { + curr := m.LastSubmittedHeight.Load() + if m.LastSubmittedHeight.CompareAndSwap(curr, max(curr, h)) { + break + } + } +} + // ValidateConfigWithRollappParams checks the configuration params are consistent with the params in the dymint state (e.g. DA and version) func (m *Manager) ValidateConfigWithRollappParams() error { if version.Commit != m.State.RollappParams.Version { @@ -393,8 +403,6 @@ func (m *Manager) setDA(daconfig string, dalcKV store.KV, logger log.Logger) err return fmt.Errorf("get data availability client named '%s'", daLayer) } - // FIXME: have each client expected config struct. 
try to unmarshal - err := dalc.Init([]byte(daconfig), m.Pubsub, dalcKV, logger.With("module", string(dalc.GetClientType()))) if err != nil { return fmt.Errorf("data availability layer client initialization: %w", err) diff --git a/block/retriever.go b/block/retriever.go index 57a1ce3c8..4c6a4bc16 100644 --- a/block/retriever.go +++ b/block/retriever.go @@ -23,10 +23,10 @@ func (m *Manager) onNewStateUpdate(event pubsub.Message) { return } - // FIXME: probably need to have some mutex over syncing with DA proccess - h := eventData.EndHeight m.UpdateTargetHeight(h) + m.UpdateLastSubmittedHeight(h) + err := m.syncToTargetHeight(h) if err != nil { m.logger.Error("sync until target", "err", err) diff --git a/block/submit.go b/block/submit.go index a330b2597..6549eaed4 100644 --- a/block/submit.go +++ b/block/submit.go @@ -8,11 +8,9 @@ import ( "time" "github.com/dymensionxyz/gerr-cosmos/gerrc" - "github.com/tendermint/tendermint/libs/pubsub" "golang.org/x/sync/errgroup" "github.com/dymensionxyz/dymint/da" - "github.com/dymensionxyz/dymint/settlement" "github.com/dymensionxyz/dymint/types" uatomic "github.com/dymensionxyz/dymint/utils/atomic" uchannel "github.com/dymensionxyz/dymint/utils/channel" @@ -272,21 +270,3 @@ func (m *Manager) GetUnsubmittedBytes() int { func (m *Manager) GetUnsubmittedBlocks() uint64 { return m.State.Height() - m.LastSubmittedHeight.Load() } - -// UpdateLastSubmittedHeight will update last height submitted height upon events. -// This may be necessary in case we crashed/restarted before getting response for our submission to the settlement layer. -func (m *Manager) UpdateLastSubmittedHeight(event pubsub.Message) { - eventData, ok := event.Data().(*settlement.EventDataNewBatchAccepted) - if !ok { - m.logger.Error("onReceivedBatch", "err", "wrong event data received") - return - } - h := eventData.EndHeight - - for { - curr := m.LastSubmittedHeight.Load() - if m.LastSubmittedHeight.CompareAndSwap(curr, max(curr, h)) { - break - } - } -} From 339db349da618c1ac13250c2912363505596b221 Mon Sep 17 00:00:00 2001 From: Michael Tsitrin <michael@dymension.xyz> Date: Mon, 21 Oct 2024 13:55:24 +0300 Subject: [PATCH 06/13] removed TargetHeight as it was unused --- block/block_cache.go | 11 ++++++++++- block/manager.go | 14 -------------- block/manager_test.go | 35 +++++++++++++++++------------------ block/p2p.go | 5 ++++- block/retriever.go | 1 - rpc/client/client.go | 4 +++- 6 files changed, 34 insertions(+), 36 deletions(-) diff --git a/block/block_cache.go b/block/block_cache.go index b224f69fc..020c805bf 100644 --- a/block/block_cache.go +++ b/block/block_cache.go @@ -6,12 +6,17 @@ import ( type Cache struct { // concurrency managed by Manager.retrieverMu mutex - cache map[uint64]types.CachedBlock + cache map[uint64]types.CachedBlock + lastSeenHeight uint64 } func (m *Cache) Add(h uint64, b *types.Block, c *types.Commit, source types.BlockSource) { m.cache[h] = types.CachedBlock{Block: b, Commit: c, Source: source} types.BlockCacheSizeGauge.Set(float64(m.Size())) + + if h > m.lastSeenHeight { + m.lastSeenHeight = h + } } func (m *Cache) Delete(h uint64) { @@ -32,3 +37,7 @@ func (m *Cache) Has(h uint64) bool { func (m *Cache) Size() int { return len(m.cache) } + +func (m *Cache) LastSeenHeight() uint64 { + return m.lastSeenHeight +} diff --git a/block/manager.go b/block/manager.go index 42efc2939..c38604599 100644 --- a/block/manager.go +++ b/block/manager.go @@ -78,9 +78,6 @@ type Manager struct { // we can only do full validation in sequential order. 
blockCache *Cache - // TargetHeight holds the value of the current highest block seen from either p2p (probably higher) or the DA - TargetHeight atomic.Uint64 - // channel used to send the retain height to the pruning background loop pruningC chan int64 @@ -343,7 +340,6 @@ func (m *Manager) syncFromSettlement() error { } m.UpdateLastSubmittedHeight(res.EndHeight) - m.UpdateTargetHeight(res.EndHeight) err = m.syncToTargetHeight(res.EndHeight) if err != nil { @@ -358,16 +354,6 @@ func (m *Manager) GetProposerPubKey() tmcrypto.PubKey { return m.State.Sequencers.GetProposerPubKey() } -// UpdateTargetHeight will update the highest height seen from either P2P or DA. -func (m *Manager) UpdateTargetHeight(h uint64) { - for { - currentHeight := m.TargetHeight.Load() - if m.TargetHeight.CompareAndSwap(currentHeight, max(currentHeight, h)) { - break - } - } -} - // UpdateLastSubmittedHeight will update last height seen on the settlement layer. func (m *Manager) UpdateLastSubmittedHeight(h uint64) { for { diff --git a/block/manager_test.go b/block/manager_test.go index 287757189..85fa6d91c 100644 --- a/block/manager_test.go +++ b/block/manager_test.go @@ -542,34 +542,33 @@ func TestDAFetch(t *testing.T) { }) } } - -func TestManager_updateTargetHeight(t *testing.T) { +func TestManager_updateLastSubmittedHeight(t *testing.T) { tests := []struct { - name string - TargetHeight uint64 - h uint64 - expTargetHeight uint64 + name string + LastSubmittedHeight uint64 + h uint64 + expLastSubmittedHeight uint64 }{ { - name: "no update target height", - TargetHeight: 100, - h: 99, - expTargetHeight: 100, + name: "no update last submitted height", + LastSubmittedHeight: 100, + h: 99, + expLastSubmittedHeight: 100, }, { - name: "update target height", - TargetHeight: 100, - h: 101, - expTargetHeight: 101, + name: "update last submitted height", + LastSubmittedHeight: 100, + h: 101, + expLastSubmittedHeight: 101, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { m := &block.Manager{ - TargetHeight: atomic.Uint64{}, + LastSubmittedHeight: atomic.Uint64{}, } - m.TargetHeight.Store(tt.TargetHeight) - m.UpdateTargetHeight(tt.h) - assert.Equal(t, tt.expTargetHeight, m.TargetHeight.Load()) + m.LastSubmittedHeight.Store(tt.LastSubmittedHeight) + m.UpdateLastSubmittedHeight(tt.h) + assert.Equal(t, tt.expLastSubmittedHeight, m.LastSubmittedHeight.Load()) }) } } diff --git a/block/p2p.go b/block/p2p.go index 1fc904bf9..1825d336e 100644 --- a/block/p2p.go +++ b/block/p2p.go @@ -46,7 +46,6 @@ func (m *Manager) onReceivedBlock(event pubsub.Message) { return } - m.UpdateTargetHeight(height) types.LastReceivedP2PHeightGauge.Set(float64(height)) m.logger.Debug("Received new block from p2p.", "block height", height, "source", source.String(), "store height", m.State.Height(), "n cachedBlocks", m.blockCache.Size()) @@ -94,3 +93,7 @@ func (m *Manager) saveP2PBlockToBlockSync(block *types.Block, commit *types.Comm } return nil } + +func (m *Manager) GetLatestP2PHeight() uint64 { + return m.blockCache.lastSeenHeight +} diff --git a/block/retriever.go b/block/retriever.go index 4c6a4bc16..b05da59c1 100644 --- a/block/retriever.go +++ b/block/retriever.go @@ -24,7 +24,6 @@ func (m *Manager) onNewStateUpdate(event pubsub.Message) { } h := eventData.EndHeight - m.UpdateTargetHeight(h) m.UpdateLastSubmittedHeight(h) err := m.syncToTargetHeight(h) diff --git a/rpc/client/client.go b/rpc/client/client.go index 6fa48a6bb..eb74f839c 100644 --- a/rpc/client/client.go +++ b/rpc/client/client.go @@ -737,6 +737,8 @@ func (c *Client) 
Status(ctx context.Context) (*ctypes.ResultStatus, error) { id, addr, network := c.node.P2P.Info() txIndexerStatus := "on" + lastSeenHeight := max(c.node.BlockManager.LastSubmittedHeight.Load(), c.node.BlockManager.GetLatestP2PHeight()) + result := &ctypes.ResultStatus{ // TODO(ItzhakBokris): update NodeInfo fields NodeInfo: p2p.DefaultNodeInfo{ @@ -758,7 +760,7 @@ func (c *Client) Status(ctx context.Context) (*ctypes.ResultStatus, error) { LatestBlockHeight: int64(latestHeight), LatestBlockTime: time.Unix(0, int64(latestBlockTimeNano)), // CatchingUp is true if the node is not at the latest height received from p2p or da. - CatchingUp: c.node.BlockManager.TargetHeight.Load() > latestHeight, + CatchingUp: lastSeenHeight > latestHeight, // TODO(tzdybal): add missing fields // EarliestBlockHash: earliestBlockHash, // EarliestAppHash: earliestAppHash, From 54250997f508f61fa53cb2b7d127e3c2c19feb75 Mon Sep 17 00:00:00 2001 From: Michael Tsitrin <michael@dymension.xyz> Date: Mon, 21 Oct 2024 13:55:59 +0300 Subject: [PATCH 07/13] linter --- block/manager.go | 1 - block/manager_test.go | 1 + block/pruning_test.go | 1 - indexers/blockindexer/kv/kv_test.go | 2 -- indexers/txindex/indexer_service_test.go | 1 - indexers/txindex/kv/kv_test.go | 3 +-- types/pb/dymint/state.pb.go | 2 +- 7 files changed, 3 insertions(+), 8 deletions(-) diff --git a/block/manager.go b/block/manager.go index c38604599..59ab3fb28 100644 --- a/block/manager.go +++ b/block/manager.go @@ -163,7 +163,6 @@ func (m *Manager) runNonProducerLoops(ctx context.Context) { go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockSyncBlocksLoop", p2p.EventQueryNewBlockSyncBlock, m.onReceivedBlock, m.logger) // SL Sync. Subscribe to SL state update events go uevent.MustSubscribe(ctx, m.Pubsub, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted, m.onNewStateUpdate, m.logger) - } func (m *Manager) runProducerLoops(ctx context.Context) { diff --git a/block/manager_test.go b/block/manager_test.go index 85fa6d91c..0d91f10a8 100644 --- a/block/manager_test.go +++ b/block/manager_test.go @@ -542,6 +542,7 @@ func TestDAFetch(t *testing.T) { }) } } + func TestManager_updateLastSubmittedHeight(t *testing.T) { tests := []struct { name string diff --git a/block/pruning_test.go b/block/pruning_test.go index e008634a0..9590be35b 100644 --- a/block/pruning_test.go +++ b/block/pruning_test.go @@ -76,5 +76,4 @@ func TestPruningRetainHeight(t *testing.T) { require.Error(gerrc.ErrInvalidArgument) } } - } diff --git a/indexers/blockindexer/kv/kv_test.go b/indexers/blockindexer/kv/kv_test.go index b7a8a17f6..b5de64338 100644 --- a/indexers/blockindexer/kv/kv_test.go +++ b/indexers/blockindexer/kv/kv_test.go @@ -148,7 +148,6 @@ func TestBlockIndexer(t *testing.T) { } func TestBlockIndexerPruning(t *testing.T) { - // init the block indexer prefixStore := store.NewPrefixKV(store.NewDefaultInMemoryKVStore(), []byte("block_events")) indexer := blockidxkv.New(prefixStore) @@ -179,7 +178,6 @@ func TestBlockIndexerPruning(t *testing.T) { results, err = indexer.Search(context.Background(), q) require.NoError(t, err) require.Equal(t, 0, len(results)) - } func getBeginBlock() abci.ResponseBeginBlock { diff --git a/indexers/txindex/indexer_service_test.go b/indexers/txindex/indexer_service_test.go index abd281605..79f2ee22a 100644 --- a/indexers/txindex/indexer_service_test.go +++ b/indexers/txindex/indexer_service_test.go @@ -88,5 +88,4 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) { require.NoError(t, err) expectedTxPruned := uint64(2) require.Equal(t, 
expectedTxPruned, txPruned) - } diff --git a/indexers/txindex/kv/kv_test.go b/indexers/txindex/kv/kv_test.go index 60caf7f54..8f807bfd0 100644 --- a/indexers/txindex/kv/kv_test.go +++ b/indexers/txindex/kv/kv_test.go @@ -315,7 +315,6 @@ func TestTxSearchMultipleTxs(t *testing.T) { } func TestTxIndexerPruning(t *testing.T) { - // init the block indexer indexer := NewTxIndex(store.NewDefaultInMemoryKVStore()) numBlocks := uint64(100) @@ -356,7 +355,6 @@ func TestTxIndexerPruning(t *testing.T) { results := indexer.match(context.Background(), c, startKeyForCondition(c, 0), nil, true) require.Equal(t, 0, len(results)) } - } func txResultWithEvents(events []abci.Event) *abci.TxResult { @@ -373,6 +371,7 @@ func txResultWithEvents(events []abci.Event) *abci.TxResult { }, } } + func getRandomTxResult(height int64, events []abci.Event) *abci.TxResult { tx := types.Tx(randomTxHash()) return &abci.TxResult{ diff --git a/types/pb/dymint/state.pb.go b/types/pb/dymint/state.pb.go index 1cb1dfca9..4b318b849 100644 --- a/types/pb/dymint/state.pb.go +++ b/types/pb/dymint/state.pb.go @@ -195,7 +195,7 @@ func (m *State) GetRollappParams() RollappParams { return RollappParams{} } -//rollapp params defined in genesis and updated via gov proposal +// rollapp params defined in genesis and updated via gov proposal type RollappParams struct { //data availability type (e.g. celestia) used in the rollapp Da string `protobuf:"bytes,1,opt,name=da,proto3" json:"da,omitempty"` From 3d1593e61cffcf64060a8dd703ce02cb149de161 Mon Sep 17 00:00:00 2001 From: Michael Tsitrin <michael@dymension.xyz> Date: Mon, 21 Oct 2024 15:04:02 +0300 Subject: [PATCH 08/13] brought back TargetHeight --- block/block.go | 2 +- block/block_cache.go | 11 +---------- block/manager.go | 14 ++++++++++++++ block/manager_test.go | 31 +++++++++++++++++++++++++++++++ block/p2p.go | 5 +---- block/retriever.go | 1 + block/state.go | 1 + rpc/client/client.go | 4 +--- 8 files changed, 51 insertions(+), 18 deletions(-) diff --git a/block/block.go b/block/block.go index 941ca09df..e8db54903 100644 --- a/block/block.go +++ b/block/block.go @@ -115,7 +115,7 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta // signal the role switch, in case where this node is the new proposer // the other direction is handled elsewhere - if switchRole && !m.isProposer { + if switchRole && !m.isProposer && block.Header.Height == m.TargetHeight.Load() { m.roleSwitchC <- true m.logger.Info("Node changing to proposer role") } diff --git a/block/block_cache.go b/block/block_cache.go index 020c805bf..b224f69fc 100644 --- a/block/block_cache.go +++ b/block/block_cache.go @@ -6,17 +6,12 @@ import ( type Cache struct { // concurrency managed by Manager.retrieverMu mutex - cache map[uint64]types.CachedBlock - lastSeenHeight uint64 + cache map[uint64]types.CachedBlock } func (m *Cache) Add(h uint64, b *types.Block, c *types.Commit, source types.BlockSource) { m.cache[h] = types.CachedBlock{Block: b, Commit: c, Source: source} types.BlockCacheSizeGauge.Set(float64(m.Size())) - - if h > m.lastSeenHeight { - m.lastSeenHeight = h - } } func (m *Cache) Delete(h uint64) { @@ -37,7 +32,3 @@ func (m *Cache) Has(h uint64) bool { func (m *Cache) Size() int { return len(m.cache) } - -func (m *Cache) LastSeenHeight() uint64 { - return m.lastSeenHeight -} diff --git a/block/manager.go b/block/manager.go index 59ab3fb28..e1a0c5f33 100644 --- a/block/manager.go +++ b/block/manager.go @@ -78,6 +78,9 @@ type Manager struct { // we can only do full validation in sequential 
order. blockCache *Cache + // TargetHeight holds the value of the current highest block seen from either p2p (probably higher) or the DA + TargetHeight atomic.Uint64 + // channel used to send the retain height to the pruning background loop pruningC chan int64 @@ -339,6 +342,7 @@ func (m *Manager) syncFromSettlement() error { } m.UpdateLastSubmittedHeight(res.EndHeight) + m.UpdateTargetHeight(res.EndHeight) err = m.syncToTargetHeight(res.EndHeight) if err != nil { @@ -353,6 +357,16 @@ func (m *Manager) GetProposerPubKey() tmcrypto.PubKey { return m.State.Sequencers.GetProposerPubKey() } +// UpdateTargetHeight will update the highest height seen from either P2P or DA. +func (m *Manager) UpdateTargetHeight(h uint64) { + for { + currentHeight := m.TargetHeight.Load() + if m.TargetHeight.CompareAndSwap(currentHeight, max(currentHeight, h)) { + break + } + } +} + // UpdateLastSubmittedHeight will update last height seen on the settlement layer. func (m *Manager) UpdateLastSubmittedHeight(h uint64) { for { diff --git a/block/manager_test.go b/block/manager_test.go index 0d91f10a8..4e14c2633 100644 --- a/block/manager_test.go +++ b/block/manager_test.go @@ -573,3 +573,34 @@ func TestManager_updateLastSubmittedHeight(t *testing.T) { }) } } + +func TestManager_updateTargetHeight(t *testing.T) { + tests := []struct { + name string + TargetHeight uint64 + h uint64 + expTargetHeight uint64 + }{ + { + name: "no update target height", + TargetHeight: 100, + h: 99, + expTargetHeight: 100, + }, { + name: "update target height", + TargetHeight: 100, + h: 101, + expTargetHeight: 101, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := &block.Manager{ + TargetHeight: atomic.Uint64{}, + } + m.TargetHeight.Store(tt.TargetHeight) + m.UpdateTargetHeight(tt.h) + assert.Equal(t, tt.expTargetHeight, m.TargetHeight.Load()) + }) + } +} diff --git a/block/p2p.go b/block/p2p.go index 1825d336e..1fc904bf9 100644 --- a/block/p2p.go +++ b/block/p2p.go @@ -46,6 +46,7 @@ func (m *Manager) onReceivedBlock(event pubsub.Message) { return } + m.UpdateTargetHeight(height) types.LastReceivedP2PHeightGauge.Set(float64(height)) m.logger.Debug("Received new block from p2p.", "block height", height, "source", source.String(), "store height", m.State.Height(), "n cachedBlocks", m.blockCache.Size()) @@ -93,7 +94,3 @@ func (m *Manager) saveP2PBlockToBlockSync(block *types.Block, commit *types.Comm } return nil } - -func (m *Manager) GetLatestP2PHeight() uint64 { - return m.blockCache.lastSeenHeight -} diff --git a/block/retriever.go b/block/retriever.go index b05da59c1..4c6a4bc16 100644 --- a/block/retriever.go +++ b/block/retriever.go @@ -24,6 +24,7 @@ func (m *Manager) onNewStateUpdate(event pubsub.Message) { } h := eventData.EndHeight + m.UpdateTargetHeight(h) m.UpdateLastSubmittedHeight(h) err := m.syncToTargetHeight(h) diff --git a/block/state.go b/block/state.go index 82e561b33..ddef76626 100644 --- a/block/state.go +++ b/block/state.go @@ -157,6 +157,7 @@ func (e *Executor) UpdateProposerFromBlock(s *types.State, block *types.Block) ( return false, err } + // check if this node is the new proposer localSeq := s.Sequencers.GetByConsAddress(e.localAddress) if localSeq != nil && bytes.Equal(localSeq.Hash(), block.Header.NextSequencersHash[:]) { return true, nil diff --git a/rpc/client/client.go b/rpc/client/client.go index eb74f839c..6fa48a6bb 100644 --- a/rpc/client/client.go +++ b/rpc/client/client.go @@ -737,8 +737,6 @@ func (c *Client) Status(ctx context.Context) (*ctypes.ResultStatus, error) { id, 
addr, network := c.node.P2P.Info() txIndexerStatus := "on" - lastSeenHeight := max(c.node.BlockManager.LastSubmittedHeight.Load(), c.node.BlockManager.GetLatestP2PHeight()) - result := &ctypes.ResultStatus{ // TODO(ItzhakBokris): update NodeInfo fields NodeInfo: p2p.DefaultNodeInfo{ @@ -760,7 +758,7 @@ func (c *Client) Status(ctx context.Context) (*ctypes.ResultStatus, error) { LatestBlockHeight: int64(latestHeight), LatestBlockTime: time.Unix(0, int64(latestBlockTimeNano)), // CatchingUp is true if the node is not at the latest height received from p2p or da. - CatchingUp: lastSeenHeight > latestHeight, + CatchingUp: c.node.BlockManager.TargetHeight.Load() > latestHeight, // TODO(tzdybal): add missing fields // EarliestBlockHash: earliestBlockHash, // EarliestAppHash: earliestAppHash, From 2577b2381ac8220f940b541d94ebc8a62119be8c Mon Sep 17 00:00:00 2001 From: Michael Tsitrin <michael@dymension.xyz> Date: Mon, 21 Oct 2024 15:19:07 +0300 Subject: [PATCH 09/13] syncing metadata from SL on boot --- block/manager.go | 42 +++++++++++++++++++++--------------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/block/manager.go b/block/manager.go index e1a0c5f33..252a28315 100644 --- a/block/manager.go +++ b/block/manager.go @@ -259,9 +259,13 @@ func (m *Manager) Start(ctx context.Context) error { return err } - m.isProposer = m.IsProposer() - m.logger.Info("starting block manager", "proposer", m.isProposer) + err = m.syncMetadataFromSettlement() + if err != nil { + return fmt.Errorf("sync block manager from settlement: %w", err) + } + targetHeight := m.TargetHeight.Load() + m.logger.Info("starting block manager", "proposer", m.isProposer, "targetHeight", targetHeight) /* -------------------------------------------------------------------------- */ /* sync section */ /* -------------------------------------------------------------------------- */ @@ -269,19 +273,20 @@ func (m *Manager) Start(ctx context.Context) error { /* ----------------------------- full node mode ----------------------------- */ // Full-nodes can sync from DA but it is not necessary to wait for it, since it can sync from P2P as well in parallel. go func() { - err := m.syncFromSettlement() + err = m.syncToTargetHeight(targetHeight) if err != nil { - m.logger.Error("sync block manager from settlement", "err", err) + m.logger.Error("sync to target height", "error", err) } }() } else { /* ----------------------------- sequencer mode ----------------------------- */ // Sequencer must wait till DA is synced to start submitting blobs <-m.DAClient.Synced() - err = m.syncFromSettlement() + err = m.syncToTargetHeight(targetHeight) if err != nil { return fmt.Errorf("sync block manager from settlement: %w", err) } + // check if sequencer in the middle of rotation nextSeqAddr, missing, err := m.MissingLastBatch() if err != nil { @@ -321,35 +326,30 @@ func (m *Manager) NextHeightToSubmit() uint64 { return m.LastSubmittedHeight.Load() + 1 } -// syncFromSettlement enforces the node to be synced on initial run from SL and DA. -func (m *Manager) syncFromSettlement() error { +// syncMetadataFromSettlement gets the latest height and sequencer set from the settlement layer. 
+func (m *Manager) syncMetadataFromSettlement() error { + m.isProposer = m.IsProposer() + err := m.UpdateSequencerSetFromSL() if err != nil { return fmt.Errorf("update bonded sequencer set: %w", err) } + targetHeight := uint64(m.Genesis.InitialHeight - 1) + res, err := m.SLClient.GetLatestBatch() if errors.Is(err, gerrc.ErrNotFound) { // The SL hasn't got any batches for this chain yet. m.logger.Info("No batches for chain found in SL.") - m.LastSubmittedHeight.Store(uint64(m.Genesis.InitialHeight - 1)) - return nil - } - - if err != nil { + } else if err != nil { // TODO: separate between fresh rollapp and non-registered rollapp return err + } else { + targetHeight = res.EndHeight } - m.UpdateLastSubmittedHeight(res.EndHeight) - m.UpdateTargetHeight(res.EndHeight) - - err = m.syncToTargetHeight(res.EndHeight) - if err != nil { - return err - } - - m.logger.Info("Synced.", "current height", m.State.Height(), "last submitted height", m.LastSubmittedHeight.Load()) + m.UpdateLastSubmittedHeight(targetHeight) + m.UpdateTargetHeight(targetHeight) return nil } From b5b2e6cd3bf7ee77ea201588b851bc83d11157ad Mon Sep 17 00:00:00 2001 From: Michael Tsitrin <michael@dymension.xyz> Date: Mon, 21 Oct 2024 15:28:25 +0300 Subject: [PATCH 10/13] cleanup the handler for missing last batch --- block/manager.go | 8 +------- block/sequencers.go | 22 ++++++++++++---------- 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/block/manager.go b/block/manager.go index 252a28315..1f0c4d029 100644 --- a/block/manager.go +++ b/block/manager.go @@ -288,16 +288,10 @@ func (m *Manager) Start(ctx context.Context) error { } // check if sequencer in the middle of rotation - nextSeqAddr, missing, err := m.MissingLastBatch() + err := m.CompleteRotationIfNeeded(ctx) if err != nil { return fmt.Errorf("checking if missing last batch: %w", err) } - // if sequencer is in the middle of rotation, complete rotation instead of running the main loop - if missing { - m.handleRotationReq(ctx, nextSeqAddr) - m.isProposer = false - m.logger.Info("Sequencer is no longer the proposer") - } } /* -------------------------------------------------------------------------- */ diff --git a/block/sequencers.go b/block/sequencers.go index 9b4a9fa61..aa6448fe8 100644 --- a/block/sequencers.go +++ b/block/sequencers.go @@ -71,23 +71,25 @@ func (m *Manager) IsProposer() bool { return bytes.Equal(l2Proposer, localProposerKey) || bytes.Equal(expectedHubProposer, localProposerKey) } -// FIXME: rename -// MissingLastBatch checks if the sequencer is in the middle of rotation (I'm the proposer, but needs to complete rotation) +// CompleteRotationIfNeeded checks if the sequencer is in the middle of rotation (I'm the proposer, but needs to complete rotation) // returns the next sequencer address and a flag if the sequencer is the old proposer and the hub waits for the last batch -func (m *Manager) MissingLastBatch() (string, bool, error) { +func (m *Manager) CompleteRotationIfNeeded(ctx context.Context) error { localProposerKey, _ := m.LocalKey.GetPublic().Raw() next, err := m.SLClient.CheckRotationInProgress() if err != nil { - return "", false, err + return err } if next == nil { - return "", false, nil + return nil + } + // rotation in not completed yet! 

From b5b2e6cd3bf7ee77ea201588b851bc83d11157ad Mon Sep 17 00:00:00 2001
From: Michael Tsitrin <michael@dymension.xyz>
Date: Mon, 21 Oct 2024 15:28:25 +0300
Subject: [PATCH 10/13] cleanup the handler for missing last batch

---
 block/manager.go    |  8 +-------
 block/sequencers.go | 22 ++++++++++++----------
 2 files changed, 13 insertions(+), 17 deletions(-)

diff --git a/block/manager.go b/block/manager.go
index 252a28315..1f0c4d029 100644
--- a/block/manager.go
+++ b/block/manager.go
@@ -288,16 +288,10 @@ func (m *Manager) Start(ctx context.Context) error {
 		}
 
 		// check if sequencer in the middle of rotation
-		nextSeqAddr, missing, err := m.MissingLastBatch()
+		err := m.CompleteRotationIfNeeded(ctx)
 		if err != nil {
 			return fmt.Errorf("checking if missing last batch: %w", err)
 		}
-		// if sequencer is in the middle of rotation, complete rotation instead of running the main loop
-		if missing {
-			m.handleRotationReq(ctx, nextSeqAddr)
-			m.isProposer = false
-			m.logger.Info("Sequencer is no longer the proposer")
-		}
 	}
 
 	/* -------------------------------------------------------------------------- */
diff --git a/block/sequencers.go b/block/sequencers.go
index 9b4a9fa61..aa6448fe8 100644
--- a/block/sequencers.go
+++ b/block/sequencers.go
@@ -71,23 +71,25 @@ func (m *Manager) IsProposer() bool {
 	return bytes.Equal(l2Proposer, localProposerKey) || bytes.Equal(expectedHubProposer, localProposerKey)
 }
 
-// FIXME: rename
-// MissingLastBatch checks if the sequencer is in the middle of rotation (I'm the proposer, but needs to complete rotation)
+// CompleteRotationIfNeeded checks if the sequencer is in the middle of rotation (I'm the proposer, but need to complete rotation)
 // returns the next sequencer address and a flag if the sequencer is the old proposer and the hub waits for the last batch
-func (m *Manager) MissingLastBatch() (string, bool, error) {
+func (m *Manager) CompleteRotationIfNeeded(ctx context.Context) error {
 	localProposerKey, _ := m.LocalKey.GetPublic().Raw()
 	next, err := m.SLClient.CheckRotationInProgress()
 	if err != nil {
-		return "", false, err
+		return err
 	}
 	if next == nil {
-		return "", false, nil
+		return nil
+	}
+	// rotation is not completed yet! check if we're the old proposer and need to complete rotation
+	if bytes.Equal(m.SLClient.GetProposer().PubKey().Bytes(), localProposerKey) {
+		// complete rotation before running the main loops
+		m.handleRotationReq(ctx, next.SettlementAddress)
+		m.isProposer = false
+		m.logger.Info("Sequencer is no longer the proposer")
 	}
-	// rotation in progress,
-	// check if we're the old proposer and needs to complete rotation
-	curr := m.SLClient.GetProposer()
-	isProposer := bytes.Equal(curr.PubKey().Bytes(), localProposerKey)
-	return next.SettlementAddress, isProposer, nil
+	return nil
 }
 
 // handleRotationReq completes the rotation flow once a signal is received from the SL
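For orientation, a reduced sketch of what the startup rotation check now amounts to, with the SL lookups replaced by plain parameters (all names below are illustrative, not the dymint API):

    // completeRotationIfNeeded sketches the startup check: when the hub reports
    // a rotation in progress and this node is still the registered proposer, it
    // hands over (produces and submits the last batch for the next proposer) and
    // drops the proposer role before the main loops start.
    func completeRotationIfNeeded(rotationInProgress bool, nextProposer string, isProposer bool, handover func(next string)) bool {
    	if !rotationInProgress {
    		return isProposer // nothing to complete
    	}
    	if isProposer {
    		handover(nextProposer) // handleRotationReq in the real code
    		return false           // this node is no longer the proposer
    	}
    	return isProposer
    }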

From ba4ff2a254d88c3814918e3fb03ec70ff18d3f93 Mon Sep 17 00:00:00 2001
From: Michael Tsitrin <michael@dymension.xyz>
Date: Mon, 21 Oct 2024 17:31:37 +0300
Subject: [PATCH 11/13] minor rename

---
 block/block.go | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/block/block.go b/block/block.go
index e8db54903..eaeac33ce 100644
--- a/block/block.go
+++ b/block/block.go
@@ -86,8 +86,9 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
 		m.Executor.UpdateStateAfterCommit(m.State, responses, appHash, block.Header.Height)
 	}
 
-	// check if the proposer needs to be changed
-	switchRole, err := m.Executor.UpdateProposerFromBlock(m.State, block)
+	// update proposer from block header if needed
+	// if a new proposer is set, we become the proposer
+	isNewProposer, err := m.Executor.UpdateProposerFromBlock(m.State, block)
 	if err != nil {
 		return fmt.Errorf("update proposer from block: %w", err)
 	}
@@ -115,12 +116,11 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
 	// signal the role switch, in case where this node is the new proposer
 	// the other direction is handled elsewhere
-	if switchRole && !m.isProposer && block.Header.Height == m.TargetHeight.Load() {
+	if isNewProposer && block.Header.Height == m.TargetHeight.Load() {
 		m.roleSwitchC <- true
 		m.logger.Info("Node changing to proposer role")
 	}
 
-	// FIXME: isn't this supposed to be checked before committing the state?
 	// validate whether configuration params and rollapp consensus params keep in line, after rollapp params are updated from the responses received in the block execution
 	err = m.ValidateConfigWithRollappParams()
 	if err != nil {
 		return

From 09ea8de8c8c7126810ac98ba1bfe6d69e09f9a36 Mon Sep 17 00:00:00 2001
From: Michael Tsitrin <michael@dymension.xyz>
Date: Mon, 21 Oct 2024 18:46:56 +0300
Subject: [PATCH 12/13] logging

---
 block/manager.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/block/manager.go b/block/manager.go
index 1f0c4d029..8d9fccebc 100644
--- a/block/manager.go
+++ b/block/manager.go
@@ -221,6 +221,7 @@ func (m *Manager) RunLoops(ctx context.Context) {
 				m.logger.Error("Role switch signal received, but already in the same role", "proposer", proposer)
 				continue
 			}
+			m.logger.Info("Role switch signal received", "from", m.isProposer, "to", proposer)
 			m.isProposer = proposer
 
 			// shutdown all active loops and run loops again with new role
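Taken together, these two patches define the role-switch hand-off: applyBlock signals on a channel once a block makes this node the proposer, and the RunLoops handler logs and applies the switch only when the role actually changes. A hedged sketch of both sides, with illustrative helper names:

    // signalRoleSwitch mirrors the applyBlock condition: announce the switch only
    // when the block names this node as the new proposer and the node has caught
    // up to the target height.
    func signalRoleSwitch(isNewProposer bool, blockHeight, targetHeight uint64, roleSwitchC chan<- bool) {
    	if isNewProposer && blockHeight == targetHeight {
    		roleSwitchC <- true
    	}
    }

    // applyRoleSwitch mirrors the RunLoops handler: ignore no-op signals,
    // otherwise log the transition and adopt the new role.
    func applyRoleSwitch(current *bool, proposer bool, logInfo func(msg string, kv ...any)) {
    	if proposer == *current {
    		logInfo("Role switch signal received, but already in the same role", "proposer", proposer)
    		return
    	}
    	logInfo("Role switch signal received", "from", *current, "to", proposer)
    	*current = proposer
    }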

From 07bec4ee85b5eb6c190986c613defaf34e8b3754 Mon Sep 17 00:00:00 2001
From: Michael Tsitrin <michael@dymension.xyz>
Date: Mon, 21 Oct 2024 21:04:39 +0300
Subject: [PATCH 13/13] fixed submit height issue

---
 block/initchain.go    |  4 ++++
 block/manager.go      | 28 ++++++++++++++++++----------
 block/manager_test.go | 37 ++++++-------------------------------
 3 files changed, 28 insertions(+), 41 deletions(-)

diff --git a/block/initchain.go b/block/initchain.go
index 224ae8cf1..83113cd7d 100644
--- a/block/initchain.go
+++ b/block/initchain.go
@@ -27,5 +27,9 @@ func (m *Manager) RunInitChain(ctx context.Context) error {
 	if _, err := m.Store.SaveState(m.State, nil); err != nil {
 		return err
 	}
+
+	targetHeight := uint64(m.Genesis.InitialHeight - 1)
+	m.UpdateLastSubmittedHeight(targetHeight)
+	m.UpdateTargetHeight(targetHeight)
 	return nil
 }
diff --git a/block/manager.go b/block/manager.go
index 8d9fccebc..9bef65171 100644
--- a/block/manager.go
+++ b/block/manager.go
@@ -7,7 +7,6 @@ import (
 	"sync"
 	"sync/atomic"
 
-	"github.com/dymensionxyz/gerr-cosmos/gerrc"
 	"golang.org/x/sync/errgroup"
 
 	"github.com/dymensionxyz/dymint/da/registry"
@@ -16,6 +15,7 @@ import (
 	uerrors "github.com/dymensionxyz/dymint/utils/errors"
 	uevent "github.com/dymensionxyz/dymint/utils/event"
 	"github.com/dymensionxyz/dymint/version"
+	"github.com/dymensionxyz/gerr-cosmos/gerrc"
 
 	"github.com/libp2p/go-libp2p/core/crypto"
 	tmcrypto "github.com/tendermint/tendermint/crypto"
@@ -235,6 +235,9 @@ func (m *Manager) RunLoops(ctx context.Context) {
 func (m *Manager) runLoopsWithCancelFunc(ctx context.Context) context.CancelFunc {
 	loopCtx, cancel := context.WithCancel(ctx)
 	if m.isProposer {
+		// we processed the last block, so it will be the committed height (submitted by the previous proposer)
+		m.UpdateLastSubmittedHeight(m.State.Height())
+
 		go m.runProducerLoops(loopCtx)
 	} else {
 		m.runNonProducerLoops(loopCtx)
@@ -266,7 +269,7 @@ func (m *Manager) Start(ctx context.Context) error {
 	}
 
 	targetHeight := m.TargetHeight.Load()
-	m.logger.Info("starting block manager", "proposer", m.isProposer, "targetHeight", targetHeight)
+	m.logger.Info("starting block manager", "proposer", m.isProposer, "targetHeight", targetHeight, "height", m.State.Height())
 	/* -------------------------------------------------------------------------- */
 	/*                                 sync section                                */
 	/* -------------------------------------------------------------------------- */
@@ -330,21 +333,26 @@ func (m *Manager) syncMetadataFromSettlement() error {
 		return fmt.Errorf("update bonded sequencer set: %w", err)
 	}
 
-	targetHeight := uint64(m.Genesis.InitialHeight - 1)
+	err = m.syncLastCommittedHeight()
+	if err != nil {
+		return fmt.Errorf("sync last committed height: %w", err)
+	}
+	return nil
+}
+
+func (m *Manager) syncLastCommittedHeight() error {
 	res, err := m.SLClient.GetLatestBatch()
+	// TODO: separate between fresh rollapp and non-registered rollapp
+	// The SL hasn't got any batches for this chain yet.
 	if errors.Is(err, gerrc.ErrNotFound) {
-		// The SL hasn't got any batches for this chain yet.
 		m.logger.Info("No batches for chain found in SL.")
+		return nil
 	} else if err != nil {
-		// TODO: separate between fresh rollapp and non-registered rollapp
 		return err
-	} else {
-		targetHeight = res.EndHeight
 	}
 
-	m.UpdateLastSubmittedHeight(targetHeight)
-	m.UpdateTargetHeight(targetHeight)
+	m.UpdateLastSubmittedHeight(res.EndHeight)
+	m.UpdateTargetHeight(res.EndHeight)
 	return nil
 }
diff --git a/block/manager_test.go b/block/manager_test.go
index 4e14c2633..81febab24 100644
--- a/block/manager_test.go
+++ b/block/manager_test.go
@@ -543,37 +543,6 @@ func TestDAFetch(t *testing.T) {
 	}
 }
 
-func TestManager_updateLastSubmittedHeight(t *testing.T) {
-	tests := []struct {
-		name                   string
-		LastSubmittedHeight    uint64
-		h                      uint64
-		expLastSubmittedHeight uint64
-	}{
-		{
-			name:                   "no update last submitted height",
-			LastSubmittedHeight:    100,
-			h:                      99,
-			expLastSubmittedHeight: 100,
-		}, {
-			name:                   "update last submitted height",
-			LastSubmittedHeight:    100,
-			h:                      101,
-			expLastSubmittedHeight: 101,
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			m := &block.Manager{
-				LastSubmittedHeight: atomic.Uint64{},
-			}
-			m.LastSubmittedHeight.Store(tt.LastSubmittedHeight)
-			m.UpdateLastSubmittedHeight(tt.h)
-			assert.Equal(t, tt.expLastSubmittedHeight, m.LastSubmittedHeight.Load())
-		})
-	}
-}
-
 func TestManager_updateTargetHeight(t *testing.T) {
 	tests := []struct {
 		name            string
@@ -592,6 +561,12 @@ func TestManager_updateTargetHeight(t *testing.T) {
 			h:               101,
 			expTargetHeight: 101,
 		},
+		{
+			name:            "same height",
+			TargetHeight:    100,
+			h:               100,
+			expTargetHeight: 100,
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {