From 7e3f1f0bec8f19c22b06445aec9b420d5de68a67 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 15 Jan 2020 17:10:45 -0800 Subject: [PATCH 1/4] build: upgrade to walletdb v1.2.0 --- bamboozle_unit_test.go | 2 +- banman/store_test.go | 2 +- blockmanager_test.go | 2 +- filterdb/db_test.go | 2 +- go.mod | 2 +- go.sum | 4 ++++ headerfs/index_test.go | 2 +- headerfs/store_test.go | 4 ++-- sync_test.go | 2 +- 9 files changed, 13 insertions(+), 9 deletions(-) diff --git a/bamboozle_unit_test.go b/bamboozle_unit_test.go index 8d874a0cb..1dbd64a7f 100644 --- a/bamboozle_unit_test.go +++ b/bamboozle_unit_test.go @@ -550,7 +550,7 @@ func runCheckCFCheckptSanityTestCase(t *testing.T, testCase *cfCheckptTestCase) } defer os.RemoveAll(tempDir) - db, err := walletdb.Create("bdb", tempDir+"/weks.db") + db, err := walletdb.Create("bdb", tempDir+"/weks.db", true) if err != nil { t.Fatalf("Error opening DB: %s", err) } diff --git a/banman/store_test.go b/banman/store_test.go index 9ad26fdee..324ea435f 100644 --- a/banman/store_test.go +++ b/banman/store_test.go @@ -23,7 +23,7 @@ func createTestBanStore(t *testing.T) (banman.Store, func()) { } dbPath := filepath.Join(dbDir, "test.db") - db, err := walletdb.Create("bdb", dbPath) + db, err := walletdb.Create("bdb", dbPath, true) if err != nil { os.RemoveAll(dbDir) t.Fatalf("unable to create db: %v", err) diff --git a/blockmanager_test.go b/blockmanager_test.go index fcf5b6703..aa3a3cdce 100644 --- a/blockmanager_test.go +++ b/blockmanager_test.go @@ -55,7 +55,7 @@ func setupBlockManager() (*blockManager, headerfs.BlockHeaderStore, "temporary directory: %s", err) } - db, err := walletdb.Create("bdb", tempDir+"/weks.db") + db, err := walletdb.Create("bdb", tempDir+"/weks.db", true) if err != nil { os.RemoveAll(tempDir) return nil, nil, nil, nil, fmt.Errorf("Error opening DB: %s", diff --git a/filterdb/db_test.go b/filterdb/db_test.go index b3d20043d..acc6c5e54 100644 --- a/filterdb/db_test.go +++ b/filterdb/db_test.go @@ -21,7 
+21,7 @@ func createTestDatabase() (func(), FilterDatabase, error) { return nil, nil, err } - db, err := walletdb.Create("bdb", tempDir+"/test.db") + db, err := walletdb.Create("bdb", tempDir+"/test.db", true) if err != nil { return nil, nil, err } diff --git a/go.mod b/go.mod index e5b5b6d0c..8b5b4755e 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ require ( github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d github.com/btcsuite/btcwallet/wallet/txauthor v1.0.0 - github.com/btcsuite/btcwallet/walletdb v1.0.0 + github.com/btcsuite/btcwallet/walletdb v1.2.0 github.com/btcsuite/btcwallet/wtxmgr v1.0.0 github.com/davecgh/go-spew v1.1.1 github.com/lightningnetwork/lnd/queue v1.0.1 diff --git a/go.sum b/go.sum index a3907f029..2adaac59e 100644 --- a/go.sum +++ b/go.sum @@ -16,6 +16,8 @@ github.com/btcsuite/btcwallet/wallet/txsizes v1.0.0 h1:6DxkcoMnCPY4E9cUDPB5tbuuf github.com/btcsuite/btcwallet/wallet/txsizes v1.0.0/go.mod h1:pauEU8UuMFiThe5PB3EO+gO5kx87Me5NvdQDsTuq6cs= github.com/btcsuite/btcwallet/walletdb v1.0.0 h1:mheT7vCWK5EP6rZzhxsQ7ms9+yX4VE8bwiJctECBeNw= github.com/btcsuite/btcwallet/walletdb v1.0.0/go.mod h1:bZTy9RyYZh9fLnSua+/CD48TJtYJSHjjYcSaszuxCCk= +github.com/btcsuite/btcwallet/walletdb v1.2.0 h1:E0+M4jHOToAvGWZ27ew5AaDAHDi6fUiXkjUJUnoEOD0= +github.com/btcsuite/btcwallet/walletdb v1.2.0/go.mod h1:9cwc1Yyg4uvd4ZdfdoMnALji+V9gfWSMfxEdLdR5Vwc= github.com/btcsuite/btcwallet/wtxmgr v1.0.0 h1:aIHgViEmZmZfe0tQQqF1xyd2qBqFWxX5vZXkkbjtbeA= github.com/btcsuite/btcwallet/wtxmgr v1.0.0/go.mod h1:vc4gBprll6BP0UJ+AIGDaySoc7MdAmZf8kelfNb8CFY= github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd h1:R/opQEbFEy9JGkIguV40SvRY1uliPX8ifOvi6ICsFCw= @@ -77,6 +79,8 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6Zh golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync 
v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190904154756-749cb33beabd h1:DBH9mDw0zluJT/R+nGuV3jWFWLFaHyYZWD4tOT+cjn0= golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/headerfs/index_test.go b/headerfs/index_test.go index df807f28f..ac61d7980 100644 --- a/headerfs/index_test.go +++ b/headerfs/index_test.go @@ -17,7 +17,7 @@ func createTestIndex() (func(), *headerIndex, error) { return nil, nil, err } - db, err := walletdb.Create("bdb", tempDir+"/test.db") + db, err := walletdb.Create("bdb", tempDir+"/test.db", true) if err != nil { return nil, nil, err } diff --git a/headerfs/store_test.go b/headerfs/store_test.go index 19867c00a..cf1555904 100644 --- a/headerfs/store_test.go +++ b/headerfs/store_test.go @@ -27,7 +27,7 @@ func createTestBlockHeaderStore() (func(), walletdb.DB, string, } dbPath := filepath.Join(tempDir, "test.db") - db, err := walletdb.Create("bdb", dbPath) + db, err := walletdb.Create("bdb", dbPath, true) if err != nil { return nil, nil, "", nil, err } @@ -230,7 +230,7 @@ func createTestFilterHeaderStore() (func(), walletdb.DB, string, *FilterHeaderSt } dbPath := filepath.Join(tempDir, "test.db") - db, err := walletdb.Create("bdb", dbPath) + db, err := walletdb.Create("bdb", dbPath, true) if err != nil { return nil, nil, "", nil, err } diff --git a/sync_test.go b/sync_test.go index 1bb910cf1..0b0d06511 100644 --- a/sync_test.go +++ b/sync_test.go @@ -1111,7 +1111,7 @@ func TestNeutrinoSync(t 
*testing.T) { t.Fatalf("Failed to create temporary directory: %s", err) } defer os.RemoveAll(tempDir) - db, err := walletdb.Create("bdb", tempDir+"/weks.db") + db, err := walletdb.Create("bdb", tempDir+"/weks.db", true) defer db.Close() if err != nil { t.Fatalf("Error opening DB: %s\n", err) From bcf1bbc636fc4c112112598ab25bc21b5d6e4eb6 Mon Sep 17 00:00:00 2001 From: Roei Erez Date: Tue, 21 Jan 2020 11:41:00 +0200 Subject: [PATCH 2/4] filterdb: Implement PurgeFilters. This commit adds the ability to purge the filters from the db. It enables mobile apps that use neutrino in the background to implement their own eviction strategy, preventing the db from growing unboundedly. --- filterdb/db.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/filterdb/db.go b/filterdb/db.go index 3f34b8528..4f7d312cb 100644 --- a/filterdb/db.go +++ b/filterdb/db.go @@ -51,6 +51,9 @@ type FilterDatabase interface { // target block hash cannot be found, then ErrFilterNotFound is to be // returned. FetchFilter(*chainhash.Hash, FilterType) (*gcs.Filter, error) + + // PurgeFilters purge all filters with a given type from persistent storage. + PurgeFilters(FilterType) error } // FilterStore is an implementation of the FilterDatabase interface which is @@ -107,6 +110,29 @@ func New(db walletdb.DB, params chaincfg.Params) (*FilterStore, error) { }, nil } +// PurgeFilters purge all filters with a given type from persistent storage. +// +// NOTE: This method is a part of the FilterDatabase interface.
+func (f *FilterStore) PurgeFilters(fType FilterType) error { + return walletdb.Update(f.db, func(tx walletdb.ReadWriteTx) error { + filters := tx.ReadWriteBucket(filterBucket) + + switch fType { + case RegularFilter: + if err := filters.DeleteNestedBucket(regBucket); err != nil { + return err + } + if _, err := filters.CreateBucket(regBucket); err != nil { + return err + } + default: + return fmt.Errorf("unknown filter type: %v", fType) + } + + return nil + }) +} + // putFilter stores a filter in the database according to the corresponding // block hash. The passed bucket is expected to be the proper bucket for the // passed filter type. From 8da748b03c944b2e9cfe89ab33db1968b338c3a8 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 23 Jan 2020 11:39:21 -0800 Subject: [PATCH 3/4] blockmanager: update in-memory chain tip before notification dispatch --- blockmanager.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/blockmanager.go b/blockmanager.go index 748a89593..04354f0d9 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -1268,6 +1268,16 @@ func (b *blockManager) writeCFHeadersMsg(msg *wire.MsgCFHeaders, return nil, 0, err } + // We'll also set the new header tip and notify any peers that the tip + // has changed as well. Unlike the set of notifications below, this is + // for sub-system that only need to know the height has changed rather + // than know each new header that's been added to the tip. + b.newFilterHeadersMtx.Lock() + b.filterHeaderTip = lastHeight + b.filterHeaderTipHash = lastHash + b.newFilterHeadersMtx.Unlock() + b.newFilterHeadersSignal.Broadcast() + // Notify subscribers, and also update the filter header progress // logger at the same time. 
for i, header := range matchingBlockHeaders { b.onBlockConnected(header, headerHeight) } - // We'll also set the new header tip and notify any peers that the tip - // has changed as well. Unlike the set of notifications above, this is - // for sub-system that only need to know the height has changed rather - // than know each new header that's been added to the tip. - b.newFilterHeadersMtx.Lock() - b.filterHeaderTip = lastHeight - b.filterHeaderTipHash = lastHash - b.newFilterHeadersMtx.Unlock() - b.newFilterHeadersSignal.Broadcast() - return &lastHeader, lastHeight, nil } From 42a0bdf361ba31025422bb7aad83e0b0ac74c52b Mon Sep 17 00:00:00 2001 From: Roei Erez Date: Mon, 9 Mar 2020 23:18:17 +0200 Subject: [PATCH 4/4] neutrino: Add MaxBatchSize query option. This commit adds a query option MaxBatchSize that allows the caller to limit the batch size when using either OptimisticBatch or OptimisticReverseBatch. Since the caller is not exposed to the max size defined in neutrino, passing the maxBatchSize serves as a hint to neutrino to limit the "default request size" and by that save bandwidth. --- query.go | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/query.go b/query.go index bee81ba55..2f260645d 100644 --- a/query.go +++ b/query.go @@ -88,6 +88,12 @@ type queryOptions struct { // and that we should attempt to batch more items with the query such // that they can be cached, avoiding the extra round trip. optimisticBatch optimisticBatchType + + // maxBatchSize is the maximum items that the query should return in the // case the optimisticBatch option is used. It saves bandwidth in the case // the caller has a limited amount of items to fetch but still wants to use // batching.
+ maxBatchSize int64 } // optimisticBatchType is a type indicating the kind of batching we want to @@ -197,6 +203,14 @@ func OptimisticReverseBatch() QueryOption { } } +// MaxBatchSize allows the caller to limit the number of items fetched +// in a batch. +func MaxBatchSize(maxSize int64) QueryOption { + return func(qo *queryOptions) { + qo.maxBatchSize = maxSize + } +} + // queryState is an atomically updated per-query state for each query in a // batch. // @@ -583,9 +597,17 @@ func (s *ChainService) prepareCFiltersQuery(blockHash chainhash.Hash, qo.applyQueryOptions(options...) // If the query specifies an optimistic batch we will attempt to fetch - // the maximum number of filters in anticipation of calls for the - // following or preceding filters. + // the maximum number of filters, which is defaulted to + // wire.MaxGetCFiltersReqRange, in anticipation of calls for the following + // or preceding filters. var startHeight, stopHeight int64 + batchSize := int64(wire.MaxGetCFiltersReqRange) + + // If the query specifies a maximum batch size, we will limit the number of + // requested filters accordingly. + if qo.maxBatchSize > 0 && qo.maxBatchSize < wire.MaxGetCFiltersReqRange { + batchSize = qo.maxBatchSize + } switch qo.optimisticBatch { @@ -597,7 +619,7 @@ func (s *ChainService) prepareCFiltersQuery(blockHash chainhash.Hash, // Forward batch, fetch as many of the following filters as possible. case forwardBatch: startHeight = int64(height) - stopHeight = startHeight + wire.MaxGetCFiltersReqRange - 1 + stopHeight = startHeight + batchSize - 1 // We need a longer timeout, since we are going to receive more // than a single response. @@ -606,7 +628,7 @@ func (s *ChainService) prepareCFiltersQuery(blockHash chainhash.Hash, // Reverse batch, fetch as many of the preceding filters as possible. 
case reverseBatch: stopHeight = int64(height) - startHeight = stopHeight - wire.MaxGetCFiltersReqRange + 1 + startHeight = stopHeight - batchSize + 1 // We need a longer timeout, since we are going to receive more // than a single response.