Skip to content

Commit

Permalink
Changes from self review
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Sep 30, 2024
1 parent b110120 commit 6fc7fdc
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (
"github.com/spf13/cobra"

"vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/common"
"vitess.io/vitess/go/vt/proto/vtctldata"
"vitess.io/vitess/go/vt/topo/topoproto"

vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

var (
Expand Down Expand Up @@ -56,7 +57,7 @@ func registerCommands(root *cobra.Command) {
create.Flags().StringVar(&createOptions.WorkflowOptions.TenantId, "tenant-id", "", "(EXPERIMENTAL: Multi-tenant migrations only) The tenant ID to use for the MoveTables workflow into a multi-tenant keyspace.")
create.Flags().StringSliceVar(&createOptions.WorkflowOptions.Shards, "shards", nil, "(EXPERIMENTAL: Multi-tenant migrations only) Specify that vreplication streams should only be created on this subset of target shards. Warning: you should first ensure that all rows on the source route to the specified subset of target shards using your VIndex of choice or you could lose data during the migration.")
create.Flags().StringVar(&createOptions.WorkflowOptions.GlobalKeyspace, "global-keyspace", "", "If specified, then attempt to create any global resources here such as sequence tables needed to replace auto_increment table clauses that are removed due to --remove-sharded-auto-increment=REPLACE. The value must be an unsharded keyspace that already exists.")
create.Flags().StringVar(&createOptions.StripShardedAutoIncrement, "remove-sharded-auto-increment", vtctldata.ShardedAutoIncrementHandling_REMOVE.String(),
create.Flags().StringVar(&createOptions.StripShardedAutoIncrement, "remove-sharded-auto-increment", vtctldatapb.ShardedAutoIncrementHandling_REMOVE.String(),
fmt.Sprintf("If moving the table(s) to a sharded keyspace, remove any MySQL auto_increment clauses when copying the schema to the target as sharded keyspaces should rely on either user/application generated values or Vitess sequences to ensure uniqueness. If REPLACE is specified then they are automatically replaced by Vitess sequence definitions. (options are: %s)",
stripShardedAutoIncOptions))
base.AddCommand(create)
Expand Down Expand Up @@ -110,7 +111,7 @@ func init() {
common.RegisterCommandHandler("MoveTables", registerCommands)

sb := strings.Builder{}
for i, v := range vtctldata.ShardedAutoIncrementHandling_name {
for i, v := range vtctldatapb.ShardedAutoIncrementHandling_name {
if i > 0 {
sb.WriteByte(',')
}
Expand Down
80 changes: 53 additions & 27 deletions go/vt/vtctl/workflow/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,15 +602,16 @@ func TestMoveTablesDDLFlag(t *testing.T) {
}

func TestShardedAutoIncHandling(t *testing.T) {
t1DDL := "create table t1 (id int not null auto_increment primary key, c1 varchar(10))"
tableName := "t1"
tableDDL := fmt.Sprintf("create table %s (id int not null auto_increment primary key, c1 varchar(10))", tableName)
ms := &vtctldatapb.MaterializeSettings{
Workflow: "workflow",
SourceKeyspace: "sourceks",
TargetKeyspace: "targetks",
TableSettings: []*vtctldatapb.TableMaterializeSettings{{
TargetTable: "t1",
CreateDdl: t1DDL,
SourceExpression: "select * from t1",
TargetTable: tableName,
CreateDdl: tableDDL,
SourceExpression: fmt.Sprintf("select * from %s", tableName),
}},
WorkflowOptions: &vtctldatapb.WorkflowOptions{},
}
Expand All @@ -631,14 +632,39 @@ func TestShardedAutoIncHandling(t *testing.T) {
globalKeyspace: "foo",
expectErr: "global-keyspace foo does not exist",
},
{
name: "global keyspace is sharded",
globalKeyspace: ms.TargetKeyspace,
targetShards: []string{"-80", "80-"},
targetVSchema: &vschemapb.Keyspace{
Sharded: true,
Tables: map[string]*vschemapb.Table{
tableName: {
ColumnVindexes: []*vschemapb.ColumnVindex{
{
Name: "xxhash",
Column: "id",
},
},
},
},
Vindexes: map[string]*vschemapb.Vindex{
"xxhash": {
Type: "xxhash",
},
},
},
expectErr: fmt.Sprintf("global-keyspace %s is sharded and thus cannot be used for global resources",
ms.TargetKeyspace),
},
{
name: "leave",
globalKeyspace: "sourceks",
globalKeyspace: ms.SourceKeyspace,
targetShards: []string{"-80", "80-"},
targetVSchema: &vschemapb.Keyspace{
Sharded: true,
Tables: map[string]*vschemapb.Table{
"t1": {
tableName: {
ColumnVindexes: []*vschemapb.ColumnVindex{
{
Name: "xxhash",
Expand All @@ -657,7 +683,7 @@ func TestShardedAutoIncHandling(t *testing.T) {
wantTargetVSchema: &vschemapb.Keyspace{
Sharded: true,
Tables: map[string]*vschemapb.Table{
"t1": {
tableName: {
ColumnVindexes: []*vschemapb.ColumnVindex{
{
Name: "xxhash",
Expand All @@ -673,17 +699,17 @@ func TestShardedAutoIncHandling(t *testing.T) {
},
},
expectQueries: []string{
t1DDL, // Unchanged
tableDDL, // Unchanged
},
},
{
name: "remove",
globalKeyspace: "sourceks",
globalKeyspace: ms.SourceKeyspace,
targetShards: []string{"-80", "80-"},
targetVSchema: &vschemapb.Keyspace{
Sharded: true,
Tables: map[string]*vschemapb.Table{
"t1": {
tableName: {
ColumnVindexes: []*vschemapb.ColumnVindex{
{
Name: "xxhash",
Expand All @@ -702,7 +728,7 @@ func TestShardedAutoIncHandling(t *testing.T) {
wantTargetVSchema: &vschemapb.Keyspace{
Sharded: true,
Tables: map[string]*vschemapb.Table{
"t1": {
tableName: {
ColumnVindexes: []*vschemapb.ColumnVindex{
{
Name: "xxhash",
Expand All @@ -718,20 +744,20 @@ func TestShardedAutoIncHandling(t *testing.T) {
},
},
expectQueries: []string{ // auto_increment clause removed
`create table t1 (
fmt.Sprintf(`create table %s (
id int not null primary key,
c1 varchar(10)
)`,
)`, tableName),
},
},
{
name: "replace, but vschema AutoIncrement already in place",
globalKeyspace: "sourceks",
globalKeyspace: ms.SourceKeyspace,
targetShards: []string{"-80", "80-"},
targetVSchema: &vschemapb.Keyspace{
Sharded: true,
Tables: map[string]*vschemapb.Table{
"t1": {
tableName: {
ColumnVindexes: []*vschemapb.ColumnVindex{
{
Name: "xxhash",
Expand All @@ -740,7 +766,7 @@ func TestShardedAutoIncHandling(t *testing.T) {
},
AutoIncrement: &vschemapb.AutoIncrement{ // AutoIncrement definition exists
Column: "id",
Sequence: "t1_non_default_seq_name",
Sequence: fmt.Sprintf("%s_non_default_seq_name", tableName),
},
},
},
Expand All @@ -754,7 +780,7 @@ func TestShardedAutoIncHandling(t *testing.T) {
wantTargetVSchema: &vschemapb.Keyspace{
Sharded: true,
Tables: map[string]*vschemapb.Table{
"t1": {
tableName: {
ColumnVindexes: []*vschemapb.ColumnVindex{
{
Name: "xxhash",
Expand All @@ -763,7 +789,7 @@ func TestShardedAutoIncHandling(t *testing.T) {
},
AutoIncrement: &vschemapb.AutoIncrement{ // AutoIncrement definition left alone
Column: "id",
Sequence: "t1_non_default_seq_name",
Sequence: fmt.Sprintf("%s_non_default_seq_name", tableName),
},
},
},
Expand All @@ -774,20 +800,20 @@ func TestShardedAutoIncHandling(t *testing.T) {
},
},
expectQueries: []string{ // auto_increment clause removed
`create table t1 (
fmt.Sprintf(`create table %s (
id int not null primary key,
c1 varchar(10)
)`,
)`, tableName),
},
},
{
name: "replace",
globalKeyspace: "sourceks",
globalKeyspace: ms.SourceKeyspace,
targetShards: []string{"-80", "80-"},
targetVSchema: &vschemapb.Keyspace{
Sharded: true,
Tables: map[string]*vschemapb.Table{
"t1": {
tableName: {
ColumnVindexes: []*vschemapb.ColumnVindex{
{
Name: "xxhash",
Expand All @@ -806,7 +832,7 @@ func TestShardedAutoIncHandling(t *testing.T) {
wantTargetVSchema: &vschemapb.Keyspace{
Sharded: true,
Tables: map[string]*vschemapb.Table{
"t1": {
tableName: {
ColumnVindexes: []*vschemapb.ColumnVindex{
{
Name: "xxhash",
Expand All @@ -815,7 +841,7 @@ func TestShardedAutoIncHandling(t *testing.T) {
},
AutoIncrement: &vschemapb.AutoIncrement{ // AutoIncrement definition added
Column: "id",
Sequence: "t1_seq",
Sequence: fmt.Sprintf("%s_seq", tableName),
},
},
},
Expand All @@ -826,10 +852,10 @@ func TestShardedAutoIncHandling(t *testing.T) {
},
},
expectQueries: []string{ // auto_increment clause removed
`create table t1 (
fmt.Sprintf(`create table %s (
id int not null primary key,
c1 varchar(10)
)`,
)`, tableName),
},
},
}
Expand Down Expand Up @@ -864,7 +890,7 @@ func TestShardedAutoIncHandling(t *testing.T) {
Workflow: ms.Workflow,
SourceKeyspace: ms.SourceKeyspace,
TargetKeyspace: ms.TargetKeyspace,
IncludeTables: []string{"t1"},
IncludeTables: []string{tableName},
WorkflowOptions: &vtctldatapb.WorkflowOptions{
StripShardedAutoIncrement: tc.value,
GlobalKeyspace: tc.globalKeyspace,
Expand Down
9 changes: 7 additions & 2 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -1497,7 +1497,7 @@ func (ts *trafficSwitcher) getTargetSequenceMetadata(ctx context.Context) (map[s
// Try and create the backing sequence tables if we can.
globalKeyspace := ts.options.GetGlobalKeyspace()
if globalKeyspace == "" {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to locate all of the backing sequence tables being used and no keyspace was provided to auto create them in: %s",
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to locate all of the backing sequence tables being used and no global-keyspace was provided to auto create them in: %s",
strings.Join(maps.Keys(sequencesByBackingTable), ","))
}
shards, err := ts.ws.ts.GetShardNames(ctx, globalKeyspace)
Expand Down Expand Up @@ -1799,7 +1799,12 @@ func (ts *trafficSwitcher) initializeTargetSequences(ctx context.Context, sequen
if ierr != nil {
return vterrors.Wrapf(vterr, "could not create missing sequence table: %v", err)
}
goto initialize
select {
case <-ctx.Done():
return vterrors.Wrapf(vterr, "could not create missing sequence table: %v", ctx.Err())
default:
goto initialize
}
}
// If we actually updated the backing sequence table, then we need
// to tell the primary tablet managing the sequence to refresh/reset
Expand Down

0 comments on commit 6fc7fdc

Please sign in to comment.