Skip to content

Commit

Permalink
[BACK-PORT] Properly support ignore_nulls in CreateLookupVindex (vite…
Browse files Browse the repository at this point in the history
…ssio#13913) (#122)

* Properly support ignore_nulls in CreateLookupVindex (vitessio#13913)

Signed-off-by: Matt Lord <[email protected]>

* remove new vreplication tests that dont work with v15

Signed-off-by: Austen Lacy <[email protected]>

---------

Signed-off-by: Matt Lord <[email protected]>
Signed-off-by: Austen Lacy <[email protected]>
Co-authored-by: Matt Lord <[email protected]>
Co-authored-by: Austen Lacy <[email protected]>
  • Loading branch information
3 people authored Sep 19, 2023
1 parent 0869e39 commit ebde310
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 13 deletions.
29 changes: 28 additions & 1 deletion go/test/endtoend/vreplication/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var (
initialProductSchema = `
create table product(pid int, description varbinary(128), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key(pid)) CHARSET=utf8mb4;
create table customer(cid int, name varchar(128) collate utf8mb4_bin, meta json default null, typ enum('individual','soho','enterprise'), sport set('football','cricket','baseball'),
ts timestamp not null default current_timestamp, bits bit(2) default b'11', date1 datetime not null default '0000-00-00 00:00:00',
ts timestamp not null default current_timestamp, bits bit(2) default b'11', date1 datetime not null default '0000-00-00 00:00:00',
date2 datetime not null default '2021-00-01 00:00:00', dec80 decimal(8,0), primary key(cid,typ)) CHARSET=utf8mb4;
create table customer_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';
create table merchant(mname varchar(128), category varchar(128), primary key(mname)) CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
Expand Down Expand Up @@ -84,6 +84,33 @@ create table datze (id int, dt1 datetime not null default current_timestamp, dt2
}
}
`

createLookupVindexVSchema = `
{
"sharded": true,
"vindexes": {
"customer_name_keyspace_id": {
"type": "consistent_lookup",
"params": {
"table": "product.customer_name_keyspace_id",
"from": "name,cid",
"to": "keyspace_id",
"ignore_nulls": "true"
},
"owner": "customer"
}
},
"tables": {
"customer": {
"column_vindexes": [{
"columns": ["name", "cid"],
"name": "customer_name_keyspace_id"
}]
}
}
}
`

customerSchema = ""
customerVSchema = `
{
Expand Down
25 changes: 25 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ const (
GreaterThanEqual
// NotEqual is used to filter a comparable column if != specific value
NotEqual
// IsNotNull is used to filter a column if it is NULL
IsNotNull
)

// Filter contains opcodes for filtering.
Expand Down Expand Up @@ -226,6 +228,10 @@ func (plan *Plan) filter(values, result []sqltypes.Value, charsets []collations.
if !key.KeyRangeContains(filter.KeyRange, ksid) {
return false, nil
}
case IsNotNull:
if values[filter.ColNum].IsNull() {
return false, nil
}
default:
match, err := compare(filter.Opcode, values[filter.ColNum], filter.Value, charsets[filter.ColNum])
if err != nil {
Expand Down Expand Up @@ -552,6 +558,25 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
if err := plan.analyzeInKeyRange(vschema, expr.Exprs); err != nil {
return err
}
case *sqlparser.IsExpr: // Needed for CreateLookupVindex with ignore_nulls
if expr.Right != sqlparser.IsNotNullOp {
return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr))
}
qualifiedName, ok := expr.Left.(*sqlparser.ColName)
if !ok {
return fmt.Errorf("unexpected: %v", sqlparser.String(expr))
}
if !qualifiedName.Qualifier.IsEmpty() {
return fmt.Errorf("unsupported qualifier for column: %v", sqlparser.String(qualifiedName))
}
colnum, err := findColumn(plan.Table, qualifiedName.Name)
if err != nil {
return err
}
plan.Filters = append(plan.Filters, Filter{
Opcode: IsNotNull,
ColNum: colnum,
})
default:
return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr))
}
Expand Down
48 changes: 36 additions & 12 deletions go/vt/wrangler/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

type materializer struct {
Expand Down Expand Up @@ -440,12 +441,13 @@ func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, sp
// Important variables are pulled out here.
var (
// lookup vindex info
vindexName string
vindex *vschemapb.Vindex
targetKeyspace string
targetTableName string
vindexFromCols []string
vindexToCol string
vindexName string
vindex *vschemapb.Vindex
targetKeyspace string
targetTableName string
vindexFromCols []string
vindexToCol string
vindexIgnoreNulls bool

// source table info
sourceTableName string
Expand Down Expand Up @@ -496,6 +498,18 @@ func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, sp
if _, err := vindexes.CreateVindex(vindex.Type, vindexName, vindex.Params); err != nil {
return nil, nil, nil, err
}
if ignoreNullsStr, ok := vindex.Params["ignore_nulls"]; ok {
// This mirrors the behavior of vindexes.boolFromMap().
switch ignoreNullsStr {
case "true":
vindexIgnoreNulls = true
case "false":
vindexIgnoreNulls = false
default:
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "ignore_nulls value must be 'true' or 'false': '%s'",
ignoreNullsStr)
}
}

// Validate input table
if len(specs.Tables) != 1 {
Expand Down Expand Up @@ -632,21 +646,31 @@ func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, sp
buf = sqlparser.NewTrackedBuffer(nil)
buf.Myprintf("select ")
for i := range vindexFromCols {
buf.Myprintf("%v as %v, ", sqlparser.NewIdentifierCI(sourceVindexColumns[i]), sqlparser.NewIdentifierCI(vindexFromCols[i]))
buf.Myprintf("%s as %s, ", sqlparser.String(sqlparser.NewIdentifierCI(sourceVindexColumns[i])), sqlparser.String(sqlparser.NewIdentifierCI(vindexFromCols[i])))
}
if strings.EqualFold(vindexToCol, "keyspace_id") || strings.EqualFold(vindex.Type, "consistent_lookup_unique") || strings.EqualFold(vindex.Type, "consistent_lookup") {
buf.Myprintf("keyspace_id() as %v ", sqlparser.NewIdentifierCI(vindexToCol))
buf.Myprintf("keyspace_id() as %s ", sqlparser.String(sqlparser.NewIdentifierCI(vindexToCol)))
} else {
buf.Myprintf("%v as %v ", sqlparser.NewIdentifierCI(vindexToCol), sqlparser.NewIdentifierCI(vindexToCol))
buf.Myprintf("%s as %s ", sqlparser.String(sqlparser.NewIdentifierCI(vindexToCol)), sqlparser.String(sqlparser.NewIdentifierCI(vindexToCol)))
}
buf.Myprintf("from %s", sqlparser.String(sqlparser.NewIdentifierCS(sourceTableName)))
if vindexIgnoreNulls {
buf.Myprintf(" where ")
lastValIdx := len(vindexFromCols) - 1
for i := range vindexFromCols {
buf.Myprintf("%s is not null", sqlparser.String(sqlparser.NewIdentifierCI(vindexFromCols[i])))
if i != lastValIdx {
buf.Myprintf(" and ")
}
}
}
buf.Myprintf("from %v", sqlparser.NewIdentifierCS(sourceTableName))
if vindex.Owner != "" {
// Only backfill
buf.Myprintf(" group by ")
for i := range vindexFromCols {
buf.Myprintf("%v, ", sqlparser.NewIdentifierCI(vindexFromCols[i]))
buf.Myprintf("%s, ", sqlparser.String(sqlparser.NewIdentifierCI(vindexFromCols[i])))
}
buf.Myprintf("%v", sqlparser.NewIdentifierCI(vindexToCol))
buf.Myprintf("%s", sqlparser.String(sqlparser.NewIdentifierCI(vindexToCol)))
}
materializeQuery = buf.String()

Expand Down
119 changes: 119 additions & 0 deletions go/vt/wrangler/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1274,6 +1274,125 @@ func TestCreateCustomizedVindex(t *testing.T) {
}
}

func TestCreateLookupVindexIgnoreNulls(t *testing.T) {
ms := &vtctldatapb.MaterializeSettings{
SourceKeyspace: "ks",
TargetKeyspace: "ks",
}

env := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"})
defer env.close()

specs := &vschemapb.Keyspace{
Vindexes: map[string]*vschemapb.Vindex{
"v": {
Type: "consistent_lookup",
Params: map[string]string{
"table": "ks.lkp",
"from": "col2,col1",
"to": "keyspace_id",
"ignore_nulls": "true",
},
Owner: "t1",
},
},
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Name: "v",
Columns: []string{"col2", "col1"},
}},
},
},
}
// Dummy sourceSchema
sourceSchema := "CREATE TABLE `t1` (\n" +
" `col1` int(11) NOT NULL AUTO_INCREMENT,\n" +
" `col2` int(11) DEFAULT NULL,\n" +
" PRIMARY KEY (`id`)\n" +
") ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=latin1"

vschema := &vschemapb.Keyspace{
Sharded: true,
Vindexes: map[string]*vschemapb.Vindex{
"hash": {
Type: "hash",
},
},
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Name: "hash",
Column: "col1",
}},
},
},
}

wantKs := &vschemapb.Keyspace{
Sharded: true,
Vindexes: map[string]*vschemapb.Vindex{
"hash": {
Type: "hash",
},
"v": {
Type: "consistent_lookup",
Params: map[string]string{
"table": "ks.lkp",
"from": "col2,col1",
"to": "keyspace_id",
"write_only": "true",
"ignore_nulls": "true",
},
Owner: "t1",
},
},
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Name: "hash",
Column: "col1",
}, {
Name: "v",
Columns: []string{"col2", "col1"},
}},
},
"lkp": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "col2",
Name: "hash",
}},
},
},
}
wantQuery := "select col2 as col2, col1 as col1, keyspace_id() as keyspace_id from t1 where col2 is not null and col1 is not null group by col2, col1, keyspace_id"

env.tmc.schema[ms.SourceKeyspace+".t1"] = &tabletmanagerdatapb.SchemaDefinition{
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{
Fields: []*querypb.Field{{
Name: "col1",
Type: querypb.Type_INT64,
}, {
Name: "col2",
Type: querypb.Type_INT64,
}},
Schema: sourceSchema,
}},
}
if err := env.topoServ.SaveVSchema(context.Background(), ms.SourceKeyspace, vschema); err != nil {
t.Fatal(err)
}

ms, ks, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs, false)
require.NoError(t, err)
if !proto.Equal(wantKs, ks) {
t.Errorf("unexpected keyspace value: got:\n%v, want\n%v", ks, wantKs)
}
require.NotNil(t, ms)
require.GreaterOrEqual(t, len(ms.TableSettings), 1)
require.Equal(t, wantQuery, ms.TableSettings[0].SourceExpression, "unexpected query")
}

func TestStopAfterCopyFlag(t *testing.T) {
ms := &vtctldatapb.MaterializeSettings{
SourceKeyspace: "ks",
Expand Down

0 comments on commit ebde310

Please sign in to comment.