Skip to content

Commit

Permalink
node: close ledger and part keys on node shutdown (#6039)
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy authored Jun 25, 2024
1 parent 052ceb2 commit 24382d8
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 4 deletions.
29 changes: 29 additions & 0 deletions data/pools/transactionPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ type TransactionPool struct {
// stateproofOverflowed indicates that a stateproof transaction was allowed to
// exceed the txPoolMaxSize. This flag is reset to false OnNewBlock
stateproofOverflowed bool

// shutdown is set to true when the pool is being shut down. It is checked in exported methods
// to prevent pool operations like remember and recomputing the block evaluator
// from using down stream resources like ledger that may be shutting down.
shutdown bool
}

// BlockEvaluator defines the block evaluator interface exposed by the ledger package.
Expand All @@ -113,6 +118,8 @@ type VotingAccountSupplier interface {
VotingAccountsForRound(basics.Round) []basics.Address
}

var errPoolShutdown = errors.New("transaction pool is shutting down")

// MakeTransactionPool makes a transaction pool.
func MakeTransactionPool(ledger *ledger.Ledger, cfg config.Local, log logging.Logger, vac VotingAccountSupplier) *TransactionPool {
if cfg.TxPoolExponentialIncreaseFactor < 1 {
Expand Down Expand Up @@ -430,6 +437,10 @@ func (pool *TransactionPool) ingest(txgroup []transactions.SignedTxn, params poo
return ErrNoPendingBlockEvaluator
}

if pool.shutdown {
return errPoolShutdown
}

if !params.recomputing {
// Make sure that the latest block has been processed by OnNewBlock().
// If not, we might be in a race, so wait a little bit for OnNewBlock()
Expand All @@ -441,6 +452,10 @@ func (pool *TransactionPool) ingest(txgroup []transactions.SignedTxn, params poo
if pool.pendingBlockEvaluator == nil {
return ErrNoPendingBlockEvaluator
}
// recheck if the pool is shutting down since TimedWait above releases the lock
if pool.shutdown {
return errPoolShutdown
}
}

err := pool.checkSufficientFee(txgroup)
Expand Down Expand Up @@ -529,6 +544,10 @@ func (pool *TransactionPool) OnNewBlock(block bookkeeping.Block, delta ledgercor

pool.mu.Lock()
defer pool.mu.Unlock()
if pool.shutdown {
return
}

defer pool.cond.Broadcast()
if pool.pendingBlockEvaluator == nil || block.Round() >= pool.pendingBlockEvaluator.Round() {
// Adjust the pool fee threshold. The rules are:
Expand Down Expand Up @@ -1010,3 +1029,13 @@ func (pool *TransactionPool) AssembleDevModeBlock() (assembled *ledgercore.Unfin
assembled, err = pool.AssembleBlock(pool.pendingBlockEvaluator.Round(), time.Now().Add(pool.proposalAssemblyTime))
return
}

// Shutdown stops the transaction pool from accepting new transactions and blocks.
// It takes the pool.mu lock in order to ensure there is no pending remember or block operations in flight
// and sets the shutdown flag to true.
func (pool *TransactionPool) Shutdown() {
pool.mu.Lock()
defer pool.mu.Unlock()

pool.shutdown = true
}
2 changes: 2 additions & 0 deletions ledger/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ func (bn *blockNotifier) worker() {

func (bn *blockNotifier) close() {
bn.mu.Lock()
bn.pendingBlocks = nil
bn.listeners = nil
if bn.running {
bn.running = false
bn.cond.Broadcast()
Expand Down
18 changes: 14 additions & 4 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ type AlgorandFullNode struct {
tracer messagetracer.MessageTracer

stateProofWorker *stateproof.Worker
partHandles []db.Accessor
}

// TxnWithStatus represents information about a single transaction,
Expand Down Expand Up @@ -418,6 +419,12 @@ func (node *AlgorandFullNode) Stop() {
defer func() {
node.mu.Unlock()
node.waitMonitoringRoutines()

// oldKeyDeletionThread uses accountManager registry so must be stopped before accountManager is closed
node.accountManager.Registry().Close()
for h := range node.partHandles {
node.partHandles[h].Close()
}
}()

node.net.ClearHandlers()
Expand All @@ -430,6 +437,7 @@ func (node *AlgorandFullNode) Stop() {
node.stateProofWorker.Stop()
node.txHandler.Stop()
node.agreementService.Shutdown()
node.agreementService.Accessor.Close()
node.catchupService.Stop()
node.txPoolSyncerService.Stop()
node.blockService.Stop()
Expand All @@ -441,7 +449,9 @@ func (node *AlgorandFullNode) Stop() {
node.lowPriorityCryptoVerificationPool.Shutdown()
node.cryptoPool.Shutdown()
node.log.Debug("crypto worker pools have stopped")
node.transactionPool.Shutdown()
node.cancelCtx()
node.ledger.Close()
}

// note: unlike the other two functions, this accepts a whole filename
Expand Down Expand Up @@ -987,12 +997,12 @@ func (node *AlgorandFullNode) loadParticipationKeys() error {
// These files are not ephemeral and must be deleted eventually since
// this function is called to load files located in the node on startup
added := node.accountManager.AddParticipation(part, false)
if added {
node.log.Infof("Loaded participation keys from storage: %s %s", part.Address(), info.Name())
} else {
if !added {
part.Close()
continue
}
node.log.Infof("Loaded participation keys from storage: %s %s", part.Address(), info.Name())
node.partHandles = append(node.partHandles, handle)
err = insertStateProofToRegistry(part, node)
if err != nil {
return err
Expand Down Expand Up @@ -1024,7 +1034,7 @@ func (node *AlgorandFullNode) txPoolGaugeThread(done <-chan struct{}) {
defer node.monitoringRoutinesWaitGroup.Done()
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for true {
for {
select {
case <-ticker.C:
txPoolGauge.Set(uint64(node.transactionPool.PendingCount()))
Expand Down

0 comments on commit 24382d8

Please sign in to comment.