Skip to content

Commit

Permalink
VReplication: LookupVindex create use existing artifacts when possible (
Browse files Browse the repository at this point in the history
#16097)

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Jun 24, 2024
1 parent 465ffcf commit 13e5d33
Show file tree
Hide file tree
Showing 2 changed files with 208 additions and 28 deletions.
206 changes: 185 additions & 21 deletions go/vt/vtctl/workflow/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,18 +822,21 @@ func TestCreateLookupVindexCreateDDL(t *testing.T) {
},
},
}
if err := env.topoServ.SaveVSchema(ctx, ms.SourceKeyspace, vs); err != nil {
t.Fatal(err)
setStartingVschema := func() {
err := env.topoServ.SaveVSchema(ctx, ms.SourceKeyspace, vs)
require.NoError(t, err)
}
setStartingVschema()

testcases := []struct {
description string
specs *vschemapb.Keyspace
sourceSchema string
preFunc func()
out string
err string
}{{
description: "unique lookup",
description: "unique lookup re-use vschema",
specs: &vschemapb.Keyspace{
Vindexes: map[string]*vschemapb.Vindex{
"v": {
Expand All @@ -855,6 +858,32 @@ func TestCreateLookupVindexCreateDDL(t *testing.T) {
},
},
},
preFunc: func() {
// The vschema entries will already exist and we will re-use them.
err := env.ws.ts.SaveVSchema(ctx, ms.SourceKeyspace, &vschemapb.Keyspace{
Vindexes: map[string]*vschemapb.Vindex{
"v": {
Type: "lookup_unique",
Params: map[string]string{
"table": fmt.Sprintf("%s.lkp", ms.TargetKeyspace),
"from": "c1",
"to": "c2",
"write_only": "true", // It has not been externalized yet
},
Owner: "t1",
},
},
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Name: "v",
Column: "col2",
}},
},
},
})
require.NoError(t, err)
},
sourceSchema: "CREATE TABLE `t1` (\n" +
" `col1` int(11) NOT NULL AUTO_INCREMENT,\n" +
" `col2` int(11) DEFAULT NULL,\n" +
Expand All @@ -866,6 +895,132 @@ func TestCreateLookupVindexCreateDDL(t *testing.T) {
" `c2` varbinary(128),\n" +
" PRIMARY KEY (`c1`)\n" +
")",
}, {
description: "unique lookup with conflicting vindex",
specs: &vschemapb.Keyspace{
Vindexes: map[string]*vschemapb.Vindex{
"v": {
Type: "lookup_unique",
Params: map[string]string{
"table": fmt.Sprintf("%s.lkp", ms.TargetKeyspace),
"from": "c1",
"to": "c2",
},
Owner: "t1",
},
},
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Name: "v",
Column: "col2",
}},
},
},
},
preFunc: func() {
// The existing vindex vschema entry differs from what we want to
// create so we cannot re-use it.
err := env.ws.ts.SaveVSchema(ctx, ms.SourceKeyspace, &vschemapb.Keyspace{
Vindexes: map[string]*vschemapb.Vindex{
"v": {
Type: "lookup_unique",
Params: map[string]string{
"table": fmt.Sprintf("%s.lkp", ms.TargetKeyspace),
"from": "c1",
"to": "c2",
"write_only": "false", // This vindex has been externalized
},
Owner: "t1",
},
},
})
require.NoError(t, err)
},
err: "a conflicting vindex named v already exists in the sourceks keyspace",
sourceSchema: "CREATE TABLE `t1` (\n" +
" `col1` int(11) NOT NULL AUTO_INCREMENT,\n" +
" `col2` int(11) DEFAULT NULL,\n" +
" `col3` int(11) DEFAULT NULL,\n" +
" PRIMARY KEY (`id`)\n" +
") ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=latin1",
}, {
description: "unique lookup with conflicting column vindexes",
specs: &vschemapb.Keyspace{
Vindexes: map[string]*vschemapb.Vindex{
"v": {
Type: "lookup_unique",
Params: map[string]string{
"table": fmt.Sprintf("%s.lkp", ms.TargetKeyspace),
"from": "c1",
"to": "c2",
},
Owner: "t1",
},
},
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Name: "v",
Column: "col2",
}},
},
},
},
preFunc: func() {
// The existing ColumnVindexes vschema entry differs from what we
// want to create so we cannot re-use it.
err := env.ws.ts.SaveVSchema(ctx, ms.SourceKeyspace, &vschemapb.Keyspace{
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Name: "v",
Columns: []string{"col1", "col2"},
}},
},
},
})
require.NoError(t, err)
},
err: "a conflicting ColumnVindex on column(s) col1,col2 in table t1 already exists in the sourceks keyspace",
sourceSchema: "CREATE TABLE `t1` (\n" +
" `col1` int(11) NOT NULL AUTO_INCREMENT,\n" +
" `col2` int(11) DEFAULT NULL,\n" +
" `col3` int(11) DEFAULT NULL,\n" +
" PRIMARY KEY (`id`)\n" +
") ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=latin1",
}, {
description: "unique lookup using last column w/o primary key",
specs: &vschemapb.Keyspace{
Vindexes: map[string]*vschemapb.Vindex{
"v": {
Type: "lookup_unique",
Params: map[string]string{
"table": fmt.Sprintf("%s.lkp", ms.TargetKeyspace),
"from": "c1",
"to": "c2",
},
Owner: "t1",
},
},
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Name: "v",
Column: "col2",
}},
},
},
},
sourceSchema: "CREATE TABLE `t1` (\n" +
" `col1` int(11) NOT NULL AUTO_INCREMENT,\n" +
" `col2` int(11) DEFAULT NULL\n" + // Because it's the last entity in the definition it has no trailing comma
") ENGINE=InnoDB",
out: "CREATE TABLE `lkp` (\n" +
" `c1` int(11),\n" +
" `c2` varbinary(128),\n" +
" PRIMARY KEY (`c1`)\n" +
")",
}, {
description: "unique lookup, also pk",
specs: &vschemapb.Keyspace{
Expand Down Expand Up @@ -992,27 +1147,36 @@ func TestCreateLookupVindexCreateDDL(t *testing.T) {
err: "unexpected number of tables (0) returned from sourceks schema",
}}
for _, tcase := range testcases {
if tcase.sourceSchema != "" {
env.tmc.schema[ms.SourceKeyspace+".t1"] = &tabletmanagerdatapb.SchemaDefinition{
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{
Schema: tcase.sourceSchema,
}},
t.Run(tcase.description, func(t *testing.T) {
if tcase.sourceSchema != "" {
env.tmc.schema[ms.SourceKeyspace+".t1"] = &tabletmanagerdatapb.SchemaDefinition{
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{
Schema: tcase.sourceSchema,
}},
}
} else {
delete(env.tmc.schema, ms.SourceKeyspace+".t1")
}
} else {
delete(env.tmc.schema, ms.SourceKeyspace+".t1")
}

outms, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, tcase.specs, false)
if tcase.err != "" {
if err == nil || !strings.Contains(err.Error(), tcase.err) {
t.Errorf("prepareCreateLookup(%s) err: %v, must contain %v", tcase.description, err, tcase.err)
if tcase.preFunc != nil {
tcase.preFunc()
defer func() {
// Reset the vschema as it may have been changed in the pre
// function.
setStartingVschema()
}()
}
continue
}
require.NoError(t, err)
want := strings.Split(tcase.out, "\n")
got := strings.Split(outms.TableSettings[0].CreateDdl, "\n")
require.Equal(t, want, got, tcase.description)
outms, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, tcase.specs, false)
if tcase.err != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tcase.err, "prepareCreateLookup(%s) err: %v, does not contain %v", tcase.description, err, tcase.err)
return
}
require.NoError(t, err)
want := strings.Split(tcase.out, "\n")
got := strings.Split(outms.TableSettings[0].CreateDdl, "\n")
require.Equal(t, want, got, tcase.description)
})
}
}

Expand Down
30 changes: 23 additions & 7 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3811,7 +3811,7 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str
targetVSchema.Tables = make(map[string]*vschemapb.Table)
}
if existing, ok := sourceVSchema.Vindexes[vindexName]; ok {
if !proto.Equal(existing, vindex) {
if !proto.Equal(existing, vindex) { // If the exact same vindex already exists then we can re-use it
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "a conflicting vindex named %s already exists in the %s keyspace", vindexName, keyspace)
}
}
Expand All @@ -3824,13 +3824,18 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str
if colVindex.Name != vindexName {
continue
}
colName := colVindex.Column
if len(colVindex.Columns) != 0 {
colName = colVindex.Columns[0]
var colNames []string
if len(colVindex.Columns) == 0 {
colNames = []string{colVindex.Column}
} else {
colNames = colVindex.Columns
}
if colName == sourceVindexColumns[0] {
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting ColumnVindex on column %s in table %s already exists in the %s keyspace",
colName, sourceTableName, keyspace)
// If this is the exact same definition then we can use the existing one. If they
// are not the same then they are two distinct conflicting vindexes and we should
// not proceed.
if !slices.Equal(colNames, sourceVindexColumns) {
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting ColumnVindex on column(s) %s in table %s already exists in the %s keyspace",
strings.Join(colNames, ","), sourceTableName, keyspace)
}
}

Expand Down Expand Up @@ -3884,6 +3889,10 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str
modified = append(modified, buf.String())
modified = append(modified, ")")
createDDL = strings.Join(modified, "\n")
// Confirm that our DDL is valid before we create anything.
if _, err = s.env.Parser().ParseStrictDDL(createDDL); err != nil {
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "error: %v; invalid lookup table definition generated: %s", err, createDDL)
}

// Generate vreplication query.
buf = sqlparser.NewTrackedBuffer(nil)
Expand Down Expand Up @@ -3999,6 +4008,13 @@ func generateColDef(lines []string, sourceVindexCol, vindexFromCol string) (stri
line = strings.Replace(line, source, target, 1)
line = strings.Replace(line, " AUTO_INCREMENT", "", 1)
line = strings.Replace(line, " DEFAULT NULL", "", 1)
// Ensure that the column definition ends with a comma as we will
// be appending the TO column and PRIMARY KEY definitions. If the
// souce column here was the last entity defined in the source
// table's definition then it will not already have the comma.
if !strings.HasSuffix(strings.TrimSpace(line), ",") {
line += ","
}
return line, nil
}
}
Expand Down

0 comments on commit 13e5d33

Please sign in to comment.