Skip to content

Commit

Permalink
VDiff/OnlineDDL: add support for running VDiffs for OnlineDDL migrati…
Browse files Browse the repository at this point in the history
…ons (#15546)

Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps authored Apr 8, 2024
1 parent ab6a5b4 commit ffa21e7
Show file tree
Hide file tree
Showing 7 changed files with 228 additions and 8 deletions.
11 changes: 11 additions & 0 deletions go/test/endtoend/cluster/vtctldclient_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,3 +298,14 @@ func (vtctldclient *VtctldClientProcess) OnlineDDLShowRecent(Keyspace string) (r
"recent",
)
}

// OnlineDDLShow runs `OnlineDDL show --json <keyspace> <workflow>` through
// vtctldclient and returns the command's output along with any error from
// running it.
func (vtctldclient *VtctldClientProcess) OnlineDDLShow(keyspace, workflow string) (result string, err error) {
	return vtctldclient.ExecuteCommandWithOutput("OnlineDDL", "show", "--json", keyspace, workflow)
}
3 changes: 1 addition & 2 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wa
log.Infof("Waiting for workflow %q to fully reach %q state", ksWorkflow, wantState)
for {
output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", ksWorkflow, "show")
require.NoError(t, err)
require.NoError(t, err, output)
done = true
state := ""
result := gjson.Get(output, "ShardStatuses")
Expand Down Expand Up @@ -522,7 +522,6 @@ func validateDryRunResults(t *testing.T, output string, want []string) {
w = strings.TrimSpace(w[1:])
result := strings.HasPrefix(g, w)
match = result
//t.Logf("Partial match |%v|%v|%v\n", w, g, match)
} else {
match = g == w
}
Expand Down
13 changes: 9 additions & 4 deletions go/test/endtoend/vreplication/vdiff_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
vdiff2 "vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff"
)

Expand Down Expand Up @@ -80,6 +81,7 @@ func doVtctlclientVDiff(t *testing.T, keyspace, workflow, cells string, want *ex
} else {
require.Equal(t, "completed", info.State, "vdiff results: %+v", info)
require.False(t, info.HasMismatch, "vdiff results: %+v", info)
require.NotZero(t, info.RowsCompared)
}
if strings.Contains(t.Name(), "AcrossDBVersions") {
log.Errorf("VDiff resume cannot be guaranteed between major MySQL versions due to implied collation differences, skipping resume test...")
Expand Down Expand Up @@ -150,9 +152,10 @@ func waitForVDiff2ToComplete(t *testing.T, useVtctlclient bool, ksWorkflow, cell
}

type expectedVDiff2Result struct {
state string
shards []string
hasMismatch bool
state string
shards []string
hasMismatch bool
minimumRowsCompared int64
}

func doVtctldclientVDiff(t *testing.T, keyspace, workflow, cells string, want *expectedVDiff2Result, extraFlags ...string) {
Expand All @@ -172,6 +175,8 @@ func doVtctldclientVDiff(t *testing.T, keyspace, workflow, cells string, want *e
require.Equal(t, want.state, info.State)
require.Equal(t, strings.Join(want.shards, ","), info.Shards)
require.Equal(t, want.hasMismatch, info.HasMismatch)
require.GreaterOrEqual(t, info.RowsCompared, want.minimumRowsCompared,
"not enough rows compared: want at least %d, got %d", want.minimumRowsCompared, info.RowsCompared)
} else {
require.Equal(t, "completed", info.State, "vdiff results: %+v", info)
require.False(t, info.HasMismatch, "vdiff results: %+v", info)
Expand All @@ -187,7 +192,7 @@ func performVDiff2Action(t *testing.T, useVtctlclient bool, ksWorkflow, cells, a
var err error
targetKeyspace, workflowName, ok := strings.Cut(ksWorkflow, ".")
require.True(t, ok, "invalid keyspace.workflow value: %s", ksWorkflow)

waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
if useVtctlclient {
// This will always result in us using a PRIMARY tablet, which is all
// we start in many e2e tests, but it avoids the tablet picker logic
Expand Down
161 changes: 161 additions & 0 deletions go/test/endtoend/vreplication/vdiff_online_ddl_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package vreplication

import (
"context"
"fmt"
"strconv"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
"google.golang.org/protobuf/encoding/protojson"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/vtctldata"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

// TestOnlineDDLVDiff runs a VDiff against the workflow of an OnlineDDL
// migration that is started with --postpone-completion, while a background
// goroutine keeps inserting and updating rows in the table being altered.
func TestOnlineDDLVDiff(t *testing.T) {
	setSidecarDBName("_vt")
	// Temporarily override the package-level tablet count defaults and put
	// them back when the test ends.
	// NOTE(review): presumably this makes the cluster start with primaries
	// only -- confirm against setupMinimalCluster.
	originalRdonly := defaultRdonly
	originalReplicas := defaultReplicas
	defaultRdonly = 0
	defaultReplicas = 0
	defer func() {
		defaultRdonly = originalRdonly
		defaultReplicas = originalReplicas
	}()

	vc = setupMinimalCluster(t)
	defer vc.TearDown()
	keyspace := "product"
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	createQuery := "create table temp (id int, name varchar(100), blb blob, primary key (id))"
	dropQuery := "drop table temp"
	alterQuery := "alter table temp add column extra1 int not null default 0"
	insertTemplate := "insert into temp (id, name, blb) values (%d, 'name%d', 'blb%d')"
	updateTemplate := "update temp set name = 'name_%d' where id = %d"
	execOnlineDDL(t, "direct", keyspace, createQuery)
	defer execOnlineDDL(t, "direct", keyspace, dropQuery)

	t.Run("OnlineDDL VDiff", func(t *testing.T) {
		done := make(chan bool)
		go populate(ctx, t, done, insertTemplate, updateTemplate)
		// Always stop the load generator and wait for it to exit, including
		// when a failed assertion aborts this subtest. Otherwise the goroutine
		// would keep using t and the cluster after the subtest has finished.
		defer func() {
			cancel()
			<-done
		}()

		waitForAdditionalRows(t, keyspace, "temp", 100)
		// execOnlineDDL returns the trimmed command output, which is used as
		// the migration UUID / workflow name below.
		uuid := execOnlineDDL(t, "vitess --postpone-completion", keyspace, alterQuery)
		waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", keyspace, uuid), binlogdatapb.VReplicationWorkflowState_Running.String())
		waitForAdditionalRows(t, keyspace, "temp", 200)

		require.NoError(t, waitForCondition("online ddl migration to be ready to complete", func() bool {
			response := onlineDDLShow(t, keyspace, uuid)
			return len(response.Migrations) > 0 && response.Migrations[0].ReadyToComplete
		}, defaultTimeout))

		want := &expectedVDiff2Result{
			state:               "completed",
			minimumRowsCompared: 200,
			hasMismatch:         false,
			shards:              []string{"0"},
		}
		doVtctldclientVDiff(t, keyspace, uuid, "zone1", want)
	})
}

// onlineDDLShow runs `OnlineDDL show` for the given keyspace and migration
// UUID and returns the output decoded into a GetSchemaMigrationsResponse.
// It fails the test immediately if the command errors or if its output cannot
// be unmarshalled; in both cases the raw output is included in the failure
// message to make the problem diagnosable.
func onlineDDLShow(t *testing.T, keyspace, uuid string) *vtctldata.GetSchemaMigrationsResponse {
	output, err := vc.VtctldClient.OnlineDDLShow(keyspace, uuid)
	require.NoError(t, err, output)

	var response vtctldata.GetSchemaMigrationsResponse
	err = protojson.Unmarshal([]byte(output), &response)
	require.NoErrorf(t, err, "error unmarshalling OnlineDDL show response: %s", output)
	return &response
}

// execOnlineDDL applies the given DDL to the keyspace via ApplySchema using
// the supplied DDL strategy and returns the trimmed command output. For any
// strategy other than "direct" that output is treated as the migration UUID,
// and the function waits (up to defaultTimeout) until the first migration
// reported for it is RUNNING or COMPLETE.
func execOnlineDDL(t *testing.T, strategy, keyspace, query string) string {
	output, err := vc.VtctldClient.ExecuteCommandWithOutput("ApplySchema", "--ddl-strategy", strategy, "--sql", query, keyspace)
	require.NoError(t, err, output)
	uuid := strings.TrimSpace(output)
	if strategy == "direct" {
		// Direct DDL has no migration to wait for.
		return uuid
	}

	migrationStarted := func() bool {
		migrations := onlineDDLShow(t, keyspace, uuid).Migrations
		if len(migrations) == 0 {
			return false
		}
		switch migrations[0].Status {
		case vtctldata.SchemaMigration_RUNNING, vtctldata.SchemaMigration_COMPLETE:
			return true
		default:
			return false
		}
	}
	require.NoError(t, waitForCondition("online ddl to start", migrationStarted, defaultTimeout))
	return uuid
}

// waitForAdditionalRows blocks until the table holds at least count more rows
// than it did when the function was called, polling every defaultTick. If that
// has not happened within defaultTimeout the test is failed immediately.
func waitForAdditionalRows(t *testing.T, keyspace, table string, count int) {
	vtgateConn, closeConn := getVTGateConn()
	defer closeConn()

	numRowsStart := getNumRows(t, vtgateConn, keyspace, table)
	numRows := numRowsStart
	shortCtx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
	defer cancel()
	for {
		if shortCtx.Err() != nil {
			// Report the number of rows added since the start so that it is
			// directly comparable with the requested additional count.
			require.FailNowf(t, "Timed out waiting for additional rows",
				"wanted %d additional rows, got %d additional rows", count, numRows-numRowsStart)
			return
		}
		numRows = getNumRows(t, vtgateConn, keyspace, table)
		if numRows >= numRowsStart+count {
			return
		}
		time.Sleep(defaultTick)
	}
}

// getNumRows returns the result of SELECT COUNT(*) on the given table, run
// through the supplied vtgate connection. The test fails if the query result
// is nil or the returned value cannot be parsed as an integer.
func getNumRows(t *testing.T, vtgateConn *mysql.Conn, keyspace, table string) int {
	countQuery := fmt.Sprintf("SELECT COUNT(*) FROM %s", table)
	result := execVtgateQuery(t, vtgateConn, keyspace, countQuery)
	require.NotNil(t, result)

	rawCount := result.Rows[0][0].ToString()
	count, err := strconv.Atoi(rawCount)
	require.NoError(t, err)
	return count
}

// populate generates write load until ctx is cancelled: on each iteration it
// inserts a row with the next id, updates that same row, and then sleeps for
// 10ms. It is meant to be run in its own goroutine; done is closed when the
// function returns so the caller can wait for it to finish.
//
// Because it runs outside the test goroutine it must not call t.FailNow
// (which the require helpers do). Failures are reported with t.Errorf and the
// load generation stops.
func populate(ctx context.Context, t *testing.T, done chan bool, insertTemplate, updateTemplate string) {
	defer close(done)
	vtgateConn, closeConn := getVTGateConn()
	defer closeConn()
	id := 1
	for {
		select {
		case <-ctx.Done():
			log.Infof("load cancelled")
			return
		default:
			query := fmt.Sprintf(insertTemplate, id, id, id)
			if _, err := vtgateConn.ExecuteFetch(query, 1, false); err != nil {
				t.Errorf("error in insert: %v", err)
				return
			}
			query = fmt.Sprintf(updateTemplate, id, id)
			if _, err := vtgateConn.ExecuteFetch(query, 1, false); err != nil {
				t.Errorf("error in update: %v", err)
				return
			}
			id++
			time.Sleep(10 * time.Millisecond)
		}
	}
}
34 changes: 34 additions & 0 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1792,6 +1792,16 @@ func (s *Server) VDiffCreate(ctx context.Context, req *vtctldatapb.VDiffCreateRe
req.TargetKeyspace, req.Workflow)
}

workflowStatus, err := s.getWorkflowStatus(ctx, req.TargetKeyspace, req.Workflow)
if err != nil {
return nil, err
}
if workflowStatus != binlogdatapb.VReplicationWorkflowState_Running {
log.Infof("Workflow %s.%s is not running, cannot start VDiff in state %s", req.TargetKeyspace, req.Workflow, workflowStatus)
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION,
"not all streams are running in workflow %s.%s", req.TargetKeyspace, req.Workflow)
}

err = ts.ForAllTargets(func(target *MigrationTarget) error {
_, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq)
return err
Expand Down Expand Up @@ -3949,3 +3959,27 @@ func (s *Server) MigrateCreate(ctx context.Context, req *vtctldatapb.MigrateCrea
}
return s.moveTablesCreate(ctx, moveTablesCreateRequest, binlogdatapb.VReplicationWorkflowType_Migrate)
}

// getWorkflowStatus gets the overall status of the workflow by checking the
// state of every stream in every shard. If all streams report the same state,
// that state is returned. If the streams are not all in the same state, or the
// workflow has no streams, it returns the unknown state. An error is returned
// if the workflow cannot be fetched or a stream reports a state name that is
// not a known VReplicationWorkflowState.
func (s *Server) getWorkflowStatus(ctx context.Context, keyspace string, workflow string) (binlogdatapb.VReplicationWorkflowState, error) {
	wf, err := s.GetWorkflow(ctx, keyspace, workflow, false, nil)
	if err != nil {
		return binlogdatapb.VReplicationWorkflowState_Unknown, err
	}

	workflowStatus := binlogdatapb.VReplicationWorkflowState_Unknown
	// Track whether a stream has been seen separately from the status value,
	// so that a stream which itself reports Unknown is compared against later
	// streams rather than being silently overwritten by them.
	seenStream := false
	for _, shardStream := range wf.GetShardStreams() {
		for _, stream := range shardStream.GetStreams() {
			state, ok := binlogdatapb.VReplicationWorkflowState_value[stream.State]
			if !ok {
				return binlogdatapb.VReplicationWorkflowState_Unknown,
					fmt.Errorf("invalid state for stream %s of workflow %s.%s", stream.State, keyspace, workflow)
			}
			currentStatus := binlogdatapb.VReplicationWorkflowState(state)
			if seenStream && currentStatus != workflowStatus {
				return binlogdatapb.VReplicationWorkflowState_Unknown, nil
			}
			workflowStatus = currentStatus
			seenStream = true
		}
	}
	return workflowStatus, nil
}
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ import (
"strings"
"time"

"vitess.io/vitess/go/vt/schema"

"google.golang.org/protobuf/encoding/prototext"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtctl/schematools"
"vitess.io/vitess/go/vt/vterrors"
Expand Down Expand Up @@ -343,7 +344,7 @@ func (wd *workflowDiffer) buildPlan(dbClient binlogplayer.DBClient, filter *binl
if len(specifiedTables) != 0 && !stringListContains(specifiedTables, table.Name) {
continue
}
if schema.IsInternalOperationTableName(table.Name) {
if schema.IsInternalOperationTableName(table.Name) && !schema.IsOnlineDDLTableName(table.Name) {
continue
}
rule, err := vreplication.MatchTable(table.Name, filter)
Expand Down
9 changes: 9 additions & 0 deletions test/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -1076,6 +1076,15 @@
"RetryMax": 0,
"Tags": []
},
"vreplication_onlineddl_vdiff": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestOnlineDDLVDiff"],
"Command": [],
"Manual": false,
"Shard": "vreplication_cellalias",
"RetryMax": 2,
"Tags": []
},
"vreplication_vschema_load": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVSchemaChangesUnderLoad"],
Expand Down

0 comments on commit ffa21e7

Please sign in to comment.