From 44b1875c686a10e8d2d67fdba6641797ab7c0a84 Mon Sep 17 00:00:00 2001
From: Dave Collins
Date: Sat, 16 Sep 2023 23:57:36 -0500
Subject: [PATCH] pool: Don't give chainstate entire process cancel.

The pool is currently designed to shut down the entire process deep down in the chainstate when some errors it deems to be fatal occur. It accomplishes this by passing the cancel func for the entire process all the way down to the chainstate.

Unfortunately, this is not a good design because goroutines deep in the guts of subsystems should not have the ability to directly pull the rug out from under the entire process without the upper layers having a chance to do anything about it. Moreover, some of those errors could actually be temporary, such as a brief loss of connection to the database or dcrd, yet they still result in the entire process shutting down.

This is a first pass at improving the situation slightly by no longer passing the entire process's cancel func down to the chainstate handler and instead arranging for the chainstate handler to return an error back to the hub's Run method, which itself creates its own child context that is then canceled to cause the hub to shut down.

It also corrects a few other issues along the way, such as:

- some errors did not have the correct parameters
- the notification callback handlers were not respecting the context, which could lead to goroutine leaks
- channels were being closed by their receiver even though only senders should close channels, and only once they are sure there are no more goroutines that can write to the channel

For now, the same process shutdown behavior is maintained by having the main process shut itself down upon observing the hub shutting down, which ensures all other subsystems shut down properly too. In other words, the overall behavior is the same, aside from the aforementioned fixes, but this change makes it feasible to further improve the behavior in the future so the pool won't just die due to what are likely temporary failures.

---
 dcrpool.go              | 19 +++++++--
 pool/chainstate.go      | 85 ++++++++++++++---------------------------
 pool/chainstate_test.go | 10 +++--
 pool/hub.go             | 34 +++++++++++++----
 pool/hub_test.go        |  4 +-
 5 files changed, 79 insertions(+), 73 deletions(-)

diff --git a/dcrpool.go b/dcrpool.go index 64123dd5..1c553353 100644 --- a/dcrpool.go +++ b/dcrpool.go @@ -30,7 +30,7 @@ var signals = []os.Signal{os.Interrupt} // newHub returns a new pool hub configured with the provided details that is // ready to connect to a consensus daemon and wallet in the case of publicly // available pools. -func newHub(cfg *config, db pool.Database, cancel context.CancelFunc) (*pool.Hub, error) { +func newHub(cfg *config, db pool.Database) (*pool.Hub, error) { dcrdRPCCfg := rpcclient.ConnConfig{ Host: cfg.DcrdRPCHost, Endpoint: "ws", @@ -65,7 +65,7 @@ func newHub(cfg *config, db pool.Database, cancel context.CancelFunc) (*pool.Hub ClientTimeout: cfg.clientTimeout, } - return pool.NewHub(cancel, hcfg) + return pool.NewHub(hcfg) } // newGUI returns a new GUI configured with the provided details that is ready @@ -191,7 +191,7 @@ func realMain() error { // Create a hub instance and attempt to perform initial connection and work // acquisition.
- hub, err := newHub(cfg, db, cancel) + hub, err := newHub(cfg, db) if err != nil { mpLog.Errorf("unable to initialize hub: %v", err) return err } @@ -216,7 +216,20 @@ func realMain() error { var wg sync.WaitGroup wg.Add(2) go func() { + // Ensure the overall process context is canceled once Run returns + // since the pool can't operate without it and it's possible the hub + // shut itself down due to an error deep in the chain state. + // + // The entire pool generally shouldn't be shutting itself down + // due to chainstate issues and should instead go into a temporarily + // disabled state where it stops serving miners while it retries with + // increasing backoffs as it attempts to resolve whatever caused it to + // shut down to begin with. + // + // However, since the code is currently structured under the assumption + // the entire process exits, this retains that behavior. hub.Run(ctx) + cancel() wg.Done() }() go func() { diff --git a/pool/chainstate.go b/pool/chainstate.go index 82b4fa05..eaa3417b 100644 --- a/pool/chainstate.go +++ b/pool/chainstate.go @@ -7,6 +7,7 @@ package pool import ( "context" "errors" + "fmt" "sync" "sync/atomic" "time" @@ -41,8 +42,6 @@ type ChainStateConfig struct { // GetBlockConfirmations fetches the block confirmations with the provided // block hash. GetBlockConfirmations func(context.Context, *chainhash.Hash) (int64, error) - // Cancel represents the pool's context cancellation function. - Cancel context.CancelFunc // SignalCache sends the provided cache update event to the gui cache. SignalCache func(event CacheUpdateEvent) } @@ -193,13 +192,13 @@ func isTreasuryActive(tx *wire.MsgTx) bool { // handleChainUpdates processes connected and disconnected block // notifications from the consensus daemon. -func (cs *ChainState) handleChainUpdates(ctx context.Context) { +// +// This must be run as a goroutine. +func (cs *ChainState) handleChainUpdates(ctx context.Context) error { for { select { case <-ctx.Done(): - close(cs.discCh) - close(cs.connCh) - return + return nil case msg := <-cs.connCh: var header wire.BlockHeader @@ -218,11 +217,9 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) { // work are curently fatal because payments are // sourced from coinbases. The chainstate process will be // terminated as a result. - log.Errorf("unable to fetch block with hash %x: %v", - header.PrevBlock, err) close(msg.Done) - cs.cfg.Cancel() - continue + return fmt.Errorf("unable to fetch block with hash %x: %w", + header.PrevBlock, err) } coinbaseTx := block.Transactions[0] @@ -245,11 +242,9 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) { // Errors generated pruning invalidated jobs indicate an // underlying issue accessing the database. The chainstate // process will be terminated as a result. - log.Errorf("unable to prune jobs to height %d: %v", - pruneLimit, err) close(msg.Done) - cs.cfg.Cancel() - continue + return fmt.Errorf("unable to prune jobs to height %d: %w", + pruneLimit, err) } // Prune all hash data not updated in the past ten minutes. @@ -263,10 +258,8 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) { // indicate an underlying issue accessing the // database. The chainstate process will be // terminated as a result. 
- log.Errorf("unable to prune hash data: %v", err) close(msg.Done) - cs.cfg.Cancel() - continue + return fmt.Errorf("unable to prune hash data: %w", err) } err = cs.pruneAcceptedWork(ctx, pruneLimit) @@ -275,11 +268,9 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) { // work indicate an underlying issue accessing // the database. The chainstate process will be // terminated as a result. - log.Errorf("unable to prune accepted work below "+ - "height #%d: %v", pruneLimit, err) close(msg.Done) - cs.cfg.Cancel() - continue + return fmt.Errorf("unable to prune accepted work below "+ + "height #%d: %w", pruneLimit, err) } err = cs.prunePayments(ctx, header.Height) @@ -288,11 +279,9 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) { // indicate an underlying issue accessing the // database. The chainstate process will be // terminated as a result. - log.Errorf("unable to prune orphaned payments at "+ - "height #%d: %v", header.Height, err) close(msg.Done) - cs.cfg.Cancel() - continue + return fmt.Errorf("unable to prune orphaned payments at "+ + "height #%d: %w", header.Height, err) } } @@ -316,11 +305,9 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) { // looking up accepted work indicates an underlying issue // accessing the database. The chainstate process will be // terminated as a result. - log.Errorf("unable to fetch accepted work for block #%d's "+ - "parent %s : %v", header.Height, parentHash, err) close(msg.Done) - cs.cfg.Cancel() - continue + return fmt.Errorf("unable to fetch accepted work for block "+ + "#%d's parent %s : %w", header.Height, parentHash, err) } // If the parent block is already confirmed as mined by the pool, @@ -337,11 +324,9 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) { // Errors generated updating work state indicate an underlying // issue accessing the database. The chainstate process will // be terminated as a result. - log.Errorf("unable to confirm accepted work for block "+ - "%s: %v", header.PrevBlock.String(), err) close(msg.Done) - cs.cfg.Cancel() - continue + return fmt.Errorf("unable to confirm accepted work for block "+ + "%s: %w", header.PrevBlock.String(), err) } log.Infof("Mined work %s confirmed by connected block #%d (%s)", header.PrevBlock.String(), header.Height, @@ -356,11 +341,9 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) { // Errors generated looking up pending payments // indicates an underlying issue accessing the database. // The chainstate process will be terminated as a result. - log.Errorf("failed to fetch pending payments "+ - "at height #%d: %v", parentHeight, err) close(msg.Done) - cs.cfg.Cancel() - continue + return fmt.Errorf("failed to fetch pending payments "+ + "at height #%d: %w", parentHeight, err) } // If the parent block already has payments generated for it @@ -391,10 +374,8 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) { // Errors generated creating payments are fatal since it is // required to distribute payments to participating miners. // The chainstate process will be terminated as a result. - log.Error(err) close(msg.Done) - cs.cfg.Cancel() - continue + return err } } @@ -423,11 +404,9 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) { // accessing the database. The chainstate process will be // terminated as a result. 
if !errors.Is(err, errs.ValueNotFound) { - log.Errorf("unable to fetch accepted work for block "+ - "#%d's parent %s: %v", header.Height, parentHash, err) close(msg.Done) - cs.cfg.Cancel() - continue + return fmt.Errorf("unable to fetch accepted work for block "+ + "#%d's parent %s: %w", header.Height, parentHash, err) } // If the parent of the disconnected block is not an accepted @@ -441,11 +420,9 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) { // Errors generated updating work state indicate an underlying // issue accessing the database. The chainstate process will // be terminated as a result. - log.Errorf("unable to unconfirm accepted work for block "+ - "%s: %v", parentHash, err) close(msg.Done) - cs.cfg.Cancel() - continue + return fmt.Errorf("unable to unconfirm accepted work for "+ + "block %s: %w", parentHash, err) } log.Infof("Mined work %s unconfirmed via disconnected "+ @@ -469,11 +446,9 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) { // looking up accepted work indicates an underlying issue // accessing the database. The chainstate process will be // terminated as a result. - log.Errorf("unable to fetch accepted work for block #%d: %v", - header.Height, blockHash, err) close(msg.Done) - cs.cfg.Cancel() - continue + return fmt.Errorf("unable to fetch accepted work for block "+ + "#%d (hash %s): %w", header.Height, blockHash, err) } work.Confirmed = false @@ -482,11 +457,9 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) { // Errors generated updating work state indicate an underlying // issue accessing the database. The chainstate process will // be terminated as a result. - log.Errorf("unable to unconfirm mined work at "+ - "height #%d: %v", err) close(msg.Done) - cs.cfg.Cancel() - continue + return fmt.Errorf("unable to unconfirm mined work at height "+ + "#%d: %w", header.Height, err) } log.Infof("Disconnected mined work %s at height #%d", diff --git a/pool/chainstate_test.go b/pool/chainstate_test.go index 260e30c5..962af97c 100644 --- a/pool/chainstate_test.go +++ b/pool/chainstate_test.go @@ -71,6 +71,7 @@ func testChainState(t *testing.T) { } ctx, cancel := context.WithCancel(context.Background()) + defer cancel() var confHeader wire.BlockHeader cCfg := &ChainStateConfig{ db: db, @@ -80,7 +81,6 @@ func testChainState(t *testing.T) { GetBlock: getBlock, GetBlockConfirmations: getBlockConfirmations, SignalCache: signalCache, - Cancel: cancel, } cs := NewChainState(cCfg) @@ -231,7 +231,10 @@ func testChainState(t *testing.T) { var wg sync.WaitGroup wg.Add(1) go func() { - cs.handleChainUpdates(ctx) + err := cs.handleChainUpdates(ctx) + if err != nil { + t.Logf("unexpected error from chain update handler: %v", err) + } wg.Done() }() @@ -255,8 +258,7 @@ func testChainState(t *testing.T) { job := NewJob(workE, 42) err = cs.cfg.db.persistJob(job) if err != nil { - log.Errorf("failed to persist job %v", err) - return + t.Fatalf("failed to persist job %v", err) } // Ensure a malformed connected block does not terminate the chain diff --git a/pool/hub.go b/pool/hub.go index dbe42ebb..88aca3da 100644 --- a/pool/hub.go +++ b/pool/hub.go @@ -223,7 +223,7 @@ func generateBlake256Pad() []byte { } // NewHub initializes the mining pool hub. 
-func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) { +func NewHub(hcfg *HubConfig) (*Hub, error) { h := &Hub{ cfg: hcfg, limiter: NewRateLimiter(), @@ -272,7 +272,6 @@ func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) { GeneratePayments: h.paymentMgr.generatePayments, GetBlock: h.getBlock, GetBlockConfirmations: h.getBlockConfirmations, - Cancel: cancel, SignalCache: h.SignalCache, } h.chainState = NewChainState(sCfg) @@ -342,8 +341,8 @@ func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) { // if the pool is a publicly available one. func (h *Hub) Connect(ctx context.Context) error { // Establish a connection to the mining node. - nodeConn, err := rpcclient.New(h.cfg.NodeRPCConfig, - h.createNotificationHandlers()) + ntfnHandlers := h.createNotificationHandlers(ctx) + nodeConn, err := rpcclient.New(h.cfg.NodeRPCConfig, ntfnHandlers) if err != nil { return err } @@ -559,18 +558,27 @@ func (h *Hub) processWork(headerE string) { } // createNotificationHandlers returns handlers for block and work notifications. -func (h *Hub) createNotificationHandlers() *rpcclient.NotificationHandlers { +func (h *Hub) createNotificationHandlers(ctx context.Context) *rpcclient.NotificationHandlers { + var closeConnChOnce, closeDiscChOnce sync.Once return &rpcclient.NotificationHandlers{ OnBlockConnected: func(headerB []byte, transactions [][]byte) { - h.chainState.connCh <- &blockNotification{ + select { + case <-ctx.Done(): + closeConnChOnce.Do(func() { close(h.chainState.connCh) }) + case h.chainState.connCh <- &blockNotification{ Header: headerB, Done: make(chan struct{}), + }: } }, OnBlockDisconnected: func(headerB []byte) { - h.chainState.discCh <- &blockNotification{ + select { + case <-ctx.Done(): + closeDiscChOnce.Do(func() { close(h.chainState.discCh) }) + case h.chainState.discCh <- &blockNotification{ Header: headerB, Done: make(chan struct{}), + }: } }, OnWork: func(headerB []byte, target []byte, reason string) { @@ -623,6 +631,9 @@ func (h *Hub) shutdown() { // Run handles the process lifecycles of the pool hub. func (h *Hub) Run(ctx context.Context) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + var wg sync.WaitGroup wg.Add(3) go func() { @@ -630,7 +641,14 @@ func (h *Hub) Run(ctx context.Context) { wg.Done() }() go func() { - h.chainState.handleChainUpdates(ctx) + err := h.chainState.handleChainUpdates(ctx) + if err != nil { + // Ensure the context is canceled so the remaining goroutines exit + // when there was an error that caused the chain update handler to + // exit prematurely. + log.Error(err) + cancel() + } wg.Done() }() go func() { diff --git a/pool/hub_test.go b/pool/hub_test.go index 398a3fd7..c6a86133 100644 --- a/pool/hub_test.go +++ b/pool/hub_test.go @@ -300,12 +300,12 @@ func testHub(t *testing.T) { MaxUpgradeTries: 5, } ctx, cancel := context.WithCancel(context.Background()) - hub, err := NewHub(cancel, hcfg) + hub, err := NewHub(hcfg) if err != nil { t.Fatalf("[NewHub] unexpected error: %v", err) } - notifHandlers := hub.createNotificationHandlers() + notifHandlers := hub.createNotificationHandlers(ctx) if notifHandlers == nil { t.Fatalf("[CreatNotificationHandlers] expected an "+ "initialized notifications handler: %v", err)
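
The pattern the commit message describes — a subsystem handler that reports fatal errors to its caller instead of holding the process-wide cancel func, a Run method that cancels only its own child context, and a top level that decides whether the whole process exits — can be illustrated with a minimal, self-contained sketch. The names below (handleUpdates, run, notifCh) are hypothetical stand-ins for illustration only, not dcrpool's actual identifiers:

package main

import (
	"context"
	"errors"
	"log"
	"sync"
)

// handleUpdates stands in for the chain state handler: rather than calling a
// process-wide cancel func, it returns any fatal error to its caller.
func handleUpdates(ctx context.Context, notifCh <-chan string) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case n := <-notifCh:
			// Simulate a failure while processing a notification, e.g. a
			// lost database connection.
			return errors.New("unable to process " + n + ": db unavailable")
		}
	}
}

// run mirrors the hub's Run method: it derives its own child context and
// cancels only that context when the handler fails, so the decision to exit
// the whole process stays with the caller.
func run(ctx context.Context, notifCh <-chan string) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := handleUpdates(ctx, notifCh); err != nil {
			log.Print(err)
			cancel()
		}
	}()
	wg.Wait()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// notifCh carries a simulated block notification; main is its only sender.
	notifCh := make(chan string, 1)
	notifCh <- "block connected"

	// The top level observes run returning and then cancels the process
	// context itself, matching the behavior retained in dcrpool.go above.
	run(ctx, notifCh)
	cancel()
}

Keeping the final cancel() at the top level is what leaves room for the future improvement the commit message mentions: it could later be replaced with retry and backoff logic instead of tearing the whole pool down on what may be a temporary failure.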