From eb90501de32b8d95f9b96e94019456512ae06b66 Mon Sep 17 00:00:00 2001 From: "vitess-bot[bot]" <108069721+vitess-bot[bot]@users.noreply.github.com> Date: Sat, 21 Sep 2024 13:12:04 +0200 Subject: [PATCH] Cherry-pick c3bbce2edd6ed24ad6d2b5a2f39966a0461cc2b0 with conflicts --- go/test/endtoend/vreplication/migrate_test.go | 96 +++++++++++++++++++ go/vt/vtctl/workflow/materializer.go | 19 ++++ go/vt/vtctl/workflow/server.go | 9 +- go/vt/wrangler/materializer.go | 18 ++++ test/config.json | 2 +- 5 files changed, 140 insertions(+), 4 deletions(-) diff --git a/go/test/endtoend/vreplication/migrate_test.go b/go/test/endtoend/vreplication/migrate_test.go index 75ab6a3151b..27f435680d6 100644 --- a/go/test/endtoend/vreplication/migrate_test.go +++ b/go/test/endtoend/vreplication/migrate_test.go @@ -18,10 +18,13 @@ package vreplication import ( "fmt" + "strings" "testing" "github.com/tidwall/gjson" + "vitess.io/vitess/go/test/endtoend/cluster" + "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" @@ -174,6 +177,7 @@ func TestVtctlMigrate(t *testing.T) { // However now we need to create an external Vitess cluster. For this we need a different VTDATAROOT and // hence the VTDATAROOT env variable gets overwritten. // Each time we need to create vt processes in the "other" cluster we need to set the appropriate VTDATAROOT +<<<<<<< HEAD func TestVtctldMigrate(t *testing.T) { defaultCellName := "zone1" cells := []string{"zone1"} @@ -184,6 +188,21 @@ func TestVtctldMigrate(t *testing.T) { defaultReplicas = 0 defaultRdonly = 0 defer vc.TearDown(t) +======= +func TestVtctldMigrateUnsharded(t *testing.T) { + vc = NewVitessCluster(t, nil) + + oldDefaultReplicas := defaultReplicas + oldDefaultRdonly := defaultRdonly + defaultReplicas = 0 + defaultRdonly = 0 + defer func() { + defaultReplicas = oldDefaultReplicas + defaultRdonly = oldDefaultRdonly + }() + + defer vc.TearDown() +>>>>>>> c3bbce2edd (Migrate Workflow: Scope vindex names correctly when target and source keyspace have different names (#16769)) defaultCell = vc.Cells[defaultCellName] _, err := vc.AddKeyspace(t, []*Cell{defaultCell}, "product", "0", @@ -314,3 +333,80 @@ func TestVtctldMigrate(t *testing.T) { require.Errorf(t, err, "there is no vitess cluster named ext1") }) } + +// TestVtctldMigrate adds a test for a sharded cluster to validate a fix for a bug where the target keyspace name +// doesn't match that of the source cluster. The test migrates from a cluster with keyspace customer to an "external" +// cluster with keyspace rating. +func TestVtctldMigrateSharded(t *testing.T) { + oldDefaultReplicas := defaultReplicas + oldDefaultRdonly := defaultRdonly + defaultReplicas = 1 + defaultRdonly = 1 + defer func() { + defaultReplicas = oldDefaultReplicas + defaultRdonly = oldDefaultRdonly + }() + + setSidecarDBName("_vt") + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables + vc = setupCluster(t) + vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) + defer vtgateConn.Close() + defer vc.TearDown() + setupCustomerKeyspace(t) + createMoveTablesWorkflow(t, "customer,Lead,datze,customer2") + tstWorkflowSwitchReadsAndWrites(t) + tstWorkflowComplete(t) + + var err error + // create external cluster + extCell := "extcell1" + extCells := []string{extCell} + extVc := NewVitessCluster(t, &clusterOptions{ + cells: extCells, + clusterConfig: externalClusterConfig, + }) + defer extVc.TearDown() + + setupExtKeyspace(t, extVc, "rating", extCell) + err = cluster.WaitForHealthyShard(extVc.VtctldClient, "rating", "-80") + require.NoError(t, err) + err = cluster.WaitForHealthyShard(extVc.VtctldClient, "rating", "80-") + require.NoError(t, err) + verifyClusterHealth(t, extVc) + extVtgateConn := getConnection(t, extVc.ClusterConfig.hostname, extVc.ClusterConfig.vtgateMySQLPort) + defer extVtgateConn.Close() + + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Migrate + var output string + if output, err = extVc.VtctldClient.ExecuteCommandWithOutput("Mount", "register", "--name=external", "--topo-type=etcd2", + fmt.Sprintf("--topo-server=localhost:%d", vc.ClusterConfig.topoPort), "--topo-root=/vitess/global"); err != nil { + require.FailNow(t, "Mount command failed with %+v : %s\n", err, output) + } + ksWorkflow := "rating.e1" + if output, err = extVc.VtctldClient.ExecuteCommandWithOutput("Migrate", + "--target-keyspace", "rating", "--workflow", "e1", + "create", "--source-keyspace", "customer", "--mount-name", "external", "--all-tables", "--cells=zone1", + "--tablet-types=primary,replica"); err != nil { + require.FailNow(t, "Migrate command failed with %+v : %s\n", err, output) + } + waitForWorkflowState(t, extVc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) + // this is because currently doVtctldclientVDiff is using the global vc :-( and we want to run a diff on the extVc cluster + vc = extVc + doVtctldclientVDiff(t, "rating", "e1", "zone1", nil) +} + +func setupExtKeyspace(t *testing.T, vc *VitessCluster, ksName, cellName string) { + numReplicas := 1 + shards := []string{"-80", "80-"} + if _, err := vc.AddKeyspace(t, []*Cell{vc.Cells[cellName]}, ksName, strings.Join(shards, ","), + customerVSchema, customerSchema, numReplicas, 0, 1200, nil); err != nil { + t.Fatal(err) + } + vtgate := vc.Cells[cellName].Vtgates[0] + for _, shard := range shards { + err := cluster.WaitForHealthyShard(vc.VtctldClient, ksName, shard) + require.NoError(t, err) + require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", ksName, shard), numReplicas, waitTimeout)) + } +} diff --git a/go/vt/vtctl/workflow/materializer.go b/go/vt/vtctl/workflow/materializer.go index 152409540c8..dba3fe19823 100644 --- a/go/vt/vtctl/workflow/materializer.go +++ b/go/vt/vtctl/workflow/materializer.go @@ -301,9 +301,28 @@ func (mz *materializer) generateBinlogSources(ctx context.Context, targetShard * for _, mappedCol := range mappedCols { subExprs = append(subExprs, &sqlparser.AliasedExpr{Expr: mappedCol}) } +<<<<<<< HEAD vindexName := fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name) subExprs = append(subExprs, &sqlparser.AliasedExpr{Expr: sqlparser.NewStrLiteral(vindexName)}) subExprs = append(subExprs, &sqlparser.AliasedExpr{Expr: sqlparser.NewStrLiteral(key.KeyRangeString(targetShard.KeyRange))}) +======= + var vindexName string + if mz.workflowType == binlogdatapb.VReplicationWorkflowType_Migrate { + // For a Migrate, if the TargetKeyspace name is different from the SourceKeyspace name, we need to use the + // SourceKeyspace name to determine the vindex since the TargetKeyspace name is not known to the source. + // Note: it is expected that the source and target keyspaces have the same vindex name and data type. + keyspace := mz.ms.TargetKeyspace + if mz.ms.ExternalCluster != "" { + keyspace = mz.ms.SourceKeyspace + } + vindexName = fmt.Sprintf("%s.%s", keyspace, cv.Name) + } else { + vindexName = fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name) + } + + subExprs = append(subExprs, sqlparser.NewStrLiteral(vindexName)) + subExprs = append(subExprs, sqlparser.NewStrLiteral(key.KeyRangeString(targetShard.KeyRange))) +>>>>>>> c3bbce2edd (Migrate Workflow: Scope vindex names correctly when target and source keyspace have different names (#16769)) inKeyRange := &sqlparser.FuncExpr{ Name: sqlparser.NewIdentifierCI("in_keyrange"), Exprs: subExprs, diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 43d1f1a2b05..36e98d794ca 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1925,7 +1925,6 @@ func (s *Server) WorkflowStatus(ctx context.Context, req *vtctldatapb.WorkflowSt if err != nil { return nil, err } - // The stream key is target keyspace/tablet alias, e.g. 0/test-0000000100. // We sort the keys for intuitive and consistent output. streamKeys := make([]string, 0, len(workflow.ShardStreams)) @@ -1978,9 +1977,13 @@ func (s *Server) WorkflowStatus(ctx context.Context, req *vtctldatapb.WorkflowSt return resp, nil } -// GetCopyProgress returns the progress of all tables being copied in the -// workflow. +// GetCopyProgress returns the progress of all tables being copied in the workflow. func (s *Server) GetCopyProgress(ctx context.Context, ts *trafficSwitcher, state *State) (*copyProgress, error) { + if ts.workflowType == binlogdatapb.VReplicationWorkflowType_Migrate { + // The logic below expects the source primaries to be in the same cluster as the target. + // For now we don't report progress for Migrate workflows. + return nil, nil + } getTablesQuery := "select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = %d" getRowCountQuery := "select table_name, table_rows, data_length from information_schema.tables where table_schema = %s and table_name in (%s)" tables := make(map[string]bool) diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index 0fba424eacd..51f9a284836 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -1370,9 +1370,27 @@ func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*top for _, mappedCol := range mappedCols { subExprs = append(subExprs, &sqlparser.AliasedExpr{Expr: mappedCol}) } +<<<<<<< HEAD vindexName := fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name) subExprs = append(subExprs, &sqlparser.AliasedExpr{Expr: sqlparser.NewStrLiteral(vindexName)}) subExprs = append(subExprs, &sqlparser.AliasedExpr{Expr: sqlparser.NewStrLiteral("{{.keyrange}}")}) +======= + var vindexName string + if mz.getWorkflowType() == binlogdatapb.VReplicationWorkflowType_Migrate { + // For a Migrate, if the TargetKeyspace name is different from the SourceKeyspace name, we need to use the + // SourceKeyspace name to determine the vindex since the TargetKeyspace name is not known to the source. + // Note: it is expected that the source and target keyspaces have the same vindex name and data type. + keyspace := mz.ms.TargetKeyspace + if mz.ms.ExternalCluster != "" { + keyspace = mz.ms.SourceKeyspace + } + vindexName = fmt.Sprintf("%s.%s", keyspace, cv.Name) + } else { + vindexName = fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name) + } + subExprs = append(subExprs, sqlparser.NewStrLiteral(vindexName)) + subExprs = append(subExprs, sqlparser.NewStrLiteral("{{.keyrange}}")) +>>>>>>> c3bbce2edd (Migrate Workflow: Scope vindex names correctly when target and source keyspace have different names (#16769)) inKeyRange := &sqlparser.FuncExpr{ Name: sqlparser.NewIdentifierCI("in_keyrange"), Exprs: subExprs, diff --git a/test/config.json b/test/config.json index 341693f3628..f25c3175186 100644 --- a/test/config.json +++ b/test/config.json @@ -1015,7 +1015,7 @@ }, "vreplication_materialize": { "File": "unused.go", - "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestMaterialize"], + "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "Materialize"], "Command": [], "Manual": false, "Shard": "vreplication_multicell",