Skip to content

Commit

Permalink
Add --shards to the VReplication Workflow command. Only partial movet…
Browse files Browse the repository at this point in the history
…ables supported, for safety and since that is the main use case of --shards.

Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps committed Nov 24, 2023
1 parent be1e8f8 commit eaeba61
Show file tree
Hide file tree
Showing 12 changed files with 469 additions and 162 deletions.
22 changes: 19 additions & 3 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2099,6 +2099,7 @@ func commandVReplicationWorkflow(ctx context.Context, wr *wrangler.Wrangler, sub
stopAfterCopy := subFlags.Bool("stop_after_copy", false, "Streams will be stopped once the copy phase is completed")
dropForeignKeys := subFlags.Bool("drop_foreign_keys", false, "If true, tables in the target keyspace will be created without foreign keys.")
maxReplicationLagAllowed := subFlags.Duration("max_replication_lag_allowed", defaultMaxReplicationLagAllowed, "Allow traffic to be switched only if vreplication lag is below this (in seconds)")
shards := subFlags.StringSlice("shards", nil, "(Optional) Specifies a comma-separated list of shards to operate on.")
atomicCopy := subFlags.Bool("atomic-copy", false, "(EXPERIMENTAL) Use this if your source keyspace has tables which use foreign key constraints. All tables from the source will be moved.")

onDDL := "IGNORE"
Expand Down Expand Up @@ -2165,7 +2166,7 @@ func commandVReplicationWorkflow(ctx context.Context, wr *wrangler.Wrangler, sub

printDetails := func() error {
s := ""
res, err := wr.ShowWorkflow(ctx, workflowName, target)
res, err := wr.ShowWorkflow(ctx, workflowName, target, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -2329,6 +2330,7 @@ func commandVReplicationWorkflow(ctx context.Context, wr *wrangler.Wrangler, sub
vrwp.KeepRoutingRules = *keepRoutingRules
}
vrwp.WorkflowType = workflowType
vrwp.ShardsToAffect = *shards
wf, err := wr.NewVReplicationWorkflow(ctx, workflowType, vrwp)
if err != nil {
log.Warningf("NewVReplicationWorkflow returned error %+v", wf)
Expand All @@ -2338,6 +2340,15 @@ func commandVReplicationWorkflow(ctx context.Context, wr *wrangler.Wrangler, sub
return fmt.Errorf("workflow %s does not exist", ksWorkflow)
}

if len(vrwp.ShardsToAffect) > 0 {
if workflowType == wrangler.MoveTablesWorkflow && wf.IsPartialMigration() && action != vReplicationWorkflowActionCreate {
log.Infof("Subset of shards: %s have been specified for keyspace %s, workflow %s, for action %s",
vrwp.ShardsToAffect, target, workflowName, action)
} else {
return fmt.Errorf("shards can only be specified for existing Partial MoveTables workflows")
}
}

printCopyProgress := func() error {
copyProgress, err := wf.GetCopyProgress()
if err != nil {
Expand Down Expand Up @@ -2378,6 +2389,7 @@ func commandVReplicationWorkflow(ctx context.Context, wr *wrangler.Wrangler, sub
}
}

wr.WorkflowParams = vrwp
var dryRunResults *[]string
startState := wf.CachedState()
switch action {
Expand Down Expand Up @@ -3721,8 +3733,9 @@ func commandHelp(ctx context.Context, wr *wrangler.Wrangler, subFlags *pflag.Fla
}

func commandWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *pflag.FlagSet, args []string) error {
usage := "usage: Workflow [--dry-run] [--cells] [--tablet-types] <keyspace>[.<workflow>] start/stop/update/delete/show/listall/tags [<tags>]"
usage := "usage: Workflow [--shards <shards>] [--dry-run] [--cells] [--tablet-types] <keyspace>[.<workflow>] start/stop/update/delete/show/listall/tags [<tags>]"
dryRun := subFlags.Bool("dry-run", false, "Does a dry run of the Workflow action and reports the query and list of tablets on which the operation will be applied")
shards := subFlags.StringSlice("shards", nil, "(Optional) Specifies a comma-separated list of shards to operate on.")
cells := subFlags.StringSlice("cells", []string{}, "New Cell(s) or CellAlias(es) (comma-separated) to replicate from. (Update only)")
tabletTypesStrs := subFlags.StringSlice("tablet-types", []string{}, "New source tablet types to replicate from (e.g. PRIMARY, REPLICA, RDONLY). (Update only)")
onDDL := subFlags.String("on-ddl", "", "New instruction on what to do when DDL is encountered in the VReplication stream. Possible values are IGNORE, STOP, EXEC, and EXEC_IGNORE. (Update only)")
Expand All @@ -3732,6 +3745,9 @@ func commandWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *pflag
if subFlags.NArg() < 2 {
return fmt.Errorf(usage)
}
if len(*shards) > 0 {
log.Infof("Subset of shards specified: %d, %v", len(*shards), strings.Join(*shards, ","))
}
keyspace := subFlags.Arg(0)
action := strings.ToLower(subFlags.Arg(1))
var workflow string
Expand Down Expand Up @@ -3818,7 +3834,7 @@ func commandWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *pflag
OnDdl: binlogdatapb.OnDDLAction(onddl),
}
}
results, err = wr.WorkflowAction(ctx, workflow, keyspace, action, *dryRun, rpcReq) // Only update currently uses the new RPC path
results, err = wr.WorkflowAction(ctx, workflow, keyspace, action, *dryRun, rpcReq, *shards) // Only update currently uses the new RPC path
if err != nil {
return err
}
Expand Down
8 changes: 2 additions & 6 deletions go/vt/vtctl/workflow/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,12 +657,8 @@ func areTabletsAvailableToStreamFrom(ctx context.Context, req *vtctldatapb.Workf
// New callers should instead use the new BuildTargets function.
//
// It returns ErrNoStreams if there are no targets found for the workflow.
func LegacyBuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, targetKeyspace string, workflow string) (*TargetInfo, error) {
targetShards, err := ts.GetShardNames(ctx, targetKeyspace)
if err != nil {
return nil, err
}

func LegacyBuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, targetKeyspace string,
workflow string, targetShards []string) (*TargetInfo, error) {
var (
frozen bool
optCells string
Expand Down
64 changes: 64 additions & 0 deletions go/vt/wrangler/testdata/show-80minus.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
{
"Workflow": "wrWorkflow",
"SourceLocation": {
"Keyspace": "source",
"Shards": [
"0"
]
},
"TargetLocation": {
"Keyspace": "target",
"Shards": [
"80-"
]
},
"MaxVReplicationLag": 0,
"MaxVReplicationTransactionLag": 0,
"Frozen": false,
"ShardStatuses": {
"80-/zone1-0000000210": {
"PrimaryReplicationStatuses": [
{
"Shard": "80-",
"Tablet": "zone1-0000000210",
"ID": 1,
"Bls": {
"keyspace": "source",
"shard": "0",
"filter": {
"rules": [
{
"match": "t1"
}
]
}
},
"Pos": "14b68925-696a-11ea-aee7-fec597a91f5e:1-3",
"StopPos": "",
"State": "Copying",
"DBName": "vt_target",
"TransactionTimestamp": 0,
"TimeUpdated": 1234,
"TimeHeartbeat": 1234,
"TimeThrottled": 0,
"ComponentThrottled": "",
"Message": "",
"Tags": "",
"WorkflowType": "Materialize",
"WorkflowSubType": "None",
"CopyState": [
{
"Table": "t1",
"LastPK": "pk1"
}
],
"RowsCopied": 1000
}
],
"TabletControls": null,
"PrimaryIsServing": true
}
},
"SourceTimeZone": "",
"TargetTimeZone": ""
}
107 changes: 107 additions & 0 deletions go/vt/wrangler/testdata/show-all-shards.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
{
"Workflow": "wrWorkflow",
"SourceLocation": {
"Keyspace": "source",
"Shards": [
"0"
]
},
"TargetLocation": {
"Keyspace": "target",
"Shards": [
"-80",
"80-"
]
},
"MaxVReplicationLag": 0,
"MaxVReplicationTransactionLag": 0,
"Frozen": false,
"ShardStatuses": {
"-80/zone1-0000000200": {
"PrimaryReplicationStatuses": [
{
"Shard": "-80",
"Tablet": "zone1-0000000200",
"ID": 1,
"Bls": {
"keyspace": "source",
"shard": "0",
"filter": {
"rules": [
{
"match": "t1"
}
]
}
},
"Pos": "14b68925-696a-11ea-aee7-fec597a91f5e:1-3",
"StopPos": "",
"State": "Copying",
"DBName": "vt_target",
"TransactionTimestamp": 0,
"TimeUpdated": 1234,
"TimeHeartbeat": 1234,
"TimeThrottled": 0,
"ComponentThrottled": "",
"Message": "",
"Tags": "",
"WorkflowType": "Materialize",
"WorkflowSubType": "None",
"CopyState": [
{
"Table": "t1",
"LastPK": "pk1"
}
],
"RowsCopied": 1000
}
],
"TabletControls": null,
"PrimaryIsServing": true
},
"80-/zone1-0000000210": {
"PrimaryReplicationStatuses": [
{
"Shard": "80-",
"Tablet": "zone1-0000000210",
"ID": 1,
"Bls": {
"keyspace": "source",
"shard": "0",
"filter": {
"rules": [
{
"match": "t1"
}
]
}
},
"Pos": "14b68925-696a-11ea-aee7-fec597a91f5e:1-3",
"StopPos": "",
"State": "Copying",
"DBName": "vt_target",
"TransactionTimestamp": 0,
"TimeUpdated": 1234,
"TimeHeartbeat": 1234,
"TimeThrottled": 0,
"ComponentThrottled": "",
"Message": "",
"Tags": "",
"WorkflowType": "Materialize",
"WorkflowSubType": "None",
"CopyState": [
{
"Table": "t1",
"LastPK": "pk1"
}
],
"RowsCopied": 1000
}
],
"TabletControls": null,
"PrimaryIsServing": true
}
},
"SourceTimeZone": "",
"TargetTimeZone": ""
}
64 changes: 64 additions & 0 deletions go/vt/wrangler/testdata/show-minus80.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
{
"Workflow": "wrWorkflow",
"SourceLocation": {
"Keyspace": "source",
"Shards": [
"0"
]
},
"TargetLocation": {
"Keyspace": "target",
"Shards": [
"-80"
]
},
"MaxVReplicationLag": 0,
"MaxVReplicationTransactionLag": 0,
"Frozen": false,
"ShardStatuses": {
"-80/zone1-0000000200": {
"PrimaryReplicationStatuses": [
{
"Shard": "-80",
"Tablet": "zone1-0000000200",
"ID": 1,
"Bls": {
"keyspace": "source",
"shard": "0",
"filter": {
"rules": [
{
"match": "t1"
}
]
}
},
"Pos": "14b68925-696a-11ea-aee7-fec597a91f5e:1-3",
"StopPos": "",
"State": "Copying",
"DBName": "vt_target",
"TransactionTimestamp": 0,
"TimeUpdated": 1234,
"TimeHeartbeat": 1234,
"TimeThrottled": 0,
"ComponentThrottled": "",
"Message": "",
"Tags": "",
"WorkflowType": "Materialize",
"WorkflowSubType": "None",
"CopyState": [
{
"Table": "t1",
"LastPK": "pk1"
}
],
"RowsCopied": 1000
}
],
"TabletControls": null,
"PrimaryIsServing": true
}
},
"SourceTimeZone": "",
"TargetTimeZone": ""
}
39 changes: 38 additions & 1 deletion go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,8 +856,45 @@ func (wr *Wrangler) DropSources(ctx context.Context, targetKeyspace, workflowNam
return sw.logs(), nil
}

func (wr *Wrangler) getShardsToAffect(ctx context.Context, keyspace string, shardsToAffect []string) ([]string, error) {
if wr.WorkflowParams != nil && len(wr.WorkflowParams.ShardsToAffect) > 0 {
shardsToAffect = wr.WorkflowParams.ShardsToAffect
}
allShards, err := wr.ts.GetShardNames(ctx, keyspace)
if err != nil {
return nil, err
}
if len(allShards) == 0 {
return nil, fmt.Errorf("no shards found in keyspace %s", keyspace)
}

if len(shardsToAffect) == 0 {
return allShards, nil
} else {
for _, shard := range shardsToAffect {
// Validate that the provided shards are part of the keyspace.
found := false
for _, shard2 := range allShards {
if shard == shard2 {
found = true
}
}
if !found {
return nil, fmt.Errorf("shard %s not found in keyspace %s", shard, keyspace)
}
}
log.Infof("Selecting subset of shards in keyspace %s: %d from %d :: %+v",
keyspace, len(shardsToAffect), len(allShards), shardsToAffect)
return shardsToAffect, nil
}
}

func (wr *Wrangler) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workflowName string) (*trafficSwitcher, error) {
tgtInfo, err := workflow.LegacyBuildTargets(ctx, wr.ts, wr.tmc, targetKeyspace, workflowName)
shardsToAffect, err := wr.getShardsToAffect(ctx, targetKeyspace, nil)
if err != nil {
return nil, err
}
tgtInfo, err := workflow.LegacyBuildTargets(ctx, wr.ts, wr.tmc, targetKeyspace, workflowName, shardsToAffect)
if err != nil {
log.Infof("Error building targets: %s", err)
return nil, err
Expand Down
Loading

0 comments on commit eaeba61

Please sign in to comment.