diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index f4fb4a354fb..13902d919d0 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1395,6 +1395,29 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl } s.Logger().Infof("Found tables to move: %s", strings.Join(tables, ",")) + // Check if any table being moved is already non-empty in the target keyspace. + // Skip this check for multi-tenant migrations. + if req.GetWorkflowOptions().GetTenantId() == "" { + targetKeyspaceTables, err := getTablesInKeyspace(ctx, sourceTopo, s.tmc, targetKeyspace) + if err != nil { + return nil, err + } + + var alreadyExistingTables []string + for _, t := range targetKeyspaceTables { + if slices.Contains(tables, t) { + alreadyExistingTables = append(alreadyExistingTables, t) + } + } + + if len(alreadyExistingTables) > 0 { + err = validateEmptyTables(ctx, sourceTopo, s.tmc, targetKeyspace, alreadyExistingTables) + if err != nil { + return nil, err + } + } + } + if !vschema.Sharded { // Save the original in case we need to restore it for a late failure // in the defer(). diff --git a/go/vt/vtctl/workflow/utils.go b/go/vt/vtctl/workflow/utils.go index 2337fdbc747..126e0398145 100644 --- a/go/vt/vtctl/workflow/utils.go +++ b/go/vt/vtctl/workflow/utils.go @@ -1011,3 +1011,56 @@ func applyTargetShards(ts *trafficSwitcher, targetShards []string) error { } return nil } + +// validateEmptyTables checks if all specified tables in the keyspace are empty across all shards. +// It queries each shard's primary tablet and if any non-empty table is found, it returns an error +// containing a list of non-empty tables. +func validateEmptyTables(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, keyspace string, tables []string) error { + shards, err := ts.GetServingShards(ctx, keyspace) + if err != nil { + return err + } + if len(shards) == 0 { + return fmt.Errorf("keyspace %s has no shards", keyspace) + } + + isFaultyTable := map[string]bool{} + for _, shard := range shards { + primary := shard.PrimaryAlias + if primary == nil { + return fmt.Errorf("shard does not have a primary: %v", shard.ShardName()) + } + + ti, err := ts.GetTablet(ctx, primary) + if err != nil { + return err + } + + var selectQueries []string + for _, t := range tables { + selectQueries = append(selectQueries, fmt.Sprintf("(select '%s' from %s limit 1)", t, t)) + } + query := strings.Join(selectQueries, "union all") + + res, err := tmc.ExecuteFetchAsDba(ctx, ti.Tablet, true, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{ + Query: []byte(query), + MaxRows: uint64(len(tables)), + }) + if err != nil { + return err + } + for _, row := range res.Rows { + isFaultyTable[string(row.Values)] = true + } + } + + var faultyTables []string + for table := range isFaultyTable { + faultyTables = append(faultyTables, table) + } + + if len(faultyTables) > 0 { + return fmt.Errorf("target keyspace contains following non-empty table(s): %s", strings.Join(faultyTables, ", ")) + } + return nil +}