From eaeba619b28db2afb1a9416f4da561110adac46b Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 24 Nov 2023 19:34:01 +0100 Subject: [PATCH] Add --shards to the VReplication Workflow command. Only partial movetables supported, for safety and since that is the main use case of --shards. Signed-off-by: Rohit Nayak --- go/vt/vtctl/vtctl.go | 22 ++- go/vt/vtctl/workflow/utils.go | 8 +- go/vt/wrangler/testdata/show-80minus.json | 64 +++++++ go/vt/wrangler/testdata/show-all-shards.json | 107 +++++++++++ go/vt/wrangler/testdata/show-minus80.json | 64 +++++++ go/vt/wrangler/traffic_switcher.go | 39 +++- go/vt/wrangler/vexec.go | 43 ++--- go/vt/wrangler/vexec_plan.go | 2 +- go/vt/wrangler/vexec_test.go | 176 +++++-------------- go/vt/wrangler/workflow.go | 19 +- go/vt/wrangler/workflow_test.go | 84 +++++++++ go/vt/wrangler/wrangler.go | 3 +- 12 files changed, 469 insertions(+), 162 deletions(-) create mode 100644 go/vt/wrangler/testdata/show-80minus.json create mode 100644 go/vt/wrangler/testdata/show-all-shards.json create mode 100644 go/vt/wrangler/testdata/show-minus80.json diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 90e5d0d8ddd..76e710fec3a 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -2099,6 +2099,7 @@ func commandVReplicationWorkflow(ctx context.Context, wr *wrangler.Wrangler, sub stopAfterCopy := subFlags.Bool("stop_after_copy", false, "Streams will be stopped once the copy phase is completed") dropForeignKeys := subFlags.Bool("drop_foreign_keys", false, "If true, tables in the target keyspace will be created without foreign keys.") maxReplicationLagAllowed := subFlags.Duration("max_replication_lag_allowed", defaultMaxReplicationLagAllowed, "Allow traffic to be switched only if vreplication lag is below this (in seconds)") + shards := subFlags.StringSlice("shards", nil, "(Optional) Specifies a comma-separated list of shards to operate on.") atomicCopy := subFlags.Bool("atomic-copy", false, "(EXPERIMENTAL) Use this if your 
source keyspace has tables which use foreign key constraints. All tables from the source will be moved.") onDDL := "IGNORE" @@ -2165,7 +2166,7 @@ func commandVReplicationWorkflow(ctx context.Context, wr *wrangler.Wrangler, sub printDetails := func() error { s := "" - res, err := wr.ShowWorkflow(ctx, workflowName, target) + res, err := wr.ShowWorkflow(ctx, workflowName, target, nil) if err != nil { return err } @@ -2329,6 +2330,7 @@ func commandVReplicationWorkflow(ctx context.Context, wr *wrangler.Wrangler, sub vrwp.KeepRoutingRules = *keepRoutingRules } vrwp.WorkflowType = workflowType + vrwp.ShardsToAffect = *shards wf, err := wr.NewVReplicationWorkflow(ctx, workflowType, vrwp) if err != nil { log.Warningf("NewVReplicationWorkflow returned error %+v", wf) @@ -2338,6 +2340,15 @@ func commandVReplicationWorkflow(ctx context.Context, wr *wrangler.Wrangler, sub return fmt.Errorf("workflow %s does not exist", ksWorkflow) } + if len(vrwp.ShardsToAffect) > 0 { + if workflowType == wrangler.MoveTablesWorkflow && wf.IsPartialMigration() && action != vReplicationWorkflowActionCreate { + log.Infof("Subset of shards: %s have been specified for keyspace %s, workflow %s, for action %s", + vrwp.ShardsToAffect, target, workflowName, action) + } else { + return fmt.Errorf("shards can only be specified for existing Partial MoveTables workflows") + } + } + printCopyProgress := func() error { copyProgress, err := wf.GetCopyProgress() if err != nil { @@ -2378,6 +2389,7 @@ func commandVReplicationWorkflow(ctx context.Context, wr *wrangler.Wrangler, sub } } + wr.WorkflowParams = vrwp var dryRunResults *[]string startState := wf.CachedState() switch action { @@ -3721,8 +3733,9 @@ func commandHelp(ctx context.Context, wr *wrangler.Wrangler, subFlags *pflag.Fla } func commandWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *pflag.FlagSet, args []string) error { - usage := "usage: Workflow [--dry-run] [--cells] [--tablet-types] [.] 
start/stop/update/delete/show/listall/tags []" + usage := "usage: Workflow [--shards ] [--dry-run] [--cells] [--tablet-types] [.] start/stop/update/delete/show/listall/tags []" dryRun := subFlags.Bool("dry-run", false, "Does a dry run of the Workflow action and reports the query and list of tablets on which the operation will be applied") + shards := subFlags.StringSlice("shards", nil, "(Optional) Specifies a comma-separated list of shards to operate on.") cells := subFlags.StringSlice("cells", []string{}, "New Cell(s) or CellAlias(es) (comma-separated) to replicate from. (Update only)") tabletTypesStrs := subFlags.StringSlice("tablet-types", []string{}, "New source tablet types to replicate from (e.g. PRIMARY, REPLICA, RDONLY). (Update only)") onDDL := subFlags.String("on-ddl", "", "New instruction on what to do when DDL is encountered in the VReplication stream. Possible values are IGNORE, STOP, EXEC, and EXEC_IGNORE. (Update only)") @@ -3732,6 +3745,9 @@ func commandWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *pflag if subFlags.NArg() < 2 { return fmt.Errorf(usage) } + if len(*shards) > 0 { + log.Infof("Subset of shards specified: %d, %v", len(*shards), strings.Join(*shards, ",")) + } keyspace := subFlags.Arg(0) action := strings.ToLower(subFlags.Arg(1)) var workflow string @@ -3818,7 +3834,7 @@ func commandWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *pflag OnDdl: binlogdatapb.OnDDLAction(onddl), } } - results, err = wr.WorkflowAction(ctx, workflow, keyspace, action, *dryRun, rpcReq) // Only update currently uses the new RPC path + results, err = wr.WorkflowAction(ctx, workflow, keyspace, action, *dryRun, rpcReq, *shards) // Only update currently uses the new RPC path if err != nil { return err } diff --git a/go/vt/vtctl/workflow/utils.go b/go/vt/vtctl/workflow/utils.go index 1a723c6192c..e72749f8f1a 100644 --- a/go/vt/vtctl/workflow/utils.go +++ b/go/vt/vtctl/workflow/utils.go @@ -657,12 +657,8 @@ func 
areTabletsAvailableToStreamFrom(ctx context.Context, req *vtctldatapb.Workf // New callers should instead use the new BuildTargets function. // // It returns ErrNoStreams if there are no targets found for the workflow. -func LegacyBuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, targetKeyspace string, workflow string) (*TargetInfo, error) { - targetShards, err := ts.GetShardNames(ctx, targetKeyspace) - if err != nil { - return nil, err - } - +func LegacyBuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, targetKeyspace string, + workflow string, targetShards []string) (*TargetInfo, error) { var ( frozen bool optCells string diff --git a/go/vt/wrangler/testdata/show-80minus.json b/go/vt/wrangler/testdata/show-80minus.json new file mode 100644 index 00000000000..6068dce9626 --- /dev/null +++ b/go/vt/wrangler/testdata/show-80minus.json @@ -0,0 +1,64 @@ +{ + "Workflow": "wrWorkflow", + "SourceLocation": { + "Keyspace": "source", + "Shards": [ + "0" + ] + }, + "TargetLocation": { + "Keyspace": "target", + "Shards": [ + "80-" + ] + }, + "MaxVReplicationLag": 0, + "MaxVReplicationTransactionLag": 0, + "Frozen": false, + "ShardStatuses": { + "80-/zone1-0000000210": { + "PrimaryReplicationStatuses": [ + { + "Shard": "80-", + "Tablet": "zone1-0000000210", + "ID": 1, + "Bls": { + "keyspace": "source", + "shard": "0", + "filter": { + "rules": [ + { + "match": "t1" + } + ] + } + }, + "Pos": "14b68925-696a-11ea-aee7-fec597a91f5e:1-3", + "StopPos": "", + "State": "Copying", + "DBName": "vt_target", + "TransactionTimestamp": 0, + "TimeUpdated": 1234, + "TimeHeartbeat": 1234, + "TimeThrottled": 0, + "ComponentThrottled": "", + "Message": "", + "Tags": "", + "WorkflowType": "Materialize", + "WorkflowSubType": "None", + "CopyState": [ + { + "Table": "t1", + "LastPK": "pk1" + } + ], + "RowsCopied": 1000 + } + ], + "TabletControls": null, + "PrimaryIsServing": true + } + }, + "SourceTimeZone": "", + "TargetTimeZone": 
"" +} \ No newline at end of file diff --git a/go/vt/wrangler/testdata/show-all-shards.json b/go/vt/wrangler/testdata/show-all-shards.json new file mode 100644 index 00000000000..4bc4aaa37b9 --- /dev/null +++ b/go/vt/wrangler/testdata/show-all-shards.json @@ -0,0 +1,107 @@ +{ + "Workflow": "wrWorkflow", + "SourceLocation": { + "Keyspace": "source", + "Shards": [ + "0" + ] + }, + "TargetLocation": { + "Keyspace": "target", + "Shards": [ + "-80", + "80-" + ] + }, + "MaxVReplicationLag": 0, + "MaxVReplicationTransactionLag": 0, + "Frozen": false, + "ShardStatuses": { + "-80/zone1-0000000200": { + "PrimaryReplicationStatuses": [ + { + "Shard": "-80", + "Tablet": "zone1-0000000200", + "ID": 1, + "Bls": { + "keyspace": "source", + "shard": "0", + "filter": { + "rules": [ + { + "match": "t1" + } + ] + } + }, + "Pos": "14b68925-696a-11ea-aee7-fec597a91f5e:1-3", + "StopPos": "", + "State": "Copying", + "DBName": "vt_target", + "TransactionTimestamp": 0, + "TimeUpdated": 1234, + "TimeHeartbeat": 1234, + "TimeThrottled": 0, + "ComponentThrottled": "", + "Message": "", + "Tags": "", + "WorkflowType": "Materialize", + "WorkflowSubType": "None", + "CopyState": [ + { + "Table": "t1", + "LastPK": "pk1" + } + ], + "RowsCopied": 1000 + } + ], + "TabletControls": null, + "PrimaryIsServing": true + }, + "80-/zone1-0000000210": { + "PrimaryReplicationStatuses": [ + { + "Shard": "80-", + "Tablet": "zone1-0000000210", + "ID": 1, + "Bls": { + "keyspace": "source", + "shard": "0", + "filter": { + "rules": [ + { + "match": "t1" + } + ] + } + }, + "Pos": "14b68925-696a-11ea-aee7-fec597a91f5e:1-3", + "StopPos": "", + "State": "Copying", + "DBName": "vt_target", + "TransactionTimestamp": 0, + "TimeUpdated": 1234, + "TimeHeartbeat": 1234, + "TimeThrottled": 0, + "ComponentThrottled": "", + "Message": "", + "Tags": "", + "WorkflowType": "Materialize", + "WorkflowSubType": "None", + "CopyState": [ + { + "Table": "t1", + "LastPK": "pk1" + } + ], + "RowsCopied": 1000 + } + ], + "TabletControls": 
null, + "PrimaryIsServing": true + } + }, + "SourceTimeZone": "", + "TargetTimeZone": "" +} \ No newline at end of file diff --git a/go/vt/wrangler/testdata/show-minus80.json b/go/vt/wrangler/testdata/show-minus80.json new file mode 100644 index 00000000000..85187246bd0 --- /dev/null +++ b/go/vt/wrangler/testdata/show-minus80.json @@ -0,0 +1,64 @@ +{ + "Workflow": "wrWorkflow", + "SourceLocation": { + "Keyspace": "source", + "Shards": [ + "0" + ] + }, + "TargetLocation": { + "Keyspace": "target", + "Shards": [ + "-80" + ] + }, + "MaxVReplicationLag": 0, + "MaxVReplicationTransactionLag": 0, + "Frozen": false, + "ShardStatuses": { + "-80/zone1-0000000200": { + "PrimaryReplicationStatuses": [ + { + "Shard": "-80", + "Tablet": "zone1-0000000200", + "ID": 1, + "Bls": { + "keyspace": "source", + "shard": "0", + "filter": { + "rules": [ + { + "match": "t1" + } + ] + } + }, + "Pos": "14b68925-696a-11ea-aee7-fec597a91f5e:1-3", + "StopPos": "", + "State": "Copying", + "DBName": "vt_target", + "TransactionTimestamp": 0, + "TimeUpdated": 1234, + "TimeHeartbeat": 1234, + "TimeThrottled": 0, + "ComponentThrottled": "", + "Message": "", + "Tags": "", + "WorkflowType": "Materialize", + "WorkflowSubType": "None", + "CopyState": [ + { + "Table": "t1", + "LastPK": "pk1" + } + ], + "RowsCopied": 1000 + } + ], + "TabletControls": null, + "PrimaryIsServing": true + } + }, + "SourceTimeZone": "", + "TargetTimeZone": "" +} \ No newline at end of file diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index ccbcee9c3b0..05f35fa3641 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -856,8 +856,45 @@ func (wr *Wrangler) DropSources(ctx context.Context, targetKeyspace, workflowNam return sw.logs(), nil } +func (wr *Wrangler) getShardsToAffect(ctx context.Context, keyspace string, shardsToAffect []string) ([]string, error) { + if wr.WorkflowParams != nil && len(wr.WorkflowParams.ShardsToAffect) > 0 { + shardsToAffect 
= wr.WorkflowParams.ShardsToAffect + } + allShards, err := wr.ts.GetShardNames(ctx, keyspace) + if err != nil { + return nil, err + } + if len(allShards) == 0 { + return nil, fmt.Errorf("no shards found in keyspace %s", keyspace) + } + + if len(shardsToAffect) == 0 { + return allShards, nil + } else { + for _, shard := range shardsToAffect { + // Validate that the provided shards are part of the keyspace. + found := false + for _, shard2 := range allShards { + if shard == shard2 { + found = true + } + } + if !found { + return nil, fmt.Errorf("shard %s not found in keyspace %s", shard, keyspace) + } + } + log.Infof("Selecting subset of shards in keyspace %s: %d from %d :: %+v", + keyspace, len(shardsToAffect), len(allShards), shardsToAffect) + return shardsToAffect, nil + } +} + func (wr *Wrangler) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workflowName string) (*trafficSwitcher, error) { - tgtInfo, err := workflow.LegacyBuildTargets(ctx, wr.ts, wr.tmc, targetKeyspace, workflowName) + shardsToAffect, err := wr.getShardsToAffect(ctx, targetKeyspace, nil) + if err != nil { + return nil, err + } + tgtInfo, err := workflow.LegacyBuildTargets(ctx, wr.ts, wr.tmc, targetKeyspace, workflowName, shardsToAffect) if err != nil { log.Infof("Error building targets: %s", err) return nil, err diff --git a/go/vt/wrangler/vexec.go b/go/vt/wrangler/vexec.go index 0734fa7b593..055f1014b0e 100644 --- a/go/vt/wrangler/vexec.go +++ b/go/vt/wrangler/vexec.go @@ -158,7 +158,7 @@ func (wr *Wrangler) VExec(ctx context.Context, workflow, keyspace, query string, if wr.VExecFunc != nil { return wr.VExecFunc(ctx, workflow, keyspace, query, dryRun) } - results, err := wr.runVexec(ctx, workflow, keyspace, query, nil, dryRun) + results, err := wr.runVexec(ctx, workflow, keyspace, query, nil, dryRun, nil) retResults := make(map[*topo.TabletInfo]*sqltypes.Result) for tablet, result := range results { retResults[tablet] = sqltypes.Proto3ToResult(result) @@ -167,10 +167,13 @@ func (wr 
*Wrangler) VExec(ctx context.Context, workflow, keyspace, query string, } // runVexec is the main function that runs a dry or wet execution of 'query' on backend shards. -func (wr *Wrangler) runVexec(ctx context.Context, workflow, keyspace, query string, callback func(context.Context, *topo.TabletInfo) (*querypb.QueryResult, error), dryRun bool) (map[*topo.TabletInfo]*querypb.QueryResult, error) { +func (wr *Wrangler) runVexec(ctx context.Context, workflow, keyspace, query string, + callback func(context.Context, *topo.TabletInfo) (*querypb.QueryResult, error), + dryRun bool, shards []string) (map[*topo.TabletInfo]*querypb.QueryResult, error) { + vx := newVExec(ctx, workflow, keyspace, query, wr) - if err := vx.getPrimaries(); err != nil { + if err := vx.getPrimaries(shards); err != nil { return nil, err } if callback == nil { // Using legacy SQL query path @@ -285,15 +288,12 @@ func (vx *vexec) parseQuery() (err error) { } // getPrimaries identifies primary tablet for all shards relevant to our keyspace -func (vx *vexec) getPrimaries() error { +func (vx *vexec) getPrimaries(shards []string) error { var err error - shards, err := vx.wr.ts.GetShardNames(vx.ctx, vx.keyspace) + shards, err = vx.wr.getShardsToAffect(vx.ctx, vx.keyspace, shards) if err != nil { return err } - if len(shards) == 0 { - return fmt.Errorf("no shards found in keyspace %s", vx.keyspace) - } var allPrimaries []*topo.TabletInfo var primary *topo.TabletInfo for _, shard := range shards { @@ -340,10 +340,11 @@ func (wr *Wrangler) convertQueryResultToSQLTypesResult(results map[*topo.TabletI // rpcReq is an optional argument for any actions that use the new RPC path. Today // that is only the update action. When using the SQL interface this is ignored and // you can pass nil. 
-func (wr *Wrangler) WorkflowAction(ctx context.Context, workflow, keyspace, action string, dryRun bool, rpcReq any) (map[*topo.TabletInfo]*sqltypes.Result, error) { +func (wr *Wrangler) WorkflowAction(ctx context.Context, workflow, keyspace, action string, dryRun bool, rpcReq any, + shards []string) (map[*topo.TabletInfo]*sqltypes.Result, error) { switch action { case "show": - replStatus, err := wr.ShowWorkflow(ctx, workflow, keyspace) + replStatus, err := wr.ShowWorkflow(ctx, workflow, keyspace, shards) if err != nil { return nil, err } @@ -358,7 +359,7 @@ func (wr *Wrangler) WorkflowAction(ctx context.Context, workflow, keyspace, acti return nil, err default: } - results, err := wr.execWorkflowAction(ctx, workflow, keyspace, action, dryRun, rpcReq) + results, err := wr.execWorkflowAction(ctx, workflow, keyspace, action, dryRun, rpcReq, shards) if err != nil { return nil, err } @@ -390,7 +391,7 @@ func (wr *Wrangler) getWorkflowActionQuery(action string) (string, error) { // canRestartWorkflow validates that, for an atomic copy workflow, none of the streams are still in the copy phase. // Since we copy all tables in a single snapshot, we cannot restart a workflow which broke before all tables were copied. 
func (wr *Wrangler) canRestartWorkflow(ctx context.Context, workflow, keyspace string) error { - res, err := wr.ShowWorkflow(ctx, workflow, keyspace) + res, err := wr.ShowWorkflow(ctx, workflow, keyspace, nil) if err != nil { return err } @@ -409,7 +410,9 @@ func (wr *Wrangler) canRestartWorkflow(ctx context.Context, workflow, keyspace s return nil } -func (wr *Wrangler) execWorkflowAction(ctx context.Context, workflow, keyspace, action string, dryRun bool, rpcReq any) (map[*topo.TabletInfo]*querypb.QueryResult, error) { +func (wr *Wrangler) execWorkflowAction(ctx context.Context, workflow, keyspace, action string, dryRun bool, + rpcReq any, shards []string) (map[*topo.TabletInfo]*querypb.QueryResult, error) { + var callback func(context.Context, *topo.TabletInfo) (*querypb.QueryResult, error) = nil query, err := wr.getWorkflowActionQuery(action) if err != nil { @@ -452,7 +455,7 @@ func (wr *Wrangler) execWorkflowAction(ctx context.Context, workflow, keyspace, wr.Logger().Printf("On the following tablets in the %s keyspace for workflow %s:\n", keyspace, workflow) vx := newVExec(ctx, workflow, keyspace, "", wr) - if err := vx.getPrimaries(); err != nil { + if err := vx.getPrimaries(shards); err != nil { return nil, err } tablets := vx.primaries @@ -475,13 +478,13 @@ func (wr *Wrangler) execWorkflowAction(ctx context.Context, workflow, keyspace, } } - return wr.runVexec(ctx, workflow, keyspace, query, callback, dryRun) + return wr.runVexec(ctx, workflow, keyspace, query, callback, dryRun, shards) } // WorkflowTagAction sets or clears the tags for a workflow in a keyspace func (wr *Wrangler) WorkflowTagAction(ctx context.Context, keyspace string, workflow string, tags string) (map[*topo.TabletInfo]*sqltypes.Result, error) { query := fmt.Sprintf("update _vt.vreplication set tags = %s", encodeString(tags)) - results, err := wr.runVexec(ctx, workflow, keyspace, query, nil, false) + results, err := wr.runVexec(ctx, workflow, keyspace, query, nil, false, nil) return 
wr.convertQueryResultToSQLTypesResult(results), err } @@ -697,7 +700,7 @@ func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row sqltype return status, bls.Keyspace, nil } -func (wr *Wrangler) getStreams(ctx context.Context, workflow, keyspace string) (*ReplicationStatusResult, error) { +func (wr *Wrangler) getStreams(ctx context.Context, workflow, keyspace string, shards []string) (*ReplicationStatusResult, error) { var rsr ReplicationStatusResult rsr.ShardStatuses = make(map[string]*ShardReplicationStatus) rsr.Workflow = workflow @@ -722,7 +725,7 @@ func (wr *Wrangler) getStreams(ctx context.Context, workflow, keyspace string) ( defer_secondary_keys, rows_copied from _vt.vreplication` - results, err := wr.runVexec(ctx, workflow, keyspace, query, nil, false) + results, err := wr.runVexec(ctx, workflow, keyspace, query, nil, false, shards) if err != nil { return nil, err } @@ -861,8 +864,8 @@ func (wr *Wrangler) ListAllWorkflows(ctx context.Context, keyspace string, activ } // ShowWorkflow will return all of the relevant replication related information for the given workflow. 
-func (wr *Wrangler) ShowWorkflow(ctx context.Context, workflow, keyspace string) (*ReplicationStatusResult, error) { - replStatus, err := wr.getStreams(ctx, workflow, keyspace) +func (wr *Wrangler) ShowWorkflow(ctx context.Context, workflow, keyspace string, shards []string) (*ReplicationStatusResult, error) { + replStatus, err := wr.getStreams(ctx, workflow, keyspace, shards) if err != nil { return nil, err } diff --git a/go/vt/wrangler/vexec_plan.go b/go/vt/wrangler/vexec_plan.go index 5b68d9ada5f..e0f04d830d0 100644 --- a/go/vt/wrangler/vexec_plan.go +++ b/go/vt/wrangler/vexec_plan.go @@ -83,7 +83,7 @@ func (p vreplicationPlanner) exec( return qr, nil } func (p vreplicationPlanner) dryRun(ctx context.Context) error { - rsr, err := p.vx.wr.getStreams(p.vx.ctx, p.vx.workflow, p.vx.keyspace) + rsr, err := p.vx.wr.getStreams(p.vx.ctx, p.vx.workflow, p.vx.keyspace, nil) if err != nil { return err } diff --git a/go/vt/wrangler/vexec_test.go b/go/vt/wrangler/vexec_test.go index ead2be6a56f..0ce113ac6a3 100644 --- a/go/vt/wrangler/vexec_test.go +++ b/go/vt/wrangler/vexec_test.go @@ -18,6 +18,7 @@ package wrangler import ( "context" + _ "embed" "fmt" "regexp" "sort" @@ -35,6 +36,15 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) +var ( + //go:embed testdata/show-all-shards.json + want_show_all_shards string + //go:embed testdata/show-minus80.json + want_show_minus_80 string + //go:embed testdata/show-80minus.json + want_show_80_minus string +) + func TestVExec(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -47,7 +57,7 @@ func TestVExec(t *testing.T) { wr := New(logger, env.topoServ, env.tmc) vx := newVExec(ctx, workflow, keyspace, query, wr) - err := vx.getPrimaries() + err := vx.getPrimaries(nil) require.Nil(t, err) primaries := vx.primaries require.NotNil(t, primaries) @@ -78,7 +88,7 @@ func TestVExec(t *testing.T) { vx.plannedQuery = plan.parsedQuery.Query vx.exec() - res, err := wr.getStreams(ctx, workflow, 
keyspace) + res, err := wr.getStreams(ctx, workflow, keyspace, nil) require.NoError(t, err) require.Less(t, res.MaxVReplicationLag, int64(3 /*seconds*/)) // lag should be very small @@ -191,138 +201,48 @@ func TestWorkflowListStreams(t *testing.T) { logger := logutil.NewMemoryLogger() wr := New(logger, env.topoServ, env.tmc) - _, err := wr.WorkflowAction(ctx, workflow, keyspace, "listall", false, nil) + _, err := wr.WorkflowAction(ctx, workflow, keyspace, "listall", false, nil, nil) require.NoError(t, err) - _, err = wr.WorkflowAction(ctx, workflow, "badks", "show", false, nil) + _, err = wr.WorkflowAction(ctx, workflow, "badks", "show", false, nil, nil) require.Errorf(t, err, "node doesn't exist: keyspaces/badks/shards") - _, err = wr.WorkflowAction(ctx, "badwf", keyspace, "show", false, nil) + _, err = wr.WorkflowAction(ctx, "badwf", keyspace, "show", false, nil, nil) require.Errorf(t, err, "no streams found for workflow badwf in keyspace target") logger.Clear() - _, err = wr.WorkflowAction(ctx, workflow, keyspace, "show", false, nil) - require.NoError(t, err) - want := `{ - "Workflow": "wrWorkflow", - "SourceLocation": { - "Keyspace": "source", - "Shards": [ - "0" - ] - }, - "TargetLocation": { - "Keyspace": "target", - "Shards": [ - "-80", - "80-" - ] - }, - "MaxVReplicationLag": 0, - "MaxVReplicationTransactionLag": 0, - "Frozen": false, - "ShardStatuses": { - "-80/zone1-0000000200": { - "PrimaryReplicationStatuses": [ - { - "Shard": "-80", - "Tablet": "zone1-0000000200", - "ID": 1, - "Bls": { - "keyspace": "source", - "shard": "0", - "filter": { - "rules": [ - { - "match": "t1" - } - ] - } - }, - "Pos": "14b68925-696a-11ea-aee7-fec597a91f5e:1-3", - "StopPos": "", - "State": "Copying", - "DBName": "vt_target", - "TransactionTimestamp": 0, - "TimeUpdated": 1234, - "TimeHeartbeat": 1234, - "TimeThrottled": 0, - "ComponentThrottled": "", - "Message": "", - "Tags": "", - "WorkflowType": "Materialize", - "WorkflowSubType": "None", - "CopyState": [ - { - "Table": 
"t1", - "LastPK": "pk1" - } - ], - "RowsCopied": 1000 - } - ], - "TabletControls": null, - "PrimaryIsServing": true - }, - "80-/zone1-0000000210": { - "PrimaryReplicationStatuses": [ - { - "Shard": "80-", - "Tablet": "zone1-0000000210", - "ID": 1, - "Bls": { - "keyspace": "source", - "shard": "0", - "filter": { - "rules": [ - { - "match": "t1" - } - ] - } - }, - "Pos": "14b68925-696a-11ea-aee7-fec597a91f5e:1-3", - "StopPos": "", - "State": "Copying", - "DBName": "vt_target", - "TransactionTimestamp": 0, - "TimeUpdated": 1234, - "TimeHeartbeat": 1234, - "TimeThrottled": 0, - "ComponentThrottled": "", - "Message": "", - "Tags": "", - "WorkflowType": "Materialize", - "WorkflowSubType": "None", - "CopyState": [ - { - "Table": "t1", - "LastPK": "pk1" - } - ], - "RowsCopied": 1000 - } - ], - "TabletControls": null, - "PrimaryIsServing": true - } - }, - "SourceTimeZone": "", - "TargetTimeZone": "" -} -` - got := logger.String() - // MaxVReplicationLag needs to be reset. This can't be determinable in this kind of a test because time.Now() is constantly shifting. 
- re := regexp.MustCompile(`"MaxVReplicationLag": \d+`) - got = re.ReplaceAllLiteralString(got, `"MaxVReplicationLag": 0`) - re = regexp.MustCompile(`"MaxVReplicationTransactionLag": \d+`) - got = re.ReplaceAllLiteralString(got, `"MaxVReplicationTransactionLag": 0`) - require.Equal(t, want, got) - - results, err := wr.execWorkflowAction(ctx, workflow, keyspace, "stop", false, nil) - require.Nil(t, err) + var testCases = []struct { + shards []string + want string + }{ + {[]string{"-80", "80-"}, want_show_all_shards}, + {[]string{"-80"}, want_show_minus_80}, + {[]string{"80-"}, want_show_80_minus}, + } + scrub := func(s string) string { + s = strings.ReplaceAll(s, "\t", "") + s = strings.ReplaceAll(s, "\n", "") + s = strings.ReplaceAll(s, " ", "") + return s + } + for _, testCase := range testCases { + t.Run(fmt.Sprintf("%v", testCase.shards), func(t *testing.T) { + want := scrub(testCase.want) + _, err = wr.WorkflowAction(ctx, workflow, keyspace, "show", false, nil, testCase.shards) + require.NoError(t, err) + got := scrub(logger.String()) + // MaxVReplicationLag needs to be reset. Its value cannot be predicted in this kind of test because + // time.Now() is constantly shifting.
+ re := regexp.MustCompile(`"MaxVReplicationLag":\d+`) + got = re.ReplaceAllLiteralString(got, `"MaxVReplicationLag":0`) + re = regexp.MustCompile(`"MaxVReplicationTransactionLag":\d+`) + got = re.ReplaceAllLiteralString(got, `"MaxVReplicationTransactionLag":0`) + require.Equal(t, want, got) + logger.Clear() + }) + } - // convert map to list and sort it for comparison + results, err := wr.execWorkflowAction(ctx, workflow, keyspace, "stop", false, nil, nil) // convert map to list and sort it for comparison var gotResults []string for key, result := range results { gotResults = append(gotResults, fmt.Sprintf("%s:%v", key.String(), result)) @@ -333,7 +253,7 @@ func TestWorkflowListStreams(t *testing.T) { require.ElementsMatch(t, wantResults, gotResults) logger.Clear() - results, err = wr.execWorkflowAction(ctx, workflow, keyspace, "stop", true, nil) + results, err = wr.execWorkflowAction(ctx, workflow, keyspace, "stop", true, nil, nil) require.Nil(t, err) require.Equal(t, "map[]", fmt.Sprintf("%v", results)) dryRunResult := `Query: update _vt.vreplication set state = 'Stopped' where db_name = 'vt_target' and workflow = 'wrWorkflow' @@ -528,7 +448,7 @@ func TestWorkflowUpdate(t *testing.T) { OnDdl: tcase.onDDL, } - _, err := wr.WorkflowAction(ctx, workflow, keyspace, "update", true, rpcReq) + _, err := wr.WorkflowAction(ctx, workflow, keyspace, "update", true, rpcReq, nil) if tcase.wantErr != "" { require.Error(t, err) require.Equal(t, err.Error(), tcase.wantErr) diff --git a/go/vt/wrangler/workflow.go b/go/vt/wrangler/workflow.go index d9dbcee7291..67f464064d4 100644 --- a/go/vt/wrangler/workflow.go +++ b/go/vt/wrangler/workflow.go @@ -77,6 +77,12 @@ type VReplicationWorkflowParams struct { // MoveTables only NoRoutingRules bool + + // Partial MoveTables only + // Only these shards will be expected to participate in the workflow. Expects user to know what they are doing + // and provide the correct set of shards associated with the workflow. 
This is for reducing latency for workflows + // that only use a small set of shards in a keyspace with a large number of shards. + ShardsToAffect []string } // VReplicationWorkflow stores various internal objects for a workflow @@ -101,6 +107,7 @@ func (vrw *VReplicationWorkflow) String() string { func (wr *Wrangler) NewVReplicationWorkflow(ctx context.Context, workflowType VReplicationWorkflowType, params *VReplicationWorkflowParams) (*VReplicationWorkflow, error) { + wr.WorkflowParams = params log.Infof("NewVReplicationWorkflow with params %+v", params) vrw := &VReplicationWorkflow{wr: wr, ctx: ctx, params: params, workflowType: workflowType} ts, ws, err := wr.getWorkflowState(ctx, params.TargetKeyspace, params.Workflow) @@ -258,7 +265,7 @@ func (vrw *VReplicationWorkflow) GetStreamCount() (int64, int64, []*WorkflowErro var err error var workflowErrors []*WorkflowError var total, started int64 - res, err := vrw.wr.ShowWorkflow(vrw.ctx, vrw.params.Workflow, vrw.params.TargetKeyspace) + res, err := vrw.wr.ShowWorkflow(vrw.ctx, vrw.params.Workflow, vrw.params.TargetKeyspace, nil) if err != nil { return 0, 0, nil, err } @@ -525,7 +532,11 @@ func (vrw *VReplicationWorkflow) canSwitch(keyspace, workflowName string) (reaso return "", nil } log.Infof("state:%s, direction %d, switched %t", vrw.CachedState(), vrw.params.Direction, ws.WritesSwitched) - result, err := vrw.wr.getStreams(vrw.ctx, workflowName, keyspace) + var shards []string + if vrw.params.ShardsToAffect != nil { + shards = vrw.params.ShardsToAffect + } + result, err := vrw.wr.getStreams(vrw.ctx, workflowName, keyspace, shards) if err != nil { return "", err } @@ -707,6 +718,10 @@ func (vrw *VReplicationWorkflow) GetCopyProgress() (*CopyProgress, error) { return &copyProgress, nil } +func (vrw *VReplicationWorkflow) IsPartialMigration() bool { + return vrw.ws.IsPartialMigration +} + // endregion // region Workflow related utility functions diff --git a/go/vt/wrangler/workflow_test.go
b/go/vt/wrangler/workflow_test.go index be3589a3f58..c6743900f93 100644 --- a/go/vt/wrangler/workflow_test.go +++ b/go/vt/wrangler/workflow_test.go @@ -378,6 +378,90 @@ func TestPartialMoveTables(t *testing.T) { require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState()) } +// TestPartialMoveTablesSpecifiedShards is a version of TestPartialMoveTables which uses the --shards option. +func TestPartialMoveTablesSpecifiedShards(t *testing.T) { + ctx := context.Background() + shards := []string{"-80", "80-"} + shardToMove := shards[0:1] + otherShard := shards[1:] + p := &VReplicationWorkflowParams{ + Workflow: "test", + WorkflowType: MoveTablesWorkflow, + SourceKeyspace: "ks1", + SourceShards: shardToMove, // shard by shard + TargetShards: shardToMove, // shard by shard + TargetKeyspace: "ks2", + Tables: "t1,t2", + Cells: "cell1,cell2", + TabletTypes: "REPLICA,RDONLY,PRIMARY", + Timeout: DefaultActionTimeout, + MaxAllowedTransactionLagSeconds: defaultMaxAllowedTransactionLagSeconds, + OnDDL: binlogdatapb.OnDDLAction_STOP.String(), + } + tme := newTestTablePartialMigrater(ctx, t, shards, shards[0:1], "select * %s") + defer tme.stopTablets(t) + + // Save some unrelated shard routing rules to be sure that + // they don't interfere in any way. + srr, err := tme.ts.GetShardRoutingRules(ctx) + require.NoError(t, err) + srr.Rules = append(srr.Rules, []*vschema.ShardRoutingRule{ + { + FromKeyspace: "wut", + Shard: "40-80", + ToKeyspace: "bloop", + }, + { + FromKeyspace: "haylo", + Shard: "-80", + ToKeyspace: "blarg", + }, + }...) + err = tme.ts.SaveShardRoutingRules(ctx, srr) + require.NoError(t, err) + + // Providing an incorrect shard should result in the workflow not being found. 
+ p.ShardsToAffect = otherShard + wf, err := tme.wr.NewVReplicationWorkflow(ctx, MoveTablesWorkflow, p) + require.NoError(t, err) + require.Nil(t, wf.ts) + + p.ShardsToAffect = shardToMove + wf, err = tme.wr.NewVReplicationWorkflow(ctx, MoveTablesWorkflow, p) + require.NoError(t, err) + require.NotNil(t, wf) + require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState()) + require.True(t, wf.ts.isPartialMigration, "expected partial shard migration") + + // The default shard routing rule for the keyspace's other shard would + // normally be put in place, but the unit test does not execute the + // wrangler.MoveTables function which adds all of the default shard + // routing rules in the topo for the keyspace when the first workflow + // is run against it. So we simulate it here. + srr, err = tme.ts.GetShardRoutingRules(ctx) + require.NoError(t, err) + srr.Rules = append(srr.Rules, &vschema.ShardRoutingRule{ + FromKeyspace: "ks2", + Shard: "80-", + ToKeyspace: "ks1", + }) + err = tme.ts.SaveShardRoutingRules(ctx, srr) + require.NoError(t, err) + + tme.expectNoPreviousJournals() + expectMoveTablesQueries(t, tme, p) + tme.expectNoPreviousJournals() + wf.params.ShardsToAffect = shardToMove + require.NoError(t, testSwitchForward(t, wf)) + require.Equal(t, "Reads partially switched, for shards: -80. Writes partially switched, for shards: -80", wf.CurrentState()) + require.NoError(t, err) + + tme.expectNoPreviousJournals() + tme.expectNoPreviousReverseJournals() + require.NoError(t, testReverse(t, wf)) + require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState()) +} + func validateRoutingRuleCount(ctx context.Context, t *testing.T, ts *topo.Server, cnt int) { rr, err := ts.GetRoutingRules(ctx) require.NoError(t, err) diff --git a/go/vt/wrangler/wrangler.go b/go/vt/wrangler/wrangler.go index dbb046a36b3..fae6db2f4ee 100644 --- a/go/vt/wrangler/wrangler.go +++ b/go/vt/wrangler/wrangler.go @@ -56,7 +56,8 @@ type Wrangler struct { // DO NOT USE in production code. 
VExecFunc func(ctx context.Context, workflow, keyspace, query string, dryRun bool) (map[*topo.TabletInfo]*sqltypes.Result, error) // Limit the number of concurrent background goroutines if needed. - sem *semaphore.Weighted + sem *semaphore.Weighted + WorkflowParams *VReplicationWorkflowParams } // New creates a new Wrangler object.