Commit

Merge pull request #779 from ahrtr/concurent_writeto_20240627
Enhance `TestDB_Concurrent_WriteTo` to check consistent read
ahrtr authored Jul 2, 2024
2 parents d537eff + 848f5fb commit ee58c7e
Showing 1 changed file with 68 additions and 41 deletions.
109 changes: 68 additions & 41 deletions db_test.go
@@ -668,68 +668,95 @@ func TestDB_BeginRW(t *testing.T) {
 }
 
 // TestDB_Concurrent_WriteTo checks that issuing WriteTo operations concurrently
-// with commits does not produce corrupted db files.
-func TestDB_Concurrent_WriteTo(t *testing.T) {
-	o := &bolt.Options{NoFreelistSync: false}
+// with commits does not produce corrupted db files. It also verifies that all
+// readonly transactions, which are created based on the same data view, should
+// always read the same data.
+func TestDB_Concurrent_WriteTo_and_ConsistentRead(t *testing.T) {
+	o := &bolt.Options{
+		NoFreelistSync: false,
+		PageSize:       4096,
+	}
 	db := btesting.MustCreateDBWithOption(t, o)
 
+	wtxs, rtxs := 50, 5
+	bucketName := []byte("data")
+
+	var dataLock sync.Mutex
+	dataCache := make(map[int][]map[string]string)
+
 	var wg sync.WaitGroup
-	wtxs, rtxs := 5, 5
 	wg.Add(wtxs * rtxs)
-	f := func(tx *bolt.Tx) {
+	f := func(round int, tx *bolt.Tx) {
 		defer wg.Done()
-		f, err := os.CreateTemp("", "bolt-")
-		if err != nil {
-			panic(err)
-		}
-		time.Sleep(time.Duration(rand.Intn(20)+1) * time.Millisecond)
-		_, err = tx.WriteTo(f)
-		if err != nil {
-			panic(err)
-		}
+		time.Sleep(time.Duration(rand.Intn(200)+10) * time.Millisecond)
+		f := filepath.Join(t.TempDir(), fmt.Sprintf("%d-bolt-", round))
+		err := tx.CopyFile(f, 0600)
+		require.NoError(t, err)
+
+		// read all the data
+		b := tx.Bucket(bucketName)
+		data := make(map[string]string)
+		err = b.ForEach(func(k, v []byte) error {
+			data[string(k)] = string(v)
+			return nil
+		})
+		require.NoError(t, err)
+
+		// cache the data
+		dataLock.Lock()
+		dataSlice := dataCache[round]
+		dataSlice = append(dataSlice, data)
+		dataCache[round] = dataSlice
+		dataLock.Unlock()
+
 		err = tx.Rollback()
-		if err != nil {
-			panic(err)
-		}
-		f.Close()
+		require.NoError(t, err)
 
 		copyOpt := *o
-		snap := btesting.MustOpenDBWithOption(t, f.Name(), &copyOpt)
+		snap := btesting.MustOpenDBWithOption(t, f, &copyOpt)
 		defer snap.MustClose()
 		snap.MustCheck()
 	}
 
-	tx1, err := db.Begin(true)
-	if err != nil {
-		t.Fatal(err)
-	}
-	if _, err := tx1.CreateBucket([]byte("abc")); err != nil {
-		t.Fatal(err)
-	}
-	if err := tx1.Commit(); err != nil {
-		t.Fatal(err)
-	}
+	err := db.Update(func(tx *bolt.Tx) error {
+		_, err := tx.CreateBucket(bucketName)
+		return err
+	})
+	require.NoError(t, err)
 
 	for i := 0; i < wtxs; i++ {
 		tx, err := db.Begin(true)
-		if err != nil {
-			t.Fatal(err)
-		}
-		if err := tx.Bucket([]byte("abc")).Put([]byte{0}, []byte{0}); err != nil {
-			t.Fatal(err)
-		}
+		require.NoError(t, err)
+
+		b := tx.Bucket(bucketName)
+
 		for j := 0; j < rtxs; j++ {
 			rtx, rerr := db.Begin(false)
-			if rerr != nil {
-				t.Fatal(rerr)
+			require.NoError(t, rerr)
+			go f(i, rtx)
+
+			for k := 0; k < 10; k++ {
+				key, value := fmt.Sprintf("key_%d", rand.Intn(10)), fmt.Sprintf("value_%d", rand.Intn(100))
+				perr := b.Put([]byte(key), []byte(value))
+				require.NoError(t, perr)
 			}
-			go f(rtx)
 		}
-		if err := tx.Commit(); err != nil {
-			t.Fatal(err)
-		}
+		err = tx.Commit()
+		require.NoError(t, err)
	}
 	wg.Wait()
+
+	// compare the data. The data generated in the same round
+	// should be exactly the same.
+	for round, dataSlice := range dataCache {
+		data0 := dataSlice[0]
+
+		for i := 1; i < len(dataSlice); i++ {
+			datai := dataSlice[i]
+			same := reflect.DeepEqual(data0, datai)
+			require.True(t, same, fmt.Sprintf("found inconsistent data in round %d, data[0]: %v, data[%d] : %v", round, data0, i, datai))
+		}
+	}
 }
 
 // Ensure that opening a transaction while the DB is closed returns an error.

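For context, the consistency guarantee the enhanced test exercises is bbolt's snapshot isolation: a read-only transaction pins a single data view, so every read (and any `WriteTo`/`CopyFile` snapshot) taken through it is unaffected by commits that land afterwards. The sketch below is not part of this commit; it is a minimal standalone illustration, assuming `go.etcd.io/bbolt` and a throwaway temp directory, of that behaviour.

```go
package main

import (
	"fmt"
	"log"
	"os"
	"path/filepath"

	bolt "go.etcd.io/bbolt"
)

func main() {
	// Throwaway directory for the demo database (hypothetical paths).
	dir, err := os.MkdirTemp("", "bolt-demo")
	if err != nil {
		log.Fatal(err)
	}
	defer os.RemoveAll(dir)

	db, err := bolt.Open(filepath.Join(dir, "demo.db"), 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	bucket := []byte("data")

	// Seed the bucket with an initial value.
	if err := db.Update(func(tx *bolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists(bucket)
		if err != nil {
			return err
		}
		return b.Put([]byte("k"), []byte("v1"))
	}); err != nil {
		log.Fatal(err)
	}

	// A read-only transaction pins the data view as of this moment.
	rtx, err := db.Begin(false)
	if err != nil {
		log.Fatal(err)
	}
	defer rtx.Rollback()

	// A commit that lands afterwards does not change what rtx sees.
	if err := db.Update(func(tx *bolt.Tx) error {
		return tx.Bucket(bucket).Put([]byte("k"), []byte("v2"))
	}); err != nil {
		log.Fatal(err)
	}

	// Prints "v1": the read and the snapshot below both reflect the pinned view.
	fmt.Printf("read-only tx sees: %s\n", rtx.Bucket(bucket).Get([]byte("k")))

	// CopyFile writes a consistent snapshot of that same view to disk.
	if err := rtx.CopyFile(filepath.Join(dir, "snapshot.db"), 0600); err != nil {
		log.Fatal(err)
	}
}
```

The test applies the same idea at scale: every read-only transaction opened in round `i` shares one data view, so the per-round maps collected in `dataCache` must compare equal under `reflect.DeepEqual`, and each `CopyFile` output must pass `snap.MustCheck()`.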