diff --git a/go/vt/vttablet/endtoend/config_test.go b/go/vt/vttablet/endtoend/config_test.go index 99f1bc8583e..2eb36e2574a 100644 --- a/go/vt/vttablet/endtoend/config_test.go +++ b/go/vt/vttablet/endtoend/config_test.go @@ -116,64 +116,88 @@ func TestDisableConsolidator(t *testing.T) { } func TestConsolidatorReplicasOnly(t *testing.T) { - totalConsolidationsTag := "Waits/Histograms/Consolidations/Count" - initial := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag) - var wg sync.WaitGroup - wg.Add(2) - go func() { - framework.NewClient().Execute("select sleep(0.5) from dual", nil) - wg.Done() - }() - go func() { - framework.NewClient().Execute("select sleep(0.5) from dual", nil) - wg.Done() - }() - wg.Wait() - afterOne := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag) - assert.Equal(t, initial+1, afterOne, "expected one consolidation") - - revert := changeVar(t, "Consolidator", tabletenv.NotOnPrimary) - defer revert() - - // primary should not do query consolidation - var wg2 sync.WaitGroup - wg2.Add(2) - go func() { - framework.NewClient().Execute("select sleep(0.5) from dual", nil) - wg2.Done() - }() - go func() { - framework.NewClient().Execute("select sleep(0.5) from dual", nil) - wg2.Done() - }() - wg2.Wait() - noNewConsolidations := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag) - assert.Equal(t, afterOne, noNewConsolidations, "expected no new consolidations") - - // become a replica, where query consolidation should happen - client := framework.NewClientWithTabletType(topodatapb.TabletType_REPLICA) - - err := client.SetServingType(topodatapb.TabletType_REPLICA) - require.NoError(t, err) - defer func() { - err = client.SetServingType(topodatapb.TabletType_PRIMARY) - require.NoError(t, err) - }() + type executeFn func( + query string, bindvars map[string]*querypb.BindVariable, + ) (*sqltypes.Result, error) + + testCases := []struct { + name string + getExecuteFn func(qc *framework.QueryClient) executeFn + totalConsolidationsTag string + }{ + { + name: "Execute", + getExecuteFn: func(qc *framework.QueryClient) executeFn { return qc.Execute }, + totalConsolidationsTag: "Waits/Histograms/Consolidations/Count", + }, + { + name: "StreamExecute", + getExecuteFn: func(qc *framework.QueryClient) executeFn { return qc.StreamExecute }, + totalConsolidationsTag: "Waits/Histograms/StreamConsolidations/Count", + }, + } - initial = framework.FetchInt(framework.DebugVars(), totalConsolidationsTag) - var wg3 sync.WaitGroup - wg3.Add(2) - go func() { - client.Execute("select sleep(0.5) from dual", nil) - wg3.Done() - }() - go func() { - client.Execute("select sleep(0.5) from dual", nil) - wg3.Done() - }() - wg3.Wait() - afterOne = framework.FetchInt(framework.DebugVars(), totalConsolidationsTag) - assert.Equal(t, initial+1, afterOne, "expected another consolidation") + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + initial := framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag) + var wg sync.WaitGroup + wg.Add(2) + go func() { + testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil) + wg.Done() + }() + go func() { + testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil) + wg.Done() + }() + wg.Wait() + afterOne := framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag) + assert.Equal(t, initial+1, afterOne, "expected one consolidation") + + revert := changeVar(t, "Consolidator", tabletenv.NotOnPrimary) + defer revert() + + // primary should not do query consolidation + var wg2 sync.WaitGroup + wg2.Add(2) + go func() { + testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil) + wg2.Done() + }() + go func() { + testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil) + wg2.Done() + }() + wg2.Wait() + noNewConsolidations := framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag) + assert.Equal(t, afterOne, noNewConsolidations, "expected no new consolidations") + + // become a replica, where query consolidation should happen + client := framework.NewClientWithTabletType(topodatapb.TabletType_REPLICA) + + err := client.SetServingType(topodatapb.TabletType_REPLICA) + require.NoError(t, err) + defer func() { + err = client.SetServingType(topodatapb.TabletType_PRIMARY) + require.NoError(t, err) + }() + + initial = framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag) + var wg3 sync.WaitGroup + wg3.Add(2) + go func() { + testCase.getExecuteFn(client)("select sleep(0.5) from dual", nil) + wg3.Done() + }() + go func() { + testCase.getExecuteFn(client)("select sleep(0.5) from dual", nil) + wg3.Done() + }() + wg3.Wait() + afterOne = framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag) + assert.Equal(t, initial+1, afterOne, "expected another consolidation") + }) + } } func TestQueryPlanCache(t *testing.T) { diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index cf26d7c5019..efd872e1f1d 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -436,7 +436,7 @@ func (qre *QueryExecutor) Stream(callback StreamCallback) error { if consolidator := qre.tsv.qe.streamConsolidator; consolidator != nil { if qre.connID == 0 && qre.plan.PlanID == p.PlanSelectStream && qre.shouldConsolidate() { - return consolidator.Consolidate(qre.logStats, sqlWithoutComments, callback, + return consolidator.Consolidate(qre.tsv.stats.WaitTimings, qre.logStats, sqlWithoutComments, callback, func(callback StreamCallback) error { dbConn, err := qre.getStreamConn() if err != nil { diff --git a/go/vt/vttablet/tabletserver/stream_consolidator.go b/go/vt/vttablet/tabletserver/stream_consolidator.go index 497c9011040..22888419dc9 100644 --- a/go/vt/vttablet/tabletserver/stream_consolidator.go +++ b/go/vt/vttablet/tabletserver/stream_consolidator.go @@ -19,9 +19,11 @@ package tabletserver import ( "sync" "sync/atomic" + "time" "vitess.io/vitess/go/sqltypes" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" ) @@ -70,7 +72,7 @@ func (sc *StreamConsolidator) SetBlocking(block bool) { // `callback`. A `leaderCallback` must also be supplied: this function must perform the actual // query in the upstream MySQL server, yielding results into the modified callback that it receives // as an argument. -func (sc *StreamConsolidator) Consolidate(logStats *tabletenv.LogStats, sql string, callback StreamCallback, leaderCallback func(StreamCallback) error) error { +func (sc *StreamConsolidator) Consolidate(waitTimings *servenv.TimingsWrapper, logStats *tabletenv.LogStats, sql string, callback StreamCallback, leaderCallback func(StreamCallback) error) error { var ( inflight *streamInFlight catchup []*sqltypes.Result @@ -100,9 +102,11 @@ func (sc *StreamConsolidator) Consolidate(logStats *tabletenv.LogStats, sql stri // if we have a followChan, we're following up on a query that is already being served if followChan != nil { + startTime := time.Now() defer func() { memchange := inflight.unfollow(followChan, sc.cleanup) atomic.AddInt64(&sc.memory, memchange) + waitTimings.Record("StreamConsolidations", startTime) }() logStats.QuerySources |= tabletenv.QuerySourceConsolidator diff --git a/go/vt/vttablet/tabletserver/stream_consolidator_flaky_test.go b/go/vt/vttablet/tabletserver/stream_consolidator_flaky_test.go index 0c903933412..caa519cc477 100644 --- a/go/vt/vttablet/tabletserver/stream_consolidator_flaky_test.go +++ b/go/vt/vttablet/tabletserver/stream_consolidator_flaky_test.go @@ -28,6 +28,7 @@ import ( "github.com/stretchr/testify/require" + "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "vitess.io/vitess/go/sqltypes" @@ -123,10 +124,12 @@ func (ct *consolidationTest) run(workers int, generateCallback func(int) (string go func(worker int) { defer wg.Done() + exporter := servenv.NewExporter("ConsolidatorTest", "") + timings := exporter.NewTimings("ConsolidatorWaits", "", "StreamConsolidations") logStats := tabletenv.NewLogStats(context.Background(), "StreamConsolidation") query, callback := generateCallback(worker) start := time.Now() - err := ct.cc.Consolidate(logStats, query, func(result *sqltypes.Result) error { + err := ct.cc.Consolidate(timings, logStats, query, func(result *sqltypes.Result) error { cr := ct.results[worker] cr.items = append(cr.items, result) atomic.AddInt64(&cr.count, 1) diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 090098aded6..681c1c33fd0 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -886,6 +886,7 @@ func (tsv *TabletServer) streamExecute(ctx context.Context, target *querypb.Targ ctx: ctx, logStats: logStats, tsv: tsv, + tabletType: target.GetTabletType(), setting: connSetting, } return qre.Stream(callback)