From 2a786e52dadbef030f5bb1b64e7488456a38e116 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Fri, 21 Jun 2024 19:32:22 -0400 Subject: [PATCH] add support for vtgate traffic mirroring (query serving) Signed-off-by: Max Englander --- go/test/vschemawrapper/vschema_wrapper.go | 14 + go/vt/vtgate/bench_test.go | 1 - go/vt/vtgate/endtoend/main_test.go | 72 ++++ go/vt/vtgate/endtoend/mirror_test.go | 180 ++++++++ go/vt/vtgate/endtoend/schema.sql | 12 +- go/vt/vtgate/endtoend/vstream_test.go | 10 +- go/vt/vtgate/engine/cached_size.go | 38 +- go/vt/vtgate/engine/fake_vcursor_test.go | 27 ++ go/vt/vtgate/engine/mirror.go | 219 ++++++++++ go/vt/vtgate/engine/mirror_test.go | 382 +++++++++++++++++ go/vt/vtgate/engine/plan.go | 14 +- go/vt/vtgate/engine/primitive.go | 3 + go/vt/vtgate/executor_select_test.go | 57 +++ go/vt/vtgate/logstats/logstats.go | 11 +- go/vt/vtgate/logstats/logstats_test.go | 6 +- go/vt/vtgate/planbuilder/builder.go | 116 ++++- go/vt/vtgate/planbuilder/ddl.go | 2 +- go/vt/vtgate/planbuilder/operators/delete.go | 9 +- go/vt/vtgate/planbuilder/operators/helpers.go | 74 ++-- go/vt/vtgate/planbuilder/operators/insert.go | 9 +- go/vt/vtgate/planbuilder/operators/route.go | 6 +- go/vt/vtgate/planbuilder/operators/table.go | 9 +- go/vt/vtgate/planbuilder/operators/update.go | 9 +- go/vt/vtgate/planbuilder/operators/vindex.go | 9 +- go/vt/vtgate/planbuilder/plan_test.go | 41 ++ .../planbuilder/plancontext/mirror_vschema.go | 265 ++++++++++++ .../vtgate/planbuilder/plancontext/vschema.go | 4 + go/vt/vtgate/planbuilder/select.go | 12 +- .../planbuilder/single_sharded_shortcut.go | 2 +- .../planbuilder/testdata/mirror_cases.json | 397 ++++++++++++++++++ .../testdata/vschemas/mirror_schema.json | 99 +++++ go/vt/vtgate/planbuilder/vexplain.go | 7 +- go/vt/vtgate/sandbox_test.go | 13 + go/vt/vtgate/vcursor_impl.go | 50 ++- go/vt/vtgate/vindexes/vschema.go | 260 +++++++++++- go/vt/vtgate/vindexes/vschema_test.go | 287 ++++++++++++- go/vt/vtgate/vschema_manager_test.go | 4 + .../tabletserver/vstreamer/engine_test.go | 1 + 38 files changed, 2631 insertions(+), 100 deletions(-) create mode 100644 go/vt/vtgate/endtoend/mirror_test.go create mode 100644 go/vt/vtgate/engine/mirror.go create mode 100644 go/vt/vtgate/engine/mirror_test.go create mode 100644 go/vt/vtgate/planbuilder/plancontext/mirror_vschema.go create mode 100644 go/vt/vtgate/planbuilder/testdata/mirror_cases.json create mode 100644 go/vt/vtgate/planbuilder/testdata/vschemas/mirror_schema.json diff --git a/go/test/vschemawrapper/vschema_wrapper.go b/go/test/vschemawrapper/vschema_wrapper.go index 4d1c424dda8..e7214783595 100644 --- a/go/test/vschemawrapper/vschema_wrapper.go +++ b/go/test/vschemawrapper/vschema_wrapper.go @@ -342,3 +342,17 @@ func (vw *VSchemaWrapper) FindRoutedShard(keyspace, shard string) (string, error func (vw *VSchemaWrapper) IsViewsEnabled() bool { return vw.EnableViews } + +// FindMirrorRule finds the mirror rule for the requested keyspace, table +// name, and the tablet type in the VSchema. +func (vs *VSchemaWrapper) FindMirrorRule(tab sqlparser.TableName) (*vindexes.MirrorRule, string, topodatapb.TabletType, key.Destination, error) { + destKeyspace, destTabletType, dest, err := topoproto.ParseDestination(tab.Qualifier.String(), topodatapb.TabletType_PRIMARY) + if err != nil { + return nil, "", destTabletType, nil, err + } + mirrorRule, err := vs.V.FindMirrorRule(destKeyspace, tab.Name.String(), destTabletType) + if err != nil { + return nil, "", destTabletType, nil, err + } + return mirrorRule, destKeyspace, destTabletType, dest, err +} diff --git a/go/vt/vtgate/bench_test.go b/go/vt/vtgate/bench_test.go index 5c64c7e3473..5da1af896d0 100644 --- a/go/vt/vtgate/bench_test.go +++ b/go/vt/vtgate/bench_test.go @@ -54,7 +54,6 @@ func init() { fmt.Fprintf(buf, " and v%d = '%d%s'", i, i, baseval.String()) } benchQuery = buf.String() - // fmt.Printf("len: %d\n", len(benchQuery)) } func BenchmarkWithNormalizer(b *testing.B) { diff --git a/go/vt/vtgate/endtoend/main_test.go b/go/vt/vtgate/endtoend/main_test.go index b471786b78e..8f8cfb4b5a3 100644 --- a/go/vt/vtgate/endtoend/main_test.go +++ b/go/vt/vtgate/endtoend/main_test.go @@ -166,6 +166,18 @@ var ( Type: sqltypes.Char, }}, }, + "mirror_tbl1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "id", + Name: "hash", + }}, + }, + "mirror_tbl2": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "id", + Name: "hash", + }}, + }, }, } @@ -175,6 +187,11 @@ create table t1_copy_all_ks2( id2 bigint, primary key(id1) ) Engine=InnoDB; + +create table mirror_tbl1( + id bigint not null, + primary key(id) +) Engine=InnoDB; ` vschema2 = &vschemapb.Keyspace{ @@ -191,6 +208,48 @@ create table t1_copy_all_ks2( Name: "hash", }}, }, + "mirror_tbl1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "id", + Name: "hash", + }}, + }, + }, + } + + schema3 = ` +create table t1_copy_all_ks3( + id1 bigint, + id2 bigint, + primary key(id1) +) Engine=InnoDB; + +create table mirror_tbl2( + id bigint not null, + primary key(id) +) Engine=InnoDB; +` + + vschema3 = &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1_copy_all_ks3": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "id1", + Name: "hash", + }}, + }, + "mirror_tbl2": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "id", + Name: "hash", + }}, + }, }, } ) @@ -218,6 +277,14 @@ func TestMain(m *testing.M) { Name: "80-", }}, }, + { + Name: "ks3", + Shards: []*vttestpb.Shard{{ + Name: "-80", + }, { + Name: "80-", + }}, + }, }, } if err := cfg.InitSchemas("ks", Schema, vschema); err != nil { @@ -231,6 +298,11 @@ func TestMain(m *testing.M) { os.RemoveAll(cfg.SchemaDir) return 1 } + if err := cfg.InitSchemas("ks3", schema3, vschema3); err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + os.RemoveAll(cfg.SchemaDir) + return 1 + } cluster = &vttest.LocalCluster{ Config: cfg, diff --git a/go/vt/vtgate/endtoend/mirror_test.go b/go/vt/vtgate/endtoend/mirror_test.go new file mode 100644 index 00000000000..26175d1a8a8 --- /dev/null +++ b/go/vt/vtgate/endtoend/mirror_test.go @@ -0,0 +1,180 @@ +package endtoend + +import ( + "bytes" + "context" + "fmt" + "math/rand" + osExec "os/exec" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" +) + +var mirrorInitOnce sync.Once + +func BenchmarkMirror(b *testing.B) { + const numRows = 10000 + + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + if err != nil { + b.Fatal(err) + } + defer conn.Close() + + var query bytes.Buffer + + mirrorInitOnce.Do(func() { + b.Logf("seeding database for benchmark...") + + for i := 0; i < numRows; i++ { + query.Reset() + query.WriteString(fmt.Sprintf("INSERT INTO ks.mirror_tbl1(id) VALUES(%d)", i)) + _, err = conn.ExecuteFetch(query.String(), -1, false) + if err != nil { + b.Fatal(err) + } + + query.Reset() + query.WriteString(fmt.Sprintf("INSERT INTO ks2.mirror_tbl1(id) VALUES(%d)", i)) + _, err = conn.ExecuteFetch(query.String(), -1, false) + if err != nil { + b.Fatal(err) + } + + query.Reset() + query.WriteString(fmt.Sprintf("INSERT INTO ks.mirror_tbl2(id) VALUES(%d)", i)) + _, err = conn.ExecuteFetch(query.String(), -1, false) + if err != nil { + b.Fatal(err) + } + + query.Reset() + query.WriteString(fmt.Sprintf("INSERT INTO ks3.mirror_tbl2(id) VALUES(%d)", i)) + _, err = conn.ExecuteFetch(query.String(), -1, false) + if err != nil { + b.Fatal(err) + } + } + + b.Logf("finished (inserted %d rows)", numRows) + }) + + testCases := []struct { + name string + run func(*testing.B, *rand.Rand) + }{ + { + name: "point select, { ks => ks1 }.tbl1", + run: func(b *testing.B, rnd *rand.Rand) { + for i := 0; i < b.N; i++ { + id := rnd.Intn(numRows) + query.Reset() + _, _ = fmt.Fprintf( + &query, + "SELECT t1.id, t2.id FROM ks.mirror_tbl1 AS t1, ks.mirror_tbl2 AS t2 WHERE t1.id = %d AND t2.id = %d", + id, id, + ) + _, err := conn.ExecuteFetch(query.String(), 1, false) + if err != nil { + b.Error(err) + } + } + }, + }, + { + name: "point select, { ks => ks2 }.tbl1, { ks => ks3 }.tbl2", + run: func(b *testing.B, rnd *rand.Rand) { + for i := 0; i < b.N; i++ { + id := rnd.Intn(numRows) + query.Reset() + _, _ = fmt.Fprintf( + &query, + "SELECT t1.id, t2.id FROM ks.mirror_tbl1 AS t1, ks.mirror_tbl2 AS t2 WHERE t1.id = %d AND t2.id = %d", + id, id, + ) + _, err := conn.ExecuteFetch(query.String(), 1, false) + if err != nil { + b.Error(err) + } + } + }, + }, + } + + // Each time this BenchmarkMirror runs, use a different source of + // random-ness. But use the same source of randomness across test cases and + // mirror percentages sub-tests. + randSeed := time.Now().UnixNano() + + for _, tc := range testCases { + b.Run(tc.name, func(b *testing.B) { + b.Run("mirror 0%", func(b *testing.B) { + mirrorTraffic(b, 0) + b.ResetTimer() + tc.run(b, rand.New(rand.NewSource(randSeed))) + }) + + b.Run("mirror 1%", func(b *testing.B) { + mirrorTraffic(b, 1) + b.ResetTimer() + tc.run(b, rand.New(rand.NewSource(randSeed))) + }) + + b.Run("mirror 5%", func(b *testing.B) { + mirrorTraffic(b, 5) + b.ResetTimer() + tc.run(b, rand.New(rand.NewSource(randSeed))) + }) + + b.Run("mirror 10%", func(b *testing.B) { + mirrorTraffic(b, 10) + b.ResetTimer() + tc.run(b, rand.New(rand.NewSource(randSeed))) + }) + + b.Run("mirror 25%", func(b *testing.B) { + mirrorTraffic(b, 25) + b.ResetTimer() + tc.run(b, rand.New(rand.NewSource(randSeed))) + }) + + b.Run("mirror 50%", func(b *testing.B) { + mirrorTraffic(b, 50) + b.ResetTimer() + tc.run(b, rand.New(rand.NewSource(randSeed))) + }) + + b.Run("mirror 100%", func(b *testing.B) { + mirrorTraffic(b, 100) + b.ResetTimer() + tc.run(b, rand.New(rand.NewSource(randSeed))) + }) + }) + } +} + +func mirrorTraffic(b *testing.B, percent float32) { + server := fmt.Sprintf("localhost:%v", cluster.VTProcess().PortGrpc) + rules := fmt.Sprintf(`{ + "rules": [ + { + "from_table": "ks.mirror_tbl1", + "to_table": "ks2.mirror_tbl1", + "percent": %f + }, + { + "from_table": "ks.mirror_tbl2", + "to_table": "ks3.mirror_tbl2", + "percent": %f + } + ] + }`, percent, percent) + _, err := osExec.Command("vtctldclient", "--server", server, "ApplyMirrorRules", "--rules", rules).CombinedOutput() + require.NoError(b, err) +} diff --git a/go/vt/vtgate/endtoend/schema.sql b/go/vt/vtgate/endtoend/schema.sql index d543d130c14..4713c9057cf 100644 --- a/go/vt/vtgate/endtoend/schema.sql +++ b/go/vt/vtgate/endtoend/schema.sql @@ -79,4 +79,14 @@ create table oltp_test( c char(120) default '' not null, pad char(60) default '' not null, primary key (id) -) Engine=InnoDB; \ No newline at end of file +) Engine=InnoDB; + +create table mirror_tbl1( + id bigint not null, + primary key(id) +) Engine=InnoDB; + +create table mirror_tbl2( + id bigint not null, + primary key(id) +) Engine=InnoDB; diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index 246d17f88b5..1381b9448bd 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -279,6 +279,11 @@ func TestVStreamCopyUnspecifiedShardGtid(t *testing.T) { require.NoError(t, err) } + _, err = conn.ExecuteFetch("insert into t1_copy_all_ks3(id1,id2) values(30,30), (40,40)", 1, false) + if err != nil { + require.NoError(t, err) + } + filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ Match: "/t1_copy_all.*/", @@ -303,6 +308,7 @@ func TestVStreamCopyUnspecifiedShardGtid(t *testing.T) { // copy phase operations in the vstream. expectedKs1EventNum := 2 /* num shards */ * (9 /* begin/field/vgtid:pos/4 rowevents avg/vgitd: lastpk/commit) */ + 3 /* begin/vgtid/commit for completed table */ + 1 /* copy operation completed */) expectedKs2EventNum := 2 /* num shards */ * (6 /* begin/field/vgtid:pos/1 rowevents avg/vgitd: lastpk/commit) */ + 3 /* begin/vgtid/commit for completed table */ + 1 /* copy operation completed */) + expectedKs3EventNum := 2 /* num shards */ * (6 /* begin/field/vgtid:pos/1 rowevents avg/vgitd: lastpk/commit) */ + 3 /* begin/vgtid/commit for completed table */ + 1 /* copy operation completed */) expectedFullyCopyCompletedNum := 1 cases := []struct { @@ -316,12 +322,14 @@ func TestVStreamCopyUnspecifiedShardGtid(t *testing.T) { shardGtid: &binlogdatapb.ShardGtid{ Keyspace: "/.*", }, - expectedEventNum: expectedKs1EventNum + expectedKs2EventNum + expectedFullyCopyCompletedNum, + expectedEventNum: expectedKs1EventNum + expectedKs2EventNum + expectedKs3EventNum + expectedFullyCopyCompletedNum, expectedCompletedEvents: []string{ `type:COPY_COMPLETED keyspace:"ks" shard:"-80"`, `type:COPY_COMPLETED keyspace:"ks" shard:"80-"`, `type:COPY_COMPLETED keyspace:"ks2" shard:"-80"`, `type:COPY_COMPLETED keyspace:"ks2" shard:"80-"`, + `type:COPY_COMPLETED keyspace:"ks3" shard:"-80"`, + `type:COPY_COMPLETED keyspace:"ks3" shard:"80-"`, `type:COPY_COMPLETED`, }, }, diff --git a/go/vt/vtgate/engine/cached_size.go b/go/vt/vtgate/engine/cached_size.go index e65ff61a9f6..500587d0754 100644 --- a/go/vt/vtgate/engine/cached_size.go +++ b/go/vt/vtgate/engine/cached_size.go @@ -731,6 +731,24 @@ func (cached *MergeSort) CachedSize(alloc bool) int64 { } return size } +func (cached *Mirror) CachedSize(alloc bool) int64 { + if cached == nil { + return int64(0) + } + size := int64(0) + if alloc { + size += int64(32) + } + // field Primitive vitess.io/vitess/go/vt/vtgate/engine.Primitive + if cc, ok := cached.Primitive.(cachedObject); ok { + size += cc.CachedSize(true) + } + // field Target vitess.io/vitess/go/vt/vtgate/engine.MirrorTarget + if cc, ok := cached.Target.(cachedObject); ok { + size += cc.CachedSize(true) + } + return size +} func (cached *NonLiteralUpdateInfo) CachedSize(alloc bool) int64 { if cached == nil { return int64(0) @@ -795,6 +813,20 @@ func (cached *OrderedAggregate) CachedSize(alloc bool) int64 { } return size } +func (cached *PercentMirrorTarget) CachedSize(alloc bool) int64 { + if cached == nil { + return int64(0) + } + size := int64(0) + if alloc { + size += int64(24) + } + // field Primitive vitess.io/vitess/go/vt/vtgate/engine.Primitive + if cc, ok := cached.Primitive.(cachedObject); ok { + size += cc.CachedSize(true) + } + return size +} func (cached *Plan) CachedSize(alloc bool) int64 { if cached == nil { return int64(0) @@ -818,11 +850,11 @@ func (cached *Plan) CachedSize(alloc bool) int64 { size += elem.CachedSize(true) } } - // field TablesUsed []string + // field TablesUsed vitess.io/vitess/go/vt/sqlparser.TableNames { - size += hack.RuntimeAllocSize(int64(cap(cached.TablesUsed)) * int64(16)) + size += hack.RuntimeAllocSize(int64(cap(cached.TablesUsed)) * int64(32)) for _, elem := range cached.TablesUsed { - size += hack.RuntimeAllocSize(int64(len(elem))) + size += elem.CachedSize(false) } } return size diff --git a/go/vt/vtgate/engine/fake_vcursor_test.go b/go/vt/vtgate/engine/fake_vcursor_test.go index 1ba0abfa2ef..1a4d76bdc03 100644 --- a/go/vt/vtgate/engine/fake_vcursor_test.go +++ b/go/vt/vtgate/engine/fake_vcursor_test.go @@ -112,6 +112,10 @@ func (t *noopVCursor) CloneForReplicaWarming(ctx context.Context) VCursor { panic("implement me") } +func (t *noopVCursor) CloneForMirroring(ctx context.Context) VCursor { + panic("implement me") +} + func (t *noopVCursor) SetExec(ctx context.Context, name string, value string) error { panic("implement me") } @@ -421,6 +425,10 @@ type loggingVCursor struct { shardSession []*srvtopo.ResolvedShard parser *sqlparser.Parser + + handleMirrorClonesFn func(context.Context) VCursor + onExecuteMultiShardFn func(context.Context, Primitive, []*srvtopo.ResolvedShard, []*querypb.BoundQuery, bool, bool) + onStreamExecuteMultiFn func(context.Context, Primitive, string, []*srvtopo.ResolvedShard, []map[string]*querypb.BindVariable, bool, bool, func(*sqltypes.Result) error) } func (f *loggingVCursor) HasCreatedTempTable() { @@ -536,6 +544,13 @@ func (f *loggingVCursor) CloneForReplicaWarming(ctx context.Context) VCursor { return f } +func (f *loggingVCursor) CloneForMirroring(ctx context.Context) VCursor { + if f.handleMirrorClonesFn != nil { + return f.handleMirrorClonesFn(ctx) + } + panic("no mirror clones available") +} + func (f *loggingVCursor) Execute(ctx context.Context, method string, query string, bindvars map[string]*querypb.BindVariable, rollbackOnError bool, co vtgatepb.CommitOrder) (*sqltypes.Result, error) { name := "Unknown" switch co { @@ -553,7 +568,12 @@ func (f *loggingVCursor) Execute(ctx context.Context, method string, query strin } func (f *loggingVCursor) ExecuteMultiShard(ctx context.Context, primitive Primitive, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, rollbackOnError, canAutocommit bool) (*sqltypes.Result, []error) { + f.mu.Lock() + defer f.mu.Unlock() f.log = append(f.log, fmt.Sprintf("ExecuteMultiShard %v%v %v", printResolvedShardQueries(rss, queries), rollbackOnError, canAutocommit)) + if f.onExecuteMultiShardFn != nil { + f.onExecuteMultiShardFn(ctx, primitive, rss, queries, rollbackOnError, canAutocommit) + } res, err := f.nextResult() if err != nil { return nil, []error{err} @@ -574,6 +594,9 @@ func (f *loggingVCursor) ExecuteStandalone(ctx context.Context, primitive Primit func (f *loggingVCursor) StreamExecuteMulti(ctx context.Context, primitive Primitive, query string, rss []*srvtopo.ResolvedShard, bindVars []map[string]*querypb.BindVariable, rollbackOnError bool, autocommit bool, callback func(reply *sqltypes.Result) error) []error { f.mu.Lock() f.log = append(f.log, fmt.Sprintf("StreamExecuteMulti %s %s", query, printResolvedShardsBindVars(rss, bindVars))) + if f.onStreamExecuteMultiFn != nil { + f.onStreamExecuteMultiFn(ctx, primitive, query, rss, bindVars, rollbackOnError, autocommit, callback) + } r, err := f.nextResult() f.mu.Unlock() if err != nil { @@ -725,6 +748,8 @@ func (f *loggingVCursor) ResolveDestinationsMultiCol(ctx context.Context, keyspa func (f *loggingVCursor) ExpectLog(t *testing.T, want []string) { t.Helper() + f.mu.Lock() + defer f.mu.Unlock() if len(f.log) == 0 && len(want) == 0 { return } @@ -742,6 +767,8 @@ func (f *loggingVCursor) ExpectWarnings(t *testing.T, want []*querypb.QueryWarni } func (f *loggingVCursor) Rewind() { + f.mu.Lock() + defer f.mu.Unlock() f.curShardForKsid = 0 f.curResult = 0 f.log = nil diff --git a/go/vt/vtgate/engine/mirror.go b/go/vt/vtgate/engine/mirror.go new file mode 100644 index 00000000000..2cfc447c7a0 --- /dev/null +++ b/go/vt/vtgate/engine/mirror.go @@ -0,0 +1,219 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package engine + +import ( + "context" + "math/rand" + "time" + + "vitess.io/vitess/go/sqltypes" + querypb "vitess.io/vitess/go/vt/proto/query" +) + +type ( + // Mirror represents the instructions to execute an authoritative source, + // and compare the results of that execution to those of one or more + // non-authoritative mirroring targets. + Mirror struct { + Primitive Primitive + Target MirrorTarget + } + + // MirrorTarget contains the Primitive for mirroring a query to the + // non-authoritative target of the Mirror primitive. + MirrorTarget interface { + Primitive + Accept() bool + } + + // PercentMirrorTarget contains the Primitive to mirror to, an will + // Accept() an execution based if a random dice-roll is less than Percent. + PercentMirrorTarget struct { + Percent float32 + Primitive Primitive + } +) + +const ( + // maxMirrorTargetLag limits how long a mirror target may continue + // executing after the main primitive has finished. + maxMirrorTargetLag = 100 * time.Millisecond +) + +var ( + _ Primitive = (*Mirror)(nil) + _ Primitive = (MirrorTarget)(nil) + _ Primitive = (*PercentMirrorTarget)(nil) + _ MirrorTarget = (*PercentMirrorTarget)(nil) +) + +// NewMirror creates a Mirror. +func NewMirror(primitive Primitive, target MirrorTarget) *Mirror { + return &Mirror{primitive, target} +} + +// NewPercentMirrorTarget creates a percentage-based Mirror target. +func NewPercentMirrorTarget(percent float32, primitive Primitive) *PercentMirrorTarget { + return &PercentMirrorTarget{percent, primitive} +} + +func (m *Mirror) RouteType() string { + return "Mirror" +} + +func (m *Mirror) GetKeyspaceName() string { + return m.Primitive.GetKeyspaceName() +} + +func (m *Mirror) GetTableName() string { + return m.Primitive.GetTableName() +} + +func (m *Mirror) GetFields(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) { + return m.Primitive.GetFields(ctx, vcursor, bindVars) +} + +func (m *Mirror) NeedsTransaction() bool { + return m.Primitive.NeedsTransaction() +} + +func (m *Mirror) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { + var mirrorCh chan any + var mirrorCtxCancel func() + + if m.Target.Accept() { + mirrorCh = make(chan any) + + var mirrorCtx context.Context + mirrorCtx, mirrorCtxCancel = context.WithCancel(ctx) + defer mirrorCtxCancel() + + go func(target Primitive, vcursor VCursor) { + defer close(mirrorCh) + _, _ = target.TryExecute(mirrorCtx, vcursor, bindVars, wantfields) + }(m.Target, vcursor.CloneForMirroring(mirrorCtx)) + } + + r, err := m.Primitive.TryExecute(ctx, vcursor, bindVars, wantfields) + + if mirrorCh != nil { + select { + case <-mirrorCh: + case <-time.After(maxMirrorTargetLag): + mirrorCtxCancel() + } + } + + return r, err +} + +func (m *Mirror) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { + var mirrorCn chan any + var mirrorCtxCancel func() + + if m.Target.Accept() { + mirrorCn = make(chan any) + + var mirrorCtx context.Context + mirrorCtx, mirrorCtxCancel = context.WithCancel(ctx) + defer mirrorCtxCancel() + + go func(target Primitive, vcursor VCursor) { + defer close(mirrorCn) + _ = target.TryStreamExecute(mirrorCtx, vcursor, bindVars, wantfields, func(_ *sqltypes.Result) error { + return nil + }) + }(m.Target, vcursor.CloneForMirroring(mirrorCtx)) + } + + err := m.Primitive.TryStreamExecute(ctx, vcursor, bindVars, wantfields, callback) + + if mirrorCn != nil { + select { + case <-mirrorCn: + case <-time.After(maxMirrorTargetLag): + mirrorCtxCancel() + } + } + + return err +} + +// Inputs is a slice containing the inputs to this Primitive. +// The returned map has additional information about the inputs, that is used in the description. +func (m *Mirror) Inputs() ([]Primitive, []map[string]any) { + return []Primitive{m.Primitive, m.Target}, nil +} + +// description is the description, sans the inputs, of this Primitive. +// to get the plan description with all children, use PrimitiveToPlanDescription() +func (m *Mirror) description() PrimitiveDescription { + return PrimitiveDescription{ + OperatorType: "Mirror", + } +} + +func (m *PercentMirrorTarget) RouteType() string { + return "PercentMirrorTarget" +} + +func (m *PercentMirrorTarget) GetKeyspaceName() string { + return m.Primitive.GetKeyspaceName() +} + +func (m *PercentMirrorTarget) GetTableName() string { + return m.Primitive.GetTableName() +} + +func (m *PercentMirrorTarget) GetFields(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) { + return m.Primitive.GetFields(ctx, vcursor, bindVars) +} + +func (m *PercentMirrorTarget) NeedsTransaction() bool { + return m.Primitive.NeedsTransaction() +} + +func (m *PercentMirrorTarget) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { + return m.Primitive.TryExecute(ctx, vcursor, bindVars, wantfields) +} + +func (m *PercentMirrorTarget) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { + return m.Primitive.TryStreamExecute(ctx, vcursor, bindVars, wantfields, callback) +} + +// Inputs is a slice containing the inputs to this Primitive. +// The returned map has additional information about the inputs, that is used in the description. +func (m *PercentMirrorTarget) Inputs() ([]Primitive, []map[string]any) { + return []Primitive{m.Primitive}, nil +} + +// description is the description, sans the inputs, of this Primitive. +// to get the plan description with all children, use PrimitiveToPlanDescription() +func (m *PercentMirrorTarget) description() PrimitiveDescription { + return PrimitiveDescription{ + OperatorType: "MirrorTarget", + Variant: "Percent", + Other: map[string]any{ + "Percent": m.Percent, + }, + } +} + +func (m *PercentMirrorTarget) Accept() bool { + return m.Percent >= (rand.Float32() * 100.0) +} diff --git a/go/vt/vtgate/engine/mirror_test.go b/go/vt/vtgate/engine/mirror_test.go new file mode 100644 index 00000000000..2777b6c35b2 --- /dev/null +++ b/go/vt/vtgate/engine/mirror_test.go @@ -0,0 +1,382 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package engine + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/sqltypes" + querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/srvtopo" + "vitess.io/vitess/go/vt/vtgate/evalengine" + "vitess.io/vitess/go/vt/vtgate/vindexes" +) + +func TestMirror(t *testing.T) { + vindex, _ := vindexes.CreateVindex("xxhash", "xxhash_vdx", nil) + + primitive := NewRoute( + Unsharded, + &vindexes.Keyspace{ + Name: "ks1", + }, + "select f.bar from foo f where f.id = 1", + "select 1 from foo f where f.id = 1 and 1 != 1", + ) + + mirrorPrimitive1 := NewRoute( + EqualUnique, + &vindexes.Keyspace{ + Name: "ks2", + Sharded: true, + }, + "select f.bar from foo f where f.id = 1", + "select 1 from foo f where f.id = 1 and 1 != 1", + ) + mirrorPrimitive1.Vindex = vindex.(vindexes.SingleColumn) + mirrorPrimitive1.Values = []evalengine.Expr{ + evalengine.NewLiteralInt(1), + } + + mirror := &Mirror{ + Primitive: primitive, + Target: &PercentMirrorTarget{ + Percent: 100, + Primitive: mirrorPrimitive1, + }, + } + + mirrorVC := &loggingVCursor{ + shards: []string{"-20", "20-"}, + ksShardMap: map[string][]string{ + "ks2": {"-20", "20-"}, + }, + results: []*sqltypes.Result{ + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "bar", + "varchar", + ), + "hello", + ), + }, + } + + vc := &loggingVCursor{ + shards: []string{"0"}, + ksShardMap: map[string][]string{ + "ks1": {"0"}, + }, + results: []*sqltypes.Result{ + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "bar", + "varchar", + ), + "hello", + ), + }, + handleMirrorClonesFn: func(ctx context.Context) VCursor { + return mirrorVC + }, + } + + t.Run("TryExecute success", func(t *testing.T) { + defer func() { + vc.Rewind() + mirrorVC.Rewind() + }() + + want := vc.results[0] + res, err := mirror.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true) + require.Equal(t, want, res) + require.NoError(t, err) + + vc.ExpectLog(t, []string{ + "ResolveDestinations ks1 [] Destinations:DestinationAllShards()", + "ExecuteMultiShard ks1.0: select f.bar from foo f where f.id = 1 {} false false", + }) + mirrorVC.ExpectLog(t, []string{ + `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, + "ExecuteMultiShard ks2.-20: select f.bar from foo f where f.id = 1 {} false false", + }) + }) + + t.Run("TryExecute return primitive error", func(t *testing.T) { + results := vc.results + + defer func() { + vc.Rewind() + vc.results = results + vc.resultErr = nil + mirrorVC.Rewind() + }() + + vc.results = nil + vc.resultErr = fmt.Errorf("return me") + + ctx := context.Background() + res, err := mirror.TryExecute(ctx, vc, map[string]*querypb.BindVariable{}, true) + require.Nil(t, res) + require.Error(t, err) + require.Equal(t, vc.resultErr, err) + + vc.ExpectLog(t, []string{ + "ResolveDestinations ks1 [] Destinations:DestinationAllShards()", + "ExecuteMultiShard ks1.0: select f.bar from foo f where f.id = 1 {} false false", + }) + mirrorVC.ExpectLog(t, []string{ + `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, + "ExecuteMultiShard ks2.-20: select f.bar from foo f where f.id = 1 {} false false", + }) + }) + + t.Run("TryExecute ignore mirror target error", func(t *testing.T) { + results := mirrorVC.results + + defer func() { + vc.Rewind() + mirrorVC.Rewind() + mirrorVC.results = results + mirrorVC.resultErr = nil + }() + + mirrorVC.results = nil + mirrorVC.resultErr = fmt.Errorf("ignore me") + + want := vc.results[0] + res, err := mirror.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true) + require.Equal(t, res, want) + require.NoError(t, err) + + vc.ExpectLog(t, []string{ + "ResolveDestinations ks1 [] Destinations:DestinationAllShards()", + "ExecuteMultiShard ks1.0: select f.bar from foo f where f.id = 1 {} false false", + }) + mirrorVC.ExpectLog(t, []string{ + `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, + "ExecuteMultiShard ks2.-20: select f.bar from foo f where f.id = 1 {} false false", + }) + }) + + t.Run("TryExecute slow mirror target", func(t *testing.T) { + defer func() { + vc.Rewind() + vc.onExecuteMultiShardFn = nil + mirrorVC.Rewind() + mirrorVC.onExecuteMultiShardFn = nil + }() + + primitiveLatency := maxMirrorTargetLag * 2 + vc.onExecuteMultiShardFn = func(ctx context.Context, _ Primitive, _ []*srvtopo.ResolvedShard, _ []*querypb.BoundQuery, _ bool, _ bool) { + time.Sleep(primitiveLatency) + select { + case <-ctx.Done(): + require.Fail(t, "primitive context done") + default: + } + } + + var wg sync.WaitGroup + defer wg.Wait() + wg.Add(1) + mirrorVC.onExecuteMultiShardFn = func(ctx context.Context, _ Primitive, _ []*srvtopo.ResolvedShard, _ []*querypb.BoundQuery, _ bool, _ bool) { + defer wg.Done() + time.Sleep(primitiveLatency + (2 * maxMirrorTargetLag)) + select { + case <-ctx.Done(): + default: + require.Fail(t, "mirror target context not done") + } + } + + want := vc.results[0] + res, err := mirror.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true) + require.Equal(t, res, want) + require.NoError(t, err) + + vc.ExpectLog(t, []string{ + "ResolveDestinations ks1 [] Destinations:DestinationAllShards()", + "ExecuteMultiShard ks1.0: select f.bar from foo f where f.id = 1 {} false false", + }) + mirrorVC.ExpectLog(t, []string{ + `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, + "ExecuteMultiShard ks2.-20: select f.bar from foo f where f.id = 1 {} false false", + }) + }) + + t.Run("TryStreamExecute success", func(t *testing.T) { + defer func() { + vc.Rewind() + mirrorVC.Rewind() + }() + + want := vc.results[0] + err := mirror.TryStreamExecute( + context.Background(), + vc, + map[string]*querypb.BindVariable{}, + true, + func(result *sqltypes.Result) error { + require.Equal(t, want, result) + return nil + }, + ) + require.NoError(t, err) + + vc.ExpectLog(t, []string{ + "ResolveDestinations ks1 [] Destinations:DestinationAllShards()", + "StreamExecuteMulti select f.bar from foo f where f.id = 1 ks1.0: {} ", + }) + mirrorVC.ExpectLog(t, []string{ + `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, + "StreamExecuteMulti select f.bar from foo f where f.id = 1 ks2.-20: {} ", + }) + }) + + t.Run("TryStreamExecute return primitive error", func(t *testing.T) { + results := vc.results + + defer func() { + vc.Rewind() + vc.results = results + vc.resultErr = nil + mirrorVC.Rewind() + }() + + vc.results = nil + vc.resultErr = fmt.Errorf("return me") + + err := mirror.TryStreamExecute( + context.Background(), + vc, + map[string]*querypb.BindVariable{}, + true, + func(result *sqltypes.Result) error { + require.Nil(t, result) + return nil + }, + ) + require.Error(t, err) + require.Equal(t, vc.resultErr, err) + + vc.ExpectLog(t, []string{ + "ResolveDestinations ks1 [] Destinations:DestinationAllShards()", + "StreamExecuteMulti select f.bar from foo f where f.id = 1 ks1.0: {} ", + }) + mirrorVC.ExpectLog(t, []string{ + `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, + "StreamExecuteMulti select f.bar from foo f where f.id = 1 ks2.-20: {} ", + }) + }) + + t.Run("TryStreamExecute ignore mirror target error", func(t *testing.T) { + results := mirrorVC.results + + defer func() { + vc.Rewind() + mirrorVC.Rewind() + mirrorVC.results = results + mirrorVC.resultErr = nil + }() + + mirrorVC.results = nil + mirrorVC.resultErr = fmt.Errorf("ignore me") + + want := vc.results[0] + err := mirror.TryStreamExecute( + context.Background(), + vc, + map[string]*querypb.BindVariable{}, + true, + func(result *sqltypes.Result) error { + require.Equal(t, want, result) + return nil + }, + ) + require.NoError(t, err) + + vc.ExpectLog(t, []string{ + "ResolveDestinations ks1 [] Destinations:DestinationAllShards()", + "StreamExecuteMulti select f.bar from foo f where f.id = 1 ks1.0: {} ", + }) + mirrorVC.ExpectLog(t, []string{ + `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, + "StreamExecuteMulti select f.bar from foo f where f.id = 1 ks2.-20: {} ", + }) + }) + + t.Run("TryStreamExecute slow mirror target", func(t *testing.T) { + defer func() { + vc.Rewind() + vc.onStreamExecuteMultiFn = nil + mirrorVC.Rewind() + mirrorVC.onStreamExecuteMultiFn = nil + }() + + primitiveLatency := maxMirrorTargetLag * 2 + vc.onStreamExecuteMultiFn = func(ctx context.Context, _ Primitive, _ string, _ []*srvtopo.ResolvedShard, _ []map[string]*querypb.BindVariable, _ bool, _ bool, _ func(*sqltypes.Result) error) { + time.Sleep(primitiveLatency) + select { + case <-ctx.Done(): + require.Fail(t, "primitive context done") + default: + } + } + + var wg sync.WaitGroup + defer wg.Wait() + wg.Add(1) + mirrorVC.onStreamExecuteMultiFn = func(ctx context.Context, _ Primitive, _ string, _ []*srvtopo.ResolvedShard, _ []map[string]*querypb.BindVariable, _ bool, _ bool, _ func(*sqltypes.Result) error) { + defer wg.Done() + time.Sleep(primitiveLatency + (2 * maxMirrorTargetLag)) + select { + case <-ctx.Done(): + default: + require.Fail(t, "mirror target context not done") + } + } + + want := vc.results[0] + err := mirror.TryStreamExecute( + context.Background(), + vc, + map[string]*querypb.BindVariable{}, + true, + func(result *sqltypes.Result) error { + require.Equal(t, want, result) + return nil + }, + ) + require.NoError(t, err) + + vc.ExpectLog(t, []string{ + "ResolveDestinations ks1 [] Destinations:DestinationAllShards()", + "StreamExecuteMulti select f.bar from foo f where f.id = 1 ks1.0: {} ", + }) + mirrorVC.ExpectLog(t, []string{ + `ResolveDestinations ks2 [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(d46405367612b4b7)`, + "StreamExecuteMulti select f.bar from foo f where f.id = 1 ks2.-20: {} ", + }) + }) +} diff --git a/go/vt/vtgate/engine/plan.go b/go/vt/vtgate/engine/plan.go index 769c69aaa06..8de1ed808d5 100644 --- a/go/vt/vtgate/engine/plan.go +++ b/go/vt/vtgate/engine/plan.go @@ -19,6 +19,7 @@ package engine import ( "bytes" "encoding/json" + "fmt" "sync/atomic" "time" @@ -37,7 +38,7 @@ type Plan struct { Instructions Primitive // Instructions contains the instructions needed to fulfil the query. BindVarNeeds *sqlparser.BindVarNeeds // Stores BindVars needed to be provided as part of expression rewriting Warnings []*query.QueryWarning // Warnings that need to be yielded every time this query runs - TablesUsed []string // TablesUsed is the list of tables that this plan will query + TablesUsed sqlparser.TableNames // TablesUsed is the list of tables that this plan will query ExecCount uint64 // Count of times this plan was executed ExecTime uint64 // Total execution time @@ -76,6 +77,15 @@ func (p *Plan) MarshalJSON() ([]byte, error) { instructions = &description } + tablesUsed := make([]string, len(p.TablesUsed)) + for i, table := range p.TablesUsed { + if table.Qualifier.NotEmpty() { + tablesUsed[i] = fmt.Sprintf("%s.%s", table.Qualifier.String(), table.Name.String()) + } else { + tablesUsed[i] = table.Name.String() + } + } + marshalPlan := struct { QueryType string Original string `json:",omitempty"` @@ -97,7 +107,7 @@ func (p *Plan) MarshalJSON() ([]byte, error) { RowsAffected: atomic.LoadUint64(&p.RowsAffected), RowsReturned: atomic.LoadUint64(&p.RowsReturned), Errors: atomic.LoadUint64(&p.Errors), - TablesUsed: p.TablesUsed, + TablesUsed: tablesUsed, } b := new(bytes.Buffer) diff --git a/go/vt/vtgate/engine/primitive.go b/go/vt/vtgate/engine/primitive.go index 1c0e7de7a19..99b14e4a5af 100644 --- a/go/vt/vtgate/engine/primitive.go +++ b/go/vt/vtgate/engine/primitive.go @@ -131,6 +131,9 @@ type ( // CloneForReplicaWarming clones the VCursor for re-use in warming queries to replicas CloneForReplicaWarming(ctx context.Context) VCursor + + // CloneForMirroring clones the VCursor for re-use in mirroring queries to other keyspaces + CloneForMirroring(ctx context.Context) VCursor } // SessionActions gives primitives ability to interact with the session state diff --git a/go/vt/vtgate/executor_select_test.go b/go/vt/vtgate/executor_select_test.go index bd24907af9b..d88cb9e8e44 100644 --- a/go/vt/vtgate/executor_select_test.go +++ b/go/vt/vtgate/executor_select_test.go @@ -4335,3 +4335,60 @@ func TestStreamJoinQuery(t *testing.T) { utils.MustMatch(t, wantResult.Rows[idx], result.Rows[idx], "mismatched on: ", strconv.Itoa(idx)) } } + +func BenchmarkSelectMirror(b *testing.B) { + ctx := context.Background() + cell := "aa" + sql := fmt.Sprintf("select id from %s.user where id = 1", KsTestUnsharded) + + currentSandboxMirrorRules := sandboxMirrorRules + b.Cleanup(func() { + setSandboxMirrorRules(currentSandboxMirrorRules) + }) + + // Don't use createExecutorEnv. Doesn't work with benchmarks because of + // utils.EnsureNoLeak. + createBenchmarkExecutor := func(b *testing.B) (context.Context, *Executor) { + ctx, cancel := context.WithCancel(ctx) + b.Cleanup(cancel) + hc := discovery.NewFakeHealthCheck(nil) + u := createSandbox(KsTestUnsharded) + s := createSandbox(KsTestSharded) + s.VSchema = executorVSchema + u.VSchema = unshardedVSchema + serv := newSandboxForCells(ctx, []string{cell}) + resolver := newTestResolver(ctx, hc, serv, cell) + shards := []string{"-20", "20-40", "40-60", "60-80", "80-a0", "a0-c0", "c0-e0", "e0-"} + for _, shard := range shards { + hc.AddTestTablet(cell, shard, 1, KsTestSharded, shard, topodatapb.TabletType_PRIMARY, true, 1, nil) + } + hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil) + return ctx, createExecutor(ctx, serv, cell, resolver) + } + + for _, percent := range []float32{0, 1, 5, 10, 25, 50, 100} { + b.Run(fmt.Sprintf("mirror %.2f%%", percent), func(b *testing.B) { + setSandboxMirrorRules(fmt.Sprintf(`{ + "rules": [ + { + "from_table": "%s.user", + "to_table": "%s.user", + "percent": %.2f + } + ] + }`, KsTestUnsharded, KsTestSharded, percent)) + + ctx, executor := createBenchmarkExecutor(b) + session := &vtgatepb.Session{ + TargetString: "@primary", + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + executorExec(ctx, executor, session, sql, nil) + } + b.StopTimer() + }) + } +} diff --git a/go/vt/vtgate/logstats/logstats.go b/go/vt/vtgate/logstats/logstats.go index 8f8ba41e3cd..dd32d1aa5a6 100644 --- a/go/vt/vtgate/logstats/logstats.go +++ b/go/vt/vtgate/logstats/logstats.go @@ -28,6 +28,8 @@ import ( "vitess.io/vitess/go/streamlog" "vitess.io/vitess/go/vt/callerid" "vitess.io/vitess/go/vt/callinfo" + "vitess.io/vitess/go/vt/sqlparser" + querypb "vitess.io/vitess/go/vt/proto/query" ) @@ -48,7 +50,7 @@ type LogStats struct { ExecuteTime time.Duration CommitTime time.Duration Error error - TablesUsed []string + TablesUsed []sqlparser.TableName SessionUUID string CachedPlan bool ActiveKeyspace string // ActiveKeyspace is the selected keyspace `use ks` @@ -174,9 +176,12 @@ func (stats *LogStats) Logf(w io.Writer, params url.Values) error { log.Key("Cached Plan") log.Bool(stats.CachedPlan) log.Key("TablesUsed") - log.Strings(stats.TablesUsed) + tablesUsed := make([]string, len(stats.TablesUsed)) + for i, table := range stats.TablesUsed { + tablesUsed[i] = sqlparser.String(table) + } + log.Strings(tablesUsed) log.Key("ActiveKeyspace") log.String(stats.ActiveKeyspace) - return log.Flush(w) } diff --git a/go/vt/vtgate/logstats/logstats_test.go b/go/vt/vtgate/logstats/logstats_test.go index ae3c01e0f0b..c66aecdf01b 100644 --- a/go/vt/vtgate/logstats/logstats_test.go +++ b/go/vt/vtgate/logstats/logstats_test.go @@ -40,6 +40,7 @@ import ( "vitess.io/vitess/go/vt/callinfo" "vitess.io/vitess/go/vt/callinfo/fakecallinfo" querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/sqlparser" ) func TestMain(m *testing.M) { @@ -62,7 +63,10 @@ func TestLogStatsFormat(t *testing.T) { logStats := NewLogStats(context.Background(), "test", "sql1", "suuid", nil) logStats.StartTime = time.Date(2017, time.January, 1, 1, 2, 3, 0, time.UTC) logStats.EndTime = time.Date(2017, time.January, 1, 1, 2, 4, 1234, time.UTC) - logStats.TablesUsed = []string{"ks1.tbl1", "ks2.tbl2"} + logStats.TablesUsed = []sqlparser.TableName{ + sqlparser.NewTableNameWithQualifier("tbl1", "ks1"), + sqlparser.NewTableNameWithQualifier("tbl2", "ks2"), + } logStats.TabletType = "PRIMARY" logStats.ActiveKeyspace = "db" params := map[string][]string{"full": {}} diff --git a/go/vt/vtgate/planbuilder/builder.go b/go/vt/vtgate/planbuilder/builder.go index 5d1d4ecd622..ee4907be922 100644 --- a/go/vt/vtgate/planbuilder/builder.go +++ b/go/vt/vtgate/planbuilder/builder.go @@ -29,6 +29,7 @@ import ( "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/engine" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" "vitess.io/vitess/go/vt/vtgate/vindexes" ) @@ -49,13 +50,13 @@ var ( type ( planResult struct { primitive engine.Primitive - tables []string + tables sqlparser.TableNames } stmtPlanner func(sqlparser.Statement, *sqlparser.ReservedVars, plancontext.VSchema) (*planResult, error) ) -func newPlanResult(prim engine.Primitive, tablesUsed ...string) *planResult { +func newPlanResult(prim engine.Primitive, tablesUsed ...sqlparser.TableName) *planResult { return &planResult{primitive: prim, tables: tablesUsed} } @@ -100,7 +101,7 @@ func BuildFromStmt(ctx context.Context, query string, stmt sqlparser.Statement, } var primitive engine.Primitive - var tablesUsed []string + var tablesUsed []sqlparser.TableName if planResult != nil { primitive = planResult.primitive tablesUsed = planResult.tables @@ -151,9 +152,83 @@ func buildRoutePlan(stmt sqlparser.Statement, reservedVars *sqlparser.ReservedVa return f(stmt, reservedVars, vschema) } +func buildRoutePlanWithMirroring(stmt sqlparser.Statement, reservedVars *sqlparser.ReservedVars, vschema plancontext.VSchema, f func(statement sqlparser.Statement, reservedVars *sqlparser.ReservedVars, schema plancontext.VSchema) (*planResult, error)) (*planResult, error) { + // Building a plan changes the statement, so clone it before we create the + // original plan. + // + // TODO(maxeng): find a way to avoid this work unless all tables have a + // mirror rule. At present, we don't know which tables to check until we + // plan the original statement. + mirrorStmt := sqlparser.CloneStatement(stmt) + + plan, err := buildRoutePlan(stmt, reservedVars, vschema, f) + if err != nil { + return nil, err + } + + // Avoid mirroring work unless all tables have a mirror rule. + mirrorRules := make(map[sqlparser.TableName]*vindexes.MirrorRule) + for _, table := range plan.tables { + mirrorRule, _, _, _, err := vschema.FindMirrorRule(table) + if err != nil || mirrorRule == nil { + break + } + // Forbid self-mirroring. + if mirrorRule.Table.Keyspace.Name == table.Qualifier.String() { + break + } + mirrorRules[table] = mirrorRule + } + if len(mirrorRules) == 0 || len(mirrorRules) != len(plan.tables) { + return plan, nil + } + + // Set up a vschema for mirroring. + mirrorVSchema := plancontext.ForMirroring(vschema) + + // Use the smallest mirror percent. + var i int + var percent float32 + for _, mirrorRule := range mirrorRules { + if i == 0 || mirrorRule.Percent < percent { + percent = mirrorRule.Percent + } + i++ + } + + // Create plan with cloned statement and mirrored vschema. + target, err := buildRoutePlan(mirrorStmt, reservedVars, mirrorVSchema, f) + if err != nil { + // We don't want to return the error here. Mirroring is meant to be + // best effort, and should not stand in the way of production work. + // + // TODO(maxeng): log an error or increase a metric. + return plan, nil + } + + // Build a new planResult from the original plan and mirror target plan. + tables := make(sqlparser.TableNames, len(plan.tables)+len(target.tables)) + copy(tables, plan.tables) + copy(tables[len(plan.tables):], target.tables) + operators.SortTableNames(tables) + return &planResult{ + engine.NewMirror( + plan.primitive, + engine.NewPercentMirrorTarget(percent, target.primitive), + ), + tables, + }, nil +} + func createInstructionFor(ctx context.Context, query string, stmt sqlparser.Statement, reservedVars *sqlparser.ReservedVars, vschema plancontext.VSchema, enableOnlineDDL, enableDirectDDL bool) (*planResult, error) { switch stmt := stmt.(type) { - case *sqlparser.Select, *sqlparser.Insert, *sqlparser.Update, *sqlparser.Delete: + case *sqlparser.Select: + configuredPlanner, err := getConfiguredPlanner(vschema, stmt, query) + if err != nil { + return nil, err + } + return buildRoutePlanWithMirroring(stmt, reservedVars, vschema, configuredPlanner) + case *sqlparser.Insert, *sqlparser.Update, *sqlparser.Delete: configuredPlanner, err := getConfiguredPlanner(vschema, stmt, query) if err != nil { return nil, err @@ -164,7 +239,7 @@ func createInstructionFor(ctx context.Context, query string, stmt sqlparser.Stat if err != nil { return nil, err } - return buildRoutePlan(stmt, reservedVars, vschema, configuredPlanner) + return buildRoutePlanWithMirroring(stmt, reservedVars, vschema, configuredPlanner) case sqlparser.DDLStatement: return buildGeneralDDLPlan(ctx, query, stmt, reservedVars, vschema, enableOnlineDDL, enableDirectDDL) case *sqlparser.AlterMigration: @@ -265,7 +340,7 @@ func buildAnalyzePlan(stmt sqlparser.Statement, _ *sqlparser.ReservedVars, vsche TargetDestination: dest, Query: sqlparser.String(analyzeStmt), } - return newPlanResult(prim, sqlparser.String(analyzeStmt.Table)), nil + return newPlanResult(prim, analyzeStmt.Table), nil } func buildDBDDLPlan(stmt sqlparser.Statement, _ *sqlparser.ReservedVars, vschema plancontext.VSchema) (*planResult, error) { @@ -337,7 +412,7 @@ func buildVSchemaDDLPlan(stmt *sqlparser.AlterVschema, vschema plancontext.VSche return newPlanResult(&engine.AlterVSchema{ Keyspace: keyspace, AlterVschemaDDL: stmt, - }, singleTable(keyspace.Name, stmt.Table.Name.String())), nil + }, sqlparser.NewTableNameWithQualifier(stmt.Table.Name.String(), keyspace.Name)), nil } func buildFlushPlan(stmt *sqlparser.Flush, vschema plancontext.VSchema) (*planResult, error) { @@ -397,7 +472,7 @@ func buildFlushTables(stmt *sqlparser.Flush, vschema plancontext.VSchema) (*plan if tbl == nil { return nil, vindexes.NotFoundError{TableName: tab.Name.String()} } - tc.addTable(tbl.Keyspace.Name, tbl.Name.String()) + tc.addTable(sqlparser.NewTableNameWithQualifier(tbl.Name.String(), tbl.Keyspace.Name)) ksTab = tbl.Keyspace stmt.TableNames[i] = sqlparser.TableName{ Name: tbl.Name, @@ -441,28 +516,23 @@ func buildFlushTables(stmt *sqlparser.Flush, vschema plancontext.VSchema) (*plan } type tableCollector struct { - tables map[string]any + tables map[sqlparser.TableName]any } -func (tc *tableCollector) addTable(ks, tbl string) { +func (tc *tableCollector) addTable(t sqlparser.TableName) { if tc.tables == nil { - tc.tables = map[string]any{} + tc.tables = map[sqlparser.TableName]any{} } - tc.tables[fmt.Sprintf("%s.%s", ks, tbl)] = nil + tc.tables[t] = nil } -func (tc *tableCollector) addASTTable(ks string, tbl sqlparser.TableName) { - tc.addTable(ks, tbl.Name.String()) -} - -func (tc *tableCollector) getTables() []string { - tableNames := make([]string, 0, len(tc.tables)) - for tbl := range tc.tables { - tableNames = append(tableNames, tbl) +func (tc *tableCollector) getTables() []sqlparser.TableName { + tables := make([]sqlparser.TableName, 0, len(tc.tables)) + for t := range tc.tables { + tables = append(tables, t) } - - sort.Strings(tableNames) - return tableNames + operators.SortTableNames(tables) + return tables } func newFlushStmt(stmt *sqlparser.Flush, tables sqlparser.TableNames) *sqlparser.Flush { diff --git a/go/vt/vtgate/planbuilder/ddl.go b/go/vt/vtgate/planbuilder/ddl.go index 4c4b3791c20..eddab9b3a4b 100644 --- a/go/vt/vtgate/planbuilder/ddl.go +++ b/go/vt/vtgate/planbuilder/ddl.go @@ -74,7 +74,7 @@ func buildGeneralDDLPlan(ctx context.Context, sql string, ddlStatement sqlparser } tc := &tableCollector{} for _, tbl := range ddlStatement.AffectedTables() { - tc.addASTTable(normalDDLPlan.Keyspace.Name, tbl) + tc.addTable(sqlparser.NewTableNameWithQualifier(tbl.Name.String(), normalDDLPlan.Keyspace.Name)) } return newPlanResult(eddl, tc.getTables()...), nil diff --git a/go/vt/vtgate/planbuilder/operators/delete.go b/go/vt/vtgate/planbuilder/operators/delete.go index 5bbf5218bd7..797ca3c7977 100644 --- a/go/vt/vtgate/planbuilder/operators/delete.go +++ b/go/vt/vtgate/planbuilder/operators/delete.go @@ -33,6 +33,11 @@ type Delete struct { noPredicates } +var ( + _ Operator = (*Delete)(nil) + _ TableUser = (*Delete)(nil) +) + // Clone implements the Operator interface func (d *Delete) Clone(inputs []Operator) Operator { newD := *d @@ -55,8 +60,8 @@ func (d *Delete) GetOrdering(*plancontext.PlanningContext) []OrderBy { return nil } -func (d *Delete) TablesUsed() []string { - return SingleQualifiedIdentifier(d.Target.VTable.Keyspace, d.Target.VTable.Name) +func (d *Delete) TablesUsed() []sqlparser.TableName { + return SingleTableName(d.Target.VTable.Keyspace, d.Target.VTable.Name) } func (d *Delete) ShortDescription() string { diff --git a/go/vt/vtgate/planbuilder/operators/helpers.go b/go/vt/vtgate/planbuilder/operators/helpers.go index 31d9bcfd279..3f23ca1b43c 100644 --- a/go/vt/vtgate/planbuilder/operators/helpers.go +++ b/go/vt/vtgate/planbuilder/operators/helpers.go @@ -17,8 +17,8 @@ limitations under the License. package operators import ( - "fmt" - "sort" + "slices" + "strings" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" @@ -82,15 +82,15 @@ func TableID(op Operator) (result semantics.TableSet) { // TableUser is used to signal that this operator directly interacts with one or more tables type TableUser interface { - TablesUsed() []string + TablesUsed() []sqlparser.TableName } -func TablesUsed(op Operator) []string { - addString, collect := collectSortedUniqueStrings() +func TablesUsed(op Operator) []sqlparser.TableName { + addTableName, collect := collectSortedUniqueTableNames() _ = Visit(op, func(this Operator) error { if tbl, ok := this.(TableUser); ok { for _, u := range tbl.TablesUsed() { - addString(u) + addTableName(u) } } return nil @@ -116,53 +116,59 @@ func CostOf(op Operator) (cost int) { return } -func QualifiedIdentifier(ks *vindexes.Keyspace, i sqlparser.IdentifierCS) string { - return QualifiedString(ks, i.String()) -} - -func QualifiedString(ks *vindexes.Keyspace, s string) string { - return fmt.Sprintf("%s.%s", ks.Name, s) -} - -func QualifiedTableName(ks *vindexes.Keyspace, t sqlparser.TableName) string { - return QualifiedIdentifier(ks, t.Name) -} - -func QualifiedTableNames(ks *vindexes.Keyspace, ts []sqlparser.TableName) []string { - add, collect := collectSortedUniqueStrings() +func QualifiedTableNames(ks *vindexes.Keyspace, ts []sqlparser.TableName) []sqlparser.TableName { + add, collect := collectSortedUniqueTableNames() for _, t := range ts { - add(QualifiedTableName(ks, t)) + add(sqlparser.NewTableNameWithQualifier(t.Name.String(), ks.Name)) } return collect() } -func QualifiedTables(ks *vindexes.Keyspace, vts []*vindexes.Table) []string { - add, collect := collectSortedUniqueStrings() +func QualifiedTables(ks *vindexes.Keyspace, vts []*vindexes.Table) []sqlparser.TableName { + add, collect := collectSortedUniqueTableNames() for _, vt := range vts { - add(QualifiedIdentifier(ks, vt.Name)) + add(sqlparser.NewTableNameWithQualifier(vt.Name.String(), ks.Name)) } return collect() } -func SingleQualifiedIdentifier(ks *vindexes.Keyspace, i sqlparser.IdentifierCS) []string { - return SingleQualifiedString(ks, i.String()) +func SingleTableName(ks *vindexes.Keyspace, i sqlparser.IdentifierCS) []sqlparser.TableName { + return []sqlparser.TableName{sqlparser.NewTableNameWithQualifier(i.String(), ks.Name)} } -func SingleQualifiedString(ks *vindexes.Keyspace, s string) []string { - return []string{QualifiedString(ks, s)} +func SortTableNames(ts []sqlparser.TableName) { + slices.SortFunc[[]sqlparser.TableName, sqlparser.TableName](ts, func(a, b sqlparser.TableName) int { + if a.Qualifier.NotEmpty() && b.Qualifier.NotEmpty() { + if cq := strings.Compare(a.Qualifier.String(), b.Qualifier.String()); cq != 0 { + return cq + } + return strings.Compare(a.Name.String(), b.Name.String()) + } + if a.Qualifier.NotEmpty() { + return strings.Compare(a.Qualifier.String(), b.Name.String()) + } + if b.Qualifier.NotEmpty() { + return strings.Compare(a.Name.String(), b.Qualifier.String()) + } + return strings.Compare(a.Name.String(), b.Name.String()) + }) } -func collectSortedUniqueStrings() (add func(string), collect func() []string) { - uniq := make(map[string]any) - add = func(v string) { +func collectSortedUniqueTableNames() (add func(sqlparser.TableName), collect func() []sqlparser.TableName) { + uniq := make(map[sqlparser.TableName]any) + + add = func(v sqlparser.TableName) { uniq[v] = nil } - collect = func() []string { - sorted := make([]string, 0, len(uniq)) + + collect = func() []sqlparser.TableName { + sorted := make([]sqlparser.TableName, 0, len(uniq)) for v := range uniq { sorted = append(sorted, v) } - sort.Strings(sorted) + + SortTableNames(sorted) + return sorted } diff --git a/go/vt/vtgate/planbuilder/operators/insert.go b/go/vt/vtgate/planbuilder/operators/insert.go index 6832dc363d5..1a129946812 100644 --- a/go/vt/vtgate/planbuilder/operators/insert.go +++ b/go/vt/vtgate/planbuilder/operators/insert.go @@ -82,7 +82,10 @@ func (i *Insert) GetOrdering(*plancontext.PlanningContext) []OrderBy { return nil } -var _ Operator = (*Insert)(nil) +var ( + _ Operator = (*Insert)(nil) + _ TableUser = (*Insert)(nil) +) func (i *Insert) Clone([]Operator) Operator { return &Insert{ @@ -96,8 +99,8 @@ func (i *Insert) Clone([]Operator) Operator { } } -func (i *Insert) TablesUsed() []string { - return SingleQualifiedIdentifier(i.VTable.Keyspace, i.VTable.Name) +func (i *Insert) TablesUsed() []sqlparser.TableName { + return SingleTableName(i.VTable.Keyspace, i.VTable.Name) } func (i *Insert) Statement() sqlparser.Statement { diff --git a/go/vt/vtgate/planbuilder/operators/route.go b/go/vt/vtgate/planbuilder/operators/route.go index feeb091a725..e215b52132a 100644 --- a/go/vt/vtgate/planbuilder/operators/route.go +++ b/go/vt/vtgate/planbuilder/operators/route.go @@ -762,11 +762,11 @@ func (r *Route) GetOrdering(ctx *plancontext.PlanningContext) []OrderBy { // TablesUsed returns tables used by MergedWith routes, which are not included // in Inputs() and thus not a part of the operator tree -func (r *Route) TablesUsed() []string { - addString, collect := collectSortedUniqueStrings() +func (r *Route) TablesUsed() []sqlparser.TableName { + addTable, collect := collectSortedUniqueTableNames() for _, mw := range r.MergedWith { for _, u := range TablesUsed(mw) { - addString(u) + addTable(u) } } return collect() diff --git a/go/vt/vtgate/planbuilder/operators/table.go b/go/vt/vtgate/planbuilder/operators/table.go index 3ecd4982ece..e1e4493df30 100644 --- a/go/vt/vtgate/planbuilder/operators/table.go +++ b/go/vt/vtgate/planbuilder/operators/table.go @@ -41,6 +41,11 @@ type ( } ) +var ( + _ Operator = (*Table)(nil) + _ TableUser = (*Table)(nil) +) + // Clone implements the Operator interface func (to *Table) Clone([]Operator) Operator { var columns []*sqlparser.ColName @@ -107,11 +112,11 @@ func (to *Table) AddCol(col *sqlparser.ColName) { to.Columns = append(to.Columns, col) } -func (to *Table) TablesUsed() []string { +func (to *Table) TablesUsed() []sqlparser.TableName { if sqlparser.SystemSchema(to.QTable.Table.Qualifier.String()) { return nil } - return SingleQualifiedIdentifier(to.VTable.Keyspace, to.VTable.Name) + return SingleTableName(to.VTable.Keyspace, to.VTable.Name) } func addColumn(ctx *plancontext.PlanningContext, op ColNameColumns, e sqlparser.Expr) int { diff --git a/go/vt/vtgate/planbuilder/operators/update.go b/go/vt/vtgate/planbuilder/operators/update.go index e843155246c..40cc8a68574 100644 --- a/go/vt/vtgate/planbuilder/operators/update.go +++ b/go/vt/vtgate/planbuilder/operators/update.go @@ -58,6 +58,11 @@ type ( } ) +var ( + _ Operator = (*Update)(nil) + _ TableUser = (*Update)(nil) +) + func (u *Update) Inputs() []Operator { if u.Source == nil { return nil @@ -89,8 +94,8 @@ func (u *Update) GetOrdering(*plancontext.PlanningContext) []OrderBy { return nil } -func (u *Update) TablesUsed() []string { - return SingleQualifiedIdentifier(u.Target.VTable.Keyspace, u.Target.VTable.Name) +func (u *Update) TablesUsed() []sqlparser.TableName { + return SingleTableName(u.Target.VTable.Keyspace, u.Target.VTable.Name) } func (u *Update) ShortDescription() string { diff --git a/go/vt/vtgate/planbuilder/operators/vindex.go b/go/vt/vtgate/planbuilder/operators/vindex.go index fd907fdad27..ce56554d568 100644 --- a/go/vt/vtgate/planbuilder/operators/vindex.go +++ b/go/vt/vtgate/planbuilder/operators/vindex.go @@ -50,6 +50,11 @@ type ( const wrongWhereCond = "WHERE clause for vindex function must be of the form id = or id in(,...)" +var ( + _ Operator = (*Vindex)(nil) + _ TableUser = (*Vindex)(nil) +) + // Introduces implements the Operator interface func (v *Vindex) introducesTableID() semantics.TableSet { return v.Solved @@ -164,8 +169,8 @@ func (v *Vindex) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.E // TablesUsed implements the Operator interface. // It is not keyspace-qualified. -func (v *Vindex) TablesUsed() []string { - return []string{v.Table.Table.Name.String()} +func (v *Vindex) TablesUsed() []sqlparser.TableName { + return []sqlparser.TableName{sqlparser.NewTableName(v.Table.Table.Name.String())} } func (v *Vindex) ShortDescription() string { diff --git a/go/vt/vtgate/planbuilder/plan_test.go b/go/vt/vtgate/planbuilder/plan_test.go index f49994d37b2..7c2f20a4d3a 100644 --- a/go/vt/vtgate/planbuilder/plan_test.go +++ b/go/vt/vtgate/planbuilder/plan_test.go @@ -591,6 +591,18 @@ func (s *planTestSuite) TestOtherPlanningFromFile() { s.testFile("other_admin_cases.json", vschema, false) } +func (s *planTestSuite) TestMirrorPlanning() { + vschema := &vschemawrapper.VSchemaWrapper{ + V: loadSchema(s.T(), "vschemas/mirror_schema.json", true), + TabletType_: topodatapb.TabletType_PRIMARY, + SysVarEnabled: true, + TestBuilder: TestBuilder, + Env: vtenv.NewTestEnv(), + } + + s.testFile("mirror_cases.json", vschema, false) +} + func loadSchema(t testing.TB, filename string, setCollation bool) *vindexes.VSchema { formal, err := vindexes.LoadFormal(locateFile(filename)) require.NoError(t, err) @@ -839,6 +851,35 @@ func BenchmarkSelectVsDML(b *testing.B) { }) } +func BenchmarkBaselineVsMirrored(b *testing.B) { + baseline := loadSchema(b, "vschemas/mirror_schema.json", true) + baseline.MirrorRules = map[string]*vindexes.MirrorRule{} + baselineVschema := &vschemawrapper.VSchemaWrapper{ + V: baseline, + SysVarEnabled: true, + Version: Gen4, + Env: vtenv.NewTestEnv(), + } + + mirroredSchema := loadSchema(b, "vschemas/mirror_schema.json", true) + mirroredVschema := &vschemawrapper.VSchemaWrapper{ + V: mirroredSchema, + SysVarEnabled: true, + Version: Gen4, + Env: vtenv.NewTestEnv(), + } + + cases := readJSONTests("mirror_cases.json") + + b.Run("Baseline", func(b *testing.B) { + benchmarkPlanner(b, Gen4, cases, baselineVschema) + }) + + b.Run("Mirrored", func(b *testing.B) { + benchmarkPlanner(b, Gen4, cases, mirroredVschema) + }) +} + func benchmarkPlanner(b *testing.B, version plancontext.PlannerVersion, testCases []planTest, vschema *vschemawrapper.VSchemaWrapper) { b.ReportAllocs() for n := 0; n < b.N; n++ { diff --git a/go/vt/vtgate/planbuilder/plancontext/mirror_vschema.go b/go/vt/vtgate/planbuilder/plancontext/mirror_vschema.go new file mode 100644 index 00000000000..e957c282fb8 --- /dev/null +++ b/go/vt/vtgate/planbuilder/plancontext/mirror_vschema.go @@ -0,0 +1,265 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package plancontext + +import ( + "context" + + "vitess.io/vitess/go/mysql/collations" + "vitess.io/vitess/go/vt/key" + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" + "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vtenv" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/engine" + "vitess.io/vitess/go/vt/vtgate/semantics" + "vitess.io/vitess/go/vt/vtgate/vindexes" +) + +// mirrorVSchema is a wrapper which returns mirrored versions of values +// return by the wrapped vschema. +// +// For example, if the wrapped VSchema defines any mirror rules from ks1 to +// ks2, calls to FindTable for which the wrapped VSchema returns tables in +// ks1 will return tables in ks2. +// +// The returned VSchema cannot be reflected back again by passing it to +// ForMirroring. This restriction allows the returned VSchema to be used in +// recursive planning calls without creating an infinite loop. +type mirrorVSchema struct { + vschema VSchema +} + +// FindTable finds and returns the table with the requested name. +// +// mirrorVSchema returns a mirrored version of the value returned by the underlying VSchema. +// If the underlying VSchema returns a table t1 in ks1, and there is exists a +// routing rule from ks1 to ks2, and ks2 has a table t1, then mirrorVSchema +// returns ks2.t1. +// +// If no table with the requested name can be found, or not mirror rule is +// defined on the keyspace of the found table, or no table is found in the +// target keyspace defined by the mirror rule, then nil is returned. +func (m *mirrorVSchema) FindTable(tablename sqlparser.TableName) (*vindexes.Table, string, topodatapb.TabletType, key.Destination, error) { + mirrorRule, destKeyspace, destTabletType, dest, err := m.vschema.FindMirrorRule(tablename) + if err != nil { + return nil, "", destTabletType, nil, err + } + return mirrorRule.Table, destKeyspace, destTabletType, dest, err +} + +func (m *mirrorVSchema) FindView(name sqlparser.TableName) sqlparser.SelectStatement { + // TODO(maxeng): we don't have all the information we need here to mirror + // views. May need to expose a new method on VSchema interface, such as + // ParseDestinationTarget + return nil +} + +// FindTableOrVindex returns a routed table or vindex. +// +// mirrorVSchema returns a mirrored version of the value returned by the +// underlying VSchema. If the underlying VSchema returns a table t1 in ks1, +// and there is exists a mirror rule from ks1.t1 to ks2.t1, then mirrorVSchema +// returns ks2.t1. Vindexes are not mirrored. +func (m *mirrorVSchema) FindTableOrVindex(tablename sqlparser.TableName) (*vindexes.Table, vindexes.Vindex, string, topodatapb.TabletType, key.Destination, error) { + fromTable, vindex, destKeyspace, destTabletType, dest, err := m.vschema.FindTableOrVindex(tablename) + if err != nil { + return nil, vindex, "", destTabletType, nil, err + } + mirrorRule, err := m.GetVSchema().FindMirrorRule(fromTable.Keyspace.Name, fromTable.Name.String(), destTabletType) + if err != nil { + return nil, vindex, "", destTabletType, nil, err + } + // If mirror rule not found, just use the table we initially found. + if mirrorRule == nil { + return fromTable, vindex, destKeyspace, destTabletType, dest, nil + } + return mirrorRule.Table, vindex, destKeyspace, destTabletType, dest, err +} + +func (m *mirrorVSchema) DefaultKeyspace() (*vindexes.Keyspace, error) { + return m.vschema.DefaultKeyspace() +} + +func (m *mirrorVSchema) TargetString() string { + return m.vschema.TargetString() +} + +func (m *mirrorVSchema) Destination() key.Destination { + return m.vschema.Destination() +} + +func (m *mirrorVSchema) TabletType() topodatapb.TabletType { + return m.vschema.TabletType() +} + +func (m *mirrorVSchema) TargetDestination(qualifier string) (key.Destination, *vindexes.Keyspace, topodatapb.TabletType, error) { + return m.vschema.TargetDestination(qualifier) +} + +func (m *mirrorVSchema) AnyKeyspace() (*vindexes.Keyspace, error) { + return m.vschema.AnyKeyspace() +} + +func (m *mirrorVSchema) FirstSortedKeyspace() (*vindexes.Keyspace, error) { + return m.vschema.FirstSortedKeyspace() +} + +func (m *mirrorVSchema) SysVarSetEnabled() bool { + return m.vschema.SysVarSetEnabled() +} + +func (m *mirrorVSchema) KeyspaceExists(keyspace string) bool { + return m.vschema.KeyspaceExists(keyspace) +} + +func (m *mirrorVSchema) AllKeyspace() ([]*vindexes.Keyspace, error) { + return m.vschema.AllKeyspace() +} + +func (m *mirrorVSchema) FindKeyspace(keyspace string) (*vindexes.Keyspace, error) { + return m.vschema.FindKeyspace(keyspace) +} + +func (m *mirrorVSchema) GetSemTable() *semantics.SemTable { + return m.vschema.GetSemTable() +} + +func (m *mirrorVSchema) Planner() PlannerVersion { + return m.vschema.Planner() +} + +func (m *mirrorVSchema) SetPlannerVersion(pv PlannerVersion) { + m.vschema.SetPlannerVersion(pv) +} + +func (m *mirrorVSchema) ConnCollation() collations.ID { + return m.vschema.ConnCollation() +} + +func (m *mirrorVSchema) Environment() *vtenv.Environment { + return m.vschema.Environment() +} + +// ErrorIfShardedF will return an error if the keyspace is sharded, +// and produce a warning if the vtgate if configured to do so +func (m *mirrorVSchema) ErrorIfShardedF(keyspace *vindexes.Keyspace, warn string, errFmt string, params ...any) error { + return m.vschema.ErrorIfShardedF(keyspace, warn, errFmt, params...) +} + +// WarnUnshardedOnly is used when a feature is only supported in unsharded mode. +// This will let the user know that they are using something +// that could become a problem if they move to a sharded keyspace +func (m *mirrorVSchema) WarnUnshardedOnly(format string, params ...any) { + m.vschema.WarnUnshardedOnly(format, params...) +} + +// PlannerWarning records warning created during planning. +func (m *mirrorVSchema) PlannerWarning(message string) { + m.vschema.PlannerWarning(message) +} + +// ForeignKeyMode returns the foreign_key flag value +func (m *mirrorVSchema) ForeignKeyMode(keyspace string) (vschemapb.Keyspace_ForeignKeyMode, error) { + return m.vschema.ForeignKeyMode(keyspace) +} + +// KeyspaceError returns any error in the keyspace vschema. +func (m *mirrorVSchema) KeyspaceError(keyspace string) error { + return m.vschema.KeyspaceError(keyspace) +} + +func (m *mirrorVSchema) GetForeignKeyChecksState() *bool { + return m.vschema.GetForeignKeyChecksState() +} + +// GetVSchema returns the latest cached vindexes.VSchema +func (m *mirrorVSchema) GetVSchema() *vindexes.VSchema { + return m.vschema.GetVSchema() +} + +func (m *mirrorVSchema) GetSrvVschema() *vschemapb.SrvVSchema { + return m.vschema.GetSrvVschema() +} + +// FindRoutedShard looks up shard routing rules for a shard +func (m *mirrorVSchema) FindRoutedShard(keyspace string, shard string) (string, error) { + return m.vschema.FindRoutedShard(keyspace, shard) +} + +// IsShardRoutingEnabled returns true if partial shard routing is enabled +func (m *mirrorVSchema) IsShardRoutingEnabled() bool { + return m.vschema.IsShardRoutingEnabled() +} + +// IsViewsEnabled returns true if Vitess manages the views. +func (m *mirrorVSchema) IsViewsEnabled() bool { + return m.vschema.IsViewsEnabled() +} + +// GetUDV returns user defined value from the variable passed. +func (m *mirrorVSchema) GetUDV(name string) *querypb.BindVariable { + return m.vschema.GetUDV(name) +} + +// PlanPrepareStatement plans the prepared statement. +func (m *mirrorVSchema) PlanPrepareStatement(ctx context.Context, query string) (*engine.Plan, sqlparser.Statement, error) { + return m.vschema.PlanPrepareStatement(ctx, query) +} + +// ClearPrepareData clears the prepared data from the session. +func (m *mirrorVSchema) ClearPrepareData(stmtName string) { + m.vschema.ClearPrepareData(stmtName) +} + +// GetPrepareData returns the prepared data for the statement from the session. +func (m *mirrorVSchema) GetPrepareData(stmtName string) *vtgatepb.PrepareData { + return m.vschema.GetPrepareData(stmtName) +} + +// StorePrepareData stores the prepared data in the session. +func (m *mirrorVSchema) StorePrepareData(name string, v *vtgatepb.PrepareData) { + m.vschema.StorePrepareData(name, v) +} + +// GetAggregateUDFs returns the list of aggregate UDFs. +func (m *mirrorVSchema) GetAggregateUDFs() []string { + return m.vschema.GetAggregateUDFs() +} + +// FindMirrorRule finds the mirror rule for the requested table name and +// VSchema tablet type. +func (m *mirrorVSchema) FindMirrorRule(name sqlparser.TableName) (*vindexes.MirrorRule, string, topodatapb.TabletType, key.Destination, error) { + return nil, "", topodatapb.TabletType_UNKNOWN, nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "[BUG] refusing to perform chained traffic mirroring") +} + +// ForMirroring returns a wrapper which returns mirrored versions of values +// return by the wrapped vschema. +// +// For example, if the underlying VSchema defines any mirror rules from ks1 to +// ks2, calls to FindTable for which the wrapped VSchema returns tables in +// ks1 will return tables in ks2. +// +// The returned VSchema cannot be reflected back again by passing it to +// ForMirroring. This restriction prevents infinite mirroring loops. +func ForMirroring(vschema VSchema) VSchema { + return &mirrorVSchema{vschema} +} diff --git a/go/vt/vtgate/planbuilder/plancontext/vschema.go b/go/vt/vtgate/planbuilder/plancontext/vschema.go index 8ac4c57bfd7..64712fd2b66 100644 --- a/go/vt/vtgate/planbuilder/plancontext/vschema.go +++ b/go/vt/vtgate/planbuilder/plancontext/vschema.go @@ -96,6 +96,10 @@ type VSchema interface { // GetAggregateUDFs returns the list of aggregate UDFs. GetAggregateUDFs() []string + + // FindMirrorRule finds the mirror rule for the requested keyspace, table + // name, and the tablet type in the VSchema. + FindMirrorRule(tablename sqlparser.TableName) (*vindexes.MirrorRule, string, topodatapb.TabletType, key.Destination, error) } // PlannerNameToVersion returns the numerical representation of the planner diff --git a/go/vt/vtgate/planbuilder/select.go b/go/vt/vtgate/planbuilder/select.go index 01dfd8aa387..9880f32e500 100644 --- a/go/vt/vtgate/planbuilder/select.go +++ b/go/vt/vtgate/planbuilder/select.go @@ -45,12 +45,12 @@ func gen4SelectStmtPlanner( return nil, err } if p != nil { - used := "dual" + used := sqlparser.NewTableName("dual") keyspace, ksErr := vschema.DefaultKeyspace() if ksErr == nil { // we are just getting the ks to log the correct table use. // no need to fail this if we can't find the default keyspace - used = keyspace.Name + ".dual" + used = sqlparser.NewTableNameWithQualifier("dual", keyspace.Name) } return newPlanResult(p, used), nil } @@ -62,7 +62,7 @@ func gen4SelectStmtPlanner( sel.SQLCalcFoundRows = false } - getPlan := func(selStatement sqlparser.SelectStatement) (engine.Primitive, []string, error) { + getPlan := func(selStatement sqlparser.SelectStatement) (engine.Primitive, []sqlparser.TableName, error) { return newBuildSelectPlan(selStatement, reservedVars, vschema, plannerVersion) } @@ -123,7 +123,7 @@ func buildSQLCalcFoundRowsPlan( sel *sqlparser.Select, reservedVars *sqlparser.ReservedVars, vschema plancontext.VSchema, -) (engine.Primitive, []string, error) { +) (engine.Primitive, []sqlparser.TableName, error) { limitPlan, _, err := newBuildSelectPlan(sel, reservedVars, vschema, Gen4) if err != nil { return nil, nil, err @@ -180,7 +180,7 @@ func buildSQLCalcFoundRowsPlan( }, tablesUsed, nil } -func gen4PredicateRewrite(stmt sqlparser.Statement, getPlan func(selStatement sqlparser.SelectStatement) (engine.Primitive, []string, error)) (engine.Primitive, []string) { +func gen4PredicateRewrite(stmt sqlparser.Statement, getPlan func(selStatement sqlparser.SelectStatement) (engine.Primitive, []sqlparser.TableName, error)) (engine.Primitive, []sqlparser.TableName) { rewritten, isSel := sqlparser.RewritePredicate(stmt).(sqlparser.SelectStatement) if !isSel { // Fail-safe code, should never happen @@ -199,7 +199,7 @@ func newBuildSelectPlan( reservedVars *sqlparser.ReservedVars, vschema plancontext.VSchema, version querypb.ExecuteOptions_PlannerVersion, -) (plan engine.Primitive, tablesUsed []string, err error) { +) (plan engine.Primitive, tablesUsed []sqlparser.TableName, err error) { ctx, err := plancontext.CreatePlanningContext(selStmt, reservedVars, vschema, version) if err != nil { return nil, nil, err diff --git a/go/vt/vtgate/planbuilder/single_sharded_shortcut.go b/go/vt/vtgate/planbuilder/single_sharded_shortcut.go index 5d877cd341d..ad6329f4f0a 100644 --- a/go/vt/vtgate/planbuilder/single_sharded_shortcut.go +++ b/go/vt/vtgate/planbuilder/single_sharded_shortcut.go @@ -28,7 +28,7 @@ import ( "vitess.io/vitess/go/vt/vtgate/vindexes" ) -func selectUnshardedShortcut(ctx *plancontext.PlanningContext, stmt sqlparser.SelectStatement, ks *vindexes.Keyspace) (engine.Primitive, []string, error) { +func selectUnshardedShortcut(ctx *plancontext.PlanningContext, stmt sqlparser.SelectStatement, ks *vindexes.Keyspace) (engine.Primitive, []sqlparser.TableName, error) { // this method is used when the query we are handling has all tables in the same unsharded keyspace sqlparser.SafeRewrite(stmt, nil, func(cursor *sqlparser.Cursor) bool { switch node := cursor.Node().(type) { diff --git a/go/vt/vtgate/planbuilder/testdata/mirror_cases.json b/go/vt/vtgate/planbuilder/testdata/mirror_cases.json new file mode 100644 index 00000000000..c8014721d5f --- /dev/null +++ b/go/vt/vtgate/planbuilder/testdata/mirror_cases.json @@ -0,0 +1,397 @@ +[ + { + "comment": "select unsharded, qualified, table mirrored to unsharded table", + "query": "select t1.id from unsharded_src1.t1 where t1.id = 1", + "plan": { + "QueryType": "SELECT", + "Original": "select t1.id from unsharded_src1.t1 where t1.id = 1", + "Instructions": { + "OperatorType": "Mirror", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_src1", + "Sharded": false + }, + "FieldQuery": "select t1.id from t1 where 1 != 1", + "Query": "select t1.id from t1 where t1.id = 1", + "Table": "t1" + }, + { + "OperatorType": "MirrorTarget", + "Variant": "Percent", + "Percent": 50, + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_dst1", + "Sharded": false + }, + "FieldQuery": "select t1.id from t1 where 1 != 1", + "Query": "select t1.id from t1 where t1.id = 1", + "Table": "t1" + } + ] + } + ] + }, + "TablesUsed": [ + "unsharded_dst1.t1", + "unsharded_src1.t1" + ] + } + }, + { + "comment": "select unsharded, qualified, table mirrored to sharded table", + "query": "select t1.id from unsharded_src1.t1 where t1.id = 1", + "plan": { + "QueryType": "SELECT", + "Original": "select t1.id from unsharded_src1.t1 where t1.id = 1", + "Instructions": { + "OperatorType": "Mirror", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_src1", + "Sharded": false + }, + "FieldQuery": "select t1.id from t1 where 1 != 1", + "Query": "select t1.id from t1 where t1.id = 1", + "Table": "t1" + }, + { + "OperatorType": "MirrorTarget", + "Variant": "Percent", + "Percent": 50, + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_dst1", + "Sharded": false + }, + "FieldQuery": "select t1.id from t1 where 1 != 1", + "Query": "select t1.id from t1 where t1.id = 1", + "Table": "t1" + } + ] + } + ] + }, + "TablesUsed": [ + "unsharded_dst1.t1", + "unsharded_src1.t1" + ] + } + }, + { + "comment": "select two unsharded, qualified, tables, one mirrored to unsharded table, other to sharded table", + "query": "select t1.id, t2.id from unsharded_src1.t1, unsharded_src1.t2 where t1.id = t2.id", + "plan": { + "QueryType": "SELECT", + "Original": "select t1.id, t2.id from unsharded_src1.t1, unsharded_src1.t2 where t1.id = t2.id", + "Instructions": { + "OperatorType": "Mirror", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_src1", + "Sharded": false + }, + "FieldQuery": "select t1.id, t2.id from t1, t2 where 1 != 1", + "Query": "select t1.id, t2.id from t1, t2 where t1.id = t2.id", + "Table": "t1, t2" + }, + { + "OperatorType": "MirrorTarget", + "Variant": "Percent", + "Percent": 50, + "Inputs": [ + { + "OperatorType": "Join", + "Variant": "Join", + "JoinColumnIndexes": "L:0,R:0", + "JoinVars": { + "t1_id": 0 + }, + "TableName": "t1_t1", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_dst1", + "Sharded": false + }, + "FieldQuery": "select t1.id from t1 where 1 != 1", + "Query": "select t1.id from t1", + "Table": "t1" + }, + { + "OperatorType": "Route", + "Variant": "EqualUnique", + "Keyspace": { + "Name": "sharded_dst1", + "Sharded": true + }, + "FieldQuery": "select t2.id from t1 as t2 where 1 != 1", + "Query": "select t2.id from t1 as t2 where t2.id = :t1_id", + "Table": "t1", + "Values": [ + ":t1_id" + ], + "Vindex": "xxhash" + } + ] + } + ] + } + ] + }, + "TablesUsed": [ + "sharded_dst1.t1", + "unsharded_dst1.t1", + "unsharded_src1.t1", + "unsharded_src1.t2" + ] + } + }, + { + "comment": "union of selects from unsharded, qualified, tables, one mirrored to unsharded table, other to sharded table", + "query": "select t1.id from unsharded_src1.t1 union select t2.id from unsharded_src1.t2", + "plan": { + "QueryType": "SELECT", + "Original": "select t1.id from unsharded_src1.t1 union select t2.id from unsharded_src1.t2", + "Instructions": { + "OperatorType": "Mirror", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_src1", + "Sharded": false + }, + "FieldQuery": "select t1.id from t1 where 1 != 1 union select t2.id from t2 where 1 != 1", + "Query": "select t1.id from t1 union select t2.id from t2", + "Table": "t1, t2" + }, + { + "OperatorType": "MirrorTarget", + "Variant": "Percent", + "Percent": 50, + "Inputs": [ + { + "OperatorType": "Distinct", + "Collations": [ + "(0:1)" + ], + "ResultColumns": 1, + "Inputs": [ + { + "OperatorType": "Concatenate", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_dst1", + "Sharded": false + }, + "FieldQuery": "select dt.c0 as id, weight_string(dt.c0) from (select t1.id from t1 where 1 != 1) as dt(c0) where 1 != 1", + "Query": "select dt.c0 as id, weight_string(dt.c0) from (select distinct t1.id from t1) as dt(c0)", + "Table": "t1" + }, + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "sharded_dst1", + "Sharded": true + }, + "FieldQuery": "select dt.c0 as id, weight_string(dt.c0) from (select t2.id from t1 as t2 where 1 != 1) as dt(c0) where 1 != 1", + "Query": "select dt.c0 as id, weight_string(dt.c0) from (select distinct t2.id from t1 as t2) as dt(c0)", + "Table": "t1" + } + ] + } + ] + } + ] + } + ] + }, + "TablesUsed": [ + "sharded_dst1.t1", + "unsharded_dst1.t1", + "unsharded_src1.t1", + "unsharded_src1.t2" + ] + } + }, + { + "comment": "inserts are not mirrored", + "query": "insert into unsharded_src1.t1 (id) values(1)", + "plan": { + "QueryType": "INSERT", + "Original": "insert into unsharded_src1.t1 (id) values(1)", + "Instructions": { + "OperatorType": "Insert", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_src1", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "insert into t1(id) values (1)", + "TableName": "t1" + }, + "TablesUsed": [ + "unsharded_src1.t1" + ] + } + }, + { + "comment": "updates are not mirrored", + "query": "update unsharded_src1.t1 set data = 'a' where id = 1", + "plan": { + "QueryType": "UPDATE", + "Original": "update unsharded_src1.t1 set data = 'a' where id = 1", + "Instructions": { + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_src1", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "update t1 set `data` = 'a' where id = 1", + "Table": "t1" + }, + "TablesUsed": [ + "unsharded_src1.t1" + ] + } + }, + { + "comment": "deletes are not mirrored", + "query": "delete from unsharded_src1.t1 where id = 1", + "plan": { + "QueryType": "DELETE", + "Original": "delete from unsharded_src1.t1 where id = 1", + "Instructions": { + "OperatorType": "Delete", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_src1", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "delete from t1 where id = 1", + "Table": "t1" + }, + "TablesUsed": [ + "unsharded_src1.t1" + ] + } + }, + { + "comment": "self-mirror is not allowed", + "query": "select t1.id from unsharded_src1.t3", + "plan": { + "QueryType": "SELECT", + "Original": "select t1.id from unsharded_src1.t3", + "Instructions": { + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_src1", + "Sharded": false + }, + "FieldQuery": "select t1.id from t3 where 1 != 1", + "Query": "select t1.id from t3", + "Table": "t3" + }, + "TablesUsed": [ + "unsharded_src1.t3" + ] + } + }, + { + "comment": "chained mirror is not allowed", + "query": "select t2.id from unsharded_src2.t2", + "plan": { + "QueryType": "SELECT", + "Original": "select t2.id from unsharded_src2.t2", + "Instructions": { + "OperatorType": "Mirror", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_src2", + "Sharded": false + }, + "FieldQuery": "select t2.id from t2 where 1 != 1", + "Query": "select t2.id from t2", + "Table": "t2" + }, + { + "OperatorType": "MirrorTarget", + "Variant": "Percent", + "Percent": 50, + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_dst2", + "Sharded": false + }, + "FieldQuery": "select t2.id from t2 where 1 != 1", + "Query": "select t2.id from t2", + "Table": "t2" + } + ] + } + ] + }, + "TablesUsed": [ + "unsharded_dst2.t2", + "unsharded_src2.t2" + ] + } + }, + { + "comment": "circular mirror is not allowed", + "query": "select t1.id from unsharded_src3.t1", + "plan": { + "QueryType": "SELECT", + "Original": "select t1.id from unsharded_src3.t1", + "Instructions": { + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "unsharded_src3", + "Sharded": false + }, + "FieldQuery": "select t1.id from t1 where 1 != 1", + "Query": "select t1.id from t1", + "Table": "t1" + }, + "TablesUsed": [ + "unsharded_src3.t1" + ] + } + } +] diff --git a/go/vt/vtgate/planbuilder/testdata/vschemas/mirror_schema.json b/go/vt/vtgate/planbuilder/testdata/vschemas/mirror_schema.json new file mode 100644 index 00000000000..faa54a15f08 --- /dev/null +++ b/go/vt/vtgate/planbuilder/testdata/vschemas/mirror_schema.json @@ -0,0 +1,99 @@ +{ + "mirror_rules": { + "rules": [ + { + "from_table": "unsharded_src1.t1", + "to_table": "unsharded_dst1.t1", + "percent": 50 + }, + { + "from_table": "unsharded_src1.t2", + "to_table": "sharded_dst1.t1", + "percent": 50 + }, + { + "from_table": "unsharded_src2.t1", + "to_table": "unsharded_src2.t1", + "percent": 50 + }, + { + "from_table": "unsharded_src2.t2", + "to_table": "unsharded_dst2.t2", + "percent": 50 + }, + { + "from_table": "unsharded_dst2.t2", + "to_table": "unsharded_dst3.t2", + "percent": 50 + }, + { + "from_table": "unsharded_src3.t1", + "to_table": "unsharded_dst4.t1", + "percent": 50 + }, + { + "from_table": "unsharded_dst4.t2", + "to_table": "unsharded_src3.t2", + "percent": 50 + }, + { + "from_table": "sharded_src1.t1", + "to_table": "sharded_dst1.t1", + "percent": 50 + } + + ] + }, + "keyspaces": { + "main": { + "sharded": false, + "tables": {} + }, + "unsharded_src1": { + "sharded": false, + "tables": {} + }, + "unsharded_src2": { + "sharded": false, + "tables": {} + }, + "unsharded_src3": { + "sharded": false, + "tables": {} + }, + "unsharded_dst1": { + "sharded": false, + "tables": {} + }, + "unsharded_dst2": { + "sharded": false, + "tables": {} + }, + "unsharded_dst3": { + "sharded": false, + "tables": {} + }, + "unsharded_dst4": { + "sharded": false, + "tables": {} + }, + "sharded_dst1": { + "sharded": true, + "vindexes": { + "xxhash": { + "type": "xxhash" + } + }, + "tables": { + "t1": { + "columnVindexes": [ + { + "column": "id", + "name": "xxhash" + } + ] + } + } + } + } +} diff --git a/go/vt/vtgate/planbuilder/vexplain.go b/go/vt/vtgate/planbuilder/vexplain.go index 21a35f02967..cc5cf6a10db 100644 --- a/go/vt/vtgate/planbuilder/vexplain.go +++ b/go/vt/vtgate/planbuilder/vexplain.go @@ -27,7 +27,6 @@ import ( "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/engine" - "vitess.io/vitess/go/vt/vtgate/planbuilder/operators" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" "vitess.io/vitess/go/vt/vtgate/vindexes" ) @@ -79,7 +78,7 @@ func explainTabPlan(explain *sqlparser.ExplainTab, vschema plancontext.VSchema) TargetDestination: destination, Query: sqlparser.String(explain), SingleShardOnly: true, - }, singleTable(keyspace.Name, explain.Table.Name.String())), nil + }, sqlparser.NewTableNameWithQualifier(explain.Table.Name.String(), keyspace.Name)), nil } func buildVExplainVtgatePlan(ctx context.Context, explainStatement sqlparser.Statement, reservedVars *sqlparser.ReservedVars, vschema plancontext.VSchema, enableOnlineDDL, enableDirectDDL bool) (*planResult, error) { @@ -148,7 +147,7 @@ func explainPlan(explain *sqlparser.ExplainStmt, reservedVars *sqlparser.Reserve // Remove keyspace qualifier from columns and tables. sqlparser.RemoveKeyspace(explain.Statement) - var tables []string + var tables []sqlparser.TableName for _, table := range ctx.SemTable.Tables { name, err := table.Name() if err != nil { @@ -156,7 +155,7 @@ func explainPlan(explain *sqlparser.ExplainStmt, reservedVars *sqlparser.Reserve // it's OK to ignore errors here continue } - tables = append(tables, operators.QualifiedString(ks, name.Name.String())) + tables = append(tables, sqlparser.NewTableNameWithQualifier(name.Name.String(), ks.Name)) } return newPlanResult(&engine.Send{ diff --git a/go/vt/vtgate/sandbox_test.go b/go/vt/vtgate/sandbox_test.go index dc3c1f103af..e8ae096e062 100644 --- a/go/vt/vtgate/sandbox_test.go +++ b/go/vt/vtgate/sandbox_test.go @@ -48,6 +48,7 @@ const ( func init() { ksToSandbox = make(map[string]*sandbox) + sandboxMirrorRules = `{"rules":[]}` createSandbox(KsTestSharded) createSandbox(KsTestUnsharded) createSandbox(KsTestBadVSchema) @@ -57,6 +58,7 @@ func init() { var sandboxMu sync.Mutex var ksToSandbox map[string]*sandbox +var sandboxMirrorRules string func createSandbox(keyspace string) *sandbox { sandboxMu.Lock() @@ -86,9 +88,20 @@ func getSandboxSrvVSchema() *vschemapb.SrvVSchema { } result.Keyspaces[keyspace] = &vs } + var mrs vschemapb.MirrorRules + if err := json2.Unmarshal([]byte(sandboxMirrorRules), &mrs); err != nil { + panic(err) + } + result.MirrorRules = &mrs return result } +func setSandboxMirrorRules(mirrorRules string) { + sandboxMu.Lock() + defer sandboxMu.Unlock() + sandboxMirrorRules = mirrorRules +} + type sandbox struct { // Use sandmu to access the variables below sandmu sync.Mutex diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index 9372012f77d..53c3feeab63 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -250,7 +250,7 @@ func (vc *vcursorImpl) IsShardRoutingEnabled() bool { return enableShardRouting } -// FindTable finds the specified table. If the keyspace what specified in the input, it gets used as qualifier. +// FindTable finds the specified table. If the keyspace was specified in the input, it gets used as qualifier. // Otherwise, the keyspace from the request is used, if one was provided. func (vc *vcursorImpl) FindTable(name sqlparser.TableName) (*vindexes.Table, string, topodatapb.TabletType, key.Destination, error) { destKeyspace, destTabletType, dest, err := vc.executor.ParseDestinationTarget(name.Qualifier.String()) @@ -1079,6 +1079,23 @@ func (vc *vcursorImpl) GetAggregateUDFs() []string { return vc.vschema.GetAggregateUDFs() } +// FindMirrorRule finds the mirror rule for the requested table name and +// VSchema tablet type. +func (vc *vcursorImpl) FindMirrorRule(name sqlparser.TableName) (*vindexes.MirrorRule, string, topodatapb.TabletType, key.Destination, error) { + destKeyspace, destTabletType, dest, err := vc.executor.ParseDestinationTarget(name.Qualifier.String()) + if err != nil { + return nil, "", destTabletType, nil, err + } + if destKeyspace == "" { + destKeyspace = vc.keyspace + } + mirrorRule, err := vc.vschema.FindMirrorRule(destKeyspace, name.Name.String(), destTabletType) + if err != nil { + return nil, "", destTabletType, nil, err + } + return mirrorRule, destKeyspace, destTabletType, dest, err +} + // ParseDestinationTarget parses destination target string and sets default keyspace if possible. func parseDestinationTarget(targetString string, vschema *vindexes.VSchema) (string, topodatapb.TabletType, key.Destination, error) { destKeyspace, destTabletType, dest, err := topoprotopb.ParseDestination(targetString, defaultTabletType) @@ -1356,6 +1373,37 @@ func (vc *vcursorImpl) CloneForReplicaWarming(ctx context.Context) engine.VCurso return v } +func (vc *vcursorImpl) CloneForMirroring(ctx context.Context) engine.VCursor { + callerId := callerid.EffectiveCallerIDFromContext(ctx) + immediateCallerId := callerid.ImmediateCallerIDFromContext(ctx) + + clonedCtx := callerid.NewContext(ctx, callerId, immediateCallerId) + + v := &vcursorImpl{ + safeSession: NewAutocommitSession(vc.safeSession.Session), + keyspace: vc.keyspace, + tabletType: vc.tabletType, + destination: vc.destination, + marginComments: vc.marginComments, + executor: vc.executor, + resolver: vc.resolver, + topoServer: vc.topoServer, + logStats: &logstats.LogStats{Ctx: clonedCtx}, + collation: vc.collation, + ignoreMaxMemoryRows: vc.ignoreMaxMemoryRows, + vschema: vc.vschema, + vm: vc.vm, + semTable: vc.semTable, + warnShardedOnly: vc.warnShardedOnly, + warnings: vc.warnings, + pv: vc.pv, + } + + v.marginComments.Trailing += "/* mirror query */" + + return v +} + // UpdateForeignKeyChecksState updates the foreign key checks state of the vcursor. func (vc *vcursorImpl) UpdateForeignKeyChecksState(fkStateFromQuery *bool) { // Initialize the state to unspecified. diff --git a/go/vt/vtgate/vindexes/vschema.go b/go/vt/vtgate/vindexes/vschema.go index 924a28b309d..4b7ec271393 100644 --- a/go/vt/vtgate/vindexes/vschema.go +++ b/go/vt/vtgate/vindexes/vschema.go @@ -63,6 +63,7 @@ const ( // VSchema represents the denormalized version of SrvVSchema, // used for building routing plans. type VSchema struct { + MirrorRules map[string]*MirrorRule `json:"mirror_rules"` RoutingRules map[string]*RoutingRule `json:"routing_rules"` // globalTables contains the name of all tables in all keyspaces. If the @@ -79,13 +80,34 @@ type VSchema struct { created time.Time } +// MirrorRule represents one mirror rule. +type MirrorRule struct { + Error error + Percent float32 `json:"percent,omitempty"` + Table *Table `json:"table,omitempty"` +} + +// MarshalJSON returns a JSON representation of MirrorRule. +func (mr *MirrorRule) MarshalJSON() ([]byte, error) { + if mr.Error != nil { + return json.Marshal(mr.Error.Error()) + } + return json.Marshal(struct { + Percent float32 + Table *Table + }{ + Percent: mr.Percent, + Table: mr.Table, + }) +} + // RoutingRule represents one routing rule. type RoutingRule struct { Tables []*Table Error error } -// MarshalJSON returns a JSON representation of Column. +// MarshalJSON returns a JSON representation of RoutingRule. func (rr *RoutingRule) MarshalJSON() ([]byte, error) { if rr.Error != nil { return json.Marshal(rr.Error.Error()) @@ -324,6 +346,7 @@ func (source *Source) String() string { // BuildVSchema builds a VSchema from a SrvVSchema. func BuildVSchema(source *vschemapb.SrvVSchema, parser *sqlparser.Parser) (vschema *VSchema) { vschema = &VSchema{ + MirrorRules: make(map[string]*MirrorRule), RoutingRules: make(map[string]*RoutingRule), globalTables: make(map[string]*Table), uniqueVindexes: make(map[string]Vindex), @@ -338,6 +361,7 @@ func BuildVSchema(source *vschemapb.SrvVSchema, parser *sqlparser.Parser) (vsche buildRoutingRule(source, vschema, parser) buildShardRoutingRule(source, vschema) buildKeyspaceRoutingRule(source, vschema) + buildMirrorRule(source, vschema, parser) // Resolve auto-increments after routing rules are built since sequence tables also obey routing rules. resolveAutoIncrement(source, vschema, parser) return vschema @@ -895,7 +919,7 @@ func escapeQualifiedTable(qualifiedTableName string) (string, error) { } func extractTableParts(tableName string, allowUnqualified bool) (string, string, error) { - errMsgFormat := "invalid table name: %s, it must be of the " + errMsgFormat := "invalid table name: '%s', it must be of the " if allowUnqualified { errMsgFormat = errMsgFormat + "unqualified form or the " } @@ -1025,6 +1049,216 @@ func buildKeyspaceRoutingRule(source *vschemapb.SrvVSchema, vschema *VSchema) { vschema.KeyspaceRoutingRules = rulesMap } +func buildMirrorRule(source *vschemapb.SrvVSchema, vschema *VSchema, parser *sqlparser.Parser) { + if source.MirrorRules == nil { + return + } + + // Used to validate no mirror chains exist. + fromTableKeyspaces := make(map[string]string) + toKeyspaces := make(map[string]struct{}) + + for _, rule := range source.MirrorRules.Rules { + toTable := rule.ToTable + + // + // Forbid duplicate FromTables expressions. + // + + if _, ok := vschema.MirrorRules[rule.FromTable]; ok { + vschema.MirrorRules[rule.FromTable] = &MirrorRule{ + Error: vterrors.Errorf( + vtrpcpb.Code_ALREADY_EXISTS, + "from table: duplicate rule for entry '%s'", + rule.FromTable, + ), + } + continue + } + + // + // Parse and validate FromTable. + // + + // Separate tablet-type from rest of FromTable. + fromTableParts := strings.Split(rule.FromTable, "@") + if len(fromTableParts) == 0 { + vschema.MirrorRules[rule.FromTable] = &MirrorRule{ + Error: vterrors.Errorf( + vtrpcpb.Code_INVALID_ARGUMENT, + "from table: invalid table name: '%s'", + rule.FromTable, + ), + } + } + + // Escape and parse the FromTable, without table-type specifier. + fromTable, err := escapeQualifiedTable(fromTableParts[0]) + if err != nil { + vschema.MirrorRules[rule.FromTable] = &MirrorRule{ + Error: vterrors.Errorf( + vtrpcpb.Code_INVALID_ARGUMENT, + "from table: %s", + err.Error(), + ), + } + continue + } + fromKeyspace, fromTableName, err := parser.ParseTable(fromTable) + if err != nil { + vschema.MirrorRules[rule.FromTable] = &MirrorRule{ + Error: vterrors.Errorf( + vtrpcpb.Code_INVALID_ARGUMENT, + "from table: invalid table name: '%s'", + err.Error(), + ), + } + continue + } + + // Find the from table. + _, err = vschema.FindTable(fromKeyspace, fromTableName) + if err != nil { + vschema.MirrorRules[rule.FromTable] = &MirrorRule{ + Error: vterrors.Errorf( + vtrpcpb.Code_INVALID_ARGUMENT, + "from table: %s", + err.Error(), + ), + } + continue + } + + // Validate the table-type, if specified. + if len(fromTableParts) > 1 { + fromTabletTypeSuffix := "@" + strings.Join(fromTableParts[1:], "") + var ok bool + for _, tabletTypeSuffix := range TabletTypeSuffix { + if tabletTypeSuffix == fromTabletTypeSuffix { + ok = true + break + } + } + if !ok { + vschema.MirrorRules[rule.FromTable] = &MirrorRule{ + Error: vterrors.Errorf( + vtrpcpb.Code_INVALID_ARGUMENT, + "from table: invalid tablet type: '%s'", + rule.FromTable, + ), + } + continue + } + } + + // + // Parse and validate ToTable. + // + + // Forbid tablet-type specifier. + toTableParts := strings.Split(toTable, "@") + if len(toTableParts) != 1 || toTableParts[0] == "@" { + vschema.MirrorRules[rule.FromTable] = &MirrorRule{ + Error: vterrors.Errorf( + vtrpcpb.Code_INVALID_ARGUMENT, + "to table: tablet type may not be specified: '%s'", + rule.ToTable, + ), + } + continue + } + + // Escape and parse the table. + toTable, err = escapeQualifiedTable(toTable) + if err != nil { + vschema.MirrorRules[rule.FromTable] = &MirrorRule{ + Error: vterrors.Errorf( + vtrpcpb.Code_INVALID_ARGUMENT, + "to table: %s", + err.Error(), + ), + } + continue + } + toKeyspace, toTableName, err := parser.ParseTable(toTable) + if err != nil { + vschema.MirrorRules[rule.FromTable] = &MirrorRule{ + Error: vterrors.Errorf( + vtrpcpb.Code_INVALID_ARGUMENT, + "to table: invalid table name: '%s'", + rule.ToTable, + ), + } + continue + } + + // Forbid self-mirroring. + if fromKeyspace == toKeyspace { + vschema.MirrorRules[rule.FromTable] = &MirrorRule{ + Error: vterrors.Errorf( + vtrpcpb.Code_INVALID_ARGUMENT, + "to table: cannot reside in same keyspace as from table", + ), + } + continue + } + + // + // Find table in VSchema. + // + + t, err := vschema.FindTable(toKeyspace, toTableName) + if err != nil { + vschema.MirrorRules[rule.FromTable] = &MirrorRule{ + Error: vterrors.Errorf( + vtrpcpb.Code_INVALID_ARGUMENT, + "to table: %s", + err.Error(), + ), + } + continue + } + + // + // Return non-error mirror rule. + // + + vschema.MirrorRules[rule.FromTable] = &MirrorRule{ + Table: t, + Percent: rule.Percent, + } + + // + // Save some info for validating no mirror chains exist + // + + fromTableKeyspaces[rule.FromTable] = fromKeyspace + toKeyspaces[toKeyspace] = struct{}{} + } + + // Forbid mirror chains. Keyspaces which are the target of a mirror rule + // may not be the source of another. + for fromTable, rule := range vschema.MirrorRules { + if rule.Error != nil { + continue + } + fromKeyspace, ok := fromTableKeyspaces[fromTable] + if !ok { + rule.Error = vterrors.Errorf( + vtrpcpb.Code_INTERNAL, + "[BUG] from table: failed to determine keyspace", + ) + continue + } + if _, ok := toKeyspaces[fromKeyspace]; ok { + rule.Error = vterrors.Errorf( + vtrpcpb.Code_INVALID_ARGUMENT, + "mirror chaining is not allowed", + ) + } + } +} + // FindTable returns a pointer to the Table. If a keyspace is specified, only tables // from that keyspace are searched. If the specified keyspace is unsharded // and no tables matched, it's considered valid: FindTable will construct a table @@ -1325,6 +1559,28 @@ func (vschema *VSchema) GetAggregateUDFs() (udfs []string) { return } +// FindMirroredTable finds a mirror rule from the keyspace, table name and +// tablet type. +func (vschema *VSchema) FindMirrorRule(keyspace, tablename string, tabletType topodatapb.TabletType) (*MirrorRule, error) { + qualified := tablename + if keyspace != "" { + qualified = keyspace + "." + tablename + } + fqtn := qualified + TabletTypeSuffix[tabletType] + // First look for a fully qualified table name: keyspace.table@tablet_type. + // Then look for one without tablet type: keyspace.table. + for _, name := range []string{fqtn, qualified} { + mr, ok := vschema.MirrorRules[name] + if ok { + if mr.Error != nil { + return nil, mr.Error + } + return mr, nil + } + } + return nil, nil +} + // ByCost provides the interface needed for ColumnVindexes to // be sorted by cost order. type ByCost []*ColumnVindex diff --git a/go/vt/vtgate/vindexes/vschema_test.go b/go/vt/vtgate/vindexes/vschema_test.go index 40cba720a0c..1014c752037 100644 --- a/go/vt/vtgate/vindexes/vschema_test.go +++ b/go/vt/vtgate/vindexes/vschema_test.go @@ -838,6 +838,7 @@ func TestVSchemaRoutingRules(t *testing.T) { Keyspace: ks2, } want := &VSchema{ + MirrorRules: map[string]*MirrorRule{}, RoutingRules: map[string]*RoutingRule{ "rt1": { Error: errors.New("table rt1 has more than one target: [ks1.t1 ks2.t2]"), @@ -852,10 +853,10 @@ func TestVSchemaRoutingRules(t *testing.T) { Error: errors.New("duplicate rule for entry dup"), }, "badname": { - Error: errors.New("invalid table name: t1.t2.t3, it must be of the qualified form . (dots are not allowed in either name)"), + Error: errors.New("invalid table name: 't1.t2.t3', it must be of the qualified form . (dots are not allowed in either name)"), }, "unqualified": { - Error: errors.New("invalid table name: t1, it must be of the qualified form . (dots are not allowed in either name)"), + Error: errors.New("invalid table name: 't1', it must be of the qualified form . (dots are not allowed in either name)"), }, "badkeyspace": { Error: errors.New("VT05003: unknown database 'ks3' in vschema"), @@ -897,6 +898,282 @@ func TestVSchemaRoutingRules(t *testing.T) { assert.Equal(t, string(wantb), string(gotb), string(gotb)) } +func TestVSchemaMirrorRules(t *testing.T) { + input := vschemapb.SrvVSchema{ + MirrorRules: &vschemapb.MirrorRules{ + Rules: []*vschemapb.MirrorRule{ + // Empty FromTable not allowed. + { + FromTable: "", + ToTable: "ks2.ks2t1", + }, + // Invalid FromTable, needs to be .[@]. + { + FromTable: "ks1", + ToTable: "ks2.ks2t1", + }, + // Invalid ToTable, needs to be .
. + { + FromTable: "ks1.ks1t1", + ToTable: "ks2", + }, + // Invalid ToTable, needs to be .
. + { + FromTable: "ks1.ks1t2", + ToTable: "ks2.ks2t2.c", + }, + // OK, unsharded => unsharded. + { + FromTable: "ks1.ks1t3", + ToTable: "ks2.ks2t3", + Percent: 50, + }, + // Invalid ToTable, needs to be .
. + { + FromTable: "ks1.ks1t4", + ToTable: "ks2.ks2t4@replica", + }, + // OK, unsharded@tablet-type => unsharded. + { + FromTable: "ks1.ks1t5@replica", + ToTable: "ks2.ks2t5", + }, + // Invalid FromTable tablet type.. + { + FromTable: "ks1.ks1t6@stone", + ToTable: "ks2.ks2t6", + }, + // OK, sharded => sharded. + { + FromTable: "ks3.ks3t1", + ToTable: "ks4.ks4t1", + Percent: 50, + }, + // OK, unsharded => sharded. + { + FromTable: "ks1.ks1t7", + ToTable: "ks4.ks4t1", + Percent: 50, + }, + // Destination sharded table must be defined in VSchema. + { + FromTable: "ks1.ks1t8", + ToTable: "ks4.ks4t2", + Percent: 50, + }, + // Source sharded table must be defined in VSchema. + { + FromTable: "ks3.ks3t2", + ToTable: "ks4.ks4t1", + Percent: 50, + }, + // Keyspaces that are the target of a rule may not be the + // source of another. + { + FromTable: "ks2.ks2t9", + ToTable: "ks4.ks4t1", + Percent: 50, + }, + }, + }, + RoutingRules: &vschemapb.RoutingRules{}, + Keyspaces: map[string]*vschemapb.Keyspace{ + "ks1": { + ForeignKeyMode: vschemapb.Keyspace_unmanaged, + Tables: map[string]*vschemapb.Table{}, + }, + "ks2": { + ForeignKeyMode: vschemapb.Keyspace_unmanaged, + Tables: map[string]*vschemapb.Table{}, + }, + "ks3": { + Sharded: true, + ForeignKeyMode: vschemapb.Keyspace_unmanaged, + Vindexes: map[string]*vschemapb.Vindex{ + "stfu1": { + Type: "stfu", + }, + }, + Tables: map[string]*vschemapb.Table{ + "ks3t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{ + { + Column: "id", + Name: "stfu1", + }, + }, + }, + }, + }, + "ks4": { + Sharded: true, + ForeignKeyMode: vschemapb.Keyspace_unmanaged, + Vindexes: map[string]*vschemapb.Vindex{ + "stfu1": { + Type: "stfu", + }, + }, + Tables: map[string]*vschemapb.Table{ + "ks4t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{ + { + Column: "id", + Name: "stfu1", + }, + }, + }, + }, + }, + }, + } + got := BuildVSchema(&input, sqlparser.NewTestParser()) + + ks1 := &Keyspace{ + Name: "ks1", + Sharded: false, + } + ks2 := &Keyspace{ + Name: "ks2", + Sharded: false, + } + ks3 := &Keyspace{ + Name: "ks3", + Sharded: true, + } + ks4 := &Keyspace{ + Name: "ks4", + Sharded: true, + } + + vindex1 := &stFU{ + name: "stfu1", + } + + ks3t1 := &Table{ + Name: sqlparser.NewIdentifierCS("ks3t1"), + Keyspace: ks3, + ColumnVindexes: []*ColumnVindex{{ + Columns: []sqlparser.IdentifierCI{sqlparser.NewIdentifierCI("id")}, + Type: "stfu", + Name: "stfu1", + Vindex: vindex1, + isUnique: vindex1.IsUnique(), + cost: vindex1.Cost(), + }}, + } + ks3t1.Ordered = []*ColumnVindex{ + ks3t1.ColumnVindexes[0], + } + + ks4t1 := &Table{ + Name: sqlparser.NewIdentifierCS("ks4t1"), + Keyspace: ks4, + ColumnVindexes: []*ColumnVindex{{ + Columns: []sqlparser.IdentifierCI{sqlparser.NewIdentifierCI("id")}, + Type: "stfu", + Name: "stfu1", + Vindex: vindex1, + isUnique: vindex1.IsUnique(), + cost: vindex1.Cost(), + }}, + } + ks4t1.Ordered = []*ColumnVindex{ + ks4t1.ColumnVindexes[0], + } + + want := &VSchema{ + MirrorRules: map[string]*MirrorRule{ + "": { + Error: errors.New("from table: invalid table name: '', it must be of the qualified form . (dots are not allowed in either name)"), + }, + "ks1": { + Error: errors.New("from table: invalid table name: 'ks1', it must be of the qualified form . (dots are not allowed in either name)"), + }, + "ks1.ks1t1": { + Error: errors.New("to table: invalid table name: 'ks2', it must be of the qualified form . (dots are not allowed in either name)"), + }, + "ks1.ks1t2": { + Error: errors.New("to table: invalid table name: 'ks2.ks2t2.c', it must be of the qualified form . (dots are not allowed in either name)"), + }, + "ks1.ks1t3": { + Table: &Table{ + Name: sqlparser.NewIdentifierCS("ks2t3"), + }, + Percent: 50, + }, + "ks1.ks1t4": { + Error: errors.New("to table: tablet type may not be specified: 'ks2.ks2t4@replica'"), + }, + "ks1.ks1t5@replica": { + Table: &Table{ + Name: sqlparser.NewIdentifierCS("ks2t5"), + }, + }, + "ks1.ks1t6@stone": { + Error: errors.New("from table: invalid tablet type: 'ks1.ks1t6@stone'"), + }, + "ks3.ks3t1": { + Table: ks4t1, + Percent: 50, + }, + "ks1.ks1t7": { + Table: ks4t1, + Percent: 50, + }, + "ks1.ks1t8": { + Error: errors.New("to table: table ks4t2 not found"), + }, + "ks3.ks3t2": { + Error: errors.New("from table: table ks3t2 not found"), + }, + "ks2.ks2t9": { + Error: errors.New("mirror chaining is not allowed"), + }, + }, + RoutingRules: map[string]*RoutingRule{}, + Keyspaces: map[string]*KeyspaceSchema{ + "ks1": { + Keyspace: ks1, + ForeignKeyMode: vschemapb.Keyspace_unmanaged, + Tables: map[string]*Table{}, + Vindexes: map[string]Vindex{}, + }, + "ks2": { + ForeignKeyMode: vschemapb.Keyspace_unmanaged, + Keyspace: ks2, + Tables: map[string]*Table{}, + Vindexes: map[string]Vindex{}, + }, + "ks3": { + ForeignKeyMode: vschemapb.Keyspace_unmanaged, + Keyspace: ks3, + Tables: map[string]*Table{ + "ks3t1": ks3t1, + }, + Vindexes: map[string]Vindex{ + "stfu1": vindex1, + }, + }, + "ks4": { + ForeignKeyMode: vschemapb.Keyspace_unmanaged, + Keyspace: ks4, + Tables: map[string]*Table{ + "ks4t1": ks4t1, + }, + Vindexes: map[string]Vindex{ + "stfu1": vindex1, + }, + }, + }, + } + + gotb, err := json.MarshalIndent(got, "", " ") + assert.NoError(t, err) + wantb, err := json.MarshalIndent(want, "", " ") + assert.NoError(t, err) + assert.Equal(t, string(wantb), string(gotb), string(gotb)) +} + func TestChooseVindexForType(t *testing.T) { testcases := []struct { in querypb.Type @@ -1247,6 +1524,7 @@ func TestShardedVSchemaMultiColumnVindex(t *testing.T) { t1.ColumnVindexes[0], } want := &VSchema{ + MirrorRules: map[string]*MirrorRule{}, RoutingRules: map[string]*RoutingRule{}, globalTables: map[string]*Table{ "t1": t1, @@ -1323,6 +1601,7 @@ func TestShardedVSchemaNotOwned(t *testing.T) { t1.ColumnVindexes[1], t1.ColumnVindexes[0]} want := &VSchema{ + MirrorRules: map[string]*MirrorRule{}, RoutingRules: map[string]*RoutingRule{}, globalTables: map[string]*Table{ "t1": t1, @@ -1430,6 +1709,7 @@ func TestBuildVSchemaDupSeq(t *testing.T) { Keyspace: ksb, Type: "sequence"} want := &VSchema{ + MirrorRules: map[string]*MirrorRule{}, RoutingRules: map[string]*RoutingRule{}, globalTables: map[string]*Table{ "t1": nil, @@ -1491,6 +1771,7 @@ func TestBuildVSchemaDupTable(t *testing.T) { Keyspace: ksb, } want := &VSchema{ + MirrorRules: map[string]*MirrorRule{}, RoutingRules: map[string]*RoutingRule{}, globalTables: map[string]*Table{ "t1": nil, @@ -1620,6 +1901,7 @@ func TestBuildVSchemaDupVindex(t *testing.T) { t2.ColumnVindexes[0], } want := &VSchema{ + MirrorRules: map[string]*MirrorRule{}, RoutingRules: map[string]*RoutingRule{}, globalTables: map[string]*Table{ "t1": nil, @@ -2206,6 +2488,7 @@ func TestSequence(t *testing.T) { t2.ColumnVindexes[0], } want := &VSchema{ + MirrorRules: map[string]*MirrorRule{}, RoutingRules: map[string]*RoutingRule{}, globalTables: map[string]*Table{ "seq": seq, diff --git a/go/vt/vtgate/vschema_manager_test.go b/go/vt/vtgate/vschema_manager_test.go index 32f83f0021a..4dfbf46db0a 100644 --- a/go/vt/vtgate/vschema_manager_test.go +++ b/go/vt/vtgate/vschema_manager_test.go @@ -234,6 +234,7 @@ func TestVSchemaUpdate(t *testing.T) { }, }, expected: &vindexes.VSchema{ + MirrorRules: map[string]*vindexes.MirrorRule{}, RoutingRules: map[string]*vindexes.RoutingRule{}, Keyspaces: map[string]*vindexes.KeyspaceSchema{ "ks": { @@ -456,6 +457,7 @@ func TestVSchemaUDFsUpdate(t *testing.T) { }, nil) utils.MustMatchFn(".globalTables", ".uniqueVindexes")(t, &vindexes.VSchema{ + MirrorRules: map[string]*vindexes.MirrorRule{}, RoutingRules: map[string]*vindexes.RoutingRule{}, Keyspaces: map[string]*vindexes.KeyspaceSchema{ "ks": { @@ -778,6 +780,7 @@ func TestVSchemaUpdateWithFKReferenceToInternalTables(t *testing.T) { }, nil) utils.MustMatchFn(".globalTables", ".uniqueVindexes")(t, &vindexes.VSchema{ + MirrorRules: map[string]*vindexes.MirrorRule{}, RoutingRules: map[string]*vindexes.RoutingRule{}, Keyspaces: map[string]*vindexes.KeyspaceSchema{ "ks": { @@ -827,6 +830,7 @@ func makeTestVSchema(ks string, sharded bool, tbls map[string]*vindexes.Table) * func makeTestEmptyVSchema() *vindexes.VSchema { return &vindexes.VSchema{ + MirrorRules: map[string]*vindexes.MirrorRule{}, RoutingRules: map[string]*vindexes.RoutingRule{}, Keyspaces: map[string]*vindexes.KeyspaceSchema{}, } diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine_test.go b/go/vt/vttablet/tabletserver/vstreamer/engine_test.go index b0b31e256cc..56a634ca70e 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/engine_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/engine_test.go @@ -106,6 +106,7 @@ func TestUpdateVSchema(t *testing.T) { expectUpdateCount(t, startCount+1) want := `{ + "mirror_rules": {}, "routing_rules": {}, "keyspaces": { "vttest": {