Skip to content

Commit

Permalink
Merge pull request #659 from Elbehery/add_test_check_page
Browse files Browse the repository at this point in the history
Enhance check functionality to support checking starting from a pageId
  • Loading branch information
ahrtr authored Feb 4, 2024
2 parents 1906d5a + 29d1e3d commit da5975b
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 9 deletions.
74 changes: 65 additions & 9 deletions tx_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,63 @@ func (tx *Tx) check(cfg checkConfig, ch chan error) {
}
}

// Recursively check buckets.
tx.recursivelyCheckBucket(&tx.root, reachable, freed, cfg.kvStringer, ch)

// Ensure all pages below high water mark are either reachable or freed.
for i := common.Pgid(0); i < tx.meta.Pgid(); i++ {
_, isReachable := reachable[i]
if !isReachable && !freed[i] {
ch <- fmt.Errorf("page %d: unreachable unfreed", int(i))
if cfg.pageId == 0 {
// Check the whole db file, starting from the root bucket and
// recursively check all child buckets.
tx.recursivelyCheckBucket(&tx.root, reachable, freed, cfg.kvStringer, ch)

// Ensure all pages below high water mark are either reachable or freed.
for i := common.Pgid(0); i < tx.meta.Pgid(); i++ {
_, isReachable := reachable[i]
if !isReachable && !freed[i] {
ch <- fmt.Errorf("page %d: unreachable unfreed", int(i))
}
}
} else {
// Check the db file starting from a specified pageId.
if cfg.pageId < 2 || cfg.pageId >= uint(tx.meta.Pgid()) {
ch <- fmt.Errorf("page ID (%d) out of range [%d, %d)", cfg.pageId, 2, tx.meta.Pgid())
return
}

tx.recursivelyCheckPage(common.Pgid(cfg.pageId), reachable, freed, cfg.kvStringer, ch)
}
}

func (tx *Tx) recursivelyCheckPage(pageId common.Pgid, reachable map[common.Pgid]*common.Page, freed map[common.Pgid]bool,
kvStringer KVStringer, ch chan error) {
tx.checkInvariantProperties(pageId, reachable, freed, kvStringer, ch)
tx.recursivelyCheckBucketInPage(pageId, reachable, freed, kvStringer, ch)
}

func (tx *Tx) recursivelyCheckBucketInPage(pageId common.Pgid, reachable map[common.Pgid]*common.Page, freed map[common.Pgid]bool,
kvStringer KVStringer, ch chan error) {
p := tx.page(pageId)

switch {
case p.IsBranchPage():
for i := range p.BranchPageElements() {
elem := p.BranchPageElement(uint16(i))
tx.recursivelyCheckBucketInPage(elem.Pgid(), reachable, freed, kvStringer, ch)
}
case p.IsLeafPage():
for i := range p.LeafPageElements() {
elem := p.LeafPageElement(uint16(i))
if elem.Flags()&common.BucketLeafFlag != 0 {

inBkt := common.NewInBucket(pageId, 0)
tmpBucket := Bucket{
InBucket: &inBkt,
rootNode: &node{isLeaf: p.IsLeafPage()},
FillPercent: DefaultFillPercent,
}
if child := tmpBucket.Bucket(elem.Key()); child != nil {
tx.recursivelyCheckBucket(&tmpBucket, reachable, freed, kvStringer, ch)
}
}
}
default:
ch <- fmt.Errorf("unexpected page type (flags: %x) for pgId:%d", p.Flags(), pageId)
}
}

Expand Down Expand Up @@ -167,7 +215,7 @@ func (tx *Tx) recursivelyCheckPageKeyOrderInternal(
return p.LeafPageElement(p.Count() - 1).Key()
}
default:
ch <- fmt.Errorf("unexpected page type for pgId:%d", pgId)
ch <- fmt.Errorf("unexpected page type (flags: %x) for pgId:%d", p.Flags(), pgId)
}
return maxKeyInSubtree
}
Expand Down Expand Up @@ -202,6 +250,7 @@ func verifyKeyOrder(pgId common.Pgid, pageType string, index int, key []byte, pr

type checkConfig struct {
kvStringer KVStringer
pageId uint
}

type CheckOption func(options *checkConfig)
Expand All @@ -212,6 +261,13 @@ func WithKVStringer(kvStringer KVStringer) CheckOption {
}
}

// WithPageId sets a page ID from which the check command starts to check
func WithPageId(pageId uint) CheckOption {
return func(c *checkConfig) {
c.pageId = pageId
}
}

// KVStringer allows to prepare human-readable diagnostic messages.
type KVStringer interface {
KeyToString([]byte) string
Expand Down
109 changes: 109 additions & 0 deletions tx_check_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package bbolt_test

import (
"fmt"
"math/rand"
"testing"

"github.com/stretchr/testify/require"

"go.etcd.io/bbolt"
"go.etcd.io/bbolt/internal/btesting"
"go.etcd.io/bbolt/internal/common"
"go.etcd.io/bbolt/internal/guts_cli"
)

func TestTx_Check_CorruptPage(t *testing.T) {
t.Log("Creating db file.")
db := btesting.MustCreateDBWithOption(t, &bbolt.Options{PageSize: 4096})

// Each page can hold roughly 20 key/values pair, so 100 such
// key/value pairs will consume about 5 leaf pages.
err := db.Fill([]byte("data"), 1, 100,
func(tx int, k int) []byte { return []byte(fmt.Sprintf("%04d", k)) },
func(tx int, k int) []byte { return make([]byte, 100) },
)
require.NoError(t, err)

t.Log("Corrupting random leaf page.")
victimPageId, validPageIds := corruptRandomLeafPage(t, db.DB)

t.Log("Running consistency check.")
vErr := db.View(func(tx *bbolt.Tx) error {
var cErrs []error

t.Log("Check corrupted page.")
errChan := tx.Check(bbolt.WithPageId(uint(victimPageId)))
for cErr := range errChan {
cErrs = append(cErrs, cErr)
}
require.Greater(t, len(cErrs), 0)

t.Log("Check valid pages.")
cErrs = cErrs[:0]
for _, pgId := range validPageIds {
errChan = tx.Check(bbolt.WithPageId(uint(pgId)))
for cErr := range errChan {
cErrs = append(cErrs, cErr)
}
require.Equal(t, 0, len(cErrs))
}
return nil
})
require.NoError(t, vErr)
t.Log("All check passed")

// Manually close the db, otherwise the PostTestCleanup will
// check the db again and accordingly fail the test.
db.MustClose()
}

// corruptRandomLeafPage corrupts one random leaf page.
func corruptRandomLeafPage(t testing.TB, db *bbolt.DB) (victimPageId common.Pgid, validPageIds []common.Pgid) {
victimPageId, validPageIds = pickupRandomLeafPage(t, db)
victimPage, victimBuf, err := guts_cli.ReadPage(db.Path(), uint64(victimPageId))
require.NoError(t, err)
require.True(t, victimPage.IsLeafPage())
require.True(t, victimPage.Count() > 1)

// intentionally make the second key < the first key.
element := victimPage.LeafPageElement(1)
key := element.Key()
key[0] = 0

// Write the corrupt page to db file.
err = guts_cli.WritePage(db.Path(), victimBuf)
require.NoError(t, err)
return victimPageId, validPageIds
}

// pickupRandomLeafPage picks up a random leaf page.
func pickupRandomLeafPage(t testing.TB, db *bbolt.DB) (victimPageId common.Pgid, validPageIds []common.Pgid) {
// Read DB's RootPage, which should be a leaf page.
rootPageId, _, err := guts_cli.GetRootPage(db.Path())
require.NoError(t, err)
rootPage, _, err := guts_cli.ReadPage(db.Path(), uint64(rootPageId))
require.NoError(t, err)
require.True(t, rootPage.IsLeafPage())

// The leaf page contains only one item, namely the bucket
require.Equal(t, uint16(1), rootPage.Count())
lpe := rootPage.LeafPageElement(uint16(0))
require.True(t, lpe.IsBucketEntry())

// The bucket should be pointing to a branch page
bucketRootPageId := lpe.Bucket().RootPage()
bucketRootPage, _, err := guts_cli.ReadPage(db.Path(), uint64(bucketRootPageId))
require.NoError(t, err)
require.True(t, bucketRootPage.IsBranchPage())

// Retrieve all the leaf pages included in the branch page, and pick up random one from them.
var bucketPageIds []common.Pgid
for _, bpe := range bucketRootPage.BranchPageElements() {
bucketPageIds = append(bucketPageIds, bpe.Pgid())
}
randomIdx := rand.Intn(len(bucketPageIds))
victimPageId = bucketPageIds[randomIdx]
validPageIds = append(bucketPageIds[:randomIdx], bucketPageIds[randomIdx+1:]...)
return victimPageId, validPageIds
}

0 comments on commit da5975b

Please sign in to comment.