Skip to content

Commit

Permalink
Addressing issue coinbase#41 - Allow max concurrency limits configura…
Browse files Browse the repository at this point in the history
…tion on the indexer
  • Loading branch information
smeyerhot committed Dec 24, 2020
1 parent 1b4c984 commit 91c39eb
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 9 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ run-mainnet-offline:
docker run -d --rm -e "MODE=OFFLINE" -e "NETWORK=MAINNET" -e "PORT=8081" -p 8081:8081 rosetta-bitcoin:latest

run-testnet-online:
docker run -d --rm --ulimit "nofile=${NOFILE}:${NOFILE}" -v "${PWD}/bitcoin-data:/data" -e "MODE=ONLINE" -e "NETWORK=TESTNET" -e "PORT=8080" -p 8080:8080 -p 18333:18333 rosetta-bitcoin:latest
docker run -d --rm --ulimit "nofile=${NOFILE}:${NOFILE}" -v "${PWD}/bitcoin-data:/data" -e "MAXSYNC"="16" -e "MODE=ONLINE" -e "NETWORK=TESTNET" -e "PORT=8080" -p 8080:8080 -p 18333:18333 rosetta-bitcoin:latest

run-testnet-offline:
docker run -d --rm -e "MODE=OFFLINE" -e "NETWORK=TESTNET" -e "PORT=8081" -p 8081:8081 rosetta-bitcoin:latest
Expand Down
20 changes: 19 additions & 1 deletion configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package configuration
import (
"errors"
"fmt"
"github.com/coinbase/rosetta-sdk-go/syncer"
"os"
"path"
"strconv"
Expand Down Expand Up @@ -88,7 +89,11 @@ const (

// ModeEnv is the environment variable read
// to determine mode.
ModeEnv = "MODE"
ModeEnv= "MODE"

//MaxSyncConcurrency is an enviroment variable
//used to cap the syncer concurrency
MaxSyncConcurrency = "MAXSYNC"

// NetworkEnv is the environment variable
// read to determine network.
Expand Down Expand Up @@ -122,18 +127,29 @@ type Configuration struct {
IndexerPath string
BitcoindPath string
Compressors []*encoder.CompressorEntry
MaxSyncConcurrency int64
}

// LoadConfiguration attempts to create a new Configuration
// using the ENVs in the environment.
func LoadConfiguration(baseDirectory string) (*Configuration, error) {

config := &Configuration{}
config.Pruning = &PruningConfiguration{
Frequency: pruneFrequency,
Depth: pruneDepth,
MinHeight: minPruneHeight,
}
maxSyncValue := os.Getenv(MaxSyncConcurrency)
switch maxSyncValue {

case "":
config.MaxSyncConcurrency = syncer.DefaultConcurrency
case "0":
return nil, errors.New("syncer concurrency must be greater than zero")
default:
config.MaxSyncConcurrency, _ = strconv.ParseInt(maxSyncValue,10,64)
}
modeValue := Mode(os.Getenv(ModeEnv))
switch modeValue {
case Online:
Expand Down Expand Up @@ -180,6 +196,8 @@ func LoadConfiguration(baseDirectory string) (*Configuration, error) {
}
config.GenesisBlockIdentifier = bitcoin.TestnetGenesisBlockIdentifier
config.Params = bitcoin.TestnetParams


config.Currency = bitcoin.TestnetCurrency
config.ConfigPath = testnetConfigPath
config.RPCPort = testnetRPCPort
Expand Down
58 changes: 52 additions & 6 deletions configuration/configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,24 @@ package configuration

import (
"errors"
"github.com/coinbase/rosetta-bitcoin/bitcoin"
"os"
"path"
"testing"

"github.com/coinbase/rosetta-bitcoin/bitcoin"

"github.com/coinbase/rosetta-sdk-go/storage/encoder"
"github.com/coinbase/rosetta-sdk-go/types"
"github.com/coinbase/rosetta-sdk-go/utils"
"github.com/stretchr/testify/assert"
)


func TestLoadConfiguration(t *testing.T) {
tests := map[string]struct {
Mode string
Network string
Port string
Mode string
Network string
Port string
MaxSyncConcurrency string

cfg *Configuration
err error
Expand All @@ -44,15 +45,17 @@ func TestLoadConfiguration(t *testing.T) {
Mode: string(Online),
err: errors.New("NETWORK must be populated"),
},
"only mode and network set": {
"port not set": {
Mode: string(Online),
Network: Mainnet,
MaxSyncConcurrency: "64",
err: errors.New("PORT must be populated"),
},
"all set (mainnet)": {
Mode: string(Online),
Network: Mainnet,
Port: "1000",
MaxSyncConcurrency: "64",
cfg: &Configuration{
Mode: Online,
Network: &types.NetworkIdentifier{
Expand All @@ -63,6 +66,7 @@ func TestLoadConfiguration(t *testing.T) {
Currency: bitcoin.MainnetCurrency,
GenesisBlockIdentifier: bitcoin.MainnetGenesisBlockIdentifier,
Port: 1000,
MaxSyncConcurrency: 64,
RPCPort: mainnetRPCPort,
ConfigPath: mainnetConfigPath,
Pruning: &PruningConfiguration{
Expand All @@ -82,6 +86,7 @@ func TestLoadConfiguration(t *testing.T) {
Mode: string(Online),
Network: Testnet,
Port: "1000",
MaxSyncConcurrency: "64",
cfg: &Configuration{
Mode: Online,
Network: &types.NetworkIdentifier{
Expand All @@ -92,6 +97,7 @@ func TestLoadConfiguration(t *testing.T) {
Currency: bitcoin.TestnetCurrency,
GenesisBlockIdentifier: bitcoin.TestnetGenesisBlockIdentifier,
Port: 1000,
MaxSyncConcurrency: 64,
RPCPort: testnetRPCPort,
ConfigPath: testnetConfigPath,
Pruning: &PruningConfiguration{
Expand All @@ -107,6 +113,45 @@ func TestLoadConfiguration(t *testing.T) {
},
},
},
"max sync set": {
Mode: string(Online),
Network: Testnet,
Port: "1000",
MaxSyncConcurrency: "",

cfg: &Configuration{
Mode: Online,
Network: &types.NetworkIdentifier{
Network: bitcoin.TestnetNetwork,
Blockchain: bitcoin.Blockchain,
},
Params: bitcoin.TestnetParams,
Currency: bitcoin.TestnetCurrency,
GenesisBlockIdentifier: bitcoin.TestnetGenesisBlockIdentifier,
Port: 1000,
MaxSyncConcurrency: 64,
RPCPort: testnetRPCPort,
ConfigPath: testnetConfigPath,
Pruning: &PruningConfiguration{
Frequency: pruneFrequency,
Depth: pruneDepth,
MinHeight: minPruneHeight,
},
Compressors: []*encoder.CompressorEntry{
{
Namespace: transactionNamespace,
DictionaryPath: testnetTransactionDictionary,
},
},
},
},
"invalid sync concurrency ": {
Mode: string(Online),
Network: Testnet,
Port: "1000",
MaxSyncConcurrency: "0",
err: errors.New("syncer concurrency must be greater than zero"),
},
"invalid mode": {
Mode: "bad mode",
Network: Testnet,
Expand Down Expand Up @@ -136,6 +181,7 @@ func TestLoadConfiguration(t *testing.T) {
os.Setenv(ModeEnv, test.Mode)
os.Setenv(NetworkEnv, test.Network)
os.Setenv(PortEnv, test.Port)
os.Setenv(MaxSyncConcurrency, test.MaxSyncConcurrency)

cfg, err := LoadConfiguration(newDir)
if test.err != nil {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ require (
golang.org/x/tools v0.0.0-20200904185747-39188db58858 // indirect
honnef.co/go/tools v0.0.1-2020.1.5 // indirect
)

//replace github.com/coinbase/rosetta-sdk-go v0.6.5 => ../rosetta-sdk-go
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/aristanetworks/goarista v0.0.0-20170210015632-ea17b1a17847/go.mod h1:D/tb0zPVXnP7fmsLZjtdUhSsumbK/ij54UXjjVgMGxQ=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/aws/aws-sdk-go v1.25.48 h1:J82DYDGZHOKHdhx6hD24Tm30c2C3GchYGfN0mf9iKUk=
github.com/aws/aws-sdk-go v1.25.48/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/btcsuite/btcd v0.0.0-20171128150713-2e60448ffcc6/go.mod h1:Dmm/EzmjnCiweXmzRIAiUWCInVmPgjkzgv5k4tVyXiQ=
Expand Down Expand Up @@ -169,6 +170,7 @@ github.com/jackpal/go-nat-pmp v1.0.2-0.20160603034137-1fa385a6f458/go.mod h1:QPH
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89 h1:12K8AlpT0/6QUXSfV0yi4Q0jkbq8NDtIKFtF61AoqV0=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jrick/logrotate v1.0.0 h1:lQ1bL/n9mBNeIXoTUoYRlK4dHuNJVofX9oWqBtPnSzI=
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
Expand Down
6 changes: 5 additions & 1 deletion indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ type Indexer struct {
workers []modules.BlockWorker

waiter *waitTable

// Store coins created in pre-store before persisted
// in add block so we can optimistically populate
// blocks before committed.
Expand All @@ -123,6 +122,8 @@ type Indexer struct {
seenMutex sync.Mutex

seenSemaphore *semaphore.Weighted

maxSync int64
}

// CloseDatabase closes a storage.Database. This should be called
Expand Down Expand Up @@ -229,6 +230,8 @@ func Initialize(
coinCache: map[string]*types.AccountCoin{},
coinCacheMutex: new(sdkUtils.PriorityMutex),
seenSemaphore: semaphore.NewWeighted(int64(runtime.NumCPU())),
maxSync: config.MaxSyncConcurrency,

}

coinStorage := modules.NewCoinStorage(
Expand Down Expand Up @@ -296,6 +299,7 @@ func (i *Indexer) Sync(ctx context.Context) error {
syncer.WithCacheSize(syncer.DefaultCacheSize),
syncer.WithSizeMultiplier(sizeMultiplier),
syncer.WithPastBlocks(pastBlocks),
syncer.WithMaxConcurrency(i.maxSync),
)

return syncer.Sync(ctx, startIndex, indexPlaceholder)
Expand Down
4 changes: 4 additions & 0 deletions indexer/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func TestIndexer_Pruning(t *testing.T) {
Network: bitcoin.MainnetNetwork,
Blockchain: bitcoin.Blockchain,
},
MaxSyncConcurrency: 1,
GenesisBlockIdentifier: bitcoin.MainnetGenesisBlockIdentifier,
Pruning: &configuration.PruningConfiguration{
Frequency: 50 * time.Millisecond,
Expand Down Expand Up @@ -227,6 +228,7 @@ func TestIndexer_Transactions(t *testing.T) {
Network: bitcoin.MainnetNetwork,
Blockchain: bitcoin.Blockchain,
},
MaxSyncConcurrency: 2,
GenesisBlockIdentifier: bitcoin.MainnetGenesisBlockIdentifier,
IndexerPath: newDir,
}
Expand Down Expand Up @@ -445,6 +447,7 @@ func TestIndexer_Reorg(t *testing.T) {
Network: bitcoin.MainnetNetwork,
Blockchain: bitcoin.Blockchain,
},
MaxSyncConcurrency: 64,
GenesisBlockIdentifier: bitcoin.MainnetGenesisBlockIdentifier,
IndexerPath: newDir,
}
Expand Down Expand Up @@ -687,6 +690,7 @@ func TestIndexer_HeaderReorg(t *testing.T) {
Network: bitcoin.MainnetNetwork,
Blockchain: bitcoin.Blockchain,
},
MaxSyncConcurrency: 64,
GenesisBlockIdentifier: bitcoin.MainnetGenesisBlockIdentifier,
IndexerPath: newDir,
}
Expand Down

0 comments on commit 91c39eb

Please sign in to comment.