From b2e2ba67590de93266cb41a3416f31933cb9cb7c Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 17 Dec 2024 13:22:14 -0500 Subject: [PATCH] Get all e2e tests working Signed-off-by: Matt Lord --- go/cmd/vtcombo/cli/vschema_watcher.go | 11 ++- .../keyspace_watches/keyspace_watch_test.go | 1 - go/vt/topo/helpers/copy.go | 7 +- go/vt/topo/srv_vschema.go | 13 ++- go/vt/topo/vschema.go | 53 +++++++---- go/vt/topotools/vschema_ddl.go | 91 +++++++++---------- go/vt/vtcombo/tablet_map.go | 6 +- go/vt/vtctl/grpcvtctldserver/server.go | 42 +++++---- go/vt/vtctl/vtctl.go | 56 +++++++----- go/vt/vtctl/workflow/materializer.go | 8 +- go/vt/vtctl/workflow/resharder.go | 3 +- go/vt/vtctl/workflow/server.go | 32 ++++--- go/vt/vtctl/workflow/traffic_switcher.go | 10 +- go/vt/vtgate/executorcontext/vcursor_impl.go | 26 ++++-- go/vt/vtgate/vschema_manager.go | 12 +-- go/vt/wrangler/materializer.go | 26 +++--- go/vt/wrangler/resharder.go | 3 +- go/vt/wrangler/traffic_switcher.go | 8 +- 18 files changed, 235 insertions(+), 173 deletions(-) diff --git a/go/cmd/vtcombo/cli/vschema_watcher.go b/go/cmd/vtcombo/cli/vschema_watcher.go index 484c7736424..7f6adad3c22 100644 --- a/go/cmd/vtcombo/cli/vschema_watcher.go +++ b/go/cmd/vtcombo/cli/vschema_watcher.go @@ -56,17 +56,20 @@ func loadKeyspacesFromDir(ctx context.Context, dir string, ts *topo.Server) { log.Fatalf("Unable to read keyspace file %v: %v", ksFile, err) } - keyspace := &vschemapb.Keyspace{} - err = json.Unmarshal(jsonData, keyspace) + ksvs := &topo.KeyspaceVSchemaInfo{ + Name: ks.Name, + Keyspace: &vschemapb.Keyspace{}, + } + err = json.Unmarshal(jsonData, ksvs.Keyspace) if err != nil { log.Fatalf("Unable to parse keyspace file %v: %v", ksFile, err) } - _, err = vindexes.BuildKeyspace(keyspace, env.Parser()) + _, err = vindexes.BuildKeyspace(ksvs.Keyspace, env.Parser()) if err != nil { log.Fatalf("Invalid keyspace definition: %v", err) } - ts.SaveVSchema(ctx, ks.Name, keyspace) + ts.SaveVSchema(ctx, ksvs) log.Infof("Loaded keyspace %v from %v\n", ks.Name, ksFile) } } diff --git a/go/test/endtoend/vtgate/keyspace_watches/keyspace_watch_test.go b/go/test/endtoend/vtgate/keyspace_watches/keyspace_watch_test.go index 1eb31663577..d7ccbb1b0b9 100644 --- a/go/test/endtoend/vtgate/keyspace_watches/keyspace_watch_test.go +++ b/go/test/endtoend/vtgate/keyspace_watches/keyspace_watch_test.go @@ -140,7 +140,6 @@ func TestRoutingWithKeyspacesToWatch(t *testing.T) { } func TestVSchemaDDLWithKeyspacesToWatch(t *testing.T) { - extraVTGateArgs := []string{ "--vschema_ddl_authorized_users", "%", } diff --git a/go/vt/topo/helpers/copy.go b/go/vt/topo/helpers/copy.go index 6dff1c6ac22..94e2d8aca70 100644 --- a/go/vt/topo/helpers/copy.go +++ b/go/vt/topo/helpers/copy.go @@ -41,7 +41,6 @@ func CopyKeyspaces(ctx context.Context, fromTS, toTS *topo.Server, parser *sqlpa } for _, keyspace := range keyspaces { - ki, err := fromTS.GetKeyspace(ctx, keyspace) if err != nil { return fmt.Errorf("GetKeyspace(%v): %w", keyspace, err) @@ -55,15 +54,15 @@ func CopyKeyspaces(ctx context.Context, fromTS, toTS *topo.Server, parser *sqlpa } } - vs, err := fromTS.GetVSchema(ctx, keyspace) + ksvs, err := fromTS.GetVSchema(ctx, keyspace) switch { case err == nil: - _, err = vindexes.BuildKeyspace(vs, parser) + _, err = vindexes.BuildKeyspace(ksvs.Keyspace, parser) if err != nil { log.Errorf("BuildKeyspace(%v): %v", keyspace, err) break } - if err := toTS.SaveVSchema(ctx, keyspace, vs); err != nil { + if err := toTS.SaveVSchema(ctx, ksvs); err != nil { log.Errorf("SaveVSchema(%v): %v", keyspace, err) } case topo.IsErrType(err, topo.NoNode): diff --git a/go/vt/topo/srv_vschema.go b/go/vt/topo/srv_vschema.go index f69fca83537..909a5337919 100644 --- a/go/vt/topo/srv_vschema.go +++ b/go/vt/topo/srv_vschema.go @@ -171,10 +171,17 @@ func (ts *Server) RebuildSrvVSchema(ctx context.Context, cells []string) error { go func(keyspace string) { defer wg.Done() - k, err := ts.GetVSchema(ctx, keyspace) + ksvs, err := ts.GetVSchema(ctx, keyspace) if IsErrType(err, NoNode) { err = nil - k = &vschemapb.Keyspace{} + ksvs = &KeyspaceVSchemaInfo{ + Name: keyspace, + Keyspace: &vschemapb.Keyspace{ + Sharded: false, + Vindexes: make(map[string]*vschemapb.Vindex), + Tables: make(map[string]*vschemapb.Table), + }, + } } mu.Lock() @@ -184,7 +191,7 @@ func (ts *Server) RebuildSrvVSchema(ctx context.Context, cells []string) error { finalErr = err return } - srvVSchema.Keyspaces[keyspace] = k + srvVSchema.Keyspaces[keyspace] = ksvs.Keyspace }(keyspace) } wg.Wait() diff --git a/go/vt/topo/vschema.go b/go/vt/topo/vschema.go index 21192e1aacb..6e685423dbf 100644 --- a/go/vt/topo/vschema.go +++ b/go/vt/topo/vschema.go @@ -28,26 +28,38 @@ import ( vschemapb "vitess.io/vitess/go/vt/proto/vschema" ) +// KeyspaceVSchemaInfo wraps a vschemapb.Keyspace and is a meta +// struct that contains metadata to give the data more context +// and convenience. This is the main way we interact with a +// keyspace's vschema. +type KeyspaceVSchemaInfo struct { + Name string + *vschemapb.Keyspace + version Version +} + // SaveVSchema saves a Vschema. A valid Vschema should be passed in. It does not verify its correctness. // If the VSchema is empty, just remove it. -func (ts *Server) SaveVSchema(ctx context.Context, keyspace string, vschema *vschemapb.Keyspace) error { +func (ts *Server) SaveVSchema(ctx context.Context, ksvs *KeyspaceVSchemaInfo) error { if err := ctx.Err(); err != nil { return err } - nodePath := path.Join(KeyspacesPath, keyspace, VSchemaFile) - data, err := vschema.MarshalVT() + nodePath := path.Join(KeyspacesPath, ksvs.Name, VSchemaFile) + data, err := ksvs.MarshalVT() if err != nil { return err } - _, err = ts.globalCell.Update(ctx, nodePath, data, nil) + version, err := ts.globalCell.Update(ctx, nodePath, data, ksvs.version) if err != nil { - log.Errorf("failed to update vschema for keyspace %s: %v", keyspace, err) - } else { - log.Infof("successfully updated vschema for keyspace %s: %+v", keyspace, vschema) + log.Errorf("failed to update vschema for keyspace %s: %v", ksvs.Name, err) + return err } - return err + ksvs.version = version + log.Infof("successfully updated vschema for keyspace %s: %+v", ksvs.Name, ksvs.Keyspace) + + return nil } // DeleteVSchema delete the keyspace if it exists @@ -61,13 +73,13 @@ func (ts *Server) DeleteVSchema(ctx context.Context, keyspace string) error { } // GetVSchema fetches the vschema from the topo. -func (ts *Server) GetVSchema(ctx context.Context, keyspace string) (*vschemapb.Keyspace, error) { +func (ts *Server) GetVSchema(ctx context.Context, keyspace string) (*KeyspaceVSchemaInfo, error) { if err := ctx.Err(); err != nil { return nil, err } nodePath := path.Join(KeyspacesPath, keyspace, VSchemaFile) - data, _, err := ts.globalCell.Get(ctx, nodePath) + data, version, err := ts.globalCell.Get(ctx, nodePath) if err != nil { return nil, err } @@ -76,20 +88,27 @@ func (ts *Server) GetVSchema(ctx context.Context, keyspace string) (*vschemapb.K if err != nil { return nil, vterrors.Wrapf(err, "bad vschema data: %q", data) } - return &vs, nil + return &KeyspaceVSchemaInfo{ + Name: keyspace, + Keyspace: &vs, + version: version, + }, nil } // EnsureVSchema makes sure that a vschema is present for this keyspace or creates a blank one if it is missing func (ts *Server) EnsureVSchema(ctx context.Context, keyspace string) error { - vschema, err := ts.GetVSchema(ctx, keyspace) + ksvs, err := ts.GetVSchema(ctx, keyspace) if err != nil && !IsErrType(err, NoNode) { log.Infof("error in getting vschema for keyspace %s: %v", keyspace, err) } - if vschema == nil || IsErrType(err, NoNode) { - err = ts.SaveVSchema(ctx, keyspace, &vschemapb.Keyspace{ - Sharded: false, - Vindexes: make(map[string]*vschemapb.Vindex), - Tables: make(map[string]*vschemapb.Table), + if ksvs == nil || ksvs.Keyspace == nil || IsErrType(err, NoNode) { + err = ts.SaveVSchema(ctx, &KeyspaceVSchemaInfo{ + Name: keyspace, + Keyspace: &vschemapb.Keyspace{ + Sharded: false, + Vindexes: make(map[string]*vschemapb.Vindex), + Tables: make(map[string]*vschemapb.Table), + }, }) if err != nil { log.Errorf("could not create blank vschema: %v", err) diff --git a/go/vt/topotools/vschema_ddl.go b/go/vt/topotools/vschema_ddl.go index 3c6f5bced3c..8ddfce86b02 100644 --- a/go/vt/topotools/vschema_ddl.go +++ b/go/vt/topotools/vschema_ddl.go @@ -20,6 +20,7 @@ import ( "reflect" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vterrors" vschemapb "vitess.io/vitess/go/vt/proto/vschema" @@ -28,55 +29,51 @@ import ( // ApplyVSchemaDDL applies the given DDL statement to the vschema // keyspace definition and returns the modified keyspace object. -func ApplyVSchemaDDL(ksName string, ks *vschemapb.Keyspace, alterVschema *sqlparser.AlterVschema) (*vschemapb.Keyspace, error) { - if ks == nil { - ks = new(vschemapb.Keyspace) +func ApplyVSchemaDDL(ksName string, ksvs *topo.KeyspaceVSchemaInfo, alterVschema *sqlparser.AlterVschema) (*topo.KeyspaceVSchemaInfo, error) { + if ksvs.Tables == nil { + ksvs.Tables = map[string]*vschemapb.Table{} } - if ks.Tables == nil { - ks.Tables = map[string]*vschemapb.Table{} - } - - if ks.Vindexes == nil { - ks.Vindexes = map[string]*vschemapb.Vindex{} + if ksvs.Vindexes == nil { + ksvs.Vindexes = map[string]*vschemapb.Vindex{} } var tableName string var table *vschemapb.Table if !alterVschema.Table.IsEmpty() { tableName = alterVschema.Table.Name.String() - table = ks.Tables[tableName] + table = ksvs.Tables[tableName] } switch alterVschema.Action { case sqlparser.CreateVindexDDLAction: name := alterVschema.VindexSpec.Name.String() - if _, ok := ks.Vindexes[name]; ok { + if _, ok := ksvs.Vindexes[name]; ok { return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s already exists in keyspace %s", name, ksName) } // Make sure the keyspace has the sharded bit set to true // if this is the first vindex defined in the keyspace. - if len(ks.Vindexes) == 0 { - ks.Sharded = true + if len(ksvs.Vindexes) == 0 { + ksvs.Sharded = true } owner, params := alterVschema.VindexSpec.ParseParams() - ks.Vindexes[name] = &vschemapb.Vindex{ + ksvs.Vindexes[name] = &vschemapb.Vindex{ Type: alterVschema.VindexSpec.Type.String(), Params: params, Owner: owner, } - return ks, nil + return ksvs, nil case sqlparser.DropVindexDDLAction: name := alterVschema.VindexSpec.Name.String() - if _, ok := ks.Vindexes[name]; !ok { + if _, ok := ksvs.Vindexes[name]; !ok { return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s does not exists in keyspace %s", name, ksName) } - for tableName, table := range ks.Tables { + for tableName, table := range ksvs.Tables { // Make sure there isn't a vindex with the same name left on the table. for _, vindex := range table.ColumnVindexes { if vindex.Name == name { @@ -85,33 +82,33 @@ func ApplyVSchemaDDL(ksName string, ks *vschemapb.Keyspace, alterVschema *sqlpar } } - delete(ks.Vindexes, name) + delete(ksvs.Vindexes, name) - return ks, nil + return ksvs, nil case sqlparser.AddVschemaTableDDLAction: - if ks.Sharded { + if ksvs.Sharded { return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "add vschema table: unsupported on sharded keyspace %s", ksName) } name := alterVschema.Table.Name.String() - if _, ok := ks.Tables[name]; ok { + if _, ok := ksvs.Tables[name]; ok { return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vschema already contains table %s in keyspace %s", name, ksName) } - ks.Tables[name] = &vschemapb.Table{} + ksvs.Tables[name] = &vschemapb.Table{} - return ks, nil + return ksvs, nil case sqlparser.DropVschemaTableDDLAction: name := alterVschema.Table.Name.String() - if _, ok := ks.Tables[name]; !ok { + if _, ok := ksvs.Tables[name]; !ok { return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vschema does not contain table %s in keyspace %s", name, ksName) } - delete(ks.Tables, name) + delete(ksvs.Tables, name) - return ks, nil + return ksvs, nil case sqlparser.AddColVindexDDLAction: // Support two cases: @@ -126,7 +123,7 @@ func ApplyVSchemaDDL(ksName string, ks *vschemapb.Keyspace, alterVschema *sqlpar name := spec.Name.String() if spec.Type.NotEmpty() { owner, params := spec.ParseParams() - if vindex, ok := ks.Vindexes[name]; ok { + if vindex, ok := ksvs.Vindexes[name]; ok { if vindex.Type != spec.Type.String() { return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s defined with type %s not %s", name, vindex.Type, spec.Type.String()) } @@ -139,17 +136,17 @@ func ApplyVSchemaDDL(ksName string, ks *vschemapb.Keyspace, alterVschema *sqlpar } else { // Make sure the keyspace has the sharded bit set to true // if this is the first vindex defined in the keyspace. - if len(ks.Vindexes) == 0 { - ks.Sharded = true + if len(ksvs.Vindexes) == 0 { + ksvs.Sharded = true } - ks.Vindexes[name] = &vschemapb.Vindex{ + ksvs.Vindexes[name] = &vschemapb.Vindex{ Type: spec.Type.String(), Params: params, Owner: owner, } } } else { - if _, ok := ks.Vindexes[name]; !ok { + if _, ok := ksvs.Vindexes[name]; !ok { return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s does not exist in keyspace %s", name, ksName) } } @@ -178,9 +175,9 @@ func ApplyVSchemaDDL(ksName string, ks *vschemapb.Keyspace, alterVschema *sqlpar Name: name, Columns: columns, }) - ks.Tables[tableName] = table + ksvs.Tables[tableName] = table - return ks, nil + return ksvs, nil case sqlparser.DropColVindexDDLAction: spec := alterVschema.VindexSpec @@ -193,44 +190,44 @@ func ApplyVSchemaDDL(ksName string, ks *vschemapb.Keyspace, alterVschema *sqlpar if colVindex.Name == name { table.ColumnVindexes = append(table.ColumnVindexes[:i], table.ColumnVindexes[i+1:]...) if len(table.ColumnVindexes) == 0 { - delete(ks.Tables, tableName) + delete(ksvs.Tables, tableName) } - return ks, nil + return ksvs, nil } } return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vindex %s not defined in table %s.%s", name, ksName, tableName) case sqlparser.AddSequenceDDLAction: - if ks.Sharded { + if ksvs.Sharded { return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "add sequence table: unsupported on sharded keyspace %s", ksName) } name := alterVschema.Table.Name.String() - if _, ok := ks.Tables[name]; ok { + if _, ok := ksvs.Tables[name]; ok { return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vschema already contains sequence %s in keyspace %s", name, ksName) } - ks.Tables[name] = &vschemapb.Table{Type: "sequence"} + ksvs.Tables[name] = &vschemapb.Table{Type: "sequence"} - return ks, nil + return ksvs, nil case sqlparser.DropSequenceDDLAction: - if ks.Sharded { + if ksvs.Sharded { return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "drop sequence table: unsupported on sharded keyspace %s", ksName) } name := alterVschema.Table.Name.String() - if _, ok := ks.Tables[name]; !ok { + if _, ok := ksvs.Tables[name]; !ok { return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vschema does not contain sequence %s in keyspace %s", name, ksName) } - delete(ks.Tables, name) + delete(ksvs.Tables, name) - return ks, nil + return ksvs, nil case sqlparser.AddAutoIncDDLAction: name := alterVschema.Table.Name.String() - table := ks.Tables[name] + table := ksvs.Tables[name] if table == nil { return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vschema does not contain table %s in keyspace %s", name, ksName) } @@ -244,11 +241,11 @@ func ApplyVSchemaDDL(ksName string, ks *vschemapb.Keyspace, alterVschema *sqlpar Sequence: sqlparser.String(alterVschema.AutoIncSpec.Sequence), } - return ks, nil + return ksvs, nil case sqlparser.DropAutoIncDDLAction: name := alterVschema.Table.Name.String() - table := ks.Tables[name] + table := ksvs.Tables[name] if table == nil { return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "vschema does not contain table %s in keyspace %s", name, ksName) } @@ -259,7 +256,7 @@ func ApplyVSchemaDDL(ksName string, ks *vschemapb.Keyspace, alterVschema *sqlpar table.AutoIncrement = nil - return ks, nil + return ksvs, nil } return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected vindex ddl operation %s", alterVschema.Action.ToString()) diff --git a/go/vt/vtcombo/tablet_map.go b/go/vt/vtcombo/tablet_map.go index fc02409bd5f..64b49ae2bed 100644 --- a/go/vt/vtcombo/tablet_map.go +++ b/go/vt/vtcombo/tablet_map.go @@ -410,7 +410,11 @@ func CreateKs( if err != nil { return 0, fmt.Errorf("BuildKeyspace(%v) failed: %v", keyspace, err) } - if err := ts.SaveVSchema(ctx, keyspace, formal); err != nil { + ksvs := &topo.KeyspaceVSchemaInfo{ + Name: keyspace, + Keyspace: formal, + } + if err := ts.SaveVSchema(ctx, ksvs); err != nil { return 0, fmt.Errorf("SaveVSchema(%v) failed: %v", keyspace, err) } } else { diff --git a/go/vt/vtctl/grpcvtctldserver/server.go b/go/vt/vtctl/grpcvtctldserver/server.go index c3dc22d21b4..2ed6468f111 100644 --- a/go/vt/vtctl/grpcvtctldserver/server.go +++ b/go/vt/vtctl/grpcvtctldserver/server.go @@ -338,7 +338,9 @@ func (s *VtctldServer) ApplyVSchema(ctx context.Context, req *vtctldatapb.ApplyV return nil, err } - var vs *vschemapb.Keyspace + ksvs := &topo.KeyspaceVSchemaInfo{ + Name: req.Keyspace, + } if req.Sql != "" { span.Annotate("sql_mode", true) @@ -355,29 +357,29 @@ func (s *VtctldServer) ApplyVSchema(ctx context.Context, req *vtctldatapb.ApplyV return nil, err } - vs, err = s.ts.GetVSchema(ctx, req.Keyspace) + ksvs, err = s.ts.GetVSchema(ctx, req.Keyspace) if err != nil && !topo.IsErrType(err, topo.NoNode) { err = vterrors.Wrapf(err, "GetVSchema(%s)", req.Keyspace) return nil, err } // otherwise, we keep the empty vschema object from above - vs, err = topotools.ApplyVSchemaDDL(req.Keyspace, vs, ddl) + ksvs, err = topotools.ApplyVSchemaDDL(req.Keyspace, ksvs, ddl) if err != nil { - err = vterrors.Wrapf(err, "ApplyVSchemaDDL(%s,%v,%v)", req.Keyspace, vs, ddl) + err = vterrors.Wrapf(err, "ApplyVSchemaDDL(%s,%v,%v)", req.Keyspace, ksvs, ddl) return nil, err } } else { // "jsonMode" span.Annotate("sql_mode", false) - vs = req.VSchema + ksvs.Keyspace = req.VSchema } - ksVs, err := vindexes.BuildKeyspace(vs, s.ws.SQLParser()) + ksVs, err := vindexes.BuildKeyspace(ksvs.Keyspace, s.ws.SQLParser()) if err != nil { err = vterrors.Wrapf(err, "BuildKeyspace(%s)", req.Keyspace) return nil, err } response := &vtctldatapb.ApplyVSchemaResponse{ - VSchema: vs, + VSchema: ksvs.Keyspace, UnknownVindexParams: make(map[string]*vtctldatapb.ApplyVSchemaResponse_ParamList), } @@ -409,7 +411,7 @@ func (s *VtctldServer) ApplyVSchema(ctx context.Context, req *vtctldatapb.ApplyV return response, err } - if err = s.ts.SaveVSchema(ctx, req.Keyspace, vs); err != nil { + if err = s.ts.SaveVSchema(ctx, ksvs); err != nil { err = vterrors.Wrapf(err, "SaveVSchema(%s, %v)", req.Keyspace, req.VSchema) return nil, err } @@ -425,7 +427,7 @@ func (s *VtctldServer) ApplyVSchema(ctx context.Context, req *vtctldatapb.ApplyV err = vterrors.Wrapf(err, "GetVSchema(%s)", req.Keyspace) return nil, err } - response.VSchema = updatedVS + response.VSchema = updatedVS.Keyspace return response, nil } @@ -934,13 +936,17 @@ func (s *VtctldServer) CreateKeyspace(ctx context.Context, req *vtctldatapb.Crea } if req.Type == topodatapb.KeyspaceType_SNAPSHOT { - var vs *vschemapb.Keyspace - vs, err = s.ts.GetVSchema(ctx, req.BaseKeyspace) + // Copy vschema from the base keyspace. + bksvs, err := s.ts.GetVSchema(ctx, req.BaseKeyspace) + ksvs := &topo.KeyspaceVSchemaInfo{ + Name: req.Name, + } if err != nil { log.Infof("error from GetVSchema(%v) = %v", req.BaseKeyspace, err) if topo.IsErrType(err, topo.NoNode) { log.Infof("base keyspace %v does not exist; continuing with bare, unsharded vschema", req.BaseKeyspace) - vs = &vschemapb.Keyspace{ + // Create an empty vschema for the keyspace. + ksvs.Keyspace = &vschemapb.Keyspace{ Sharded: false, Tables: map[string]*vschemapb.Table{}, Vindexes: map[string]*vschemapb.Vindex{}, @@ -950,11 +956,13 @@ func (s *VtctldServer) CreateKeyspace(ctx context.Context, req *vtctldatapb.Crea } } + // Copy the vschema from the base keyspace to the new one. + ksvs.Keyspace = bksvs.Keyspace.CloneVT() // SNAPSHOT keyspaces are excluded from global routing. - vs.RequireExplicitRouting = true + ksvs.RequireExplicitRouting = true - if err = s.ts.SaveVSchema(ctx, req.Name, vs); err != nil { - err = fmt.Errorf("SaveVSchema(%v) = %w", vs, err) + if err = s.ts.SaveVSchema(ctx, ksvs); err != nil { + err = fmt.Errorf("SaveVSchema(%v) = %w", ksvs, err) return nil, err } } @@ -2656,13 +2664,13 @@ func (s *VtctldServer) GetVSchema(ctx context.Context, req *vtctldatapb.GetVSche span.Annotate("keyspace", req.Keyspace) - vschema, err := s.ts.GetVSchema(ctx, req.Keyspace) + ks, err := s.ts.GetVSchema(ctx, req.Keyspace) if err != nil { return nil, err } return &vtctldatapb.GetVSchemaResponse{ - VSchema: vschema, + VSchema: ks.Keyspace, }, nil } diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 1a4735b1c82..fadb1423a86 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -1885,31 +1885,35 @@ func commandCreateKeyspace(ctx context.Context, wr *wrangler.Wrangler, subFlags } if ktype == topodatapb.KeyspaceType_SNAPSHOT { - // copy vschema from base keyspace - vs, err := wr.TopoServer().GetVSchema(ctx, *baseKeyspace) + // Copy vschema from the base keyspace. + bksvs, err := wr.TopoServer().GetVSchema(ctx, *baseKeyspace) + ksvs := &topo.KeyspaceVSchemaInfo{ + Name: keyspace, + } if err != nil { wr.Logger().Infof("error from GetVSchema for base_keyspace: %v, %v", *baseKeyspace, err) if topo.IsErrType(err, topo.NoNode) { - vs = &vschemapb.Keyspace{ - Sharded: false, - Tables: make(map[string]*vschemapb.Table), - Vindexes: make(map[string]*vschemapb.Vindex), - RequireExplicitRouting: true, + // Create an empty vschema for the keyspace. + ksvs.Keyspace = &vschemapb.Keyspace{ + Sharded: false, + Tables: make(map[string]*vschemapb.Table), + Vindexes: make(map[string]*vschemapb.Vindex), } } else { return err } - } else { - // SNAPSHOT keyspaces are excluded from global routing. - vs.RequireExplicitRouting = true } - if err := wr.TopoServer().SaveVSchema(ctx, keyspace, vs); err != nil { - wr.Logger().Infof("error from SaveVSchema %v:%v", vs, err) + // Copy the vschema from the base keyspace to the new one. + ksvs.Keyspace = bksvs.Keyspace.CloneVT() + // SNAPSHOT keyspaces are excluded from global routing. + ksvs.RequireExplicitRouting = true + if err := wr.TopoServer().SaveVSchema(ctx, ksvs); err != nil { + wr.Logger().Infof("error from SaveVSchema %v:%v", ksvs, err) return err } } - return wr.TopoServer().RebuildSrvVSchema(ctx, []string{} /* cells */) + return wr.TopoServer().RebuildSrvVSchema(ctx, nil /* cells */) } func commandDeleteKeyspace(ctx context.Context, wr *wrangler.Wrangler, subFlags *pflag.FlagSet, args []string) error { @@ -3343,7 +3347,7 @@ func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *p } keyspace := subFlags.Arg(0) - var vs *vschemapb.Keyspace + var ksvs *topo.KeyspaceVSchemaInfo var err error sqlMode := (*sql != "") != (*sqlFile != "") @@ -3375,16 +3379,19 @@ func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *p return fmt.Errorf("error parsing vschema statement `%s`: not a ddl statement", *sql) } - vs, err = wr.TopoServer().GetVSchema(ctx, keyspace) + ksvs, err = wr.TopoServer().GetVSchema(ctx, keyspace) if err != nil { if topo.IsErrType(err, topo.NoNode) { - vs = &vschemapb.Keyspace{} + ksvs = &topo.KeyspaceVSchemaInfo{ + Name: keyspace, + Keyspace: &vschemapb.Keyspace{}, + } } else { return err } } - vs, err = topotools.ApplyVSchemaDDL(keyspace, vs, ddl) + ksvs, err = topotools.ApplyVSchemaDDL(keyspace, ksvs, ddl) if err != nil { return err } @@ -3402,14 +3409,17 @@ func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *p schema = []byte(*vschema) } - vs = &vschemapb.Keyspace{} - err := json2.UnmarshalPB(schema, vs) + ksvs = &topo.KeyspaceVSchemaInfo{ + Name: keyspace, + Keyspace: &vschemapb.Keyspace{}, + } + err := json2.UnmarshalPB(schema, ksvs.Keyspace) if err != nil { return err } } - b, err := json2.MarshalIndentPB(vs, " ") + b, err := json2.MarshalIndentPB(ksvs.Keyspace, " ") if err != nil { wr.Logger().Errorf2(err, "Failed to marshal VSchema for display") } else { @@ -3417,7 +3427,7 @@ func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *p } // Validate the VSchema. - ksVs, err := vindexes.BuildKeyspace(vs, wr.SQLParser()) + ksVs, err := vindexes.BuildKeyspace(ksvs.Keyspace, wr.SQLParser()) if err != nil { return err } @@ -3449,11 +3459,11 @@ func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *p return err } - if _, err := vindexes.BuildKeyspace(vs, wr.SQLParser()); err != nil { + if _, err := vindexes.BuildKeyspace(ksvs.Keyspace, wr.SQLParser()); err != nil { return err } - if err := wr.TopoServer().SaveVSchema(ctx, keyspace, vs); err != nil { + if err := wr.TopoServer().SaveVSchema(ctx, ksvs); err != nil { return err } diff --git a/go/vt/vtctl/workflow/materializer.go b/go/vt/vtctl/workflow/materializer.go index c65a00bf614..8cd99d384ee 100644 --- a/go/vt/vtctl/workflow/materializer.go +++ b/go/vt/vtctl/workflow/materializer.go @@ -283,7 +283,7 @@ func (mz *materializer) deploySchema() error { // We do, however, allow the user to override this behavior and retain them. removeAutoInc := false updatedVSchema := false - var targetVSchema *vschemapb.Keyspace + var targetVSchema *topo.KeyspaceVSchemaInfo if mz.workflowType == binlogdatapb.VReplicationWorkflowType_MoveTables && (mz.targetVSchema != nil && mz.targetVSchema.Keyspace != nil && mz.targetVSchema.Keyspace.Sharded) && (mz.ms.GetWorkflowOptions() != nil && mz.ms.GetWorkflowOptions().ShardedAutoIncrementHandling != vtctldatapb.ShardedAutoIncrementHandling_LEAVE) { @@ -472,7 +472,7 @@ func (mz *materializer) deploySchema() error { } if updatedVSchema { - return mz.ts.SaveVSchema(mz.ctx, mz.ms.TargetKeyspace, targetVSchema) + return mz.ts.SaveVSchema(mz.ctx, targetVSchema) } return nil @@ -485,7 +485,7 @@ func (mz *materializer) buildMaterializer() error { if err != nil { return err } - targetVSchema, err := vindexes.BuildKeyspaceSchema(vschema, ms.TargetKeyspace, mz.env.Parser()) + targetVSchema, err := vindexes.BuildKeyspaceSchema(vschema.Keyspace, ms.TargetKeyspace, mz.env.Parser()) if err != nil { return err } @@ -563,7 +563,7 @@ func (mz *materializer) buildMaterializer() error { if err != nil { return fmt.Errorf("failed to get source keyspace vschema: %v", err) } - differentPVs = primaryVindexesDiffer(ms, sourceVSchema, vschema) + differentPVs = primaryVindexesDiffer(ms, sourceVSchema.Keyspace, vschema.Keyspace) mz.targetVSchema = targetVSchema mz.sourceShards = sourceShards diff --git a/go/vt/vtctl/workflow/resharder.go b/go/vt/vtctl/workflow/resharder.go index e3f7380af69..c270a9a6f0b 100644 --- a/go/vt/vtctl/workflow/resharder.go +++ b/go/vt/vtctl/workflow/resharder.go @@ -38,7 +38,6 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" - vschemapb "vitess.io/vitess/go/vt/proto/vschema" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) @@ -50,7 +49,7 @@ type resharder struct { sourcePrimaries map[string]*topo.TabletInfo targetShards []*topo.ShardInfo targetPrimaries map[string]*topo.TabletInfo - vschema *vschemapb.Keyspace + vschema *topo.KeyspaceVSchemaInfo refStreams map[string]*refStream // This can be single cell name or cell alias but it can // also be a comma-separated list of cells. diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index baea602b7a4..c31992f1e6b 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -576,7 +576,7 @@ func (s *Server) LookupVindexCreate(ctx context.Context, req *vtctldatapb.Lookup return nil, err } - if err := s.ts.SaveVSchema(ctx, ms.TargetKeyspace, targetVSchema); err != nil { + if err := s.ts.SaveVSchema(ctx, targetVSchema); err != nil { return nil, vterrors.Wrapf(err, "failed to save updated vschema '%v' in the %s keyspace", targetVSchema, ms.TargetKeyspace) } @@ -592,7 +592,7 @@ func (s *Server) LookupVindexCreate(ctx context.Context, req *vtctldatapb.Lookup return nil, err } if ms.SourceKeyspace != ms.TargetKeyspace { - if err := s.ts.SaveVSchema(ctx, ms.SourceKeyspace, sourceVSchema); err != nil { + if err := s.ts.SaveVSchema(ctx, sourceVSchema); err != nil { return nil, vterrors.Wrapf(err, "failed to save updated vschema '%v' in the %s keyspace", sourceVSchema, ms.SourceKeyspace) } @@ -687,7 +687,7 @@ func (s *Server) LookupVindexExternalize(ctx context.Context, req *vtctldatapb.L // Remove the write_only param and save the source vschema. delete(vindex.Params, "write_only") - if err := s.ts.SaveVSchema(ctx, req.Keyspace, sourceVschema); err != nil { + if err := s.ts.SaveVSchema(ctx, sourceVschema); err != nil { return nil, err } return resp, s.ts.RebuildSrvVSchema(ctx, nil) @@ -800,9 +800,10 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl s.Logger().Infof("Successfully opened external topo: %+v", externalTopo) } - var vschema *vschemapb.Keyspace - var origVSchema *vschemapb.Keyspace // If we need to rollback a failed create - vschema, err = s.ts.GetVSchema(ctx, targetKeyspace) + origVSchema := &topo.KeyspaceVSchemaInfo{ // If we need to rollback a failed create + Name: targetKeyspace, + } + vschema, err := s.ts.GetVSchema(ctx, targetKeyspace) if err != nil { return nil, err } @@ -859,11 +860,11 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl if !vschema.Sharded { // Save the original in case we need to restore it for a late failure in // the defer(). - origVSchema = vschema.CloneVT() - if err := s.addTablesToVSchema(ctx, sourceKeyspace, vschema, tables, externalTopo == nil); err != nil { + origVSchema.Keyspace = vschema.CloneVT() + if err := s.addTablesToVSchema(ctx, sourceKeyspace, vschema.Keyspace, tables, externalTopo == nil); err != nil { return nil, err } - if err := s.ts.SaveVSchema(ctx, targetKeyspace, vschema); err != nil { + if err := s.ts.SaveVSchema(ctx, vschema); err != nil { return nil, err } } @@ -964,7 +965,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl if origVSchema == nil { // There's no previous version to restore return } - if cerr := s.ts.SaveVSchema(ctx, targetKeyspace, origVSchema); cerr != nil { + if cerr := s.ts.SaveVSchema(ctx, origVSchema); cerr != nil { err = vterrors.Wrapf(err, "failed to restore original target vschema: %v", cerr) } } @@ -2310,7 +2311,7 @@ func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workf if err != nil { return nil, err } - ts.sourceKSSchema, err = vindexes.BuildKeyspaceSchema(vs, ts.sourceKeyspace, s.env.Parser()) + ts.sourceKSSchema, err = vindexes.BuildKeyspaceSchema(vs.Keyspace, ts.sourceKeyspace, s.env.Parser()) if err != nil { return nil, err } @@ -3411,7 +3412,7 @@ func fillStringTemplate(tmpl string, vars any) (string, error) { // prepareCreateLookup performs the preparatory steps for creating a // Lookup Vindex. func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace string, specs *vschemapb.Keyspace, continueAfterCopyWithOwner bool) ( - ms *vtctldatapb.MaterializeSettings, sourceVSchema, targetVSchema *vschemapb.Keyspace, cancelFunc func() error, err error) { + ms *vtctldatapb.MaterializeSettings, sourceVSchema, targetVSchema *topo.KeyspaceVSchemaInfo, cancelFunc func() error, err error) { // Important variables are pulled out here. var ( vindexName string @@ -3702,7 +3703,10 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str // Save a copy of the original vschema if we modify it and need to provide // a cancelFunc. - ogTargetVSchema := targetVSchema.CloneVT() + ogTargetVSchema := &topo.KeyspaceVSchemaInfo{ + Name: targetKeyspace, + } + ogTargetVSchema.Keyspace = targetVSchema.CloneVT() targetChanged := false // Update targetVSchema. @@ -3769,7 +3773,7 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str if targetChanged { cancelFunc = func() error { // Restore the original target vschema. - return s.ts.SaveVSchema(ctx, targetKeyspace, ogTargetVSchema) + return s.ts.SaveVSchema(ctx, ogTargetVSchema) } } diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 4fc34992b0f..3b151103d7f 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -409,7 +409,7 @@ func (ts *trafficSwitcher) addParticipatingTablesToKeyspace(ctx context.Context, vschema.Tables[table] = &vschemapb.Table{} } } - return ts.TopoServer().SaveVSchema(ctx, keyspace, vschema) + return ts.TopoServer().SaveVSchema(ctx, vschema) } func (ts *trafficSwitcher) deleteRoutingRules(ctx context.Context) error { @@ -538,7 +538,7 @@ func (ts *trafficSwitcher) dropParticipatingTablesFromKeyspace(ctx context.Conte for _, tableName := range ts.Tables() { delete(vschema.Tables, tableName) } - return ts.TopoServer().SaveVSchema(ctx, keyspace, vschema) + return ts.TopoServer().SaveVSchema(ctx, vschema) } func (ts *trafficSwitcher) removeSourceTables(ctx context.Context, removalType TableRemovalType) error { @@ -1013,7 +1013,7 @@ func (ts *trafficSwitcher) buildTenantPredicate(ctx context.Context) (*sqlparser if err != nil { return nil, err } - targetVSchema, err := vindexes.BuildKeyspaceSchema(vschema, ts.targetKeyspace, parser) + targetVSchema, err := vindexes.BuildKeyspaceSchema(vschema.Keyspace, ts.targetKeyspace, parser) if err != nil { return nil, err } @@ -1441,7 +1441,7 @@ func (ts *trafficSwitcher) getTargetSequenceMetadata(ctx context.Context) (map[s return nil, nil } - sequencesByBackingTable, backingTablesFound, err := ts.findSequenceUsageInKeyspace(vschema) + sequencesByBackingTable, backingTablesFound, err := ts.findSequenceUsageInKeyspace(vschema.Keyspace) if err != nil { return nil, err } @@ -1609,7 +1609,7 @@ func (ts trafficSwitcher) createMissingSequenceTables(ctx context.Context, seque } } if updatedGlobalVSchema { - err = ts.ws.ts.SaveVSchema(ctx, globalKeyspace, globalVSchema) + err = ts.ws.ts.SaveVSchema(ctx, globalVSchema) if err != nil { return vterrors.Wrapf(err, "failed to update vschema in the global-keyspace %s", globalKeyspace) } diff --git a/go/vt/vtgate/executorcontext/vcursor_impl.go b/go/vt/vtgate/executorcontext/vcursor_impl.go index c1f341b38cf..3cf37625dcc 100644 --- a/go/vt/vtgate/executorcontext/vcursor_impl.go +++ b/go/vt/vtgate/executorcontext/vcursor_impl.go @@ -122,7 +122,7 @@ type ( // VSchemaOperator is an interface to Vschema Operations VSchemaOperator interface { GetCurrentSrvVschema() *vschemapb.SrvVSchema - UpdateVSchema(ctx context.Context, ksName string, vschema *vschemapb.SrvVSchema) error + UpdateVSchema(ctx context.Context, ks *topo.KeyspaceVSchemaInfo, vschema *vschemapb.SrvVSchema) error } // VCursorImpl implements the VCursor functionality used by dependent @@ -1359,7 +1359,10 @@ func (vc *VCursorImpl) ExecuteVSchema(ctx context.Context, keyspace string, vsch } // Resolve the keyspace either from the table qualifier or the target keyspace - var ksName string + var ( + ksName string + err error + ) if !vschemaDDL.Table.IsEmpty() { ksName = vschemaDDL.Table.Qualifier.String() } @@ -1370,15 +1373,26 @@ func (vc *VCursorImpl) ExecuteVSchema(ctx context.Context, keyspace string, vsch return ErrNoKeyspace } - ks := srvVschema.Keyspaces[ksName] - ks, err := topotools.ApplyVSchemaDDL(ksName, ks, vschemaDDL) + ksvs := &topo.KeyspaceVSchemaInfo{} + if vc.topoServer != nil { + // Get the most recent version if we can. + ksvs, err = vc.topoServer.GetVSchema(ctx, ksName) + if err != nil { + return err + } + } else { + // Use the cached version. + ksvs.Name = ksName + ksvs.Keyspace = srvVschema.Keyspaces[ksName] + } + ksvs, err = topotools.ApplyVSchemaDDL(ksName, ksvs, vschemaDDL) if err != nil { return err } - srvVschema.Keyspaces[ksName] = ks + srvVschema.Keyspaces[ksName] = ksvs.Keyspace - return vc.vm.UpdateVSchema(ctx, ksName, srvVschema) + return vc.vm.UpdateVSchema(ctx, ksvs, srvVschema) } func (vc *VCursorImpl) MessageStream(ctx context.Context, rss []*srvtopo.ResolvedShard, tableName string, callback func(*sqltypes.Result) error) error { diff --git a/go/vt/vtgate/vschema_manager.go b/go/vt/vtgate/vschema_manager.go index 62ea2cd3455..15ca140094f 100644 --- a/go/vt/vtgate/vschema_manager.go +++ b/go/vt/vtgate/vschema_manager.go @@ -64,20 +64,18 @@ func (vm *VSchemaManager) GetCurrentSrvVschema() *vschemapb.SrvVSchema { // UpdateVSchema propagates the updated vschema to the topo. The entry for // the given keyspace is updated in the global topo, and the full SrvVSchema // is updated in all known cells. -func (vm *VSchemaManager) UpdateVSchema(ctx context.Context, ksName string, vschema *vschemapb.SrvVSchema) error { +func (vm *VSchemaManager) UpdateVSchema(ctx context.Context, ks *topo.KeyspaceVSchemaInfo, srv *vschemapb.SrvVSchema) error { topoServer, err := vm.serv.GetTopoServer() if err != nil { return err } - ks := vschema.Keyspaces[ksName] - - _, err = vindexes.BuildKeyspace(ks, vm.parser) + _, err = vindexes.BuildKeyspace(ks.Keyspace, vm.parser) if err != nil { return err } - err = topoServer.SaveVSchema(ctx, ksName, ks) + err = topoServer.SaveVSchema(ctx, ks) if err != nil { return err } @@ -89,7 +87,7 @@ func (vm *VSchemaManager) UpdateVSchema(ctx context.Context, ksName string, vsch // even if one cell fails, continue to try the others for _, cell := range cells { - cellErr := topoServer.UpdateSrvVSchema(ctx, cell, vschema) + cellErr := topoServer.UpdateSrvVSchema(ctx, cell, srv) if cellErr != nil { err = cellErr log.Errorf("error updating vschema in cell %s: %v", cell, cellErr) @@ -100,7 +98,7 @@ func (vm *VSchemaManager) UpdateVSchema(ctx context.Context, ksName string, vsch } // Update all the local copy of VSchema if the topo update is successful. - vm.VSchemaUpdate(vschema, err) + vm.VSchemaUpdate(srv, err) return nil } diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index bd7ae553130..96067cd64e4 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -148,8 +148,10 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta log.Infof("Successfully opened external topo: %+v", externalTopo) } - var vschema *vschemapb.Keyspace - var origVSchema *vschemapb.Keyspace // If we need to rollback a failed create + var vschema *topo.KeyspaceVSchemaInfo + origVSchema := &topo.KeyspaceVSchemaInfo{ // If we need to rollback a failed create + Name: targetKeyspace, + } vschema, err = wr.ts.GetVSchema(ctx, targetKeyspace) if err != nil { return err @@ -214,8 +216,8 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta if !vschema.Sharded { // Save the original in case we need to restore it for a late failure // in the defer(). - origVSchema = vschema.CloneVT() - if err := wr.addTablesToVSchema(ctx, sourceKeyspace, vschema, tables, externalTopo == nil); err != nil { + origVSchema.Keyspace = vschema.CloneVT() + if err := wr.addTablesToVSchema(ctx, sourceKeyspace, vschema.Keyspace, tables, externalTopo == nil); err != nil { return err } } @@ -280,7 +282,7 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta if origVSchema == nil { // There's no previous version to restore return } - if cerr := wr.ts.SaveVSchema(ctx, targetKeyspace, origVSchema); cerr != nil { + if cerr := wr.ts.SaveVSchema(ctx, origVSchema); cerr != nil { err = vterrors.Wrapf(err, "failed to restore original target vschema: %v", cerr) } } @@ -321,7 +323,7 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta } // We added to the vschema. - if err := wr.ts.SaveVSchema(ctx, targetKeyspace, vschema); err != nil { + if err := wr.ts.SaveVSchema(ctx, vschema); err != nil { return err } } @@ -477,7 +479,7 @@ func (wr *Wrangler) CreateLookupVindex(ctx context.Context, keyspace string, spe if err != nil { return err } - if err := wr.ts.SaveVSchema(ctx, ms.TargetKeyspace, targetVSchema); err != nil { + if err := wr.ts.SaveVSchema(ctx, targetVSchema); err != nil { return err } ms.Cell = cell @@ -495,7 +497,7 @@ func (wr *Wrangler) CreateLookupVindex(ctx context.Context, keyspace string, spe if err := wr.Materialize(ctx, ms); err != nil { return err } - if err := wr.ts.SaveVSchema(ctx, keyspace, sourceVSchema); err != nil { + if err := wr.ts.SaveVSchema(ctx, sourceVSchema); err != nil { return err } @@ -503,7 +505,7 @@ func (wr *Wrangler) CreateLookupVindex(ctx context.Context, keyspace string, spe } // prepareCreateLookup performs the preparatory steps for creating a lookup vindex. -func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, specs *vschemapb.Keyspace, continueAfterCopyWithOwner bool) (ms *vtctldatapb.MaterializeSettings, sourceVSchema, targetVSchema *vschemapb.Keyspace, err error) { +func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, specs *vschemapb.Keyspace, continueAfterCopyWithOwner bool) (ms *vtctldatapb.MaterializeSettings, sourceVSchema, targetVSchema *topo.KeyspaceVSchemaInfo, err error) { // Important variables are pulled out here. var ( // lookup vindex info @@ -928,7 +930,7 @@ func (wr *Wrangler) ExternalizeVindex(ctx context.Context, qualifiedVindexName s // Remove the write_only param and save the source vschema. delete(sourceVindex.Params, "write_only") - if err := wr.ts.SaveVSchema(ctx, sourceKeyspace, sourceVSchema); err != nil { + if err := wr.ts.SaveVSchema(ctx, sourceVSchema); err != nil { return err } return wr.ts.RebuildSrvVSchema(ctx, nil) @@ -1062,7 +1064,7 @@ func (wr *Wrangler) buildMaterializer(ctx context.Context, ms *vtctldatapb.Mater if err != nil { return nil, err } - targetVSchema, err := vindexes.BuildKeyspaceSchema(vschema, ms.TargetKeyspace, wr.env.Parser()) + targetVSchema, err := vindexes.BuildKeyspaceSchema(vschema.Keyspace, ms.TargetKeyspace, wr.env.Parser()) if err != nil { return nil, err } @@ -1129,7 +1131,7 @@ func (wr *Wrangler) buildMaterializer(ctx context.Context, ms *vtctldatapb.Mater if err != nil { return nil, fmt.Errorf("failed to get source keyspace vschema: %v", err) } - differentPVs = primaryVindexesDiffer(ms, sourceVSchema, vschema) + differentPVs = primaryVindexesDiffer(ms, sourceVSchema.Keyspace, vschema.Keyspace) return &materializer{ wr: wr, diff --git a/go/vt/wrangler/resharder.go b/go/vt/wrangler/resharder.go index b041ce32041..09004032dd3 100644 --- a/go/vt/wrangler/resharder.go +++ b/go/vt/wrangler/resharder.go @@ -39,7 +39,6 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - vschemapb "vitess.io/vitess/go/vt/proto/vschema" ) type resharder struct { @@ -50,7 +49,7 @@ type resharder struct { sourcePrimaries map[string]*topo.TabletInfo targetShards []*topo.ShardInfo targetPrimaries map[string]*topo.TabletInfo - vschema *vschemapb.Keyspace + vschema *topo.KeyspaceVSchemaInfo refStreams map[string]*refStream cell string //single cell or cellsAlias or comma-separated list of cells/cellsAliases tabletTypes string diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index d337c1ee515..9006e8b1555 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -995,7 +995,7 @@ func (wr *Wrangler) buildTrafficSwitcher(ctx context.Context, targetKeyspace, wo if err != nil { return nil, err } - ts.sourceKSSchema, err = vindexes.BuildKeyspaceSchema(vs, ts.sourceKeyspace, wr.env.Parser()) + ts.sourceKSSchema, err = vindexes.BuildKeyspaceSchema(vs.Keyspace, ts.sourceKeyspace, wr.env.Parser()) if err != nil { return nil, err } @@ -1833,7 +1833,7 @@ func (ts *trafficSwitcher) dropParticipatingTablesFromKeyspace(ctx context.Conte for _, tableName := range ts.Tables() { delete(vschema.Tables, tableName) } - return ts.TopoServer().SaveVSchema(ctx, keyspace, vschema) + return ts.TopoServer().SaveVSchema(ctx, vschema) } // FIXME: even after dropSourceShards there are still entries in the topo, need to research and fix @@ -1995,7 +1995,7 @@ func (ts *trafficSwitcher) addParticipatingTablesToKeyspace(ctx context.Context, vschema.Tables[table] = &vschemapb.Table{} } } - return ts.TopoServer().SaveVSchema(ctx, keyspace, vschema) + return ts.TopoServer().SaveVSchema(ctx, vschema) } func (ts *trafficSwitcher) isSequenceParticipating(ctx context.Context) (bool, error) { @@ -2034,7 +2034,7 @@ func (ts *trafficSwitcher) getTargetSequenceMetadata(ctx context.Context) (map[s return nil, nil } - sequencesByBackingTable, backingTablesFound, err := ts.findSequenceUsageInKeyspace(vschema) + sequencesByBackingTable, backingTablesFound, err := ts.findSequenceUsageInKeyspace(vschema.Keyspace) if err != nil { return nil, err }