diff --git a/go/test/endtoend/vreplication/config_test.go b/go/test/endtoend/vreplication/config_test.go index 95246f5d849..d7113f72056 100644 --- a/go/test/endtoend/vreplication/config_test.go +++ b/go/test/endtoend/vreplication/config_test.go @@ -35,7 +35,7 @@ var ( initialProductSchema = ` create table product(pid int, description varbinary(128), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key(pid)) CHARSET=utf8mb4; create table customer(cid int, name varchar(128) collate utf8mb4_bin, meta json default null, typ enum('individual','soho','enterprise'), sport set('football','cricket','baseball'), - ts timestamp not null default current_timestamp, bits bit(2) default b'11', date1 datetime not null default '0000-00-00 00:00:00', + ts timestamp not null default current_timestamp, bits bit(2) default b'11', date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', dec80 decimal(8,0), primary key(cid,typ)) CHARSET=utf8mb4; create table customer_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence'; create table merchant(mname varchar(128), category varchar(128), primary key(mname)) CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; @@ -84,6 +84,33 @@ create table datze (id int, dt1 datetime not null default current_timestamp, dt2 } } ` + + createLookupVindexVSchema = ` +{ + "sharded": true, + "vindexes": { + "customer_name_keyspace_id": { + "type": "consistent_lookup", + "params": { + "table": "product.customer_name_keyspace_id", + "from": "name,cid", + "to": "keyspace_id", + "ignore_nulls": "true" + }, + "owner": "customer" + } + }, + "tables": { + "customer": { + "column_vindexes": [{ + "columns": ["name", "cid"], + "name": "customer_name_keyspace_id" + }] + } + } +} +` + customerSchema = "" customerVSchema = ` { diff --git a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go index d45dceda1b5..91be37bab24 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go +++ b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go @@ -81,6 +81,8 @@ const ( GreaterThanEqual // NotEqual is used to filter a comparable column if != specific value NotEqual + // IsNotNull is used to filter a column if it is NULL + IsNotNull ) // Filter contains opcodes for filtering. @@ -226,6 +228,10 @@ func (plan *Plan) filter(values, result []sqltypes.Value, charsets []collations. if !key.KeyRangeContains(filter.KeyRange, ksid) { return false, nil } + case IsNotNull: + if values[filter.ColNum].IsNull() { + return false, nil + } default: match, err := compare(filter.Opcode, values[filter.ColNum], filter.Value, charsets[filter.ColNum]) if err != nil { @@ -552,6 +558,25 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er if err := plan.analyzeInKeyRange(vschema, expr.Exprs); err != nil { return err } + case *sqlparser.IsExpr: // Needed for CreateLookupVindex with ignore_nulls + if expr.Right != sqlparser.IsNotNullOp { + return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr)) + } + qualifiedName, ok := expr.Left.(*sqlparser.ColName) + if !ok { + return fmt.Errorf("unexpected: %v", sqlparser.String(expr)) + } + if !qualifiedName.Qualifier.IsEmpty() { + return fmt.Errorf("unsupported qualifier for column: %v", sqlparser.String(qualifiedName)) + } + colnum, err := findColumn(plan.Table, qualifiedName.Name) + if err != nil { + return err + } + plan.Filters = append(plan.Filters, Filter{ + Opcode: IsNotNull, + ColNum: colnum, + }) default: return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr)) } diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index 231ec3f5bae..909a025d545 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -54,6 +54,7 @@ import ( tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" vschemapb "vitess.io/vitess/go/vt/proto/vschema" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) type materializer struct { @@ -440,12 +441,13 @@ func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, sp // Important variables are pulled out here. var ( // lookup vindex info - vindexName string - vindex *vschemapb.Vindex - targetKeyspace string - targetTableName string - vindexFromCols []string - vindexToCol string + vindexName string + vindex *vschemapb.Vindex + targetKeyspace string + targetTableName string + vindexFromCols []string + vindexToCol string + vindexIgnoreNulls bool // source table info sourceTableName string @@ -496,6 +498,18 @@ func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, sp if _, err := vindexes.CreateVindex(vindex.Type, vindexName, vindex.Params); err != nil { return nil, nil, nil, err } + if ignoreNullsStr, ok := vindex.Params["ignore_nulls"]; ok { + // This mirrors the behavior of vindexes.boolFromMap(). + switch ignoreNullsStr { + case "true": + vindexIgnoreNulls = true + case "false": + vindexIgnoreNulls = false + default: + return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "ignore_nulls value must be 'true' or 'false': '%s'", + ignoreNullsStr) + } + } // Validate input table if len(specs.Tables) != 1 { @@ -632,21 +646,31 @@ func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, sp buf = sqlparser.NewTrackedBuffer(nil) buf.Myprintf("select ") for i := range vindexFromCols { - buf.Myprintf("%v as %v, ", sqlparser.NewIdentifierCI(sourceVindexColumns[i]), sqlparser.NewIdentifierCI(vindexFromCols[i])) + buf.Myprintf("%s as %s, ", sqlparser.String(sqlparser.NewIdentifierCI(sourceVindexColumns[i])), sqlparser.String(sqlparser.NewIdentifierCI(vindexFromCols[i]))) } if strings.EqualFold(vindexToCol, "keyspace_id") || strings.EqualFold(vindex.Type, "consistent_lookup_unique") || strings.EqualFold(vindex.Type, "consistent_lookup") { - buf.Myprintf("keyspace_id() as %v ", sqlparser.NewIdentifierCI(vindexToCol)) + buf.Myprintf("keyspace_id() as %s ", sqlparser.String(sqlparser.NewIdentifierCI(vindexToCol))) } else { - buf.Myprintf("%v as %v ", sqlparser.NewIdentifierCI(vindexToCol), sqlparser.NewIdentifierCI(vindexToCol)) + buf.Myprintf("%s as %s ", sqlparser.String(sqlparser.NewIdentifierCI(vindexToCol)), sqlparser.String(sqlparser.NewIdentifierCI(vindexToCol))) + } + buf.Myprintf("from %s", sqlparser.String(sqlparser.NewIdentifierCS(sourceTableName))) + if vindexIgnoreNulls { + buf.Myprintf(" where ") + lastValIdx := len(vindexFromCols) - 1 + for i := range vindexFromCols { + buf.Myprintf("%s is not null", sqlparser.String(sqlparser.NewIdentifierCI(vindexFromCols[i]))) + if i != lastValIdx { + buf.Myprintf(" and ") + } + } } - buf.Myprintf("from %v", sqlparser.NewIdentifierCS(sourceTableName)) if vindex.Owner != "" { // Only backfill buf.Myprintf(" group by ") for i := range vindexFromCols { - buf.Myprintf("%v, ", sqlparser.NewIdentifierCI(vindexFromCols[i])) + buf.Myprintf("%s, ", sqlparser.String(sqlparser.NewIdentifierCI(vindexFromCols[i]))) } - buf.Myprintf("%v", sqlparser.NewIdentifierCI(vindexToCol)) + buf.Myprintf("%s", sqlparser.String(sqlparser.NewIdentifierCI(vindexToCol))) } materializeQuery = buf.String() diff --git a/go/vt/wrangler/materializer_test.go b/go/vt/wrangler/materializer_test.go index 2faec7cae6e..e36bd9d699a 100644 --- a/go/vt/wrangler/materializer_test.go +++ b/go/vt/wrangler/materializer_test.go @@ -1274,6 +1274,125 @@ func TestCreateCustomizedVindex(t *testing.T) { } } +func TestCreateLookupVindexIgnoreNulls(t *testing.T) { + ms := &vtctldatapb.MaterializeSettings{ + SourceKeyspace: "ks", + TargetKeyspace: "ks", + } + + env := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) + defer env.close() + + specs := &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "v": { + Type: "consistent_lookup", + Params: map[string]string{ + "table": "ks.lkp", + "from": "col2,col1", + "to": "keyspace_id", + "ignore_nulls": "true", + }, + Owner: "t1", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "v", + Columns: []string{"col2", "col1"}, + }}, + }, + }, + } + // Dummy sourceSchema + sourceSchema := "CREATE TABLE `t1` (\n" + + " `col1` int(11) NOT NULL AUTO_INCREMENT,\n" + + " `col2` int(11) DEFAULT NULL,\n" + + " PRIMARY KEY (`id`)\n" + + ") ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=latin1" + + vschema := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "hash", + Column: "col1", + }}, + }, + }, + } + + wantKs := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + "v": { + Type: "consistent_lookup", + Params: map[string]string{ + "table": "ks.lkp", + "from": "col2,col1", + "to": "keyspace_id", + "write_only": "true", + "ignore_nulls": "true", + }, + Owner: "t1", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "hash", + Column: "col1", + }, { + Name: "v", + Columns: []string{"col2", "col1"}, + }}, + }, + "lkp": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "col2", + Name: "hash", + }}, + }, + }, + } + wantQuery := "select col2 as col2, col1 as col1, keyspace_id() as keyspace_id from t1 where col2 is not null and col1 is not null group by col2, col1, keyspace_id" + + env.tmc.schema[ms.SourceKeyspace+".t1"] = &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Fields: []*querypb.Field{{ + Name: "col1", + Type: querypb.Type_INT64, + }, { + Name: "col2", + Type: querypb.Type_INT64, + }}, + Schema: sourceSchema, + }}, + } + if err := env.topoServ.SaveVSchema(context.Background(), ms.SourceKeyspace, vschema); err != nil { + t.Fatal(err) + } + + ms, ks, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs, false) + require.NoError(t, err) + if !proto.Equal(wantKs, ks) { + t.Errorf("unexpected keyspace value: got:\n%v, want\n%v", ks, wantKs) + } + require.NotNil(t, ms) + require.GreaterOrEqual(t, len(ms.TableSettings), 1) + require.Equal(t, wantQuery, ms.TableSettings[0].SourceExpression, "unexpected query") +} + func TestStopAfterCopyFlag(t *testing.T) { ms := &vtctldatapb.MaterializeSettings{ SourceKeyspace: "ks",