From fa79c3fef83f3f20da5a1e8ced53e7fa608928e4 Mon Sep 17 00:00:00 2001 From: Noble Mittal <62551163+beingnoble03@users.noreply.github.com> Date: Sun, 22 Dec 2024 16:08:26 +0530 Subject: [PATCH] Refactor `Server.LookupVindexCreate` (#17242) Signed-off-by: Noble Mittal --- go/vt/vtctl/workflow/lookup_vindex.go | 542 ++++++++++++++++++++++ go/vt/vtctl/workflow/materializer_test.go | 69 +-- go/vt/vtctl/workflow/server.go | 433 +---------------- 3 files changed, 597 insertions(+), 447 deletions(-) create mode 100644 go/vt/vtctl/workflow/lookup_vindex.go diff --git a/go/vt/vtctl/workflow/lookup_vindex.go b/go/vt/vtctl/workflow/lookup_vindex.go new file mode 100644 index 00000000000..cf9b4833c28 --- /dev/null +++ b/go/vt/vtctl/workflow/lookup_vindex.go @@ -0,0 +1,542 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workflow + +import ( + "context" + "fmt" + "slices" + "strings" + + "golang.org/x/exp/maps" + "google.golang.org/protobuf/proto" + + "vitess.io/vitess/go/sqlescape" + "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/schema" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vtctl/schematools" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/vindexes" + "vitess.io/vitess/go/vt/vttablet/tmclient" + + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" +) + +// lookupVindex is responsible for performing actions related to lookup vindexes. +type lookupVindex struct { + ts *topo.Server + tmc tmclient.TabletManagerClient + + logger logutil.Logger + parser *sqlparser.Parser +} + +// newLookupVindex creates a new lookupVindex instance which is responsible +// for performing actions related to lookup vindexes. +func newLookupVindex(ws *Server) *lookupVindex { + return &lookupVindex{ + ts: ws.ts, + tmc: ws.tmc, + logger: ws.Logger(), + parser: ws.SQLParser(), + } +} + +// prepareCreate performs the preparatory steps for creating a Lookup Vindex. +func (lv *lookupVindex) prepareCreate(ctx context.Context, workflow, keyspace string, specs *vschemapb.Keyspace, continueAfterCopyWithOwner bool) ( + ms *vtctldatapb.MaterializeSettings, sourceVSchema, targetVSchema *vschemapb.Keyspace, cancelFunc func() error, err error) { + var ( + // sourceVSchemaTable is the table info present in the vschema. + sourceVSchemaTable *vschemapb.Table + // sourceVindexColumns are computed from the input sourceTable. + sourceVindexColumns []string + + // Target table info. + createDDL string + materializeQuery string + ) + + // Validate input vindex. + vindex, vInfo, err := lv.validateAndGetVindex(specs) + if err != nil { + return nil, nil, nil, nil, err + } + + vInfo.sourceTable, vInfo.sourceTableName, err = getSourceTable(specs, vInfo.targetTableName, vInfo.fromCols) + if err != nil { + return nil, nil, nil, nil, err + } + + sourceVindexColumns, err = validateSourceTableAndGetVindexColumns(vInfo, vindex, keyspace) + if err != nil { + return nil, nil, nil, nil, err + } + + sourceVSchema, targetVSchema, err = lv.getTargetAndSourceVSchema(ctx, keyspace, vInfo.targetKeyspace) + if err != nil { + return nil, nil, nil, nil, err + } + + if existing, ok := sourceVSchema.Vindexes[vInfo.name]; ok { + if !proto.Equal(existing, vindex) { // If the exact same vindex already exists then we can re-use it + return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "a conflicting vindex named %s already exists in the %s keyspace", + vInfo.name, keyspace) + } + } + + sourceVSchemaTable = sourceVSchema.Tables[vInfo.sourceTableName] + if sourceVSchemaTable == nil && !schema.IsInternalOperationTableName(vInfo.sourceTableName) { + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INTERNAL, "table %s not found in the %s keyspace", vInfo.sourceTableName, keyspace) + } + if err := validateNonConflictingColumnVindex(sourceVSchemaTable, vInfo, sourceVindexColumns, keyspace); err != nil { + return nil, nil, nil, nil, err + } + + // Validate against source schema. + sourceShards, err := lv.ts.GetServingShards(ctx, keyspace) + if err != nil { + return nil, nil, nil, nil, err + } + onesource := sourceShards[0] + if onesource.PrimaryAlias == nil { + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INTERNAL, "source shard %s has no primary", onesource.ShardName()) + } + + req := &tabletmanagerdatapb.GetSchemaRequest{Tables: []string{vInfo.sourceTableName}} + tableSchema, err := schematools.GetSchema(ctx, lv.ts, lv.tmc, onesource.PrimaryAlias, req) + if err != nil { + return nil, nil, nil, nil, err + } + if len(tableSchema.TableDefinitions) != 1 { + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected number of tables (%d) returned from %s schema", + len(tableSchema.TableDefinitions), keyspace) + } + + // Generate "create table" statement. + createDDL, err = lv.generateCreateDDLStatement(tableSchema, sourceVindexColumns, vInfo, vindex) + if err != nil { + return nil, nil, nil, nil, err + } + + // Generate vreplication query. + materializeQuery = generateMaterializeQuery(vInfo, vindex, sourceVindexColumns) + + // Save a copy of the original vschema if we modify it and need to provide + // a cancelFunc. + ogTargetVSchema := targetVSchema.CloneVT() + targetChanged := false + + // Update targetVSchema. + targetTable := specs.Tables[vInfo.targetTableName] + if targetVSchema.Sharded { + // Choose a primary vindex type for the lookup table based on the source + // definition if one was not explicitly specified. + var targetVindexType string + var targetVindex *vschemapb.Vindex + for _, field := range tableSchema.TableDefinitions[0].Fields { + if sourceVindexColumns[0] == field.Name { + if targetTable != nil && len(targetTable.ColumnVindexes) > 0 { + targetVindexType = targetTable.ColumnVindexes[0].Name + } + if targetVindexType == "" { + targetVindexType, err = vindexes.ChooseVindexForType(field.Type) + if err != nil { + return nil, nil, nil, nil, err + } + } + targetVindex = &vschemapb.Vindex{ + Type: targetVindexType, + } + break + } + } + if targetVindex == nil { + // Unreachable. We validated column names when generating the DDL. + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INTERNAL, "column %s not found in target schema %s", + sourceVindexColumns[0], tableSchema.TableDefinitions[0].Schema) + } + + if existing, ok := targetVSchema.Vindexes[targetVindexType]; ok { + if !proto.Equal(existing, targetVindex) { + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting vindex named %v already exists in the %s keyspace", + targetVindexType, vInfo.targetKeyspace) + } + } else { + targetVSchema.Vindexes[targetVindexType] = targetVindex + targetChanged = true + } + + targetTable = &vschemapb.Table{ + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: vInfo.fromCols[0], + Name: targetVindexType, + }}, + } + } else { + targetTable = &vschemapb.Table{} + } + if existing, ok := targetVSchema.Tables[vInfo.targetTableName]; ok { + if !proto.Equal(existing, targetTable) { + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting table named %s already exists in the %s vschema", + vInfo.targetTableName, vInfo.targetKeyspace) + } + } else { + targetVSchema.Tables[vInfo.targetTableName] = targetTable + targetChanged = true + } + + if targetChanged { + cancelFunc = func() error { + // Restore the original target vschema. + return lv.ts.SaveVSchema(ctx, vInfo.targetKeyspace, ogTargetVSchema) + } + } + + ms = &vtctldatapb.MaterializeSettings{ + Workflow: workflow, + MaterializationIntent: vtctldatapb.MaterializationIntent_CREATELOOKUPINDEX, + SourceKeyspace: keyspace, + TargetKeyspace: vInfo.targetKeyspace, + StopAfterCopy: vindex.Owner != "" && !continueAfterCopyWithOwner, + TableSettings: []*vtctldatapb.TableMaterializeSettings{{ + TargetTable: vInfo.targetTableName, + SourceExpression: materializeQuery, + CreateDdl: createDDL, + }}, + } + + // Update sourceVSchema + sourceVSchema.Vindexes[vInfo.name] = vindex + sourceVSchemaTable.ColumnVindexes = append(sourceVSchemaTable.ColumnVindexes, vInfo.sourceTable.ColumnVindexes[0]) + + return ms, sourceVSchema, targetVSchema, cancelFunc, nil +} + +// vindexInfo holds the validated vindex configuration +type vindexInfo struct { + name string + targetKeyspace string + targetTableName string + fromCols []string + toCol string + ignoreNulls bool + + // sourceTable is the supplied table info. + sourceTable *vschemapb.Table + sourceTableName string +} + +// validateAndGetVindex validates and extracts vindex configuration +func (lv *lookupVindex) validateAndGetVindex(specs *vschemapb.Keyspace) (*vschemapb.Vindex, *vindexInfo, error) { + if specs == nil { + return nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "no vindex provided") + } + if len(specs.Vindexes) != 1 { + return nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "only one vindex must be specified") + } + + vindexName := maps.Keys(specs.Vindexes)[0] + vindex := maps.Values(specs.Vindexes)[0] + + if !strings.Contains(vindex.Type, "lookup") { + return nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex %s is not a lookup type", vindex.Type) + } + + targetKeyspace, targetTableName, err := lv.parser.ParseTable(vindex.Params["table"]) + if err != nil || targetKeyspace == "" { + return nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, + "vindex table name (%s) must be in the form .", vindex.Params["table"]) + } + + vindexFromCols := strings.Split(vindex.Params["from"], ",") + for i, col := range vindexFromCols { + vindexFromCols[i] = strings.TrimSpace(col) + } + + if strings.Contains(vindex.Type, "unique") { + if len(vindexFromCols) != 1 { + return nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unique vindex 'from' should have only one column") + } + } + + vindexToCol := vindex.Params["to"] + // Make the vindex write_only. If one exists already in the vschema, + // it will need to match this vindex exactly, including the write_only setting. + vindex.Params["write_only"] = "true" + + // See if we can create the vindex without errors. + if _, err := vindexes.CreateVindex(vindex.Type, vindexName, vindex.Params); err != nil { + return nil, nil, err + } + + ignoreNulls := false + if ignoreNullsStr, ok := vindex.Params["ignore_nulls"]; ok { + // This mirrors the behavior of vindexes.boolFromMap(). + switch ignoreNullsStr { + case "true": + ignoreNulls = true + case "false": + ignoreNulls = false + default: + return nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "ignore_nulls (%s) value must be 'true' or 'false'", + ignoreNullsStr) + } + } + + // Validate input table. + if len(specs.Tables) < 1 || len(specs.Tables) > 2 { + return nil, nil, fmt.Errorf("one or two tables must be specified") + } + + return vindex, &vindexInfo{ + name: vindexName, + targetKeyspace: targetKeyspace, + targetTableName: targetTableName, + fromCols: vindexFromCols, + toCol: vindexToCol, + ignoreNulls: ignoreNulls, + }, nil +} + +func (lv *lookupVindex) getTargetAndSourceVSchema(ctx context.Context, sourceKeyspace string, targetKeyspace string) (sourceVSchema *vschemapb.Keyspace, targetVSchema *vschemapb.Keyspace, err error) { + sourceVSchema, err = lv.ts.GetVSchema(ctx, sourceKeyspace) + if err != nil { + return nil, nil, err + } + if sourceVSchema.Vindexes == nil { + sourceVSchema.Vindexes = make(map[string]*vschemapb.Vindex) + } + // If source and target keyspaces are the same, make vschemas point + // to the same object. + if sourceKeyspace == targetKeyspace { + targetVSchema = sourceVSchema + } else { + targetVSchema, err = lv.ts.GetVSchema(ctx, targetKeyspace) + if err != nil { + return nil, nil, err + } + } + if targetVSchema.Vindexes == nil { + targetVSchema.Vindexes = make(map[string]*vschemapb.Vindex) + } + if targetVSchema.Tables == nil { + targetVSchema.Tables = make(map[string]*vschemapb.Table) + } + + return sourceVSchema, targetVSchema, nil +} + +func getSourceTable(specs *vschemapb.Keyspace, targetTableName string, fromCols []string) (sourceTable *vschemapb.Table, sourceTableName string, err error) { + // Loop executes once or twice. + for tableName, table := range specs.Tables { + if len(table.ColumnVindexes) != 1 { + return nil, "", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "exactly one ColumnVindex must be specified for the %s table", + tableName) + } + + if tableName != targetTableName { // This is the source table. + sourceTableName = tableName + sourceTable = table + continue + } + // This is a primary vindex definition for the target table + // which allows you to override the vindex type used. + var vindexCols []string + if len(table.ColumnVindexes[0].Columns) != 0 { + vindexCols = table.ColumnVindexes[0].Columns + } else { + if table.ColumnVindexes[0].Column == "" { + return nil, "", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "at least one column must be specified in ColumnVindexes for the %s table", + tableName) + } + vindexCols = []string{table.ColumnVindexes[0].Column} + } + if !slices.Equal(vindexCols, fromCols) { + return nil, "", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "columns in the lookup table %s primary vindex (%s) don't match the 'from' columns specified (%s)", + tableName, strings.Join(vindexCols, ","), strings.Join(fromCols, ",")) + } + } + return sourceTable, sourceTableName, nil +} + +func (lv *lookupVindex) generateCreateDDLStatement(tableSchema *tabletmanagerdatapb.SchemaDefinition, sourceVindexColumns []string, vInfo *vindexInfo, vindex *vschemapb.Vindex) (string, error) { + lines := strings.Split(tableSchema.TableDefinitions[0].Schema, "\n") + if len(lines) < 3 { + // Should never happen. + return "", vterrors.Errorf(vtrpcpb.Code_INTERNAL, "schema looks incorrect: %s, expecting at least four lines", + tableSchema.TableDefinitions[0].Schema) + } + + var modified []string + modified = append(modified, strings.Replace(lines[0], vInfo.sourceTableName, vInfo.targetTableName, 1)) + for i := range sourceVindexColumns { + line, err := generateColDef(lines, sourceVindexColumns[i], vInfo.fromCols[i]) + if err != nil { + return "", err + } + modified = append(modified, line) + } + + if vindex.Params["data_type"] == "" || strings.EqualFold(vindex.Type, "consistent_lookup_unique") || strings.EqualFold(vindex.Type, "consistent_lookup") { + modified = append(modified, fmt.Sprintf(" %s varbinary(128),", sqlescape.EscapeID(vInfo.toCol))) + } else { + modified = append(modified, fmt.Sprintf(" %s %s,", sqlescape.EscapeID(vInfo.toCol), sqlescape.EscapeID(vindex.Params["data_type"]))) + } + + buf := sqlparser.NewTrackedBuffer(nil) + fmt.Fprintf(buf, " PRIMARY KEY (") + prefix := "" + for _, col := range vInfo.fromCols { + fmt.Fprintf(buf, "%s%s", prefix, sqlescape.EscapeID(col)) + prefix = ", " + } + fmt.Fprintf(buf, ")") + + modified = append(modified, buf.String()) + modified = append(modified, ")") + createDDL := strings.Join(modified, "\n") + + // Confirm that our DDL is valid before we create anything. + if _, err := lv.parser.ParseStrictDDL(createDDL); err != nil { + return "", vterrors.Errorf(vtrpcpb.Code_INTERNAL, "error: %v; invalid lookup table definition generated: %s", + err, createDDL) + } + + return createDDL, nil +} + +func generateMaterializeQuery(vInfo *vindexInfo, vindex *vschemapb.Vindex, sourceVindexColumns []string) string { + buf := sqlparser.NewTrackedBuffer(nil) + buf.Myprintf("select ") + for i := range vInfo.fromCols { + buf.Myprintf("%s as %s, ", sqlparser.String(sqlparser.NewIdentifierCI(sourceVindexColumns[i])), sqlparser.String(sqlparser.NewIdentifierCI(vInfo.fromCols[i]))) + } + if strings.EqualFold(vInfo.toCol, "keyspace_id") || strings.EqualFold(vindex.Type, "consistent_lookup_unique") || strings.EqualFold(vindex.Type, "consistent_lookup") { + buf.Myprintf("keyspace_id() as %s ", sqlparser.String(sqlparser.NewIdentifierCI(vInfo.toCol))) + } else { + buf.Myprintf("%s as %s ", sqlparser.String(sqlparser.NewIdentifierCI(vInfo.toCol)), sqlparser.String(sqlparser.NewIdentifierCI(vInfo.toCol))) + } + buf.Myprintf("from %s", sqlparser.String(sqlparser.NewIdentifierCS(vInfo.sourceTableName))) + if vInfo.ignoreNulls { + buf.Myprintf(" where ") + lastValIdx := len(vInfo.fromCols) - 1 + for i := range vInfo.fromCols { + buf.Myprintf("%s is not null", sqlparser.String(sqlparser.NewIdentifierCI(vInfo.fromCols[i]))) + if i != lastValIdx { + buf.Myprintf(" and ") + } + } + } + if vindex.Owner != "" { + // Only backfill. + buf.Myprintf(" group by ") + for i := range vInfo.fromCols { + buf.Myprintf("%s, ", sqlparser.String(sqlparser.NewIdentifierCI(vInfo.fromCols[i]))) + } + buf.Myprintf("%s", sqlparser.String(sqlparser.NewIdentifierCI(vInfo.toCol))) + } + return buf.String() +} + +// validateSourceTableAndGetVindexColumns validates input table and vindex consistency, and returns sourceVindexColumns. +func validateSourceTableAndGetVindexColumns(vInfo *vindexInfo, vindex *vschemapb.Vindex, keyspace string) (sourceVindexColumns []string, err error) { + if vInfo.sourceTable == nil || len(vInfo.sourceTable.ColumnVindexes) != 1 { + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "No ColumnVindex found for the owner table (%s) in the %s keyspace", + vInfo.sourceTable, keyspace) + } + if vInfo.sourceTable.ColumnVindexes[0].Name != vInfo.name { + return nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "ColumnVindex name (%s) must match vindex name (%s)", + vInfo.sourceTable.ColumnVindexes[0].Name, vInfo.name) + } + if vindex.Owner != "" && vindex.Owner != vInfo.sourceTableName { + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex owner (%s) must match table name (%s)", + vindex.Owner, vInfo.sourceTableName) + } + if len(vInfo.sourceTable.ColumnVindexes[0].Columns) != 0 { + sourceVindexColumns = vInfo.sourceTable.ColumnVindexes[0].Columns + } else { + if vInfo.sourceTable.ColumnVindexes[0].Column == "" { + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "at least one column must be specified in ColumnVindexes for the %s table", + vInfo.sourceTableName) + } + sourceVindexColumns = []string{vInfo.sourceTable.ColumnVindexes[0].Column} + } + if len(sourceVindexColumns) != len(vInfo.fromCols) { + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "length of table columns (%d) differs from length of vindex columns (%d)", + len(sourceVindexColumns), len(vInfo.fromCols)) + } + + return sourceVindexColumns, nil +} + +func validateNonConflictingColumnVindex(sourceVSchemaTable *vschemapb.Table, vInfo *vindexInfo, sourceVindexColumns []string, keyspace string) error { + for _, colVindex := range sourceVSchemaTable.ColumnVindexes { + // For a conflict, the vindex name and column should match. + if colVindex.Name != vInfo.name { + continue + } + var colNames []string + if len(colVindex.Columns) == 0 { + colNames = []string{colVindex.Column} + } else { + colNames = colVindex.Columns + } + // If this is the exact same definition then we can use the existing one. If they + // are not the same then they are two distinct conflicting vindexes and we should + // not proceed. + if !slices.Equal(colNames, sourceVindexColumns) { + return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting ColumnVindex on column(s) %s in table %s already exists in the %s keyspace", + strings.Join(colNames, ","), vInfo.sourceTableName, keyspace) + } + } + return nil +} + +func generateColDef(lines []string, sourceVindexCol, vindexFromCol string) (string, error) { + source := sqlescape.EscapeID(sourceVindexCol) + target := sqlescape.EscapeID(vindexFromCol) + + for _, line := range lines[1:] { + if strings.Contains(line, source) { + line = strings.Replace(line, source, target, 1) + line = strings.Replace(line, " AUTO_INCREMENT", "", 1) + line = strings.Replace(line, " DEFAULT NULL", "", 1) + // Ensure that the column definition ends with a comma as we will + // be appending the TO column and PRIMARY KEY definitions. If the + // souce column here was the last entity defined in the source + // table's definition then it will not already have the comma. + if !strings.HasSuffix(strings.TrimSpace(line), ",") { + line += "," + } + return line, nil + } + } + return "", fmt.Errorf("column %s not found in schema %v", sourceVindexCol, lines) +} diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index a583a101186..e430f740c1f 100644 --- a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -1515,7 +1515,8 @@ func TestCreateLookupVindexCreateDDL(t *testing.T) { setStartingVschema() }() } - outms, _, _, cancelFunc, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, tcase.specs, false) + lv := newLookupVindex(env.ws) + outms, _, _, cancelFunc, err := lv.prepareCreate(ctx, "workflow", ms.SourceKeyspace, tcase.specs, false) if tcase.err != "" { require.Error(t, err) require.Contains(t, err.Error(), tcase.err, "prepareCreateLookup(%s) err: %v, does not contain %v", tcase.description, err, tcase.err) @@ -1763,7 +1764,8 @@ func TestCreateLookupVindexSourceVSchema(t *testing.T) { t.Fatal(err) } - _, got, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false) + lv := newLookupVindex(env.ws) + _, got, _, _, err := lv.prepareCreate(ctx, "workflow", ms.SourceKeyspace, specs, false) require.NoError(t, err) if !proto.Equal(got, tcase.out) { t.Errorf("%s: got:\n%v, want\n%v", tcase.description, got, tcase.out) @@ -1984,32 +1986,35 @@ func TestCreateLookupVindexTargetVSchema(t *testing.T) { err: "type SET is not recommended for a vindex", }} for _, tcase := range testcases { - env.tmc.schema[ms.SourceKeyspace+".t1"] = &tabletmanagerdatapb.SchemaDefinition{ - TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ - Fields: []*querypb.Field{{ - Name: "col2", - Type: tcase.sourceFieldType, + t.Run(tcase.description, func(t *testing.T) { + env.tmc.schema[ms.SourceKeyspace+".t1"] = &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Fields: []*querypb.Field{{ + Name: "col2", + Type: tcase.sourceFieldType, + }}, + Schema: sourceSchema, }}, - Schema: sourceSchema, - }}, - } - specs.Vindexes["v"].Params["table"] = fmt.Sprintf("%s.%s", ms.TargetKeyspace, tcase.targetTable) - if err := env.topoServ.SaveVSchema(ctx, ms.TargetKeyspace, tcase.targetVSchema); err != nil { - t.Fatal(err) - } + } + specs.Vindexes["v"].Params["table"] = fmt.Sprintf("%s.%s", ms.TargetKeyspace, tcase.targetTable) + if err := env.topoServ.SaveVSchema(ctx, ms.TargetKeyspace, tcase.targetVSchema); err != nil { + t.Fatal(err) + } - _, _, got, cancelFunc, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false) - if tcase.err != "" { - if err == nil || !strings.Contains(err.Error(), tcase.err) { - t.Errorf("prepareCreateLookup(%s) err: %v, must contain %v", tcase.description, err, tcase.err) + lv := newLookupVindex(env.ws) + _, _, got, cancelFunc, err := lv.prepareCreate(ctx, "workflow", ms.SourceKeyspace, specs, false) + if tcase.err != "" { + if err == nil || !strings.Contains(err.Error(), tcase.err) { + t.Errorf("prepareCreateLookup(%s) err: %v, must contain %v", tcase.description, err, tcase.err) + } + return } - continue - } - require.NoError(t, err) - // withTable is a vschema that already contains the table and thus - // we don't make any vschema changes and there's nothing to cancel. - require.True(t, (cancelFunc != nil) == (tcase.targetVSchema != withTable)) - utils.MustMatch(t, tcase.out, got, tcase.description) + require.NoError(t, err) + // withTable is a vschema that already contains the table and thus + // we don't make any vschema changes and there's nothing to cancel. + require.True(t, (cancelFunc != nil) == (tcase.targetVSchema != withTable)) + utils.MustMatch(t, tcase.out, got, tcase.description) + }) } } @@ -2119,7 +2124,8 @@ func TestCreateLookupVindexSameKeyspace(t *testing.T) { t.Fatal(err) } - _, got, _, _, err := env.ws.prepareCreateLookup(ctx, "keyspace", ms.TargetKeyspace, specs, false) + lv := newLookupVindex(env.ws) + _, got, _, _, err := lv.prepareCreate(ctx, "keyspace", ms.TargetKeyspace, specs, false) require.NoError(t, err) if !proto.Equal(got, want) { t.Errorf("same keyspace: got:\n%v, want\n%v", got, want) @@ -2245,7 +2251,8 @@ func TestCreateCustomizedVindex(t *testing.T) { t.Fatal(err) } - _, got, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false) + lv := newLookupVindex(env.ws) + _, got, _, _, err := lv.prepareCreate(ctx, "workflow", ms.TargetKeyspace, specs, false) require.NoError(t, err) if !proto.Equal(got, want) { t.Errorf("customize create lookup error same: got:\n%v, want\n%v", got, want) @@ -2363,7 +2370,8 @@ func TestCreateLookupVindexIgnoreNulls(t *testing.T) { t.Fatal(err) } - ms, ks, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false) + lv := newLookupVindex(env.ws) + ms, ks, _, _, err := lv.prepareCreate(ctx, "workflow", ms.TargetKeyspace, specs, false) require.NoError(t, err) if !proto.Equal(wantKs, ks) { t.Errorf("unexpected keyspace value: got:\n%v, want\n%v", ks, wantKs) @@ -2443,11 +2451,12 @@ func TestStopAfterCopyFlag(t *testing.T) { t.Fatal(err) } - ms1, _, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false) + lv := newLookupVindex(env.ws) + ms1, _, _, _, err := lv.prepareCreate(ctx, "workflow", ms.TargetKeyspace, specs, false) require.NoError(t, err) require.Equal(t, ms1.StopAfterCopy, true) - ms2, _, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, true) + ms2, _, _, _, err := lv.prepareCreate(ctx, "workflow", ms.TargetKeyspace, specs, true) require.NoError(t, err) require.Equal(t, ms2.StopAfterCopy, false) } diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index baea602b7a4..8123416eb41 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -29,18 +29,15 @@ import ( "time" "github.com/google/uuid" - "golang.org/x/exp/maps" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/encoding/prototext" - "google.golang.org/protobuf/proto" "vitess.io/vitess/go/constants/sidecar" "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/ptr" - "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/trace" "vitess.io/vitess/go/vt/concurrency" @@ -49,7 +46,6 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/mysqlctl/tmutils" - "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" @@ -571,7 +567,9 @@ func (s *Server) LookupVindexCreate(ctx context.Context, req *vtctldatapb.Lookup span.Annotate("cells", req.Cells) span.Annotate("tablet_types", req.TabletTypes) - ms, sourceVSchema, targetVSchema, cancelFunc, err := s.prepareCreateLookup(ctx, req.Workflow, req.Keyspace, req.Vindex, req.ContinueAfterCopyWithOwner) + lv := newLookupVindex(s) + + ms, sourceVSchema, targetVSchema, cancelFunc, err := lv.prepareCreate(ctx, req.Workflow, req.Keyspace, req.Vindex, req.ContinueAfterCopyWithOwner) if err != nil { return nil, err } @@ -715,11 +713,8 @@ func (s *Server) Materialize(ctx context.Context, ms *vtctldatapb.MaterializeSet cells[i] = strings.TrimSpace(cells[i]) } - switch { - case len(ms.ReferenceTables) == 0 && len(ms.TableSettings) == 0: - return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "either --table-settings or --reference-tables must be specified") - case len(ms.ReferenceTables) > 0 && len(ms.TableSettings) > 0: - return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot specify both --table-settings and --reference-tables") + if err := validateMaterializeSettings(ms); err != nil { + return err } for _, table := range ms.ReferenceTables { @@ -746,6 +741,17 @@ func (s *Server) Materialize(ctx context.Context, ms *vtctldatapb.MaterializeSet return mz.startStreams(ctx) } +func validateMaterializeSettings(ms *vtctldatapb.MaterializeSettings) error { + switch { + case len(ms.ReferenceTables) == 0 && len(ms.TableSettings) == 0: + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "either --table-settings or --reference-tables must be specified") + case len(ms.ReferenceTables) > 0 && len(ms.TableSettings) > 0: + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot specify both --table-settings and --reference-tables") + } + + return nil +} + // MoveTablesCreate is part of the vtctlservicepb.VtctldServer interface. // It passes the embedded TabletRequest object to the given keyspace's // target primary tablets that will be executing the workflow. @@ -3408,413 +3414,6 @@ func fillStringTemplate(tmpl string, vars any) (string, error) { return data.String(), nil } -// prepareCreateLookup performs the preparatory steps for creating a -// Lookup Vindex. -func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace string, specs *vschemapb.Keyspace, continueAfterCopyWithOwner bool) ( - ms *vtctldatapb.MaterializeSettings, sourceVSchema, targetVSchema *vschemapb.Keyspace, cancelFunc func() error, err error) { - // Important variables are pulled out here. - var ( - vindexName string - vindex *vschemapb.Vindex - targetKeyspace string - targetTableName string - vindexFromCols []string - vindexToCol string - vindexIgnoreNulls bool - - sourceTableName string - // sourceTable is the supplied table info. - sourceTable *vschemapb.Table - // sourceVSchemaTable is the table info present in the vschema. - sourceVSchemaTable *vschemapb.Table - // sourceVindexColumns are computed from the input sourceTable. - sourceVindexColumns []string - - // Target table info. - createDDL string - materializeQuery string - ) - - // Validate input vindex. - if specs == nil { - return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "no vindex provided") - } - if len(specs.Vindexes) != 1 { - return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "only one vindex must be specified") - } - vindexName = maps.Keys(specs.Vindexes)[0] - vindex = maps.Values(specs.Vindexes)[0] - if !strings.Contains(vindex.Type, "lookup") { - return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex %s is not a lookup type", vindex.Type) - } - targetKeyspace, targetTableName, err = s.env.Parser().ParseTable(vindex.Params["table"]) - if err != nil || targetKeyspace == "" { - return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex table name (%s) must be in the form .
", vindex.Params["table"]) - } - vindexFromCols = strings.Split(vindex.Params["from"], ",") - for i, col := range vindexFromCols { - vindexFromCols[i] = strings.TrimSpace(col) - } - if strings.Contains(vindex.Type, "unique") { - if len(vindexFromCols) != 1 { - return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unique vindex 'from' should have only one column") - } - } - vindexToCol = vindex.Params["to"] - // Make the vindex write_only. If one exists already in the vschema, - // it will need to match this vindex exactly, including the write_only setting. - vindex.Params["write_only"] = "true" - // See if we can create the vindex without errors. - if _, err := vindexes.CreateVindex(vindex.Type, vindexName, vindex.Params); err != nil { - return nil, nil, nil, nil, err - } - if ignoreNullsStr, ok := vindex.Params["ignore_nulls"]; ok { - // This mirrors the behavior of vindexes.boolFromMap(). - switch ignoreNullsStr { - case "true": - vindexIgnoreNulls = true - case "false": - vindexIgnoreNulls = false - default: - return nil, nil, nil, nil, - vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "ignore_nulls (%s) value must be 'true' or 'false'", - ignoreNullsStr) - } - } - - // Validate input table. - if len(specs.Tables) < 1 || len(specs.Tables) > 2 { - return nil, nil, nil, nil, fmt.Errorf("one or two tables must be specified") - } - // Loop executes once or twice. - for tableName, table := range specs.Tables { - if len(table.ColumnVindexes) != 1 { - return nil, nil, nil, nil, - vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "exactly one ColumnVindex must be specified for the %s table", - tableName) - } - if tableName != targetTableName { // This is the source table. - sourceTableName = tableName - sourceTable = table - continue - } - // This is a primary vindex definition for the target table - // which allows you to override the vindex type used. - var vindexCols []string - if len(table.ColumnVindexes[0].Columns) != 0 { - vindexCols = table.ColumnVindexes[0].Columns - } else { - if table.ColumnVindexes[0].Column == "" { - return nil, nil, nil, nil, - vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "at least one column must be specified in ColumnVindexes for the %s table", - tableName) - } - vindexCols = []string{table.ColumnVindexes[0].Column} - } - if !slices.Equal(vindexCols, vindexFromCols) { - return nil, nil, nil, nil, - vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "columns in the lookup table %s primary vindex (%s) don't match the 'from' columns specified (%s)", - tableName, strings.Join(vindexCols, ","), strings.Join(vindexFromCols, ",")) - } - } - - // Validate input table and vindex consistency. - if sourceTable == nil || len(sourceTable.ColumnVindexes) != 1 { - return nil, nil, nil, nil, - vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "No ColumnVindex found for the owner table (%s) in the %s keyspace", - sourceTable, keyspace) - } - if sourceTable.ColumnVindexes[0].Name != vindexName { - return nil, nil, nil, nil, - vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "ColumnVindex name (%s) must match vindex name (%s)", - sourceTable.ColumnVindexes[0].Name, vindexName) - } - if vindex.Owner != "" && vindex.Owner != sourceTableName { - return nil, nil, nil, nil, - vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex owner (%s) must match table name (%s)", - vindex.Owner, sourceTableName) - } - if len(sourceTable.ColumnVindexes[0].Columns) != 0 { - sourceVindexColumns = sourceTable.ColumnVindexes[0].Columns - } else { - if sourceTable.ColumnVindexes[0].Column == "" { - return nil, nil, nil, nil, - vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "at least one column must be specified in ColumnVindexes for the %s table", - sourceTableName) - } - sourceVindexColumns = []string{sourceTable.ColumnVindexes[0].Column} - } - if len(sourceVindexColumns) != len(vindexFromCols) { - return nil, nil, nil, nil, - vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "length of table columns (%d) differs from length of vindex columns (%d)", - len(sourceVindexColumns), len(vindexFromCols)) - } - - // Validate against source vschema. - sourceVSchema, err = s.ts.GetVSchema(ctx, keyspace) - if err != nil { - return nil, nil, nil, nil, err - } - if sourceVSchema.Vindexes == nil { - sourceVSchema.Vindexes = make(map[string]*vschemapb.Vindex) - } - // If source and target keyspaces are the same, make vschemas point - // to the same object. - if keyspace == targetKeyspace { - targetVSchema = sourceVSchema - } else { - targetVSchema, err = s.ts.GetVSchema(ctx, targetKeyspace) - if err != nil { - return nil, nil, nil, nil, err - } - } - if targetVSchema.Vindexes == nil { - targetVSchema.Vindexes = make(map[string]*vschemapb.Vindex) - } - if targetVSchema.Tables == nil { - targetVSchema.Tables = make(map[string]*vschemapb.Table) - } - if existing, ok := sourceVSchema.Vindexes[vindexName]; ok { - if !proto.Equal(existing, vindex) { // If the exact same vindex already exists then we can re-use it - return nil, nil, nil, nil, - vterrors.Errorf(vtrpcpb.Code_INTERNAL, "a conflicting vindex named %s already exists in the %s keyspace", - vindexName, keyspace) - } - } - sourceVSchemaTable = sourceVSchema.Tables[sourceTableName] - if sourceVSchemaTable == nil && !schema.IsInternalOperationTableName(sourceTableName) { - return nil, nil, nil, nil, - vterrors.Errorf(vtrpcpb.Code_INTERNAL, "table %s not found in the %s keyspace", sourceTableName, keyspace) - } - for _, colVindex := range sourceVSchemaTable.ColumnVindexes { - // For a conflict, the vindex name and column should match. - if colVindex.Name != vindexName { - continue - } - var colNames []string - if len(colVindex.Columns) == 0 { - colNames = []string{colVindex.Column} - } else { - colNames = colVindex.Columns - } - // If this is the exact same definition then we can use the existing one. If they - // are not the same then they are two distinct conflicting vindexes and we should - // not proceed. - if !slices.Equal(colNames, sourceVindexColumns) { - return nil, nil, nil, nil, - vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting ColumnVindex on column(s) %s in table %s already exists in the %s keyspace", - strings.Join(colNames, ","), sourceTableName, keyspace) - } - } - - // Validate against source schema. - sourceShards, err := s.ts.GetServingShards(ctx, keyspace) - if err != nil { - return nil, nil, nil, nil, err - } - onesource := sourceShards[0] - if onesource.PrimaryAlias == nil { - return nil, nil, nil, nil, - vterrors.Errorf(vtrpcpb.Code_INTERNAL, "source shard %s has no primary", onesource.ShardName()) - } - req := &tabletmanagerdatapb.GetSchemaRequest{Tables: []string{sourceTableName}} - tableSchema, err := schematools.GetSchema(ctx, s.ts, s.tmc, onesource.PrimaryAlias, req) - if err != nil { - return nil, nil, nil, nil, err - } - if len(tableSchema.TableDefinitions) != 1 { - return nil, nil, nil, nil, - vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected number of tables (%d) returned from %s schema", - len(tableSchema.TableDefinitions), keyspace) - } - - // Generate "create table" statement. - lines := strings.Split(tableSchema.TableDefinitions[0].Schema, "\n") - if len(lines) < 3 { - // Should never happen. - return nil, nil, nil, nil, - vterrors.Errorf(vtrpcpb.Code_INTERNAL, "schema looks incorrect: %s, expecting at least four lines", - tableSchema.TableDefinitions[0].Schema) - } - var modified []string - modified = append(modified, strings.Replace(lines[0], sourceTableName, targetTableName, 1)) - for i := range sourceVindexColumns { - line, err := generateColDef(lines, sourceVindexColumns[i], vindexFromCols[i]) - if err != nil { - return nil, nil, nil, nil, err - } - modified = append(modified, line) - } - - if vindex.Params["data_type"] == "" || strings.EqualFold(vindex.Type, "consistent_lookup_unique") || strings.EqualFold(vindex.Type, "consistent_lookup") { - modified = append(modified, fmt.Sprintf(" %s varbinary(128),", sqlescape.EscapeID(vindexToCol))) - } else { - modified = append(modified, fmt.Sprintf(" %s %s,", sqlescape.EscapeID(vindexToCol), sqlescape.EscapeID(vindex.Params["data_type"]))) - } - buf := sqlparser.NewTrackedBuffer(nil) - fmt.Fprintf(buf, " PRIMARY KEY (") - prefix := "" - for _, col := range vindexFromCols { - fmt.Fprintf(buf, "%s%s", prefix, sqlescape.EscapeID(col)) - prefix = ", " - } - fmt.Fprintf(buf, ")") - modified = append(modified, buf.String()) - modified = append(modified, ")") - createDDL = strings.Join(modified, "\n") - // Confirm that our DDL is valid before we create anything. - if _, err = s.env.Parser().ParseStrictDDL(createDDL); err != nil { - return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "error: %v; invalid lookup table definition generated: %s", - err, createDDL) - } - - // Generate vreplication query. - buf = sqlparser.NewTrackedBuffer(nil) - buf.Myprintf("select ") - for i := range vindexFromCols { - buf.Myprintf("%s as %s, ", sqlparser.String(sqlparser.NewIdentifierCI(sourceVindexColumns[i])), sqlparser.String(sqlparser.NewIdentifierCI(vindexFromCols[i]))) - } - if strings.EqualFold(vindexToCol, "keyspace_id") || strings.EqualFold(vindex.Type, "consistent_lookup_unique") || strings.EqualFold(vindex.Type, "consistent_lookup") { - buf.Myprintf("keyspace_id() as %s ", sqlparser.String(sqlparser.NewIdentifierCI(vindexToCol))) - } else { - buf.Myprintf("%s as %s ", sqlparser.String(sqlparser.NewIdentifierCI(vindexToCol)), sqlparser.String(sqlparser.NewIdentifierCI(vindexToCol))) - } - buf.Myprintf("from %s", sqlparser.String(sqlparser.NewIdentifierCS(sourceTableName))) - if vindexIgnoreNulls { - buf.Myprintf(" where ") - lastValIdx := len(vindexFromCols) - 1 - for i := range vindexFromCols { - buf.Myprintf("%s is not null", sqlparser.String(sqlparser.NewIdentifierCI(vindexFromCols[i]))) - if i != lastValIdx { - buf.Myprintf(" and ") - } - } - } - if vindex.Owner != "" { - // Only backfill. - buf.Myprintf(" group by ") - for i := range vindexFromCols { - buf.Myprintf("%s, ", sqlparser.String(sqlparser.NewIdentifierCI(vindexFromCols[i]))) - } - buf.Myprintf("%s", sqlparser.String(sqlparser.NewIdentifierCI(vindexToCol))) - } - materializeQuery = buf.String() - - // Save a copy of the original vschema if we modify it and need to provide - // a cancelFunc. - ogTargetVSchema := targetVSchema.CloneVT() - targetChanged := false - - // Update targetVSchema. - targetTable := specs.Tables[targetTableName] - if targetVSchema.Sharded { - // Choose a primary vindex type for the lookup table based on the source - // definition if one was not explicitly specified. - var targetVindexType string - var targetVindex *vschemapb.Vindex - for _, field := range tableSchema.TableDefinitions[0].Fields { - if sourceVindexColumns[0] == field.Name { - if targetTable != nil && len(targetTable.ColumnVindexes) > 0 { - targetVindexType = targetTable.ColumnVindexes[0].Name - } - if targetVindexType == "" { - targetVindexType, err = vindexes.ChooseVindexForType(field.Type) - if err != nil { - return nil, nil, nil, nil, err - } - } - targetVindex = &vschemapb.Vindex{ - Type: targetVindexType, - } - break - } - } - if targetVindex == nil { - // Unreachable. We validated column names when generating the DDL. - return nil, nil, nil, nil, - vterrors.Errorf(vtrpcpb.Code_INTERNAL, "column %s not found in target schema %s", - sourceVindexColumns[0], tableSchema.TableDefinitions[0].Schema) - } - if existing, ok := targetVSchema.Vindexes[targetVindexType]; ok { - if !proto.Equal(existing, targetVindex) { - return nil, nil, nil, nil, - vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting vindex named %v already exists in the %s keyspace", - targetVindexType, targetKeyspace) - } - } else { - targetVSchema.Vindexes[targetVindexType] = targetVindex - targetChanged = true - } - - targetTable = &vschemapb.Table{ - ColumnVindexes: []*vschemapb.ColumnVindex{{ - Column: vindexFromCols[0], - Name: targetVindexType, - }}, - } - } else { - targetTable = &vschemapb.Table{} - } - if existing, ok := targetVSchema.Tables[targetTableName]; ok { - if !proto.Equal(existing, targetTable) { - return nil, nil, nil, nil, - vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting table named %s already exists in the %s vschema", - targetTableName, targetKeyspace) - } - } else { - targetVSchema.Tables[targetTableName] = targetTable - targetChanged = true - } - - if targetChanged { - cancelFunc = func() error { - // Restore the original target vschema. - return s.ts.SaveVSchema(ctx, targetKeyspace, ogTargetVSchema) - } - } - - ms = &vtctldatapb.MaterializeSettings{ - Workflow: workflow, - MaterializationIntent: vtctldatapb.MaterializationIntent_CREATELOOKUPINDEX, - SourceKeyspace: keyspace, - TargetKeyspace: targetKeyspace, - StopAfterCopy: vindex.Owner != "" && !continueAfterCopyWithOwner, - TableSettings: []*vtctldatapb.TableMaterializeSettings{{ - TargetTable: targetTableName, - SourceExpression: materializeQuery, - CreateDdl: createDDL, - }}, - } - - // Update sourceVSchema - sourceVSchema.Vindexes[vindexName] = vindex - sourceVSchemaTable.ColumnVindexes = append(sourceVSchemaTable.ColumnVindexes, sourceTable.ColumnVindexes[0]) - - return ms, sourceVSchema, targetVSchema, cancelFunc, nil -} - -func generateColDef(lines []string, sourceVindexCol, vindexFromCol string) (string, error) { - source := sqlescape.EscapeID(sourceVindexCol) - target := sqlescape.EscapeID(vindexFromCol) - - for _, line := range lines[1:] { - if strings.Contains(line, source) { - line = strings.Replace(line, source, target, 1) - line = strings.Replace(line, " AUTO_INCREMENT", "", 1) - line = strings.Replace(line, " DEFAULT NULL", "", 1) - // Ensure that the column definition ends with a comma as we will - // be appending the TO column and PRIMARY KEY definitions. If the - // souce column here was the last entity defined in the source - // table's definition then it will not already have the comma. - if !strings.HasSuffix(strings.TrimSpace(line), ",") { - line += "," - } - return line, nil - } - } - return "", fmt.Errorf("column %s not found in schema %v", sourceVindexCol, lines) -} - func (s *Server) MigrateCreate(ctx context.Context, req *vtctldatapb.MigrateCreateRequest) (*vtctldatapb.WorkflowStatusResponse, error) { moveTablesCreateRequest := &vtctldatapb.MoveTablesCreateRequest{ Workflow: req.Workflow,