diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index 9a43ea5ed7e..d9fe7b9eb1f 100644 --- a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -822,18 +822,21 @@ func TestCreateLookupVindexCreateDDL(t *testing.T) { }, }, } - if err := env.topoServ.SaveVSchema(ctx, ms.SourceKeyspace, vs); err != nil { - t.Fatal(err) + setStartingVschema := func() { + err := env.topoServ.SaveVSchema(ctx, ms.SourceKeyspace, vs) + require.NoError(t, err) } + setStartingVschema() testcases := []struct { description string specs *vschemapb.Keyspace sourceSchema string + preFunc func() out string err string }{{ - description: "unique lookup", + description: "unique lookup re-use vschema", specs: &vschemapb.Keyspace{ Vindexes: map[string]*vschemapb.Vindex{ "v": { @@ -855,6 +858,32 @@ func TestCreateLookupVindexCreateDDL(t *testing.T) { }, }, }, + preFunc: func() { + // The vschema entries will already exist and we will re-use them. + err := env.ws.ts.SaveVSchema(ctx, ms.SourceKeyspace, &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "v": { + Type: "lookup_unique", + Params: map[string]string{ + "table": fmt.Sprintf("%s.lkp", ms.TargetKeyspace), + "from": "c1", + "to": "c2", + "write_only": "true", // It has not been externalized yet + }, + Owner: "t1", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "v", + Column: "col2", + }}, + }, + }, + }) + require.NoError(t, err) + }, sourceSchema: "CREATE TABLE `t1` (\n" + " `col1` int(11) NOT NULL AUTO_INCREMENT,\n" + " `col2` int(11) DEFAULT NULL,\n" + @@ -866,6 +895,132 @@ func TestCreateLookupVindexCreateDDL(t *testing.T) { " `c2` varbinary(128),\n" + " PRIMARY KEY (`c1`)\n" + ")", + }, { + description: "unique lookup with conflicting vindex", + specs: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "v": { + Type: "lookup_unique", + Params: map[string]string{ + "table": fmt.Sprintf("%s.lkp", ms.TargetKeyspace), + "from": "c1", + "to": "c2", + }, + Owner: "t1", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "v", + Column: "col2", + }}, + }, + }, + }, + preFunc: func() { + // The existing vindex vschema entry differs from what we want to + // create so we cannot re-use it. + err := env.ws.ts.SaveVSchema(ctx, ms.SourceKeyspace, &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "v": { + Type: "lookup_unique", + Params: map[string]string{ + "table": fmt.Sprintf("%s.lkp", ms.TargetKeyspace), + "from": "c1", + "to": "c2", + "write_only": "false", // This vindex has been externalized + }, + Owner: "t1", + }, + }, + }) + require.NoError(t, err) + }, + err: "a conflicting vindex named v already exists in the sourceks keyspace", + sourceSchema: "CREATE TABLE `t1` (\n" + + " `col1` int(11) NOT NULL AUTO_INCREMENT,\n" + + " `col2` int(11) DEFAULT NULL,\n" + + " `col3` int(11) DEFAULT NULL,\n" + + " PRIMARY KEY (`id`)\n" + + ") ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=latin1", + }, { + description: "unique lookup with conflicting column vindexes", + specs: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "v": { + Type: "lookup_unique", + Params: map[string]string{ + "table": fmt.Sprintf("%s.lkp", ms.TargetKeyspace), + "from": "c1", + "to": "c2", + }, + Owner: "t1", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "v", + Column: "col2", + }}, + }, + }, + }, + preFunc: func() { + // The existing ColumnVindexes vschema entry differs from what we + // want to create so we cannot re-use it. + err := env.ws.ts.SaveVSchema(ctx, ms.SourceKeyspace, &vschemapb.Keyspace{ + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "v", + Columns: []string{"col1", "col2"}, + }}, + }, + }, + }) + require.NoError(t, err) + }, + err: "a conflicting ColumnVindex on column(s) col1,col2 in table t1 already exists in the sourceks keyspace", + sourceSchema: "CREATE TABLE `t1` (\n" + + " `col1` int(11) NOT NULL AUTO_INCREMENT,\n" + + " `col2` int(11) DEFAULT NULL,\n" + + " `col3` int(11) DEFAULT NULL,\n" + + " PRIMARY KEY (`id`)\n" + + ") ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=latin1", + }, { + description: "unique lookup using last column w/o primary key", + specs: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "v": { + Type: "lookup_unique", + Params: map[string]string{ + "table": fmt.Sprintf("%s.lkp", ms.TargetKeyspace), + "from": "c1", + "to": "c2", + }, + Owner: "t1", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "v", + Column: "col2", + }}, + }, + }, + }, + sourceSchema: "CREATE TABLE `t1` (\n" + + " `col1` int(11) NOT NULL AUTO_INCREMENT,\n" + + " `col2` int(11) DEFAULT NULL\n" + // Because it's the last entity in the definition it has no trailing comma + ") ENGINE=InnoDB", + out: "CREATE TABLE `lkp` (\n" + + " `c1` int(11),\n" + + " `c2` varbinary(128),\n" + + " PRIMARY KEY (`c1`)\n" + + ")", }, { description: "unique lookup, also pk", specs: &vschemapb.Keyspace{ @@ -992,27 +1147,36 @@ func TestCreateLookupVindexCreateDDL(t *testing.T) { err: "unexpected number of tables (0) returned from sourceks schema", }} for _, tcase := range testcases { - if tcase.sourceSchema != "" { - env.tmc.schema[ms.SourceKeyspace+".t1"] = &tabletmanagerdatapb.SchemaDefinition{ - TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ - Schema: tcase.sourceSchema, - }}, + t.Run(tcase.description, func(t *testing.T) { + if tcase.sourceSchema != "" { + env.tmc.schema[ms.SourceKeyspace+".t1"] = &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Schema: tcase.sourceSchema, + }}, + } + } else { + delete(env.tmc.schema, ms.SourceKeyspace+".t1") } - } else { - delete(env.tmc.schema, ms.SourceKeyspace+".t1") - } - outms, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, tcase.specs, false) - if tcase.err != "" { - if err == nil || !strings.Contains(err.Error(), tcase.err) { - t.Errorf("prepareCreateLookup(%s) err: %v, must contain %v", tcase.description, err, tcase.err) + if tcase.preFunc != nil { + tcase.preFunc() + defer func() { + // Reset the vschema as it may have been changed in the pre + // function. + setStartingVschema() + }() } - continue - } - require.NoError(t, err) - want := strings.Split(tcase.out, "\n") - got := strings.Split(outms.TableSettings[0].CreateDdl, "\n") - require.Equal(t, want, got, tcase.description) + outms, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, tcase.specs, false) + if tcase.err != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tcase.err, "prepareCreateLookup(%s) err: %v, does not contain %v", tcase.description, err, tcase.err) + return + } + require.NoError(t, err) + want := strings.Split(tcase.out, "\n") + got := strings.Split(outms.TableSettings[0].CreateDdl, "\n") + require.Equal(t, want, got, tcase.description) + }) } } diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 587caff3c8c..31c27601f6b 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3811,7 +3811,7 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str targetVSchema.Tables = make(map[string]*vschemapb.Table) } if existing, ok := sourceVSchema.Vindexes[vindexName]; ok { - if !proto.Equal(existing, vindex) { + if !proto.Equal(existing, vindex) { // If the exact same vindex already exists then we can re-use it return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "a conflicting vindex named %s already exists in the %s keyspace", vindexName, keyspace) } } @@ -3824,13 +3824,18 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str if colVindex.Name != vindexName { continue } - colName := colVindex.Column - if len(colVindex.Columns) != 0 { - colName = colVindex.Columns[0] + var colNames []string + if len(colVindex.Columns) == 0 { + colNames = []string{colVindex.Column} + } else { + colNames = colVindex.Columns } - if colName == sourceVindexColumns[0] { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting ColumnVindex on column %s in table %s already exists in the %s keyspace", - colName, sourceTableName, keyspace) + // If this is the exact same definition then we can use the existing one. If they + // are not the same then they are two distinct conflicting vindexes and we should + // not proceed. + if !slices.Equal(colNames, sourceVindexColumns) { + return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting ColumnVindex on column(s) %s in table %s already exists in the %s keyspace", + strings.Join(colNames, ","), sourceTableName, keyspace) } } @@ -3884,6 +3889,10 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str modified = append(modified, buf.String()) modified = append(modified, ")") createDDL = strings.Join(modified, "\n") + // Confirm that our DDL is valid before we create anything. + if _, err = s.env.Parser().ParseStrictDDL(createDDL); err != nil { + return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "error: %v; invalid lookup table definition generated: %s", err, createDDL) + } // Generate vreplication query. buf = sqlparser.NewTrackedBuffer(nil) @@ -3999,6 +4008,13 @@ func generateColDef(lines []string, sourceVindexCol, vindexFromCol string) (stri line = strings.Replace(line, source, target, 1) line = strings.Replace(line, " AUTO_INCREMENT", "", 1) line = strings.Replace(line, " DEFAULT NULL", "", 1) + // Ensure that the column definition ends with a comma as we will + // be appending the TO column and PRIMARY KEY definitions. If the + // souce column here was the last entity defined in the source + // table's definition then it will not already have the comma. + if !strings.HasSuffix(strings.TrimSpace(line), ",") { + line += "," + } return line, nil } }