Skip to content

Commit

Permalink
fix: Move the check to deploySchema
Browse files Browse the repository at this point in the history
Signed-off-by: Noble Mittal <[email protected]>
  • Loading branch information
beingnoble03 committed Sep 26, 2024
1 parent a9ea332 commit 10ee600
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 46 deletions.
49 changes: 35 additions & 14 deletions go/vt/vtctl/workflow/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"vitess.io/vitess/go/vt/schemadiff"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtctl/schematools"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
Expand Down Expand Up @@ -264,6 +263,41 @@ func (mz *materializer) deploySchema() error {
var sourceDDLs map[string]string
var mu sync.Mutex

targetKeyspaceTables, err := getTablesInKeyspace(mz.ctx, mz.sourceTs, mz.tmc, mz.ms.TargetKeyspace)
if err != nil {
return err
}

tables := map[string]struct{}{}
for _, t := range mz.ms.TableSettings {
tables[t.TargetTable] = struct{}{}
}

hasTargetTable := map[string]bool{}
for _, t := range targetKeyspaceTables {
if _, ok := tables[t]; ok {
hasTargetTable[t] = true
}
}

// Check if any table being moved is already non-empty in the target keyspace.
// Skip this check for multi-tenant migrations.
if !mz.IsMultiTenantMigration() {
alreadyExistingTables := make([]string, len(hasTargetTable))
i := 0
for t := range hasTargetTable {
alreadyExistingTables[i] = t
i++
}

if len(alreadyExistingTables) > 0 {
err = validateEmptyTables(mz.ctx, mz.sourceTs, mz.tmc, mz.ms.TargetKeyspace, alreadyExistingTables)
if err != nil {
return err
}
}
}

// Auto-increment columns are typically used with unsharded MySQL tables
// but should not generally be used with sharded ones. Because it's common
// to use MoveTables to move table(s) from an unsharded keyspace to a
Expand All @@ -279,19 +313,6 @@ func (mz *materializer) deploySchema() error {
}

return forAllShards(mz.targetShards, func(target *topo.ShardInfo) error {
allTables := []string{"/.*/"}

hasTargetTable := map[string]bool{}
req := &tabletmanagerdatapb.GetSchemaRequest{Tables: allTables}
targetSchema, err := schematools.GetSchema(mz.ctx, mz.ts, mz.tmc, target.PrimaryAlias, req)
if err != nil {
return err
}

for _, td := range targetSchema.TableDefinitions {
hasTargetTable[td.Name] = true
}

targetTablet, err := mz.ts.GetTablet(mz.ctx, target.PrimaryAlias)
if err != nil {
return err
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vtctl/workflow/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
)

const (
getNonEmptyTable = "(select 't1' from t1 limit 1)"
position = "MySQL56/9d10e6ec-07a0-11ee-ae73-8e53f4cf3083:1-97"
mzSelectFrozenQuery = "select 1 from _vt.vreplication where db_name='vt_targetks' and message='FROZEN' and workflow_sub_type != 1"
mzCheckJournal = "/select val from _vt.resharding_journal where id="
Expand Down Expand Up @@ -518,6 +519,7 @@ func TestMigrateVSchema(t *testing.T) {
env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"})
defer env.close()

env.tmc.expectVRQuery(200, getNonEmptyTable, &sqltypes.Result{})
env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzGetCopyState, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzGetLatestCopyState, &sqltypes.Result{})
Expand Down Expand Up @@ -576,6 +578,7 @@ func TestMoveTablesDDLFlag(t *testing.T) {
// TabletManager. Importing the tabletmanager package, however, causes
// a circular dependency.
// The TabletManager portion is tested in rpc_vreplication_test.go.
env.tmc.expectVRQuery(200, getNonEmptyTable, &sqltypes.Result{})
env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzGetCopyState, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzGetLatestCopyState, &sqltypes.Result{})
Expand Down Expand Up @@ -625,6 +628,7 @@ func TestMoveTablesNoRoutingRules(t *testing.T) {
// TabletManager. Importing the tabletmanager package, however, causes
// a circular dependency.
// The TabletManager portion is tested in rpc_vreplication_test.go.
env.tmc.expectVRQuery(200, getNonEmptyTable, &sqltypes.Result{})
env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzGetCopyState, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzGetLatestCopyState, &sqltypes.Result{})
Expand Down Expand Up @@ -2691,6 +2695,7 @@ func TestKeyRangesEqualOptimization(t *testing.T) {
if tablet.Keyspace != targetKs || tablet.Type != topodatapb.TabletType_PRIMARY {
continue
}
env.tmc.expectVRQuery(int(tablet.Alias.Uid), getNonEmptyTable, &sqltypes.Result{})
// If we are doing a partial MoveTables, we will only perform the workflow
// stream creation / INSERT statment on the shard(s) we're migrating.
if len(tc.moveTablesReq.SourceShards) > 0 && !slices.Contains(tc.moveTablesReq.SourceShards, tablet.Shard) {
Expand Down
23 changes: 0 additions & 23 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1395,29 +1395,6 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
}
s.Logger().Infof("Found tables to move: %s", strings.Join(tables, ","))

// Check if any table being moved is already non-empty in the target keyspace.
// Skip this check for multi-tenant migrations.
if req.GetWorkflowOptions().GetTenantId() == "" {
targetKeyspaceTables, err := getTablesInKeyspace(ctx, sourceTopo, s.tmc, targetKeyspace)
if err != nil {
return nil, err
}

var alreadyExistingTables []string
for _, t := range targetKeyspaceTables {
if slices.Contains(tables, t) {
alreadyExistingTables = append(alreadyExistingTables, t)
}
}

if len(alreadyExistingTables) > 0 {
err = validateEmptyTables(ctx, sourceTopo, s.tmc, targetKeyspace, alreadyExistingTables)
if err != nil {
return nil, err
}
}
}

if !vschema.Sharded {
// Save the original in case we need to restore it for a late failure
// in the defer().
Expand Down
35 changes: 26 additions & 9 deletions go/vt/vtctl/workflow/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1016,6 +1016,10 @@ func applyTargetShards(ts *trafficSwitcher, targetShards []string) error {
// It queries each shard's primary tablet and if any non-empty table is found, it returns an error
// containing a list of non-empty tables.
func validateEmptyTables(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, keyspace string, tables []string) error {
if len(tables) == 0 {
return nil
}

shards, err := ts.GetServingShards(ctx, keyspace)
if err != nil {
return err
Expand All @@ -1024,8 +1028,16 @@ func validateEmptyTables(ctx context.Context, ts *topo.Server, tmc tmclient.Tabl
return fmt.Errorf("keyspace %s has no shards", keyspace)
}

var selectQueries []string
for _, t := range tables {
selectQueries = append(selectQueries, fmt.Sprintf("(select '%s' from %s limit 1)", t, t))
}
query := strings.Join(selectQueries, "union all")

var mu sync.Mutex
isFaultyTable := map[string]bool{}
for _, shard := range shards {

err = forAllShards(shards, func(shard *topo.ShardInfo) error {
primary := shard.PrimaryAlias
if primary == nil {
return fmt.Errorf("shard does not have a primary: %v", shard.ShardName())
Expand All @@ -1036,27 +1048,32 @@ func validateEmptyTables(ctx context.Context, ts *topo.Server, tmc tmclient.Tabl
return err
}

var selectQueries []string
for _, t := range tables {
selectQueries = append(selectQueries, fmt.Sprintf("(select '%s' from %s limit 1)", t, t))
}
query := strings.Join(selectQueries, "union all")

res, err := tmc.ExecuteFetchAsDba(ctx, ti.Tablet, true, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{
Query: []byte(query),
MaxRows: uint64(len(tables)),
})
if err != nil {
return err
}

mu.Lock()
for _, row := range res.Rows {
isFaultyTable[string(row.Values)] = true
}
mu.Unlock()

return nil
})

if err != nil {
return err
}

var faultyTables []string
faultyTables := make([]string, len(isFaultyTable))
i := 0
for table := range isFaultyTable {
faultyTables = append(faultyTables, table)
faultyTables[i] = table
i++
}

if len(faultyTables) > 0 {
Expand Down
107 changes: 107 additions & 0 deletions go/vt/vtctl/workflow/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"

Expand All @@ -21,6 +22,10 @@ import (
"vitess.io/vitess/go/vt/topo/etcd2topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/vtctl/grpcvtctldserver/testutil"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// TestCreateDefaultShardRoutingRules confirms that the default shard routing rules are created correctly for sharded
Expand Down Expand Up @@ -243,3 +248,105 @@ func startEtcd(t *testing.T) string {

return clientAddr
}

func TestValidateEmptyTables(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ts := memorytopo.NewServer(ctx, "zone1")
defer ts.Close()

ks := "test_keyspace"
shard1 := "-80"
shard2 := "80-"
err := ts.CreateKeyspace(ctx, ks, &topodatapb.Keyspace{})
require.NoError(t, err)

err = ts.CreateShard(ctx, ks, shard1)
require.NoError(t, err)
err = ts.CreateShard(ctx, ks, shard2)
require.NoError(t, err)

tablet1 := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 100,
},
Keyspace: ks,
Shard: shard1,
}
tablet2 := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 200,
},
Keyspace: ks,
Shard: shard2,
}
err = ts.CreateTablet(ctx, tablet1)
require.NoError(t, err)
err = ts.CreateTablet(ctx, tablet2)
require.NoError(t, err)

_, err = ts.UpdateShardFields(ctx, ks, shard1, func(si *topo.ShardInfo) error {
si.Shard.PrimaryAlias = tablet1.Alias
return nil
})
require.NoError(t, err)
_, err = ts.UpdateShardFields(ctx, ks, shard2, func(si *topo.ShardInfo) error {
si.Shard.PrimaryAlias = tablet2.Alias
return nil
})
require.NoError(t, err)

tmc := &testutil.TabletManagerClient{
ExecuteFetchAsDbaResults: map[string]struct {
Response *querypb.QueryResult
Error error
}{
"zone1-0000000100": {
Response: &querypb.QueryResult{
Fields: []*querypb.Field{
{
Name: "table_name",
Type: querypb.Type_VARCHAR,
},
},
Rows: []*querypb.Row{{
Lengths: []int64{6},
Values: []byte("table1"),
}, {
Lengths: []int64{6},
Values: []byte("table2"),
},
},
},
},
"zone1-0000000200": {
Response: &querypb.QueryResult{
Fields: []*querypb.Field{
{
Name: "table_name",
Type: querypb.Type_VARCHAR,
},
},
Rows: []*querypb.Row{{
Lengths: []int64{6},
Values: []byte("table2"),
}, {
Lengths: []int64{6},
Values: []byte("table3"),
},
},
},
},
},
}

err = validateEmptyTables(ctx, ts, tmc, ks, []string{"table1", "table2", "table3", "table4"})
assert.ErrorContains(t, err, "table1")
assert.ErrorContains(t, err, "table2")
assert.ErrorContains(t, err, "table3")

err = validateEmptyTables(ctx, ts, tmc, ks, []string{})
assert.NoError(t, err, "should not throw any error for empty tables slice")
}

0 comments on commit 10ee600

Please sign in to comment.