Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support cluster bootstrapping in vtctldclient #14315

Merged
merged 10 commits into from
Nov 16, 2023
8 changes: 4 additions & 4 deletions examples/common/scripts/consul-up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ sleep 5

# Add the CellInfo description for the cell.
# If the node already exists, it's fine, means we used existing data.
echo "add $cell CellInfo"
echo "add ${cell} CellInfo"
set +e
# shellcheck disable=SC2086
vtctl $TOPOLOGY_FLAGS VtctldCommand AddCellInfo \
--root "vitess/$cell" \
command vtctldclient --server internal --topo-implementation consul --topo-global-server "${CONSUL_SERVER}:${consul_http_port}" AddCellInfo \
--root "/vitess/${cell}" \
--server-address "${CONSUL_SERVER}:${consul_http_port}" \
"$cell"
"${cell}"
set -e

echo "consul start done..."
9 changes: 4 additions & 5 deletions examples/common/scripts/etcd-up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,12 @@ sleep 5

# And also add the CellInfo description for the cell.
# If the node already exists, it's fine, means we used existing data.
echo "add $cell CellInfo"
echo "add ${cell} CellInfo"
set +e
# shellcheck disable=SC2086
vtctl $TOPOLOGY_FLAGS VtctldCommand AddCellInfo \
--root /vitess/$cell \
command vtctldclient --server internal AddCellInfo \
--root "/vitess/${cell}" \
--server-address "${ETCD_SERVER}" \
$cell
"${cell}"
set -e

echo "etcd is running!"
Expand Down
8 changes: 4 additions & 4 deletions examples/common/scripts/zk-up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ echo "Started zk servers."
# If the node already exists, it's fine, means we used existing data.
set +e
# shellcheck disable=SC2086
vtctl $TOPOLOGY_FLAGS VtctldCommand AddCellInfo \
--root /vitess/$cell \
--server-address $ZK_SERVER \
$cell
command vtctldclient --server internal --topo-implementation zk2 --topo-global-server "${ZK_SERVER}" AddCellInfo \
--root "/vitess/${cell}" \
--server-address "${ZK_SERVER}" \
"${cell}"
set -e

echo "Configured zk servers."
65 changes: 65 additions & 0 deletions go/cmd/vtctldclient/command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,20 @@ import (
"fmt"
"io"
"strconv"
"strings"
"sync"
"time"

"github.com/spf13/cobra"

"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtctl/grpcvtctldserver"
"vitess.io/vitess/go/vt/vtctl/localvtctldclient"
"vitess.io/vitess/go/vt/vtctl/vtctldclient"
"vitess.io/vitess/go/vt/vttablet/tmclient"

// These imports ensure init()s within them get called and they register their commands/subcommands.
"vitess.io/vitess/go/cmd/vtctldclient/cli"
Expand All @@ -42,8 +48,16 @@ import (
_ "vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/reshard"
_ "vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/vdiff"
_ "vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/workflow"

// These imports register the topo factories to use when --server=internal.
_ "vitess.io/vitess/go/vt/topo/consultopo"
_ "vitess.io/vitess/go/vt/topo/etcd2topo"
_ "vitess.io/vitess/go/vt/topo/zk2topo"
)

// The --server value if you want to use a "local" vtctld server.
const useInternalVtctld = "internal"

var (
// VtctldClientProtocol is the protocol to use when creating the vtctldclient.VtctldClient.
VtctldClientProtocol = "grpc"
Expand All @@ -54,14 +68,37 @@ var (
commandCtx context.Context
commandCancel func()

// Register functions to be called when the command completes.
onTerm = []func(){}

// Register our nil tmclient grpc handler only one time.
// This is primarily for tests where we execute the root
// command multiple times.
once = sync.Once{}

server string
actionTimeout time.Duration
compactOutput bool

topoOptions = struct {
implementation string
globalServerAddresses []string
globalRoot string
}{ // Set defaults
implementation: "etcd2",
globalServerAddresses: []string{"localhost:2379"},
globalRoot: "/vitess/global",
}

// Root is the main entrypoint to the vtctldclient CLI.
Root = &cobra.Command{
Use: "vtctldclient",
Short: "Executes a cluster management command on the remote vtctld server.",
Long: fmt.Sprintf(`Executes a cluster management command on the remote vtctld server.
If there are no running vtctld servers -- for example when bootstrapping
a new Vitess cluster -- you can specify a --server value of '%s'.
When doing so, you would use the --topo* flags so that the client can
connect directly to the topo server(s).`, useInternalVtctld),
// We use PersistentPreRun to set up the tracer, grpc client, and
// command context for every command.
PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) {
Expand All @@ -87,6 +124,10 @@ var (
if client != nil {
err = client.Close()
}
// Execute any registered onTerm functions.
for _, f := range onTerm {
f()
}
trace.LogErrorsWhenClosing(traceCloser)
return err
},
Expand Down Expand Up @@ -152,12 +193,36 @@ func getClientForCommand(cmd *cobra.Command) (vtctldclient.VtctldClient, error)
return nil, errNoServer
}

if server == useInternalVtctld {
ts, err := topo.OpenServer(topoOptions.implementation, strings.Join(topoOptions.globalServerAddresses, ","), topoOptions.globalRoot)
if err != nil {
return nil, fmt.Errorf("failed to connect to the topology server: %v", err)
}
onTerm = append(onTerm, ts.Close)

// Use internal vtctld server implementation.
// Register a nil grpc handler -- we will not use tmclient at all but
// a factory still needs to be registered.
once.Do(func() {
tmclient.RegisterTabletManagerClientFactory("grpc", func() tmclient.TabletManagerClient {
return nil
})
})
vtctld := grpcvtctldserver.NewVtctldServer(ts)
localvtctldclient.SetServer(vtctld)
VtctldClientProtocol = "local"
server = ""
}

return vtctldclient.New(VtctldClientProtocol, server)
}

func init() {
Root.PersistentFlags().StringVar(&server, "server", "", "server to use for the connection (required)")
Root.PersistentFlags().DurationVar(&actionTimeout, "action_timeout", time.Hour, "timeout to use for the command")
Root.PersistentFlags().BoolVar(&compactOutput, "compact", false, "use compact format for otherwise verbose outputs")
Root.PersistentFlags().StringVar(&topoOptions.implementation, "topo-implementation", topoOptions.implementation, "the topology implementation to use")
Root.PersistentFlags().StringSliceVar(&topoOptions.globalServerAddresses, "topo-global-server-address", topoOptions.globalServerAddresses, "the address of the global topology server(s)")
Root.PersistentFlags().StringVar(&topoOptions.globalRoot, "topo-global-root", topoOptions.globalRoot, "the path of the global topology data in the global topology server")
vreplcommon.RegisterCommands(Root)
}
67 changes: 67 additions & 0 deletions go/cmd/vtctldclient/command/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,19 @@ limitations under the License.
package command_test

import (
"context"
"fmt"
"os"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/cmd/vtctldclient/command"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/vtctl/localvtctldclient"

vtctlservicepb "vitess.io/vitess/go/vt/proto/vtctlservice"
Expand Down Expand Up @@ -52,3 +58,64 @@ func TestRoot(t *testing.T) {
assert.Contains(t, err.Error(), "unknown command")
})
}

// TestRootWithInternalVtctld tests that the internal VtctldServer
// implementation -- used with --server=internal -- works for
// commands as expected.
func TestRootWithInternalVtctld(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
cell := "zone1"
ts, factory := memorytopo.NewServerAndFactory(ctx, cell)
topo.RegisterFactory("test", factory)
command.VtctldClientProtocol = "local"
baseArgs := []string{"vtctldclient", "--server", "internal", "--topo-implementation", "test"}

args := append([]string{}, os.Args...)
protocol := command.VtctldClientProtocol
t.Cleanup(func() {
ts.Close()
os.Args = append([]string{}, args...)
command.VtctldClientProtocol = protocol
})

testCases := []struct {
command string
args []string
expectErr string
}{
{
command: "AddCellInfo",
args: []string{"--root", fmt.Sprintf("/vitess/%s", cell), "--server-address", "", cell},
expectErr: "node already exists", // Cell already exists
},
{
command: "GetTablets",
},
{
command: "NoCommandDrJones",
expectErr: "unknown command", // Invalid command
},
}

for _, tc := range testCases {
t.Run(tc.command, func(t *testing.T) {
defer func() {
// Reset the OS args.
os.Args = append([]string{}, args...)
}()

os.Args = append(baseArgs, tc.command)
os.Args = append(os.Args, tc.args...)

err := command.Root.Execute()
if tc.expectErr != "" {
if !strings.Contains(err.Error(), tc.expectErr) {
t.Errorf(fmt.Sprintf("%s error = %v, expectErr = %v", tc.command, err, tc.expectErr))
}
} else {
require.NoError(t, err, "unexpected error: %v", err)
}
})
}
}
7 changes: 7 additions & 0 deletions go/flags/endtoend/vtctldclient.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
Executes a cluster management command on the remote vtctld server.
If there are no running vtctld servers -- for example when bootstrapping
a new Vitess cluster -- you can specify a --server value of 'internal'.
When doing so, you would use the --topo* flags so that the client can
connect directly to the topo server(s).

Usage:
vtctldclient [flags]
Expand Down Expand Up @@ -126,6 +130,9 @@ Flags:
--security_policy string the name of a registered security policy to use for controlling access to URLs - empty means allow all for anyone (built-in policies: deny-all, read-only)
--server string server to use for the connection (required)
--stderrthreshold severityFlag logs at or above this threshold go to stderr (default 1)
--topo-global-root string the path of the global topology data in the global topology server (default "/vitess/global")
--topo-global-server-address strings the address of the global topology server(s) (default [localhost:2379])
--topo-implementation string the topology implementation to use (default "etcd2")
-v, --v Level log level for V logs
--version version for vtctldclient
--vmodule vModuleFlag comma-separated list of pattern=N settings for file-filtered logging
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2018,7 +2018,7 @@ func (s *VtctldServer) GetTablets(ctx context.Context, req *vtctldatapb.GetTable

tablets := make([]*topodatapb.Tablet, 0, len(tabletMap))
for _, ti := range tabletMap {
if req.TabletType != 0 && ti.Type != req.TabletType {
if req.TabletType != topodatapb.TabletType_UNKNOWN && ti.Type != req.TabletType {
continue
}
adjustTypeForStalePrimary(ti, truePrimaryTimestamp)
Expand Down Expand Up @@ -2086,7 +2086,7 @@ func (s *VtctldServer) GetTablets(ctx context.Context, req *vtctldatapb.GetTable
if req.Keyspace != "" && tablet.Keyspace != req.Keyspace {
continue
}
if req.TabletType != 0 && tablet.Type != req.TabletType {
if req.TabletType != topodatapb.TabletType_UNKNOWN && tablet.Type != req.TabletType {
continue
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,6 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings
// deleteCopyState deletes the copy state entry for a table, signifying that the copy phase is complete for that table.
func (vc *vcopier) deleteCopyState(tableName string) error {
log.Infof("Deleting copy state for table %s", tableName)
//FIXME get sidecar db name
delQuery := fmt.Sprintf("delete from _vt.copy_state where table_name=%s and vrepl_id = %d", encodeString(tableName), vc.vr.id)
if _, err := vc.vr.dbClient.Execute(delQuery); err != nil {
return err
Expand Down
Loading