Skip to content

Commit

Permalink
db: add TestDBCompactionCrash
Browse files Browse the repository at this point in the history
Add a new unit test that stresses behavior under hard crashes. The test runs a
small write workload while simulating hard crashes at the k-th write operation,
ratcheting k up until the workload completes without running k write operations.

Informs cockroachdb/cockroach#124468.
  • Loading branch information
jbowens committed Jun 17, 2024
1 parent 82bca51 commit 3b3f10c
Showing 1 changed file with 135 additions and 0 deletions.
135 changes: 135 additions & 0 deletions error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ import (
"bytes"
"fmt"
"math"
"math/rand"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/testkeys"
"github.com/cockroachdb/pebble/vfs"
"github.com/cockroachdb/pebble/vfs/errorfs"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -411,3 +414,135 @@ func TestDBWALRotationCrash(t *testing.T) {
require.NoError(t, run(fs, k))
}
}

// TestDBCompactionCrash stresses the database's behavior under simulated hard
// crashes while flushes and compactions are running.
//
// The test uses the strict MemFS with an error injector to simulate crashes.
// Each subtest runs a small write workload with a crash induced at the k-th
// write operation (zero-indexed) issued after Open. Each subsequent subtest
// increases k by a random increment in [1,5] until a subtest runs to
// completion without ever reaching its k-th write operation.
//
// After every simulated crash the filesystem is reset to its synced state and
// the same workload is run again without a crash; that second run must not
// encounter any error.
func TestDBCompactionCrash(t *testing.T) {
	seed := time.Now().UnixNano()
	t.Log("seed", seed)

	// crashIndex holds the number of write operations that may still be
	// performed before the crash is induced. The errorfs injector decrements
	// it on every write operation; the operation that moves it from 0 to -1
	// is the one that "crashes" the filesystem.
	var crashIndex atomic.Int32
	mkFS := func() (vfs.FS, *vfs.MemFS) {
		memfs := vfs.NewStrictMem()
		inj := errorfs.InjectorFunc(func(op errorfs.Op) error {
			if op.Kind.ReadOrWrite() == errorfs.OpIsWrite && crashIndex.Add(-1) == -1 {
				// From this point on syncs are ignored, so nothing written
				// afterwards survives ResetToSyncedState.
				memfs.SetIgnoreSyncs(true)
			}
			return nil
		})
		return errorfs.Wrap(memfs, inj), memfs
	}
	// triggered reports whether the simulated crash has already happened.
	triggered := func() bool { return crashIndex.Load() < 0 }

	// run opens the store and performs some random writes, simulating a crash
	// at the k-th write operation after Open. It returns the number of keys
	// that were added to batches before the workload stopped, along with the
	// first error encountered.
	run := func(t *testing.T, fs vfs.FS, k int32, seed int64) (written int64, err error) {
		rng := rand.New(rand.NewSource(seed))
		maxConcurrentCompactions := rng.Intn(3) + 2
		opts := &Options{
			DisableTableStats:           true,
			FS:                          errorfs.Wrap(fs, errorfs.RandomLatency(nil, 20*time.Microsecond, seed)),
			Logger:                      testLogger{t: t},
			MemTableSize:                128 << 10,
			MaxConcurrentCompactions:    func() int { return maxConcurrentCompactions },
			LBaseMaxBytes:               64 << 10,
			L0CompactionThreshold:       2,
			L0CompactionFileThreshold:   2,
			MemTableStopWritesThreshold: 10,
			L0StopWritesThreshold:       10,
		}
		if testing.Verbose() {
			lel := MakeLoggingEventListener(opts.Logger)
			opts.EventListener = &lel
		}
		d, err := Open("", opts)
		if err != nil {
			return 0, err
		}
		if triggered() {
			// The crash fired while opening. Close the handle rather than
			// leaking it; the caller only logs the error in this case.
			return 0, d.Close()
		}

		// Arm the crash: with the counter at k, the write operation with
		// zero-based index k drives it to -1 and simulates the crash.
		crashIndex.Store(k)

		// Write keys in random order in batches of random sizes.
		const maxKeyLength = 2
		const valLength = 4 << 10
		timestamps := []int{10, 5}
		ks := testkeys.Alpha(maxKeyLength)
		ks = ks.EveryN(10)
		buf := make([]byte, ks.MaxLen()+testkeys.MaxSuffixLen)
		vbuf := make([]byte, valLength)
		b := d.NewBatch()
		perm := rng.Perm(int(ks.Count()))
	done:
		for _, ts := range timestamps {
			// NB: the loop variable must not be named like the result
			// parameter, otherwise the count returned to the caller is
			// silently shadowed and always zero.
			for _, idx := range perm {
				n := testkeys.WriteKeyAt(buf, ks, int64(idx), int64(ts))
				_, err = rng.Read(vbuf)
				require.NoError(t, err)
				require.NoError(t, b.Set(buf[:n], vbuf, nil))
				written++
				if rng.Intn(10) == 1 {
					if err = d.Apply(b, nil); err != nil || triggered() {
						// The batch has been handed to Apply; do not try to
						// apply it a second time below.
						b = nil
						break done
					}
					b.Reset()
				}
				if rng.Intn(100) == 1 {
					if err = d.Flush(); err != nil || triggered() {
						break done
					}
				}
			}
		}
		if b != nil && b.Count() > 0 {
			err = firstError(err, d.Apply(b, nil))
		}
		err = firstError(err, d.Close())
		return written, err
	}

	// Run the test with increasing values of k until a subtest runs to
	// completion without performing a k-th write operation.
	done := false
	rng := rand.New(rand.NewSource(seed))
	for k := int32(0); !done; k += rng.Int31n(5) + 1 {
		t.Run(fmt.Sprintf("k=%d", k), func(t *testing.T) {
			// Run, simulating a crash by ignoring syncs after the k-th write
			// operation after Open.
			crashIndex.Store(math.MaxInt32)
			fs, memfs := mkFS()
			written, err := run(t, fs, k, seed)
			if !triggered() {
				// Stop when we reach a value of k greater than the number of
				// write operations performed during `run`. Mark the loop as
				// finished before any Fatalf: a fatal failure exits only the
				// subtest, and every subtest reuses the same seed, so the
				// outer loop could otherwise keep spinning on the same error.
				done = true
				t.Logf("No crash at write operation %d\n", k)
				if err != nil {
					t.Fatalf("Filesystem did not 'crash', but error returned: %s", err)
				}
				return
			}
			t.Logf("Simulated crash at write operation % 2d after writing %d keys, error: %v\n", k, written, err)

			// Reset the filesystem to its state right before the simulated
			// "crash", restore syncs and run again without crashing. No errors
			// should be encountered.
			//
			// TODO(jackson): Allow an arbitrary subset of synced state to
			// survive beyond the crash point.
			memfs.ResetToSyncedState()
			memfs.SetIgnoreSyncs(false)
			crashIndex.Store(math.MaxInt32)
			_, err = run(t, fs, math.MaxInt32, seed)
			require.False(t, triggered())
			// TODO(jackson): Add assertions on the database keys.
			require.NoError(t, err)
		})
	}
}

0 comments on commit 3b3f10c

Please sign in to comment.