Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

сhore: Refactor bitcoind conn and client #966

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 38 additions & 24 deletions chain/bitcoind_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ type BitcoindClient struct {
// chain.
birthday time.Time

// id is the unique ID of this client assigned by the backing bitcoind
// connection.
id uint64

// chainConn is the backing client to our rescan client that contains
// the RPC and ZMQ connections to a bitcoind node.
chainConn *BitcoindConn
Expand Down Expand Up @@ -91,15 +87,6 @@ type BitcoindClient struct {
// need to process earlier notifications still waiting to be processed.
notificationQueue *ConcurrentQueue

// txNtfns is a channel through which transaction events will be
// retrieved from the backing bitcoind connection, either via ZMQ or
// polling RPC.
txNtfns chan *wire.MsgTx

// blockNtfns is a channel through block events will be retrieved from
// the backing bitcoind connection, either via ZMQ or polling RPC.
blockNtfns chan *wire.MsgBlock

quit chan struct{}
wg sync.WaitGroup
}
Expand All @@ -108,6 +95,27 @@ type BitcoindClient struct {
// chain.Interface interface.
var _ Interface = (*BitcoindClient)(nil)

// NewBitcoindClient returns a bitcoind client using the current bitcoind
// connection. This allows us to share the same connection using multiple
// clients.
func NewBitcoindClient(c *BitcoindConn) *BitcoindClient {
return &BitcoindClient{
quit: make(chan struct{}),

chainConn: c,

rescanUpdate: make(chan interface{}),
watchedAddresses: make(map[string]struct{}),
watchedOutPoints: make(map[wire.OutPoint]struct{}),
watchedTxs: make(map[chainhash.Hash]struct{}),

notificationQueue: NewConcurrentQueue(20), //nolint: mnd

mempool: make(map[chainhash.Hash]struct{}),
expiredMempool: make(map[int32]map[chainhash.Hash]struct{}),
}
}

// BackEnd returns the name of the driver.
func (c *BitcoindClient) BackEnd() string {
return "bitcoind"
Expand Down Expand Up @@ -374,11 +382,6 @@ func (c *BitcoindClient) NotifyBlocks() error {
c.bestBlock.Timestamp = time.Unix(bestHeader.Time, 0)
c.bestBlockMtx.Unlock()

// Include the client in the set of rescan clients of the backing
// bitcoind connection in order to receive ZMQ event notifications for
// new blocks and transactions.
c.chainConn.AddClient(c)

c.wg.Add(1)
go c.ntfnHandler()

Expand Down Expand Up @@ -531,6 +534,9 @@ func (c *BitcoindClient) Rescan(blockHash *chainhash.Hash,
// connection and starts all goroutines necessary in order to process rescans
// and ZMQ notifications.
//
// NB: make sure that the bitcoind connection has been started before otherwise
// you wouldn't receive notifications
//
// NOTE: This is part of the chain.Interface interface.
func (c *BitcoindClient) Start() error {
if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
Expand Down Expand Up @@ -579,10 +585,6 @@ func (c *BitcoindClient) Stop() {

close(c.quit)

// Remove this client's reference from the bitcoind connection to
// prevent sending notifications to it after it's been stopped.
c.chainConn.RemoveClient(c.id)

c.notificationQueue.Stop()
}

Expand Down Expand Up @@ -681,18 +683,30 @@ func (c *BitcoindClient) rescanHandler() {
// NOTE: This must be called as a goroutine.
func (c *BitcoindClient) ntfnHandler() {
defer c.wg.Done()
// Subscribe to receive ZMQ event notifications for
// new blocks and transactions.
subscription := c.chainConn.Subscribe()
defer c.chainConn.Unsubscribe(subscription)

for {
select {
case tx := <-c.txNtfns:
case tx, ok := <-subscription.TxNotifications():
if !ok {
// source closed the channel
return
}
txDetails := btcutil.NewTx(tx)
_, _, err := c.filterTx(txDetails, nil, true)
if err != nil {
log.Errorf("Unable to filter transaction %v: %v",
txDetails.Hash(), err)
}

case newBlock := <-c.blockNtfns:
case newBlock, ok := <-subscription.BlockNotifications():
if !ok {
// source closed the channel
return
}
// If the new block's previous hash matches the best
// hash known to us, then the new block is the next
// successor, so we'll update our best block to reflect
Expand Down
119 changes: 53 additions & 66 deletions chain/bitcoind_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,6 @@ type BitcoindConn struct {
started int32 // To be used atomically.
stopped int32 // To be used atomically.

// rescanClientCounter is an atomic counter that assigns a unique ID to
// each new bitcoind rescan client using the current bitcoind
// connection.
rescanClientCounter uint64

cfg BitcoindConfig

// client is the RPC client to the bitcoind node.
Expand All @@ -119,10 +114,10 @@ type BitcoindConn struct {
// retrieved from bitcoind.
events BitcoindEvents

// rescanClients is the set of active bitcoind rescan clients to which
// subscriptions is the set of active clients to which
// ZMQ event notifications will be sent to.
rescanClientsMtx sync.Mutex
rescanClients map[uint64]*BitcoindClient
subscriptions map[*bitcoindNotificationsImpl]struct{}
subscriptionsMtx sync.RWMutex

quit chan struct{}
wg sync.WaitGroup
Expand Down Expand Up @@ -199,7 +194,7 @@ func NewBitcoindConn(cfg *BitcoindConfig) (*BitcoindConn, error) {
cfg: *cfg,
client: client,
prunedBlockDispatcher: prunedBlockDispatcher,
rescanClients: make(map[uint64]*BitcoindClient),
subscriptions: make(map[*bitcoindNotificationsImpl]struct{}),
quit: make(chan struct{}),
}

Expand Down Expand Up @@ -243,10 +238,13 @@ func (c *BitcoindConn) Stop() {
if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
return
}

for _, client := range c.rescanClients {
client.Stop()
c.subscriptionsMtx.Lock()
defer c.subscriptionsMtx.Unlock()
for subscription := range c.subscriptions {
close(subscription.txNtfns)
close(subscription.blockNtfns)
}
c.subscriptions = nil

close(c.quit)
c.client.Shutdown()
Expand All @@ -271,28 +269,26 @@ func (c *BitcoindConn) sendBlockToClients() {
// sendBlock is a helper function that sends the given block to each
// of the rescan clients
sendBlock := func(block *wire.MsgBlock) {
c.rescanClientsMtx.Lock()
defer c.rescanClientsMtx.Unlock()
c.subscriptionsMtx.RLock()
defer c.subscriptionsMtx.RUnlock()

for _, client := range c.rescanClients {
for subscription := range c.subscriptions {
select {
case client.blockNtfns <- block:
case <-client.quit:
case subscription.blockNtfns <- block:
case <-c.quit:
return
}
}
}

var block *wire.MsgBlock
for {
select {
case block = <-c.events.BlockNotifications():
case block := <-c.events.BlockNotifications():
sendBlock(block)
case <-c.quit:
return
}

sendBlock(block)
}
}

Expand All @@ -302,28 +298,25 @@ func (c *BitcoindConn) sendTxToClients() {
defer c.wg.Done()

sendTx := func(tx *wire.MsgTx) {
c.rescanClientsMtx.Lock()
defer c.rescanClientsMtx.Unlock()
c.subscriptionsMtx.RLock()
defer c.subscriptionsMtx.RUnlock()

for _, client := range c.rescanClients {
for subscription := range c.subscriptions {
select {
case client.txNtfns <- tx:
case <-client.quit:
case subscription.txNtfns <- tx:
case <-c.quit:
return
}
}
}

var tx *wire.MsgTx
for {
select {
case tx = <-c.events.TxNotifications():
case tx := <-c.events.TxNotifications():
sendTx(tx)
case <-c.quit:
return
}

sendTx(tx)
}
}

Expand Down Expand Up @@ -396,53 +389,47 @@ func getCurrentNet(client *rpcclient.Client) (wire.BitcoinNet, error) {
}
}

// NewBitcoindClient returns a bitcoind client using the current bitcoind
// connection. This allows us to share the same connection using multiple
// clients.
func (c *BitcoindConn) NewBitcoindClient() *BitcoindClient {
return &BitcoindClient{
quit: make(chan struct{}),

id: atomic.AddUint64(&c.rescanClientCounter, 1),

chainConn: c,

rescanUpdate: make(chan interface{}),
watchedAddresses: make(map[string]struct{}),
watchedOutPoints: make(map[wire.OutPoint]struct{}),
watchedTxs: make(map[chainhash.Hash]struct{}),

notificationQueue: NewConcurrentQueue(20),
txNtfns: make(chan *wire.MsgTx, 1000),
blockNtfns: make(chan *wire.MsgBlock, 100),

mempool: make(map[chainhash.Hash]struct{}),
expiredMempool: make(map[int32]map[chainhash.Hash]struct{}),
}
}

// AddClient adds a client to the set of active rescan clients of the current
// Subscribe adds a subscription to the set of active subscriptions of the current
// chain connection. This allows the connection to include the specified client
// in its notification delivery.
//
// NB: the caller is responsible to call Unsubscribe when the subscription is not used anymore
//
// NOTE: This function is safe for concurrent access.
func (c *BitcoindConn) AddClient(client *BitcoindClient) {
c.rescanClientsMtx.Lock()
defer c.rescanClientsMtx.Unlock()
func (c *BitcoindConn) Subscribe() BitcoindNotifications {
c.subscriptionsMtx.Lock()
defer c.subscriptionsMtx.Unlock()

c.rescanClients[client.id] = client
s := &bitcoindNotificationsImpl{
txNtfns: make(chan *wire.MsgTx, 1000), //nolint: mnd
blockNtfns: make(chan *wire.MsgBlock, 100), //nolint: mnd
}
c.subscriptions[s] = struct{}{}

return s
}

// RemoveClient removes the client with the given ID from the set of active
// rescan clients. Once removed, the client will no longer receive block and
// Unsubscribe removes the subscription from the set of active subscriptions.
// Once removed, the client will no longer receive block and
// transaction notifications from the chain connection.
//
// NOTE: This function is safe for concurrent access.
func (c *BitcoindConn) RemoveClient(id uint64) {
c.rescanClientsMtx.Lock()
defer c.rescanClientsMtx.Unlock()

delete(c.rescanClients, id)
func (c *BitcoindConn) Unsubscribe(subscription BitcoindNotifications) {
c.subscriptionsMtx.Lock()
defer c.subscriptionsMtx.Unlock()

switch s := subscription.(type) {
case *bitcoindNotificationsImpl:
_, found := c.subscriptions[s]
if !found {
return
}
close(s.txNtfns)
close(s.blockNtfns)
delete(c.subscriptions, s)
default:
// Unexpected type, we can just ignore it
}
}

// isBlockPrunedErr determines if the error returned by the GetBlock RPC
Expand Down
26 changes: 23 additions & 3 deletions chain/bitcoind_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,21 @@ import (
"github.com/btcsuite/btcd/wire"
)

// BitcoindEvents is the interface that must be satisfied by any type that
// serves bitcoind block and transactions events.
type BitcoindEvents interface {
// BitcoindNotifications provides channels to listen to block and transaction notifications.
type BitcoindNotifications interface {
// TxNotifications will return a channel which will deliver new
// transactions.
TxNotifications() <-chan *wire.MsgTx

// BlockNotifications will return a channel which will deliver new
// blocks.
BlockNotifications() <-chan *wire.MsgBlock
}

// BitcoindEvents is the interface that must be satisfied by any type that
// serves bitcoind block and transactions events.
type BitcoindEvents interface {
BitcoindNotifications

// LookupInputSpend will return the transaction found in mempool that
// spends the given input.
Expand All @@ -31,6 +36,21 @@ type BitcoindEvents interface {
Stop() error
}

type bitcoindNotificationsImpl struct {
txNtfns chan *wire.MsgTx
blockNtfns chan *wire.MsgBlock
}

var _ BitcoindNotifications = (*bitcoindNotificationsImpl)(nil)

func (s *bitcoindNotificationsImpl) TxNotifications() <-chan *wire.MsgTx {
return s.txNtfns
}

func (s *bitcoindNotificationsImpl) BlockNotifications() <-chan *wire.MsgBlock {
return s.blockNtfns
}

// Ensure rpcclient.Client implements the rpcClient interface at compile time.
var _ batchClient = (*rpcclient.Client)(nil)

Expand Down
2 changes: 1 addition & 1 deletion chain/bitcoind_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ func setupBitcoind(t *testing.T, minerAddr string,
})

// Create a bitcoind client.
btcClient := chainConn.NewBitcoindClient()
btcClient := NewBitcoindClient(chainConn)
require.NoError(t, btcClient.Start())

t.Cleanup(func() {
Expand Down