From ffa21e7013dcc6ee95b84266634f0b6cfc2a74a7 Mon Sep 17 00:00:00 2001 From: Rohit Nayak <57520317+rohit-nayak-ps@users.noreply.github.com> Date: Mon, 8 Apr 2024 18:34:53 +0200 Subject: [PATCH] VDiff/OnlineDDL: add support for running VDiffs for OnlineDDL migrations (#15546) Signed-off-by: Rohit Nayak --- .../endtoend/cluster/vtctldclient_process.go | 11 ++ go/test/endtoend/vreplication/helper_test.go | 3 +- .../vreplication/vdiff_helper_test.go | 13 +- .../vreplication/vdiff_online_ddl_test.go | 161 ++++++++++++++++++ go/vt/vtctl/workflow/server.go | 34 ++++ .../tabletmanager/vdiff/workflow_differ.go | 5 +- test/config.json | 9 + 7 files changed, 228 insertions(+), 8 deletions(-) create mode 100644 go/test/endtoend/vreplication/vdiff_online_ddl_test.go diff --git a/go/test/endtoend/cluster/vtctldclient_process.go b/go/test/endtoend/cluster/vtctldclient_process.go index 2c6d6028ee0..4ed5acde518 100644 --- a/go/test/endtoend/cluster/vtctldclient_process.go +++ b/go/test/endtoend/cluster/vtctldclient_process.go @@ -298,3 +298,14 @@ func (vtctldclient *VtctldClientProcess) OnlineDDLShowRecent(Keyspace string) (r "recent", ) } + +// OnlineDDLShow runs "OnlineDDL show --json <keyspace> <workflow>" and returns the command output +func (vtctldclient *VtctldClientProcess) OnlineDDLShow(keyspace, workflow string) (result string, err error) { + return vtctldclient.ExecuteCommandWithOutput( + "OnlineDDL", + "show", + "--json", + keyspace, + workflow, + ) +} diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index e187c8398b6..29d8f518638 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -358,7 +358,7 @@ func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wa log.Infof("Waiting for workflow %q to fully reach %q state", ksWorkflow, wantState) for { output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", ksWorkflow, "show") - require.NoError(t, err) + 
require.NoError(t, err, output) done = true state := "" result := gjson.Get(output, "ShardStatuses") @@ -522,7 +522,6 @@ func validateDryRunResults(t *testing.T, output string, want []string) { w = strings.TrimSpace(w[1:]) result := strings.HasPrefix(g, w) match = result - //t.Logf("Partial match |%v|%v|%v\n", w, g, match) } else { match = g == w } diff --git a/go/test/endtoend/vreplication/vdiff_helper_test.go b/go/test/endtoend/vreplication/vdiff_helper_test.go index 91605bff402..53e19e56731 100644 --- a/go/test/endtoend/vreplication/vdiff_helper_test.go +++ b/go/test/endtoend/vreplication/vdiff_helper_test.go @@ -28,6 +28,7 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/log" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" vdiff2 "vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff" ) @@ -80,6 +81,7 @@ func doVtctlclientVDiff(t *testing.T, keyspace, workflow, cells string, want *ex } else { require.Equal(t, "completed", info.State, "vdiff results: %+v", info) require.False(t, info.HasMismatch, "vdiff results: %+v", info) + require.NotZero(t, info.RowsCompared) } if strings.Contains(t.Name(), "AcrossDBVersions") { log.Errorf("VDiff resume cannot be guaranteed between major MySQL versions due to implied collation differences, skipping resume test...") @@ -150,9 +152,10 @@ func waitForVDiff2ToComplete(t *testing.T, useVtctlclient bool, ksWorkflow, cell } type expectedVDiff2Result struct { - state string - shards []string - hasMismatch bool + state string + shards []string + hasMismatch bool + minimumRowsCompared int64 } func doVtctldclientVDiff(t *testing.T, keyspace, workflow, cells string, want *expectedVDiff2Result, extraFlags ...string) { @@ -172,6 +175,8 @@ func doVtctldclientVDiff(t *testing.T, keyspace, workflow, cells string, want *e require.Equal(t, want.state, info.State) require.Equal(t, strings.Join(want.shards, ","), info.Shards) require.Equal(t, want.hasMismatch, info.HasMismatch) + require.GreaterOrEqual(t, 
info.RowsCompared, want.minimumRowsCompared, + "not enough rows compared: want at least %d, got %d", want.minimumRowsCompared, info.RowsCompared) } else { require.Equal(t, "completed", info.State, "vdiff results: %+v", info) require.False(t, info.HasMismatch, "vdiff results: %+v", info) @@ -187,7 +192,7 @@ func performVDiff2Action(t *testing.T, useVtctlclient bool, ksWorkflow, cells, a var err error targetKeyspace, workflowName, ok := strings.Cut(ksWorkflow, ".") require.True(t, ok, "invalid keyspace.workflow value: %s", ksWorkflow) - + waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) if useVtctlclient { // This will always result in us using a PRIMARY tablet, which is all // we start in many e2e tests, but it avoids the tablet picker logic diff --git a/go/test/endtoend/vreplication/vdiff_online_ddl_test.go b/go/test/endtoend/vreplication/vdiff_online_ddl_test.go new file mode 100644 index 00000000000..bad1b840069 --- /dev/null +++ b/go/test/endtoend/vreplication/vdiff_online_ddl_test.go @@ -0,0 +1,161 @@ +package vreplication + +import ( + "context" + "fmt" + "strconv" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/encoding/protojson" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/proto/vtctldata" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" +) + +// TestOnlineDDLVDiff is to run a vdiff on a table that is part of an OnlineDDL workflow. 
+func TestOnlineDDLVDiff(t *testing.T) { + setSidecarDBName("_vt") + originalRdonly := defaultRdonly + originalReplicas := defaultReplicas + defaultRdonly = 0 + defaultReplicas = 0 + defer func() { + defaultRdonly = originalRdonly + defaultReplicas = originalReplicas + }() + + vc = setupMinimalCluster(t) + defer vc.TearDown() + keyspace := "product" + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + createQuery := "create table temp (id int, name varchar(100), blb blob, primary key (id))" + dropQuery := "drop table temp" + alterQuery := "alter table temp add column extra1 int not null default 0" + insertTemplate := "insert into temp (id, name, blb) values (%d, 'name%d', 'blb%d')" + updateTemplate := "update temp set name = 'name_%d' where id = %d" + execOnlineDDL(t, "direct", keyspace, createQuery) + defer execOnlineDDL(t, "direct", keyspace, dropQuery) + + var output string + + t.Run("OnlineDDL VDiff", func(t *testing.T) { + var done = make(chan bool) + go populate(ctx, t, done, insertTemplate, updateTemplate) + + waitForAdditionalRows(t, keyspace, "temp", 100) + output = execOnlineDDL(t, "vitess --postpone-completion", keyspace, alterQuery) + uuid := strings.TrimSpace(output) + waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", keyspace, uuid), binlogdatapb.VReplicationWorkflowState_Running.String()) + waitForAdditionalRows(t, keyspace, "temp", 200) + + require.NoError(t, waitForCondition("online ddl migration to be ready to complete", func() bool { + response := onlineDDLShow(t, keyspace, uuid) + if len(response.Migrations) > 0 && + response.Migrations[0].ReadyToComplete == true { + return true + } + return false + }, defaultTimeout)) + + want := &expectedVDiff2Result{ + state: "completed", + minimumRowsCompared: 200, + hasMismatch: false, + shards: []string{"0"}, + } + doVtctldclientVDiff(t, keyspace, uuid, "zone1", want) + + cancel() + <-done + }) +} + +func onlineDDLShow(t *testing.T, keyspace, uuid string) 
*vtctldata.GetSchemaMigrationsResponse { + var response vtctldata.GetSchemaMigrationsResponse + output, err := vc.VtctldClient.OnlineDDLShow(keyspace, uuid) + require.NoError(t, err, output) + err = protojson.Unmarshal([]byte(output), &response) + require.NoErrorf(t, err, "error unmarshalling OnlineDDL showresponse") + return &response +} + +func execOnlineDDL(t *testing.T, strategy, keyspace, query string) string { + output, err := vc.VtctldClient.ExecuteCommandWithOutput("ApplySchema", "--ddl-strategy", strategy, "--sql", query, keyspace) + require.NoError(t, err, output) + uuid := strings.TrimSpace(output) + if strategy != "direct" { + err = waitForCondition("online ddl to start", func() bool { + response := onlineDDLShow(t, keyspace, uuid) + if len(response.Migrations) > 0 && + (response.Migrations[0].Status == vtctldata.SchemaMigration_RUNNING || + response.Migrations[0].Status == vtctldata.SchemaMigration_COMPLETE) { + return true + } + return false + }, defaultTimeout) + require.NoError(t, err) + + } + return uuid +} + +func waitForAdditionalRows(t *testing.T, keyspace, table string, count int) { + vtgateConn, cancel := getVTGateConn() + defer cancel() + + numRowsStart := getNumRows(t, vtgateConn, keyspace, table) + numRows := 0 + shortCtx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() + for { + switch { + case shortCtx.Err() != nil: + require.FailNowf(t, "Timed out waiting for additional rows", "wanted %d rows, got %d rows", count, numRows) + default: + numRows = getNumRows(t, vtgateConn, keyspace, table) + if numRows >= numRowsStart+count { + return + } + time.Sleep(defaultTick) + } + } +} + +func getNumRows(t *testing.T, vtgateConn *mysql.Conn, keyspace, table string) int { + qr := execVtgateQuery(t, vtgateConn, keyspace, fmt.Sprintf("SELECT COUNT(*) FROM %s", table)) + require.NotNil(t, qr) + numRows, err := strconv.Atoi(qr.Rows[0][0].ToString()) + require.NoError(t, err) + return numRows +} + +func populate(ctx 
context.Context, t *testing.T, done chan bool, insertTemplate, updateTemplate string) { + defer close(done) + vtgateConn, closeConn := getVTGateConn() + defer closeConn() + id := 1 + for { + select { + case <-ctx.Done(): + log.Infof("load cancelled") + return + default: + query := fmt.Sprintf(insertTemplate, id, id, id) + _, err := vtgateConn.ExecuteFetch(query, 1, false) + require.NoErrorf(t, err, "error in insert") + query = fmt.Sprintf(updateTemplate, id, id) + _, err = vtgateConn.ExecuteFetch(query, 1, false) + require.NoErrorf(t, err, "error in update") + id++ + time.Sleep(10 * time.Millisecond) + } + } +} diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 53ca966d026..4197269feb6 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1792,6 +1792,16 @@ func (s *Server) VDiffCreate(ctx context.Context, req *vtctldatapb.VDiffCreateRe req.TargetKeyspace, req.Workflow) } + workflowStatus, err := s.getWorkflowStatus(ctx, req.TargetKeyspace, req.Workflow) + if err != nil { + return nil, err + } + if workflowStatus != binlogdatapb.VReplicationWorkflowState_Running { + log.Infof("Workflow %s.%s is not running, cannot start VDiff in state %s", req.TargetKeyspace, req.Workflow, workflowStatus) + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, + "not all streams are running in workflow %s.%s", req.TargetKeyspace, req.Workflow) + } + err = ts.ForAllTargets(func(target *MigrationTarget) error { _, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) return err @@ -3949,3 +3959,27 @@ func (s *Server) MigrateCreate(ctx context.Context, req *vtctldatapb.MigrateCrea } return s.moveTablesCreate(ctx, moveTablesCreateRequest, binlogdatapb.VReplicationWorkflowType_Migrate) } + +// getWorkflowStatus gets the overall status of the workflow by checking the status of all the streams. If the streams are not +// all in the same state, it returns the unknown state. 
+func (s *Server) getWorkflowStatus(ctx context.Context, keyspace string, workflow string) (binlogdatapb.VReplicationWorkflowState, error) { + workflowStatus := binlogdatapb.VReplicationWorkflowState_Unknown + wf, err := s.GetWorkflow(ctx, keyspace, workflow, false, nil) + if err != nil { + return workflowStatus, err + } + for _, shardStream := range wf.GetShardStreams() { + for _, stream := range shardStream.GetStreams() { + state, ok := binlogdatapb.VReplicationWorkflowState_value[stream.State] + if !ok { + return workflowStatus, fmt.Errorf("invalid state for stream %s of workflow %s.%s", stream.State, keyspace, workflow) + } + currentStatus := binlogdatapb.VReplicationWorkflowState(state) + if workflowStatus != binlogdatapb.VReplicationWorkflowState_Unknown && currentStatus != workflowStatus { + return binlogdatapb.VReplicationWorkflowState_Unknown, nil + } + workflowStatus = currentStatus + } + } + return workflowStatus, nil +} diff --git a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go index 97d2bd387cb..56b8d663a3c 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go @@ -24,6 +24,8 @@ import ( "strings" "time" + "vitess.io/vitess/go/vt/schema" + "google.golang.org/protobuf/encoding/prototext" "vitess.io/vitess/go/mysql/collations" @@ -31,7 +33,6 @@ import ( "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vtctl/schematools" "vitess.io/vitess/go/vt/vterrors" @@ -343,7 +344,7 @@ func (wd *workflowDiffer) buildPlan(dbClient binlogplayer.DBClient, filter *binl if len(specifiedTables) != 0 && !stringListContains(specifiedTables, table.Name) { continue } - if schema.IsInternalOperationTableName(table.Name) { + if schema.IsInternalOperationTableName(table.Name) && 
!schema.IsOnlineDDLTableName(table.Name) { continue } rule, err := vreplication.MatchTable(table.Name, filter) diff --git a/test/config.json b/test/config.json index 26efdee1f36..6db35dd8158 100644 --- a/test/config.json +++ b/test/config.json @@ -1076,6 +1076,15 @@ "RetryMax": 0, "Tags": [] }, + "vreplication_onlineddl_vdiff": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestOnlineDDLVDiff"], + "Command": [], + "Manual": false, + "Shard": "vreplication_cellalias", + "RetryMax": 2, + "Tags": [] + }, "vreplication_vschema_load": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVSchemaChangesUnderLoad"],