Skip to content

Commit

Permalink
Add version support to GetTopologyPath
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed May 13, 2024
1 parent eb22cfb commit 1d0deed
Show file tree
Hide file tree
Showing 15 changed files with 1,419 additions and 1,162 deletions.
8 changes: 7 additions & 1 deletion go/cmd/vtctldclient/command/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ var (
Args: cobra.ExactArgs(1),
RunE: commandGetTopologyPath,
}

// The version of the key/path to get. If not specified, the latest/current
// version is returned.
version int64 = 0
)

func commandGetTopologyPath(cmd *cobra.Command, args []string) error {
Expand All @@ -43,7 +47,8 @@ func commandGetTopologyPath(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

resp, err := client.GetTopologyPath(commandCtx, &vtctldatapb.GetTopologyPathRequest{
Path: path,
Path: path,
Version: version,
})
if err != nil {
return err
Expand All @@ -60,5 +65,6 @@ func commandGetTopologyPath(cmd *cobra.Command, args []string) error {
}

func init() {
GetTopologyPath.Flags().Int64Var(&version, "version", version, "The version of the path's key to get. If not specified, the latest version is returned.")
Root.AddCommand(GetTopologyPath)
}
2,299 changes: 1,159 additions & 1,140 deletions go/vt/proto/vtctldata/vtctldata.pb.go

Large diffs are not rendered by default.

64 changes: 60 additions & 4 deletions go/vt/proto/vtctldata/vtctldata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions go/vt/topo/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ type Conn interface {
// Can return ErrNoNode if the file doesn't exist.
Get(ctx context.Context, filePath string) ([]byte, Version, error)

// GetVersion returns the content of a file at the given version.
// filePath is a path relative to the root directory of the cell.
// Can return ErrNoNode if the file doesn't exist and ErrNoVersion
// if the version doesn't exist.
GetVersion(ctx context.Context, filePath string, version Version) ([]byte, error)

// List returns KV pairs, along with metadata like the version, for
// entries where the key contains the specified prefix.
// filePathPrefix is a path relative to the root directory of the cell.
Expand Down
5 changes: 5 additions & 0 deletions go/vt/topo/consultopo/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ func (s *Server) Get(ctx context.Context, filePath string) ([]byte, topo.Version
return pair.Value, ConsulVersion(pair.ModifyIndex), nil
}

// GetVersion is part of topo.Conn interface.
func (s *Server) GetVersion(ctx context.Context, filePath string, version topo.Version) ([]byte, error) {
return nil, topo.NewError(topo.NoImplementation, "GetVersion not supported in consul topo")
}

// List is part of the topo.Conn interface.
func (s *Server) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo, error) {
nodePathPrefix := path.Join(s.root, filePathPrefix)
Expand Down
15 changes: 15 additions & 0 deletions go/vt/topo/etcd2topo/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,21 @@ func (s *Server) Get(ctx context.Context, filePath string) ([]byte, topo.Version
return resp.Kvs[0].Value, EtcdVersion(resp.Kvs[0].ModRevision), nil
}

// GetVersion is part of the topo.Conn interface.
func (s *Server) GetVersion(ctx context.Context, filePath string, version topo.Version) ([]byte, error) {
nodePath := path.Join(s.root, filePath)

resp, err := s.cli.Get(ctx, nodePath, clientv3.WithRev(int64(version.(EtcdVersion))))
if err != nil {
return nil, convertError(err, nodePath)
}
if len(resp.Kvs) != 1 {
return nil, topo.NewError(topo.NoNode, nodePath)
}

return resp.Kvs[0].Value, nil
}

// List is part of the topo.Conn interface.
func (s *Server) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo, error) {
nodePathPrefix := path.Join(s.root, filePathPrefix)
Expand Down
5 changes: 5 additions & 0 deletions go/vt/topo/faketopo/faketopo.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,11 @@ func (f *FakeConn) Get(ctx context.Context, filePath string) ([]byte, topo.Versi
return res.contents, memorytopo.NodeVersion(res.version), nil
}

// GetVersion is part of topo.Conn interface.
func (f *FakeConn) GetVersion(ctx context.Context, filePath string, version topo.Version) ([]byte, error) {
return nil, topo.NewError(topo.NoImplementation, "GetVersion not supported in fake topo")
}

// List is part of the topo.Conn interface.
func (f *FakeConn) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo, error) {
return nil, topo.NewError(topo.NoImplementation, "List not supported in fake topo")
Expand Down
5 changes: 5 additions & 0 deletions go/vt/topo/memorytopo/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ func (c *Conn) Get(ctx context.Context, filePath string) ([]byte, topo.Version,
return n.contents, NodeVersion(n.version), nil
}

// GetVersion is part of topo.Conn interface.
func (c *Conn) GetVersion(ctx context.Context, filePath string, version topo.Version) ([]byte, error) {
return nil, topo.NewError(topo.NoImplementation, "GetVersion not supported in memory topo")
}

// List is part of the topo.Conn interface.
func (c *Conn) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo, error) {
c.factory.callstats.Add([]string{"List"}, 1)
Expand Down
13 changes: 13 additions & 0 deletions go/vt/topo/stats_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,19 @@ func (st *StatsConn) Get(ctx context.Context, filePath string) ([]byte, Version,
return bytes, version, err
}

// GetVersion is part of the Conn interface.
func (st *StatsConn) GetVersion(ctx context.Context, filePath string, version Version) ([]byte, error) {
startTime := time.Now()
statsKey := []string{"GetVersion", st.cell}
defer topoStatsConnTimings.Record(statsKey, startTime)
bytes, err := st.conn.GetVersion(ctx, filePath, version)
if err != nil {
topoStatsConnErrors.Add(statsKey, int64(1))
return bytes, err
}
return bytes, err
}

// List is part of the Conn interface
func (st *StatsConn) List(ctx context.Context, filePathPrefix string) ([]KVInfo, error) {
startTime := time.Now()
Expand Down
9 changes: 9 additions & 0 deletions go/vt/topo/stats_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ func (st *fakeConn) Get(ctx context.Context, filePath string) (bytes []byte, ver
return bytes, ver, err
}

// GetVersion is part of the Conn interface
func (st *fakeConn) GetVersion(ctx context.Context, filePath string, version Version) (bytes []byte, err error) {
if filePath == "error" {
return bytes, fmt.Errorf("Dummy error")

}
return bytes, err
}

// List is part of the Conn interface
func (st *fakeConn) List(ctx context.Context, filePathPrefix string) (bytes []KVInfo, err error) {
if filePathPrefix == "error" {
Expand Down
5 changes: 5 additions & 0 deletions go/vt/topo/zk2topo/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ func (zs *Server) Get(ctx context.Context, filePath string) ([]byte, topo.Versio
return contents, ZKVersion(stat.Version), nil
}

// GetVersion is part of topo.Conn interface.
func (zs *Server) GetVersion(ctx context.Context, filePath string, version topo.Version) ([]byte, error) {
return nil, topo.NewError(topo.NoImplementation, "GetVersion not supported in ZK2 topo")
}

// List is part of the topo.Conn interface.
func (zs *Server) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo, error) {
return nil, topo.NewError(topo.NoImplementation, "List not supported in ZK2 topo")
Expand Down
56 changes: 40 additions & 16 deletions go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"vitess.io/vitess/go/vt/schemamanager"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/etcd2topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/topotools/events"
Expand Down Expand Up @@ -2259,24 +2260,31 @@ func (s *VtctldServer) GetTopologyPath(ctx context.Context, req *vtctldatapb.Get
span, ctx := trace.NewSpan(ctx, "VtctldServer.GetTopology")
defer span.Finish()

// handle toplevel display: global, then one line per cell.
if req.Path == "/" {
span.Annotate("version", req.GetVersion())

// Handle toplevel display: global, then one line per cell.
if req.GetPath() == "/" {
cells, err := s.ts.GetKnownCells(ctx)
if err != nil {
return nil, err
}
resp := vtctldatapb.GetTopologyPathResponse{
Cell: &vtctldatapb.TopologyCell{
Path: req.Path,
Path: req.GetPath(),
// the toplevel display has no name, just children
Children: append([]string{topo.GlobalCell}, cells...),
},
}
return &resp, nil
}

// otherwise, delegate to getTopologyCell to parse the path and return the cell there
cell, err := s.getTopologyCell(ctx, req.Path)
// Otherwise, delegate to getTopologyCell to parse the path and return the cell there.
var version topo.Version
if req.GetVersion() != 0 {
// Getting specific versions is only supported with the etcd2topo today.
version = etcd2topo.EtcdVersion(req.GetVersion())
}
cell, err := s.getTopologyCell(ctx, req.GetPath(), version)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -5062,8 +5070,8 @@ func StartServer(s *grpc.Server, env *vtenv.Environment, ts *topo.Server) {
vtctlservicepb.RegisterVtctldServer(s, NewVtctldServer(env, ts))
}

// getTopologyCell is a helper method that returns a topology cell given its path.
func (s *VtctldServer) getTopologyCell(ctx context.Context, cellPath string) (*vtctldatapb.TopologyCell, error) {
// getTopologyKey is a helper method that returns a topology key given its path.
func (s *VtctldServer) getTopologyCell(ctx context.Context, cellPath string, version topo.Version) (*vtctldatapb.TopologyCell, error) {
// extract cell and relative path
parts := strings.Split(cellPath, "/")
if parts[0] != "" || len(parts) < 2 {
Expand All @@ -5080,16 +5088,32 @@ func (s *VtctldServer) getTopologyCell(ctx context.Context, cellPath string) (*v
return nil, err
}

if data, _, err := conn.Get(ctx, relativePath); err == nil {
result, err := topo.DecodeContent(relativePath, data, false)
if err != nil {
err := vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "error decoding file content for cell %s: %v", cellPath, err)
return nil, err
if version != nil {
if data, err := conn.GetVersion(ctx, relativePath, version); err == nil {
result, err := topo.DecodeContent(relativePath, data, false)
if err != nil {
err := vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "error decoding file content for cell %s (version: %s): %v", cellPath, version, err)
return nil, err
}
topoCell.Data = result
topoCell.Version = int64(version.(etcd2topo.EtcdVersion))
// since there is data at this cell, it cannot be a directory cell
// so we can early return the topocell
return &topoCell, nil
}
} else {
if data, curVersion, err := conn.Get(ctx, relativePath); err == nil {
result, err := topo.DecodeContent(relativePath, data, false)
if err != nil {
err := vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "error decoding file content for cell %s: %v", cellPath, err)
return nil, err
}
topoCell.Data = result
topoCell.Version = int64(curVersion.(etcd2topo.EtcdVersion))
// since there is data at this cell, it cannot be a directory cell
// so we can early return the topocell
return &topoCell, nil
}
topoCell.Data = result
// since there is data at this cell, it cannot be a directory cell
// so we can early return the topocell
return &topoCell, nil
}

children, err := conn.ListDir(ctx, relativePath, false /*full*/)
Expand Down
2 changes: 2 additions & 0 deletions proto/vtctldata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,7 @@ message GetTabletsResponse {

message GetTopologyPathRequest {
string path = 1;
int64 version = 2;
}

message GetTopologyPathResponse {
Expand All @@ -1060,6 +1061,7 @@ message TopologyCell {
// It is only populated if the cell is a terminal node.
string data = 3;
repeated string children = 4;
int64 version = 5;
}

message GetVSchemaRequest {
Expand Down
Loading

0 comments on commit 1d0deed

Please sign in to comment.