Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tabletserver: do not consolidate streams on primary tablet when consolidator mode is notOnPrimary #14332

Merged
merged 3 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions changelog/19.0/19.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
- [VTTablet Flags](#vttablet-flags)
- **[Docker](#docker)**
- [New MySQL Image](#mysql-image)
- **[New Stats](#new-stats)**
- [Stream Consolidations](#stream-consolidations)
- **[VTGate](#vtgate)**
- [`FOREIGN_KEY_CHECKS` is now a Vitess Aware Variable](#fk-checks-vitess-aware)
- **[Query Compatibility](#query-compatibility)**
Expand All @@ -33,6 +35,12 @@ This lightweight image is a replacement of `vitess/lite` to only run `mysqld`.

Several tags are available to let you choose what version of MySQL you want to use: `vitess/mysql:8.0.30`, `vitess/mysql:8.0.34`.

### <a id="new-stats"/>New Stats

#### <a id="stream-consolidations"/>Stream Consolidations

Prior to 19.0, VTTablet only reported how much time non-streaming executions spend waiting for consolidations to occur. In 19.0, VTTablet also reports a similar stat for streaming executions, exposed in `/debug/vars` as `Waits.Histograms.StreamConsolidations`.

### <a id="vtgate"/>VTGate

#### <a id="fk-checks-vitess-aware"/>`FOREIGN_KEY_CHECKS` is now a Vitess Aware Variable
Expand Down
138 changes: 81 additions & 57 deletions go/vt/vttablet/endtoend/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,64 +108,88 @@ func TestDisableConsolidator(t *testing.T) {
}

func TestConsolidatorReplicasOnly(t *testing.T) {
totalConsolidationsTag := "Waits/Histograms/Consolidations/Count"
initial := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
var wg sync.WaitGroup
wg.Add(2)
go func() {
framework.NewClient().Execute("select sleep(0.5) from dual", nil)
wg.Done()
}()
go func() {
framework.NewClient().Execute("select sleep(0.5) from dual", nil)
wg.Done()
}()
wg.Wait()
afterOne := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
assert.Equal(t, initial+1, afterOne, "expected one consolidation")

revert := changeVar(t, "Consolidator", tabletenv.NotOnPrimary)
defer revert()

// primary should not do query consolidation
var wg2 sync.WaitGroup
wg2.Add(2)
go func() {
framework.NewClient().Execute("select sleep(0.5) from dual", nil)
wg2.Done()
}()
go func() {
framework.NewClient().Execute("select sleep(0.5) from dual", nil)
wg2.Done()
}()
wg2.Wait()
noNewConsolidations := framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
assert.Equal(t, afterOne, noNewConsolidations, "expected no new consolidations")

// become a replica, where query consolidation should happen
client := framework.NewClientWithTabletType(topodatapb.TabletType_REPLICA)

err := client.SetServingType(topodatapb.TabletType_REPLICA)
require.NoError(t, err)
defer func() {
err = client.SetServingType(topodatapb.TabletType_PRIMARY)
require.NoError(t, err)
}()
type executeFn func(
query string, bindvars map[string]*querypb.BindVariable,
) (*sqltypes.Result, error)

testCases := []struct {
name string
getExecuteFn func(qc *framework.QueryClient) executeFn
totalConsolidationsTag string
}{
{
name: "Execute",
getExecuteFn: func(qc *framework.QueryClient) executeFn { return qc.Execute },
totalConsolidationsTag: "Waits/Histograms/Consolidations/Count",
},
{
name: "StreamExecute",
getExecuteFn: func(qc *framework.QueryClient) executeFn { return qc.StreamExecute },
totalConsolidationsTag: "Waits/Histograms/StreamConsolidations/Count",
},
}

initial = framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
var wg3 sync.WaitGroup
wg3.Add(2)
go func() {
client.Execute("select sleep(0.5) from dual", nil)
wg3.Done()
}()
go func() {
client.Execute("select sleep(0.5) from dual", nil)
wg3.Done()
}()
wg3.Wait()
afterOne = framework.FetchInt(framework.DebugVars(), totalConsolidationsTag)
assert.Equal(t, initial+1, afterOne, "expected another consolidation")
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
initial := framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag)
var wg sync.WaitGroup
wg.Add(2)
go func() {
testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil)
wg.Done()
}()
go func() {
testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil)
wg.Done()
}()
wg.Wait()
afterOne := framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag)
assert.Equal(t, initial+1, afterOne, "expected one consolidation")

revert := changeVar(t, "Consolidator", tabletenv.NotOnPrimary)
defer revert()

// primary should not do query consolidation
var wg2 sync.WaitGroup
wg2.Add(2)
go func() {
testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil)
wg2.Done()
}()
go func() {
testCase.getExecuteFn(framework.NewClient())("select sleep(0.5) from dual", nil)
wg2.Done()
}()
wg2.Wait()
noNewConsolidations := framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag)
assert.Equal(t, afterOne, noNewConsolidations, "expected no new consolidations")

// become a replica, where query consolidation should happen
client := framework.NewClientWithTabletType(topodatapb.TabletType_REPLICA)

err := client.SetServingType(topodatapb.TabletType_REPLICA)
require.NoError(t, err)
defer func() {
err = client.SetServingType(topodatapb.TabletType_PRIMARY)
require.NoError(t, err)
}()

initial = framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag)
var wg3 sync.WaitGroup
wg3.Add(2)
go func() {
testCase.getExecuteFn(client)("select sleep(0.5) from dual", nil)
wg3.Done()
}()
go func() {
testCase.getExecuteFn(client)("select sleep(0.5) from dual", nil)
wg3.Done()
}()
wg3.Wait()
afterOne = framework.FetchInt(framework.DebugVars(), testCase.totalConsolidationsTag)
assert.Equal(t, initial+1, afterOne, "expected another consolidation")
})
}
}

func TestQueryPlanCache(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (qre *QueryExecutor) Stream(callback StreamCallback) error {

if consolidator := qre.tsv.qe.streamConsolidator; consolidator != nil {
if qre.connID == 0 && qre.plan.PlanID == p.PlanSelectStream && qre.shouldConsolidate() {
return consolidator.Consolidate(qre.logStats, sqlWithoutComments, callback,
return consolidator.Consolidate(qre.tsv.stats.WaitTimings, qre.logStats, sqlWithoutComments, callback,
func(callback StreamCallback) error {
dbConn, err := qre.getStreamConn()
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion go/vt/vttablet/tabletserver/stream_consolidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package tabletserver
import (
"sync"
"sync/atomic"
"time"

"vitess.io/vitess/go/sqltypes"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
)
Expand Down Expand Up @@ -70,7 +72,7 @@ func (sc *StreamConsolidator) SetBlocking(block bool) {
// `callback`. A `leaderCallback` must also be supplied: this function must perform the actual
// query in the upstream MySQL server, yielding results into the modified callback that it receives
// as an argument.
func (sc *StreamConsolidator) Consolidate(logStats *tabletenv.LogStats, sql string, callback StreamCallback, leaderCallback func(StreamCallback) error) error {
func (sc *StreamConsolidator) Consolidate(waitTimings *servenv.TimingsWrapper, logStats *tabletenv.LogStats, sql string, callback StreamCallback, leaderCallback func(StreamCallback) error) error {
var (
inflight *streamInFlight
catchup []*sqltypes.Result
Expand Down Expand Up @@ -100,9 +102,11 @@ func (sc *StreamConsolidator) Consolidate(logStats *tabletenv.LogStats, sql stri

// if we have a followChan, we're following up on a query that is already being served
if followChan != nil {
startTime := time.Now()
defer func() {
memchange := inflight.unfollow(followChan, sc.cleanup)
atomic.AddInt64(&sc.memory, memchange)
waitTimings.Record("StreamConsolidations", startTime)
}()

logStats.QuerySources |= tabletenv.QuerySourceConsolidator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -123,10 +124,12 @@ func (ct *consolidationTest) run(workers int, generateCallback func(int) (string

go func(worker int) {
defer wg.Done()
exporter := servenv.NewExporter("ConsolidatorTest", "")
timings := exporter.NewTimings("ConsolidatorWaits", "", "StreamConsolidations")
logStats := tabletenv.NewLogStats(context.Background(), "StreamConsolidation")
query, callback := generateCallback(worker)
start := time.Now()
err := ct.cc.Consolidate(logStats, query, func(result *sqltypes.Result) error {
err := ct.cc.Consolidate(timings, logStats, query, func(result *sqltypes.Result) error {
cr := ct.results[worker]
cr.items = append(cr.items, result)
atomic.AddInt64(&cr.count, 1)
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,7 @@ func (tsv *TabletServer) streamExecute(ctx context.Context, target *querypb.Targ
ctx: ctx,
logStats: logStats,
tsv: tsv,
tabletType: target.GetTabletType(),
setting: connSetting,
}
return qre.Stream(callback)
Expand Down
Loading