Skip to content

Commit

Permalink
MoveTables Cancel: drop denied tables on target when dropping source/…
Browse files Browse the repository at this point in the history
…target tables (#14008)

Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps authored Sep 19, 2023
1 parent 6167dbb commit 7540087
Show file tree
Hide file tree
Showing 17 changed files with 188 additions and 31 deletions.
12 changes: 11 additions & 1 deletion go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,17 @@ func checkIfTableExists(t *testing.T, vc *VitessCluster, tabletAlias string, tab
return found, nil
}

func checkIfDenyListExists(t *testing.T, vc *VitessCluster, ksShard string, table string) (bool, error) {
func validateTableInDenyList(t *testing.T, vc *VitessCluster, ksShard string, table string, mustExist bool) {
found, err := isTableInDenyList(t, vc, ksShard, table)
require.NoError(t, err)
if mustExist {
require.True(t, found, "Table %s not found in deny list", table)
} else {
require.False(t, found, "Table %s found in deny list", table)
}
}

func isTableInDenyList(t *testing.T, vc *VitessCluster, ksShard string, table string) (bool, error) {
var output string
var err error
found := false
Expand Down
53 changes: 52 additions & 1 deletion go/test/endtoend/vreplication/partial_movetables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,61 @@ import (
"strings"
"testing"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"

"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/wrangler"
)

// testCancel() starts and cancels a partial MoveTables for one of the shards which will be actually moved later on.
// Before canceling, we first switch traffic to the target keyspace and then reverse it back to the source keyspace.
// This tests that artifacts are being properly cleaned up when a MoveTables ia canceled.
func testCancel(t *testing.T) {
targetKeyspace := "customer2"
sourceKeyspace := "customer"
workflowName := "partial80DashForCancel"
ksWorkflow := fmt.Sprintf("%s.%s", targetKeyspace, workflowName)
// We use a different table in this MoveTables than the subsequent one, so that setting up of the artifacts
// while creating MoveTables do not paper over any issues with cleaning up artifacts when MoveTables is canceled.
// Ref: https://github.com/vitessio/vitess/issues/13998
table := "customer2"
shard := "80-"
// start the partial movetables for 80-
mt := newMoveTables(vc, &moveTables{
workflowName: workflowName,
targetKeyspace: targetKeyspace,
sourceKeyspace: sourceKeyspace,
tables: table,
sourceShards: shard,
}, moveTablesFlavorRandom)
mt.Create()

checkDenyList := func(keyspace string, expected bool) {
validateTableInDenyList(t, vc, fmt.Sprintf("%s:%s", keyspace, shard), table, expected)
}

waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())

checkDenyList(targetKeyspace, false)
checkDenyList(sourceKeyspace, false)

mt.SwitchReadsAndWrites()
checkDenyList(targetKeyspace, false)
checkDenyList(sourceKeyspace, true)

mt.ReverseReadsAndWrites()
checkDenyList(targetKeyspace, true)
checkDenyList(sourceKeyspace, false)

mt.Cancel()
checkDenyList(targetKeyspace, false)
checkDenyList(sourceKeyspace, false)

}

// TestPartialMoveTablesBasic tests partial move tables by moving each
// customer shard -- -80,80- -- once a a time to customer2.
func TestPartialMoveTablesBasic(t *testing.T) {
Expand Down Expand Up @@ -58,7 +106,7 @@ func TestPartialMoveTablesBasic(t *testing.T) {

// Move customer table from unsharded product keyspace to
// sharded customer keyspace.
createMoveTablesWorkflow(t, "customer,loadtest")
createMoveTablesWorkflow(t, "customer,loadtest,customer2")
tstWorkflowSwitchReadsAndWrites(t)
tstWorkflowComplete(t)

Expand All @@ -81,6 +129,9 @@ func TestPartialMoveTablesBasic(t *testing.T) {
// move tables for one of the two shards: 80-.
defaultRdonly = 0
setupCustomer2Keyspace(t)

testCancel(t)

currentWorkflowType = wrangler.MoveTablesWorkflow
wfName := "partial80Dash"
sourceKs := "customer"
Expand Down
6 changes: 3 additions & 3 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ func TestMultiCellVreplicationWorkflow(t *testing.T) {
verifyClusterHealth(t, vc)
insertInitialData(t)
shardCustomer(t, true, []*Cell{cell1, cell2}, cell2.Name, true)
checkIfDenyListExists(t, vc, "product:0", "customer")
isTableInDenyList(t, vc, "product:0", "customer")
// we tag along this test so as not to create the overhead of creating another cluster
testVStreamCellFlag(t)
}
Expand Down Expand Up @@ -876,13 +876,13 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
switchWrites(t, workflowType, ksWorkflow, false)

var exists bool
exists, err = checkIfDenyListExists(t, vc, "product:0", "customer")
exists, err = isTableInDenyList(t, vc, "product:0", "customer")
require.NoError(t, err, "Error getting denylist for customer:0")
require.True(t, exists)

moveTablesAction(t, "Complete", allCellNames, workflow, sourceKs, targetKs, tables)

exists, err = checkIfDenyListExists(t, vc, "product:0", "customer")
exists, err = isTableInDenyList(t, vc, "product:0", "customer")
require.NoError(t, err, "Error getting denylist for customer:0")
require.False(t, exists)

Expand Down
25 changes: 20 additions & 5 deletions go/test/endtoend/vreplication/wrappers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type moveTables struct {
sourceKeyspace string
tables string
atomicCopy bool
sourceShards string
}

type iMoveTables interface {
Expand All @@ -53,6 +54,7 @@ type iMoveTables interface {
SwitchReads()
SwitchWrites()
SwitchReadsAndWrites()
ReverseReadsAndWrites()
Cancel()
Complete()
Flavor() string
Expand Down Expand Up @@ -91,7 +93,7 @@ func newVtctlMoveTables(mt *moveTables) *VtctlMoveTables {
func (vmt *VtctlMoveTables) Create() {
log.Infof("vmt is %+v", vmt.vc, vmt.tables)
err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
vmt.tables, workflowActionCreate, "", "", "", vmt.atomicCopy)
vmt.tables, workflowActionCreate, "", vmt.sourceShards, "", vmt.atomicCopy)
require.NoError(vmt.vc.t, err)
}

Expand All @@ -101,6 +103,12 @@ func (vmt *VtctlMoveTables) SwitchReadsAndWrites() {
require.NoError(vmt.vc.t, err)
}

func (vmt *VtctlMoveTables) ReverseReadsAndWrites() {
err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
vmt.tables, workflowActionReverseTraffic, "", "", "", vmt.atomicCopy)
require.NoError(vmt.vc.t, err)
}

func (vmt *VtctlMoveTables) Show() {
//TODO implement me
panic("implement me")
Expand All @@ -117,8 +125,9 @@ func (vmt *VtctlMoveTables) SwitchWrites() {
}

func (vmt *VtctlMoveTables) Cancel() {
//TODO implement me
panic("implement me")
err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
vmt.tables, workflowActionCancel, "", "", "", vmt.atomicCopy)
require.NoError(vmt.vc.t, err)
}

func (vmt *VtctlMoveTables) Complete() {
Expand Down Expand Up @@ -158,13 +167,20 @@ func (v VtctldMoveTables) Create() {
if v.atomicCopy {
args = append(args, "--atomic-copy="+strconv.FormatBool(v.atomicCopy))
}
if v.sourceShards != "" {
args = append(args, "--source-shards="+v.sourceShards)
}
v.exec(args...)
}

func (v VtctldMoveTables) SwitchReadsAndWrites() {
v.exec("SwitchTraffic")
}

func (v VtctldMoveTables) ReverseReadsAndWrites() {
v.exec("ReverseTraffic")
}

func (v VtctldMoveTables) Show() {
//TODO implement me
panic("implement me")
Expand All @@ -181,8 +197,7 @@ func (v VtctldMoveTables) SwitchWrites() {
}

func (v VtctldMoveTables) Cancel() {
//TODO implement me
panic("implement me")
v.exec("Cancel")
}

func (v VtctldMoveTables) Complete() {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/topo/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ func (si *ShardInfo) GetTabletControl(tabletType topodatapb.TabletType) *topodat
return nil
}

// UpdateSourceDeniedTables will add or remove the listed tables
// UpdateDeniedTables will add or remove the listed tables
// in the shard record's TabletControl structures. Note we don't
// support a lot of the corner cases:
// - only support one table list per shard. If we encounter a different
Expand All @@ -419,7 +419,7 @@ func (si *ShardInfo) GetTabletControl(tabletType topodatapb.TabletType) *topodat
// because it's not used in the same context (vertical vs horizontal sharding)
//
// This function should be called while holding the keyspace lock.
func (si *ShardInfo) UpdateSourceDeniedTables(ctx context.Context, tabletType topodatapb.TabletType, cells []string, remove bool, tables []string) error {
func (si *ShardInfo) UpdateDeniedTables(ctx context.Context, tabletType topodatapb.TabletType, cells []string, remove bool, tables []string) error {
if err := CheckKeyspaceLocked(ctx, si.keyspace); err != nil {
return err
}
Expand Down
20 changes: 10 additions & 10 deletions go/vt/topo/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,14 @@ func lockedKeyspaceContext(keyspace string) context.Context {
}

func addToDenyList(ctx context.Context, si *ShardInfo, tabletType topodatapb.TabletType, cells, tables []string) error {
if err := si.UpdateSourceDeniedTables(ctx, tabletType, cells, false, tables); err != nil {
if err := si.UpdateDeniedTables(ctx, tabletType, cells, false, tables); err != nil {
return err
}
return nil
}

func removeFromDenyList(ctx context.Context, si *ShardInfo, tabletType topodatapb.TabletType, cells, tables []string) error {
if err := si.UpdateSourceDeniedTables(ctx, tabletType, cells, true, tables); err != nil {
if err := si.UpdateDeniedTables(ctx, tabletType, cells, true, tables); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -161,13 +161,13 @@ func TestUpdateSourceDeniedTables(t *testing.T) {

// check we enforce the keyspace lock
ctx := context.Background()
if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, nil, false, nil); err == nil || err.Error() != "keyspace ks is not locked (no locksInfo)" {
if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, nil, false, nil); err == nil || err.Error() != "keyspace ks is not locked (no locksInfo)" {
t.Fatalf("unlocked keyspace produced wrong error: %v", err)
}
ctx = lockedKeyspaceContext("ks")

// add one cell
if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first"}, false, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{
if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first"}, false, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{
{
TabletType: topodatapb.TabletType_RDONLY,
Cells: []string{"first"},
Expand All @@ -178,20 +178,20 @@ func TestUpdateSourceDeniedTables(t *testing.T) {
}

// remove that cell, going back
if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first"}, true, nil); err != nil || len(si.TabletControls) != 0 {
if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first"}, true, nil); err != nil || len(si.TabletControls) != 0 {
t.Fatalf("going back should have remove the record: %v", si)
}

// re-add a cell, then another with different table list to
// make sure it fails
if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first"}, false, []string{"t1", "t2"}); err != nil {
if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first"}, false, []string{"t1", "t2"}); err != nil {
t.Fatalf("one cell add failed: %v", si)
}
if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"second"}, false, []string{"t2", "t3"}); err == nil || err.Error() != "trying to use two different sets of denied tables for shard ks/sh: [t1 t2] and [t2 t3]" {
if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"second"}, false, []string{"t2", "t3"}); err == nil || err.Error() != "trying to use two different sets of denied tables for shard ks/sh: [t1 t2] and [t2 t3]" {
t.Fatalf("different table list should fail: %v", err)
}
// add another cell, see the list grow
if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"second"}, false, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{
if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"second"}, false, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{
{
TabletType: topodatapb.TabletType_RDONLY,
Cells: []string{"first", "second"},
Expand All @@ -202,7 +202,7 @@ func TestUpdateSourceDeniedTables(t *testing.T) {
}

// add all cells, see the list grow to all
if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first", "second", "third"}, false, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{
if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first", "second", "third"}, false, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{
{
TabletType: topodatapb.TabletType_RDONLY,
Cells: []string{"first", "second", "third"},
Expand All @@ -213,7 +213,7 @@ func TestUpdateSourceDeniedTables(t *testing.T) {
}

// remove one cell from the full list
if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"second"}, true, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{
if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"second"}, true, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{
{
TabletType: topodatapb.TabletType_RDONLY,
Cells: []string{"first", "third"},
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3286,7 +3286,7 @@ func (s *VtctldServer) SetShardTabletControl(ctx context.Context, req *vtctldata
defer unlock(&err)

si, err := s.ts.UpdateShardFields(ctx, req.Keyspace, req.Shard, func(si *topo.ShardInfo) error {
return si.UpdateSourceDeniedTables(ctx, req.TabletType, req.Cells, req.Remove, req.DeniedTables)
return si.UpdateDeniedTables(ctx, req.TabletType, req.Cells, req.Remove, req.DeniedTables)
})

switch {
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1883,6 +1883,9 @@ func (s *Server) DropTargets(ctx context.Context, targetKeyspace, workflow strin
if err := sw.dropSourceDeniedTables(ctx); err != nil {
return nil, err
}
if err := sw.dropTargetDeniedTables(ctx); err != nil {
return nil, err
}
case binlogdatapb.MigrationType_SHARDS:
if err := sw.dropTargetShards(ctx); err != nil {
return nil, err
Expand Down Expand Up @@ -2074,6 +2077,9 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy
if err := sw.dropSourceDeniedTables(ctx); err != nil {
return nil, err
}
if err := sw.dropTargetDeniedTables(ctx); err != nil {
return nil, err
}

case binlogdatapb.MigrationType_SHARDS:
log.Infof("Removing shards")
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtctl/workflow/switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func (r *switcher) dropSourceDeniedTables(ctx context.Context) error {
return r.ts.dropSourceDeniedTables(ctx)
}

func (r *switcher) dropTargetDeniedTables(ctx context.Context) error {
return r.ts.dropTargetDeniedTables(ctx)
}

func (r *switcher) validateWorkflowHasCompleted(ctx context.Context) error {
return r.ts.validateWorkflowHasCompleted(ctx)
}
Expand Down
11 changes: 11 additions & 0 deletions go/vt/vtctl/workflow/switcher_dry_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,17 @@ func (dr *switcherDryRun) dropSourceDeniedTables(ctx context.Context) error {
return nil
}

func (dr *switcherDryRun) dropTargetDeniedTables(ctx context.Context) error {
logs := make([]string, 0)
for _, si := range dr.ts.TargetShards() {
logs = append(logs, fmt.Sprintf("keyspace:%s;shard:%s;tablet:%d", si.Keyspace(), si.ShardName(), si.PrimaryAlias.Uid))
}
if len(logs) > 0 {
dr.drLog.Logf("Denied tables records on [%s] will be removed from: [%s]", strings.Join(dr.ts.Tables(), ","), strings.Join(logs, ","))
}
return nil
}

func (dr *switcherDryRun) logs() *[]string {
return &dr.drLog.logs
}
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtctl/workflow/switcher_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type iswitcher interface {
removeSourceTables(ctx context.Context, removalType TableRemovalType) error
dropSourceShards(ctx context.Context) error
dropSourceDeniedTables(ctx context.Context) error
dropTargetDeniedTables(ctx context.Context) error
freezeTargetVReplication(ctx context.Context) error
dropSourceReverseVReplicationStreams(ctx context.Context) error
dropTargetVReplicationStreams(ctx context.Context) error
Expand Down
Loading

0 comments on commit 7540087

Please sign in to comment.