Pebble DB Integration into DB Bench Tool #155

Merged
4 changes: 2 additions & 2 deletions README.md
@@ -42,6 +42,8 @@ Note: Do not modify this section! It is auto-generated by `cobra` using `make ge

- [polycli abi](doc/polycli_abi.md) - Parse an ABI and print the encoded signatures.

- [polycli dbbench](doc/polycli_dbbench.md) - Perform a level/pebble db benchmark

- [polycli dumpblocks](doc/polycli_dumpblocks.md) - Export a range of blocks from a JSON-RPC endpoint.

- [polycli enr](doc/polycli_enr.md) - Convert between ENR and Enode format
@@ -50,8 +52,6 @@ Note: Do not modify this section! It is auto-generated by `cobra` using `make ge

- [polycli hash](doc/polycli_hash.md) - Provide common crypto hashing functions.

- [polycli leveldbbench](doc/polycli_leveldbbench.md) - Perform a level db benchmark

- [polycli loadtest](doc/polycli_loadtest.md) - Run a generic load test against an Eth/EVM style JSON-RPC endpoint.

- [polycli metrics-to-dash](doc/polycli_metrics-to-dash.md) - Create a dashboard from an Openmetrics / Prometheus response.
163 changes: 89 additions & 74 deletions cmd/leveldbbench/leveldbbench.go → cmd/dbbench/dbbench.go
@@ -1,4 +1,4 @@
package leveldbbench
package dbbench

import (
"context"
@@ -22,11 +22,7 @@ import (
"github.com/rs/zerolog/log"
progressbar "github.com/schollz/progressbar/v3"
"github.com/spf13/cobra"
leveldb "github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
)

var (
@@ -55,6 +51,7 @@ var (
readOnly *bool
dbPath *string
fullScan *bool
dbMode *string
)

const (
@@ -96,12 +93,11 @@ type (
TestDuration time.Duration
Description string
OpCount uint64
Stats *leveldb.DBStats
OpRate float64
ValueDist []uint64
}
RandomKeySeeker struct {
db *leveldb.DB
db KeyValueDB
iterator iterator.Iterator
iteratorMutex sync.Mutex
firstKey []byte
@@ -115,16 +111,19 @@ type (
ranges []IORange
totalFrequency int
}
// KeyValueDB exposes the methods of leveldb.DB that the test needs, so that they can be
// implemented by other KV stores
KeyValueDB interface {
Close() error
Compact() error
NewIterator() iterator.Iterator
Get([]byte) ([]byte, error)
Put([]byte, []byte) error
}
)

func NewTestResult(startTime, endTime time.Time, desc string, opCount uint64, db *leveldb.DB) *TestResult {
func NewTestResult(startTime, endTime time.Time, desc string, opCount uint64) *TestResult {
tr := new(TestResult)
s := new(leveldb.DBStats)
err := db.Stats(s)
if err != nil {
log.Error().Err(err).Msg("Unable to retrieve db stats")
}
tr.Stats = s
tr.StartTime = startTime
tr.EndTime = endTime
tr.TestDuration = endTime.Sub(startTime)
@@ -137,41 +136,31 @@ func NewTestResult(startTime, endTime time.Time, desc string, opCount uint64, db
return tr
}

var LevelDBBenchCmd = &cobra.Command{
Use: "leveldbbench [flags]",
Short: "Perform a level db benchmark",
var DBBenchCmd = &cobra.Command{
Use: "dbbench [flags]",
Short: "Perform a level/pebble db benchmark",
Long: usage,
RunE: func(cmd *cobra.Command, args []string) error {
log.Info().Msg("Starting level db test")
db, err := leveldb.OpenFile(*dbPath, &opt.Options{
Filter: filter.NewBloomFilter(10),
DisableSeeksCompaction: true,
OpenFilesCacheCapacity: *openFilesCacheCapacity,
BlockCacheCapacity: *cacheSize / 2 * opt.MiB,
WriteBuffer: *cacheSize / 4 * opt.MiB,
// if we've disabled writes, or we're doing a full scan, we should open the database in read only mode
ReadOnly: *readOnly || *fullScan,
})
if err != nil {
return err
log.Info().Msg("Starting db test")
var kvdb KeyValueDB
var err error
switch *dbMode {
case "leveldb":
kvdb, err = NewWrappedLevelDB()
if err != nil {
return err
}
case "pebbledb":
kvdb, err = NewWrappedPebbleDB()
if err != nil {
return err
}
default:
return fmt.Errorf("the mode %s is not recognized", *dbMode)
}

ctx := context.Background()
wo := opt.WriteOptions{
NoWriteMerge: *noWriteMerge,
Sync: *syncWrites,
}
ro := &opt.ReadOptions{
DontFillCache: *dontFillCache,
}
if *readStrict {
ro.Strict = opt.StrictAll
} else {
ro.Strict = opt.DefaultStrict
}
if *nilReadOptions {
ro = nil
}

var start time.Time
trs := make([]*TestResult, 0)

@@ -186,8 +175,8 @@ var LevelDBBenchCmd = &cobra.Command{

if *fullScan {
start = time.Now()
opCount, valueDist := runFullScan(ctx, db, &wo, ro)
tr := NewTestResult(start, time.Now(), "full scan", opCount, db)
opCount, valueDist := runFullScan(ctx, kvdb)
tr := NewTestResult(start, time.Now(), "full scan", opCount)
tr.ValueDist = valueDist
trs = append(trs, tr)
return printSummary(trs)
@@ -196,32 +185,32 @@
// in read-only mode, we assume the database has already been populated in a previous run, or that we're using some other database
if !*readOnly {
start = time.Now()
writeData(ctx, db, &wo, 0, *writeLimit, *sequentialWrites)
trs = append(trs, NewTestResult(start, time.Now(), fmt.Sprintf("initial %s write", sequentialWritesDesc), *writeLimit, db))
writeData(ctx, kvdb, 0, *writeLimit, *sequentialWrites)
trs = append(trs, NewTestResult(start, time.Now(), fmt.Sprintf("initial %s write", sequentialWritesDesc), *writeLimit))

for i := 0; i < int(*overwriteCount); i += 1 {
start = time.Now()
writeData(ctx, db, &wo, 0, *writeLimit, *sequentialWrites)
trs = append(trs, NewTestResult(start, time.Now(), fmt.Sprintf("%s overwrite %d", sequentialWritesDesc, i), *writeLimit, db))
writeData(ctx, kvdb, 0, *writeLimit, *sequentialWrites)
trs = append(trs, NewTestResult(start, time.Now(), fmt.Sprintf("%s overwrite %d", sequentialWritesDesc, i), *writeLimit))
}

start = time.Now()
runFullCompact(ctx, db, &wo)
trs = append(trs, NewTestResult(start, time.Now(), "compaction", 1, db))
runFullCompact(ctx, kvdb)
trs = append(trs, NewTestResult(start, time.Now(), "compaction", 1))
}

if *sequentialReads {
start = time.Now()
readSeq(ctx, db, &wo, *readLimit)
trs = append(trs, NewTestResult(start, time.Now(), fmt.Sprintf("%s read", sequentialReadsDesc), *readLimit, db))
readSeq(ctx, kvdb, *readLimit)
trs = append(trs, NewTestResult(start, time.Now(), fmt.Sprintf("%s read", sequentialReadsDesc), *readLimit))
} else {
start = time.Now()
readRandom(ctx, db, ro, *readLimit)
trs = append(trs, NewTestResult(start, time.Now(), fmt.Sprintf("%s read", sequentialWritesDesc), *readLimit, db))
readRandom(ctx, kvdb, *readLimit)
trs = append(trs, NewTestResult(start, time.Now(), fmt.Sprintf("%s read", sequentialWritesDesc), *readLimit))
}

log.Info().Msg("Close DB")
err = db.Close()
err = kvdb.Close()
if err != nil {
log.Error().Err(err).Msg("Error while closing db")
}
@@ -250,19 +239,19 @@ func printSummary(trs []*TestResult) error {
return nil
}

func runFullCompact(ctx context.Context, db *leveldb.DB, wo *opt.WriteOptions) {
err := db.CompactRange(util.Range{Start: nil, Limit: nil})
func runFullCompact(ctx context.Context, db KeyValueDB) {
err := db.Compact()
if err != nil {
log.Fatal().Err(err).Msg("Error compacting data")
}
}
func runFullScan(ctx context.Context, db *leveldb.DB, wo *opt.WriteOptions, ro *opt.ReadOptions) (uint64, []uint64) {
func runFullScan(ctx context.Context, db KeyValueDB) (uint64, []uint64) {
pool := make(chan bool, *degreeOfParallelism)
var wg sync.WaitGroup
// 32 should be safe here. That would correspond to a single value that's 4.2 GB
buckets := make([]uint64, 32)
var bucketsMutex sync.Mutex
iter := db.NewIterator(nil, nil)
iter := db.NewIterator()
var opCount uint64 = 0
for iter.Next() {
pool <- true
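
The body of this scan loop is collapsed in the diff view. Given the comment above that 32 buckets are enough to cover a single 4.2 GB value, each value is presumably bucketed by the bit length of its size; a sketch of that mapping (an assumption, not code from this PR):

```go
import "math/bits"

// bucketForSize maps a value's size in bytes to a histogram bucket by the bit
// length of the size. With 32 buckets, the last bucket (index 31) covers sizes
// in [2^31, 2^32) bytes, i.e. a single value of up to ~4.2 GB.
func bucketForSize(valueLen uint64) int {
	if valueLen == 0 {
		return 0
	}
	return bits.Len64(valueLen) - 1
}
```
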
@@ -314,7 +303,7 @@ func runFullScan(ctx context.Context, db *leveldb.DB, wo *opt.WriteOptions, ro *
}
return opCount, buckets
}
func writeData(ctx context.Context, db *leveldb.DB, wo *opt.WriteOptions, startIndex, writeLimit uint64, sequential bool) {
func writeData(ctx context.Context, db KeyValueDB, startIndex, writeLimit uint64, sequential bool) {
var i uint64 = startIndex
var wg sync.WaitGroup
pool := make(chan bool, *degreeOfParallelism)
@@ -326,7 +315,7 @@ func writeData(ctx context.Context, db *leveldb.DB, wo *opt.WriteOptions, startI
go func(i uint64) {
_ = bar.Add(1)
k, v := makeKV(i, sizeDistribution.GetSizeSample(), sequential)
err := db.Put(k, v, wo)
err := db.Put(k, v)
if err != nil {
log.Fatal().Err(err).Msg("Failed to put value")
}
@@ -338,14 +327,14 @@ func writeData(ctx context.Context, db *leveldb.DB, wo *opt.WriteOptions, startI
_ = bar.Finish()
}

func readSeq(ctx context.Context, db *leveldb.DB, wo *opt.WriteOptions, limit uint64) {
func readSeq(ctx context.Context, db KeyValueDB, limit uint64) {
pb := getNewProgressBar(int64(limit), "sequential reads")
var rCount uint64 = 0
pool := make(chan bool, *degreeOfParallelism)
var wg sync.WaitGroup
benchLoop:
for {
iter := db.NewIterator(nil, nil)
iter := db.NewIterator()
for iter.Next() {
rCount += 1
_ = pb.Add(1)
@@ -372,42 +361,56 @@ benchLoop:
wg.Wait()
_ = pb.Finish()
}
func readRandom(ctx context.Context, db *leveldb.DB, ro *opt.ReadOptions, limit uint64) {
func readRandom(ctx context.Context, db KeyValueDB, limit uint64) {
pb := getNewProgressBar(int64(limit), "random reads")
var rCount uint64 = 0
pool := make(chan bool, *degreeOfParallelism)
var wg sync.WaitGroup
rks := NewRandomKeySeeker(db)
defer rks.iterator.Release()

var rCountLock sync.Mutex
var keyLock sync.Mutex
benchLoop:
for {
for {
pool <- true
wg.Add(1)
go func() {
rCountLock.Lock()
rCount += 1
rCountLock.Unlock()
_ = pb.Add(1)

_, err := db.Get(rks.Key(), ro)
// It's not entirely obvious WHY this is needed, but without it, there are issues with the way that
// pebble db manages its iterators and internal state. Level db works fine though.
Comment on lines +385 to +386 (Member Author): @nivida fyi lol

keyLock.Lock()
tmpKey := rks.Key()
_, err := db.Get(tmpKey)
keyLock.Unlock()
if err != nil {
log.Error().Err(err).Msg("Level db random read error")
log.Error().Str("key", hex.EncodeToString(tmpKey)).Err(err).Msg("db random read error")
}
wg.Done()
<-pool
}()
rCountLock.Lock()
if rCount >= limit {
rCountLock.Unlock()
break benchLoop
}
rCountLock.Unlock()

}
}
wg.Wait()
_ = pb.Finish()
}

func NewRandomKeySeeker(db *leveldb.DB) *RandomKeySeeker {
func NewRandomKeySeeker(db KeyValueDB) *RandomKeySeeker {
rks := new(RandomKeySeeker)
rks.db = db
rks.iterator = db.NewIterator(nil, nil)
rks.iterator = db.NewIterator()
rks.firstKey = rks.iterator.Key()
return rks
}
@@ -420,23 +423,34 @@ func (r *RandomKeySeeker) Key() []byte {
log.Trace().Str("seekKey", hex.EncodeToString(seekKey)).Msg("Searching for key")

r.iteratorMutex.Lock()
defer r.iteratorMutex.Unlock()

// first try to just get a random key
exists := r.iterator.Seek(seekKey)

// if that key doesn't exist exactly, advance to the next key
if !exists {
exists = r.iterator.Next()
}

// if there is no next key, go back to the beginning
if !exists {
r.iterator.First()
r.iterator.Next()
exists = r.iterator.First()
}

// if there is no first key, try advancing again
if !exists {
exists = r.iterator.Next()
}

// if all of these attempts to find a valid key have failed, something must be very wrong
if !exists {
log.Fatal().Msg("Unable to select random key!?")
}
if err := r.iterator.Error(); err != nil {
log.Error().Err(err).Msg("Issue getting random key")
}
resultKey := r.iterator.Key()
r.iteratorMutex.Unlock()
log.Trace().Str("seekKey", hex.EncodeToString(seekKey)).Str("resultKey", hex.EncodeToString(resultKey)).Msg("Found random key")
return resultKey
}
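
The seek-with-fallback chain above means `Key` only fails if the database is empty. A minimal single-goroutine usage sketch, assuming a `kvdb` opened as in `RunE`:

```go
rks := NewRandomKeySeeker(kvdb)
defer rks.iterator.Release()

for i := 0; i < 10; i++ {
	k := rks.Key() // an existing key at or after a random seek point
	if _, err := kvdb.Get(k); err != nil {
		log.Error().Str("key", hex.EncodeToString(k)).Err(err).Msg("read failed")
	}
}
```

No extra locking is needed here; the `keyLock` in `readRandom` exists only because its goroutines share one seeker, and, per the comment in the diff, Pebble misbehaves when `Key` and `Get` are interleaved across goroutines.
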
@@ -596,7 +610,7 @@ func parseRawSizeDistribution(dist string) (*IODistribution, error) {
}

func init() {
flagSet := LevelDBBenchCmd.PersistentFlags()
flagSet := DBBenchCmd.PersistentFlags()
writeLimit = flagSet.Uint64("write-limit", 1000000, "The number of entries to write in the db")
readLimit = flagSet.Uint64("read-limit", 10000000, "the number of reads we will attempt to complete in a given test")
overwriteCount = flagSet.Uint64("overwrite-count", 5, "the number of times to overwrite the data")
@@ -618,6 +632,7 @@ func init() {
readOnly = flagSet.Bool("read-only", false, "if true, we'll skip all the write operations and open the DB in read only mode")
dbPath = flagSet.String("db-path", "_benchmark_db", "the path of the database that we'll use for testing")
fullScan = flagSet.Bool("full-scan-mode", false, "if true, the application will scan the full database as fast as possible and print a summary")
dbMode = flagSet.String("db-mode", "leveldb", "The mode to use: leveldb or pebbledb")

randSrc = rand.New(rand.NewSource(1))
}
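
With the new flag in place, the backend becomes a runtime choice, e.g. `polycli dbbench --db-mode pebbledb --db-path ./_benchmark_db` (an illustrative invocation; the flag names and defaults are exactly those registered in `init()` above).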