diff --git a/changelog/19.0/19.0.0/summary.md b/changelog/19.0/19.0.0/summary.md
index af9c83399a3..2b5fdfe3d65 100644
--- a/changelog/19.0/19.0.0/summary.md
+++ b/changelog/19.0/19.0.0/summary.md
@@ -8,6 +8,8 @@
- [VTTablet Flags](#vttablet-flags)
- **[Docker](#docker)**
- [New MySQL Image](#mysql-image)
+ - **[New Stats](#new-stats)**
+ - [Stream Consolidations](#stream-consolidations)
- **[VTGate](#vtgate)**
- [`FOREIGN_KEY_CHECKS` is now a Vitess Aware Variable](#fk-checks-vitess-aware)
- **[Query Compatibility](#query-compatibility)**
@@ -43,6 +45,12 @@ This lightweight image is a replacement of `vitess/lite` to only run `mysqld`.
Several tags are available to let you choose what version of MySQL you want to use: `vitess/mysql:8.0.30`, `vitess/mysql:8.0.34`.
+### new stats
+
+#### Stream Consolidations
+
+Prior to 19.0 VTTablet reported how much time non-streaming executions spend waiting for consolidations to occur. In 19.0, VTTablet reports a similar stat for streaming executions in `/debug/vars` stat `Waits.Histograms.StreamConsolidations`.
+
### VTGate
#### `FOREIGN_KEY_CHECKS` is now a Vitess Aware Variable
diff --git a/go/vt/vttablet/endtoend/config_test.go b/go/vt/vttablet/endtoend/config_test.go
index 60303cf4bf5..9eef54bd0bb 100644
--- a/go/vt/vttablet/endtoend/config_test.go
+++ b/go/vt/vttablet/endtoend/config_test.go
@@ -108,64 +108,88 @@ func TestDisableConsolidator(t *testing.T) {
}
func TestConsolidatorReplicasOnly(t *testing.T) {
- totalConsolidationsTag := "Waits/Histograms/Consolidations/Count"
- initial := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
- var wg sync.WaitGroup
- wg.Add(2)
- go func() {
- framework.NewClient().Execute("select sleep(0.5) from dual", nil)
- wg.Done()
- }()
- go func() {
- framework.NewClient().Execute("select sleep(0.5) from dual", nil)
- wg.Done()
- }()
- wg.Wait()
- afterOne := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
- assert.Equal(t, initial+1, afterOne, "expected one consolidation")
-
- revert := changeVar(t, "Consolidator", tabletenv.NotOnPrimary)
- defer revert()
-
- // primary should not do query consolidation
- var wg2 sync.WaitGroup
- wg2.Add(2)
- go func() {
- framework.NewClient().Execute("select sleep(0.5) from dual", nil)
- wg2.Done()
- }()
- go func() {
- framework.NewClient().Execute("select sleep(0.5) from dual", nil)
- wg2.Done()
- }()
- wg2.Wait()
- noNewConsolidations := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
- assert.Equal(t, afterOne, noNewConsolidations, "expected no new consolidations")
-
- // become a replica, where query consolidation should happen
- client := framework.NewClientWithTabletType(topodatapb.TabletType_REPLICA)
-
- err := client.SetServingType(topodatapb.TabletType_REPLICA)
- require.NoError(t, err)
- defer func() {
- err = client.SetServingType(topodatapb.TabletType_PRIMARY)
- require.NoError(t, err)
- }()
+ type executeFn func(
+ query string, bindvars map[string]*querypb.BindVariable,
+ ) (*sqltypes.Result, error)
+
+ testCases := []struct {
+ name string
+ getExecuteFn func(qc *framework.QueryClient) executeFn
+ totalConsolidationsTag string
+ }{
+ {
+ name: "Execute",
+ getExecuteFn: func(qc *framework.QueryClient) executeFn { return qc.Execute },
+ totalConsolidationsTag: "Waits/Histograms/Consolidations/Count",
+ },
+ {
+ name: "StreamExecute",
+ getExecuteFn: func(qc *framework.QueryClient) executeFn { return qc.StreamExecute },
+ totalConsolidationsTag: "Waits/Histograms/StreamConsolidations/Count",
+ },
+ }
- initial = framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
- var wg3 sync.WaitGroup
- wg3.Add(2)
- go func() {
- client.Execute("select sleep(0.5) from dual", nil)
- wg3.Done()
- }()
- go func() {
- client.Execute("select sleep(0.5) from dual", nil)
- wg3.Done()
- }()
- wg3.Wait()
- afterOne = framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
- assert.Equal(t, initial+1, afterOne, "expected another consolidation")
+ for _, testCase := range testCases {
+ t.Run(testCase.name, func(t *testing.T) {
+ initial := framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag)
+ var wg sync.WaitGroup
+ wg.Add(2)
+ go func() {
+ testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil)
+ wg.Done()
+ }()
+ go func() {
+ testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil)
+ wg.Done()
+ }()
+ wg.Wait()
+ afterOne := framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag)
+ assert.Equal(t, initial+1, afterOne, "expected one consolidation")
+
+ revert := changeVar(t, "Consolidator", tabletenv.NotOnPrimary)
+ defer revert()
+
+ // primary should not do query consolidation
+ var wg2 sync.WaitGroup
+ wg2.Add(2)
+ go func() {
+ testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil)
+ wg2.Done()
+ }()
+ go func() {
+ testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil)
+ wg2.Done()
+ }()
+ wg2.Wait()
+ noNewConsolidations := framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag)
+ assert.Equal(t, afterOne, noNewConsolidations, "expected no new consolidations")
+
+ // become a replica, where query consolidation should happen
+ client := framework.NewClientWithTabletType(topodatapb.TabletType_REPLICA)
+
+ err := client.SetServingType(topodatapb.TabletType_REPLICA)
+ require.NoError(t, err)
+ defer func() {
+ err = client.SetServingType(topodatapb.TabletType_PRIMARY)
+ require.NoError(t, err)
+ }()
+
+ initial = framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag)
+ var wg3 sync.WaitGroup
+ wg3.Add(2)
+ go func() {
+ testCase.getExecuteFn(client)("select sleep(0.5) from dual", nil)
+ wg3.Done()
+ }()
+ go func() {
+ testCase.getExecuteFn(client)("select sleep(0.5) from dual", nil)
+ wg3.Done()
+ }()
+ wg3.Wait()
+ afterOne = framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag)
+ assert.Equal(t, initial+1, afterOne, "expected another consolidation")
+ })
+ }
}
func TestQueryPlanCache(t *testing.T) {
diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go
index 63dcd42d0a8..735562c536f 100644
--- a/go/vt/vttablet/tabletserver/query_executor.go
+++ b/go/vt/vttablet/tabletserver/query_executor.go
@@ -340,7 +340,7 @@ func (qre *QueryExecutor) Stream(callback StreamCallback) error {
if consolidator := qre.tsv.qe.streamConsolidator; consolidator != nil {
if qre.connID == 0 && qre.plan.PlanID == p.PlanSelectStream && qre.shouldConsolidate() {
- return consolidator.Consolidate(qre.logStats, sqlWithoutComments, callback,
+ return consolidator.Consolidate(qre.tsv.stats.WaitTimings, qre.logStats, sqlWithoutComments, callback,
func(callback StreamCallback) error {
dbConn, err := qre.getStreamConn()
if err != nil {
diff --git a/go/vt/vttablet/tabletserver/stream_consolidator.go b/go/vt/vttablet/tabletserver/stream_consolidator.go
index 9f720059dce..cbf99eaffd4 100644
--- a/go/vt/vttablet/tabletserver/stream_consolidator.go
+++ b/go/vt/vttablet/tabletserver/stream_consolidator.go
@@ -19,9 +19,11 @@ package tabletserver
import (
"sync"
"sync/atomic"
+ "time"
"vitess.io/vitess/go/sqltypes"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
+ "vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
)
@@ -70,7 +72,7 @@ func (sc *StreamConsolidator) SetBlocking(block bool) {
// `callback`. A `leaderCallback` must also be supplied: this function must perform the actual
// query in the upstream MySQL server, yielding results into the modified callback that it receives
// as an argument.
-func (sc *StreamConsolidator) Consolidate(logStats *tabletenv.LogStats, sql string, callback StreamCallback, leaderCallback func(StreamCallback) error) error {
+func (sc *StreamConsolidator) Consolidate(waitTimings *servenv.TimingsWrapper, logStats *tabletenv.LogStats, sql string, callback StreamCallback, leaderCallback func(StreamCallback) error) error {
var (
inflight *streamInFlight
catchup []*sqltypes.Result
@@ -100,9 +102,11 @@ func (sc *StreamConsolidator) Consolidate(logStats *tabletenv.LogStats, sql stri
// if we have a followChan, we're following up on a query that is already being served
if followChan != nil {
+ startTime := time.Now()
defer func() {
memchange := inflight.unfollow(followChan, sc.cleanup)
atomic.AddInt64(&sc.memory, memchange)
+ waitTimings.Record("StreamConsolidations", startTime)
}()
logStats.QuerySources |= tabletenv.QuerySourceConsolidator
diff --git a/go/vt/vttablet/tabletserver/stream_consolidator_flaky_test.go b/go/vt/vttablet/tabletserver/stream_consolidator_flaky_test.go
index 0c903933412..caa519cc477 100644
--- a/go/vt/vttablet/tabletserver/stream_consolidator_flaky_test.go
+++ b/go/vt/vttablet/tabletserver/stream_consolidator_flaky_test.go
@@ -28,6 +28,7 @@ import (
"github.com/stretchr/testify/require"
+ "vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/sqltypes"
@@ -123,10 +124,12 @@ func (ct *consolidationTest) run(workers int, generateCallback func(int) (string
go func(worker int) {
defer wg.Done()
+ exporter := servenv.NewExporter("ConsolidatorTest", "")
+ timings := exporter.NewTimings("ConsolidatorWaits", "", "StreamConsolidations")
logStats := tabletenv.NewLogStats(context.Background(), "StreamConsolidation")
query, callback := generateCallback(worker)
start := time.Now()
- err := ct.cc.Consolidate(logStats, query, func(result *sqltypes.Result) error {
+ err := ct.cc.Consolidate(timings, logStats, query, func(result *sqltypes.Result) error {
cr := ct.results[worker]
cr.items = append(cr.items, result)
atomic.AddInt64(&cr.count, 1)
diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go
index 3b0ac598ad0..12c6a40868f 100644
--- a/go/vt/vttablet/tabletserver/tabletserver.go
+++ b/go/vt/vttablet/tabletserver/tabletserver.go
@@ -948,6 +948,7 @@ func (tsv *TabletServer) streamExecute(ctx context.Context, target *querypb.Targ
ctx: ctx,
logStats: logStats,
tsv: tsv,
+ tabletType: target.GetTabletType(),
setting: connSetting,
}
return qre.Stream(callback)