diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index 850d9d6f6db..990492bd191 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -1029,11 +1029,12 @@ func (wr *Wrangler) prepareMaterializerStreams(ctx context.Context, ms *vtctldat insertMap := make(map[string]string, len(mz.targetShards)) for _, targetShard := range mz.targetShards { sourceShards := mz.filterSourceShards(targetShard) - // keyRangesEqual allows us to optimize the stream for the cases where - // while the target keyspace may be sharded, the shard mapping is 1:1 - // between the source and target and the key ranges are equal. This - // can be done, for example, when doing shard by shard migrations -- - // migrating a single shard at a time. + // streamKeyRangesEqual allows us to optimize the stream for the cases + // where while the target keyspace may be sharded, the target shard has + // a single source shard to stream data from and the target and source + // shard have equal key ranges. This can be done, for example, when doing + // shard by shard migrations -- migrating a single shard at a time between + // sharded source and sharded target keyspaces. streamKeyRangesEqual := false if len(sourceShards) == 1 && key.KeyRangeEqual(sourceShards[0].KeyRange, targetShard.KeyRange) { streamKeyRangesEqual = true diff --git a/go/vt/wrangler/materializer_env_test.go b/go/vt/wrangler/materializer_env_test.go index 6c236a038bf..b98621ffa1b 100644 --- a/go/vt/wrangler/materializer_env_test.go +++ b/go/vt/wrangler/materializer_env_test.go @@ -200,12 +200,13 @@ func (env *testMaterializerEnv) addTablet(id int, keyspace, shard string, tablet }, } env.tablets[id] = tablet - if err := env.wr.TopoServer().InitTablet(context.Background(), tablet, false /* allowPrimaryOverride */, true /* createShardAndKeyspace */, false /* allowUpdate */); err != nil { + if err := env.wr.ts.InitTablet(context.Background(), tablet, false /* allowPrimaryOverride */, true /* createShardAndKeyspace */, false /* allowUpdate */); err != nil { panic(err) } if tabletType == topodatapb.TabletType_PRIMARY { _, err := env.wr.ts.UpdateShardFields(context.Background(), keyspace, shard, func(si *topo.ShardInfo) error { si.PrimaryAlias = tablet.Alias + si.IsPrimaryServing = true return nil }) if err != nil { diff --git a/go/vt/wrangler/materializer_test.go b/go/vt/wrangler/materializer_test.go index 3984641fcf8..8e501e0c0f8 100644 --- a/go/vt/wrangler/materializer_test.go +++ b/go/vt/wrangler/materializer_test.go @@ -19,12 +19,15 @@ package wrangler import ( "context" "fmt" + "regexp" + "slices" "sort" "strings" "testing" "time" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/proto" "vitess.io/vitess/go/sqltypes" @@ -3510,3 +3513,164 @@ func TestAddTablesToVSchema(t *testing.T) { }) } } + +// TestKeyRangesEqualOptimization tests that we optimize the source +// filtering when there's only one source shard for the stream and +// its keyrange is equal to the target shard for the stream. This +// means that even if the target keyspace is sharded, the source +// does not need to perform the in_keyrange filtering. +func TestKeyRangesEqualOptimization(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + workflow := "testwf" + sourceKs := "sourceks" + targetKs := "targetks" + table := "t1" + mzi := vtctldatapb.MaterializationIntent_MOVETABLES + tableMaterializeSettings := []*vtctldatapb.TableMaterializeSettings{ + { + TargetTable: table, + SourceExpression: fmt.Sprintf("select * from %s", table), + }, + } + targetVSchema := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "xxhash": { + Type: "xxhash", + }, + }, + Tables: map[string]*vschemapb.Table{ + table: { + ColumnVindexes: []*vschemapb.ColumnVindex{ + { + Column: "id", + Name: "xxhash", + }, + }, + }, + }, + } + + testCases := []struct { + name string + ms *vtctldatapb.MaterializeSettings + sourceShards []string + targetShards []string + wantBls map[string]*binlogdatapb.BinlogSource + }{ + { + name: "no in_keyrange filter -- partial, keyranges equal", + ms: &vtctldatapb.MaterializeSettings{ + MaterializationIntent: mzi, + Workflow: workflow, + TargetKeyspace: targetKs, + SourceKeyspace: sourceKs, + Cell: "cell", + SourceShards: []string{"-80"}, // Partial MoveTables just for this shard + TableSettings: tableMaterializeSettings, + }, + sourceShards: []string{"-80", "80-"}, + targetShards: []string{"-80", "80-"}, + wantBls: map[string]*binlogdatapb.BinlogSource{ + "-80": { + Keyspace: sourceKs, + Shard: "-80", // Keyranges are equal between the source and target + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: table, + Filter: fmt.Sprintf("select * from %s", table), + }, + }, + }, + }, + }, + }, + { + name: "in_keyrange filter -- unequal shards", + ms: &vtctldatapb.MaterializeSettings{ + MaterializationIntent: mzi, + Workflow: workflow, + TargetKeyspace: targetKs, + SourceKeyspace: sourceKs, + Cell: "cell", + TableSettings: tableMaterializeSettings, + }, + sourceShards: []string{"-"}, + targetShards: []string{"-80", "80-"}, + wantBls: map[string]*binlogdatapb.BinlogSource{ + "-80": { + Keyspace: sourceKs, + Shard: "-", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: table, + Filter: fmt.Sprintf("select * from %s where in_keyrange(id, '%s.xxhash', '-80')", table, targetKs), + }, + }, + }, + }, + "80-": { + Keyspace: sourceKs, + Shard: "-", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: table, + Filter: fmt.Sprintf("select * from %s where in_keyrange(id, '%s.xxhash', '80-')", table, targetKs), + }, + }, + }, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + env := newTestMaterializerEnv(t, ctx, tc.ms, tc.sourceShards, tc.targetShards) + defer env.close() + + // Target is always sharded. + err := env.wr.ts.SaveVSchema(ctx, targetKs, targetVSchema) + require.NoError(t, err, "SaveVSchema failed: %v", err) + + for _, tablet := range env.tablets { + // Queries will only be executed on primary tablets in the target keyspace. + if tablet.Keyspace != targetKs || tablet.Type != topodatapb.TabletType_PRIMARY { + continue + } + env.tmc.expectVRQuery(int(tablet.Alias.Uid), mzSelectFrozenQuery, &sqltypes.Result{}) + // If we are doing a partial MoveTables, we will only perform the workflow + // stream creation / INSERT statment on the shard(s) we're migrating. + if len(tc.ms.SourceShards) > 0 && !slices.Contains(tc.ms.SourceShards, tablet.Shard) { + continue + } + bls := tc.wantBls[tablet.Shard] + require.NotNil(t, bls, "no binlog source defined for tablet %+v", tablet) + if bls.Filter != nil { + for i, rule := range bls.Filter.Rules { + // It's escaped in the SQL statement. + bls.Filter.Rules[i].Filter = strings.ReplaceAll(rule.Filter, `'`, `\'`) + } + } + blsBytes, err := prototext.Marshal(bls) + require.NoError(t, err, "failed to marshal binlog source: %v", err) + // This is also escaped in the SQL statement. + blsStr := strings.ReplaceAll(string(blsBytes), `"`, `\"`) + // Escape the string for the regexp comparison. + blsStr = regexp.QuoteMeta(blsStr) + // For some reason we end up with an extra slash added by QuoteMeta for the + // escaped single quotes in the filter. + blsStr = strings.ReplaceAll(blsStr, `\\\\`, `\\\`) + expectedQuery := fmt.Sprintf(`/insert into _vt.vreplication.* values \('%s', '%s'`, workflow, blsStr) + env.tmc.expectVRQuery(int(tablet.Alias.Uid), expectedQuery, &sqltypes.Result{}) + } + + _, err = env.wr.prepareMaterializerStreams(ctx, tc.ms) + require.NoError(t, err, "prepareMaterializerStreams failed: %v", err) + }) + } +}