diff --git a/go/sync2/consolidator.go b/go/sync2/consolidator.go index f0090cfce39..2f052633691 100644 --- a/go/sync2/consolidator.go +++ b/go/sync2/consolidator.go @@ -32,7 +32,6 @@ type Consolidator interface { Record(query string) WaiterCountOfQuery(query string) int64 WaiterCountOfTotal() int64 - RemoveWaiterFromTotal(query string) } // PendingResult is a wrapper for result of a query. @@ -43,6 +42,7 @@ type PendingResult interface { SetResult(*sqltypes.Result) Result() *sqltypes.Result Wait() + RemoveOneCount() } type consolidator struct { @@ -96,18 +96,12 @@ func (co *consolidator) WaiterCountOfQuery(query string) int64 { return int64(0) } -// WaiterCountOfTotal returns *int64 +// WaiterCountOfTotal returns int64 // which holds the total number of waiting count, for all queries. func (co *consolidator) WaiterCountOfTotal() int64 { return atomic.LoadInt64(co.totalWaiterCount) } -// RemoveWaiterFromTotal removes waiting count of specified query -// from total number of waiting count. -func (co *consolidator) RemoveWaiterFromTotal(query string) { - atomic.AddInt64(co.totalWaiterCount, co.WaiterCountOfQuery(query)*-1) -} - // Broadcast removes the entry from current queries and releases the // lock on its Result. Broadcast should be invoked when original // query completes execution. @@ -145,6 +139,10 @@ func (rs *pendingResult) Wait() { rs.executing.RLock() } +func (rs *pendingResult) RemoveOneCount() { + atomic.AddInt64(rs.consolidator.totalWaiterCount, int64(-1)) +} + // ConsolidatorCache is a thread-safe object used for counting how often recent // queries have been consolidated. // It is also used by the txserializer package to count how often transactions diff --git a/go/sync2/consolidator_test.go b/go/sync2/consolidator_test.go index cea0298cecf..8e6f81a30b0 100644 --- a/go/sync2/consolidator_test.go +++ b/go/sync2/consolidator_test.go @@ -18,7 +18,9 @@ package sync2 import ( "reflect" + "sync" "testing" + "time" "vitess.io/vitess/go/sqltypes" ) @@ -88,9 +90,72 @@ func TestConsolidator(t *testing.T) { t.Fatalf("expected consolidator to have WaiterCountOfTotal %v and WaiterCountOfQuery %v", con.WaiterCountOfTotal(), con.WaiterCountOfQuery(sql)) } - con.Create(second_sql) + third, _ := con.Create(second_sql) if con.WaiterCountOfTotal() != 2 || con.WaiterCountOfQuery(second_sql) != 0 { t.Fatalf("expected consolidator to have WaiterCountOfTotal %v and WaiterCountOfQuery %v", con.WaiterCountOfTotal(), con.WaiterCountOfQuery(second_sql)) } + + // Remove 2 wait count, prepare for concurrnt tests + third.RemoveOneCount() + third.RemoveOneCount() + + // Run same_query_times concurrently for sql and second_sql + // Expect WaiterCountOfTotal() equals to (same_query_times) + // Also test WaiterCountOfTotal() is concurrent safe + same_query_times := 100 + var wg1 sync.WaitGroup + var wg2 sync.WaitGroup + var wg3 sync.WaitGroup + + go func() { + for i := 0; i < same_query_times; i++ { + wg1.Add(1) + go func(i int) { + defer wg1.Done() + this, _ := con.Create(sql) + go func() { + this.Wait() + }() + }(i) + } + }() + + go func() { + for i := 0; i < same_query_times; i++ { + wg2.Add(1) + go func(i int) { + defer wg2.Done() + this, _ := con.Create(second_sql) + go func() { + this.Wait() + }() + }(i) + } + }() + + // Test RemoveOneCount() is concurrent safe + // Remove same_query_times concurrently for second_sql + go func() { + for i := 0; i < same_query_times; i++ { + wg2.Add(1) + go func(i int) { + defer wg2.Done() + this, _ := con.Create(second_sql) + go func() { + this.RemoveOneCount() + }() + }(i) + } + }() + + time.Sleep(500 * time.Millisecond) + + wg1.Wait() + wg2.Wait() + wg3.Wait() + + if con.WaiterCountOfTotal() != int64(same_query_times) { + t.Fatalf("expected consolidator to have WaiterCountOfTotal %v", int64(same_query_times)) + } } diff --git a/go/sync2/fake_consolidator.go b/go/sync2/fake_consolidator.go index 6427a605f82..174f3ce7bfb 100644 --- a/go/sync2/fake_consolidator.go +++ b/go/sync2/fake_consolidator.go @@ -93,9 +93,6 @@ func (fr *FakeConsolidator) WaiterCountOfTotal() int64 { return int64(0) } -// RemoveWaiterFromTotal is currently a no-op. -func (fr *FakeConsolidator) RemoveWaiterFromTotal(query string) {} - // Broadcast records the Broadcast call for later verification. func (fr *FakePendingResult) Broadcast() { fr.BroadcastCalls++ @@ -125,3 +122,6 @@ func (fr *FakePendingResult) SetResult(result *sqltypes.Result) { func (fr *FakePendingResult) Wait() { fr.WaitCalls++ } + +// RemoveOneCount is currently a no-op. +func (fr *FakePendingResult) RemoveOneCount() {} diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 29b49327613..b5fb406ade7 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -706,7 +706,6 @@ func (qre *QueryExecutor) execSelect() (*sqltypes.Result, error) { q, original := c.Create(sqlWithoutComments) if original { defer q.Broadcast() - defer c.RemoveWaiterFromTotal(sqlWithoutComments) conn, err := qre.getConn() if err != nil { @@ -724,6 +723,7 @@ func (qre *QueryExecutor) execSelect() (*sqltypes.Result, error) { startTime := time.Now() q.Wait() qre.tsv.stats.WaitTimings.Record("Consolidations", startTime) + q.RemoveOneCount() } } if q.Err() != nil {