diff --git a/block/block.go b/block/block.go
index ecd544146..eaeac33ce 100644
--- a/block/block.go
+++ b/block/block.go
@@ -72,8 +72,8 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
 	// Prune old heights, if requested by ABCI app.
 	// retainHeight is determined by currentHeight - min-retain-blocks (app.toml config).
 	// Unless max_age_num_blocks in consensus params is higher than min-retain-block, then max_age_num_blocks will be used instead of min-retain-blocks.
-	if 0 < retainHeight {
+	// TODO: can be called at intervals rather than on every block (https://github.com/dymensionxyz/dymint/issues/334)
 	select {
 	case m.pruningC <- retainHeight:
 	default:
@@ -86,8 +86,12 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
 		m.Executor.UpdateStateAfterCommit(m.State, responses, appHash, block.Header.Height)
 	}
 
-	// check if the proposer needs to be changed
-	switchRole := m.Executor.UpdateProposerFromBlock(m.State, block)
+	// update the proposer from the block header if needed;
+	// isNewProposer is true when this node is the new proposer
+	isNewProposer, err := m.Executor.UpdateProposerFromBlock(m.State, block)
+	if err != nil {
+		return fmt.Errorf("update proposer from block: %w", err)
+	}
 
 	// save sequencers to store to be queried over RPC
 	batch := m.Store.NewBatch()
@@ -110,10 +114,11 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
 
 	m.blockCache.Delete(block.Header.Height)
 
-	if switchRole {
-		// TODO: graceful role change (https://github.com/dymensionxyz/dymint/issues/1008)
+	// signal the role switch in the case where this node is the new proposer;
+	// the other direction (proposer stepping down) is handled elsewhere
+	if isNewProposer && block.Header.Height == m.TargetHeight.Load() {
+		m.roleSwitchC <- true
 		m.logger.Info("Node changing to proposer role")
-		panic("sequencer is no longer the proposer")
 	}
 
 	// validate whether configuration params and rollapp consensus params keep in line, after rollapp params are updated from the responses received in the block execution
diff --git a/block/initchain.go b/block/initchain.go
index db65485df..44fa930e5 100644
--- a/block/initchain.go
+++ b/block/initchain.go
@@ -24,5 +24,9 @@ func (m *Manager) RunInitChain(ctx context.Context) error {
 	if _, err := m.Store.SaveState(m.State, nil); err != nil {
 		return err
 	}
+
+	targetHeight := uint64(m.Genesis.InitialHeight - 1)
+	m.UpdateLastSubmittedHeight(targetHeight)
+	m.UpdateTargetHeight(targetHeight)
 	return nil
 }
diff --git a/block/manager.go b/block/manager.go
index 89ee16df0..9bef65171 100644
--- a/block/manager.go
+++ b/block/manager.go
@@ -7,7 +7,6 @@ import (
 	"sync"
 	"sync/atomic"
 
-	"github.com/dymensionxyz/gerr-cosmos/gerrc"
 	"golang.org/x/sync/errgroup"
 
 	"github.com/dymensionxyz/dymint/da/registry"
@@ -16,6 +15,7 @@ import (
 	uerrors "github.com/dymensionxyz/dymint/utils/errors"
 	uevent "github.com/dymensionxyz/dymint/utils/event"
 	"github.com/dymensionxyz/dymint/version"
+	"github.com/dymensionxyz/gerr-cosmos/gerrc"
 	"github.com/libp2p/go-libp2p/core/crypto"
 	tmcrypto "github.com/tendermint/tendermint/crypto"
 
@@ -54,6 +54,8 @@ type Manager struct {
 	DAClient da.DataAvailabilityLayerClient
 	SLClient settlement.ClientI
 
+	isProposer  bool      // is the local node the current proposer
+	roleSwitchC chan bool // channel to receive role-switch signals
 
 	/* Submission */
@@ -133,7 +135,8 @@ func NewManager(
 		blockCache: &Cache{
 			cache: make(map[uint64]types.CachedBlock),
 		},
-		pruningC: make(chan int64, 10), // use of buffered channel to avoid blocking applyBlock thread. In case channel is full, pruning will be skipped, but the retain height can be pruned in the next iteration.
+		pruningC:    make(chan int64, 10), // buffered channel to avoid blocking the applyBlock thread. If the channel is full, pruning is skipped, but the retain height can be pruned on the next iteration.
+		roleSwitchC: make(chan bool, 1),   // channel used to signal role switches
 	}
 
 	err = m.LoadStateOnInit(store, genesis, logger)
@@ -155,6 +158,93 @@ func NewManager(
 	return m, nil
 }
 
+// runNonProducerLoops runs the loops needed by every node except the proposer:
+// syncing from the DA and SL layers, and listening for new blocks over P2P.
+func (m *Manager) runNonProducerLoops(ctx context.Context) {
+	// P2P Sync. Subscribe to P2P received blocks events
+	go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewGossipedBlock, m.onReceivedBlock, m.logger)
+	go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockSyncBlocksLoop", p2p.EventQueryNewBlockSyncBlock, m.onReceivedBlock, m.logger)
+	// SL Sync. Subscribe to SL state update events
+	go uevent.MustSubscribe(ctx, m.Pubsub, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted, m.onNewStateUpdate, m.logger)
+}
+
+func (m *Manager) runProducerLoops(ctx context.Context) {
+	eg, ctx := errgroup.WithContext(ctx)
+
+	// populate the bytes produced channel
+	bytesProducedC := make(chan int)
+	uerrors.ErrGroupGoLog(eg, m.logger, func() error {
+		return m.SubmitLoop(ctx, bytesProducedC)
+	})
+	uerrors.ErrGroupGoLog(eg, m.logger, func() error {
+		bytesProducedC <- m.GetUnsubmittedBytes() // load unsubmitted bytes from previous run
+		return m.ProduceBlockLoop(ctx, bytesProducedC)
+	})
+
+	// channel to signal that sequencer rotation has started
+	rotateSequencerC := make(chan string, 1)
+	uerrors.ErrGroupGoLog(eg, m.logger, func() error {
+		return m.MonitorSequencerRotation(ctx, rotateSequencerC)
+	})
+
+	_ = eg.Wait()
+	// Check if exited due to sequencer rotation signal
+	select {
+	case nextSeqAddr := <-rotateSequencerC:
+		m.handleRotationReq(ctx, nextSeqAddr)
+		m.roleSwitchC <- false
+	default:
+		m.logger.Info("producer err group finished.")
+	}
+}
+
+func (m *Manager) RunLoops(ctx context.Context) {
+	/* --------------------------------- common --------------------------------- */
+	// listen to new bonded sequencer events, to add them to the sequencer set
+	go uevent.MustSubscribe(ctx, m.Pubsub, "newBondedSequencer", settlement.EventQueryNewBondedSequencer, m.UpdateSequencerSet, m.logger)
+	// run pruning loop
+	go m.PruningLoop(ctx)
+
+	// run loops initially (producer or non-producer)
+	cancel := m.runLoopsWithCancelFunc(ctx)
+
+	// listen to the role-switch trigger
+	go func() {
+		for {
+			select {
+			// ctx cancelled, shutdown
+			case <-ctx.Done():
+				cancel()
+				return
+			case proposer := <-m.roleSwitchC:
+				if proposer == m.isProposer {
+					m.logger.Error("Role switch signal received, but already in the same role", "proposer", proposer)
+					continue
+				}
+				m.logger.Info("Role switch signal received", "from", m.isProposer, "to", proposer)
+				m.isProposer = proposer
+
+				// shut down all active loops and run them again with the new role
+				cancel()
+				cancel = m.runLoopsWithCancelFunc(ctx)
+			}
+		}
+	}()
+}
+
+func (m *Manager) runLoopsWithCancelFunc(ctx context.Context) context.CancelFunc {
+	loopCtx, cancel := context.WithCancel(ctx)
+	if m.isProposer {
+		// we already processed the last committed block (submitted by the previous proposer),
+		// so our current height is the last submitted height
+		m.UpdateLastSubmittedHeight(m.State.Height())
+
+		go m.runProducerLoops(loopCtx)
+	} else {
+		m.runNonProducerLoops(loopCtx)
+	}
+	return cancel
+}
+
 // Start starts the block manager.
 func (m *Manager) Start(ctx context.Context) error {
 	// Check if InitChain flow is needed
@@ -173,83 +263,45 @@ func (m *Manager) Start(ctx context.Context) error {
 		return err
 	}
 
-	isProposer := m.IsProposer()
-	m.logger.Info("starting block manager", "proposer", isProposer)
-
-	eg, ctx := errgroup.WithContext(ctx)
-	uerrors.ErrGroupGoLog(eg, m.logger, func() error {
-		return m.PruningLoop(ctx)
-	})
-
-	// listen to new bonded sequencers events to add them in the sequencer set
-	go uevent.MustSubscribe(ctx, m.Pubsub, "newBondedSequencer", settlement.EventQueryNewBondedSequencer, m.UpdateSequencerSet, m.logger)
-
-	/* ----------------------------- full node mode ----------------------------- */
-	if !isProposer {
+	err = m.syncMetadataFromSettlement()
+	if err != nil {
+		return fmt.Errorf("sync block manager from settlement: %w", err)
+	}
+	targetHeight := m.TargetHeight.Load()
+
+	m.logger.Info("starting block manager", "proposer", m.isProposer, "targetHeight", targetHeight, "height", m.State.Height())
+	/* -------------------------------------------------------------------------- */
+	/*                                 sync section                                */
+	/* -------------------------------------------------------------------------- */
+	if !m.isProposer {
+		/* ----------------------------- full node mode ----------------------------- */
 		// Full-nodes can sync from DA but it is not necessary to wait for it, since it can sync from P2P as well in parallel.
 		go func() {
-			err := m.syncFromSettlement()
+			err := m.syncToTargetHeight(targetHeight)
 			if err != nil {
-				m.logger.Error("sync block manager from settlement", "err", err)
+				m.logger.Error("sync to target height", "error", err)
 			}
-			// DA Sync. Subscribe to SL next batch events
-			go uevent.MustSubscribe(ctx, m.Pubsub, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted, m.onNewStateUpdate, m.logger)
 		}()
+	} else {
+		/* ----------------------------- sequencer mode ----------------------------- */
+		// Sequencer must wait till DA is synced to start submitting blobs
+		<-m.DAClient.Synced()
+		err = m.syncToTargetHeight(targetHeight)
+		if err != nil {
+			return fmt.Errorf("sync to target height: %w", err)
+		}
 
-		// P2P Sync. Subscribe to P2P received blocks events
-		go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewGossipedBlock, m.onReceivedBlock, m.logger)
-		go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockSyncBlocksLoop", p2p.EventQueryNewBlockSyncBlock, m.onReceivedBlock, m.logger)
-		return nil
-	}
-
-	/* ----------------------------- sequencer mode ----------------------------- */
-	// Subscribe to batch events, to update last submitted height in case batch confirmation was lost. This could happen if the sequencer crash/restarted just after submitting a batch to the settlement and by the time we query the last batch, this batch wasn't accepted yet.
-	go uevent.MustSubscribe(ctx, m.Pubsub, "updateSubmittedHeightLoop", settlement.EventQueryNewSettlementBatchAccepted, m.UpdateLastSubmittedHeight, m.logger)
-
-	// Sequencer must wait till DA is synced to start submitting blobs
-	<-m.DAClient.Synced()
-	err = m.syncFromSettlement()
-	if err != nil {
-		return fmt.Errorf("sync block manager from settlement: %w", err)
-	}
-	// check if sequencer in the middle of rotation
-	nextSeqAddr, missing, err := m.MissingLastBatch()
-	if err != nil {
-		return fmt.Errorf("checking if missing last batch: %w", err)
-	}
-	// if sequencer is in the middle of rotation, complete rotation instead of running the main loop
-	if missing {
-		m.handleRotationReq(ctx, nextSeqAddr)
-		return nil
+		// check if the sequencer is in the middle of a rotation
+		err := m.CompleteRotationIfNeeded(ctx)
+		if err != nil {
+			return fmt.Errorf("complete rotation: %w", err)
+		}
 	}
 
-	// populate the bytes produced channel
-	bytesProducedC := make(chan int)
-
-	// channel to signal sequencer rotation started
-	rotateSequencerC := make(chan string, 1)
-
-	uerrors.ErrGroupGoLog(eg, m.logger, func() error {
-		return m.SubmitLoop(ctx, bytesProducedC)
-	})
-	uerrors.ErrGroupGoLog(eg, m.logger, func() error {
-		bytesProducedC <- m.GetUnsubmittedBytes() // load unsubmitted bytes from previous run
-		return m.ProduceBlockLoop(ctx, bytesProducedC)
-	})
-	uerrors.ErrGroupGoLog(eg, m.logger, func() error {
-		return m.MonitorSequencerRotation(ctx, rotateSequencerC)
-	})
-
-	go func() {
-		_ = eg.Wait()
-		// Check if exited due to sequencer rotation signal
-		select {
-		case nextSeqAddr := <-rotateSequencerC:
-			m.handleRotationReq(ctx, nextSeqAddr)
-		default:
-			m.logger.Info("Block manager err group finished.")
-		}
-	}()
+	/* -------------------------------------------------------------------------- */
+	/*                                loops section                                */
+	/* -------------------------------------------------------------------------- */
+	m.RunLoops(ctx)
 	return nil
 }
 
@@ -272,33 +324,35 @@ func (m *Manager) NextHeightToSubmit() uint64 {
 	return m.LastSubmittedHeight.Load() + 1
 }
 
-// syncFromSettlement enforces the node to be synced on initial run from SL and DA.
-func (m *Manager) syncFromSettlement() error {
+// syncMetadataFromSettlement gets the latest height and sequencer set from the settlement layer.
+func (m *Manager) syncMetadataFromSettlement() error {
+	m.isProposer = m.IsProposer()
+
 	err := m.UpdateSequencerSetFromSL()
 	if err != nil {
 		return fmt.Errorf("update bonded sequencer set: %w", err)
 	}
 
+	err = m.syncLastCommittedHeight()
+	if err != nil {
+		return fmt.Errorf("sync last committed height: %w", err)
+	}
+
+	return nil
+}
+
+func (m *Manager) syncLastCommittedHeight() error {
 	res, err := m.SLClient.GetLatestBatch()
+	// TODO: separate between fresh rollapp and non-registered rollapp
+	// The SL hasn't got any batches for this chain yet.
 	if errors.Is(err, gerrc.ErrNotFound) {
-		// The SL hasn't got any batches for this chain yet.
- m.logger.Info("No batches for chain found in SL.") - m.LastSubmittedHeight.Store(uint64(m.Genesis.InitialHeight - 1)) return nil - } - - if err != nil { - // TODO: separate between fresh rollapp and non-registered rollapp - return err - } - m.LastSubmittedHeight.Store(res.EndHeight) - err = m.syncToTargetHeight(res.EndHeight) - m.UpdateTargetHeight(res.EndHeight) - if err != nil { + } else if err != nil { return err } - m.logger.Info("Synced.", "current height", m.State.Height(), "last submitted height", m.LastSubmittedHeight.Load()) + m.UpdateLastSubmittedHeight(res.EndHeight) + m.UpdateTargetHeight(res.EndHeight) return nil } @@ -306,6 +360,7 @@ func (m *Manager) GetProposerPubKey() tmcrypto.PubKey { return m.State.Sequencers.GetProposerPubKey() } +// UpdateTargetHeight will update the highest height seen from either P2P or DA. func (m *Manager) UpdateTargetHeight(h uint64) { for { currentHeight := m.TargetHeight.Load() @@ -315,6 +370,16 @@ func (m *Manager) UpdateTargetHeight(h uint64) { } } +// UpdateLastSubmittedHeight will update last height seen on the settlement layer. +func (m *Manager) UpdateLastSubmittedHeight(h uint64) { + for { + curr := m.LastSubmittedHeight.Load() + if m.LastSubmittedHeight.CompareAndSwap(curr, max(curr, h)) { + break + } + } +} + // ValidateConfigWithRollappParams checks the configuration params are consistent with the params in the dymint state (e.g. DA and version) func (m *Manager) ValidateConfigWithRollappParams() error { if version.Commit != m.State.RollappParams.Version { diff --git a/block/manager_test.go b/block/manager_test.go index 287757189..81febab24 100644 --- a/block/manager_test.go +++ b/block/manager_test.go @@ -561,6 +561,12 @@ func TestManager_updateTargetHeight(t *testing.T) { h: 101, expTargetHeight: 101, }, + { + name: "same height", + TargetHeight: 100, + h: 100, + expTargetHeight: 100, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/block/pruning.go b/block/pruning.go index d4ab18cc8..5f3b25d2c 100644 --- a/block/pruning.go +++ b/block/pruning.go @@ -42,17 +42,16 @@ func (m *Manager) PruneBlocks(retainHeight uint64) (uint64, error) { return pruned, nil } -func (m *Manager) PruningLoop(ctx context.Context) error { +func (m *Manager) PruningLoop(ctx context.Context) { for { select { case <-ctx.Done(): - return ctx.Err() + return case retainHeight := <-m.pruningC: _, err := m.PruneBlocks(uint64(retainHeight)) if err != nil { m.logger.Error("pruning blocks", "retainHeight", retainHeight, "err", err) } - } } } diff --git a/block/pruning_test.go b/block/pruning_test.go index e008634a0..9590be35b 100644 --- a/block/pruning_test.go +++ b/block/pruning_test.go @@ -76,5 +76,4 @@ func TestPruningRetainHeight(t *testing.T) { require.Error(gerrc.ErrInvalidArgument) } } - } diff --git a/block/retriever.go b/block/retriever.go index 96c288c2a..4c6a4bc16 100644 --- a/block/retriever.go +++ b/block/retriever.go @@ -22,8 +22,11 @@ func (m *Manager) onNewStateUpdate(event pubsub.Message) { m.logger.Error("onReceivedBatch", "err", "wrong event data received") return } + h := eventData.EndHeight m.UpdateTargetHeight(h) + m.UpdateLastSubmittedHeight(h) + err := m.syncToTargetHeight(h) if err != nil { m.logger.Error("sync until target", "err", err) diff --git a/block/sequencers.go b/block/sequencers.go index b6df0384f..14388cc19 100644 --- a/block/sequencers.go +++ b/block/sequencers.go @@ -45,9 +45,7 @@ func (m *Manager) MonitorSequencerRotation(ctx context.Context, rotateC chan str } // we get here once a sequencer 
rotation signal is received m.logger.Info("Sequencer rotation started.", "next_seq", nextSeqAddr) - go func() { - rotateC <- nextSeqAddr - }() + rotateC <- nextSeqAddr return fmt.Errorf("sequencer rotation started. signal to stop production") } @@ -73,22 +71,25 @@ func (m *Manager) IsProposer() bool { return bytes.Equal(l2Proposer, localProposerKey) || bytes.Equal(expectedHubProposer, localProposerKey) } -// MissingLastBatch checks if the sequencer is in the middle of rotation (I'm the proposer, but needs to complete rotation) +// CompleteRotationIfNeeded checks if the sequencer is in the middle of rotation (I'm the proposer, but needs to complete rotation) // returns the next sequencer address and a flag if the sequencer is the old proposer and the hub waits for the last batch -func (m *Manager) MissingLastBatch() (string, bool, error) { +func (m *Manager) CompleteRotationIfNeeded(ctx context.Context) error { localProposerKey, _ := m.LocalKey.GetPublic().Raw() next, err := m.SLClient.CheckRotationInProgress() if err != nil { - return "", false, err + return err } if next == nil { - return "", false, nil + return nil + } + // rotation in not completed yet! check if we're the old proposer and needs to complete rotation + if bytes.Equal(m.SLClient.GetProposer().PubKey().Bytes(), localProposerKey) { + // complete rotation before running the main loops + m.handleRotationReq(ctx, next.SettlementAddress) + m.isProposer = false + m.logger.Info("Sequencer is no longer the proposer") } - // rotation in progress, - // check if we're the old proposer and needs to complete rotation - curr := m.SLClient.GetProposer() - isProposer := bytes.Equal(curr.PubKey().Bytes(), localProposerKey) - return next.SettlementAddress, isProposer, nil + return nil } // handleRotationReq completes the rotation flow once a signal is received from the SL @@ -97,12 +98,9 @@ func (m *Manager) handleRotationReq(ctx context.Context, nextSeqAddr string) { m.logger.Info("Sequencer rotation started. 
Production stopped on this sequencer", "nextSeqAddr", nextSeqAddr) err := m.CompleteRotation(ctx, nextSeqAddr) if err != nil { + m.logger.Error("Complete rotation", "err", err) panic(err) } - - // TODO: graceful fallback to full node (https://github.com/dymensionxyz/dymint/issues/1008) - m.logger.Info("Sequencer is no longer the proposer") - panic("sequencer is no longer the proposer") } // CompleteRotation completes the sequencer rotation flow diff --git a/block/state.go b/block/state.go index e13d8c3bc..39d3ed31a 100644 --- a/block/state.go +++ b/block/state.go @@ -137,12 +137,10 @@ func (e *Executor) UpdateStateAfterCommit(s *types.State, resp *tmstate.ABCIResp // UpdateProposerFromBlock updates the proposer from the block // The next proposer is defined in the block header (NextSequencersHash) // In case of a node that a becomes the proposer, we return true to mark the role change -// currently the node will rebooted to apply the new role -// TODO: (https://github.com/dymensionxyz/dymint/issues/1008) -func (e *Executor) UpdateProposerFromBlock(s *types.State, block *types.Block) bool { +func (e *Executor) UpdateProposerFromBlock(s *types.State, block *types.Block) (bool, error) { // no sequencer change if bytes.Equal(block.Header.SequencerHash[:], block.Header.NextSequencersHash[:]) { - return false + return false, nil } if block.Header.NextSequencersHash == [32]byte{} { @@ -150,20 +148,20 @@ func (e *Executor) UpdateProposerFromBlock(s *types.State, block *types.Block) b // TODO: recover from halt (https://github.com/dymensionxyz/dymint/issues/1021) e.logger.Info("rollapp left with no proposer. chain is halted") s.Sequencers.SetProposer(nil) - return false + return false, nil } // if hash changed, update the active sequencer err := s.Sequencers.SetProposerByHash(block.Header.NextSequencersHash[:]) if err != nil { - e.logger.Error("update new proposer", "err", err) - panic(fmt.Sprintf("failed to update new proposer: %v", err)) + return false, err } + // check if this node is the new proposer localSeq := s.Sequencers.GetByConsAddress(e.localAddress) if localSeq != nil && bytes.Equal(localSeq.MustHash(), block.Header.NextSequencersHash[:]) { - return true + return true, nil } - return false + return false, nil } diff --git a/block/submit.go b/block/submit.go index a330b2597..6549eaed4 100644 --- a/block/submit.go +++ b/block/submit.go @@ -8,11 +8,9 @@ import ( "time" "github.com/dymensionxyz/gerr-cosmos/gerrc" - "github.com/tendermint/tendermint/libs/pubsub" "golang.org/x/sync/errgroup" "github.com/dymensionxyz/dymint/da" - "github.com/dymensionxyz/dymint/settlement" "github.com/dymensionxyz/dymint/types" uatomic "github.com/dymensionxyz/dymint/utils/atomic" uchannel "github.com/dymensionxyz/dymint/utils/channel" @@ -272,21 +270,3 @@ func (m *Manager) GetUnsubmittedBytes() int { func (m *Manager) GetUnsubmittedBlocks() uint64 { return m.State.Height() - m.LastSubmittedHeight.Load() } - -// UpdateLastSubmittedHeight will update last height submitted height upon events. -// This may be necessary in case we crashed/restarted before getting response for our submission to the settlement layer. 
-func (m *Manager) UpdateLastSubmittedHeight(event pubsub.Message) {
-	eventData, ok := event.Data().(*settlement.EventDataNewBatchAccepted)
-	if !ok {
-		m.logger.Error("onReceivedBatch", "err", "wrong event data received")
-		return
-	}
-	h := eventData.EndHeight
-
-	for {
-		curr := m.LastSubmittedHeight.Load()
-		if m.LastSubmittedHeight.CompareAndSwap(curr, max(curr, h)) {
-			break
-		}
-	}
-}
diff --git a/indexers/blockindexer/kv/kv_test.go b/indexers/blockindexer/kv/kv_test.go
index b7a8a17f6..b5de64338 100644
--- a/indexers/blockindexer/kv/kv_test.go
+++ b/indexers/blockindexer/kv/kv_test.go
@@ -148,7 +148,6 @@ func TestBlockIndexer(t *testing.T) {
 }
 
 func TestBlockIndexerPruning(t *testing.T) {
-
 	// init the block indexer
 	prefixStore := store.NewPrefixKV(store.NewDefaultInMemoryKVStore(), []byte("block_events"))
 	indexer := blockidxkv.New(prefixStore)
@@ -179,7 +178,6 @@ func TestBlockIndexerPruning(t *testing.T) {
 	results, err = indexer.Search(context.Background(), q)
 	require.NoError(t, err)
 	require.Equal(t, 0, len(results))
-
 }
 
 func getBeginBlock() abci.ResponseBeginBlock {
diff --git a/indexers/txindex/indexer_service_test.go b/indexers/txindex/indexer_service_test.go
index abd281605..79f2ee22a 100644
--- a/indexers/txindex/indexer_service_test.go
+++ b/indexers/txindex/indexer_service_test.go
@@ -88,5 +88,4 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
 	require.NoError(t, err)
 	expectedTxPruned := uint64(2)
 	require.Equal(t, expectedTxPruned, txPruned)
-
 }
diff --git a/indexers/txindex/kv/kv_test.go b/indexers/txindex/kv/kv_test.go
index 60caf7f54..8f807bfd0 100644
--- a/indexers/txindex/kv/kv_test.go
+++ b/indexers/txindex/kv/kv_test.go
@@ -315,7 +315,6 @@ func TestTxSearchMultipleTxs(t *testing.T) {
 }
 
 func TestTxIndexerPruning(t *testing.T) {
-
 	// init the block indexer
 	indexer := NewTxIndex(store.NewDefaultInMemoryKVStore())
 	numBlocks := uint64(100)
@@ -356,7 +355,6 @@ func TestTxIndexerPruning(t *testing.T) {
 		results := indexer.match(context.Background(), c, startKeyForCondition(c, 0), nil, true)
 		require.Equal(t, 0, len(results))
 	}
-
 }
 
 func txResultWithEvents(events []abci.Event) *abci.TxResult {
@@ -373,6 +371,7 @@ func txResultWithEvents(events []abci.Event) *abci.TxResult {
 		},
 	}
 }
+
 func getRandomTxResult(height int64, events []abci.Event) *abci.TxResult {
 	tx := types.Tx(randomTxHash())
 	return &abci.TxResult{
diff --git a/node/node.go b/node/node.go
index 1dea5dcf0..1d2a3f985 100644
--- a/node/node.go
+++ b/node/node.go
@@ -267,6 +267,11 @@ func (n *Node) OnStop() {
 		n.Logger.Error("close store", "error", err)
 	}
 
+	err = n.PubsubServer.Stop()
+	if err != nil {
+		n.Logger.Error("stop pubsub server", "error", err)
+	}
+
 	n.cancel()
 }
diff --git a/types/pb/dymint/state.pb.go b/types/pb/dymint/state.pb.go
index 1cb1dfca9..4b318b849 100644
--- a/types/pb/dymint/state.pb.go
+++ b/types/pb/dymint/state.pb.go
@@ -195,7 +195,7 @@ func (m *State) GetRollappParams() RollappParams {
 	return RollappParams{}
 }
 
-//rollapp params defined in genesis and updated via gov proposal
+// rollapp params defined in genesis and updated via gov proposal
 type RollappParams struct {
 	//data availability type (e.g. celestia) used in the rollapp
 	Da string `protobuf:"bytes,1,opt,name=da,proto3" json:"da,omitempty"`
diff --git a/utils/event/funcs.go b/utils/event/funcs.go
index 8b76b7ce0..57b546f3b 100644
--- a/utils/event/funcs.go
+++ b/utils/event/funcs.go
@@ -33,6 +33,8 @@ func MustSubscribe(
 		return
 	}
 
+	defer pubsubServer.UnsubscribeAll(context.Background(), clientID) //nolint:errcheck
+
 	for {
 		select {
 		case <-ctx.Done():
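
Note: the new UpdateTargetHeight and UpdateLastSubmittedHeight helpers in block/manager.go share one pattern: a CompareAndSwap loop that only ever moves an atomic height forward, so a stale event can never lower it. A minimal, self-contained sketch of that pattern follows; the monotonicHeight type and main function are illustrative, not part of dymint.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// monotonicHeight only ever moves forward, even under concurrent writers.
type monotonicHeight struct {
	v atomic.Uint64
}

// Update raises the height to h if h is higher; lower values are no-ops.
// The CompareAndSwap loop retries when another goroutine wins the race.
func (m *monotonicHeight) Update(h uint64) {
	for {
		curr := m.v.Load()
		if m.v.CompareAndSwap(curr, max(curr, h)) {
			return
		}
	}
}

func main() {
	var height monotonicHeight
	var wg sync.WaitGroup
	for _, h := range []uint64{5, 9, 3, 7} {
		wg.Add(1)
		go func(h uint64) {
			defer wg.Done()
			height.Update(h)
		}(h)
	}
	wg.Wait()
	fmt.Println(height.v.Load()) // always 9, regardless of scheduling
}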
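Note: the pruningC change in applyBlock relies on a buffered channel plus select/default, so block application never blocks on pruning. A standalone sketch of that non-blocking send, with illustrative names and channel size:

package main

import "fmt"

func main() {
	pruningC := make(chan int64, 10)

	// signalPrune mirrors the select in applyBlock: if the channel is full,
	// the signal is dropped, which is safe because a later retain height
	// supersedes any skipped one.
	signalPrune := func(retainHeight int64) {
		select {
		case pruningC <- retainHeight:
		default:
			fmt.Println("pruning channel full, skipping height", retainHeight)
		}
	}

	for h := int64(1); h <= 12; h++ {
		signalPrune(h) // heights 11 and 12 are skipped
	}
	fmt.Println(len(pruningC), "pruning signals queued") // 10
}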
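Note: RunLoops and runLoopsWithCancelFunc implement a cancel-and-restart pattern: each role's loops run under a child context, and a role-switch signal cancels that context and starts the loops for the other role. A reduced sketch of the same idea; runLoop stands in for runProducerLoops / runNonProducerLoops, and all other names and timings are illustrative.

package main

import (
	"context"
	"fmt"
	"time"
)

// runLoop stands in for runProducerLoops / runNonProducerLoops:
// it works until its context is cancelled.
func runLoop(ctx context.Context, proposer bool) {
	<-ctx.Done()
	fmt.Println("loops stopped, proposer =", proposer)
}

func main() {
	ctx, stop := context.WithCancel(context.Background())

	roleSwitchC := make(chan bool, 1)
	isProposer := false

	// start runs the loops for the current role under a child context
	// and returns the cancel func that shuts them down.
	start := func() context.CancelFunc {
		loopCtx, cancel := context.WithCancel(ctx)
		go runLoop(loopCtx, isProposer)
		return cancel
	}
	cancel := start()

	roleSwitchC <- true                                       // simulate becoming the proposer
	go func() { time.Sleep(50 * time.Millisecond); stop() }() // shut everything down shortly after

	for {
		select {
		case <-ctx.Done():
			cancel()
			time.Sleep(10 * time.Millisecond) // let the last loop print before exiting
			return
		case proposer := <-roleSwitchC:
			if proposer == isProposer {
				continue // already in this role; nothing to do
			}
			isProposer = proposer
			cancel()         // stop the loops of the old role
			cancel = start() // start the loops of the new role
		}
	}
}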