From 91c39ebae15bf23f56ba433ecb4523e667611dba Mon Sep 17 00:00:00 2001 From: Teo Date: Wed, 23 Dec 2020 20:33:14 -0500 Subject: [PATCH] Addressing issue #41 - Allow max concurrency limits configuration on the indexer --- Makefile | 2 +- configuration/configuration.go | 20 +++++++++- configuration/configuration_test.go | 58 ++++++++++++++++++++++++++--- go.mod | 2 + go.sum | 2 + indexer/indexer.go | 6 ++- indexer/indexer_test.go | 4 ++ 7 files changed, 85 insertions(+), 9 deletions(-) diff --git a/Makefile b/Makefile index 0737d29d..15bcc06f 100644 --- a/Makefile +++ b/Makefile @@ -37,7 +37,7 @@ run-mainnet-offline: docker run -d --rm -e "MODE=OFFLINE" -e "NETWORK=MAINNET" -e "PORT=8081" -p 8081:8081 rosetta-bitcoin:latest run-testnet-online: - docker run -d --rm --ulimit "nofile=${NOFILE}:${NOFILE}" -v "${PWD}/bitcoin-data:/data" -e "MODE=ONLINE" -e "NETWORK=TESTNET" -e "PORT=8080" -p 8080:8080 -p 18333:18333 rosetta-bitcoin:latest + docker run -d --rm --ulimit "nofile=${NOFILE}:${NOFILE}" -v "${PWD}/bitcoin-data:/data" -e "MAXSYNC"="16" -e "MODE=ONLINE" -e "NETWORK=TESTNET" -e "PORT=8080" -p 8080:8080 -p 18333:18333 rosetta-bitcoin:latest run-testnet-offline: docker run -d --rm -e "MODE=OFFLINE" -e "NETWORK=TESTNET" -e "PORT=8081" -p 8081:8081 rosetta-bitcoin:latest diff --git a/configuration/configuration.go b/configuration/configuration.go index 1c0c46eb..c6129c0a 100644 --- a/configuration/configuration.go +++ b/configuration/configuration.go @@ -17,6 +17,7 @@ package configuration import ( "errors" "fmt" + "github.com/coinbase/rosetta-sdk-go/syncer" "os" "path" "strconv" @@ -88,7 +89,11 @@ const ( // ModeEnv is the environment variable read // to determine mode. - ModeEnv = "MODE" + ModeEnv= "MODE" + + //MaxSyncConcurrency is an enviroment variable + //used to cap the syncer concurrency + MaxSyncConcurrency = "MAXSYNC" // NetworkEnv is the environment variable // read to determine network. @@ -122,18 +127,29 @@ type Configuration struct { IndexerPath string BitcoindPath string Compressors []*encoder.CompressorEntry + MaxSyncConcurrency int64 } // LoadConfiguration attempts to create a new Configuration // using the ENVs in the environment. func LoadConfiguration(baseDirectory string) (*Configuration, error) { + config := &Configuration{} config.Pruning = &PruningConfiguration{ Frequency: pruneFrequency, Depth: pruneDepth, MinHeight: minPruneHeight, } + maxSyncValue := os.Getenv(MaxSyncConcurrency) + switch maxSyncValue { + case "": + config.MaxSyncConcurrency = syncer.DefaultConcurrency + case "0": + return nil, errors.New("syncer concurrency must be greater than zero") + default: + config.MaxSyncConcurrency, _ = strconv.ParseInt(maxSyncValue,10,64) + } modeValue := Mode(os.Getenv(ModeEnv)) switch modeValue { case Online: @@ -180,6 +196,8 @@ func LoadConfiguration(baseDirectory string) (*Configuration, error) { } config.GenesisBlockIdentifier = bitcoin.TestnetGenesisBlockIdentifier config.Params = bitcoin.TestnetParams + + config.Currency = bitcoin.TestnetCurrency config.ConfigPath = testnetConfigPath config.RPCPort = testnetRPCPort diff --git a/configuration/configuration_test.go b/configuration/configuration_test.go index 2b63bccb..2ade8c5b 100644 --- a/configuration/configuration_test.go +++ b/configuration/configuration_test.go @@ -16,23 +16,24 @@ package configuration import ( "errors" + "github.com/coinbase/rosetta-bitcoin/bitcoin" "os" "path" "testing" - "github.com/coinbase/rosetta-bitcoin/bitcoin" - "github.com/coinbase/rosetta-sdk-go/storage/encoder" "github.com/coinbase/rosetta-sdk-go/types" "github.com/coinbase/rosetta-sdk-go/utils" "github.com/stretchr/testify/assert" ) + func TestLoadConfiguration(t *testing.T) { tests := map[string]struct { - Mode string - Network string - Port string + Mode string + Network string + Port string + MaxSyncConcurrency string cfg *Configuration err error @@ -44,15 +45,17 @@ func TestLoadConfiguration(t *testing.T) { Mode: string(Online), err: errors.New("NETWORK must be populated"), }, - "only mode and network set": { + "port not set": { Mode: string(Online), Network: Mainnet, + MaxSyncConcurrency: "64", err: errors.New("PORT must be populated"), }, "all set (mainnet)": { Mode: string(Online), Network: Mainnet, Port: "1000", + MaxSyncConcurrency: "64", cfg: &Configuration{ Mode: Online, Network: &types.NetworkIdentifier{ @@ -63,6 +66,7 @@ func TestLoadConfiguration(t *testing.T) { Currency: bitcoin.MainnetCurrency, GenesisBlockIdentifier: bitcoin.MainnetGenesisBlockIdentifier, Port: 1000, + MaxSyncConcurrency: 64, RPCPort: mainnetRPCPort, ConfigPath: mainnetConfigPath, Pruning: &PruningConfiguration{ @@ -82,6 +86,7 @@ func TestLoadConfiguration(t *testing.T) { Mode: string(Online), Network: Testnet, Port: "1000", + MaxSyncConcurrency: "64", cfg: &Configuration{ Mode: Online, Network: &types.NetworkIdentifier{ @@ -92,6 +97,7 @@ func TestLoadConfiguration(t *testing.T) { Currency: bitcoin.TestnetCurrency, GenesisBlockIdentifier: bitcoin.TestnetGenesisBlockIdentifier, Port: 1000, + MaxSyncConcurrency: 64, RPCPort: testnetRPCPort, ConfigPath: testnetConfigPath, Pruning: &PruningConfiguration{ @@ -107,6 +113,45 @@ func TestLoadConfiguration(t *testing.T) { }, }, }, + "max sync set": { + Mode: string(Online), + Network: Testnet, + Port: "1000", + MaxSyncConcurrency: "", + + cfg: &Configuration{ + Mode: Online, + Network: &types.NetworkIdentifier{ + Network: bitcoin.TestnetNetwork, + Blockchain: bitcoin.Blockchain, + }, + Params: bitcoin.TestnetParams, + Currency: bitcoin.TestnetCurrency, + GenesisBlockIdentifier: bitcoin.TestnetGenesisBlockIdentifier, + Port: 1000, + MaxSyncConcurrency: 64, + RPCPort: testnetRPCPort, + ConfigPath: testnetConfigPath, + Pruning: &PruningConfiguration{ + Frequency: pruneFrequency, + Depth: pruneDepth, + MinHeight: minPruneHeight, + }, + Compressors: []*encoder.CompressorEntry{ + { + Namespace: transactionNamespace, + DictionaryPath: testnetTransactionDictionary, + }, + }, + }, + }, + "invalid sync concurrency ": { + Mode: string(Online), + Network: Testnet, + Port: "1000", + MaxSyncConcurrency: "0", + err: errors.New("syncer concurrency must be greater than zero"), + }, "invalid mode": { Mode: "bad mode", Network: Testnet, @@ -136,6 +181,7 @@ func TestLoadConfiguration(t *testing.T) { os.Setenv(ModeEnv, test.Mode) os.Setenv(NetworkEnv, test.Network) os.Setenv(PortEnv, test.Port) + os.Setenv(MaxSyncConcurrency, test.MaxSyncConcurrency) cfg, err := LoadConfiguration(newDir) if test.err != nil { diff --git a/go.mod b/go.mod index 3d10bf54..9240048f 100644 --- a/go.mod +++ b/go.mod @@ -14,3 +14,5 @@ require ( golang.org/x/tools v0.0.0-20200904185747-39188db58858 // indirect honnef.co/go/tools v0.0.1-2020.1.5 // indirect ) + +//replace github.com/coinbase/rosetta-sdk-go v0.6.5 => ../rosetta-sdk-go diff --git a/go.sum b/go.sum index 75c15686..3c17a254 100644 --- a/go.sum +++ b/go.sum @@ -32,6 +32,7 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= github.com/aristanetworks/goarista v0.0.0-20170210015632-ea17b1a17847/go.mod h1:D/tb0zPVXnP7fmsLZjtdUhSsumbK/ij54UXjjVgMGxQ= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= +github.com/aws/aws-sdk-go v1.25.48 h1:J82DYDGZHOKHdhx6hD24Tm30c2C3GchYGfN0mf9iKUk= github.com/aws/aws-sdk-go v1.25.48/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/btcsuite/btcd v0.0.0-20171128150713-2e60448ffcc6/go.mod h1:Dmm/EzmjnCiweXmzRIAiUWCInVmPgjkzgv5k4tVyXiQ= @@ -169,6 +170,7 @@ github.com/jackpal/go-nat-pmp v1.0.2-0.20160603034137-1fa385a6f458/go.mod h1:QPH github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89 h1:12K8AlpT0/6QUXSfV0yi4Q0jkbq8NDtIKFtF61AoqV0= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= +github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jrick/logrotate v1.0.0 h1:lQ1bL/n9mBNeIXoTUoYRlK4dHuNJVofX9oWqBtPnSzI= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= diff --git a/indexer/indexer.go b/indexer/indexer.go index 9f8bb199..989dd312 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -109,7 +109,6 @@ type Indexer struct { workers []modules.BlockWorker waiter *waitTable - // Store coins created in pre-store before persisted // in add block so we can optimistically populate // blocks before committed. @@ -123,6 +122,8 @@ type Indexer struct { seenMutex sync.Mutex seenSemaphore *semaphore.Weighted + + maxSync int64 } // CloseDatabase closes a storage.Database. This should be called @@ -229,6 +230,8 @@ func Initialize( coinCache: map[string]*types.AccountCoin{}, coinCacheMutex: new(sdkUtils.PriorityMutex), seenSemaphore: semaphore.NewWeighted(int64(runtime.NumCPU())), + maxSync: config.MaxSyncConcurrency, + } coinStorage := modules.NewCoinStorage( @@ -296,6 +299,7 @@ func (i *Indexer) Sync(ctx context.Context) error { syncer.WithCacheSize(syncer.DefaultCacheSize), syncer.WithSizeMultiplier(sizeMultiplier), syncer.WithPastBlocks(pastBlocks), + syncer.WithMaxConcurrency(i.maxSync), ) return syncer.Sync(ctx, startIndex, indexPlaceholder) diff --git a/indexer/indexer_test.go b/indexer/indexer_test.go index 08e05d31..373ac1aa 100644 --- a/indexer/indexer_test.go +++ b/indexer/indexer_test.go @@ -62,6 +62,7 @@ func TestIndexer_Pruning(t *testing.T) { Network: bitcoin.MainnetNetwork, Blockchain: bitcoin.Blockchain, }, + MaxSyncConcurrency: 1, GenesisBlockIdentifier: bitcoin.MainnetGenesisBlockIdentifier, Pruning: &configuration.PruningConfiguration{ Frequency: 50 * time.Millisecond, @@ -227,6 +228,7 @@ func TestIndexer_Transactions(t *testing.T) { Network: bitcoin.MainnetNetwork, Blockchain: bitcoin.Blockchain, }, + MaxSyncConcurrency: 2, GenesisBlockIdentifier: bitcoin.MainnetGenesisBlockIdentifier, IndexerPath: newDir, } @@ -445,6 +447,7 @@ func TestIndexer_Reorg(t *testing.T) { Network: bitcoin.MainnetNetwork, Blockchain: bitcoin.Blockchain, }, + MaxSyncConcurrency: 64, GenesisBlockIdentifier: bitcoin.MainnetGenesisBlockIdentifier, IndexerPath: newDir, } @@ -687,6 +690,7 @@ func TestIndexer_HeaderReorg(t *testing.T) { Network: bitcoin.MainnetNetwork, Blockchain: bitcoin.Blockchain, }, + MaxSyncConcurrency: 64, GenesisBlockIdentifier: bitcoin.MainnetGenesisBlockIdentifier, IndexerPath: newDir, }