Skip to content

Commit

Permalink
add more tests
Browse files Browse the repository at this point in the history
Signed-off-by: Jun Wang <[email protected]>
  • Loading branch information
Jun Wang committed Dec 4, 2024
1 parent 45a9194 commit 8e62c33
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 13 deletions.
14 changes: 6 additions & 8 deletions go/sync2/consolidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ type Consolidator interface {
Record(query string)
WaiterCountOfQuery(query string) int64
WaiterCountOfTotal() int64
RemoveWaiterFromTotal(query string)
}

// PendingResult is a wrapper for result of a query.
Expand All @@ -43,6 +42,7 @@ type PendingResult interface {
SetResult(*sqltypes.Result)
Result() *sqltypes.Result
Wait()
RemoveOneCount()
}

type consolidator struct {
Expand Down Expand Up @@ -96,18 +96,12 @@ func (co *consolidator) WaiterCountOfQuery(query string) int64 {
return int64(0)
}

// WaiterCountOfTotal returns *int64
// WaiterCountOfTotal returns int64
// which holds the total number of waiting count, for all queries.
func (co *consolidator) WaiterCountOfTotal() int64 {
return atomic.LoadInt64(co.totalWaiterCount)
}

// RemoveWaiterFromTotal removes waiting count of specified query
// from total number of waiting count.
func (co *consolidator) RemoveWaiterFromTotal(query string) {
atomic.AddInt64(co.totalWaiterCount, co.WaiterCountOfQuery(query)*-1)
}

// Broadcast removes the entry from current queries and releases the
// lock on its Result. Broadcast should be invoked when original
// query completes execution.
Expand Down Expand Up @@ -145,6 +139,10 @@ func (rs *pendingResult) Wait() {
rs.executing.RLock()
}

func (rs *pendingResult) RemoveOneCount() {
atomic.AddInt64(rs.consolidator.totalWaiterCount, int64(-1))
}

// ConsolidatorCache is a thread-safe object used for counting how often recent
// queries have been consolidated.
// It is also used by the txserializer package to count how often transactions
Expand Down
67 changes: 66 additions & 1 deletion go/sync2/consolidator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package sync2

import (
"reflect"
"sync"
"testing"
"time"

"vitess.io/vitess/go/sqltypes"
)
Expand Down Expand Up @@ -88,9 +90,72 @@ func TestConsolidator(t *testing.T) {
t.Fatalf("expected consolidator to have WaiterCountOfTotal %v and WaiterCountOfQuery %v", con.WaiterCountOfTotal(), con.WaiterCountOfQuery(sql))
}

con.Create(second_sql)
third, _ := con.Create(second_sql)

if con.WaiterCountOfTotal() != 2 || con.WaiterCountOfQuery(second_sql) != 0 {
t.Fatalf("expected consolidator to have WaiterCountOfTotal %v and WaiterCountOfQuery %v", con.WaiterCountOfTotal(), con.WaiterCountOfQuery(second_sql))
}

// Remove 2 wait count, prepare for concurrnt tests
third.RemoveOneCount()
third.RemoveOneCount()

// Run same_query_times concurrently for sql and second_sql
// Expect WaiterCountOfTotal() equals to (same_query_times)
// Also test WaiterCountOfTotal() is concurrent safe
same_query_times := 100
var wg1 sync.WaitGroup
var wg2 sync.WaitGroup
var wg3 sync.WaitGroup

go func() {
for i := 0; i < same_query_times; i++ {
wg1.Add(1)
go func(i int) {
defer wg1.Done()
this, _ := con.Create(sql)
go func() {
this.Wait()
}()
}(i)
}
}()

go func() {
for i := 0; i < same_query_times; i++ {
wg2.Add(1)
go func(i int) {
defer wg2.Done()
this, _ := con.Create(second_sql)
go func() {
this.Wait()
}()
}(i)
}
}()

// Test RemoveOneCount() is concurrent safe
// Remove same_query_times concurrently for second_sql
go func() {
for i := 0; i < same_query_times; i++ {
wg2.Add(1)
go func(i int) {
defer wg2.Done()
this, _ := con.Create(second_sql)
go func() {
this.RemoveOneCount()
}()
}(i)
}
}()

time.Sleep(500 * time.Millisecond)

wg1.Wait()
wg2.Wait()
wg3.Wait()

if con.WaiterCountOfTotal() != int64(same_query_times) {
t.Fatalf("expected consolidator to have WaiterCountOfTotal %v", int64(same_query_times))
}
}
6 changes: 3 additions & 3 deletions go/sync2/fake_consolidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,6 @@ func (fr *FakeConsolidator) WaiterCountOfTotal() int64 {
return int64(0)
}

// RemoveWaiterFromTotal is currently a no-op.
func (fr *FakeConsolidator) RemoveWaiterFromTotal(query string) {}

// Broadcast records the Broadcast call for later verification.
func (fr *FakePendingResult) Broadcast() {
fr.BroadcastCalls++
Expand Down Expand Up @@ -125,3 +122,6 @@ func (fr *FakePendingResult) SetResult(result *sqltypes.Result) {
func (fr *FakePendingResult) Wait() {
fr.WaitCalls++
}

// RemoveOneCount is currently a no-op.
func (fr *FakePendingResult) RemoveOneCount() {}
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,6 @@ func (qre *QueryExecutor) execSelect() (*sqltypes.Result, error) {
q, original := c.Create(sqlWithoutComments)
if original {
defer q.Broadcast()
defer c.RemoveWaiterFromTotal(sqlWithoutComments)
conn, err := qre.getConn()

if err != nil {
Expand All @@ -724,6 +723,7 @@ func (qre *QueryExecutor) execSelect() (*sqltypes.Result, error) {
startTime := time.Now()
q.Wait()
qre.tsv.stats.WaitTimings.Record("Consolidations", startTime)
q.RemoveOneCount()
}
}
if q.Err() != nil {
Expand Down

0 comments on commit 8e62c33

Please sign in to comment.