Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate Workflow: Scope vindex names correctly when target and source keyspace have different names #16769

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 78 additions & 1 deletion go/test/endtoend/vreplication/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ package vreplication

import (
"fmt"
"strings"
"testing"

"github.com/tidwall/gjson"

"vitess.io/vitess/go/test/endtoend/cluster"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
Expand Down Expand Up @@ -165,7 +168,7 @@ func TestVtctlMigrate(t *testing.T) {
// However now we need to create an external Vitess cluster. For this we need a different VTDATAROOT and
// hence the VTDATAROOT env variable gets overwritten.
// Each time we need to create vt processes in the "other" cluster we need to set the appropriate VTDATAROOT
func TestVtctldMigrate(t *testing.T) {
func TestVtctldMigrateUnsharded(t *testing.T) {
mattlord marked this conversation as resolved.
Show resolved Hide resolved
vc = NewVitessCluster(t, nil)

defaultReplicas = 0
Expand Down Expand Up @@ -299,3 +302,77 @@ func TestVtctldMigrate(t *testing.T) {
require.Errorf(t, err, "there is no vitess cluster named ext1")
})
}

// TestShardedMigrate adds a test for a sharded cluster to validate a fix for a bug where the target keyspace name
// doesn't match that of the source cluster. The test migrates from a cluster with keyspace customer to an "external"
// cluster with keyspace rating.
func TestMigrateSharded(t *testing.T) {
mattlord marked this conversation as resolved.
Show resolved Hide resolved
setSidecarDBName("_vt")
defaultRdonly = 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why set this? I don't see where we use it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is that global variable that we have not refactored yet. It is used in creating future clusters in the test. Added code to keep previous values and reset in a defer.

currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables
vc = setupCluster(t)
vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
defer vc.TearDown()
setupCustomerKeyspace(t)
createMoveTablesWorkflow(t, "customer,Lead,datze,customer2")
tstWorkflowSwitchReadsAndWrites(t)
tstWorkflowComplete(t)

var err error
// create external cluster
extCell := "extcell1"
extCells := []string{extCell}
extVc := NewVitessCluster(t, &clusterOptions{
cells: extCells,
clusterConfig: externalClusterConfig,
})
defer extVc.TearDown()

setupExtKeyspace(t, extVc, "rating", extCell)
err = cluster.WaitForHealthyShard(extVc.VtctldClient, "rating", "-80")
require.NoError(t, err)
err = cluster.WaitForHealthyShard(extVc.VtctldClient, "rating", "80-")
require.NoError(t, err)
verifyClusterHealth(t, extVc)
extVtgateConn := getConnection(t, extVc.ClusterConfig.hostname, extVc.ClusterConfig.vtgateMySQLPort)
defer extVtgateConn.Close()

currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Migrate
var output string
if output, err = extVc.VtctldClient.ExecuteCommandWithOutput("Mount", "register", "--name=external", "--topo-type=etcd2",
fmt.Sprintf("--topo-server=localhost:%d", vc.ClusterConfig.topoPort), "--topo-root=/vitess/global"); err != nil {
t.Fatalf("Mount command failed with %+v : %s\n", err, output)
}
ksWorkflow := "rating.e1"
if output, err = extVc.VtctldClient.ExecuteCommandWithOutput("Migrate",
"--target-keyspace", "rating", "--workflow", "e1",
"create", "--source-keyspace", "customer", "--mount-name", "external", "--all-tables", "--cells=zone1",
"--tablet-types=primary,replica"); err != nil {
t.Fatalf("Migrate command failed with %+v : %s\n", err, output)
mattlord marked this conversation as resolved.
Show resolved Hide resolved
}
waitForWorkflowState(t, extVc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
// this is because currently doVtctldclientVDiff is using the global vc :-( and we want to run a diff on the extVc cluster
vc = extVc
doVtctldclientVDiff(t, "rating", "e1", "zone1", nil)
}

func setupExtKeyspace(t *testing.T, vc *VitessCluster, ksName, cellName string) {
rdonly := 0
mattlord marked this conversation as resolved.
Show resolved Hide resolved
shards := []string{"-80", "80-"}
if _, err := vc.AddKeyspace(t, []*Cell{vc.Cells[cellName]}, ksName, strings.Join(shards, ","),
customerVSchema, customerSchema, defaultReplicas, rdonly, 1200, nil); err != nil {
t.Fatal(err)
}
vtgate := vc.Cells[cellName].Vtgates[0]
for _, shard := range shards {
err := cluster.WaitForHealthyShard(vc.VtctldClient, ksName, shard)
require.NoError(t, err)
if defaultReplicas > 0 {
require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", ksName, shard), defaultReplicas, waitTimeout))
}
if rdonly > 0 {
require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.rdonly", ksName, shard), defaultRdonly, waitTimeout))
}
}
}
15 changes: 14 additions & 1 deletion go/vt/vtctl/workflow/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,20 @@ func (mz *materializer) generateBinlogSources(ctx context.Context, targetShard *
for _, mappedCol := range mappedCols {
subExprs = append(subExprs, mappedCol)
}
vindexName := fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name)
var vindexName string
if mz.workflowType == binlogdatapb.VReplicationWorkflowType_Migrate {
// For a Migrate, if the TargetKeyspace name is different from the SourceKeyspace name, we need to use the
// SourceKeyspace name to determine the vindex since the TargetKeyspace name is not known to the source.
// Note: it is expected that the source and target keyspaces have the same vindex name and data type.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we should check and confirm this expectation and error out if it's not true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is assumed that the vschemas and tables etc are identical on source and target for Migrate. We can/should add all vaidations for this and all other workflow types in another PR. I don't want to add that to this PR.

keyspace := mz.ms.TargetKeyspace
if mz.ms.ExternalCluster != "" {
keyspace = mz.ms.SourceKeyspace
}
vindexName = fmt.Sprintf("%s.%s", keyspace, cv.Name)
} else {
vindexName = fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name)
}

subExprs = append(subExprs, sqlparser.NewStrLiteral(vindexName))
subExprs = append(subExprs, sqlparser.NewStrLiteral(key.KeyRangeString(targetShard.KeyRange)))
inKeyRange := &sqlparser.FuncExpr{
Expand Down
9 changes: 6 additions & 3 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2134,7 +2134,6 @@ func (s *Server) WorkflowStatus(ctx context.Context, req *vtctldatapb.WorkflowSt
if err != nil {
return nil, err
}

// The stream key is target keyspace/tablet alias, e.g. 0/test-0000000100.
// We sort the keys for intuitive and consistent output.
streamKeys := make([]string, 0, len(workflow.ShardStreams))
Expand Down Expand Up @@ -2190,9 +2189,13 @@ func (s *Server) WorkflowStatus(ctx context.Context, req *vtctldatapb.WorkflowSt
return resp, nil
}

// GetCopyProgress returns the progress of all tables being copied in the
// workflow.
// GetCopyProgress returns the progress of all tables being copied in the workflow.
func (s *Server) GetCopyProgress(ctx context.Context, ts *trafficSwitcher, state *State) (*copyProgress, error) {
if ts.workflowType == binlogdatapb.VReplicationWorkflowType_Migrate {
// The logic below expects the source primaries to be in the same cluster as the target.
// For now we don't report progress for Migrate workflows.
return nil, nil
}
getTablesQuery := "select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = %d"
getRowCountQuery := "select table_name, table_rows, data_length from information_schema.tables where table_schema = %s and table_name in (%s)"
tables := make(map[string]bool)
Expand Down
14 changes: 13 additions & 1 deletion go/vt/wrangler/materializer.go
mattlord marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -1398,7 +1398,19 @@ func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*top
for _, mappedCol := range mappedCols {
subExprs = append(subExprs, mappedCol)
}
vindexName := fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name)
var vindexName string
if mz.getWorkflowType() == binlogdatapb.VReplicationWorkflowType_Migrate {
// For a Migrate, if the TargetKeyspace name is different from the SourceKeyspace name, we need to use the
// SourceKeyspace name to determine the vindex since the TargetKeyspace name is not known to the source.
// Note: it is expected that the source and target keyspaces have the same vindex name and data type.
keyspace := mz.ms.TargetKeyspace
if mz.ms.ExternalCluster != "" {
keyspace = mz.ms.SourceKeyspace
}
vindexName = fmt.Sprintf("%s.%s", keyspace, cv.Name)
} else {
vindexName = fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name)
}
subExprs = append(subExprs, sqlparser.NewStrLiteral(vindexName))
subExprs = append(subExprs, sqlparser.NewStrLiteral("{{.keyrange}}"))
inKeyRange := &sqlparser.FuncExpr{
Expand Down
2 changes: 1 addition & 1 deletion test/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -1042,7 +1042,7 @@
},
"vreplication_materialize": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestMaterialize"],
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "Materialize"],
"Command": [],
"Manual": false,
"Shard": "vreplication_partial_movetables_and_materialize",
Expand Down
Loading