Skip to content

Commit

Permalink
server: update defrag logic to use bolt.Compact()
Browse files Browse the repository at this point in the history
Signed-off-by: Cenk Alti <[email protected]>
  • Loading branch information
cenkalti committed Mar 14, 2023
1 parent eba5845 commit d46cad6
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 63 deletions.
63 changes: 2 additions & 61 deletions server/storage/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var (
defaultBatchLimit = 10000
defaultBatchInterval = 100 * time.Millisecond

defragLimit = 10000
defragLimit = 10 * 1024 * 1024

// initialMmapSize is the initial size of the mmapped region. Setting this larger than
// the potential max db size can prevent writer from blocking reader.
Expand Down Expand Up @@ -503,7 +503,7 @@ func (b *backend) defrag() error {
)
}
// gofail: var defragBeforeCopy struct{}
err = defragdb(b.db, tmpdb, defragLimit)
err = bolt.Compact(tmpdb, b.db, int64(defragLimit))
if err != nil {
tmpdb.Close()
if rmErr := os.RemoveAll(tmpdb.Path()); rmErr != nil {
Expand Down Expand Up @@ -560,65 +560,6 @@ func (b *backend) defrag() error {
return nil
}

func defragdb(odb, tmpdb *bolt.DB, limit int) error {
// open a tx on tmpdb for writes
tmptx, err := tmpdb.Begin(true)
if err != nil {
return err
}
defer func() {
if err != nil {
tmptx.Rollback()
}
}()

// open a tx on old db for read
tx, err := odb.Begin(false)
if err != nil {
return err
}
defer tx.Rollback()

c := tx.Cursor()

count := 0
for next, _ := c.First(); next != nil; next, _ = c.Next() {
b := tx.Bucket(next)
if b == nil {
return fmt.Errorf("backend: cannot defrag bucket %s", string(next))
}

tmpb, berr := tmptx.CreateBucketIfNotExists(next)
if berr != nil {
return berr
}
tmpb.FillPercent = 0.9 // for bucket2seq write in for each

if err = b.ForEach(func(k, v []byte) error {
count++
if count > limit {
err = tmptx.Commit()
if err != nil {
return err
}
tmptx, err = tmpdb.Begin(true)
if err != nil {
return err
}
tmpb = tmptx.Bucket(next)
tmpb.FillPercent = 0.9 // for bucket2seq write in for each

count = 0
}
return tmpb.Put(k, v)
}); err != nil {
return err
}
}

return tmptx.Commit()
}

func (b *backend) begin(write bool) *bolt.Tx {
b.mu.RLock()
tx := b.unsafeBegin(write)
Expand Down
8 changes: 6 additions & 2 deletions server/storage/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,15 @@ func TestBackendDefrag(t *testing.T) {

defer betesting.Close(t, b)

// Put some data a bit over the defrag limit
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket(schema.Test)
for i := 0; i < backend.DefragLimitForTest()+100; i++ {
tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("foo_%d", i)), []byte("bar"))
n := int(float32(backend.DefragLimitForTest())*1.1) / 1024
value := make([]byte, 1024)
for i := 0; i < n; i++ {
println(i)
tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("foo_%d", i)), value)
}
tx.Unlock()
b.ForceCommit()
Expand Down

0 comments on commit d46cad6

Please sign in to comment.