diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index 6d032122430..b9574c24b8f 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -451,7 +451,17 @@ func checkIfTableExists(t *testing.T, vc *VitessCluster, tabletAlias string, tab return found, nil } -func checkIfDenyListExists(t *testing.T, vc *VitessCluster, ksShard string, table string) (bool, error) { +func validateTableInDenyList(t *testing.T, vc *VitessCluster, ksShard string, table string, mustExist bool) { + found, err := isTableInDenyList(t, vc, ksShard, table) + require.NoError(t, err) + if mustExist { + require.True(t, found, "Table %s not found in deny list", table) + } else { + require.False(t, found, "Table %s found in deny list", table) + } +} + +func isTableInDenyList(t *testing.T, vc *VitessCluster, ksShard string, table string) (bool, error) { var output string var err error found := false diff --git a/go/test/endtoend/vreplication/partial_movetables_test.go b/go/test/endtoend/vreplication/partial_movetables_test.go index 7c0784255d7..0ea9f9d3a08 100644 --- a/go/test/endtoend/vreplication/partial_movetables_test.go +++ b/go/test/endtoend/vreplication/partial_movetables_test.go @@ -21,6 +21,8 @@ import ( "strings" "testing" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + "github.com/stretchr/testify/require" "github.com/tidwall/gjson" @@ -28,6 +30,52 @@ import ( "vitess.io/vitess/go/vt/wrangler" ) +// testCancel() starts and cancels a partial MoveTables for one of the shards which will be actually moved later on. +// Before canceling, we first switch traffic to the target keyspace and then reverse it back to the source keyspace. +// This tests that artifacts are being properly cleaned up when a MoveTables ia canceled. +func testCancel(t *testing.T) { + targetKeyspace := "customer2" + sourceKeyspace := "customer" + workflowName := "partial80DashForCancel" + ksWorkflow := fmt.Sprintf("%s.%s", targetKeyspace, workflowName) + // We use a different table in this MoveTables than the subsequent one, so that setting up of the artifacts + // while creating MoveTables do not paper over any issues with cleaning up artifacts when MoveTables is canceled. + // Ref: https://github.com/vitessio/vitess/issues/13998 + table := "customer2" + shard := "80-" + // start the partial movetables for 80- + mt := newMoveTables(vc, &moveTables{ + workflowName: workflowName, + targetKeyspace: targetKeyspace, + sourceKeyspace: sourceKeyspace, + tables: table, + sourceShards: shard, + }, moveTablesFlavorRandom) + mt.Create() + + checkDenyList := func(keyspace string, expected bool) { + validateTableInDenyList(t, vc, fmt.Sprintf("%s:%s", keyspace, shard), table, expected) + } + + waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) + + checkDenyList(targetKeyspace, false) + checkDenyList(sourceKeyspace, false) + + mt.SwitchReadsAndWrites() + checkDenyList(targetKeyspace, false) + checkDenyList(sourceKeyspace, true) + + mt.ReverseReadsAndWrites() + checkDenyList(targetKeyspace, true) + checkDenyList(sourceKeyspace, false) + + mt.Cancel() + checkDenyList(targetKeyspace, false) + checkDenyList(sourceKeyspace, false) + +} + // TestPartialMoveTablesBasic tests partial move tables by moving each // customer shard -- -80,80- -- once a a time to customer2. func TestPartialMoveTablesBasic(t *testing.T) { @@ -58,7 +106,7 @@ func TestPartialMoveTablesBasic(t *testing.T) { // Move customer table from unsharded product keyspace to // sharded customer keyspace. - createMoveTablesWorkflow(t, "customer,loadtest") + createMoveTablesWorkflow(t, "customer,loadtest,customer2") tstWorkflowSwitchReadsAndWrites(t) tstWorkflowComplete(t) @@ -81,6 +129,9 @@ func TestPartialMoveTablesBasic(t *testing.T) { // move tables for one of the two shards: 80-. defaultRdonly = 0 setupCustomer2Keyspace(t) + + testCancel(t) + currentWorkflowType = wrangler.MoveTablesWorkflow wfName := "partial80Dash" sourceKs := "customer" diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 38c7aa8faa3..6e6c2a8b11b 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -435,7 +435,7 @@ func TestMultiCellVreplicationWorkflow(t *testing.T) { verifyClusterHealth(t, vc) insertInitialData(t) shardCustomer(t, true, []*Cell{cell1, cell2}, cell2.Name, true) - checkIfDenyListExists(t, vc, "product:0", "customer") + isTableInDenyList(t, vc, "product:0", "customer") // we tag along this test so as not to create the overhead of creating another cluster testVStreamCellFlag(t) } @@ -876,13 +876,13 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl switchWrites(t, workflowType, ksWorkflow, false) var exists bool - exists, err = checkIfDenyListExists(t, vc, "product:0", "customer") + exists, err = isTableInDenyList(t, vc, "product:0", "customer") require.NoError(t, err, "Error getting denylist for customer:0") require.True(t, exists) moveTablesAction(t, "Complete", allCellNames, workflow, sourceKs, targetKs, tables) - exists, err = checkIfDenyListExists(t, vc, "product:0", "customer") + exists, err = isTableInDenyList(t, vc, "product:0", "customer") require.NoError(t, err, "Error getting denylist for customer:0") require.False(t, exists) diff --git a/go/test/endtoend/vreplication/wrappers_test.go b/go/test/endtoend/vreplication/wrappers_test.go index 3d06c5029a6..6bd0bbb19d8 100644 --- a/go/test/endtoend/vreplication/wrappers_test.go +++ b/go/test/endtoend/vreplication/wrappers_test.go @@ -45,6 +45,7 @@ type moveTables struct { sourceKeyspace string tables string atomicCopy bool + sourceShards string } type iMoveTables interface { @@ -53,6 +54,7 @@ type iMoveTables interface { SwitchReads() SwitchWrites() SwitchReadsAndWrites() + ReverseReadsAndWrites() Cancel() Complete() Flavor() string @@ -91,7 +93,7 @@ func newVtctlMoveTables(mt *moveTables) *VtctlMoveTables { func (vmt *VtctlMoveTables) Create() { log.Infof("vmt is %+v", vmt.vc, vmt.tables) err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, - vmt.tables, workflowActionCreate, "", "", "", vmt.atomicCopy) + vmt.tables, workflowActionCreate, "", vmt.sourceShards, "", vmt.atomicCopy) require.NoError(vmt.vc.t, err) } @@ -101,6 +103,12 @@ func (vmt *VtctlMoveTables) SwitchReadsAndWrites() { require.NoError(vmt.vc.t, err) } +func (vmt *VtctlMoveTables) ReverseReadsAndWrites() { + err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, + vmt.tables, workflowActionReverseTraffic, "", "", "", vmt.atomicCopy) + require.NoError(vmt.vc.t, err) +} + func (vmt *VtctlMoveTables) Show() { //TODO implement me panic("implement me") @@ -117,8 +125,9 @@ func (vmt *VtctlMoveTables) SwitchWrites() { } func (vmt *VtctlMoveTables) Cancel() { - //TODO implement me - panic("implement me") + err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, + vmt.tables, workflowActionCancel, "", "", "", vmt.atomicCopy) + require.NoError(vmt.vc.t, err) } func (vmt *VtctlMoveTables) Complete() { @@ -158,6 +167,9 @@ func (v VtctldMoveTables) Create() { if v.atomicCopy { args = append(args, "--atomic-copy="+strconv.FormatBool(v.atomicCopy)) } + if v.sourceShards != "" { + args = append(args, "--source-shards="+v.sourceShards) + } v.exec(args...) } @@ -165,6 +177,10 @@ func (v VtctldMoveTables) SwitchReadsAndWrites() { v.exec("SwitchTraffic") } +func (v VtctldMoveTables) ReverseReadsAndWrites() { + v.exec("ReverseTraffic") +} + func (v VtctldMoveTables) Show() { //TODO implement me panic("implement me") @@ -181,8 +197,7 @@ func (v VtctldMoveTables) SwitchWrites() { } func (v VtctldMoveTables) Cancel() { - //TODO implement me - panic("implement me") + v.exec("Cancel") } func (v VtctldMoveTables) Complete() { diff --git a/go/vt/topo/shard.go b/go/vt/topo/shard.go index f0623f216d8..183ed409bbb 100644 --- a/go/vt/topo/shard.go +++ b/go/vt/topo/shard.go @@ -410,7 +410,7 @@ func (si *ShardInfo) GetTabletControl(tabletType topodatapb.TabletType) *topodat return nil } -// UpdateSourceDeniedTables will add or remove the listed tables +// UpdateDeniedTables will add or remove the listed tables // in the shard record's TabletControl structures. Note we don't // support a lot of the corner cases: // - only support one table list per shard. If we encounter a different @@ -419,7 +419,7 @@ func (si *ShardInfo) GetTabletControl(tabletType topodatapb.TabletType) *topodat // because it's not used in the same context (vertical vs horizontal sharding) // // This function should be called while holding the keyspace lock. -func (si *ShardInfo) UpdateSourceDeniedTables(ctx context.Context, tabletType topodatapb.TabletType, cells []string, remove bool, tables []string) error { +func (si *ShardInfo) UpdateDeniedTables(ctx context.Context, tabletType topodatapb.TabletType, cells []string, remove bool, tables []string) error { if err := CheckKeyspaceLocked(ctx, si.keyspace); err != nil { return err } diff --git a/go/vt/topo/shard_test.go b/go/vt/topo/shard_test.go index d0ec08f94ea..2c0b9082816 100644 --- a/go/vt/topo/shard_test.go +++ b/go/vt/topo/shard_test.go @@ -106,14 +106,14 @@ func lockedKeyspaceContext(keyspace string) context.Context { } func addToDenyList(ctx context.Context, si *ShardInfo, tabletType topodatapb.TabletType, cells, tables []string) error { - if err := si.UpdateSourceDeniedTables(ctx, tabletType, cells, false, tables); err != nil { + if err := si.UpdateDeniedTables(ctx, tabletType, cells, false, tables); err != nil { return err } return nil } func removeFromDenyList(ctx context.Context, si *ShardInfo, tabletType topodatapb.TabletType, cells, tables []string) error { - if err := si.UpdateSourceDeniedTables(ctx, tabletType, cells, true, tables); err != nil { + if err := si.UpdateDeniedTables(ctx, tabletType, cells, true, tables); err != nil { return err } return nil @@ -161,13 +161,13 @@ func TestUpdateSourceDeniedTables(t *testing.T) { // check we enforce the keyspace lock ctx := context.Background() - if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, nil, false, nil); err == nil || err.Error() != "keyspace ks is not locked (no locksInfo)" { + if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, nil, false, nil); err == nil || err.Error() != "keyspace ks is not locked (no locksInfo)" { t.Fatalf("unlocked keyspace produced wrong error: %v", err) } ctx = lockedKeyspaceContext("ks") // add one cell - if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first"}, false, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{ + if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first"}, false, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{ { TabletType: topodatapb.TabletType_RDONLY, Cells: []string{"first"}, @@ -178,20 +178,20 @@ func TestUpdateSourceDeniedTables(t *testing.T) { } // remove that cell, going back - if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first"}, true, nil); err != nil || len(si.TabletControls) != 0 { + if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first"}, true, nil); err != nil || len(si.TabletControls) != 0 { t.Fatalf("going back should have remove the record: %v", si) } // re-add a cell, then another with different table list to // make sure it fails - if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first"}, false, []string{"t1", "t2"}); err != nil { + if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first"}, false, []string{"t1", "t2"}); err != nil { t.Fatalf("one cell add failed: %v", si) } - if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"second"}, false, []string{"t2", "t3"}); err == nil || err.Error() != "trying to use two different sets of denied tables for shard ks/sh: [t1 t2] and [t2 t3]" { + if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"second"}, false, []string{"t2", "t3"}); err == nil || err.Error() != "trying to use two different sets of denied tables for shard ks/sh: [t1 t2] and [t2 t3]" { t.Fatalf("different table list should fail: %v", err) } // add another cell, see the list grow - if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"second"}, false, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{ + if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"second"}, false, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{ { TabletType: topodatapb.TabletType_RDONLY, Cells: []string{"first", "second"}, @@ -202,7 +202,7 @@ func TestUpdateSourceDeniedTables(t *testing.T) { } // add all cells, see the list grow to all - if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first", "second", "third"}, false, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{ + if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"first", "second", "third"}, false, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{ { TabletType: topodatapb.TabletType_RDONLY, Cells: []string{"first", "second", "third"}, @@ -213,7 +213,7 @@ func TestUpdateSourceDeniedTables(t *testing.T) { } // remove one cell from the full list - if err := si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"second"}, true, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{ + if err := si.UpdateDeniedTables(ctx, topodatapb.TabletType_RDONLY, []string{"second"}, true, []string{"t1", "t2"}); err != nil || !reflect.DeepEqual(si.TabletControls, []*topodatapb.Shard_TabletControl{ { TabletType: topodatapb.TabletType_RDONLY, Cells: []string{"first", "third"}, diff --git a/go/vt/vtctl/grpcvtctldserver/server.go b/go/vt/vtctl/grpcvtctldserver/server.go index 67ab532a5ef..18c4e1567c0 100644 --- a/go/vt/vtctl/grpcvtctldserver/server.go +++ b/go/vt/vtctl/grpcvtctldserver/server.go @@ -3286,7 +3286,7 @@ func (s *VtctldServer) SetShardTabletControl(ctx context.Context, req *vtctldata defer unlock(&err) si, err := s.ts.UpdateShardFields(ctx, req.Keyspace, req.Shard, func(si *topo.ShardInfo) error { - return si.UpdateSourceDeniedTables(ctx, req.TabletType, req.Cells, req.Remove, req.DeniedTables) + return si.UpdateDeniedTables(ctx, req.TabletType, req.Cells, req.Remove, req.DeniedTables) }) switch { diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 38b57795d19..f42a2dda59c 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1883,6 +1883,9 @@ func (s *Server) DropTargets(ctx context.Context, targetKeyspace, workflow strin if err := sw.dropSourceDeniedTables(ctx); err != nil { return nil, err } + if err := sw.dropTargetDeniedTables(ctx); err != nil { + return nil, err + } case binlogdatapb.MigrationType_SHARDS: if err := sw.dropTargetShards(ctx); err != nil { return nil, err @@ -2074,6 +2077,9 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy if err := sw.dropSourceDeniedTables(ctx); err != nil { return nil, err } + if err := sw.dropTargetDeniedTables(ctx); err != nil { + return nil, err + } case binlogdatapb.MigrationType_SHARDS: log.Infof("Removing shards") diff --git a/go/vt/vtctl/workflow/switcher.go b/go/vt/vtctl/workflow/switcher.go index e609d10d279..0cbdce164dc 100644 --- a/go/vt/vtctl/workflow/switcher.go +++ b/go/vt/vtctl/workflow/switcher.go @@ -46,6 +46,10 @@ func (r *switcher) dropSourceDeniedTables(ctx context.Context) error { return r.ts.dropSourceDeniedTables(ctx) } +func (r *switcher) dropTargetDeniedTables(ctx context.Context) error { + return r.ts.dropTargetDeniedTables(ctx) +} + func (r *switcher) validateWorkflowHasCompleted(ctx context.Context) error { return r.ts.validateWorkflowHasCompleted(ctx) } diff --git a/go/vt/vtctl/workflow/switcher_dry_run.go b/go/vt/vtctl/workflow/switcher_dry_run.go index 91b62d468f7..1c8a05e00c2 100644 --- a/go/vt/vtctl/workflow/switcher_dry_run.go +++ b/go/vt/vtctl/workflow/switcher_dry_run.go @@ -313,6 +313,17 @@ func (dr *switcherDryRun) dropSourceDeniedTables(ctx context.Context) error { return nil } +func (dr *switcherDryRun) dropTargetDeniedTables(ctx context.Context) error { + logs := make([]string, 0) + for _, si := range dr.ts.TargetShards() { + logs = append(logs, fmt.Sprintf("keyspace:%s;shard:%s;tablet:%d", si.Keyspace(), si.ShardName(), si.PrimaryAlias.Uid)) + } + if len(logs) > 0 { + dr.drLog.Logf("Denied tables records on [%s] will be removed from: [%s]", strings.Join(dr.ts.Tables(), ","), strings.Join(logs, ",")) + } + return nil +} + func (dr *switcherDryRun) logs() *[]string { return &dr.drLog.logs } diff --git a/go/vt/vtctl/workflow/switcher_interface.go b/go/vt/vtctl/workflow/switcher_interface.go index a9e803dd714..8d0f9e847be 100644 --- a/go/vt/vtctl/workflow/switcher_interface.go +++ b/go/vt/vtctl/workflow/switcher_interface.go @@ -42,6 +42,7 @@ type iswitcher interface { removeSourceTables(ctx context.Context, removalType TableRemovalType) error dropSourceShards(ctx context.Context) error dropSourceDeniedTables(ctx context.Context) error + dropTargetDeniedTables(ctx context.Context) error freezeTargetVReplication(ctx context.Context) error dropSourceReverseVReplicationStreams(ctx context.Context) error dropTargetVReplicationStreams(ctx context.Context) error diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 30c1f0f4119..d4fe77130ae 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -439,7 +439,7 @@ func (ts *trafficSwitcher) deleteShardRoutingRules(ctx context.Context) error { func (ts *trafficSwitcher) dropSourceDeniedTables(ctx context.Context) error { return ts.ForAllSources(func(source *MigrationSource) error { if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.SourceKeyspaceName(), source.GetShard().ShardName(), func(si *topo.ShardInfo) error { - return si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables()) + return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables()) }); err != nil { return err } @@ -450,6 +450,20 @@ func (ts *trafficSwitcher) dropSourceDeniedTables(ctx context.Context) error { }) } +func (ts *trafficSwitcher) dropTargetDeniedTables(ctx context.Context) error { + return ts.ForAllTargets(func(target *MigrationTarget) error { + if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.TargetKeyspaceName(), target.GetShard().ShardName(), func(si *topo.ShardInfo) error { + return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables()) + }); err != nil { + return err + } + rtbsCtx, cancel := context.WithTimeout(ctx, shardTabletRefreshTimeout) + defer cancel() + _, _, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), target.GetShard(), nil, ts.Logger()) + return err + }) +} + func (ts *trafficSwitcher) validateWorkflowHasCompleted(ctx context.Context) error { return doValidateWorkflowHasCompleted(ctx, ts) } @@ -668,7 +682,7 @@ func (ts *trafficSwitcher) allowTargetWrites(ctx context.Context) error { func (ts *trafficSwitcher) allowTableTargetWrites(ctx context.Context) error { return ts.ForAllTargets(func(target *MigrationTarget) error { if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.TargetKeyspaceName(), target.GetShard().ShardName(), func(si *topo.ShardInfo) error { - return si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables()) + return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables()) }); err != nil { return err } @@ -950,7 +964,7 @@ func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error { func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access accessType) error { err := ts.ForAllSources(func(source *MigrationSource) error { if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.SourceKeyspaceName(), source.GetShard().ShardName(), func(si *topo.ShardInfo) error { - return si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, access == allowWrites /* remove */, ts.Tables()) + return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, access == allowWrites /* remove */, ts.Tables()) }); err != nil { return err } diff --git a/go/vt/wrangler/switcher.go b/go/vt/wrangler/switcher.go index acf1ea194b0..0e1f33b90ea 100644 --- a/go/vt/wrangler/switcher.go +++ b/go/vt/wrangler/switcher.go @@ -48,6 +48,10 @@ func (r *switcher) dropSourceDeniedTables(ctx context.Context) error { return r.ts.dropSourceDeniedTables(ctx) } +func (r *switcher) dropTargetDeniedTables(ctx context.Context) error { + return r.ts.dropTargetDeniedTables(ctx) +} + func (r *switcher) validateWorkflowHasCompleted(ctx context.Context) error { return r.ts.validateWorkflowHasCompleted(ctx) } diff --git a/go/vt/wrangler/switcher_dry_run.go b/go/vt/wrangler/switcher_dry_run.go index 3af271b6b01..7b21ac65fe0 100644 --- a/go/vt/wrangler/switcher_dry_run.go +++ b/go/vt/wrangler/switcher_dry_run.go @@ -329,6 +329,18 @@ func (dr *switcherDryRun) dropSourceDeniedTables(ctx context.Context) error { return nil } +func (dr *switcherDryRun) dropTargetDeniedTables(ctx context.Context) error { + logs := make([]string, 0) + for _, si := range dr.ts.TargetShards() { + logs = append(logs, fmt.Sprintf("\tKeyspace %s Shard %s Tablet %d", si.Keyspace(), si.ShardName(), si.PrimaryAlias.Uid)) + } + if len(logs) > 0 { + dr.drLog.Log(fmt.Sprintf("Denied tables [%s] will be removed from:", strings.Join(dr.ts.Tables(), ","))) + dr.drLog.LogSlice(logs) + } + return nil +} + func (dr *switcherDryRun) logs() *[]string { return &dr.drLog.logs } diff --git a/go/vt/wrangler/switcher_interface.go b/go/vt/wrangler/switcher_interface.go index a29e178e9eb..bae165ec2ea 100644 --- a/go/vt/wrangler/switcher_interface.go +++ b/go/vt/wrangler/switcher_interface.go @@ -44,6 +44,7 @@ type iswitcher interface { removeSourceTables(ctx context.Context, removalType workflow.TableRemovalType) error dropSourceShards(ctx context.Context) error dropSourceDeniedTables(ctx context.Context) error + dropTargetDeniedTables(ctx context.Context) error freezeTargetVReplication(ctx context.Context) error dropSourceReverseVReplicationStreams(ctx context.Context) error dropTargetVReplicationStreams(ctx context.Context) error diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index 115e299a792..654a5bd1588 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -712,6 +712,9 @@ func (wr *Wrangler) DropTargets(ctx context.Context, targetKeyspace, workflow st if err := sw.dropSourceDeniedTables(ctx); err != nil { return nil, err } + if err := sw.dropTargetDeniedTables(ctx); err != nil { + return nil, err + } case binlogdatapb.MigrationType_SHARDS: log.Infof("Removing target shards") if err := sw.dropTargetShards(ctx); err != nil { @@ -835,6 +838,9 @@ func (wr *Wrangler) DropSources(ctx context.Context, targetKeyspace, workflowNam if err := sw.dropSourceDeniedTables(ctx); err != nil { return nil, err } + if err := sw.dropTargetDeniedTables(ctx); err != nil { + return nil, err + } case binlogdatapb.MigrationType_SHARDS: log.Infof("Removing shards") @@ -1193,7 +1199,7 @@ func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error { func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access accessType) error { err := ts.ForAllSources(func(source *workflow.MigrationSource) error { if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.SourceKeyspaceName(), source.GetShard().ShardName(), func(si *topo.ShardInfo) error { - return si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, access == allowWrites /* remove */, ts.Tables()) + return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, access == allowWrites /* remove */, ts.Tables()) }); err != nil { return err } @@ -1498,7 +1504,7 @@ func (ts *trafficSwitcher) allowTargetWrites(ctx context.Context) error { func (ts *trafficSwitcher) allowTableTargetWrites(ctx context.Context) error { return ts.ForAllTargets(func(target *workflow.MigrationTarget) error { if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.TargetKeyspaceName(), target.GetShard().ShardName(), func(si *topo.ShardInfo) error { - return si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables()) + return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables()) }); err != nil { return err } @@ -1641,7 +1647,7 @@ func (ts *trafficSwitcher) TargetShards() []*topo.ShardInfo { func (ts *trafficSwitcher) dropSourceDeniedTables(ctx context.Context) error { return ts.ForAllSources(func(source *workflow.MigrationSource) error { if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.SourceKeyspaceName(), source.GetShard().ShardName(), func(si *topo.ShardInfo) error { - return si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables()) + return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables()) }); err != nil { return err } @@ -1652,6 +1658,20 @@ func (ts *trafficSwitcher) dropSourceDeniedTables(ctx context.Context) error { }) } +func (ts *trafficSwitcher) dropTargetDeniedTables(ctx context.Context) error { + return ts.ForAllTargets(func(target *workflow.MigrationTarget) error { + if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.TargetKeyspaceName(), target.GetShard().ShardName(), func(si *topo.ShardInfo) error { + return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables()) + }); err != nil { + return err + } + rtbsCtx, cancel := context.WithTimeout(ctx, shardTabletRefreshTimeout) + defer cancel() + _, _, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), target.GetShard(), nil, ts.Logger()) + return err + }) +} + func (ts *trafficSwitcher) validateWorkflowHasCompleted(ctx context.Context) error { return doValidateWorkflowHasCompleted(ctx, ts) } diff --git a/go/vt/wrangler/traffic_switcher_test.go b/go/vt/wrangler/traffic_switcher_test.go index 91bfe8a6445..6c97758ad48 100644 --- a/go/vt/wrangler/traffic_switcher_test.go +++ b/go/vt/wrangler/traffic_switcher_test.go @@ -891,7 +891,11 @@ func testTableMigrateOneToMany(t *testing.T, keepData, keepRoutingRules bool) { " Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t1", " Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t2", "Denied tables [t1,t2] will be removed from:", - " Keyspace ks1 Shard 0 Tablet 10") + " Keyspace ks1 Shard 0 Tablet 10", + "Denied tables [t1,t2] will be removed from:", + " Keyspace ks2 Shard -80 Tablet 20", + " Keyspace ks2 Shard 80- Tablet 30", + ) } wantdryRunDropSources = append(wantdryRunDropSources, "Delete reverse vreplication streams on source:", " Keyspace ks1 Shard 0 Workflow test_reverse DbName vt_ks1 Tablet 10", @@ -921,7 +925,11 @@ func testTableMigrateOneToMany(t *testing.T, keepData, keepRoutingRules bool) { "Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t1", " Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t2", "Denied tables [t1,t2] will be removed from:", - " Keyspace ks1 Shard 0 Tablet 10") + " Keyspace ks1 Shard 0 Tablet 10", + "Denied tables [t1,t2] will be removed from:", + " Keyspace ks2 Shard -80 Tablet 20", + " Keyspace ks2 Shard 80- Tablet 30", + ) } wantdryRunRenameSources = append(wantdryRunRenameSources, "Delete reverse vreplication streams on source:", " Keyspace ks1 Shard 0 Workflow test_reverse DbName vt_ks1 Tablet 10",