pool: Don't give chainstate entire process cancel.

The pool is currently designed to shut down the entire process deep down
in the chainstate when some errors it deems to be fatal occur.  It
accomplishes this by passing the cancel func for the entire process all
the way down to the chainstate.  Unfortunately, this is not a good
design because goroutines deep in the guts of subsystems should not have
the ability to directly pull the rug out from under the entire process
without the upper layers having a chance to do anything about it.

Moreover, some of those errors could actually be temporary, such as a
momentary loss of connection to the database or dcrd, yet they still
result in the entire process shutting down.

This is a first pass at improving the situation slightly by no longer
passing the entire process's cancel func down to the chainstate handler
and instead arranging for the chainstate handler to return an error back
to the hub's Run method, which itself creates its own child context that
is then canceled to cause the hub to shut down.
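
In outline, the new control flow looks like the following self-contained
sketch (stand-in names and a hypothetical handler, not the actual pool
code; the real changes are in pool/hub.go and dcrpool.go below):

    package main

    import (
        "context"
        "errors"
        "fmt"
        "sync"
    )

    // handleChainUpdates stands in for the chainstate handler, which now
    // returns an error rather than cancelling the entire process.
    func handleChainUpdates(ctx context.Context) error {
        return errors.New("fatal chainstate error")
    }

    // run mirrors the shape of the hub's Run method: it derives a child
    // context so a fatal handler error only tears down the hub's own
    // goroutines, leaving the caller to decide whether the process exits.
    func run(ctx context.Context) {
        ctx, cancel := context.WithCancel(ctx)
        defer cancel()

        var wg sync.WaitGroup
        wg.Add(1)
        go func() {
            defer wg.Done()
            if err := handleChainUpdates(ctx); err != nil {
                fmt.Println(err)
                cancel() // shut down the hub, not the entire process
            }
        }()
        wg.Wait()
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        go func() {
            run(ctx)
            cancel() // main observes the hub shutdown and exits, as before
        }()
        <-ctx.Done()
    }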

It also corrects a few other issues along the way such as:

- some error messages did not have the correct parameters
- the notification callback handlers were not respecting the context,
  which could lead to goroutine leaks
- receivers should never close channels; only senders should, and only
  once they are sure there are no more goroutines that can write to the
  channel (see the sketch after this list)
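
A minimal illustration of the last two points, mirroring the shape of
the fix in pool/hub.go (generic names, not pool code): the sender
selects on the context so it cannot block forever, and only the sender
ever closes the channel, guarded by a sync.Once so the close cannot
happen twice.

    package main

    import (
        "context"
        "fmt"
        "sync"
    )

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        ch := make(chan int)

        // Sender: respects the context so it cannot leak, and closes the
        // channel exactly once when no more writes can happen.
        var closeOnce sync.Once
        go func() {
            for i := 0; ; i++ {
                select {
                case <-ctx.Done():
                    closeOnce.Do(func() { close(ch) })
                    return
                case ch <- i:
                }
            }
        }()

        // Receiver: drains until the sender closes the channel.  It must
        // never close ch itself; a close here would make a pending send
        // panic.
        for v := range ch {
            fmt.Println(v)
            if v == 2 {
                cancel() // ask the sender to stop instead of closing ch
            }
        }
    }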

For now, the same process shutdown behavior is maintained by having the
main process shut itself down upon observing the hub shutdown, which
ensures all other subsystems shut down properly too.

In other words, the overall behavior is the same, aside from the
aforementioned fixes, but this change makes it feasible to further
improve the behavior in the future so the pool won't just die due to
what are likely temporary failures.
davecgh committed Sep 17, 2023
1 parent 62a7f47 commit 44b1875
Showing 5 changed files with 79 additions and 73 deletions.
19 changes: 16 additions & 3 deletions dcrpool.go
@@ -30,7 +30,7 @@ var signals = []os.Signal{os.Interrupt}
 // newHub returns a new pool hub configured with the provided details that is
 // ready to connect to a consensus daemon and wallet in the case of publicly
 // available pools.
-func newHub(cfg *config, db pool.Database, cancel context.CancelFunc) (*pool.Hub, error) {
+func newHub(cfg *config, db pool.Database) (*pool.Hub, error) {
     dcrdRPCCfg := rpcclient.ConnConfig{
         Host:     cfg.DcrdRPCHost,
         Endpoint: "ws",
@@ -65,7 +65,7 @@ func newHub(cfg *config, db pool.Database, cancel context.CancelFunc) (*pool.Hub
         ClientTimeout: cfg.clientTimeout,
     }
 
-    return pool.NewHub(cancel, hcfg)
+    return pool.NewHub(hcfg)
 }
 
 // newGUI returns a new GUI configured with the provided details that is ready
@@ -191,7 +191,7 @@ func realMain() error {
 
     // Create a hub instance and attempt to perform initial connection and work
     // acquisition.
-    hub, err := newHub(cfg, db, cancel)
+    hub, err := newHub(cfg, db)
     if err != nil {
         mpLog.Errorf("unable to initialize hub: %v", err)
         return err
@@ -216,7 +216,20 @@ func realMain() error {
     var wg sync.WaitGroup
     wg.Add(2)
     go func() {
+        // Ensure the overall process context is cancelled once Run returns
+        // since the pool can't operate without it and it's possible the hub
+        // shut itself down due to an error deep in the chain state.
+        //
+        // The pool generally shouldn't be shutting itself down due to
+        // chainstate issues and should instead go into a temporarily
+        // disabled state where it stops serving miners while it retries
+        // with increasing backoffs as it attempts to resolve whatever
+        // caused it to shut down to begin with.
+        //
+        // However, since the code is currently structured under the
+        // assumption the entire process exits, this retains that behavior.
         hub.Run(ctx)
+        cancel()
         wg.Done()
     }()
     go func() {
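
The comment added above sketches a possible future direction rather than
anything this commit implements.  For illustration only, a retry loop
with increasing backoff along those lines might look roughly like the
following (hypothetical helper names, not part of the codebase):

    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"
    )

    // connectDB is a hypothetical stand-in for whatever operation failed,
    // e.g. re-establishing the database or dcrd connection.
    func connectDB(ctx context.Context) error {
        return errors.New("connection refused")
    }

    // retryWithBackoff retries op with increasing delays instead of
    // shutting the process down, giving temporary failures a chance to
    // resolve.
    func retryWithBackoff(ctx context.Context, op func(context.Context) error) error {
        const maxDelay = time.Minute
        delay := time.Second
        for {
            err := op(ctx)
            if err == nil {
                return nil
            }
            fmt.Printf("temporary failure: %v -- retrying in %v\n", err, delay)
            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-time.After(delay):
            }
            if delay *= 2; delay > maxDelay {
                delay = maxDelay
            }
        }
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        if err := retryWithBackoff(ctx, connectDB); err != nil {
            fmt.Println("giving up:", err)
        }
    }
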
85 changes: 29 additions & 56 deletions pool/chainstate.go
@@ -7,6 +7,7 @@ package pool
 import (
     "context"
     "errors"
+    "fmt"
     "sync"
     "sync/atomic"
     "time"
@@ -41,8 +42,6 @@ type ChainStateConfig struct {
     // GetBlockConfirmations fetches the block confirmations with the provided
     // block hash.
     GetBlockConfirmations func(context.Context, *chainhash.Hash) (int64, error)
-    // Cancel represents the pool's context cancellation function.
-    Cancel context.CancelFunc
     // SignalCache sends the provided cache update event to the gui cache.
     SignalCache func(event CacheUpdateEvent)
 }
@@ -193,13 +192,13 @@ func isTreasuryActive(tx *wire.MsgTx) bool {
 
 // handleChainUpdates processes connected and disconnected block
 // notifications from the consensus daemon.
-func (cs *ChainState) handleChainUpdates(ctx context.Context) {
+//
+// This must be run as a goroutine.
+func (cs *ChainState) handleChainUpdates(ctx context.Context) error {
     for {
         select {
         case <-ctx.Done():
-            close(cs.discCh)
-            close(cs.connCh)
-            return
+            return nil
 
         case msg := <-cs.connCh:
             var header wire.BlockHeader
@@ -218,11 +217,9 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) {
                 // work are currently fatal because payments are
                 // sourced from coinbases. The chainstate process will be
                 // terminated as a result.
-                log.Errorf("unable to fetch block with hash %x: %v",
-                    header.PrevBlock, err)
                 close(msg.Done)
-                cs.cfg.Cancel()
-                continue
+                return fmt.Errorf("unable to fetch block with hash %x: %w",
+                    header.PrevBlock, err)
             }
 
             coinbaseTx := block.Transactions[0]
@@ -245,11 +242,9 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) {
                     // Errors generated pruning invalidated jobs indicate an
                     // underlying issue accessing the database. The chainstate
                     // process will be terminated as a result.
-                    log.Errorf("unable to prune jobs to height %d: %v",
-                        pruneLimit, err)
                     close(msg.Done)
-                    cs.cfg.Cancel()
-                    continue
+                    return fmt.Errorf("unable to prune jobs to height %d: %w",
+                        pruneLimit, err)
                 }
 
                 // Prune all hash data not updated in the past ten minutes.
@@ -263,10 +258,8 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) {
                     // indicate an underlying issue accessing the
                     // database. The chainstate process will be
                     // terminated as a result.
-                    log.Errorf("unable to prune hash data: %v", err)
                     close(msg.Done)
-                    cs.cfg.Cancel()
-                    continue
+                    return fmt.Errorf("unable to prune hash data: %w", err)
                 }
 
                 err = cs.pruneAcceptedWork(ctx, pruneLimit)
@@ -275,11 +268,9 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) {
                     // work indicate an underlying issue accessing
                     // the database. The chainstate process will be
                     // terminated as a result.
-                    log.Errorf("unable to prune accepted work below "+
-                        "height #%d: %v", pruneLimit, err)
                     close(msg.Done)
-                    cs.cfg.Cancel()
-                    continue
+                    return fmt.Errorf("unable to prune accepted work below "+
+                        "height #%d: %w", pruneLimit, err)
                 }
 
                 err = cs.prunePayments(ctx, header.Height)
@@ -288,11 +279,9 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) {
                     // indicate an underlying issue accessing the
                     // database. The chainstate process will be
                     // terminated as a result.
-                    log.Errorf("unable to prune orphaned payments at "+
-                        "height #%d: %v", header.Height, err)
                     close(msg.Done)
-                    cs.cfg.Cancel()
-                    continue
+                    return fmt.Errorf("unable to prune orphaned payments at "+
+                        "height #%d: %w", header.Height, err)
                 }
             }
 
@@ -316,11 +305,9 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) {
                 // looking up accepted work indicates an underlying issue
                 // accessing the database. The chainstate process will be
                 // terminated as a result.
-                log.Errorf("unable to fetch accepted work for block #%d's "+
-                    "parent %s : %v", header.Height, parentHash, err)
                 close(msg.Done)
-                cs.cfg.Cancel()
-                continue
+                return fmt.Errorf("unable to fetch accepted work for block "+
+                    "#%d's parent %s : %w", header.Height, parentHash, err)
             }
 
             // If the parent block is already confirmed as mined by the pool,
@@ -337,11 +324,9 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) {
                 // Errors generated updating work state indicate an underlying
                 // issue accessing the database. The chainstate process will
                 // be terminated as a result.
-                log.Errorf("unable to confirm accepted work for block "+
-                    "%s: %v", header.PrevBlock.String(), err)
                 close(msg.Done)
-                cs.cfg.Cancel()
-                continue
+                return fmt.Errorf("unable to confirm accepted work for block "+
+                    "%s: %w", header.PrevBlock.String(), err)
             }
             log.Infof("Mined work %s confirmed by connected block #%d (%s)",
                 header.PrevBlock.String(), header.Height,
@@ -356,11 +341,9 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) {
                     // Errors generated looking up pending payments
                     // indicates an underlying issue accessing the database.
                     // The chainstate process will be terminated as a result.
-                    log.Errorf("failed to fetch pending payments "+
-                        "at height #%d: %v", parentHeight, err)
                     close(msg.Done)
-                    cs.cfg.Cancel()
-                    continue
+                    return fmt.Errorf("failed to fetch pending payments "+
+                        "at height #%d: %w", parentHeight, err)
                 }
 
                 // If the parent block already has payments generated for it
@@ -391,10 +374,8 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) {
                     // Errors generated creating payments are fatal since it is
                     // required to distribute payments to participating miners.
                     // The chainstate process will be terminated as a result.
-                    log.Error(err)
                     close(msg.Done)
-                    cs.cfg.Cancel()
-                    continue
+                    return err
                 }
             }
 
@@ -423,11 +404,9 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) {
             // accessing the database. The chainstate process will be
             // terminated as a result.
             if !errors.Is(err, errs.ValueNotFound) {
-                log.Errorf("unable to fetch accepted work for block "+
-                    "#%d's parent %s: %v", header.Height, parentHash, err)
                 close(msg.Done)
-                cs.cfg.Cancel()
-                continue
+                return fmt.Errorf("unable to fetch accepted work for block "+
+                    "#%d's parent %s: %w", header.Height, parentHash, err)
             }
 
             // If the parent of the disconnected block is not an accepted
@@ -441,11 +420,9 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) {
                 // Errors generated updating work state indicate an underlying
                 // issue accessing the database. The chainstate process will
                 // be terminated as a result.
-                log.Errorf("unable to unconfirm accepted work for block "+
-                    "%s: %v", parentHash, err)
                 close(msg.Done)
-                cs.cfg.Cancel()
-                continue
+                return fmt.Errorf("unable to unconfirm accepted work for "+
+                    "block %s: %w", parentHash, err)
             }
 
             log.Infof("Mined work %s unconfirmed via disconnected "+
@@ -469,11 +446,9 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) {
                 // looking up accepted work indicates an underlying issue
                 // accessing the database. The chainstate process will be
                 // terminated as a result.
-                log.Errorf("unable to fetch accepted work for block #%d: %v",
-                    header.Height, blockHash, err)
                 close(msg.Done)
-                cs.cfg.Cancel()
-                continue
+                return fmt.Errorf("unable to fetch accepted work for block "+
+                    "#%d (hash %s): %w", header.Height, blockHash, err)
             }
 
             work.Confirmed = false
@@ -482,11 +457,9 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) {
                 // Errors generated updating work state indicate an underlying
                 // issue accessing the database. The chainstate process will
                 // be terminated as a result.
-                log.Errorf("unable to unconfirm mined work at "+
-                    "height #%d: %v", err)
                 close(msg.Done)
-                cs.cfg.Cancel()
-                continue
+                return fmt.Errorf("unable to unconfirm mined work at height "+
+                    "#%d: %w", header.Height, err)
             }
 
             log.Infof("Disconnected mined work %s at height #%d",
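
Returning wrapped errors (%w) instead of logging and cancelling means a
caller such as the hub's Run method can later inspect the cause and, for
example, treat database or RPC connectivity failures as retryable.  A
small illustration of the mechanism (generic names, not code from this
commit):

    package main

    import (
        "errors"
        "fmt"
    )

    // errDBClosed stands in for a sentinel error a lower layer might
    // return, analogous to errs.ValueNotFound above.
    var errDBClosed = errors.New("database connection closed")

    func pruneJobs(height uint32) error {
        return errDBClosed
    }

    func handleUpdate(height uint32) error {
        if err := pruneJobs(height); err != nil {
            // Wrapping with %w preserves the underlying error for callers.
            return fmt.Errorf("unable to prune jobs to height %d: %w", height, err)
        }
        return nil
    }

    func main() {
        err := handleUpdate(42)
        fmt.Println(err)
        // The caller can still detect the root cause and decide whether
        // the failure is likely temporary instead of killing the process.
        if errors.Is(err, errDBClosed) {
            fmt.Println("database issue detected; could retry instead of exiting")
        }
    }
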
10 changes: 6 additions & 4 deletions pool/chainstate_test.go
@@ -71,6 +71,7 @@ func testChainState(t *testing.T) {
 }
 
     ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
     var confHeader wire.BlockHeader
     cCfg := &ChainStateConfig{
         db: db,
@@ -80,7 +81,6 @@ func testChainState(t *testing.T) {
         GetBlock:              getBlock,
         GetBlockConfirmations: getBlockConfirmations,
         SignalCache:           signalCache,
-        Cancel:                cancel,
     }
 
     cs := NewChainState(cCfg)
@@ -231,7 +231,10 @@ func testChainState(t *testing.T) {
     var wg sync.WaitGroup
     wg.Add(1)
     go func() {
-        cs.handleChainUpdates(ctx)
+        err := cs.handleChainUpdates(ctx)
+        if err != nil {
+            t.Logf("unexpected error from chain update handler: %v", err)
+        }
         wg.Done()
     }()
 
@@ -255,8 +258,7 @@ func testChainState(t *testing.T) {
     job := NewJob(workE, 42)
     err = cs.cfg.db.persistJob(job)
     if err != nil {
-        log.Errorf("failed to persist job %v", err)
-        return
+        t.Fatalf("failed to persist job %v", err)
     }
 
     // Ensure a malformed connected block does not terminate the chain
34 changes: 26 additions & 8 deletions pool/hub.go
@@ -223,7 +223,7 @@ func generateBlake256Pad() []byte {
 }
 
 // NewHub initializes the mining pool hub.
-func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) {
+func NewHub(hcfg *HubConfig) (*Hub, error) {
     h := &Hub{
         cfg:     hcfg,
         limiter: NewRateLimiter(),
@@ -272,7 +272,6 @@ func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) {
         GeneratePayments:      h.paymentMgr.generatePayments,
         GetBlock:              h.getBlock,
         GetBlockConfirmations: h.getBlockConfirmations,
-        Cancel:                cancel,
         SignalCache:           h.SignalCache,
     }
     h.chainState = NewChainState(sCfg)
@@ -342,8 +341,8 @@ func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) {
 // if the pool is a publicly available one.
 func (h *Hub) Connect(ctx context.Context) error {
     // Establish a connection to the mining node.
-    nodeConn, err := rpcclient.New(h.cfg.NodeRPCConfig,
-        h.createNotificationHandlers())
+    ntfnHandlers := h.createNotificationHandlers(ctx)
+    nodeConn, err := rpcclient.New(h.cfg.NodeRPCConfig, ntfnHandlers)
     if err != nil {
         return err
     }
@@ -559,18 +558,27 @@ func (h *Hub) processWork(headerE string) {
 }
 
 // createNotificationHandlers returns handlers for block and work notifications.
-func (h *Hub) createNotificationHandlers() *rpcclient.NotificationHandlers {
+func (h *Hub) createNotificationHandlers(ctx context.Context) *rpcclient.NotificationHandlers {
+    var closeConnChOnce, closeDiscChOnce sync.Once
     return &rpcclient.NotificationHandlers{
         OnBlockConnected: func(headerB []byte, transactions [][]byte) {
-            h.chainState.connCh <- &blockNotification{
+            select {
+            case <-ctx.Done():
+                closeConnChOnce.Do(func() { close(h.chainState.connCh) })
+            case h.chainState.connCh <- &blockNotification{
                 Header: headerB,
                 Done:   make(chan struct{}),
+            }:
             }
         },
         OnBlockDisconnected: func(headerB []byte) {
-            h.chainState.discCh <- &blockNotification{
+            select {
+            case <-ctx.Done():
+                closeDiscChOnce.Do(func() { close(h.chainState.discCh) })
+            case h.chainState.discCh <- &blockNotification{
                 Header: headerB,
                 Done:   make(chan struct{}),
+            }:
             }
         },
         OnWork: func(headerB []byte, target []byte, reason string) {
@@ -623,14 +631,24 @@ func (h *Hub) shutdown() {
 
 // Run handles the process lifecycles of the pool hub.
 func (h *Hub) Run(ctx context.Context) {
+    ctx, cancel := context.WithCancel(ctx)
+    defer cancel()
+
     var wg sync.WaitGroup
     wg.Add(3)
     go func() {
        h.endpoint.run(ctx)
        wg.Done()
     }()
     go func() {
-        h.chainState.handleChainUpdates(ctx)
+        err := h.chainState.handleChainUpdates(ctx)
+        if err != nil {
+            // Ensure the context is canceled so the remaining goroutines exit
+            // when there was an error that caused the chain update handler to
+            // exit prematurely.
+            log.Error(err)
+            cancel()
+        }
        wg.Done()
     }()
     go func() {
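
A note on the design choice in the notification handlers above: the RPC
client can keep invoking the callbacks while shutdown is in progress, so
each channel close is guarded by a sync.Once to keep it idempotent, and
the close now happens on the sending side (the handlers) rather than in
the receiver (handleChainUpdates), matching the rule that only senders
close channels.
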
4 changes: 2 additions & 2 deletions pool/hub_test.go
@@ -300,12 +300,12 @@ func testHub(t *testing.T) {
         MaxUpgradeTries:       5,
     }
     ctx, cancel := context.WithCancel(context.Background())
-    hub, err := NewHub(cancel, hcfg)
+    hub, err := NewHub(hcfg)
     if err != nil {
         t.Fatalf("[NewHub] unexpected error: %v", err)
     }
 
-    notifHandlers := hub.createNotificationHandlers()
+    notifHandlers := hub.createNotificationHandlers(ctx)
     if notifHandlers == nil {
         t.Fatalf("[CreateNotificationHandlers] expected an "+
             "initialized notifications handler: %v", err)
