From 24382d85ad35d5a2f200434936fd643a909b6eab Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy <65323360+algorandskiy@users.noreply.github.com> Date: Tue, 25 Jun 2024 13:05:27 -0400 Subject: [PATCH] node: close ledger and part keys on node shutdown (#6039) --- data/pools/transactionPool.go | 29 +++++++++++++++++++++++++++++ ledger/notifier.go | 2 ++ node/node.go | 18 ++++++++++++++---- 3 files changed, 45 insertions(+), 4 deletions(-) diff --git a/data/pools/transactionPool.go b/data/pools/transactionPool.go index 687a3db80c..afe12f2363 100644 --- a/data/pools/transactionPool.go +++ b/data/pools/transactionPool.go @@ -95,6 +95,11 @@ type TransactionPool struct { // stateproofOverflowed indicates that a stateproof transaction was allowed to // exceed the txPoolMaxSize. This flag is reset to false OnNewBlock stateproofOverflowed bool + + // shutdown is set to true when the pool is being shut down. It is checked in exported methods + // to prevent pool operations like remember and recomputing the block evaluator + // from using downstream resources like ledger that may be shutting down. + shutdown bool } // BlockEvaluator defines the block evaluator interface exposed by the ledger package. @@ -113,6 +118,8 @@ type VotingAccountSupplier interface { VotingAccountsForRound(basics.Round) []basics.Address } +var errPoolShutdown = errors.New("transaction pool is shutting down") + // MakeTransactionPool makes a transaction pool. func MakeTransactionPool(ledger *ledger.Ledger, cfg config.Local, log logging.Logger, vac VotingAccountSupplier) *TransactionPool { if cfg.TxPoolExponentialIncreaseFactor < 1 { @@ -430,6 +437,10 @@ func (pool *TransactionPool) ingest(txgroup []transactions.SignedTxn, params poo return ErrNoPendingBlockEvaluator } + if pool.shutdown { + return errPoolShutdown + } + if !params.recomputing { // Make sure that the latest block has been processed by OnNewBlock(). 
// If not, we might be in a race, so wait a little bit for OnNewBlock() @@ -441,6 +452,10 @@ func (pool *TransactionPool) ingest(txgroup []transactions.SignedTxn, params poo if pool.pendingBlockEvaluator == nil { return ErrNoPendingBlockEvaluator } + // recheck if the pool is shutting down since TimedWait above releases the lock + if pool.shutdown { + return errPoolShutdown + } } err := pool.checkSufficientFee(txgroup) @@ -529,6 +544,10 @@ func (pool *TransactionPool) OnNewBlock(block bookkeeping.Block, delta ledgercor pool.mu.Lock() defer pool.mu.Unlock() + if pool.shutdown { + return + } + defer pool.cond.Broadcast() if pool.pendingBlockEvaluator == nil || block.Round() >= pool.pendingBlockEvaluator.Round() { // Adjust the pool fee threshold. The rules are: @@ -1010,3 +1029,13 @@ func (pool *TransactionPool) AssembleDevModeBlock() (assembled *ledgercore.Unfin assembled, err = pool.AssembleBlock(pool.pendingBlockEvaluator.Round(), time.Now().Add(pool.proposalAssemblyTime)) return } + +// Shutdown stops the transaction pool from accepting new transactions and blocks. +// It takes the pool.mu lock in order to ensure there are no pending remember or block operations in flight +// and sets the shutdown flag to true. 
+func (pool *TransactionPool) Shutdown() { + pool.mu.Lock() + defer pool.mu.Unlock() + + pool.shutdown = true +} diff --git a/ledger/notifier.go b/ledger/notifier.go index aabf62d080..f97e1c77e6 100644 --- a/ledger/notifier.go +++ b/ledger/notifier.go @@ -74,6 +74,8 @@ func (bn *blockNotifier) worker() { func (bn *blockNotifier) close() { bn.mu.Lock() + bn.pendingBlocks = nil + bn.listeners = nil if bn.running { bn.running = false bn.cond.Broadcast() diff --git a/node/node.go b/node/node.go index d1c6cc4b82..6c77b4fbc0 100644 --- a/node/node.go +++ b/node/node.go @@ -152,6 +152,7 @@ type AlgorandFullNode struct { tracer messagetracer.MessageTracer stateProofWorker *stateproof.Worker + partHandles []db.Accessor } // TxnWithStatus represents information about a single transaction, @@ -418,6 +419,12 @@ func (node *AlgorandFullNode) Stop() { defer func() { node.mu.Unlock() node.waitMonitoringRoutines() + + // oldKeyDeletionThread uses accountManager registry so must be stopped before accountManager is closed + node.accountManager.Registry().Close() + for h := range node.partHandles { + node.partHandles[h].Close() + } }() node.net.ClearHandlers() @@ -430,6 +437,7 @@ func (node *AlgorandFullNode) Stop() { node.stateProofWorker.Stop() node.txHandler.Stop() node.agreementService.Shutdown() + node.agreementService.Accessor.Close() node.catchupService.Stop() node.txPoolSyncerService.Stop() node.blockService.Stop() @@ -441,7 +449,9 @@ func (node *AlgorandFullNode) Stop() { node.lowPriorityCryptoVerificationPool.Shutdown() node.cryptoPool.Shutdown() node.log.Debug("crypto worker pools have stopped") + node.transactionPool.Shutdown() node.cancelCtx() + node.ledger.Close() } // note: unlike the other two functions, this accepts a whole filename @@ -987,12 +997,12 @@ func (node *AlgorandFullNode) loadParticipationKeys() error { // These files are not ephemeral and must be deleted eventually since // this function is called to load files located in the node on startup added := 
node.accountManager.AddParticipation(part, false) - if added { - node.log.Infof("Loaded participation keys from storage: %s %s", part.Address(), info.Name()) - } else { + if !added { part.Close() continue } + node.log.Infof("Loaded participation keys from storage: %s %s", part.Address(), info.Name()) + node.partHandles = append(node.partHandles, handle) err = insertStateProofToRegistry(part, node) if err != nil { return err @@ -1024,7 +1034,7 @@ func (node *AlgorandFullNode) txPoolGaugeThread(done <-chan struct{}) { defer node.monitoringRoutinesWaitGroup.Done() ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() - for true { + for { select { case <-ticker.C: txPoolGauge.Set(uint64(node.transactionPool.PendingCount()))