Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add standalone dbs and configs #1354

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
27 changes: 27 additions & 0 deletions plugin/evm/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"time"

"github.com/ava-labs/avalanchego/database/pebbledb"
"github.com/ava-labs/subnet-evm/core/txpool/legacypool"
"github.com/ava-labs/subnet-evm/eth"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -60,8 +61,11 @@ const (
// - state sync time: ~6 hrs.
defaultStateSyncMinBlocks = 300_000
defaultStateSyncRequestSize = 1024 // the number of key/values to ask peers for per request
defaultDBType = pebbledb.Name
)

type PBool bool

var (
defaultEnabledAPIs = []string{
"eth",
Expand Down Expand Up @@ -225,6 +229,14 @@ type Config struct {

// RPC settings
HttpBodyLimit uint64 `json:"http-body-limit"`

// Database settings
UseStandaloneDatabase *PBool `json:"use-standalone-database"`
DatabaseConfigContent string `json:"database-config"`
DatabaseConfigFile string `json:"database-config-file"`
DatabaseType string `json:"database-type"`
DatabasePath string `json:"database-path"`
DatabaseReadOnly bool `json:"database-read-only"`
}

// EthAPIs returns an array of strings representing the Eth APIs that should be enabled
Expand Down Expand Up @@ -284,6 +296,7 @@ func (c *Config) SetDefaults() {
c.StateSyncRequestSize = defaultStateSyncRequestSize
c.AllowUnprotectedTxHashes = defaultAllowUnprotectedTxHashes
c.AcceptedCacheSize = defaultAcceptedCacheSize
c.DatabaseType = defaultDBType
}

func (d *Duration) UnmarshalJSON(data []byte) (err error) {
Expand Down Expand Up @@ -338,3 +351,17 @@ func (c *Config) Deprecate() string {

return msg
}

func (p *PBool) String() string {
if p == nil {
return "nil"
}
return fmt.Sprintf("%t", *p)
}

func (p *PBool) Bool() bool {
if p == nil {
return false
}
return bool(*p)
}
4 changes: 1 addition & 3 deletions plugin/evm/syncervm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,7 @@ func TestVMShutdownWhileSyncing(t *testing.T) {
}

func createSyncServerAndClientVMs(t *testing.T, test syncTest, numBlocks int) *syncVMSetup {
var (
require = require.New(t)
)
require := require.New(t)
// configure [serverVM]
_, serverVM, _, serverAppSender := GenesisVM(t, true, genesisJSONLatest, "", "")
t.Cleanup(func() {
Expand Down
200 changes: 176 additions & 24 deletions plugin/evm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package evm

import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/ava-labs/avalanchego/network/p2p/gossip"
"github.com/prometheus/client_golang/prometheus"

avalancheNode "github.com/ava-labs/avalanchego/node"
"github.com/ava-labs/subnet-evm/commontype"
"github.com/ava-labs/subnet-evm/consensus/dummy"
"github.com/ava-labs/subnet-evm/constants"
Expand Down Expand Up @@ -66,7 +68,10 @@ import (
avalancheRPC "github.com/gorilla/rpc/v2"

"github.com/ava-labs/avalanchego/codec"
"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/avalanchego/database/leveldb"
"github.com/ava-labs/avalanchego/database/memdb"
"github.com/ava-labs/avalanchego/database/meterdb"
"github.com/ava-labs/avalanchego/database/pebbledb"
"github.com/ava-labs/avalanchego/database/prefixdb"
"github.com/ava-labs/avalanchego/database/versiondb"
"github.com/ava-labs/avalanchego/ids"
Expand All @@ -81,7 +86,10 @@ import (

commonEng "github.com/ava-labs/avalanchego/snow/engine/common"

avalanchemetrics "github.com/ava-labs/avalanchego/api/metrics"
"github.com/ava-labs/avalanchego/database"
avalancheUtils "github.com/ava-labs/avalanchego/utils"
avalancheconstants "github.com/ava-labs/avalanchego/utils/constants"
avalancheJSON "github.com/ava-labs/avalanchego/utils/json"
)

Expand All @@ -106,6 +114,7 @@ const (
ethMetricsPrefix = "eth"
sdkMetricsPrefix = "sdk"
chainStateMetricsPrefix = "chain_state"
dbMetricsPrefix = "db"

// gossip constants
pushGossipDiscardedElements = 16_384
Expand Down Expand Up @@ -201,7 +210,6 @@ type VM struct {
// [acceptedBlockDB] is the database to store the last accepted
// block.
acceptedBlockDB database.Database

// [warpDB] is used to store warp message signatures
// set to a prefixDB with the prefix [warpPrefix]
warpDB database.Database
Expand Down Expand Up @@ -246,6 +254,7 @@ type VM struct {
ethTxPushGossiper avalancheUtils.Atomic[*gossip.PushGossiper[*GossipEthTx]]
ethTxPullGossiper gossip.Gossiper

chainAlias string
// RPC handlers (should be stopped before closing chaindb)
rpcHandlers []interface{ Stop() }
}
Expand Down Expand Up @@ -284,8 +293,9 @@ func (vm *VM) Initialize(
// fallback to ChainID string instead of erroring
alias = vm.ctx.ChainID.String()
}
vm.chainAlias = alias

subnetEVMLogger, err := InitLogger(alias, vm.config.LogLevel, vm.config.LogJSONFormat, vm.ctx.Log)
subnetEVMLogger, err := InitLogger(vm.chainAlias, vm.config.LogLevel, vm.config.LogJSONFormat, vm.ctx.Log)
if err != nil {
return fmt.Errorf("failed to initialize logger due to: %w ", err)
}
Expand All @@ -306,16 +316,15 @@ func (vm *VM) Initialize(

vm.toEngine = toEngine
vm.shutdownChan = make(chan struct{}, 1)
// Use NewNested rather than New so that the structure of the database
// remains the same regardless of the provided baseDB type.
vm.chaindb = rawdb.NewDatabase(Database{prefixdb.NewNested(ethDBPrefix, db)})
vm.db = versiondb.New(db)
vm.acceptedBlockDB = prefixdb.New(acceptedPrefix, vm.db)
vm.metadataDB = prefixdb.New(metadataPrefix, vm.db)
// Note warpDB is not part of versiondb because it is not necessary
// that warp signatures are committed to the database atomically with
// the last accepted block.
vm.warpDB = prefixdb.New(warpPrefix, db)

if err := vm.initializeMetrics(); err != nil {
return fmt.Errorf("failed to initialize metrics: %w", err)
}

// Initialize the database
if err := vm.initializeDBs(db); err != nil {
return fmt.Errorf("failed to initialize databases: %w", err)
}

if vm.config.InspectDatabase {
start := time.Now()
Expand Down Expand Up @@ -466,10 +475,6 @@ func (vm *VM) Initialize(
}
log.Info(fmt.Sprintf("lastAccepted = %s", lastAcceptedHash))

if err := vm.initializeMetrics(); err != nil {
return err
}

// initialize peer network
if vm.p2pSender == nil {
vm.p2pSender = appSender
Expand Down Expand Up @@ -800,8 +805,8 @@ func (vm *VM) initBlockBuilding() error {
// setAppRequestHandlers sets the request handlers for the VM to serve state sync
// requests.
func (vm *VM) setAppRequestHandlers() {
// Create separate EVM TrieDB (read only) for serving leafs requests.
// We create a separate TrieDB here, so that it has a separate cache from the one
// Create standalone EVM TrieDB (read only) for serving leafs requests.
// We create a standalone TrieDB here, so that it has a standalone cache from the one
// used by the node when processing blocks.
evmTrieDB := triedb.NewDatabase(
vm.chaindb,
Expand Down Expand Up @@ -1010,13 +1015,9 @@ func (vm *VM) CreateHandlers(context.Context) (map[string]http.Handler, error) {
return nil, err
}

primaryAlias, err := vm.ctx.BCLookup.PrimaryAlias(vm.ctx.ChainID)
if err != nil {
return nil, fmt.Errorf("failed to get primary alias for chain due to %w", err)
}
apis := make(map[string]http.Handler)
if vm.config.AdminAPIEnabled {
adminAPI, err := newHandler("admin", NewAdminService(vm, os.ExpandEnv(fmt.Sprintf("%s_subnet_evm_performance_%s", vm.config.AdminAPIDir, primaryAlias))))
adminAPI, err := newHandler("admin", NewAdminService(vm, os.ExpandEnv(fmt.Sprintf("%s_subnet_evm_performance_%s", vm.config.AdminAPIDir, vm.chainAlias))))
if err != nil {
return nil, fmt.Errorf("failed to register service for admin API due to %w", err)
}
Expand Down Expand Up @@ -1190,3 +1191,154 @@ func attachEthService(handler *rpc.Server, apis []rpc.API, names []string) error

return nil
}

// useStandaloneDatabase returns true if the chain can and should use a standalone database
// other than given by [db] in Initialize()
func (vm *VM) useStandaloneDatabase(acceptedDB database.Database) (bool, error) {
// no config provided, use default
standaloneDBFlag := vm.config.UseStandaloneDatabase
if standaloneDBFlag != nil {
return standaloneDBFlag.Bool(), nil
}

// check if the chain can use a standalone database
_, err := acceptedDB.Get(lastAcceptedKey)
if err == database.ErrNotFound {
// If there is nothing in the database, we can use the standalone database
return true, nil
}
return false, err
}

// getDatabaseConfig returns the database configuration for the chain
// to be used by separate, standalone database.
func getDatabaseConfig(config Config, chainDataDir string) (avalancheNode.DatabaseConfig, error) {
var (
configBytes []byte
err error
)
if len(config.DatabaseConfigContent) != 0 {
dbConfigContent := config.DatabaseConfigContent
configBytes, err = base64.StdEncoding.DecodeString(dbConfigContent)
if err != nil {
return avalancheNode.DatabaseConfig{}, fmt.Errorf("unable to decode base64 content: %w", err)
}
} else if len(config.DatabaseConfigFile) != 0 {
configPath := config.DatabaseConfigFile
configBytes, err = os.ReadFile(configPath)
if err != nil {
return avalancheNode.DatabaseConfig{}, err
}
}

dbPath := filepath.Join(chainDataDir, "db")
if len(config.DatabasePath) != 0 {
dbPath = config.DatabasePath
}

return avalancheNode.DatabaseConfig{
Name: config.DatabaseType,
ReadOnly: config.DatabaseReadOnly,
Path: dbPath,
Config: configBytes,
}, nil
}

// initializeDBs initializes the databases used by the VM.
// If [useStandaloneDB] is true, the chain will use a standalone database for its state.
// Otherwise, the chain will use the provided [avaDB] for its state.
func (vm *VM) initializeDBs(avaDB database.Database) error {
db := avaDB
// skip standalone database initialization if we are running in unit tests
if vm.ctx.NetworkID != avalancheconstants.UnitTestID {
// first initialize the accepted block database to check if we need to use a standalone database
verDB := versiondb.New(avaDB)
acceptedDB := prefixdb.New(acceptedPrefix, verDB)
useStandAloneDB, err := vm.useStandaloneDatabase(acceptedDB)
if err != nil {
return err
}
if useStandAloneDB {
// If we are using a standalone database, we need to create a new database
// for the chain state.
dbConfig, err := getDatabaseConfig(vm.config, vm.ctx.ChainDataDir)
if err != nil {
return err
}
log.Info("Using standalone database for the chain state", "DatabaseConfig", dbConfig)
db, err = vm.createDatabase(dbConfig)
if err != nil {
return err
}
}
}
// Use NewNested rather than New so that the structure of the database
// remains the same regardless of the provided baseDB type.
vm.chaindb = rawdb.NewDatabase(Database{prefixdb.NewNested(ethDBPrefix, db)})
vm.db = versiondb.New(db)
vm.acceptedBlockDB = prefixdb.New(acceptedPrefix, db)
vm.metadataDB = prefixdb.New(metadataPrefix, db)
// Note warpDB is not part of versiondb because it is not necessary
// that warp signatures are committed to the database atomically with
// the last accepted block.
// [warpDB] is used to store warp message signatures
// set to a prefixDB with the prefix [warpPrefix]
vm.warpDB = prefixdb.New(warpPrefix, db)
return nil
}

// createDatabase returns a new database instance with the provided configuration
func (vm *VM) createDatabase(dbConfig avalancheNode.DatabaseConfig) (database.Database, error) {
dbRegisterer, err := avalanchemetrics.MakeAndRegister(
vm.ctx.Metrics,
dbMetricsPrefix,
)
if err != nil {
return nil, err
}
var db database.Database
// start the db
switch dbConfig.Name {
case leveldb.Name:
dbPath := filepath.Join(dbConfig.Path, leveldb.Name)
db, err = leveldb.New(dbPath, dbConfig.Config, vm.ctx.Log, dbRegisterer)
if err != nil {
return nil, fmt.Errorf("couldn't create %s at %s: %w", leveldb.Name, dbPath, err)
}
case memdb.Name:
db = memdb.New()
case pebbledb.Name:
dbPath := filepath.Join(dbConfig.Path, pebbledb.Name)
db, err = pebbledb.New(dbPath, dbConfig.Config, vm.ctx.Log, dbRegisterer)
if err != nil {
return nil, fmt.Errorf("couldn't create %s at %s: %w", pebbledb.Name, dbPath, err)
}
default:
return nil, fmt.Errorf(
"db-type was %q but should have been one of {%s, %s, %s}",
dbConfig.Name,
leveldb.Name,
memdb.Name,
pebbledb.Name,
)
}

if dbConfig.ReadOnly && dbConfig.Name != memdb.Name {
db = versiondb.New(db)
}

meterDBReg, err := avalanchemetrics.MakeAndRegister(
vm.ctx.Metrics,
"meterdb",
)
if err != nil {
return nil, err
}

db, err = meterdb.New(meterDBReg, db)
if err != nil {
return nil, fmt.Errorf("failed to create meterdb: %w", err)
}

return db, nil
}
7 changes: 2 additions & 5 deletions plugin/evm/vm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,8 @@ func setupGenesis(
genesisBytes := buildGenesisTest(t, genesisJSON)
ctx := NewContext()

baseDB := memdb.New()

// initialize the atomic memory
atomicMemory := atomic.NewMemory(prefixdb.New([]byte{0}, baseDB))
atomicMemory := atomic.NewMemory(prefixdb.New([]byte{0}, memdb.New()))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this file could be reverted if preferred

ctx.SharedMemory = atomicMemory.NewSharedMemory(ctx.ChainID)

// NB: this lock is intentionally left locked when this function returns.
Expand All @@ -206,8 +204,7 @@ func setupGenesis(
ctx.Keystore = userKeystore.NewBlockchainKeyStore(ctx.ChainID)

issuer := make(chan commonEng.Message, 1)
prefixedDB := prefixdb.New([]byte{1}, baseDB)
return ctx, prefixedDB, genesisBytes, issuer, atomicMemory
return ctx, memdb.New(), genesisBytes, issuer, atomicMemory
}

// GenesisVM creates a VM instance with the genesis test bytes and returns
Expand Down
4 changes: 1 addition & 3 deletions plugin/evm/vm_upgrade_bytes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ import (
"github.com/stretchr/testify/require"
)

var (
DefaultEtnaTime = uint64(upgrade.GetConfig(testNetworkID).EtnaTime.Unix())
)
var DefaultEtnaTime = uint64(upgrade.GetConfig(testNetworkID).EtnaTime.Unix())

func TestVMUpgradeBytesPrecompile(t *testing.T) {
// Make a TxAllowListConfig upgrade at genesis and convert it to JSON to apply as upgradeBytes.
Expand Down
Loading
Loading