Skip to content

Commit

Permalink
[release-20.0-rc] VReplication: Improve workflow cancel/delete (#15977)…
Browse files Browse the repository at this point in the history
… (#16130)

Signed-off-by: Matt Lord <[email protected]>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
vitess-bot[bot] authored Jun 12, 2024
1 parent 965da11 commit bfbae24
Showing 13 changed files with 1,452 additions and 227 deletions.
10 changes: 8 additions & 2 deletions go/test/endtoend/cluster/vttablet_process.go
Original file line number Diff line number Diff line change
@@ -74,6 +74,7 @@ type VttabletProcess struct {
SupportsBackup bool
ExplicitServingStatus bool
ServingStatus string
DbName string
DbPassword string
DbPort int
DbFlavor string
@@ -148,6 +149,8 @@ func (vttablet *VttabletProcess) Setup() (err error) {
return
}

vttablet.DbName = "vt_" + vttablet.Keyspace

vttablet.exit = make(chan error)
go func() {
if vttablet.proc != nil {
@@ -442,8 +445,11 @@ func (vttablet *VttabletProcess) TearDownWithTimeout(timeout time.Duration) erro

// CreateDB creates the database for keyspace
func (vttablet *VttabletProcess) CreateDB(keyspace string) error {
_, _ = vttablet.QueryTablet(fmt.Sprintf("drop database IF EXISTS vt_%s", keyspace), keyspace, false)
_, err := vttablet.QueryTablet(fmt.Sprintf("create database IF NOT EXISTS vt_%s", keyspace), keyspace, false)
if vttablet.DbName == "" {
vttablet.DbName = "vt_" + keyspace
}
_, _ = vttablet.QueryTablet(fmt.Sprintf("drop database IF EXISTS %s", vttablet.DbName), keyspace, false)
_, err := vttablet.QueryTablet(fmt.Sprintf("create database IF NOT EXISTS %s", vttablet.DbName), keyspace, false)
return err
}

39 changes: 15 additions & 24 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
@@ -92,7 +92,7 @@ func execQueryWithRetry(t *testing.T, conn *mysql.Conn, query string, timeout ti
select {
case <-ctx.Done():
require.FailNow(t, fmt.Sprintf("query %q did not succeed before the timeout of %s; last seen result: %v",
query, timeout, qr.Rows))
query, timeout, qr))
case <-ticker.C:
log.Infof("query %q failed with error %v, retrying in %ds", query, err, defaultTick)
}
@@ -147,19 +147,6 @@ func execVtgateQuery(t *testing.T, conn *mysql.Conn, database string, query stri
return qr
}

func execVtgateQueryWithRetry(t *testing.T, conn *mysql.Conn, database string, query string, timeout time.Duration) *sqltypes.Result {
if strings.TrimSpace(query) == "" {
return nil
}
if database != "" {
execQuery(t, conn, "use `"+database+"`;")
}
execQuery(t, conn, "begin")
qr := execQueryWithRetry(t, conn, query, timeout)
execQuery(t, conn, "commit")
return qr
}

func checkHealth(t *testing.T, url string) bool {
resp, err := http.Get(url)
require.NoError(t, err)
@@ -516,20 +503,24 @@ func validateDryRunResults(t *testing.T, output string, want []string) {
require.NotEmpty(t, output)
gotDryRun := strings.Split(output, "\n")
require.True(t, len(gotDryRun) > 3)
startRow := 3
if strings.Contains(gotDryRun[0], "deprecated") {
var startRow int
if strings.HasPrefix(gotDryRun[1], "Parameters:") { // vtctlclient
startRow = 3
} else if strings.Contains(gotDryRun[0], "deprecated") {
startRow = 4
} else {
startRow = 2
}
gotDryRun = gotDryRun[startRow : len(gotDryRun)-1]
if len(want) != len(gotDryRun) {
t.Fatalf("want and got: lengths don't match, \nwant\n%s\n\ngot\n%s", strings.Join(want, "\n"), strings.Join(gotDryRun, "\n"))
require.Fail(t, "invalid dry run results", "want and got: lengths don't match, \nwant\n%s\n\ngot\n%s", strings.Join(want, "\n"), strings.Join(gotDryRun, "\n"))
}
var match, fail bool
fail = false
for i, w := range want {
w = strings.TrimSpace(w)
g := strings.TrimSpace(gotDryRun[i])
if w[0] == '/' {
if len(w) > 0 && w[0] == '/' {
w = strings.TrimSpace(w[1:])
result := strings.HasPrefix(g, w)
match = result
@@ -538,11 +529,11 @@ func validateDryRunResults(t *testing.T, output string, want []string) {
}
if !match {
fail = true
t.Fatalf("want %s, got %s\n", w, gotDryRun[i])
require.Fail(t, "invlaid dry run results", "want %s, got %s\n", w, gotDryRun[i])
}
}
if fail {
t.Fatalf("Dry run results don't match, want %s, got %s", want, gotDryRun)
require.Fail(t, "invalid dry run results", "Dry run results don't match, want %s, got %s", want, gotDryRun)
}
}

@@ -578,7 +569,7 @@ func isTableInDenyList(t *testing.T, vc *VitessCluster, ksShard string, table st
var err error
found := false
if output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetShard", ksShard); err != nil {
t.Fatalf("%v %v", err, output)
require.Fail(t, "GetShard error", "%v %v", err, output)
return false, err
}
jsonparser.ArrayEach([]byte(output), func(value []byte, dataType jsonparser.ValueType, offset int, err error) {
@@ -602,8 +593,8 @@ func expectNumberOfStreams(t *testing.T, vtgateConn *mysql.Conn, name string, wo
waitForQueryResult(t, vtgateConn, database, query, fmt.Sprintf(`[[INT64(%d)]]`, want))
}

// confirmAllStreamsRunning confirms that all of the migrated streams are running
// after a Reshard.
// confirmAllStreamsRunning confirms that all of the workflow's streams are
// in the running state.
func confirmAllStreamsRunning(t *testing.T, vtgateConn *mysql.Conn, database string) {
query := sqlparser.BuildParsedQuery("select count(*) from %s.vreplication where state != '%s'",
sidecarDBIdentifier, binlogdatapb.VReplicationWorkflowState_Running.String()).Query
@@ -801,7 +792,7 @@ func isBinlogRowImageNoBlob(t *testing.T, tablet *cluster.VttabletProcess) bool

func getRowCount(t *testing.T, vtgateConn *mysql.Conn, table string) int {
query := fmt.Sprintf("select count(*) from %s", table)
qr := execVtgateQuery(t, vtgateConn, "", query)
qr := execQuery(t, vtgateConn, query)
numRows, _ := qr.Rows[0][0].ToInt()
return numRows
}
51 changes: 32 additions & 19 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
@@ -367,6 +367,7 @@ func testVreplicationWorkflows(t *testing.T, limited bool, binlogRowImage string
expectNumberOfStreams(t, vtgateConn, "Customer3to2", "sales", "product:0", 3)
reshardCustomer3to1Merge(t)
confirmAllStreamsRunning(t, vtgateConn, "customer:0")

expectNumberOfStreams(t, vtgateConn, "Customer3to1", "sales", "product:0", 1)

t.Run("Verify CopyState Is Optimized Afterwards", func(t *testing.T) {
@@ -605,7 +606,7 @@ func TestCellAliasVreplicationWorkflow(t *testing.T) {
vc.AddKeyspace(t, []*Cell{cell1, cell2}, keyspace, shard, initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, sourceKsOpts)

// Add cell alias containing only zone2
result, err := vc.VtctlClient.ExecuteCommandWithOutput("AddCellsAlias", "--", "--cells", "zone2", "alias")
result, err := vc.VtctldClient.ExecuteCommandWithOutput("AddCellsAlias", "--cells", "zone2", "alias")
require.NoError(t, err, "command failed with output: %v", result)

verifyClusterHealth(t, vc)
@@ -722,10 +723,13 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
execVtgateQuery(t, vtgateConn, sourceKs, "update json_tbl set j1 = null, j2 = 'null', j3 = '\"null\"'")
execVtgateQuery(t, vtgateConn, sourceKs, "insert into json_tbl(id, j1, j2, j3) values (7, null, 'null', '\"null\"')")
waitForNoWorkflowLag(t, vc, targetKs, workflow)
for _, shard := range []string{"-80", "80-"} {
shardTarget := fmt.Sprintf("%s:%s", targetKs, shard)
if res := execVtgateQuery(t, vtgateConn, shardTarget, "select cid from customer"); len(res.Rows) > 0 {
waitForQueryResult(t, vtgateConn, shardTarget, "select distinct dec80 from customer", `[[DECIMAL(0)]]`)
for _, tablet := range []*cluster.VttabletProcess{customerTab1, customerTab2} {
// Query the tablet's mysqld directly as the targets will have denied table entries.
dbc, err := tablet.TabletConn(targetKs, true)
require.NoError(t, err)
defer dbc.Close()
if res := execQuery(t, dbc, "select cid from customer"); len(res.Rows) > 0 {
waitForQueryResult(t, dbc, tablet.DbName, "select distinct dec80 from customer", `[[DECIMAL(0)]]`)
dec80Replicated = true
}
}
@@ -833,7 +837,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
printShardPositions(vc, ksShards)
switchWrites(t, workflowType, ksWorkflow, true)

output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", ksWorkflow, "show")
output, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", targetKs, "show", "--workflow", workflow)
require.NoError(t, err)
require.Contains(t, output, "'customer.reverse_bits'")
require.Contains(t, output, "'customer.bmd5'")
@@ -942,7 +946,7 @@ func reshardMerchant2to3SplitMerge(t *testing.T) {
var err error

for _, shard := range strings.Split("-80,80-", ",") {
output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetShard", "merchant:"+shard)
output, err = vc.VtctldClient.ExecuteCommandWithOutput("GetShard", "merchant:"+shard)
if err == nil {
t.Fatal("GetShard merchant:-80 failed")
}
@@ -951,7 +955,7 @@ func reshardMerchant2to3SplitMerge(t *testing.T) {

for _, shard := range strings.Split("-40,40-c0,c0-", ",") {
ksShard := fmt.Sprintf("%s:%s", merchantKeyspace, shard)
output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetShard", ksShard)
output, err = vc.VtctldClient.ExecuteCommandWithOutput("GetShard", ksShard)
if err != nil {
t.Fatalf("GetShard merchant failed for: %s: %v", shard, err)
}
@@ -1400,7 +1404,7 @@ func waitForLowLag(t *testing.T, keyspace, workflow string) {
waitDuration := 500 * time.Millisecond
duration := maxWait
for duration > 0 {
output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", fmt.Sprintf("%s.%s", keyspace, workflow), "Show")
output, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", "show", "--workflow", workflow)
require.NoError(t, err)
lagSeconds, err = jsonparser.GetInt([]byte(output), "MaxVReplicationTransactionLag")

@@ -1483,7 +1487,7 @@ func reshardAction(t *testing.T, action, workflow, keyspaceName, sourceShards, t
}

func applyVSchema(t *testing.T, vschema, keyspace string) {
err := vc.VtctlClient.ExecuteCommand("ApplyVSchema", "--", "--vschema", vschema, keyspace)
err := vc.VtctldClient.ExecuteCommand("ApplyVSchema", "--vschema", vschema, keyspace)
require.NoError(t, err)
}

@@ -1494,19 +1498,24 @@ func switchReadsDryRun(t *testing.T, workflowType, cells, ksWorkflow string, dry
"workflow type specified: %s", workflowType)
}
ensureCanSwitch(t, workflowType, cells, ksWorkflow)
output, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--tablet_types=rdonly,replica",
"--dry_run", "SwitchTraffic", ksWorkflow)
ks, wf, ok := strings.Cut(ksWorkflow, ".")
require.True(t, ok)
output, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks, "SwitchTraffic",
"--cells="+cells, "--tablet-types=rdonly,replica", "--dry-run")
require.NoError(t, err, fmt.Sprintf("Switching Reads DryRun Error: %s: %s", err, output))
if dryRunResults != nil {
validateDryRunResults(t, output, dryRunResults)
}
}

func ensureCanSwitch(t *testing.T, workflowType, cells, ksWorkflow string) {
ks, wf, ok := strings.Cut(ksWorkflow, ".")
require.True(t, ok)
timer := time.NewTimer(defaultTimeout)
defer timer.Stop()
for {
_, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--dry_run", "SwitchTraffic", ksWorkflow)
_, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks, "SwitchTraffic",
"--cells="+cells, "--dry-run")
if err == nil {
return
}
@@ -1532,11 +1541,13 @@ func switchReads(t *testing.T, workflowType, cells, ksWorkflow string, reverse b
command = "ReverseTraffic"
}
ensureCanSwitch(t, workflowType, cells, ksWorkflow)
output, err = vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--tablet_types=rdonly",
command, ksWorkflow)
ks, wf, ok := strings.Cut(ksWorkflow, ".")
require.True(t, ok)
output, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks, command,
"--cells="+cells, "--tablet-types=rdonly")
require.NoError(t, err, fmt.Sprintf("%s Error: %s: %s", command, err, output))
output, err = vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--tablet_types=replica",
command, ksWorkflow)
output, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks, command,
"--cells="+cells, "--tablet-types=replica")
require.NoError(t, err, fmt.Sprintf("%s Error: %s: %s", command, err, output))
}

@@ -1575,8 +1586,10 @@ func switchWritesDryRun(t *testing.T, workflowType, ksWorkflow string, dryRunRes
require.FailNowf(t, "Invalid workflow type for SwitchTraffic, must be MoveTables or Reshard",
"workflow type specified: %s", workflowType)
}
output, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--tablet_types=primary", "--dry_run",
"SwitchTraffic", ksWorkflow)
ks, wf, ok := strings.Cut(ksWorkflow, ".")
require.True(t, ok)
output, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks,
"SwitchTraffic", "--tablet-types=primary", "--dry-run")
require.NoError(t, err, fmt.Sprintf("Switch writes DryRun Error: %s: %s", err, output))
validateDryRunResults(t, output, dryRunResults)
}
23 changes: 10 additions & 13 deletions go/test/endtoend/vreplication/vreplication_test_env.go
Original file line number Diff line number Diff line change
@@ -19,31 +19,28 @@ package vreplication
var dryRunResultsSwitchWritesCustomerShard = []string{
"Lock keyspace product",
"Lock keyspace customer",
"Stop writes on keyspace product, tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]:",
"/ Keyspace product, Shard 0 at Position",
"Wait for VReplication on stopped streams to catchup for up to 30s",
"Create reverse replication workflow p2c_reverse",
"/Stop writes on keyspace product for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]: [keyspace:product;shard:0;position:",
"Wait for vreplication on stopped streams to catchup for up to 30s",
"Create reverse vreplication workflow p2c_reverse",
"Create journal entries on source databases",
"Enable writes on keyspace customer tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]",
"Enable writes on keyspace customer for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]",
"Switch routing from keyspace product to keyspace customer",
"Routing rules for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] will be updated",
"Switch writes completed, freeze and delete vreplication streams on:",
" tablet 200 ",
" tablet 300 ",
"Start reverse replication streams on:",
" tablet 100 ",
"Mark vreplication streams frozen on:",
" Keyspace customer, Shard -80, Tablet 200, Workflow p2c, DbName vt_customer",
" Keyspace customer, Shard 80-, Tablet 300, Workflow p2c, DbName vt_customer",
"Switch writes completed, freeze and delete vreplication streams on: [tablet:200,tablet:300]",
"Start reverse vreplication streams on: [tablet:100]",
"Mark vreplication streams frozen on: [keyspace:customer;shard:-80;tablet:200;workflow:p2c;dbname:vt_customer,keyspace:customer;shard:80-;tablet:300;workflow:p2c;dbname:vt_customer]",
"Unlock keyspace customer",
"Unlock keyspace product",
"", // Additional empty newline in the output
}

var dryRunResultsReadCustomerShard = []string{
"Lock keyspace product",
"Switch reads for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] to keyspace customer for tablet types [RDONLY,REPLICA]",
"Routing rules for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] will be updated",
"Serving VSchema will be rebuilt for the customer keyspace",
"Unlock keyspace product",
"", // Additional empty newline in the output
}

var dryRunResultsSwitchWritesM2m3 = []string{
35 changes: 22 additions & 13 deletions go/vt/topo/shard.go
Original file line number Diff line number Diff line change
@@ -21,7 +21,7 @@ import (
"encoding/hex"
"fmt"
"path"
"reflect"
"slices"
"sort"
"strings"
"sync"
@@ -44,8 +44,8 @@ import (
)

const (
dlTablesAlreadyPresent = "one or more tables are already present in the denylist"
dlTablesNotPresent = "cannot remove tables since one or more do not exist in the denylist"
dlTablesAlreadyPresent = "one or more tables were already present in the denylist"
dlTablesNotPresent = "one or more tables did not exist in the denylist"
dlNoCellsForPrimary = "you cannot specify cells for a primary's tablet control"
)

@@ -397,16 +397,15 @@ func (si *ShardInfo) UpdateDeniedTables(ctx context.Context, tabletType topodata
}
tc := si.GetTabletControl(tabletType)
if tc == nil {

// handle the case where the TabletControl object is new
// Handle the case where the TabletControl object is new.
if remove {
// we try to remove from something that doesn't exist,
// log, but we're done.
// We tried to remove something that doesn't exist, log a warning.
// But we know that our work is done.
log.Warningf("Trying to remove TabletControl.DeniedTables for missing type %v in shard %v/%v", tabletType, si.keyspace, si.shardName)
return nil
}

// trying to add more constraints with no existing record
// Add constraints to the new record.
si.TabletControls = append(si.TabletControls, &topodatapb.Shard_TabletControl{
TabletType: tabletType,
Cells: cells,
@@ -422,16 +421,16 @@ func (si *ShardInfo) UpdateDeniedTables(ctx context.Context, tabletType topodata
return nil
}

// we have an existing record, check table lists matches and
// We have an existing record, update the table lists.
if remove {
si.removeCellsFromTabletControl(tc, tabletType, cells)
} else {
if !reflect.DeepEqual(tc.DeniedTables, tables) {
if !slices.Equal(tc.DeniedTables, tables) {
return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "trying to use two different sets of denied tables for shard %v/%v: %v and %v", si.keyspace, si.shardName, tc.DeniedTables, tables)
}

tc.Cells = addCells(tc.Cells, cells)
}

return nil
}

@@ -451,7 +450,8 @@ func (si *ShardInfo) updatePrimaryTabletControl(tc *topodatapb.Shard_TabletContr
}
if remove {
if len(newTables) != 0 {
return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, dlTablesNotPresent)
// These tables did not exist in the denied list so we don't need to remove them.
log.Warningf("%s:%s", dlTablesNotPresent, strings.Join(newTables, ","))
}
var newDenyList []string
if len(tables) != 0 { // legacy uses
@@ -475,7 +475,16 @@ func (si *ShardInfo) updatePrimaryTabletControl(tc *topodatapb.Shard_TabletContr
return nil
}
if len(newTables) != len(tables) {
return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, dlTablesAlreadyPresent)
// Some of the tables already existed in the DeniedTables list so we don't
// need to add them.
log.Warningf("%s:%s", dlTablesAlreadyPresent, strings.Join(tables, ","))
// We do need to merge the lists, however.
tables = append(tables, newTables...)
tc.DeniedTables = append(tc.DeniedTables, tables...)
// And be sure to remove any duplicates.
slices.Sort(tc.DeniedTables)
tc.DeniedTables = slices.Compact(tc.DeniedTables)
return nil
}
tc.DeniedTables = append(tc.DeniedTables, tables...)
return nil
9 changes: 5 additions & 4 deletions go/vt/topo/shard_test.go
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/utils"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

@@ -121,14 +122,14 @@ func TestUpdateSourcePrimaryDeniedTables(t *testing.T) {
require.NoError(t, addToDenyList(ctx, si, primary, nil, tables2))
validateDenyList(t, si, primary, nil, append(tables1, tables2...))

require.Error(t, addToDenyList(ctx, si, primary, nil, tables2), dlTablesAlreadyPresent)
require.Error(t, addToDenyList(ctx, si, primary, nil, []string{t1}), dlTablesAlreadyPresent)
require.NoError(t, addToDenyList(ctx, si, primary, nil, tables2))
require.NoError(t, addToDenyList(ctx, si, primary, nil, []string{t1}))

require.NoError(t, removeFromDenyList(ctx, si, primary, nil, tables2))
validateDenyList(t, si, primary, nil, tables1)

require.Error(t, removeFromDenyList(ctx, si, primary, nil, tables2), dlTablesNotPresent)
require.Error(t, removeFromDenyList(ctx, si, primary, nil, []string{t3}), dlTablesNotPresent)
require.NoError(t, removeFromDenyList(ctx, si, primary, nil, tables2))
require.NoError(t, removeFromDenyList(ctx, si, primary, nil, []string{t3}))
validateDenyList(t, si, primary, nil, tables1)

require.NoError(t, removeFromDenyList(ctx, si, primary, nil, []string{t1}))
458 changes: 458 additions & 0 deletions go/vt/vtctl/workflow/framework_test.go

Large diffs are not rendered by default.

16 changes: 2 additions & 14 deletions go/vt/vtctl/workflow/materializer_env_test.go
Original file line number Diff line number Diff line change
@@ -19,7 +19,6 @@ package workflow
import (
"context"
"fmt"
"os"
"regexp"
"strings"
"sync"
@@ -37,7 +36,6 @@ import (
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tmclient"

_flag "vitess.io/vitess/go/internal/flag"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
@@ -46,11 +44,6 @@ import (
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

type queryResult struct {
query string
result *querypb.QueryResult
}

type testMaterializerEnv struct {
ws *Server
ms *vtctldatapb.MaterializeSettings
@@ -65,20 +58,15 @@ type testMaterializerEnv struct {
//----------------------------------------------
// testMaterializerEnv

func TestMain(m *testing.M) {
_flag.ParseFlagsForTest()
os.Exit(m.Run())
}

func newTestMaterializerEnv(t *testing.T, ctx context.Context, ms *vtctldatapb.MaterializeSettings, sources, targets []string) *testMaterializerEnv {
t.Helper()
env := &testMaterializerEnv{
ms: ms,
sources: sources,
targets: targets,
tablets: make(map[int]*topodatapb.Tablet),
topoServ: memorytopo.NewServer(ctx, "cell"),
cell: "cell",
topoServ: memorytopo.NewServer(ctx, defaultCellName),
cell: defaultCellName,
tmc: newTestMaterializerTMClient(),
}
venv := vtenv.NewTestEnv()
145 changes: 94 additions & 51 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
@@ -22,7 +22,6 @@ import (
"errors"
"fmt"
"math"
"reflect"
"slices"
"sort"
"strings"
@@ -1452,13 +1451,31 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
return nil, err
}

isStandardMoveTables := func() bool {
return !mz.IsMultiTenantMigration() && !mz.isPartial
}

ts, err := s.buildTrafficSwitcher(ctx, req.GetTargetKeyspace(), req.GetWorkflow())
if err != nil {
return nil, err
}
sw := &switcher{s: s, ts: ts}
lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "MoveTablesCreate")
if lockErr != nil {
ts.Logger().Errorf("Locking target keyspace %s failed: %v", ts.TargetKeyspaceName(), lockErr)
return nil, lockErr
}
defer targetUnlock(&err)
ctx = lockCtx

// If we get an error after this point, where the vreplication streams/records
// have been created, then we clean up the workflow's artifacts.
defer func() {
if err != nil {
ts, cerr := s.buildTrafficSwitcher(ctx, ms.TargetKeyspace, ms.Workflow)
if cerr != nil {
err = vterrors.Wrapf(err, "failed to cleanup workflow artifacts: %v", cerr)
if isStandardMoveTables() { // Non-standard ones do not use shard scoped mechanisms
if cerr := ts.dropTargetDeniedTables(ctx); cerr != nil {
err = vterrors.Wrapf(err, "failed to cleanup denied table entries: %v", cerr)
}
}
if cerr := s.dropArtifacts(ctx, false, &switcher{s: s, ts: ts}); cerr != nil {
err = vterrors.Wrapf(err, "failed to cleanup workflow artifacts: %v", cerr)
@@ -1473,9 +1490,9 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
}()

// Now that the streams have been successfully created, let's put the associated
// routing rules in place.
// routing rules and denied tables entries in place.
if externalTopo == nil {
if err := s.setupInitialRoutingRules(ctx, req, mz, tables, vschema); err != nil {
if err := s.setupInitialRoutingRules(ctx, req, mz, tables); err != nil {
return nil, err
}

@@ -1484,6 +1501,11 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
return nil, err
}
}
if isStandardMoveTables() { // Non-standard ones do not use shard scoped mechanisms
if err := s.setupInitialDeniedTables(ctx, ts); err != nil {
return nil, vterrors.Wrapf(err, "failed to put initial denied tables entries in place on the target shards")
}
}
if err := s.ts.RebuildSrvVSchema(ctx, nil); err != nil {
return nil, err
}
@@ -1547,7 +1569,24 @@ func (s *Server) validateRoutingRuleFlags(req *vtctldatapb.MoveTablesCreateReque
return nil
}

func (s *Server) setupInitialRoutingRules(ctx context.Context, req *vtctldatapb.MoveTablesCreateRequest, mz *materializer, tables []string, vschema *vschemapb.Keyspace) error {
func (s *Server) setupInitialDeniedTables(ctx context.Context, ts *trafficSwitcher) error {
if ts.MigrationType() != binlogdatapb.MigrationType_TABLES {
return nil
}
return ts.ForAllTargets(func(target *MigrationTarget) error {
if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.TargetKeyspaceName(), target.GetShard().ShardName(), func(si *topo.ShardInfo) error {
return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, false, ts.Tables())
}); err != nil {
return err
}
strCtx, cancel := context.WithTimeout(ctx, shardTabletRefreshTimeout)
defer cancel()
_, _, err := topotools.RefreshTabletsByShard(strCtx, ts.TopoServer(), ts.TabletManagerClient(), target.GetShard(), nil, ts.Logger())
return err
})
}

func (s *Server) setupInitialRoutingRules(ctx context.Context, req *vtctldatapb.MoveTablesCreateRequest, mz *materializer, tables []string) error {
if err := s.validateRoutingRuleFlags(req, mz); err != nil {
return err
}
@@ -1616,7 +1655,7 @@ func (s *Server) MoveTablesComplete(ctx context.Context, req *vtctldatapb.MoveTa
span, ctx := trace.NewSpan(ctx, "workflow.Server.MoveTablesComplete")
defer span.Finish()

ts, state, err := s.getWorkflowState(ctx, req.TargetKeyspace, req.Workflow)
ts, state, err := s.getWorkflowState(ctx, req.GetTargetKeyspace(), req.GetWorkflow())
if err != nil {
return nil, err
}
@@ -1630,8 +1669,7 @@ func (s *Server) MoveTablesComplete(ctx context.Context, req *vtctldatapb.MoveTa
var dryRunResults *[]string

if state.WorkflowType == TypeMigrate {
dryRunResults, err = s.finalizeMigrateWorkflow(ctx, req.TargetKeyspace, req.Workflow, strings.Join(ts.tables, ","),
false, req.KeepData, req.KeepRoutingRules, req.DryRun)
dryRunResults, err = s.finalizeMigrateWorkflow(ctx, ts, strings.Join(ts.tables, ","), false, req.KeepData, req.KeepRoutingRules, req.DryRun)
if err != nil {
return nil, vterrors.Wrapf(err, "failed to finalize the %s workflow in the %s keyspace",
req.Workflow, req.TargetKeyspace)
@@ -1970,11 +2008,21 @@ func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDe
span.Annotate("keep_routing_rules", req.KeepRoutingRules)
span.Annotate("shards", req.Shards)

// Cleanup related data and artifacts.
if _, err := s.DropTargets(ctx, req.Keyspace, req.Workflow, req.KeepData, req.KeepRoutingRules, false); err != nil {
if topo.IsErrType(err, topo.NoNode) {
return nil, vterrors.Wrapf(err, "%s keyspace does not exist", req.Keyspace)
ts, state, err := s.getWorkflowState(ctx, req.GetKeyspace(), req.GetWorkflow())
if err != nil {
log.Errorf("failed to get VReplication workflow state for %s.%s: %v", req.GetKeyspace(), req.GetWorkflow(), err)
return nil, err
}

if ts.workflowType != binlogdatapb.VReplicationWorkflowType_CreateLookupIndex {
// Return an error if the workflow traffic is partially switched.
if state.WritesSwitched || len(state.ReplicaCellsSwitched) > 0 || len(state.RdonlyCellsSwitched) > 0 {
return nil, ErrWorkflowPartiallySwitched
}
}

if state.WorkflowType == TypeMigrate {
_, err := s.finalizeMigrateWorkflow(ctx, ts, "", true, req.GetKeepData(), req.GetKeepRoutingRules(), false)
return nil, err
}

@@ -1993,7 +2041,9 @@ func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDe
s.optimizeCopyStateTable(tablet.Tablet)
return res.Result, err
}
res, err := vx.CallbackContext(ctx, callback)
delCtx, delCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout*2)
defer delCancel()
res, err := vx.CallbackContext(delCtx, callback)
if err != nil {
return nil, err
}
@@ -2002,6 +2052,16 @@ func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDe
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "the %s workflow does not exist in the %s keyspace", req.Workflow, req.Keyspace)
}

// Cleanup related data and artifacts. There are none for a LookupVindex workflow.
if ts.workflowType != binlogdatapb.VReplicationWorkflowType_CreateLookupIndex {
if _, err := s.DropTargets(delCtx, ts, req.GetKeepData(), req.GetKeepRoutingRules(), false); err != nil {
if topo.IsErrType(err, topo.NoNode) {
return nil, vterrors.Wrapf(err, "%s keyspace does not exist", req.GetKeyspace())
}
return nil, err
}
}

response := &vtctldatapb.WorkflowDeleteResponse{}
response.Summary = fmt.Sprintf("Successfully cancelled the %s workflow in the %s keyspace", req.Workflow, req.Keyspace)
details := make([]*vtctldatapb.WorkflowDeleteResponse_TabletInfo, 0, len(res))
@@ -2012,6 +2072,9 @@ func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDe
}
details = append(details, result)
}
sort.Slice(details, func(i, j int) bool { // Ensure deterministic output
return topoproto.TabletAliasString(details[i].Tablet) < topoproto.TabletAliasString(details[j].Tablet)
})
response.Details = details
return response, nil
}
@@ -2269,7 +2332,9 @@ func (s *Server) WorkflowUpdate(ctx context.Context, req *vtctldatapb.WorkflowUp
}
return res.Result, err
}
res, err := vx.CallbackContext(ctx, callback)
updCtx, updCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout*2)
defer updCancel()
res, err := vx.CallbackContext(updCtx, callback)
if err != nil {
if topo.IsErrType(err, topo.NoNode) {
return nil, vterrors.Wrapf(err, "%s keyspace does not exist", req.Keyspace)
@@ -2497,28 +2562,8 @@ func (s *Server) optimizeCopyStateTable(tablet *topodatapb.Tablet) {

// DropTargets cleans up target tables, shards and denied tables if a MoveTables/Reshard
// is cancelled.
func (s *Server) DropTargets(ctx context.Context, targetKeyspace, workflow string, keepData, keepRoutingRules, dryRun bool) (*[]string, error) {
ts, state, err := s.getWorkflowState(ctx, targetKeyspace, workflow)
if err != nil {
log.Errorf("Failed to get VReplication workflow state for %s.%s: %v", targetKeyspace, workflow, err)
return nil, err
}

// There is nothing to drop for a LookupVindex workflow.
if ts.workflowType == binlogdatapb.VReplicationWorkflowType_CreateLookupIndex {
return nil, nil
}

// Return an error if the workflow traffic is partially switched.
if state.WritesSwitched || len(state.ReplicaCellsSwitched) > 0 || len(state.RdonlyCellsSwitched) > 0 {
return nil, ErrWorkflowPartiallySwitched
}

if state.WorkflowType == TypeMigrate {
_, err := s.finalizeMigrateWorkflow(ctx, targetKeyspace, workflow, "", true, keepData, keepRoutingRules, dryRun)
return nil, err
}

func (s *Server) DropTargets(ctx context.Context, ts *trafficSwitcher, keepData, keepRoutingRules, dryRun bool) (*[]string, error) {
var err error
ts.keepRoutingRules = keepRoutingRules
var sw iswitcher
if dryRun {
@@ -2629,7 +2674,7 @@ func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workf
tables = append(tables, rule.Match)
}
sort.Strings(tables)
if !reflect.DeepEqual(ts.tables, tables) {
if !slices.Equal(ts.tables, tables) {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "table lists are mismatched across streams: %v vs %v", ts.tables, tables)
}
}
@@ -2803,8 +2848,8 @@ func (s *Server) DeleteShard(ctx context.Context, keyspace, shard string, recurs
shardInfo, err := s.ts.GetShard(ctx, keyspace, shard)
if err != nil {
if topo.IsErrType(err, topo.NoNode) {
log.Infof("Shard %v/%v doesn't seem to exist, cleaning up any potential leftover", keyspace, shard)
return s.ts.DeleteShard(ctx, keyspace, shard)
log.Warningf("Shard %v/%v did not exist when attempting to remove it", keyspace, shard)
return nil
}
return err
}
@@ -2942,13 +2987,11 @@ func (s *Server) refreshPrimaryTablets(ctx context.Context, shards []*topo.Shard

// finalizeMigrateWorkflow deletes the streams for the Migrate workflow.
// We only cleanup the target for external sources.
func (s *Server) finalizeMigrateWorkflow(ctx context.Context, targetKeyspace, workflow, tableSpecs string, cancel, keepData, keepRoutingRules, dryRun bool) (*[]string, error) {
ts, err := s.buildTrafficSwitcher(ctx, targetKeyspace, workflow)
if err != nil {
ts.Logger().Errorf("buildTrafficSwitcher failed: %v", err)
return nil, err
}
var sw iswitcher
func (s *Server) finalizeMigrateWorkflow(ctx context.Context, ts *trafficSwitcher, tableSpecs string, cancel, keepData, keepRoutingRules, dryRun bool) (*[]string, error) {
var (
sw iswitcher
err error
)
if dryRun {
sw = &switcherDryRun{ts: ts, drLog: NewLogRecorder()}
} else {
@@ -2966,7 +3009,7 @@ func (s *Server) finalizeMigrateWorkflow(ctx context.Context, targetKeyspace, wo
return nil, err
}
if !cancel {
if err := sw.addParticipatingTablesToKeyspace(ctx, targetKeyspace, tableSpecs); err != nil {
if err := sw.addParticipatingTablesToKeyspace(ctx, ts.targetKeyspace, tableSpecs); err != nil {
return nil, err
}
if err := ts.TopoServer().RebuildSrvVSchema(ctx, nil); err != nil {
@@ -3072,7 +3115,7 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor
resp.Summary = fmt.Sprintf("%s dry run results for workflow %s.%s at %v", cmd, req.Keyspace, req.Workflow, time.Now().UTC().Format(time.RFC822))
resp.DryRunResults = dryRunResults
} else {
log.Infof("SwitchTraffic done for workflow %s.%s", req.Keyspace, req.Workflow)
log.Infof("%s done for workflow %s.%s", cmd, req.Keyspace, req.Workflow)
resp.Summary = fmt.Sprintf("%s was successful for workflow %s.%s", cmd, req.Keyspace, req.Workflow)
// Reload the state after the SwitchTraffic operation
// and return that as a string.
@@ -3090,7 +3133,7 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor
} else {
resp.CurrentState = currentState.String()
}
log.Infof("SwitchTraffic done for workflow %s.%s, returning response %v", req.Keyspace, req.Workflow, resp)
log.Infof("%s done for workflow %s.%s, returning response %v", cmd, req.Keyspace, req.Workflow, resp)
}
return resp, nil
}
581 changes: 581 additions & 0 deletions go/vt/vtctl/workflow/server_test.go

Large diffs are not rendered by default.

148 changes: 126 additions & 22 deletions go/vt/vtctl/workflow/switcher_dry_run.go

Large diffs are not rendered by default.

157 changes: 92 additions & 65 deletions go/vt/vtctl/workflow/traffic_switcher.go

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
Original file line number Diff line number Diff line change
@@ -412,6 +412,13 @@ func TestMoveTables(t *testing.T) {
),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Stopped||%s|1||0|0|0||0|1", vreplID, bls, position, targetKs),
), nil)
ftc.vrdbClient.ExpectRequest(fmt.Sprintf(readWorkflow, wf, tenv.dbName), sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys",
"int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64",
),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Stopped||%s|1||0|0|0||0|1", vreplID, bls, position, targetKs),
), nil)
ftc.vrdbClient.ExpectRequest(fmt.Sprintf(readWorkflowConfig, wf), sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|source|cell|tablet_types|state|message",

0 comments on commit bfbae24

Please sign in to comment.