Skip to content

Commit

Permalink
remove the usage of atomic package
Browse files Browse the repository at this point in the history
to resolve the 64bit unaligned issue in arm32 by partially reverting
#373.

Refer to discussion in #577

Signed-off-by: Benjamin Wang <[email protected]>
  • Loading branch information
ahrtr committed Oct 20, 2023
1 parent 233156a commit 2e12f03
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 376 deletions.
6 changes: 3 additions & 3 deletions allocate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ func TestTx_allocatePageStats(t *testing.T) {
}

txStats := tx.Stats()
prePageCnt := txStats.GetPageCount()
prePageCnt := txStats.PageCount
allocateCnt := f.free_count()

if _, err := tx.allocate(allocateCnt); err != nil {
t.Fatal(err)
}

txStats = tx.Stats()
if txStats.GetPageCount() != prePageCnt+int64(allocateCnt) {
t.Errorf("Allocated %d but got %d page in stats", allocateCnt, txStats.GetPageCount())
if txStats.PageCount != prePageCnt+allocateCnt {
t.Errorf("Allocated %d but got %d page in stats", allocateCnt, txStats.PageCount)
}
}
4 changes: 2 additions & 2 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (b *Bucket) Writable() bool {
// Do not use a cursor after the transaction is closed.
func (b *Bucket) Cursor() *Cursor {
// Update transaction statistics.
b.tx.stats.IncCursorCount(1)
b.tx.stats.CursorCount++

// Allocate and return a cursor.
return &Cursor{
Expand Down Expand Up @@ -724,7 +724,7 @@ func (b *Bucket) node(pgId common.Pgid, parent *node) *node {
b.nodes[pgId] = n

// Update statistics.
b.tx.stats.IncNodeCount(1)
b.tx.stats.NodeCount++

return n
}
Expand Down
19 changes: 14 additions & 5 deletions cmd/bbolt/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"runtime/pprof"
"strconv"
"strings"
"sync/atomic"
"sync"
"time"
"unicode"
"unicode/utf8"
Expand Down Expand Up @@ -1518,24 +1518,33 @@ type BenchOptions struct {

// BenchResults represents the performance results of the benchmark and is thread-safe.
type BenchResults struct {
mu sync.RWMutex
completedOps int64
duration int64
}

func (r *BenchResults) AddCompletedOps(amount int64) {
atomic.AddInt64(&r.completedOps, amount)
r.mu.Lock()
defer r.mu.Unlock()
r.completedOps += amount
}

func (r *BenchResults) CompletedOps() int64 {
return atomic.LoadInt64(&r.completedOps)
r.mu.RLock()
defer r.mu.RUnlock()
return r.completedOps
}

func (r *BenchResults) SetDuration(dur time.Duration) {
atomic.StoreInt64(&r.duration, int64(dur))
r.mu.Lock()
defer r.mu.Unlock()
r.duration += int64(dur)
}

func (r *BenchResults) Duration() time.Duration {
return time.Duration(atomic.LoadInt64(&r.duration))
r.mu.RLock()
defer r.mu.RUnlock()
return time.Duration(r.duration)
}

// Returns the duration for a single read/write operation.
Expand Down
8 changes: 4 additions & 4 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1051,8 +1051,8 @@ func TestDB_Stats(t *testing.T) {
}

stats := db.Stats()
if stats.TxStats.GetPageCount() != 2 {
t.Fatalf("unexpected TxStats.PageCount: %d", stats.TxStats.GetPageCount())
if stats.TxStats.PageCount != 2 {
t.Fatalf("unexpected TxStats.PageCount: %d", stats.TxStats.PageCount)
} else if stats.FreePageN != 0 {
t.Fatalf("unexpected FreePageN != 0: %d", stats.FreePageN)
} else if stats.PendingPageN != 2 {
Expand Down Expand Up @@ -1135,8 +1135,8 @@ func TestDBStats_Sub(t *testing.T) {
b.TxStats.PageCount = 10
b.FreePageN = 14
diff := b.Sub(&a)
if diff.TxStats.GetPageCount() != 7 {
t.Fatalf("unexpected TxStats.PageCount: %d", diff.TxStats.GetPageCount())
if diff.TxStats.PageCount != 7 {
t.Fatalf("unexpected TxStats.PageCount: %d", diff.TxStats.PageCount)
}

// free page stats are copied from the receiver and not subtracted
Expand Down
12 changes: 6 additions & 6 deletions internal/btesting/btesting.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,14 @@ func (db *DB) CopyTempFile() {
func (db *DB) PrintStats() {
var stats = db.Stats()
fmt.Printf("[db] %-20s %-20s %-20s\n",
fmt.Sprintf("pg(%d/%d)", stats.TxStats.GetPageCount(), stats.TxStats.GetPageAlloc()),
fmt.Sprintf("cur(%d)", stats.TxStats.GetCursorCount()),
fmt.Sprintf("node(%d/%d)", stats.TxStats.GetNodeCount(), stats.TxStats.GetNodeDeref()),
fmt.Sprintf("pg(%d/%d)", stats.TxStats.PageCount, stats.TxStats.PageAlloc),
fmt.Sprintf("cur(%d)", stats.TxStats.CursorCount),
fmt.Sprintf("node(%d/%d)", stats.TxStats.NodeCount, stats.TxStats.NodeDeref),
)
fmt.Printf(" %-20s %-20s %-20s\n",
fmt.Sprintf("rebal(%d/%v)", stats.TxStats.GetRebalance(), truncDuration(stats.TxStats.GetRebalanceTime())),
fmt.Sprintf("spill(%d/%v)", stats.TxStats.GetSpill(), truncDuration(stats.TxStats.GetSpillTime())),
fmt.Sprintf("w(%d/%v)", stats.TxStats.GetWrite(), truncDuration(stats.TxStats.GetWriteTime())),
fmt.Sprintf("rebal(%d/%v)", stats.TxStats.Rebalance, truncDuration(stats.TxStats.RebalanceTime)),
fmt.Sprintf("spill(%d/%v)", stats.TxStats.Spill, truncDuration(stats.TxStats.SpillTime)),
fmt.Sprintf("w(%d/%v)", stats.TxStats.Write, truncDuration(stats.TxStats.WriteTime)),
)
}

Expand Down
8 changes: 4 additions & 4 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (n *node) splitTwo(pageSize uintptr) (*node, *node) {
n.inodes = n.inodes[:splitIndex]

// Update the statistics.
n.bucket.tx.stats.IncSplit(1)
n.bucket.tx.stats.Split++

return n, next
}
Expand Down Expand Up @@ -347,7 +347,7 @@ func (n *node) spill() error {
}

// Update the statistics.
tx.stats.IncSpill(1)
tx.stats.Spill++
}

// If the root node split and created a new root then we need to spill that
Expand All @@ -369,7 +369,7 @@ func (n *node) rebalance() {
n.unbalanced = false

// Update statistics.
n.bucket.tx.stats.IncRebalance(1)
n.bucket.tx.stats.Rebalance++

// Ignore if node is above threshold (25% when FillPercent is set to DefaultFillPercent) and has enough keys.
var threshold = int(float64(n.bucket.tx.db.pageSize)*n.bucket.FillPercent) / 2
Expand Down Expand Up @@ -487,7 +487,7 @@ func (n *node) dereference() {
}

// Update statistics.
n.bucket.tx.stats.IncNodeDeref(1)
n.bucket.tx.stats.NodeDeref++
}

// free adds the node's underlying page to the freelist.
Expand Down
29 changes: 25 additions & 4 deletions simulation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
"testing"

bolt "go.etcd.io/bbolt"
Expand Down Expand Up @@ -52,10 +51,32 @@ func testSimulate(t *testing.T, openOption *bolt.Options, round, threadCount, pa
var wg sync.WaitGroup

// counter for how many goroutines were fired
var ocMu sync.RWMutex
var opCount int64
incOpCount := func() {
ocMu.Lock()
defer ocMu.Unlock()
opCount++
}
getOpCount := func() int64 {
ocMu.RLock()
defer ocMu.RUnlock()
return opCount
}

// counter for ignored operations
var icMu sync.RWMutex
var igCount int64
incIgCount := func() {
icMu.Lock()
defer icMu.Unlock()
igCount++
}
getIgCount := func() int64 {
icMu.RLock()
defer icMu.RUnlock()
return igCount
}

var errCh = make(chan error, threadCount)

Expand All @@ -82,7 +103,7 @@ func testSimulate(t *testing.T, openOption *bolt.Options, round, threadCount, pa
// Execute a thread for the given operation.
go func(writable bool, handler simulateHandler) {
defer wg.Done()
atomic.AddInt64(&opCount, 1)
incOpCount()
// Start transaction.
tx, err := db.Begin(writable)
if err != nil {
Expand Down Expand Up @@ -116,7 +137,7 @@ func testSimulate(t *testing.T, openOption *bolt.Options, round, threadCount, pa

// Ignore operation if we don't have data yet.
if qdb == nil {
atomic.AddInt64(&igCount, 1)
incIgCount()
return
}

Expand All @@ -135,7 +156,7 @@ func testSimulate(t *testing.T, openOption *bolt.Options, round, threadCount, pa

// Wait until all threads are done.
wg.Wait()
t.Logf("transactions:%d ignored:%d", opCount, igCount)
t.Logf("transactions:%d ignored:%d", getOpCount(), getIgCount())
close(errCh)
for err := range errCh {
if err != nil {
Expand Down
Loading

0 comments on commit 2e12f03

Please sign in to comment.