From 6fc7fdcfddf10b7e1166be1009c2ba20ab71f5c5 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 30 Sep 2024 11:07:08 -0400 Subject: [PATCH] Changes from self review Signed-off-by: Matt Lord --- .../vreplication/movetables/movetables.go | 7 +- go/vt/vtctl/workflow/materializer_test.go | 80 ++++++++++++------- go/vt/vtctl/workflow/traffic_switcher.go | 9 ++- 3 files changed, 64 insertions(+), 32 deletions(-) diff --git a/go/cmd/vtctldclient/command/vreplication/movetables/movetables.go b/go/cmd/vtctldclient/command/vreplication/movetables/movetables.go index ea0546e16d0..92d1c12423b 100644 --- a/go/cmd/vtctldclient/command/vreplication/movetables/movetables.go +++ b/go/cmd/vtctldclient/command/vreplication/movetables/movetables.go @@ -23,8 +23,9 @@ import ( "github.com/spf13/cobra" "vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/common" - "vitess.io/vitess/go/vt/proto/vtctldata" "vitess.io/vitess/go/vt/topo/topoproto" + + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" ) var ( @@ -56,7 +57,7 @@ func registerCommands(root *cobra.Command) { create.Flags().StringVar(&createOptions.WorkflowOptions.TenantId, "tenant-id", "", "(EXPERIMENTAL: Multi-tenant migrations only) The tenant ID to use for the MoveTables workflow into a multi-tenant keyspace.") create.Flags().StringSliceVar(&createOptions.WorkflowOptions.Shards, "shards", nil, "(EXPERIMENTAL: Multi-tenant migrations only) Specify that vreplication streams should only be created on this subset of target shards. Warning: you should first ensure that all rows on the source route to the specified subset of target shards using your VIndex of choice or you could lose data during the migration.") create.Flags().StringVar(&createOptions.WorkflowOptions.GlobalKeyspace, "global-keyspace", "", "If specified, then attempt to create any global resources here such as sequence tables needed to replace auto_increment table clauses that are removed due to --remove-sharded-auto-increment=REPLACE. The value must be an unsharded keyspace that already exists.") - create.Flags().StringVar(&createOptions.StripShardedAutoIncrement, "remove-sharded-auto-increment", vtctldata.ShardedAutoIncrementHandling_REMOVE.String(), + create.Flags().StringVar(&createOptions.StripShardedAutoIncrement, "remove-sharded-auto-increment", vtctldatapb.ShardedAutoIncrementHandling_REMOVE.String(), fmt.Sprintf("If moving the table(s) to a sharded keyspace, remove any MySQL auto_increment clauses when copying the schema to the target as sharded keyspaces should rely on either user/application generated values or Vitess sequences to ensure uniqueness. If REPLACE is specified then they are automatically replaced by Vitess sequence definitions. (options are: %s)", stripShardedAutoIncOptions)) base.AddCommand(create) @@ -110,7 +111,7 @@ func init() { common.RegisterCommandHandler("MoveTables", registerCommands) sb := strings.Builder{} - for i, v := range vtctldata.ShardedAutoIncrementHandling_name { + for i, v := range vtctldatapb.ShardedAutoIncrementHandling_name { if i > 0 { sb.WriteByte(',') } diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index c84388128a6..59151f5b9f7 100644 --- a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -602,15 +602,16 @@ func TestMoveTablesDDLFlag(t *testing.T) { } func TestShardedAutoIncHandling(t *testing.T) { - t1DDL := "create table t1 (id int not null auto_increment primary key, c1 varchar(10))" + tableName := "t1" + tableDDL := fmt.Sprintf("create table %s (id int not null auto_increment primary key, c1 varchar(10))", tableName) ms := &vtctldatapb.MaterializeSettings{ Workflow: "workflow", SourceKeyspace: "sourceks", TargetKeyspace: "targetks", TableSettings: []*vtctldatapb.TableMaterializeSettings{{ - TargetTable: "t1", - CreateDdl: t1DDL, - SourceExpression: "select * from t1", + TargetTable: tableName, + CreateDdl: tableDDL, + SourceExpression: fmt.Sprintf("select * from %s", tableName), }}, WorkflowOptions: &vtctldatapb.WorkflowOptions{}, } @@ -631,14 +632,39 @@ func TestShardedAutoIncHandling(t *testing.T) { globalKeyspace: "foo", expectErr: "global-keyspace foo does not exist", }, + { + name: "global keyspace is sharded", + globalKeyspace: ms.TargetKeyspace, + targetShards: []string{"-80", "80-"}, + targetVSchema: &vschemapb.Keyspace{ + Sharded: true, + Tables: map[string]*vschemapb.Table{ + tableName: { + ColumnVindexes: []*vschemapb.ColumnVindex{ + { + Name: "xxhash", + Column: "id", + }, + }, + }, + }, + Vindexes: map[string]*vschemapb.Vindex{ + "xxhash": { + Type: "xxhash", + }, + }, + }, + expectErr: fmt.Sprintf("global-keyspace %s is sharded and thus cannot be used for global resources", + ms.TargetKeyspace), + }, { name: "leave", - globalKeyspace: "sourceks", + globalKeyspace: ms.SourceKeyspace, targetShards: []string{"-80", "80-"}, targetVSchema: &vschemapb.Keyspace{ Sharded: true, Tables: map[string]*vschemapb.Table{ - "t1": { + tableName: { ColumnVindexes: []*vschemapb.ColumnVindex{ { Name: "xxhash", @@ -657,7 +683,7 @@ func TestShardedAutoIncHandling(t *testing.T) { wantTargetVSchema: &vschemapb.Keyspace{ Sharded: true, Tables: map[string]*vschemapb.Table{ - "t1": { + tableName: { ColumnVindexes: []*vschemapb.ColumnVindex{ { Name: "xxhash", @@ -673,17 +699,17 @@ func TestShardedAutoIncHandling(t *testing.T) { }, }, expectQueries: []string{ - t1DDL, // Unchanged + tableDDL, // Unchanged }, }, { name: "remove", - globalKeyspace: "sourceks", + globalKeyspace: ms.SourceKeyspace, targetShards: []string{"-80", "80-"}, targetVSchema: &vschemapb.Keyspace{ Sharded: true, Tables: map[string]*vschemapb.Table{ - "t1": { + tableName: { ColumnVindexes: []*vschemapb.ColumnVindex{ { Name: "xxhash", @@ -702,7 +728,7 @@ func TestShardedAutoIncHandling(t *testing.T) { wantTargetVSchema: &vschemapb.Keyspace{ Sharded: true, Tables: map[string]*vschemapb.Table{ - "t1": { + tableName: { ColumnVindexes: []*vschemapb.ColumnVindex{ { Name: "xxhash", @@ -718,20 +744,20 @@ func TestShardedAutoIncHandling(t *testing.T) { }, }, expectQueries: []string{ // auto_increment clause removed - `create table t1 ( + fmt.Sprintf(`create table %s ( id int not null primary key, c1 varchar(10) -)`, +)`, tableName), }, }, { name: "replace, but vschema AutoIncrement already in place", - globalKeyspace: "sourceks", + globalKeyspace: ms.SourceKeyspace, targetShards: []string{"-80", "80-"}, targetVSchema: &vschemapb.Keyspace{ Sharded: true, Tables: map[string]*vschemapb.Table{ - "t1": { + tableName: { ColumnVindexes: []*vschemapb.ColumnVindex{ { Name: "xxhash", @@ -740,7 +766,7 @@ func TestShardedAutoIncHandling(t *testing.T) { }, AutoIncrement: &vschemapb.AutoIncrement{ // AutoIncrement definition exists Column: "id", - Sequence: "t1_non_default_seq_name", + Sequence: fmt.Sprintf("%s_non_default_seq_name", tableName), }, }, }, @@ -754,7 +780,7 @@ func TestShardedAutoIncHandling(t *testing.T) { wantTargetVSchema: &vschemapb.Keyspace{ Sharded: true, Tables: map[string]*vschemapb.Table{ - "t1": { + tableName: { ColumnVindexes: []*vschemapb.ColumnVindex{ { Name: "xxhash", @@ -763,7 +789,7 @@ func TestShardedAutoIncHandling(t *testing.T) { }, AutoIncrement: &vschemapb.AutoIncrement{ // AutoIncrement definition left alone Column: "id", - Sequence: "t1_non_default_seq_name", + Sequence: fmt.Sprintf("%s_non_default_seq_name", tableName), }, }, }, @@ -774,20 +800,20 @@ func TestShardedAutoIncHandling(t *testing.T) { }, }, expectQueries: []string{ // auto_increment clause removed - `create table t1 ( + fmt.Sprintf(`create table %s ( id int not null primary key, c1 varchar(10) -)`, +)`, tableName), }, }, { name: "replace", - globalKeyspace: "sourceks", + globalKeyspace: ms.SourceKeyspace, targetShards: []string{"-80", "80-"}, targetVSchema: &vschemapb.Keyspace{ Sharded: true, Tables: map[string]*vschemapb.Table{ - "t1": { + tableName: { ColumnVindexes: []*vschemapb.ColumnVindex{ { Name: "xxhash", @@ -806,7 +832,7 @@ func TestShardedAutoIncHandling(t *testing.T) { wantTargetVSchema: &vschemapb.Keyspace{ Sharded: true, Tables: map[string]*vschemapb.Table{ - "t1": { + tableName: { ColumnVindexes: []*vschemapb.ColumnVindex{ { Name: "xxhash", @@ -815,7 +841,7 @@ func TestShardedAutoIncHandling(t *testing.T) { }, AutoIncrement: &vschemapb.AutoIncrement{ // AutoIncrement definition added Column: "id", - Sequence: "t1_seq", + Sequence: fmt.Sprintf("%s_seq", tableName), }, }, }, @@ -826,10 +852,10 @@ func TestShardedAutoIncHandling(t *testing.T) { }, }, expectQueries: []string{ // auto_increment clause removed - `create table t1 ( + fmt.Sprintf(`create table %s ( id int not null primary key, c1 varchar(10) -)`, +)`, tableName), }, }, } @@ -864,7 +890,7 @@ func TestShardedAutoIncHandling(t *testing.T) { Workflow: ms.Workflow, SourceKeyspace: ms.SourceKeyspace, TargetKeyspace: ms.TargetKeyspace, - IncludeTables: []string{"t1"}, + IncludeTables: []string{tableName}, WorkflowOptions: &vtctldatapb.WorkflowOptions{ StripShardedAutoIncrement: tc.value, GlobalKeyspace: tc.globalKeyspace, diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 723c7255335..4c047a4b200 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -1497,7 +1497,7 @@ func (ts *trafficSwitcher) getTargetSequenceMetadata(ctx context.Context) (map[s // Try and create the backing sequence tables if we can. globalKeyspace := ts.options.GetGlobalKeyspace() if globalKeyspace == "" { - return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to locate all of the backing sequence tables being used and no keyspace was provided to auto create them in: %s", + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to locate all of the backing sequence tables being used and no global-keyspace was provided to auto create them in: %s", strings.Join(maps.Keys(sequencesByBackingTable), ",")) } shards, err := ts.ws.ts.GetShardNames(ctx, globalKeyspace) @@ -1799,7 +1799,12 @@ func (ts *trafficSwitcher) initializeTargetSequences(ctx context.Context, sequen if ierr != nil { return vterrors.Wrapf(vterr, "could not create missing sequence table: %v", err) } - goto initialize + select { + case <-ctx.Done(): + return vterrors.Wrapf(vterr, "could not create missing sequence table: %v", ctx.Err()) + default: + goto initialize + } } // If we actually updated the backing sequence table, then we need // to tell the primary tablet managing the sequence to refresh/reset