diff --git a/bucket.go b/bucket.go index f9f2381..8e1a98a 100644 --- a/bucket.go +++ b/bucket.go @@ -327,19 +327,22 @@ func (b *Bucket) Put(key []byte, value []byte) error { return errors.ErrValueTooLarge } + // Insert into node. + // Tip: Use a new variable `newKey` instead of reusing the existing `key` to prevent + // it from being marked as leaking, and accordingly cannot be allocated on stack. + newKey := cloneBytes(key) + // Move cursor to correct position. c := b.Cursor() - k, _, flags := c.seek(key) + k, _, flags := c.seek(newKey) // Return an error if there is an existing key with a bucket value. - if bytes.Equal(key, k) && (flags&common.BucketLeafFlag) != 0 { + if bytes.Equal(newKey, k) && (flags&common.BucketLeafFlag) != 0 { return errors.ErrIncompatibleValue } - // Insert into node. - // Tip: Use a new variable `newKey` instead of reusing the existing `key` to prevent - // it from being marked as leaking, and accordingly cannot be allocated on stack. - newKey := cloneBytes(key) + // gofail: var beforeBucketPut struct{} + c.node().put(newKey, newKey, value, 0, 0) return nil diff --git a/tests/failpoint/db_failpoint_test.go b/tests/failpoint/db_failpoint_test.go index c1da5b5..3255ba2 100644 --- a/tests/failpoint/db_failpoint_test.go +++ b/tests/failpoint/db_failpoint_test.go @@ -155,3 +155,118 @@ func TestFailpoint_LackOfDiskSpace(t *testing.T) { require.Error(t, err) require.ErrorIs(t, err, errors.ErrTxClosed) } + +// TestIssue72 reproduces issue 72. +// +// When bbolt is processing a `Put` invocation, the key might be concurrently +// updated by the application which calls the `Put` API (although it shouldn't). +// It might lead to a situation that bbolt use an old key to find a proper +// position to insert the key/value pair, but actually inserts a new key. +// Eventually it might break the rule that all keys should be sorted. In a +// worse case, it might cause page elements to point to already freed pages. +// +// REF: https://github.com/etcd-io/bbolt/issues/72 +func TestIssue72(t *testing.T) { + db := btesting.MustCreateDBWithOption(t, &bolt.Options{PageSize: 4096}) + + bucketName := []byte(t.Name()) + err := db.Update(func(tx *bolt.Tx) error { + _, txerr := tx.CreateBucket(bucketName) + return txerr + }) + require.NoError(t, err) + + // The layout is like: + // + // +--+--+--+ + // +------+1 |3 |10+---+ + // | +-++--+--+ | + // | | | + // | | | + // +v-+--+ +v-+--+ +-v+--+--+ + // |1 |2 | |3 |4 | |10|11|12| + // +--+--+ +--+--+ +--+--+--+ + // + err = db.Update(func(tx *bolt.Tx) error { + bk := tx.Bucket(bucketName) + + for _, id := range []int{1, 2, 3, 4, 10, 11, 12} { + if txerr := bk.Put(idToBytes(id), make([]byte, 1000)); txerr != nil { + return txerr + } + } + return nil + }) + require.NoError(t, err) + + require.NoError(t, gofail.Enable("beforeBucketPut", `sleep(5000)`)) + + // +--+--+--+ + // +------+1 |3 |1 +---+ + // | +-++--+--+ | + // | | | + // | | | + // +v-+--+ +v-+--+ +-v+--+--+--+ + // |1 |2 | |3 |4 | |1 |10|11|12| + // +--+--+ +--+--+ +--+--+--+--+ + // + key := idToBytes(13) + updatedKey := idToBytes(1) + err = db.Update(func(tx *bolt.Tx) error { + bk := tx.Bucket(bucketName) + + go func() { + time.Sleep(3 * time.Second) + copy(key, updatedKey) + }() + return bk.Put(key, make([]byte, 100)) + }) + require.NoError(t, err) + + require.NoError(t, gofail.Disable("beforeBucketPut")) + + // bbolt inserts 100 into last branch page. Since there are two `1` + // keys in branch, spill operation will update first `1` pointer and + // then last one won't be updated and continues to point to freed page. + // + // + // +--+--+--+ + // +---------------+1 |3 |1 +---------+ + // | +--++-+--+ | + // | | | + // | | | + // | +--+--+ +v-+--+ +-----v-----+ + // | |1 |2 | |3 |4 | |freed page | + // | +--+--+ +--+--+ +-----------+ + // | + // +v-+--+--+--+---+ + // |1 |10|11|12|100| + // +--+--+--+--+---+ + err = db.Update(func(tx *bolt.Tx) error { + return tx.Bucket(bucketName).Put(idToBytes(100), make([]byte, 100)) + }) + require.NoError(t, err) + + defer func() { + if r := recover(); r != nil { + t.Logf("panic info:\n %v", r) + } + }() + + // Add more keys to ensure branch node to spill. + err = db.Update(func(tx *bolt.Tx) error { + bk := tx.Bucket(bucketName) + + for _, id := range []int{101, 102, 103, 104, 105} { + if txerr := bk.Put(idToBytes(id), make([]byte, 1000)); txerr != nil { + return txerr + } + } + return nil + }) + require.NoError(t, err) +} + +func idToBytes(id int) []byte { + return []byte(fmt.Sprintf("%010d", id)) +}