Skip to content

Commit

Permalink
add MoveBucket to support moving a sub-bucket from one bucket to anot…
Browse files Browse the repository at this point in the history
…her bucket

Signed-off-by: Benjamin Wang <[email protected]>
  • Loading branch information
ahrtr authored and Elbehery committed Dec 12, 2023
1 parent a7a791c commit 3540f37
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 0 deletions.
53 changes: 53 additions & 0 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,59 @@ func (b *Bucket) DeleteBucket(key []byte) error {
return nil
}

// MoveBucket moves a sub-bucket from the source bucket to the destination bucket.
// Returns an error if
// 1. the sub-bucket cannot be found in the source bucket;
// 2. or the key already exists in the destination bucket;
// 3. the key represents a non-bucket value.
func (b *Bucket) MoveBucket(key []byte, dstBucket *Bucket) error {
if b.tx.db == nil || dstBucket.tx.db == nil {
return errors.ErrTxClosed
} else if !dstBucket.Writable() {
return errors.ErrTxNotWritable
}

// Move cursor to correct position.
c := b.Cursor()
k, v, flags := c.seek(key)

// Return an error if bucket doesn't exist or is not a bucket.
if !bytes.Equal(key, k) {
return errors.ErrBucketNotFound
} else if (flags & common.BucketLeafFlag) == 0 {
return fmt.Errorf("key %q isn't a bucket in the source bucket: %w", key, errors.ErrIncompatibleValue)
}

// Do nothing (return true directly) if the source bucket and the
// destination bucket are actually the same bucket.
if b == dstBucket || (b.RootPage() == dstBucket.RootPage() && b.RootPage() != 0) {
return nil
}

// check whether the key already exists in the destination bucket
curDst := dstBucket.Cursor()
k, _, flags = curDst.seek(key)

// Return an error if there is an existing key in the destination bucket.
if bytes.Equal(key, k) {
if (flags & common.BucketLeafFlag) != 0 {
return errors.ErrBucketExists
}
return fmt.Errorf("key %q already exists in the target bucket: %w", key, errors.ErrIncompatibleValue)
}

// remove the sub-bucket from the source bucket
delete(b.buckets, string(key))
c.node().del(key)

// add te sub-bucket to the destination bucket
newKey := cloneBytes(key)
newValue := cloneBytes(v)
curDst.node().put(newKey, newKey, newValue, 0, common.BucketLeafFlag)

return nil
}

// Get retrieves the value for a key in the bucket.
// Returns a nil value if the key does not exist or if the key is a nested bucket.
// The returned value is only valid for the life of the transaction.
Expand Down
18 changes: 18 additions & 0 deletions tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,24 @@ func (tx *Tx) DeleteBucket(name []byte) error {
return tx.root.DeleteBucket(name)
}

// MoveBucket moves a sub-bucket from the source bucket to the destination bucket.
// Returns an error if
// 1. the sub-bucket cannot be found in the source bucket;
// 2. or the key already exists in the destination bucket;
// 3. the key represents a non-bucket value.
//
// If src is nil, it means moving a top level bucket into the target bucket.
// If dst is nil, it means converting the child bucket into a top level bucket.
func (tx *Tx) MoveBucket(child []byte, src *Bucket, dst *Bucket) error {
if src == nil {
src = &tx.root
}
if dst == nil {
dst = &tx.root
}
return src.MoveBucket(child, dst)
}

// ForEach executes a function for each bucket in the root.
// If the provided function returns an error then the iteration is stopped and
// the error is returned to the caller.
Expand Down

0 comments on commit 3540f37

Please sign in to comment.