diff --git a/cmd/bbolt/command_bench.go b/cmd/bbolt/command_bench.go new file mode 100644 index 000000000..0b990f8e7 --- /dev/null +++ b/cmd/bbolt/command_bench.go @@ -0,0 +1,541 @@ +package main + +import ( + "encoding/binary" + "errors" + "fmt" + "io" + "math/rand" + "os" + "runtime" + "runtime/pprof" + "sync/atomic" + "time" + + "github.com/spf13/cobra" + "github.com/spf13/pflag" + bolt "go.etcd.io/bbolt" +) + +var ( + // ErrBatchNonDivisibleBatchSize is returned when the batch size can't be evenly + // divided by the iteration count. + ErrBatchNonDivisibleBatchSize = errors.New("the number of iterations must be divisible by the batch size") + + // ErrBatchInvalidWriteMode is returned when the write mode is other than seq, rnd, seq-nest, or rnd-nest. + ErrBatchInvalidWriteMode = errors.New("the write mode should be one of seq, rnd, seq-nest, or rnd-nest") +) + +var benchBucketName = []byte("bench") + +func newBenchCobraCommand() *cobra.Command { + o := newBenchOptions() + benchCmd := &cobra.Command{ + Use: "bench", + Short: "run synthetic benchmark against bbolt", + RunE: func(cmd *cobra.Command, args []string) error { + if err := o.Validate(); err != nil { + return err + } + if err := o.SetOptionValues(); err != nil { + return err + } + benchExec := newBenchExecution(cmd, o) + return benchExec.Run() + }, + } + + o.AddFlags(benchCmd.Flags()) + return benchCmd +} + +type benchOptions struct { + batchSize uint32 + blockProfile string + iterations uint32 + cpuProfile string + fillPercent float64 + keySize int + memProfile string + noSync bool + path string + profileMode string + valueSize int + work bool + writeMode string +} + +// Returns a new benchOptions for the `bench` command with the default values applied. +func newBenchOptions() benchOptions { + return benchOptions{ + iterations: 1000, + fillPercent: bolt.DefaultFillPercent, + keySize: 8, + profileMode: "rw", + valueSize: 32, + writeMode: "seq", + } +} + +// AddFlags sets the flags for the `bench` command. +func (o *benchOptions) AddFlags(fs *pflag.FlagSet) { + fs.Uint32Var(&o.batchSize, "batch-size", o.batchSize, "the step size for each iteration, if not provided iteration size is used, it needs to be evenly divided by the iteration count (count)") + fs.StringVar(&o.blockProfile, "block-profile", o.blockProfile, "output file for the pprof block profile") + fs.Uint32Var(&o.iterations, "count", o.iterations, "the number of iterations") + fs.StringVar(&o.cpuProfile, "cpu-profile", o.cpuProfile, "output file for the pprof CPU profile") + fs.Float64Var(&o.fillPercent, "fill-percent", o.fillPercent, "the percentage that split pages are filled") + fs.IntVar(&o.keySize, "key-size", o.keySize, "the size for the key, from the key value insertion") + fs.StringVar(&o.memProfile, "mem-profile", o.memProfile, "output file for the pprof memoery profile") + fs.BoolVar(&o.noSync, "no-sync", o.noSync, "skip fsync() calls after each commit") + fs.StringVar(&o.path, "path", o.path, "path to the database file") + fs.StringVar(&o.profileMode, "profile-mode", o.profileMode, "the profile mode to execute, valid modes are r, w and rw") + fs.IntVar(&o.valueSize, "value-size", o.valueSize, "the size for the value, from the key value insertion") + fs.BoolVar(&o.work, "work", o.work, "if set, the database path won't be removed after the execution") + fs.StringVar(&o.writeMode, "write-mode", o.writeMode, "the write mode, valid values are seq, rnd, seq-nest and rnd-nest") +} + +// Returns an error if `bench` options are not valid. +func (o *benchOptions) Validate() error { + // Require that batch size can be evenly divided by the iteration count if set. + if o.batchSize > 0 && o.iterations%o.batchSize != 0 { + return ErrBatchNonDivisibleBatchSize + } + + switch o.writeMode { + case "seq", "rnd", "seq-nest", "rnd-nest": + default: + return ErrBatchInvalidWriteMode + } + + return nil +} + +// Sets the `bench` option values that are dependent on other options. +func (o *benchOptions) SetOptionValues() error { + // Generate temp path if one is not passed in. + if o.path == "" { + f, err := os.CreateTemp("", "bolt-bench-") + if err != nil { + return fmt.Errorf("error creating temp file: %s", err) + } + f.Close() + os.Remove(f.Name()) + o.path = f.Name() + } + + // Set batch size to iteration size if not set. + if o.batchSize == 0 { + o.batchSize = o.iterations + } + + return nil +} + +type benchExecution struct { + cfg benchOptions + db *bolt.DB + writeResults *benchResults + readResults *benchResults + cpuProfile *os.File + memProfile *os.File + blockProfile *os.File + stderr io.Writer +} + +func newBenchExecution(cmd *cobra.Command, cfg benchOptions) *benchExecution { + return &benchExecution{ + cfg: cfg, + writeResults: new(benchResults), + readResults: new(benchResults), + stderr: cmd.OutOrStderr(), + } +} + +func (e *benchExecution) Run() error { + // Ensure that profiling files are closed if there's an early exit from an error. + defer e.stopProfiling() + + // Remove path if "-work" is not set. Otherwise keep path. + if e.cfg.work { + fmt.Fprintf(e.stderr, "work: %s\n", e.cfg.path) + } else { + defer os.Remove(e.cfg.path) + } + + var err error + // Create database. + e.db, err = bolt.Open(e.cfg.path, 0600, nil) + if err != nil { + return err + } + e.db.NoSync = e.cfg.noSync + defer e.db.Close() + + // Write to the database. + fmt.Fprintf(e.stderr, "starting write benchmark.\n") + if err := e.benchWrites(); err != nil { + return fmt.Errorf("write: %v", err) + } + + fmt.Fprintf(e.stderr, "starting read benchmark.\n") + // Read from the database. + if err := e.benchReads(); err != nil { + return fmt.Errorf("bench: read: %s", err) + } + + // Print results. + fmt.Fprintf(e.stderr, "# Write\t%s\n", e.writeResults) + fmt.Fprintf(e.stderr, "# Read\t%s\n\n", e.readResults) + return nil +} + +// benchResults represents the performance results of the benchmark and is thread-safe. +type benchResults struct { + completedOps int64 + duration int64 +} + +func (r *benchResults) AddCompletedOps(amount int64) { + atomic.AddInt64(&r.completedOps, amount) +} + +func (r *benchResults) CompletedOps() int64 { + return atomic.LoadInt64(&r.completedOps) +} + +func (r *benchResults) SetDuration(dur time.Duration) { + atomic.StoreInt64(&r.duration, int64(dur)) +} + +func (r *benchResults) Duration() time.Duration { + return time.Duration(atomic.LoadInt64(&r.duration)) +} + +// Returns the duration for a single read/write operation. +func (r *benchResults) OpDuration() time.Duration { + if r.CompletedOps() == 0 { + return 0 + } + return r.Duration() / time.Duration(r.CompletedOps()) +} + +// Returns average number of read/write operations that can be performed per second. +func (r *benchResults) OpsPerSecond() int { + var op = r.OpDuration() + if op == 0 { + return 0 + } + return int(time.Second) / int(op) +} + +// Returns the printable results. +func (r *benchResults) String() string { + return fmt.Sprintf("%v(ops)\t%v\t(%v/op)\t(%v op/sec)", r.CompletedOps(), r.Duration(), r.OpDuration(), r.OpsPerSecond()) +} + +func (e *benchExecution) benchWrites() error { + // Start profiling for writes. + if e.cfg.profileMode == "rw" || e.cfg.profileMode == "w" { + if err := e.startProfiling(); err != nil { + return err + } + } + + finishChan := make(chan interface{}) + go benchCheckProgress(e.writeResults, finishChan, e.stderr) + defer close(finishChan) + + t := time.Now() + + var err error + switch e.cfg.writeMode { + case "seq": + err = e.benchWritesWithSource(benchSequentialKeySource()) + case "rnd": + err = e.benchWritesWithSource(benchRandomKeySource()) + case "seq-nest": + err = e.benchWritesNestedWithSource(benchSequentialKeySource()) + case "rnd-nest": + err = e.benchWritesNestedWithSource(benchRandomKeySource()) + default: + return fmt.Errorf("invalid write mode: %s", e.cfg.writeMode) + } + + // Save time to write. + e.writeResults.SetDuration(time.Since(t)) + + // Stop profiling for writes only. + if e.cfg.profileMode == "w" { + e.stopProfiling() + } + + return err +} + +func benchSequentialKeySource() func() uint32 { + i := uint32(0) + return func() uint32 { i++; return i } +} + +func benchRandomKeySource() func() uint32 { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + return func() uint32 { return r.Uint32() } +} + +func (e *benchExecution) benchWritesWithSource(keySource func() uint32) error { + for i := uint32(0); i < e.cfg.iterations; i += e.cfg.batchSize { + if err := e.db.Update(func(tx *bolt.Tx) error { + b, _ := tx.CreateBucketIfNotExists(benchBucketName) + b.FillPercent = e.cfg.fillPercent + + fmt.Fprintf(e.stderr, "Starting write iteration %d/%d+%d\n", i, e.cfg.iterations, e.cfg.batchSize) + for j := uint32(0); j < e.cfg.batchSize; j++ { + key := make([]byte, e.cfg.keySize) + value := make([]byte, e.cfg.valueSize) + + // Write key as uint32. + binary.BigEndian.PutUint32(key, keySource()) + + // Insert key/value. + if err := b.Put(key, value); err != nil { + return err + } + + e.writeResults.AddCompletedOps(1) + } + fmt.Fprintf(e.stderr, "Finished write iteration %d\n", i) + + return nil + }); err != nil { + return err + } + } + return nil +} + +func (e *benchExecution) benchWritesNestedWithSource(keySource func() uint32) error { + for i := uint32(0); i < e.cfg.iterations; i += e.cfg.batchSize { + if err := e.db.Update(func(tx *bolt.Tx) error { + top, err := tx.CreateBucketIfNotExists(benchBucketName) + if err != nil { + return err + } + top.FillPercent = e.cfg.fillPercent + + // Create bucket key. + name := make([]byte, e.cfg.keySize) + binary.BigEndian.PutUint32(name, keySource()) + + // Create bucket. + b, err := top.CreateBucketIfNotExists(name) + if err != nil { + return err + } + b.FillPercent = e.cfg.fillPercent + + fmt.Fprintf(e.stderr, "Starting write iteration %d\n", i) + for j := uint32(0); j < e.cfg.batchSize; j++ { + var key = make([]byte, e.cfg.keySize) + var value = make([]byte, e.cfg.valueSize) + + // Generate key as uint32. + binary.BigEndian.PutUint32(key, keySource()) + + // Insert value into subbucket. + if err := b.Put(key, value); err != nil { + return err + } + + e.writeResults.AddCompletedOps(1) + } + fmt.Fprintf(e.stderr, "Finished write iteration %d\n", i) + + return nil + }); err != nil { + return err + } + } + return nil +} + +// Reads from the database. +func (e *benchExecution) benchReads() error { + // Start profiling for reads. + if e.cfg.profileMode == "r" { + if err := e.startProfiling(); err != nil { + return err + } + } + + finishChan := make(chan interface{}) + go benchCheckProgress(e.readResults, finishChan, e.stderr) + defer close(finishChan) + + t := time.Now() + + var err error + switch e.cfg.writeMode { + case "seq-nest", "rnd-nest": + err = e.benchReadsSequentialNested() + default: + err = e.benchReadsSequential() + } + + // Save read time. + e.readResults.SetDuration(time.Since(t)) + + // Stop profiling for reads. + if e.cfg.profileMode == "rw" || e.cfg.profileMode == "r" { + e.stopProfiling() + } + + return err +} + +func (e *benchExecution) benchReadsSequential() error { + return e.db.View(func(tx *bolt.Tx) error { + t := time.Now() + + for { + numReads := uint32(0) + c := tx.Bucket(benchBucketName).Cursor() + for k, v := c.First(); k != nil; k, v = c.Next() { + numReads++ + e.readResults.AddCompletedOps(1) + if v == nil { + return errors.New("invalid value") + } + } + + if e.cfg.writeMode == "seq" && numReads != e.cfg.iterations { + return fmt.Errorf("read seq: iter mismatch: expected %d, got %d", e.cfg.iterations, numReads) + } + + // Make sure we do this for at least a second. + if time.Since(t) >= time.Second { + break + } + } + + return nil + }) +} + +func (e *benchExecution) benchReadsSequentialNested() error { + return e.db.View(func(tx *bolt.Tx) error { + t := time.Now() + + for { + numReads := uint32(0) + var top = tx.Bucket(benchBucketName) + if err := top.ForEach(func(name, _ []byte) error { + if b := top.Bucket(name); b != nil { + c := b.Cursor() + for k, v := c.First(); k != nil; k, v = c.Next() { + numReads++ + e.readResults.AddCompletedOps(1) + if v == nil { + return ErrInvalidValue + } + } + } + return nil + }); err != nil { + return err + } + + if e.cfg.writeMode == "seq-nest" && numReads != e.cfg.iterations { + return fmt.Errorf("read seq-nest: iter mismatch: expected %d, got %d", e.cfg.iterations, numReads) + } + + // Make sure we do this for at least a second. + if time.Since(t) >= time.Second { + break + } + } + + return nil + }) +} + +func benchCheckProgress(results *benchResults, finishChan chan interface{}, stderr io.Writer) { + ticker := time.Tick(time.Second) + lastCompleted, lastTime := int64(0), time.Now() + for { + select { + case <-finishChan: + return + case t := <-ticker: + completed, taken := results.CompletedOps(), t.Sub(lastTime) + fmt.Fprintf(stderr, "Completed %d requests, %d/s \n", + completed, ((completed-lastCompleted)*int64(time.Second))/int64(taken), + ) + lastCompleted, lastTime = completed, t + } + } +} + +// Starts all profiles set on the options. +func (e *benchExecution) startProfiling() error { + var err error + + // Start CPU profiling. + if e.cfg.cpuProfile != "" { + e.cpuProfile, err = os.Create(e.cfg.cpuProfile) + if err != nil { + return fmt.Errorf("could not create cpu profile %q: %v\n", e.cfg.cpuProfile, err) + } + err = pprof.StartCPUProfile(e.cpuProfile) + if err != nil { + return fmt.Errorf("could not start cpu profile %q: %v\n", e.cfg.cpuProfile, err) + } + } + + // Start memory profiling. + if e.cfg.memProfile != "" { + e.memProfile, err = os.Create(e.cfg.memProfile) + if err != nil { + return fmt.Errorf("could not create memory profile %q: %v\n", e.cfg.memProfile, err) + } + runtime.MemProfileRate = 4096 + } + + // Start fatal profiling. + if e.cfg.blockProfile != "" { + e.blockProfile, err = os.Create(e.cfg.blockProfile) + if err != nil { + return fmt.Errorf("could not create block profile %q: %v\n", e.cfg.blockProfile, err) + } + runtime.SetBlockProfileRate(1) + } + + return nil +} + +// Stops all profiles. +func (e *benchExecution) stopProfiling() { + if e.cpuProfile != nil { + pprof.StopCPUProfile() + e.cpuProfile.Close() + e.cpuProfile = nil + } + + if e.memProfile != nil { + err := pprof.Lookup("heap").WriteTo(e.memProfile, 0) + if err != nil { + fmt.Fprint(e.stderr, "could not write memory profile") + } + e.memProfile.Close() + e.memProfile = nil + } + + if e.blockProfile != nil { + err := pprof.Lookup("block").WriteTo(e.blockProfile, 0) + if err != nil { + fmt.Fprint(e.stderr, "could not write block profile") + } + e.blockProfile.Close() + e.blockProfile = nil + runtime.SetBlockProfileRate(0) + } +} diff --git a/cmd/bbolt/command_bench_test.go b/cmd/bbolt/command_bench_test.go new file mode 100644 index 000000000..18dfb8dc1 --- /dev/null +++ b/cmd/bbolt/command_bench_test.go @@ -0,0 +1,52 @@ +package main_test + +import ( + "bytes" + "fmt" + "io" + "strings" + "testing" + + main "go.etcd.io/bbolt/cmd/bbolt" +) + +// Ensure the "bench" command runs and exits without errors +func TestBenchCommand_Run(t *testing.T) { + tests := map[string]struct { + args []string + }{ + "no-args": {}, + "100k count": {[]string{"--count", "100000"}}, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + // Run the command. + buf := new(bytes.Buffer) + rootCmd := main.NewRootCommand() + rootCmd.SetArgs(append([]string{"bench"}, test.args...)) + rootCmd.SetOut(buf) + if err := rootCmd.Execute(); err != nil { + t.Fatal(err) + } + + output, err := io.ReadAll(buf) + if err != nil { + t.Fatal(err) + } + stderr := string(output) + + if !strings.Contains(stderr, "starting write benchmark.") || !strings.Contains(stderr, "starting read benchmark.") { + t.Fatal(fmt.Errorf("benchmark result does not contain read/write start output:\n%s", stderr)) + } + + if strings.Contains(stderr, "iter mismatch") { + t.Fatal(fmt.Errorf("found iter mismatch in stdout:\n%s", stderr)) + } + + if !strings.Contains(stderr, "# Write") || !strings.Contains(stderr, "# Read") { + t.Fatal(fmt.Errorf("benchmark result does not contain read/write output:\n%s", stderr)) + } + }) + } +} diff --git a/cmd/bbolt/command_root.go b/cmd/bbolt/command_root.go index b69a619ed..caba01567 100644 --- a/cmd/bbolt/command_root.go +++ b/cmd/bbolt/command_root.go @@ -20,6 +20,7 @@ func NewRootCommand() *cobra.Command { newVersionCobraCommand(), newSurgeryCobraCommand(), newInspectCobraCommand(), + newBenchCobraCommand(), ) return rootCmd diff --git a/cmd/bbolt/main.go b/cmd/bbolt/main.go index 121fd4da9..c6b2b2b00 100644 --- a/cmd/bbolt/main.go +++ b/cmd/bbolt/main.go @@ -3,20 +3,14 @@ package main import ( "bytes" "crypto/sha256" - "encoding/binary" "encoding/hex" "errors" "flag" "fmt" "io" - "math/rand" "os" - "runtime" - "runtime/pprof" "strconv" "strings" - "sync/atomic" - "time" "unicode" "unicode/utf8" @@ -116,8 +110,6 @@ func (m *Main) Run(args ...string) error { case "help": fmt.Fprintln(m.Stderr, m.Usage()) return ErrUsage - case "bench": - return newBenchCommand(m).Run(args[1:]...) case "buckets": return newBucketsCommand(m).Run(args[1:]...) case "check": @@ -1057,493 +1049,6 @@ Additional options include: `, "\n") } -var benchBucketName = []byte("bench") - -// benchCommand represents the "bench" command execution. -type benchCommand struct { - baseCommand -} - -// newBenchCommand returns a BenchCommand using the -func newBenchCommand(m *Main) *benchCommand { - c := &benchCommand{} - c.baseCommand = m.baseCommand - return c -} - -// Run executes the "bench" command. -func (cmd *benchCommand) Run(args ...string) error { - // Parse CLI arguments. - options, err := cmd.ParseFlags(args) - if err != nil { - return err - } - - // Remove path if "-work" is not set. Otherwise keep path. - if options.Work { - fmt.Fprintf(cmd.Stderr, "work: %s\n", options.Path) - } else { - defer os.Remove(options.Path) - } - - // Create database. - db, err := bolt.Open(options.Path, 0600, nil) - if err != nil { - return err - } - db.NoSync = options.NoSync - defer db.Close() - - // Write to the database. - var writeResults BenchResults - fmt.Fprintf(cmd.Stderr, "starting write benchmark.\n") - if err := cmd.runWrites(db, options, &writeResults); err != nil { - return fmt.Errorf("write: %v", err) - } - - var readResults BenchResults - fmt.Fprintf(cmd.Stderr, "starting read benchmark.\n") - // Read from the database. - if err := cmd.runReads(db, options, &readResults); err != nil { - return fmt.Errorf("bench: read: %s", err) - } - - // Print results. - fmt.Fprintf(cmd.Stderr, "# Write\t%v(ops)\t%v\t(%v/op)\t(%v op/sec)\n", writeResults.CompletedOps(), writeResults.Duration(), writeResults.OpDuration(), writeResults.OpsPerSecond()) - fmt.Fprintf(cmd.Stderr, "# Read\t%v(ops)\t%v\t(%v/op)\t(%v op/sec)\n", readResults.CompletedOps(), readResults.Duration(), readResults.OpDuration(), readResults.OpsPerSecond()) - fmt.Fprintln(cmd.Stderr, "") - return nil -} - -// ParseFlags parses the command line flags. -func (cmd *benchCommand) ParseFlags(args []string) (*BenchOptions, error) { - var options BenchOptions - - // Parse flagset. - fs := flag.NewFlagSet("", flag.ContinueOnError) - fs.StringVar(&options.ProfileMode, "profile-mode", "rw", "") - fs.StringVar(&options.WriteMode, "write-mode", "seq", "") - fs.StringVar(&options.ReadMode, "read-mode", "seq", "") - fs.Int64Var(&options.Iterations, "count", 1000, "") - fs.Int64Var(&options.BatchSize, "batch-size", 0, "") - fs.IntVar(&options.KeySize, "key-size", 8, "") - fs.IntVar(&options.ValueSize, "value-size", 32, "") - fs.StringVar(&options.CPUProfile, "cpuprofile", "", "") - fs.StringVar(&options.MemProfile, "memprofile", "", "") - fs.StringVar(&options.BlockProfile, "blockprofile", "", "") - fs.Float64Var(&options.FillPercent, "fill-percent", bolt.DefaultFillPercent, "") - fs.BoolVar(&options.NoSync, "no-sync", false, "") - fs.BoolVar(&options.Work, "work", false, "") - fs.StringVar(&options.Path, "path", "", "") - fs.SetOutput(cmd.Stderr) - if err := fs.Parse(args); err != nil { - return nil, err - } - - // Set batch size to iteration size if not set. - // Require that batch size can be evenly divided by the iteration count. - if options.BatchSize == 0 { - options.BatchSize = options.Iterations - } else if options.Iterations%options.BatchSize != 0 { - return nil, ErrNonDivisibleBatchSize - } - - // Generate temp path if one is not passed in. - if options.Path == "" { - f, err := os.CreateTemp("", "bolt-bench-") - if err != nil { - return nil, fmt.Errorf("temp file: %s", err) - } - f.Close() - os.Remove(f.Name()) - options.Path = f.Name() - } - - return &options, nil -} - -// Writes to the database. -func (cmd *benchCommand) runWrites(db *bolt.DB, options *BenchOptions, results *BenchResults) error { - // Start profiling for writes. - if options.ProfileMode == "rw" || options.ProfileMode == "w" { - cmd.startProfiling(options) - } - - finishChan := make(chan interface{}) - go checkProgress(results, finishChan, cmd.Stderr) - defer close(finishChan) - - t := time.Now() - - var err error - switch options.WriteMode { - case "seq": - err = cmd.runWritesSequential(db, options, results) - case "rnd": - err = cmd.runWritesRandom(db, options, results) - case "seq-nest": - err = cmd.runWritesSequentialNested(db, options, results) - case "rnd-nest": - err = cmd.runWritesRandomNested(db, options, results) - default: - return fmt.Errorf("invalid write mode: %s", options.WriteMode) - } - - // Save time to write. - results.SetDuration(time.Since(t)) - - // Stop profiling for writes only. - if options.ProfileMode == "w" { - cmd.stopProfiling() - } - - return err -} - -func (cmd *benchCommand) runWritesSequential(db *bolt.DB, options *BenchOptions, results *BenchResults) error { - var i = uint32(0) - return cmd.runWritesWithSource(db, options, results, func() uint32 { i++; return i }) -} - -func (cmd *benchCommand) runWritesRandom(db *bolt.DB, options *BenchOptions, results *BenchResults) error { - r := rand.New(rand.NewSource(time.Now().UnixNano())) - return cmd.runWritesWithSource(db, options, results, func() uint32 { return r.Uint32() }) -} - -func (cmd *benchCommand) runWritesSequentialNested(db *bolt.DB, options *BenchOptions, results *BenchResults) error { - var i = uint32(0) - return cmd.runWritesNestedWithSource(db, options, results, func() uint32 { i++; return i }) -} - -func (cmd *benchCommand) runWritesRandomNested(db *bolt.DB, options *BenchOptions, results *BenchResults) error { - r := rand.New(rand.NewSource(time.Now().UnixNano())) - return cmd.runWritesNestedWithSource(db, options, results, func() uint32 { return r.Uint32() }) -} - -func (cmd *benchCommand) runWritesWithSource(db *bolt.DB, options *BenchOptions, results *BenchResults, keySource func() uint32) error { - for i := int64(0); i < options.Iterations; i += options.BatchSize { - if err := db.Update(func(tx *bolt.Tx) error { - b, _ := tx.CreateBucketIfNotExists(benchBucketName) - b.FillPercent = options.FillPercent - - fmt.Fprintf(cmd.Stderr, "Starting write iteration %d\n", i) - for j := int64(0); j < options.BatchSize; j++ { - key := make([]byte, options.KeySize) - value := make([]byte, options.ValueSize) - - // Write key as uint32. - binary.BigEndian.PutUint32(key, keySource()) - - // Insert key/value. - if err := b.Put(key, value); err != nil { - return err - } - - results.AddCompletedOps(1) - } - fmt.Fprintf(cmd.Stderr, "Finished write iteration %d\n", i) - - return nil - }); err != nil { - return err - } - } - return nil -} - -func (cmd *benchCommand) runWritesNestedWithSource(db *bolt.DB, options *BenchOptions, results *BenchResults, keySource func() uint32) error { - for i := int64(0); i < options.Iterations; i += options.BatchSize { - if err := db.Update(func(tx *bolt.Tx) error { - top, err := tx.CreateBucketIfNotExists(benchBucketName) - if err != nil { - return err - } - top.FillPercent = options.FillPercent - - // Create bucket key. - name := make([]byte, options.KeySize) - binary.BigEndian.PutUint32(name, keySource()) - - // Create bucket. - b, err := top.CreateBucketIfNotExists(name) - if err != nil { - return err - } - b.FillPercent = options.FillPercent - - fmt.Fprintf(cmd.Stderr, "Starting write iteration %d\n", i) - for j := int64(0); j < options.BatchSize; j++ { - var key = make([]byte, options.KeySize) - var value = make([]byte, options.ValueSize) - - // Generate key as uint32. - binary.BigEndian.PutUint32(key, keySource()) - - // Insert value into subbucket. - if err := b.Put(key, value); err != nil { - return err - } - - results.AddCompletedOps(1) - } - fmt.Fprintf(cmd.Stderr, "Finished write iteration %d\n", i) - - return nil - }); err != nil { - return err - } - } - return nil -} - -// Reads from the database. -func (cmd *benchCommand) runReads(db *bolt.DB, options *BenchOptions, results *BenchResults) error { - // Start profiling for reads. - if options.ProfileMode == "r" { - cmd.startProfiling(options) - } - - finishChan := make(chan interface{}) - go checkProgress(results, finishChan, cmd.Stderr) - defer close(finishChan) - - t := time.Now() - - var err error - switch options.ReadMode { - case "seq": - switch options.WriteMode { - case "seq-nest", "rnd-nest": - err = cmd.runReadsSequentialNested(db, options, results) - default: - err = cmd.runReadsSequential(db, options, results) - } - default: - return fmt.Errorf("invalid read mode: %s", options.ReadMode) - } - - // Save read time. - results.SetDuration(time.Since(t)) - - // Stop profiling for reads. - if options.ProfileMode == "rw" || options.ProfileMode == "r" { - cmd.stopProfiling() - } - - return err -} - -func (cmd *benchCommand) runReadsSequential(db *bolt.DB, options *BenchOptions, results *BenchResults) error { - return db.View(func(tx *bolt.Tx) error { - t := time.Now() - - for { - numReads := int64(0) - c := tx.Bucket(benchBucketName).Cursor() - for k, v := c.First(); k != nil; k, v = c.Next() { - numReads++ - results.AddCompletedOps(1) - if v == nil { - return errors.New("invalid value") - } - } - - if options.WriteMode == "seq" && numReads != options.Iterations { - return fmt.Errorf("read seq: iter mismatch: expected %d, got %d", options.Iterations, numReads) - } - - // Make sure we do this for at least a second. - if time.Since(t) >= time.Second { - break - } - } - - return nil - }) -} - -func (cmd *benchCommand) runReadsSequentialNested(db *bolt.DB, options *BenchOptions, results *BenchResults) error { - return db.View(func(tx *bolt.Tx) error { - t := time.Now() - - for { - numReads := int64(0) - var top = tx.Bucket(benchBucketName) - if err := top.ForEach(func(name, _ []byte) error { - if b := top.Bucket(name); b != nil { - c := b.Cursor() - for k, v := c.First(); k != nil; k, v = c.Next() { - numReads++ - results.AddCompletedOps(1) - if v == nil { - return ErrInvalidValue - } - } - } - return nil - }); err != nil { - return err - } - - if options.WriteMode == "seq-nest" && numReads != options.Iterations { - return fmt.Errorf("read seq-nest: iter mismatch: expected %d, got %d", options.Iterations, numReads) - } - - // Make sure we do this for at least a second. - if time.Since(t) >= time.Second { - break - } - } - - return nil - }) -} - -func checkProgress(results *BenchResults, finishChan chan interface{}, stderr io.Writer) { - ticker := time.Tick(time.Second) - lastCompleted, lastTime := int64(0), time.Now() - for { - select { - case <-finishChan: - return - case t := <-ticker: - completed, taken := results.CompletedOps(), t.Sub(lastTime) - fmt.Fprintf(stderr, "Completed %d requests, %d/s \n", - completed, ((completed-lastCompleted)*int64(time.Second))/int64(taken), - ) - lastCompleted, lastTime = completed, t - } - } -} - -// File handlers for the various profiles. -var cpuprofile, memprofile, blockprofile *os.File - -// Starts all profiles set on the options. -func (cmd *benchCommand) startProfiling(options *BenchOptions) { - var err error - - // Start CPU profiling. - if options.CPUProfile != "" { - cpuprofile, err = os.Create(options.CPUProfile) - if err != nil { - fmt.Fprintf(cmd.Stderr, "bench: could not create cpu profile %q: %v\n", options.CPUProfile, err) - os.Exit(1) - } - err = pprof.StartCPUProfile(cpuprofile) - if err != nil { - fmt.Fprintf(cmd.Stderr, "bench: could not start cpu profile %q: %v\n", options.CPUProfile, err) - os.Exit(1) - } - } - - // Start memory profiling. - if options.MemProfile != "" { - memprofile, err = os.Create(options.MemProfile) - if err != nil { - fmt.Fprintf(cmd.Stderr, "bench: could not create memory profile %q: %v\n", options.MemProfile, err) - os.Exit(1) - } - runtime.MemProfileRate = 4096 - } - - // Start fatal profiling. - if options.BlockProfile != "" { - blockprofile, err = os.Create(options.BlockProfile) - if err != nil { - fmt.Fprintf(cmd.Stderr, "bench: could not create block profile %q: %v\n", options.BlockProfile, err) - os.Exit(1) - } - runtime.SetBlockProfileRate(1) - } -} - -// Stops all profiles. -func (cmd *benchCommand) stopProfiling() { - if cpuprofile != nil { - pprof.StopCPUProfile() - cpuprofile.Close() - cpuprofile = nil - } - - if memprofile != nil { - err := pprof.Lookup("heap").WriteTo(memprofile, 0) - if err != nil { - fmt.Fprintf(cmd.Stderr, "bench: could not write mem profile") - } - memprofile.Close() - memprofile = nil - } - - if blockprofile != nil { - err := pprof.Lookup("block").WriteTo(blockprofile, 0) - if err != nil { - fmt.Fprintf(cmd.Stderr, "bench: could not write block profile") - } - blockprofile.Close() - blockprofile = nil - runtime.SetBlockProfileRate(0) - } -} - -// BenchOptions represents the set of options that can be passed to "bolt bench". -type BenchOptions struct { - ProfileMode string - WriteMode string - ReadMode string - Iterations int64 - BatchSize int64 - KeySize int - ValueSize int - CPUProfile string - MemProfile string - BlockProfile string - StatsInterval time.Duration - FillPercent float64 - NoSync bool - Work bool - Path string -} - -// BenchResults represents the performance results of the benchmark and is thread-safe. -type BenchResults struct { - completedOps int64 - duration int64 -} - -func (r *BenchResults) AddCompletedOps(amount int64) { - atomic.AddInt64(&r.completedOps, amount) -} - -func (r *BenchResults) CompletedOps() int64 { - return atomic.LoadInt64(&r.completedOps) -} - -func (r *BenchResults) SetDuration(dur time.Duration) { - atomic.StoreInt64(&r.duration, int64(dur)) -} - -func (r *BenchResults) Duration() time.Duration { - return time.Duration(atomic.LoadInt64(&r.duration)) -} - -// Returns the duration for a single read/write operation. -func (r *BenchResults) OpDuration() time.Duration { - if r.CompletedOps() == 0 { - return 0 - } - return r.Duration() / time.Duration(r.CompletedOps()) -} - -// Returns average number of read/write operations that can be performed per second. -func (r *BenchResults) OpsPerSecond() int { - var op = r.OpDuration() - if op == 0 { - return 0 - } - return int(time.Second) / int(op) -} - type PageError struct { ID int Err error diff --git a/cmd/bbolt/main_test.go b/cmd/bbolt/main_test.go index e137db3e9..c80e56308 100644 --- a/cmd/bbolt/main_test.go +++ b/cmd/bbolt/main_test.go @@ -479,40 +479,6 @@ func TestPagesCommand_Run(t *testing.T) { require.NoError(t, err) } -// Ensure the "bench" command runs and exits without errors -func TestBenchCommand_Run(t *testing.T) { - tests := map[string]struct { - args []string - }{ - "no-args": {}, - "100k count": {[]string{"-count", "100000"}}, - } - - for name, test := range tests { - t.Run(name, func(t *testing.T) { - // Run the command. - m := NewMain() - args := append([]string{"bench"}, test.args...) - if err := m.Run(args...); err != nil { - t.Fatal(err) - } - - stderr := m.Stderr.String() - if !strings.Contains(stderr, "starting write benchmark.") || !strings.Contains(stderr, "starting read benchmark.") { - t.Fatal(fmt.Errorf("benchmark result does not contain read/write start output:\n%s", stderr)) - } - - if strings.Contains(stderr, "iter mismatch") { - t.Fatal(fmt.Errorf("found iter mismatch in stdout:\n%s", stderr)) - } - - if !strings.Contains(stderr, "# Write") || !strings.Contains(stderr, "# Read") { - t.Fatal(fmt.Errorf("benchmark result does not contain read/write output:\n%s", stderr)) - } - }) - } -} - type ConcurrentBuffer struct { m sync.Mutex buf bytes.Buffer