Skip to content

Commit

Permalink
Add unit test
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Sep 29, 2024
1 parent 1cb3de5 commit f5eb4e2
Show file tree
Hide file tree
Showing 3 changed files with 307 additions and 6 deletions.
26 changes: 22 additions & 4 deletions go/vt/vtctl/workflow/materializer_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,18 @@ func newTestMaterializerEnv(t *testing.T, ctx context.Context, ms *vtctldatapb.M
logger := logutil.NewConsoleLogger()
require.NoError(t, topoServ.RebuildSrvVSchema(ctx, []string{"cell"}))

tabletID := 100
tabletID := startingSourceTabletUID
sourceShardsMap := make(map[string]any)
for _, shard := range sourceShards {
sourceShardsMap[shard] = nil
require.NoError(t, topoServ.CreateShard(ctx, ms.SourceKeyspace, shard))
_ = env.addTablet(t, tabletID, env.ms.SourceKeyspace, shard, topodatapb.TabletType_PRIMARY)
tabletID += 10
tabletID += tabletUIDStep
}

require.NoError(t, topotools.RebuildKeyspace(ctx, logger, topoServ, ms.SourceKeyspace, []string{"cell"}, false))

tabletID = 200
tabletID = startingTargetTabletUID
for _, shard := range targetShards {
if ms.SourceKeyspace == ms.TargetKeyspace {
if _, ok := sourceShardsMap[shard]; ok {
Expand All @@ -112,7 +112,7 @@ func newTestMaterializerEnv(t *testing.T, ctx context.Context, ms *vtctldatapb.M
}
require.NoError(t, topoServ.CreateShard(ctx, ms.TargetKeyspace, shard))
_ = env.addTablet(t, tabletID, env.ms.TargetKeyspace, shard, topodatapb.TabletType_PRIMARY)
tabletID += 10
tabletID += tabletUIDStep
}

for _, ts := range ms.TableSettings {
Expand Down Expand Up @@ -230,6 +230,9 @@ type testMaterializerTMClient struct {

// Used to override the response to ReadVReplicationWorkflow.
readVReplicationWorkflow readVReplicationWorkflowFunc

// Responses to GetSchema RPCs for individual tablets.
getSchemaResponses map[uint32]*tabletmanagerdatapb.SchemaDefinition
}

func newTestMaterializerTMClient(keyspace string, sourceShards []string, tableSettings []*vtctldatapb.TableMaterializeSettings) *testMaterializerTMClient {
Expand All @@ -240,6 +243,7 @@ func newTestMaterializerTMClient(keyspace string, sourceShards []string, tableSe
tableSettings: tableSettings,
vrQueries: make(map[int][]*queryResult),
createVReplicationWorkflowRequests: make(map[uint32]*createVReplicationWorkflowRequestResponse),
getSchemaResponses: make(map[uint32]*tabletmanagerdatapb.SchemaDefinition),
}
}

Expand Down Expand Up @@ -314,10 +318,24 @@ func (tmc *testMaterializerTMClient) DeleteVReplicationWorkflow(ctx context.Cont
}, nil
}

func (tmc *testMaterializerTMClient) SetGetSchemaResponse(tabletUID int, res *tabletmanagerdatapb.SchemaDefinition) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

if tmc.getSchemaResponses == nil {
tmc.getSchemaResponses = make(map[uint32]*tabletmanagerdatapb.SchemaDefinition)
}
tmc.getSchemaResponses[uint32(tabletUID)] = res
}

func (tmc *testMaterializerTMClient) GetSchema(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.GetSchemaRequest) (*tabletmanagerdatapb.SchemaDefinition, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

if tmc.getSchemaResponses != nil && tmc.getSchemaResponses[tablet.Alias.Uid] != nil {
return tmc.getSchemaResponses[tablet.Alias.Uid], nil
}

schemaDefn := &tabletmanagerdatapb.SchemaDefinition{}
for _, table := range request.Tables {
if table == "/.*/" {
Expand Down
283 changes: 283 additions & 0 deletions go/vt/vtctl/workflow/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,289 @@ func TestMoveTablesDDLFlag(t *testing.T) {
}
}

func TestShardedAutoIncHandling(t *testing.T) {
t1DDL := "create table t1 (id int not null auto_increment primary key, c1 varchar(10))"
ms := &vtctldatapb.MaterializeSettings{
Workflow: "workflow",
SourceKeyspace: "sourceks",
TargetKeyspace: "targetks",
TableSettings: []*vtctldatapb.TableMaterializeSettings{{
TargetTable: "t1",
CreateDdl: t1DDL,
SourceExpression: "select * from t1",
}},
WorkflowOptions: &vtctldatapb.WorkflowOptions{},
}

type testcase struct {
name string
value vtctldatapb.ShardedAutoIncrementHandling
globalKeyspace string
targetShards []string
targetVSchema *vschemapb.Keyspace
wantTargetVSchema *vschemapb.Keyspace
expectQueries []string
expectErr string
}
testcases := []testcase{
{
name: "global-keyspace does not exist",
globalKeyspace: "foo",
expectErr: "global-keyspace foo does not exist",
},
{
name: "leave",
globalKeyspace: "sourceks",
targetShards: []string{"-80", "80-"},
targetVSchema: &vschemapb.Keyspace{
Sharded: true,
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{
{
Name: "xxhash",
Column: "id",
},
},
},
},
Vindexes: map[string]*vschemapb.Vindex{
"xxhash": {
Type: "xxhash",
},
},
},
value: vtctldatapb.ShardedAutoIncrementHandling_LEAVE,
wantTargetVSchema: &vschemapb.Keyspace{
Sharded: true,
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{
{
Name: "xxhash",
Column: "id",
},
},
},
},
Vindexes: map[string]*vschemapb.Vindex{
"xxhash": {
Type: "xxhash",
},
},
},
expectQueries: []string{
t1DDL, // Unchanged
},
},
{
name: "remove",
globalKeyspace: "sourceks",
targetShards: []string{"-80", "80-"},
targetVSchema: &vschemapb.Keyspace{
Sharded: true,
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{
{
Name: "xxhash",
Column: "id",
},
},
},
},
Vindexes: map[string]*vschemapb.Vindex{
"xxhash": {
Type: "xxhash",
},
},
},
value: vtctldatapb.ShardedAutoIncrementHandling_REMOVE,
wantTargetVSchema: &vschemapb.Keyspace{
Sharded: true,
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{
{
Name: "xxhash",
Column: "id",
},
},
},
},
Vindexes: map[string]*vschemapb.Vindex{
"xxhash": {
Type: "xxhash",
},
},
},
expectQueries: []string{ // auto_increment clause removed
`create table t1 (
id int not null primary key,
c1 varchar(10)
)`,
},
},
{
name: "replace, but vschema AutoIncrement already in place",
globalKeyspace: "sourceks",
targetShards: []string{"-80", "80-"},
targetVSchema: &vschemapb.Keyspace{
Sharded: true,
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{
{
Name: "xxhash",
Column: "id",
},
},
AutoIncrement: &vschemapb.AutoIncrement{ // AutoIncrement definition exists
Column: "id",
Sequence: "t1_non_default_seq_name",
},
},
},
Vindexes: map[string]*vschemapb.Vindex{
"xxhash": {
Type: "xxhash",
},
},
},
value: vtctldatapb.ShardedAutoIncrementHandling_REPLACE,
wantTargetVSchema: &vschemapb.Keyspace{
Sharded: true,
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{
{
Name: "xxhash",
Column: "id",
},
},
AutoIncrement: &vschemapb.AutoIncrement{ // AutoIncrement definition left alone
Column: "id",
Sequence: "t1_non_default_seq_name",
},
},
},
Vindexes: map[string]*vschemapb.Vindex{
"xxhash": {
Type: "xxhash",
},
},
},
expectQueries: []string{ // auto_increment clause removed
`create table t1 (
id int not null primary key,
c1 varchar(10)
)`,
},
},
{
name: "replace",
globalKeyspace: "sourceks",
targetShards: []string{"-80", "80-"},
targetVSchema: &vschemapb.Keyspace{
Sharded: true,
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{
{
Name: "xxhash",
Column: "id",
},
},
},
},
Vindexes: map[string]*vschemapb.Vindex{
"xxhash": {
Type: "xxhash",
},
},
},
value: vtctldatapb.ShardedAutoIncrementHandling_REPLACE,
wantTargetVSchema: &vschemapb.Keyspace{
Sharded: true,
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{
{
Name: "xxhash",
Column: "id",
},
},
AutoIncrement: &vschemapb.AutoIncrement{ // AutoIncrement definition added
Column: "id",
Sequence: "t1_seq",
},
},
},
Vindexes: map[string]*vschemapb.Vindex{
"xxhash": {
Type: "xxhash",
},
},
},
expectQueries: []string{ // auto_increment clause removed
`create table t1 (
id int not null primary key,
c1 varchar(10)
)`,
},
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if tc.targetShards == nil {
tc.targetShards = []string{"0"}
}
env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, tc.targetShards)
defer env.close()

env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{})
for i := range tc.targetShards {
uid := startingTargetTabletUID + (i * tabletUIDStep)
for _, query := range tc.expectQueries {
env.tmc.expectVRQuery(uid, query, &sqltypes.Result{})
}
env.tmc.expectVRQuery(uid, mzGetCopyState, &sqltypes.Result{})
env.tmc.expectVRQuery(uid, mzGetLatestCopyState, &sqltypes.Result{})
env.tmc.SetGetSchemaResponse(uid, &tabletmanagerdatapb.SchemaDefinition{}) // So that the schema is copied from the source
}

if tc.targetVSchema != nil {
err := env.ws.ts.SaveVSchema(ctx, ms.TargetKeyspace, tc.targetVSchema)
require.NoError(t, err)
}

_, err := env.ws.MoveTablesCreate(ctx, &vtctldatapb.MoveTablesCreateRequest{
Workflow: ms.Workflow,
SourceKeyspace: ms.SourceKeyspace,
TargetKeyspace: ms.TargetKeyspace,
IncludeTables: []string{"t1"},
WorkflowOptions: &vtctldatapb.WorkflowOptions{
StripShardedAutoIncrement: tc.value,
GlobalKeyspace: tc.globalKeyspace,
},
})
if tc.expectErr != "" {
require.EqualError(t, err, tc.expectErr)
} else {
require.NoError(t, err)
if tc.wantTargetVSchema != nil {
targetVSchema, err := env.ws.ts.GetVSchema(ctx, ms.TargetKeyspace)
require.NoError(t, err)
require.True(t, proto.Equal(targetVSchema, tc.wantTargetVSchema))
}
}
})
}
}

// TestMoveTablesNoRoutingRules confirms that MoveTables does not create routing rules if --no-routing-rules is specified.
func TestMoveTablesNoRoutingRules(t *testing.T) {
ms := &vtctldatapb.MaterializeSettings{
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1440,8 +1440,8 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
s.Logger().Infof("Found tables to move: %s", strings.Join(tables, ","))

if !vschema.Sharded {
// Save the original in case we need to restore it for a late failure
// in the defer().
// Save the original in case we need to restore it for a late failure in
// the defer().
origVSchema = vschema.CloneVT()
if err := s.addTablesToVSchema(ctx, sourceKeyspace, vschema, tables, externalTopo == nil); err != nil {
return nil, err
Expand Down

0 comments on commit f5eb4e2

Please sign in to comment.