Skip to content

Commit

Permalink
[release-17.0] vtexplain: Ensure memory topo is set up for throttler (#…
Browse files Browse the repository at this point in the history
…15279) (#15283)

Signed-off-by: Dirkjan Bussink <[email protected]>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
  • Loading branch information
vitess-bot[bot] authored Feb 19, 2024
1 parent 2326dea commit cee8bd3
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 42 deletions.
4 changes: 3 additions & 1 deletion go/cmd/vtexplain/vtexplain.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"vitess.io/vitess/go/exit"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/vtexplain"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"

Expand Down Expand Up @@ -147,7 +148,8 @@ func parseAndRun() error {
Target: dbName,
}

vte, err := vtexplain.Init(vschema, schema, ksShardMap, opts)
ts := memorytopo.NewServer(vtexplain.Cell)
vte, err := vtexplain.Init(ts, vschema, schema, ksShardMap, opts)
if err != nil {
return err
}
Expand Down
7 changes: 5 additions & 2 deletions go/vt/srvtopo/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,11 @@ func (entry *watchEntry) onErrorLocked(err error, init bool) {
entry.value = nil
}
} else {
entry.lastError = fmt.Errorf("ResilientWatch stream failed for %v: %w", entry.key, err)
log.Errorf("%v", entry.lastError)
if !topo.IsErrType(err, topo.Interrupted) {
// No need to log if we're explicitly interrupted.
entry.lastError = fmt.Errorf("ResilientWatch stream failed for %v: %w", entry.key, err)
log.Errorf("%v", entry.lastError)
}

// Even though we didn't get a new value, update the lastValueTime
// here since the watch was successfully running before and we want
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vtadmin/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtadmin/cluster"
"vitess.io/vitess/go/vt/vtadmin/cluster/dynamic"
Expand Down Expand Up @@ -2148,7 +2149,8 @@ func (api *API) VTExplain(ctx context.Context, req *vtadminpb.VTExplainRequest)
return nil, er.Error()
}

vte, err := vtexplain.Init(srvVSchema, schema, shardMap, &vtexplain.Options{ReplicationMode: "ROW"})
ts := memorytopo.NewServer(vtexplain.Cell)
vte, err := vtexplain.Init(ts, srvVSchema, schema, shardMap, &vtexplain.Options{ReplicationMode: "ROW"})
if err != nil {
return nil, fmt.Errorf("error initilaizing vtexplain: %w", err)
}
Expand Down
7 changes: 4 additions & 3 deletions go/vt/vtexplain/vtexplain.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtgate"

"vitess.io/vitess/go/jsonutil"
Expand All @@ -53,7 +54,7 @@ func init() {
}

const (
vtexplainCell = "explainCell"
Cell = "explainCell"

// ModeMulti is the default mode with autocommit implemented at vtgate
ModeMulti = "multi"
Expand Down Expand Up @@ -180,7 +181,7 @@ type TabletActions struct {
}

// Init sets up the fake execution environment
func Init(vSchemaStr, sqlSchema, ksShardMapStr string, opts *Options) (*VTExplain, error) {
func Init(ts *topo.Server, vSchemaStr, sqlSchema, ksShardMapStr string, opts *Options) (*VTExplain, error) {
// Verify options
if opts.ReplicationMode != "ROW" && opts.ReplicationMode != "STATEMENT" {
return nil, fmt.Errorf("invalid replication mode \"%s\"", opts.ReplicationMode)
Expand All @@ -200,7 +201,7 @@ func Init(vSchemaStr, sqlSchema, ksShardMapStr string, opts *Options) (*VTExplai
Autocommit: true,
}}
vte.setGlobalTabletEnv(tabletEnv)
err = vte.initVtgateExecutor(vSchemaStr, ksShardMapStr, opts)
err = vte.initVtgateExecutor(ts, vSchemaStr, ksShardMapStr, opts)
if err != nil {
return nil, fmt.Errorf("initVtgateExecutor: %v", err.Error())
}
Expand Down
20 changes: 14 additions & 6 deletions go/vt/vtexplain/vtexplain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv/tabletenvtest"

querypb "vitess.io/vitess/go/vt/proto/query"
Expand All @@ -48,7 +49,7 @@ type testopts struct {
shardmap map[string]map[string]*topo.ShardInfo
}

func initTest(mode string, opts *Options, topts *testopts, t *testing.T) *VTExplain {
func initTest(ts *topo.Server, mode string, opts *Options, topts *testopts, t *testing.T) *VTExplain {
schema, err := os.ReadFile("testdata/test-schema.sql")
require.NoError(t, err)

Expand All @@ -64,7 +65,7 @@ func initTest(mode string, opts *Options, topts *testopts, t *testing.T) *VTExpl
}

opts.ExecutionMode = mode
vte, err := Init(string(vSchema), string(schema), shardmap, opts)
vte, err := Init(ts, string(vSchema), string(schema), shardmap, opts)
require.NoError(t, err, "vtexplain Init error\n%s", string(schema))
return vte
}
Expand All @@ -85,7 +86,9 @@ func testExplain(testcase string, opts *Options, t *testing.T) {

func runTestCase(testcase, mode string, opts *Options, topts *testopts, t *testing.T) {
t.Run(testcase, func(t *testing.T) {
vte := initTest(mode, opts, topts, t)
ts := memorytopo.NewServer(Cell)
vte := initTest(ts, mode, opts, topts, t)
defer vte.Stop()

sqlFile := fmt.Sprintf("testdata/%s-queries.sql", testcase)
sql, err := os.ReadFile(sqlFile)
Expand Down Expand Up @@ -171,7 +174,9 @@ func TestExplain(t *testing.T) {
}

func TestErrors(t *testing.T) {
vte := initTest(ModeMulti, defaultTestOpts(), &testopts{}, t)
ts := memorytopo.NewServer(Cell)
vte := initTest(ts, ModeMulti, defaultTestOpts(), &testopts{}, t)
defer vte.Stop()

tests := []struct {
SQL string
Expand Down Expand Up @@ -208,7 +213,9 @@ func TestErrors(t *testing.T) {
}

func TestJSONOutput(t *testing.T) {
vte := initTest(ModeMulti, defaultTestOpts(), &testopts{}, t)
ts := memorytopo.NewServer(Cell)
vte := initTest(ts, ModeMulti, defaultTestOpts(), &testopts{}, t)
defer vte.Stop()
sql := "select 1 from user where id = 1"
explains, err := vte.Run(sql)
require.NoError(t, err, "vtexplain error")
Expand Down Expand Up @@ -353,7 +360,8 @@ func TestInit(t *testing.T) {
}
}`
schema := "create table table_missing_primary_vindex (id int primary key)"
_, err := Init(vschema, schema, "", defaultTestOpts())
ts := memorytopo.NewServer(Cell)
_, err := Init(ts, vschema, schema, "", defaultTestOpts())
require.Error(t, err)
require.Contains(t, err.Error(), "missing primary col vindex")
}
Expand Down
60 changes: 43 additions & 17 deletions go/vt/vtexplain/vtexplain_vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,23 @@ package vtexplain
import (
"context"
"fmt"
"path"
"sort"
"strings"

"vitess.io/vitess/go/vt/vtgate/logstats"
"vitess.io/vitess/go/vt/vtgate/vindexes"

"vitess.io/vitess/go/cache"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"

"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/json2"
"vitess.io/vitess/go/streamlog"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate"
"vitess.io/vitess/go/vt/vtgate/engine"
"vitess.io/vitess/go/vt/vtgate/logstats"
"vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/vttablet/queryservice"

querypb "vitess.io/vitess/go/vt/proto/query"
Expand All @@ -50,14 +47,14 @@ import (
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
)

func (vte *VTExplain) initVtgateExecutor(vSchemaStr, ksShardMapStr string, opts *Options) error {
func (vte *VTExplain) initVtgateExecutor(ts *topo.Server, vSchemaStr, ksShardMapStr string, opts *Options) error {
vte.explainTopo = &ExplainTopo{NumShards: opts.NumShards}
vte.explainTopo.TopoServer = memorytopo.NewServer(vtexplainCell)
vte.explainTopo.TopoServer = ts
vte.healthCheck = discovery.NewFakeHealthCheck(nil)

resolver := vte.newFakeResolver(opts, vte.explainTopo, vtexplainCell)
resolver := vte.newFakeResolver(opts, vte.explainTopo, Cell)

err := vte.buildTopology(opts, vSchemaStr, ksShardMapStr, opts.NumShards)
err := vte.buildTopology(ts, opts, vSchemaStr, ksShardMapStr, opts.NumShards)
if err != nil {
return err
}
Expand All @@ -73,7 +70,7 @@ func (vte *VTExplain) initVtgateExecutor(vSchemaStr, ksShardMapStr string, opts

streamSize := 10
var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests
vte.vtgateExecutor = vtgate.NewExecutor(context.Background(), vte.explainTopo, vtexplainCell, resolver, opts.Normalize, false, streamSize, cache.DefaultConfig, schemaTracker, false, opts.PlannerVersion)
vte.vtgateExecutor = vtgate.NewExecutor(context.Background(), vte.explainTopo, Cell, resolver, opts.Normalize, false, streamSize, cache.DefaultConfig, schemaTracker, false, opts.PlannerVersion)

queryLogBufferSize := 10
vtgate.SetQueryLogger(streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize))
Expand All @@ -96,7 +93,7 @@ func (vte *VTExplain) newFakeResolver(opts *Options, serv srvtopo.Server, cell s
return vtgate.NewResolver(srvResolver, serv, cell, sc)
}

func (vte *VTExplain) buildTopology(opts *Options, vschemaStr string, ksShardMapStr string, numShardsPerKeyspace int) error {
func (vte *VTExplain) buildTopology(ts *topo.Server, opts *Options, vschemaStr string, ksShardMapStr string, numShardsPerKeyspace int) error {
vte.explainTopo.Lock.Lock()
defer vte.explainTopo.Lock.Unlock()

Expand All @@ -121,6 +118,10 @@ func (vte *VTExplain) buildTopology(opts *Options, vschemaStr string, ksShardMap
return err
}

conn, err := ts.ConnForCell(context.Background(), Cell)
if err != nil {
return err
}
vte.explainTopo.TabletConns = make(map[string]*explainTablet)
vte.explainTopo.KeyspaceShards = make(map[string]map[string]*topodatapb.ShardReference)
for ks, vschema := range vte.explainTopo.Keyspaces {
Expand All @@ -131,6 +132,32 @@ func (vte *VTExplain) buildTopology(opts *Options, vschemaStr string, ksShardMap

vte.explainTopo.KeyspaceShards[ks] = make(map[string]*topodatapb.ShardReference)

srvPath := path.Join(topo.KeyspacesPath, ks, topo.SrvKeyspaceFile)
srvKeyspace := &topodatapb.SrvKeyspace{
Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{
{
ServedType: topodatapb.TabletType_PRIMARY,
ShardReferences: shards,
},
{
ServedType: topodatapb.TabletType_REPLICA,
ShardReferences: shards,
},
{
ServedType: topodatapb.TabletType_RDONLY,
ShardReferences: shards,
},
},
}
data, err := srvKeyspace.MarshalVT()
if err != nil {
return err
}
_, err = conn.Update(context.Background(), srvPath, data, nil)
if err != nil {
return err
}

for _, shard := range shards {
// If the topology is in the middle of a reshard, there can be two shards covering the same key range (e.g.
// both source shard 80- and target shard 80-c0 cover the keyrange 80-c0). For the purposes of explain, we
Expand All @@ -143,14 +170,13 @@ func (vte *VTExplain) buildTopology(opts *Options, vschemaStr string, ksShardMap
hostname := fmt.Sprintf("%s/%s", ks, shard.Name)
log.Infof("registering test tablet %s for keyspace %s shard %s", hostname, ks, shard.Name)

tablet := vte.healthCheck.AddFakeTablet(vtexplainCell, hostname, 1, ks, shard.Name, topodatapb.TabletType_PRIMARY, true, 1, nil, func(t *topodatapb.Tablet) queryservice.QueryService {
return vte.newTablet(opts, t)
tablet := vte.healthCheck.AddFakeTablet(Cell, hostname, 1, ks, shard.Name, topodatapb.TabletType_PRIMARY, true, 1, nil, func(t *topodatapb.Tablet) queryservice.QueryService {
return vte.newTablet(opts, t, ts)
})
vte.explainTopo.TabletConns[hostname] = tablet.(*explainTablet)
vte.explainTopo.KeyspaceShards[ks][shard.Name] = shard
}
}

return err
}

Expand Down
9 changes: 4 additions & 5 deletions go/vt/vtexplain/vtexplain_vttablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"strings"
"sync"

"vitess.io/vitess/go/vt/sidecardb"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/mysql/fakesqldb"
Expand All @@ -33,8 +31,9 @@ import (
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/sidecardb"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtgate/evalengine"

Expand Down Expand Up @@ -102,7 +101,7 @@ type explainTablet struct {

var _ queryservice.QueryService = (*explainTablet)(nil)

func (vte *VTExplain) newTablet(opts *Options, t *topodatapb.Tablet) *explainTablet {
func (vte *VTExplain) newTablet(opts *Options, t *topodatapb.Tablet, ts *topo.Server) *explainTablet {
db := fakesqldb.New(nil)
sidecardb.AddSchemaInitQueries(db, true)

Expand All @@ -117,7 +116,7 @@ func (vte *VTExplain) newTablet(opts *Options, t *topodatapb.Tablet) *explainTab
config.EnableTableGC = false

// XXX much of this is cloned from the tabletserver tests
tsv := tabletserver.NewTabletServer(topoproto.TabletAliasString(t.Alias), config, memorytopo.NewServer(""), t.Alias)
tsv := tabletserver.NewTabletServer(topoproto.TabletAliasString(t.Alias), config, ts, t.Alias)

tablet := explainTablet{db: db, tsv: tsv, vte: vte}
db.Handler = &tablet
Expand Down
18 changes: 13 additions & 5 deletions go/vt/vtexplain/vtexplain_vttablet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ package vtexplain
import (
"encoding/json"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand Down Expand Up @@ -67,7 +69,8 @@ create table t2 (
NumShards: 2,
}

vte, err := Init(testVSchema, testSchema, "", opts)
ts := memorytopo.NewServer(Cell)
vte, err := Init(ts, testVSchema, testSchema, "", opts)
require.NoError(t, err)
defer vte.Stop()

Expand Down Expand Up @@ -123,16 +126,21 @@ create table test_partitioned (
if err != nil {
t.Fatalf("parseSchema: %v", err)
}
vte := initTest(ModeMulti, defaultTestOpts(), &testopts{}, t)
ts := memorytopo.NewServer(Cell)
vte := initTest(ts, ModeMulti, defaultTestOpts(), &testopts{}, t)

tabletEnv, _ := newTabletEnvironment(ddls, defaultTestOpts())
vte.setGlobalTabletEnv(tabletEnv)

tablet := vte.newTablet(defaultTestOpts(), &topodatapb.Tablet{
Keyspace: "test_keyspace",
Keyspace: "ks_sharded",
Shard: "-80",
Alias: &topodatapb.TabletAlias{},
})
Alias: &topodatapb.TabletAlias{
Cell: Cell,
},
}, ts)

time.Sleep(10 * time.Millisecond)
se := tablet.tsv.SchemaEngine()
tables := se.GetSchema()

Expand Down
6 changes: 4 additions & 2 deletions go/vt/vttablet/tabletserver/throttle/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,9 @@ func (throttler *Throttler) normalizeThrottlerConfig(thottlerConfig *topodatapb.
func (throttler *Throttler) WatchSrvKeyspaceCallback(srvks *topodatapb.SrvKeyspace, err error) bool {
log.Infof("Throttler: WatchSrvKeyspaceCallback called with: %+v", srvks)
if err != nil {
log.Errorf("WatchSrvKeyspaceCallback error: %v", err)
if !topo.IsErrType(err, topo.Interrupted) && !errors.Is(err, context.Canceled) {
log.Errorf("WatchSrvKeyspaceCallback error: %v", err)
}
return false
}
throttlerConfig := throttler.normalizeThrottlerConfig(srvks.ThrottlerConfig)
Expand Down Expand Up @@ -477,7 +479,7 @@ func (throttler *Throttler) Open() error {

throttlerConfig, err := throttler.readThrottlerConfig(ctx)
if err == nil {
log.Errorf("Throttler.retryReadAndApplyThrottlerConfig(): success reading throttler config: %+v", throttlerConfig)
log.Infof("Throttler.retryReadAndApplyThrottlerConfig(): success reading throttler config: %+v", throttlerConfig)
// It's possible that during a retry-sleep, the throttler is closed and opened again, leading
// to two (or more) instances of this goroutine. That's not a big problem; it's fine if all
// attempt to read the throttler config; but we just want to ensure they don't step on each other
Expand Down

0 comments on commit cee8bd3

Please sign in to comment.