Skip to content

Commit

Permalink
Add vtctlclient/wrangler unit test
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Oct 29, 2023
1 parent b03bca5 commit 6528a3a
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 6 deletions.
11 changes: 6 additions & 5 deletions go/vt/wrangler/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1029,11 +1029,12 @@ func (wr *Wrangler) prepareMaterializerStreams(ctx context.Context, ms *vtctldat
insertMap := make(map[string]string, len(mz.targetShards))
for _, targetShard := range mz.targetShards {
sourceShards := mz.filterSourceShards(targetShard)
// keyRangesEqual allows us to optimize the stream for the cases where
// while the target keyspace may be sharded, the shard mapping is 1:1
// between the source and target and the key ranges are equal. This
// can be done, for example, when doing shard by shard migrations --
// migrating a single shard at a time.
// streamKeyRangesEqual allows us to optimize the stream for the cases
// where while the target keyspace may be sharded, the target shard has
// a single source shard to stream data from and the target and source
// shard have equal key ranges. This can be done, for example, when doing
// shard by shard migrations -- migrating a single shard at a time between
// sharded source and sharded target keyspaces.
streamKeyRangesEqual := false
if len(sourceShards) == 1 && key.KeyRangeEqual(sourceShards[0].KeyRange, targetShard.KeyRange) {
streamKeyRangesEqual = true
Expand Down
3 changes: 2 additions & 1 deletion go/vt/wrangler/materializer_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,13 @@ func (env *testMaterializerEnv) addTablet(id int, keyspace, shard string, tablet
},
}
env.tablets[id] = tablet
if err := env.wr.TopoServer().InitTablet(context.Background(), tablet, false /* allowPrimaryOverride */, true /* createShardAndKeyspace */, false /* allowUpdate */); err != nil {
if err := env.wr.ts.InitTablet(context.Background(), tablet, false /* allowPrimaryOverride */, true /* createShardAndKeyspace */, false /* allowUpdate */); err != nil {
panic(err)
}
if tabletType == topodatapb.TabletType_PRIMARY {
_, err := env.wr.ts.UpdateShardFields(context.Background(), keyspace, shard, func(si *topo.ShardInfo) error {
si.PrimaryAlias = tablet.Alias
si.IsPrimaryServing = true
return nil
})
if err != nil {
Expand Down
164 changes: 164 additions & 0 deletions go/vt/wrangler/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@ package wrangler
import (
"context"
"fmt"
"regexp"
"slices"
"sort"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -3510,3 +3513,164 @@ func TestAddTablesToVSchema(t *testing.T) {
})
}
}

// TestKeyRangesEqualOptimization tests that we optimize the source
// filtering when there's only one source shard for the stream and
// its keyrange is equal to the target shard for the stream. This
// means that even if the target keyspace is sharded, the source
// does not need to perform the in_keyrange filtering.
func TestKeyRangesEqualOptimization(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
workflow := "testwf"
sourceKs := "sourceks"
targetKs := "targetks"
table := "t1"
mzi := vtctldatapb.MaterializationIntent_MOVETABLES
tableMaterializeSettings := []*vtctldatapb.TableMaterializeSettings{
{
TargetTable: table,
SourceExpression: fmt.Sprintf("select * from %s", table),
},
}
targetVSchema := &vschemapb.Keyspace{
Sharded: true,
Vindexes: map[string]*vschemapb.Vindex{
"xxhash": {
Type: "xxhash",
},
},
Tables: map[string]*vschemapb.Table{
table: {
ColumnVindexes: []*vschemapb.ColumnVindex{
{
Column: "id",
Name: "xxhash",
},
},
},
},
}

testCases := []struct {
name string
ms *vtctldatapb.MaterializeSettings
sourceShards []string
targetShards []string
wantBls map[string]*binlogdatapb.BinlogSource
}{
{
name: "no in_keyrange filter -- partial, keyranges equal",
ms: &vtctldatapb.MaterializeSettings{
MaterializationIntent: mzi,
Workflow: workflow,
TargetKeyspace: targetKs,
SourceKeyspace: sourceKs,
Cell: "cell",
SourceShards: []string{"-80"}, // Partial MoveTables just for this shard
TableSettings: tableMaterializeSettings,
},
sourceShards: []string{"-80", "80-"},
targetShards: []string{"-80", "80-"},
wantBls: map[string]*binlogdatapb.BinlogSource{
"-80": {
Keyspace: sourceKs,
Shard: "-80", // Keyranges are equal between the source and target
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{
{
Match: table,
Filter: fmt.Sprintf("select * from %s", table),
},
},
},
},
},
},
{
name: "in_keyrange filter -- unequal shards",
ms: &vtctldatapb.MaterializeSettings{
MaterializationIntent: mzi,
Workflow: workflow,
TargetKeyspace: targetKs,
SourceKeyspace: sourceKs,
Cell: "cell",
TableSettings: tableMaterializeSettings,
},
sourceShards: []string{"-"},
targetShards: []string{"-80", "80-"},
wantBls: map[string]*binlogdatapb.BinlogSource{
"-80": {
Keyspace: sourceKs,
Shard: "-",
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{
{
Match: table,
Filter: fmt.Sprintf("select * from %s where in_keyrange(id, '%s.xxhash', '-80')", table, targetKs),
},
},
},
},
"80-": {
Keyspace: sourceKs,
Shard: "-",
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{
{
Match: table,
Filter: fmt.Sprintf("select * from %s where in_keyrange(id, '%s.xxhash', '80-')", table, targetKs),
},
},
},
},
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
env := newTestMaterializerEnv(t, ctx, tc.ms, tc.sourceShards, tc.targetShards)
defer env.close()

// Target is always sharded.
err := env.wr.ts.SaveVSchema(ctx, targetKs, targetVSchema)
require.NoError(t, err, "SaveVSchema failed: %v", err)

for _, tablet := range env.tablets {
// Queries will only be executed on primary tablets in the target keyspace.
if tablet.Keyspace != targetKs || tablet.Type != topodatapb.TabletType_PRIMARY {
continue
}
env.tmc.expectVRQuery(int(tablet.Alias.Uid), mzSelectFrozenQuery, &sqltypes.Result{})
// If we are doing a partial MoveTables, we will only perform the workflow
// stream creation / INSERT statment on the shard(s) we're migrating.
if len(tc.ms.SourceShards) > 0 && !slices.Contains(tc.ms.SourceShards, tablet.Shard) {
continue
}
bls := tc.wantBls[tablet.Shard]
require.NotNil(t, bls, "no binlog source defined for tablet %+v", tablet)
if bls.Filter != nil {
for i, rule := range bls.Filter.Rules {
// It's escaped in the SQL statement.
bls.Filter.Rules[i].Filter = strings.ReplaceAll(rule.Filter, `'`, `\'`)
}
}
blsBytes, err := prototext.Marshal(bls)
require.NoError(t, err, "failed to marshal binlog source: %v", err)
// This is also escaped in the SQL statement.
blsStr := strings.ReplaceAll(string(blsBytes), `"`, `\"`)
// Escape the string for the regexp comparison.
blsStr = regexp.QuoteMeta(blsStr)
// For some reason we end up with an extra slash added by QuoteMeta for the
// escaped single quotes in the filter.
blsStr = strings.ReplaceAll(blsStr, `\\\\`, `\\\`)
expectedQuery := fmt.Sprintf(`/insert into _vt.vreplication.* values \('%s', '%s'`, workflow, blsStr)
env.tmc.expectVRQuery(int(tablet.Alias.Uid), expectedQuery, &sqltypes.Result{})
}

_, err = env.wr.prepareMaterializerStreams(ctx, tc.ms)
require.NoError(t, err, "prepareMaterializerStreams failed: %v", err)
})
}
}

0 comments on commit 6528a3a

Please sign in to comment.