Skip to content

Commit

Permalink
Merge branch 'main' into chore/update-contracts-2
Browse files Browse the repository at this point in the history
  • Loading branch information
leovct authored Nov 11, 2023
2 parents f3e4b0b + 2b5567b commit 84c222d
Show file tree
Hide file tree
Showing 10 changed files with 490 additions and 245 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ Note: Do not modify this section! It is auto-generated by `cobra` using `make ge

- [polycli abi](doc/polycli_abi.md) - Parse an ABI and print the encoded signatures.

- [polycli dbbench](doc/polycli_dbbench.md) - Perform a level/pebble db benchmark

- [polycli dumpblocks](doc/polycli_dumpblocks.md) - Export a range of blocks from a JSON-RPC endpoint.

- [polycli enr](doc/polycli_enr.md) - Convert between ENR and Enode format
Expand All @@ -50,8 +52,6 @@ Note: Do not modify this section! It is auto-generated by `cobra` using `make ge

- [polycli hash](doc/polycli_hash.md) - Provide common crypto hashing functions.

- [polycli leveldbbench](doc/polycli_leveldbbench.md) - Perform a level db benchmark

- [polycli loadtest](doc/polycli_loadtest.md) - Run a generic load test against an Eth/EVM style JSON-RPC endpoint.

- [polycli metrics-to-dash](doc/polycli_metrics-to-dash.md) - Create a dashboard from an Openmetrics / Prometheus response.
Expand Down
163 changes: 89 additions & 74 deletions cmd/leveldbbench/leveldbbench.go → cmd/dbbench/dbbench.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package leveldbbench
package dbbench

import (
"context"
Expand All @@ -22,11 +22,7 @@ import (
"github.com/rs/zerolog/log"
progressbar "github.com/schollz/progressbar/v3"
"github.com/spf13/cobra"
leveldb "github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
)

var (
Expand Down Expand Up @@ -55,6 +51,7 @@ var (
readOnly *bool
dbPath *string
fullScan *bool
dbMode *string
)

const (
Expand Down Expand Up @@ -96,12 +93,11 @@ type (
TestDuration time.Duration
Description string
OpCount uint64
Stats *leveldb.DBStats
OpRate float64
ValueDist []uint64
}
RandomKeySeeker struct {
db *leveldb.DB
db KeyValueDB
iterator iterator.Iterator
iteratorMutex sync.Mutex
firstKey []byte
Expand All @@ -115,16 +111,19 @@ type (
ranges []IORange
totalFrequency int
}
// KeyValueDB directly exposes the necessary methods of leveldb.DB that we need to run the test so that they can be
// implemented by other KV stores
KeyValueDB interface {
Close() error
Compact() error
NewIterator() iterator.Iterator
Get([]byte) ([]byte, error)
Put([]byte, []byte) error
}
)

func NewTestResult(startTime, endTime time.Time, desc string, opCount uint64, db *leveldb.DB) *TestResult {
func NewTestResult(startTime, endTime time.Time, desc string, opCount uint64) *TestResult {
tr := new(TestResult)
s := new(leveldb.DBStats)
err := db.Stats(s)
if err != nil {
log.Error().Err(err).Msg("Unable to retrieve db stats")
}
tr.Stats = s
tr.StartTime = startTime
tr.EndTime = endTime
tr.TestDuration = endTime.Sub(startTime)
Expand All @@ -137,41 +136,31 @@ func NewTestResult(startTime, endTime time.Time, desc string, opCount uint64, db
return tr
}

var LevelDBBenchCmd = &cobra.Command{
Use: "leveldbbench [flags]",
Short: "Perform a level db benchmark",
var DBBenchCmd = &cobra.Command{
Use: "dbbench [flags]",
Short: "Perform a level/pebble db benchmark",
Long: usage,
RunE: func(cmd *cobra.Command, args []string) error {
log.Info().Msg("Starting level db test")
db, err := leveldb.OpenFile(*dbPath, &opt.Options{
Filter: filter.NewBloomFilter(10),
DisableSeeksCompaction: true,
OpenFilesCacheCapacity: *openFilesCacheCapacity,
BlockCacheCapacity: *cacheSize / 2 * opt.MiB,
WriteBuffer: *cacheSize / 4 * opt.MiB,
// if we've disabled writes, or we're doing a full scan, we should open the database in read only mode
ReadOnly: *readOnly || *fullScan,
})
if err != nil {
return err
log.Info().Msg("Starting db test")
var kvdb KeyValueDB
var err error
switch *dbMode {
case "leveldb":
kvdb, err = NewWrappedLevelDB()
if err != nil {
return err
}
case "pebbledb":
kvdb, err = NewWrappedPebbleDB()
if err != nil {
return err
}
default:
return fmt.Errorf("the mode %s is not recognized", *dbMode)
}

ctx := context.Background()
wo := opt.WriteOptions{
NoWriteMerge: *noWriteMerge,
Sync: *syncWrites,
}
ro := &opt.ReadOptions{
DontFillCache: *dontFillCache,
}
if *readStrict {
ro.Strict = opt.StrictAll
} else {
ro.Strict = opt.DefaultStrict
}
if *nilReadOptions {
ro = nil
}

var start time.Time
trs := make([]*TestResult, 0)

Expand All @@ -186,8 +175,8 @@ var LevelDBBenchCmd = &cobra.Command{

if *fullScan {
start = time.Now()
opCount, valueDist := runFullScan(ctx, db, &wo, ro)
tr := NewTestResult(start, time.Now(), "full scan", opCount, db)
opCount, valueDist := runFullScan(ctx, kvdb)
tr := NewTestResult(start, time.Now(), "full scan", opCount)
tr.ValueDist = valueDist
trs = append(trs, tr)
return printSummary(trs)
Expand All @@ -196,32 +185,32 @@ var LevelDBBenchCmd = &cobra.Command{
// in no write mode, we assume the database as already been populated in a previous run or we're using some other database
if !*readOnly {
start = time.Now()
writeData(ctx, db, &wo, 0, *writeLimit, *sequentialWrites)
trs = append(trs, NewTestResult(start, time.Now(), fmt.Sprintf("initial %s write", sequentialWritesDesc), *writeLimit, db))
writeData(ctx, kvdb, 0, *writeLimit, *sequentialWrites)
trs = append(trs, NewTestResult(start, time.Now(), fmt.Sprintf("initial %s write", sequentialWritesDesc), *writeLimit))

for i := 0; i < int(*overwriteCount); i += 1 {
start = time.Now()
writeData(ctx, db, &wo, 0, *writeLimit, *sequentialWrites)
trs = append(trs, NewTestResult(start, time.Now(), fmt.Sprintf("%s overwrite %d", sequentialWritesDesc, i), *writeLimit, db))
writeData(ctx, kvdb, 0, *writeLimit, *sequentialWrites)
trs = append(trs, NewTestResult(start, time.Now(), fmt.Sprintf("%s overwrite %d", sequentialWritesDesc, i), *writeLimit))
}

start = time.Now()
runFullCompact(ctx, db, &wo)
trs = append(trs, NewTestResult(start, time.Now(), "compaction", 1, db))
runFullCompact(ctx, kvdb)
trs = append(trs, NewTestResult(start, time.Now(), "compaction", 1))
}

if *sequentialReads {
start = time.Now()
readSeq(ctx, db, &wo, *readLimit)
trs = append(trs, NewTestResult(start, time.Now(), fmt.Sprintf("%s read", sequentialReadsDesc), *readLimit, db))
readSeq(ctx, kvdb, *readLimit)
trs = append(trs, NewTestResult(start, time.Now(), fmt.Sprintf("%s read", sequentialReadsDesc), *readLimit))
} else {
start = time.Now()
readRandom(ctx, db, ro, *readLimit)
trs = append(trs, NewTestResult(start, time.Now(), fmt.Sprintf("%s read", sequentialWritesDesc), *readLimit, db))
readRandom(ctx, kvdb, *readLimit)
trs = append(trs, NewTestResult(start, time.Now(), fmt.Sprintf("%s read", sequentialWritesDesc), *readLimit))
}

log.Info().Msg("Close DB")
err = db.Close()
err = kvdb.Close()
if err != nil {
log.Error().Err(err).Msg("Error while closing db")
}
Expand Down Expand Up @@ -250,19 +239,19 @@ func printSummary(trs []*TestResult) error {
return nil
}

func runFullCompact(ctx context.Context, db *leveldb.DB, wo *opt.WriteOptions) {
err := db.CompactRange(util.Range{Start: nil, Limit: nil})
func runFullCompact(ctx context.Context, db KeyValueDB) {
err := db.Compact()
if err != nil {
log.Fatal().Err(err).Msg("Error compacting data")
}
}
func runFullScan(ctx context.Context, db *leveldb.DB, wo *opt.WriteOptions, ro *opt.ReadOptions) (uint64, []uint64) {
func runFullScan(ctx context.Context, db KeyValueDB) (uint64, []uint64) {
pool := make(chan bool, *degreeOfParallelism)
var wg sync.WaitGroup
// 32 should be safe here. That would correspond to a single value that's 4.2 GB
buckets := make([]uint64, 32)
var bucketsMutex sync.Mutex
iter := db.NewIterator(nil, nil)
iter := db.NewIterator()
var opCount uint64 = 0
for iter.Next() {
pool <- true
Expand Down Expand Up @@ -314,7 +303,7 @@ func runFullScan(ctx context.Context, db *leveldb.DB, wo *opt.WriteOptions, ro *
}
return opCount, buckets
}
func writeData(ctx context.Context, db *leveldb.DB, wo *opt.WriteOptions, startIndex, writeLimit uint64, sequential bool) {
func writeData(ctx context.Context, db KeyValueDB, startIndex, writeLimit uint64, sequential bool) {
var i uint64 = startIndex
var wg sync.WaitGroup
pool := make(chan bool, *degreeOfParallelism)
Expand All @@ -326,7 +315,7 @@ func writeData(ctx context.Context, db *leveldb.DB, wo *opt.WriteOptions, startI
go func(i uint64) {
_ = bar.Add(1)
k, v := makeKV(i, sizeDistribution.GetSizeSample(), sequential)
err := db.Put(k, v, wo)
err := db.Put(k, v)
if err != nil {
log.Fatal().Err(err).Msg("Failed to put value")
}
Expand All @@ -338,14 +327,14 @@ func writeData(ctx context.Context, db *leveldb.DB, wo *opt.WriteOptions, startI
_ = bar.Finish()
}

func readSeq(ctx context.Context, db *leveldb.DB, wo *opt.WriteOptions, limit uint64) {
func readSeq(ctx context.Context, db KeyValueDB, limit uint64) {
pb := getNewProgressBar(int64(limit), "sequential reads")
var rCount uint64 = 0
pool := make(chan bool, *degreeOfParallelism)
var wg sync.WaitGroup
benchLoop:
for {
iter := db.NewIterator(nil, nil)
iter := db.NewIterator()
for iter.Next() {
rCount += 1
_ = pb.Add(1)
Expand All @@ -372,42 +361,56 @@ benchLoop:
wg.Wait()
_ = pb.Finish()
}
func readRandom(ctx context.Context, db *leveldb.DB, ro *opt.ReadOptions, limit uint64) {
func readRandom(ctx context.Context, db KeyValueDB, limit uint64) {
pb := getNewProgressBar(int64(limit), "random reads")
var rCount uint64 = 0
pool := make(chan bool, *degreeOfParallelism)
var wg sync.WaitGroup
rks := NewRandomKeySeeker(db)
defer rks.iterator.Release()

var rCountLock sync.Mutex
var keyLock sync.Mutex
benchLoop:
for {
for {
pool <- true
wg.Add(1)
go func() {
rCountLock.Lock()
rCount += 1
rCountLock.Unlock()
_ = pb.Add(1)

_, err := db.Get(rks.Key(), ro)
// It's not entirely obvious WHY this is needed, but without it, there are issues with the way that
// pebble db manages it's iterators and internal state. Level db works fine though.
keyLock.Lock()
tmpKey := rks.Key()
_, err := db.Get(tmpKey)
keyLock.Unlock()
if err != nil {
log.Error().Err(err).Msg("Level db random read error")
log.Error().Str("key", hex.EncodeToString(tmpKey)).Err(err).Msg("db random read error")
}
wg.Done()
<-pool
}()
rCountLock.Lock()
if rCount >= limit {
rCountLock.Unlock()
break benchLoop
}
rCountLock.Unlock()

}
}
wg.Wait()
_ = pb.Finish()
}

func NewRandomKeySeeker(db *leveldb.DB) *RandomKeySeeker {
func NewRandomKeySeeker(db KeyValueDB) *RandomKeySeeker {
rks := new(RandomKeySeeker)
rks.db = db
rks.iterator = db.NewIterator(nil, nil)
rks.iterator = db.NewIterator()
rks.firstKey = rks.iterator.Key()
return rks
}
Expand All @@ -420,23 +423,34 @@ func (r *RandomKeySeeker) Key() []byte {
log.Trace().Str("seekKey", hex.EncodeToString(seekKey)).Msg("Searching for key")

r.iteratorMutex.Lock()
defer r.iteratorMutex.Unlock()

// first try to just get a random key
exists := r.iterator.Seek(seekKey)

// if that key doesn't exist exactly advance to the next key
if !exists {
exists = r.iterator.Next()
}

// if there is no next key, to back to the beginning
if !exists {
r.iterator.First()
r.iterator.Next()
exists = r.iterator.First()
}

// if there is no first key try advancing again
if !exists {
exists = r.iterator.Next()
}

// if after trying to all these ways to find a valid key... something must be very wrong
if !exists {
log.Fatal().Msg("Unable to select random key!?")
}
if err := r.iterator.Error(); err != nil {
log.Error().Err(err).Msg("Issue getting random key")
}
resultKey := r.iterator.Key()
r.iteratorMutex.Unlock()
log.Trace().Str("seekKey", hex.EncodeToString(seekKey)).Str("resultKey", hex.EncodeToString(resultKey)).Msg("Found random key")
return resultKey
}
Expand Down Expand Up @@ -596,7 +610,7 @@ func parseRawSizeDistribution(dist string) (*IODistribution, error) {
}

func init() {
flagSet := LevelDBBenchCmd.PersistentFlags()
flagSet := DBBenchCmd.PersistentFlags()
writeLimit = flagSet.Uint64("write-limit", 1000000, "The number of entries to write in the db")
readLimit = flagSet.Uint64("read-limit", 10000000, "the number of reads will attempt to complete in a given test")
overwriteCount = flagSet.Uint64("overwrite-count", 5, "the number of times to overwrite the data")
Expand All @@ -618,6 +632,7 @@ func init() {
readOnly = flagSet.Bool("read-only", false, "if true, we'll skip all the write operations and open the DB in read only mode")
dbPath = flagSet.String("db-path", "_benchmark_db", "the path of the database that we'll use for testing")
fullScan = flagSet.Bool("full-scan-mode", false, "if true, the application will scan the full database as fast as possible and print a summary")
dbMode = flagSet.String("db-mode", "leveldb", "The mode to use: leveldb or pebbledb")

randSrc = rand.New(rand.NewSource(1))
}
Loading

0 comments on commit 84c222d

Please sign in to comment.