diff --git a/go/vt/vtctl/workflow/materializer.go b/go/vt/vtctl/workflow/materializer.go index 3d0ca674e02..28140203403 100644 --- a/go/vt/vtctl/workflow/materializer.go +++ b/go/vt/vtctl/workflow/materializer.go @@ -34,7 +34,6 @@ import ( "vitess.io/vitess/go/vt/schemadiff" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" - "vitess.io/vitess/go/vt/vtctl/schematools" "vitess.io/vitess/go/vt/vtenv" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/vindexes" @@ -264,6 +263,41 @@ func (mz *materializer) deploySchema() error { var sourceDDLs map[string]string var mu sync.Mutex + targetKeyspaceTables, err := getTablesInKeyspace(mz.ctx, mz.sourceTs, mz.tmc, mz.ms.TargetKeyspace) + if err != nil { + return err + } + + tables := map[string]struct{}{} + for _, t := range mz.ms.TableSettings { + tables[t.TargetTable] = struct{}{} + } + + hasTargetTable := map[string]bool{} + for _, t := range targetKeyspaceTables { + if _, ok := tables[t]; ok { + hasTargetTable[t] = true + } + } + + // Check if any table being moved is already non-empty in the target keyspace. + // Skip this check for multi-tenant migrations. + if !mz.IsMultiTenantMigration() { + alreadyExistingTables := make([]string, len(hasTargetTable)) + i := 0 + for t := range hasTargetTable { + alreadyExistingTables[i] = t + i++ + } + + if len(alreadyExistingTables) > 0 { + err = validateEmptyTables(mz.ctx, mz.sourceTs, mz.tmc, mz.ms.TargetKeyspace, alreadyExistingTables) + if err != nil { + return err + } + } + } + // Auto-increment columns are typically used with unsharded MySQL tables // but should not generally be used with sharded ones. Because it's common // to use MoveTables to move table(s) from an unsharded keyspace to a @@ -279,19 +313,6 @@ func (mz *materializer) deploySchema() error { } return forAllShards(mz.targetShards, func(target *topo.ShardInfo) error { - allTables := []string{"/.*/"} - - hasTargetTable := map[string]bool{} - req := &tabletmanagerdatapb.GetSchemaRequest{Tables: allTables} - targetSchema, err := schematools.GetSchema(mz.ctx, mz.ts, mz.tmc, target.PrimaryAlias, req) - if err != nil { - return err - } - - for _, td := range targetSchema.TableDefinitions { - hasTargetTable[td.Name] = true - } - targetTablet, err := mz.ts.GetTablet(mz.ctx, target.PrimaryAlias) if err != nil { return err diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index 763dd7c04d3..81e4145bcb4 100644 --- a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -44,6 +44,7 @@ import ( ) const ( + getNonEmptyTable = "(select 't1' from t1 limit 1)" position = "MySQL56/9d10e6ec-07a0-11ee-ae73-8e53f4cf3083:1-97" mzSelectFrozenQuery = "select 1 from _vt.vreplication where db_name='vt_targetks' and message='FROZEN' and workflow_sub_type != 1" mzCheckJournal = "/select val from _vt.resharding_journal where id=" @@ -518,6 +519,7 @@ func TestMigrateVSchema(t *testing.T) { env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"}) defer env.close() + env.tmc.expectVRQuery(200, getNonEmptyTable, &sqltypes.Result{}) env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{}) env.tmc.expectVRQuery(200, mzGetCopyState, &sqltypes.Result{}) env.tmc.expectVRQuery(200, mzGetLatestCopyState, &sqltypes.Result{}) @@ -576,6 +578,7 @@ func TestMoveTablesDDLFlag(t *testing.T) { // TabletManager. Importing the tabletmanager package, however, causes // a circular dependency. // The TabletManager portion is tested in rpc_vreplication_test.go. + env.tmc.expectVRQuery(200, getNonEmptyTable, &sqltypes.Result{}) env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{}) env.tmc.expectVRQuery(200, mzGetCopyState, &sqltypes.Result{}) env.tmc.expectVRQuery(200, mzGetLatestCopyState, &sqltypes.Result{}) @@ -625,6 +628,7 @@ func TestMoveTablesNoRoutingRules(t *testing.T) { // TabletManager. Importing the tabletmanager package, however, causes // a circular dependency. // The TabletManager portion is tested in rpc_vreplication_test.go. + env.tmc.expectVRQuery(200, getNonEmptyTable, &sqltypes.Result{}) env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{}) env.tmc.expectVRQuery(200, mzGetCopyState, &sqltypes.Result{}) env.tmc.expectVRQuery(200, mzGetLatestCopyState, &sqltypes.Result{}) @@ -2691,6 +2695,7 @@ func TestKeyRangesEqualOptimization(t *testing.T) { if tablet.Keyspace != targetKs || tablet.Type != topodatapb.TabletType_PRIMARY { continue } + env.tmc.expectVRQuery(int(tablet.Alias.Uid), getNonEmptyTable, &sqltypes.Result{}) // If we are doing a partial MoveTables, we will only perform the workflow // stream creation / INSERT statment on the shard(s) we're migrating. if len(tc.moveTablesReq.SourceShards) > 0 && !slices.Contains(tc.moveTablesReq.SourceShards, tablet.Shard) { diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 13902d919d0..f4fb4a354fb 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1395,29 +1395,6 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl } s.Logger().Infof("Found tables to move: %s", strings.Join(tables, ",")) - // Check if any table being moved is already non-empty in the target keyspace. - // Skip this check for multi-tenant migrations. - if req.GetWorkflowOptions().GetTenantId() == "" { - targetKeyspaceTables, err := getTablesInKeyspace(ctx, sourceTopo, s.tmc, targetKeyspace) - if err != nil { - return nil, err - } - - var alreadyExistingTables []string - for _, t := range targetKeyspaceTables { - if slices.Contains(tables, t) { - alreadyExistingTables = append(alreadyExistingTables, t) - } - } - - if len(alreadyExistingTables) > 0 { - err = validateEmptyTables(ctx, sourceTopo, s.tmc, targetKeyspace, alreadyExistingTables) - if err != nil { - return nil, err - } - } - } - if !vschema.Sharded { // Save the original in case we need to restore it for a late failure // in the defer(). diff --git a/go/vt/vtctl/workflow/utils.go b/go/vt/vtctl/workflow/utils.go index 126e0398145..2bcd05e807f 100644 --- a/go/vt/vtctl/workflow/utils.go +++ b/go/vt/vtctl/workflow/utils.go @@ -1016,6 +1016,10 @@ func applyTargetShards(ts *trafficSwitcher, targetShards []string) error { // It queries each shard's primary tablet and if any non-empty table is found, it returns an error // containing a list of non-empty tables. func validateEmptyTables(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, keyspace string, tables []string) error { + if len(tables) == 0 { + return nil + } + shards, err := ts.GetServingShards(ctx, keyspace) if err != nil { return err @@ -1024,8 +1028,16 @@ func validateEmptyTables(ctx context.Context, ts *topo.Server, tmc tmclient.Tabl return fmt.Errorf("keyspace %s has no shards", keyspace) } + var selectQueries []string + for _, t := range tables { + selectQueries = append(selectQueries, fmt.Sprintf("(select '%s' from %s limit 1)", t, t)) + } + query := strings.Join(selectQueries, "union all") + + var mu sync.Mutex isFaultyTable := map[string]bool{} - for _, shard := range shards { + + err = forAllShards(shards, func(shard *topo.ShardInfo) error { primary := shard.PrimaryAlias if primary == nil { return fmt.Errorf("shard does not have a primary: %v", shard.ShardName()) @@ -1036,12 +1048,6 @@ func validateEmptyTables(ctx context.Context, ts *topo.Server, tmc tmclient.Tabl return err } - var selectQueries []string - for _, t := range tables { - selectQueries = append(selectQueries, fmt.Sprintf("(select '%s' from %s limit 1)", t, t)) - } - query := strings.Join(selectQueries, "union all") - res, err := tmc.ExecuteFetchAsDba(ctx, ti.Tablet, true, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{ Query: []byte(query), MaxRows: uint64(len(tables)), @@ -1049,14 +1055,25 @@ func validateEmptyTables(ctx context.Context, ts *topo.Server, tmc tmclient.Tabl if err != nil { return err } + + mu.Lock() for _, row := range res.Rows { isFaultyTable[string(row.Values)] = true } + mu.Unlock() + + return nil + }) + + if err != nil { + return err } - var faultyTables []string + faultyTables := make([]string, len(isFaultyTable)) + i := 0 for table := range isFaultyTable { - faultyTables = append(faultyTables, table) + faultyTables[i] = table + i++ } if len(faultyTables) > 0 { diff --git a/go/vt/vtctl/workflow/utils_test.go b/go/vt/vtctl/workflow/utils_test.go index b315e1aa991..55d51f952ad 100644 --- a/go/vt/vtctl/workflow/utils_test.go +++ b/go/vt/vtctl/workflow/utils_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" @@ -21,6 +22,10 @@ import ( "vitess.io/vitess/go/vt/topo/etcd2topo" "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/topotools" + "vitess.io/vitess/go/vt/vtctl/grpcvtctldserver/testutil" + + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) // TestCreateDefaultShardRoutingRules confirms that the default shard routing rules are created correctly for sharded @@ -243,3 +248,105 @@ func startEtcd(t *testing.T) string { return clientAddr } + +func TestValidateEmptyTables(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ts := memorytopo.NewServer(ctx, "zone1") + defer ts.Close() + + ks := "test_keyspace" + shard1 := "-80" + shard2 := "80-" + err := ts.CreateKeyspace(ctx, ks, &topodatapb.Keyspace{}) + require.NoError(t, err) + + err = ts.CreateShard(ctx, ks, shard1) + require.NoError(t, err) + err = ts.CreateShard(ctx, ks, shard2) + require.NoError(t, err) + + tablet1 := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + Keyspace: ks, + Shard: shard1, + } + tablet2 := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 200, + }, + Keyspace: ks, + Shard: shard2, + } + err = ts.CreateTablet(ctx, tablet1) + require.NoError(t, err) + err = ts.CreateTablet(ctx, tablet2) + require.NoError(t, err) + + _, err = ts.UpdateShardFields(ctx, ks, shard1, func(si *topo.ShardInfo) error { + si.Shard.PrimaryAlias = tablet1.Alias + return nil + }) + require.NoError(t, err) + _, err = ts.UpdateShardFields(ctx, ks, shard2, func(si *topo.ShardInfo) error { + si.Shard.PrimaryAlias = tablet2.Alias + return nil + }) + require.NoError(t, err) + + tmc := &testutil.TabletManagerClient{ + ExecuteFetchAsDbaResults: map[string]struct { + Response *querypb.QueryResult + Error error + }{ + "zone1-0000000100": { + Response: &querypb.QueryResult{ + Fields: []*querypb.Field{ + { + Name: "table_name", + Type: querypb.Type_VARCHAR, + }, + }, + Rows: []*querypb.Row{{ + Lengths: []int64{6}, + Values: []byte("table1"), + }, { + Lengths: []int64{6}, + Values: []byte("table2"), + }, + }, + }, + }, + "zone1-0000000200": { + Response: &querypb.QueryResult{ + Fields: []*querypb.Field{ + { + Name: "table_name", + Type: querypb.Type_VARCHAR, + }, + }, + Rows: []*querypb.Row{{ + Lengths: []int64{6}, + Values: []byte("table2"), + }, { + Lengths: []int64{6}, + Values: []byte("table3"), + }, + }, + }, + }, + }, + } + + err = validateEmptyTables(ctx, ts, tmc, ks, []string{"table1", "table2", "table3", "table4"}) + assert.ErrorContains(t, err, "table1") + assert.ErrorContains(t, err, "table2") + assert.ErrorContains(t, err, "table3") + + err = validateEmptyTables(ctx, ts, tmc, ks, []string{}) + assert.NoError(t, err, "should not throw any error for empty tables slice") +}