Skip to content

Commit

Permalink
Provide subset of shards for certain VReplication Commands (#14873)
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps authored Jan 9, 2024
1 parent fab65bf commit 9e3b332
Show file tree
Hide file tree
Showing 38 changed files with 1,793 additions and 654 deletions.
2 changes: 2 additions & 0 deletions go/cmd/vtctldclient/command/vreplication/common/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
var CancelOptions = struct {
KeepData bool
KeepRoutingRules bool
Shards []string
}{}

func GetCancelCommand(opts *SubCommandsOpts) *cobra.Command {
Expand Down Expand Up @@ -58,6 +59,7 @@ func commandCancel(cmd *cobra.Command, args []string) error {
Workflow: BaseOptions.Workflow,
KeepData: CancelOptions.KeepData,
KeepRoutingRules: CancelOptions.KeepRoutingRules,
Shards: CancelOptions.Shards,
}
resp, err := GetClient().WorkflowDelete(GetCommandCtx(), req)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ var CompleteOptions = struct {
KeepRoutingRules bool
RenameTables bool
DryRun bool
Shards []string
}{}

func GetCompleteCommand(opts *SubCommandsOpts) *cobra.Command {
Expand Down
8 changes: 5 additions & 3 deletions go/cmd/vtctldclient/command/vreplication/common/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ import (
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

var showOptions = struct {
var ShowOptions = struct {
IncludeLogs bool
Shards []string
}{}

func GetShowCommand(opts *SubCommandsOpts) *cobra.Command {
Expand All @@ -40,7 +41,7 @@ func GetShowCommand(opts *SubCommandsOpts) *cobra.Command {
Args: cobra.NoArgs,
RunE: commandShow,
}
cmd.Flags().BoolVar(&showOptions.IncludeLogs, "include-logs", true, "Include recent logs for the workflow.")
cmd.Flags().BoolVar(&ShowOptions.IncludeLogs, "include-logs", true, "Include recent logs for the workflow.")
return cmd
}

Expand All @@ -50,7 +51,8 @@ func commandShow(cmd *cobra.Command, args []string) error {
req := &vtctldatapb.GetWorkflowsRequest{
Keyspace: BaseOptions.TargetKeyspace,
Workflow: BaseOptions.Workflow,
IncludeLogs: showOptions.IncludeLogs,
IncludeLogs: ShowOptions.IncludeLogs,
Shards: ShowOptions.Shards,
}
resp, err := GetClient().GetWorkflows(GetCommandCtx(), req)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions go/cmd/vtctldclient/command/vreplication/common/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import (
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

var StatusOptions = struct {
Shards []string
}{}

func GetStatusCommand(opts *SubCommandsOpts) *cobra.Command {
cmd := &cobra.Command{
Use: "status",
Expand All @@ -49,6 +53,7 @@ func commandStatus(cmd *cobra.Command, args []string) error {
req := &vtctldatapb.WorkflowStatusRequest{
Keyspace: BaseOptions.TargetKeyspace,
Workflow: BaseOptions.Workflow,
Shards: StatusOptions.Shards,
}
resp, err := GetClient().WorkflowStatus(GetCommandCtx(), req)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions go/cmd/vtctldclient/command/vreplication/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ var SwitchTrafficOptions = struct {
DryRun bool
Direction workflow.TrafficSwitchDirection
InitializeTargetSequences bool
Shards []string
}{}

func AddCommonSwitchTrafficFlags(cmd *cobra.Command, initializeTargetSequences bool) {
Expand All @@ -246,3 +247,7 @@ func AddCommonSwitchTrafficFlags(cmd *cobra.Command, initializeTargetSequences b
cmd.Flags().BoolVar(&SwitchTrafficOptions.InitializeTargetSequences, "initialize-target-sequences", false, "When moving tables from an unsharded keyspace to a sharded keyspace, initialize any sequences that are being used on the target when switching writes.")
}
}

func AddShardSubsetFlag(cmd *cobra.Command, shardsOption *[]string) {
cmd.Flags().StringSliceVar(shardsOption, "shards", nil, "(Optional) Specifies a comma-separated list of shards to operate on.")
}
13 changes: 11 additions & 2 deletions go/cmd/vtctldclient/command/vreplication/movetables/movetables.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,30 +53,39 @@ func registerCommands(root *cobra.Command) {
SubCommand: "MoveTables",
Workflow: "commerce2customer",
}
base.AddCommand(common.GetShowCommand(opts))
base.AddCommand(common.GetStatusCommand(opts))
showCommand := common.GetShowCommand(opts)
common.AddShardSubsetFlag(showCommand, &common.ShowOptions.Shards)
base.AddCommand(showCommand)

statusCommand := common.GetStatusCommand(opts)
common.AddShardSubsetFlag(statusCommand, &common.StatusOptions.Shards)
base.AddCommand(statusCommand)

base.AddCommand(common.GetStartCommand(opts))
base.AddCommand(common.GetStopCommand(opts))

switchTrafficCommand := common.GetSwitchTrafficCommand(opts)
common.AddCommonSwitchTrafficFlags(switchTrafficCommand, true)
common.AddShardSubsetFlag(switchTrafficCommand, &common.SwitchTrafficOptions.Shards)
base.AddCommand(switchTrafficCommand)

reverseTrafficCommand := common.GetReverseTrafficCommand(opts)
common.AddCommonSwitchTrafficFlags(reverseTrafficCommand, false)
common.AddShardSubsetFlag(reverseTrafficCommand, &common.SwitchTrafficOptions.Shards)
base.AddCommand(reverseTrafficCommand)

complete := common.GetCompleteCommand(opts)
complete.Flags().BoolVar(&common.CompleteOptions.KeepData, "keep-data", false, "Keep the original source table data that was copied by the MoveTables workflow.")
complete.Flags().BoolVar(&common.CompleteOptions.KeepRoutingRules, "keep-routing-rules", false, "Keep the routing rules in place that direct table traffic from the source keyspace to the target keyspace of the MoveTables workflow.")
complete.Flags().BoolVar(&common.CompleteOptions.RenameTables, "rename-tables", false, "Keep the original source table data that was copied by the MoveTables workflow, but rename each table to '_<tablename>_old'.")
complete.Flags().BoolVar(&common.CompleteOptions.DryRun, "dry-run", false, "Print the actions that would be taken and report any known errors that would have occurred.")
common.AddShardSubsetFlag(complete, &common.CompleteOptions.Shards)
base.AddCommand(complete)

cancel := common.GetCancelCommand(opts)
cancel.Flags().BoolVar(&common.CancelOptions.KeepData, "keep-data", false, "Keep the partially copied table data from the MoveTables workflow in the target keyspace.")
cancel.Flags().BoolVar(&common.CancelOptions.KeepRoutingRules, "keep-routing-rules", false, "Keep the routing rules created for the MoveTables workflow.")
common.AddShardSubsetFlag(cancel, &common.CancelOptions.Shards)
base.AddCommand(cancel)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func commandDelete(cmd *cobra.Command, args []string) error {
Workflow: baseOptions.Workflow,
KeepData: deleteOptions.KeepData,
KeepRoutingRules: deleteOptions.KeepRoutingRules,
Shards: baseOptions.Shards,
}
resp, err := common.GetClient().WorkflowDelete(common.GetCommandCtx(), req)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions go/cmd/vtctldclient/command/vreplication/workflow/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func commandShow(cmd *cobra.Command, args []string) error {
Keyspace: baseOptions.Keyspace,
Workflow: baseOptions.Workflow,
IncludeLogs: workflowShowOptions.IncludeLogs,
Shards: baseOptions.Shards,
}
resp, err := common.GetClient().GetWorkflows(common.GetCommandCtx(), req)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions go/cmd/vtctldclient/command/vreplication/workflow/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func commandUpdateState(cmd *cobra.Command, args []string) error {
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)},
OnDdl: binlogdatapb.OnDDLAction(textutil.SimulatedNullInt),
State: state,
Shards: baseOptions.Shards,
},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func commandUpdate(cmd *cobra.Command, args []string) error {
TabletTypes: updateOptions.TabletTypes,
TabletSelectionPreference: tsp,
OnDdl: binlogdatapb.OnDDLAction(onddl),
Shards: baseOptions.Shards,
},
}

Expand Down
7 changes: 7 additions & 0 deletions go/cmd/vtctldclient/command/vreplication/workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ var (
baseOptions = struct {
Keyspace string
Workflow string
Shards []string
}{}

workflowShowOptions = struct {
Expand All @@ -59,21 +60,26 @@ func registerCommands(root *cobra.Command) {
delete.MarkFlagRequired("workflow")
delete.Flags().BoolVar(&deleteOptions.KeepData, "keep-data", false, "Keep the partially copied table data from the workflow in the target keyspace.")
delete.Flags().BoolVar(&deleteOptions.KeepRoutingRules, "keep-routing-rules", false, "Keep the routing rules created for the workflow.")
common.AddShardSubsetFlag(delete, &baseOptions.Shards)
base.AddCommand(delete)

common.AddShardSubsetFlag(workflowList, &baseOptions.Shards)
base.AddCommand(workflowList)

show.Flags().StringVarP(&baseOptions.Workflow, "workflow", "w", "", "The workflow you want the details for.")
show.MarkFlagRequired("workflow")
show.Flags().BoolVar(&workflowShowOptions.IncludeLogs, "include-logs", true, "Include recent logs for the workflow.")
common.AddShardSubsetFlag(show, &baseOptions.Shards)
base.AddCommand(show)

start.Flags().StringVarP(&baseOptions.Workflow, "workflow", "w", "", "The workflow you want to start.")
start.MarkFlagRequired("workflow")
common.AddShardSubsetFlag(start, &baseOptions.Shards)
base.AddCommand(start)

stop.Flags().StringVarP(&baseOptions.Workflow, "workflow", "w", "", "The workflow you want to stop.")
stop.MarkFlagRequired("workflow")
common.AddShardSubsetFlag(stop, &baseOptions.Shards)
base.AddCommand(stop)

update.Flags().StringVarP(&baseOptions.Workflow, "workflow", "w", "", "The workflow you want to update.")
Expand All @@ -82,6 +88,7 @@ func registerCommands(root *cobra.Command) {
update.Flags().VarP((*topoproto.TabletTypeListFlag)(&updateOptions.TabletTypes), "tablet-types", "t", "New source tablet types to replicate from (e.g. PRIMARY,REPLICA,RDONLY).")
update.Flags().BoolVar(&updateOptions.TabletTypesInPreferenceOrder, "tablet-types-in-order", true, "When performing source tablet selection, look for candidates in the type order as they are listed in the tablet-types flag.")
update.Flags().StringVar(&updateOptions.OnDDL, "on-ddl", "", "New instruction on what to do when DDL is encountered in the VReplication stream. Possible values are IGNORE, STOP, EXEC, and EXEC_IGNORE.")
common.AddShardSubsetFlag(update, &baseOptions.Shards)
base.AddCommand(update)
}

Expand Down
Loading

0 comments on commit 9e3b332

Please sign in to comment.