diff --git a/go/test/endtoend/vreplication/reference_test.go b/go/test/endtoend/vreplication/reference_test.go new file mode 100644 index 00000000000..8ff77de8708 --- /dev/null +++ b/go/test/endtoend/vreplication/reference_test.go @@ -0,0 +1,166 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vreplication + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +const ( + uksSchema = ` +create table product (id int, mfg_id int, cat_id int, name varchar(128), primary key(id)); +create table cat (id int, name varchar(128), primary key(id)); +create table mfg (id int, name varchar(128), primary key(id)); +` + sksSchema = ` +create table product (id int, mfg_id int, cat_id int, name varchar(128), primary key(id)); +create table cat (id int, name varchar(128), primary key(id)); +create table mfg2 (id int, name varchar(128), primary key(id)); +` + uksVSchema = ` +{ + "sharded": false, + "tables": { + "product": {}, + "cat": {}, + "mfg": {} + } +}` + + sksVSchema = ` +{ + "sharded": true, + "tables": { + "product": { + "column_vindexes": [ + { + "column": "id", + "name": "hash" + } + ] + }, + "cat": { + "type": "reference", + "source": "uks.cat" + }, + "mfg2": { + "type": "reference", + "source": "uks.mfg" + } + }, + "vindexes": { + "hash": { + "type": "hash" + } + } +}` + materializeCatSpec = ` +{ + "workflow": "wfCat", + "source_keyspace": "uks", + "target_keyspace": "sks", + "table_settings": [ {"target_table": "cat", "source_expression": "select id, name from cat" }] +}` + materializeMfgSpec = ` +{ + "workflow": "wfMfg", + "source_keyspace": "uks", + "target_keyspace": "sks", + "table_settings": [ {"target_table": "mfg2", "source_expression": "select id, name from mfg" }] +}` + initializeTables = ` +use uks; +insert into product values (1, 1, 1, 'p1'); +insert into product values (2, 2, 2, 'p2'); +insert into product values (3, 3, 3, 'p3'); +insert into cat values (1, 'c1'); +insert into cat values (2, 'c2'); +insert into cat values (3, 'c3'); +insert into mfg values (1, 'm1'); +insert into mfg values (2, 'm2'); +insert into mfg values (3, 'm3'); +insert into mfg values (4, 'm4'); +` +) + +func TestReferenceTableMaterializationAndRouting(t *testing.T) { + var err error + defaultCellName := "zone1" + vc = NewVitessCluster(t, nil) + defer vc.TearDown() + defaultReplicas = 0 // because of CI resource constraints we can only run this test with primary tablets + defer func() { defaultReplicas = 1 }() + uks := "uks" + sks := "sks" + + defaultCell := vc.Cells[defaultCellName] + vc.AddKeyspace(t, []*Cell{defaultCell}, uks, "0", uksVSchema, uksSchema, defaultReplicas, defaultRdonly, 100, nil) + vc.AddKeyspace(t, []*Cell{defaultCell}, sks, "-80,80-", sksVSchema, sksSchema, defaultReplicas, defaultRdonly, 200, nil) + vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) + + verifyClusterHealth(t, vc) + _, _, err = vtgateConn.ExecuteFetchMulti(initializeTables, 0, false) + require.NoError(t, err) + vtgateConn.Close() + + materialize(t, materializeCatSpec, false) + materialize(t, materializeMfgSpec, false) + + tabDash80 := vc.getPrimaryTablet(t, sks, "-80") + tab80Dash := vc.getPrimaryTablet(t, sks, "80-") + catchup(t, tabDash80, "wfCat", "Materialize Category") + catchup(t, tab80Dash, "wfCat", "Materialize Category") + catchup(t, tabDash80, "wfMfg", "Materialize Manufacturer") + catchup(t, tab80Dash, "wfMfg", "Materialize Manufacturer") + + vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) + defer vtgateConn.Close() + waitForRowCount(t, vtgateConn, sks, "cat", 3) + waitForRowCount(t, vtgateConn, sks, "mfg2", 4) + + execRefQuery(t, "insert into mfg values (5, 'm5')") + execRefQuery(t, "insert into mfg2 values (6, 'm6')") + execRefQuery(t, "insert into uks.mfg values (7, 'm7')") + execRefQuery(t, "insert into sks.mfg2 values (8, 'm8')") + waitForRowCount(t, vtgateConn, uks, "mfg", 8) + + execRefQuery(t, "update mfg set name = concat(name, '-updated') where id = 1") + execRefQuery(t, "update mfg2 set name = concat(name, '-updated') where id = 2") + execRefQuery(t, "update uks.mfg set name = concat(name, '-updated') where id = 3") + execRefQuery(t, "update sks.mfg2 set name = concat(name, '-updated') where id = 4") + + waitForRowCount(t, vtgateConn, uks, "mfg", 8) + qr := execVtgateQuery(t, vtgateConn, "uks", "select count(*) from uks.mfg where name like '%updated%'") + require.NotNil(t, qr) + require.Equal(t, "4", qr.Rows[0][0].ToString()) + + execRefQuery(t, "delete from mfg where id = 5") + execRefQuery(t, "delete from mfg2 where id = 6") + execRefQuery(t, "delete from uks.mfg where id = 7") + execRefQuery(t, "delete from sks.mfg2 where id = 8") + waitForRowCount(t, vtgateConn, uks, "mfg", 4) + +} + +func execRefQuery(t *testing.T, query string) { + vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) + defer vtgateConn.Close() + _, err := vtgateConn.ExecuteFetch(query, 0, false) + require.NoError(t, err) +} diff --git a/go/vt/vtgate/planbuilder/operators/insert.go b/go/vt/vtgate/planbuilder/operators/insert.go index 9d0048d9322..7c6e242ae9c 100644 --- a/go/vt/vtgate/planbuilder/operators/insert.go +++ b/go/vt/vtgate/planbuilder/operators/insert.go @@ -107,7 +107,7 @@ func (i *Insert) Statement() sqlparser.Statement { func createOperatorFromInsert(ctx *plancontext.PlanningContext, ins *sqlparser.Insert) Operator { tableInfo, qt := createQueryTableForDML(ctx, ins.Table, nil) - vTbl, routing := buildVindexTableForDML(ctx, tableInfo, qt, "insert") + vTbl, routing := buildVindexTableForDML(ctx, tableInfo, qt, ins, "insert") deleteBeforeInsert := false if ins.Action == sqlparser.ReplaceAct && diff --git a/go/vt/vtgate/planbuilder/operators/route_planning.go b/go/vt/vtgate/planbuilder/operators/route_planning.go index e3c0ee5f9eb..c58340291ff 100644 --- a/go/vt/vtgate/planbuilder/operators/route_planning.go +++ b/go/vt/vtgate/planbuilder/operators/route_planning.go @@ -80,15 +80,25 @@ func buildVindexTableForDML( ctx *plancontext.PlanningContext, tableInfo semantics.TableInfo, table *QueryTable, + stmt sqlparser.Statement, dmlType string, ) (*vindexes.Table, Routing) { vindexTable := tableInfo.GetVindexTable() - if vindexTable.Source != nil { + if tableInfo.GetVindexTable().Type == vindexes.TypeReference && vindexTable.Source != nil { sourceTable, _, _, _, _, err := ctx.VSchema.FindTableOrVindex(vindexTable.Source.TableName) if err != nil { panic(err) } vindexTable = sourceTable + refTbl := sqlparser.NewAliasedTableExpr(vindexTable.GetTableName(), "") + switch stmt := stmt.(type) { + case *sqlparser.Update: + stmt.TableExprs[0] = refTbl + case *sqlparser.Insert: + stmt.Table = refTbl + default: + panic("unsupported DML type in buildVindexTableForDML") + } } if !vindexTable.Keyspace.Sharded { diff --git a/go/vt/vtgate/planbuilder/operators/update.go b/go/vt/vtgate/planbuilder/operators/update.go index 727db448d30..01a2878f16d 100644 --- a/go/vt/vtgate/planbuilder/operators/update.go +++ b/go/vt/vtgate/planbuilder/operators/update.go @@ -102,8 +102,16 @@ func (u *Update) ShortDescription() string { func createOperatorFromUpdate(ctx *plancontext.PlanningContext, updStmt *sqlparser.Update) Operator { tableInfo, qt := createQueryTableForDML(ctx, updStmt.TableExprs[0], updStmt.Where) - vindexTable, routing := buildVindexTableForDML(ctx, tableInfo, qt, "update") + vindexTable, routing := buildVindexTableForDML(ctx, tableInfo, qt, updStmt, "update") + if tableInfo.GetVindexTable().GetTableName() != vindexTable.GetTableName() { + name := vindexTable.Name.String() + refTable := sqlparser.NewIdentifierCS(name) + if !qt.Alias.As.IsEmpty() { + qt.Alias.As = refTable + } + qt.Table.Name = refTable + } updClone := sqlparser.CloneRefOfUpdate(updStmt) updOp := createUpdateOperator(ctx, updStmt, vindexTable, qt, routing) diff --git a/go/vt/vtgate/planbuilder/testdata/reference_cases.json b/go/vt/vtgate/planbuilder/testdata/reference_cases.json index 91aac12d9e9..a89fa103923 100644 --- a/go/vt/vtgate/planbuilder/testdata/reference_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/reference_cases.json @@ -428,5 +428,323 @@ "main.global_ref" ] } + }, + { + "comment": "delete from reference table with another name - query send to source table", + "query": "delete from user.ref_with_source where col = 1", + "plan": { + "QueryType": "DELETE", + "Original": "delete from user.ref_with_source where col = 1", + "Instructions": { + "OperatorType": "Delete", + "Variant": "Unsharded", + "Keyspace": { + "Name": "main", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "delete from source_of_ref where col = 1", + "Table": "source_of_ref" + }, + "TablesUsed": [ + "main.source_of_ref" + ] + } + }, + { + "comment": "update from reference table with another name - query send to source table", + "query": "update user.ref_with_source set x = 4 where col = 1", + "plan": { + "QueryType": "UPDATE", + "Original": "update user.ref_with_source set x = 4 where col = 1", + "Instructions": { + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "main", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "update source_of_ref set x = 4 where col = 1", + "Table": "source_of_ref" + }, + "TablesUsed": [ + "main.source_of_ref" + ] + } + }, + { + "comment": "insert from reference table with another name - query send to source table", + "query": "insert into user.ref_with_source(x) values(4)", + "plan": { + "QueryType": "INSERT", + "Original": "insert into user.ref_with_source(x) values(4)", + "Instructions": { + "OperatorType": "Insert", + "Variant": "Unsharded", + "Keyspace": { + "Name": "main", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "insert into source_of_ref(x) values (4)", + "TableName": "source_of_ref" + }, + "TablesUsed": [ + "main.source_of_ref" + ] + } + }, + { + "comment": "delete from reference table - query send to source table", + "query": "delete from source_of_ref where col = 1", + "plan": { + "QueryType": "DELETE", + "Original": "delete from source_of_ref where col = 1", + "Instructions": { + "OperatorType": "Delete", + "Variant": "Unsharded", + "Keyspace": { + "Name": "main", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "delete from source_of_ref where col = 1", + "Table": "source_of_ref" + }, + "TablesUsed": [ + "main.source_of_ref" + ] + } + }, + { + "comment": "update from reference table - query send to source table", + "query": "update source_of_ref set x = 4 where col = 1", + "plan": { + "QueryType": "UPDATE", + "Original": "update source_of_ref set x = 4 where col = 1", + "Instructions": { + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "main", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "update source_of_ref set x = 4 where col = 1", + "Table": "source_of_ref" + }, + "TablesUsed": [ + "main.source_of_ref" + ] + } + }, + { + "comment": "insert from reference table - query send to source table", + "query": "insert into source_of_ref(x) values(4)", + "plan": { + "QueryType": "INSERT", + "Original": "insert into source_of_ref(x) values(4)", + "Instructions": { + "OperatorType": "Insert", + "Variant": "Unsharded", + "Keyspace": { + "Name": "main", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "insert into source_of_ref(x) values (4)", + "TableName": "source_of_ref" + }, + "TablesUsed": [ + "main.source_of_ref" + ] + } + }, + { + "comment": "delete from reference table qualified with unsharded - query send to source table", + "query": "delete from main.source_of_ref where col = 1", + "plan": { + "QueryType": "DELETE", + "Original": "delete from main.source_of_ref where col = 1", + "Instructions": { + "OperatorType": "Delete", + "Variant": "Unsharded", + "Keyspace": { + "Name": "main", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "delete from source_of_ref where col = 1", + "Table": "source_of_ref" + }, + "TablesUsed": [ + "main.source_of_ref" + ] + } + }, + { + "comment": "update from reference table qualified with unsharded - query send to source table", + "query": "update main.source_of_ref set x = 4 where col = 1", + "plan": { + "QueryType": "UPDATE", + "Original": "update main.source_of_ref set x = 4 where col = 1", + "Instructions": { + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "main", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "update source_of_ref set x = 4 where col = 1", + "Table": "source_of_ref" + }, + "TablesUsed": [ + "main.source_of_ref" + ] + } + }, + { + "comment": "insert from reference table qualified with unsharded - query send to source table", + "query": "insert into main.source_of_ref(x) values(4)", + "plan": { + "QueryType": "INSERT", + "Original": "insert into main.source_of_ref(x) values(4)", + "Instructions": { + "OperatorType": "Insert", + "Variant": "Unsharded", + "Keyspace": { + "Name": "main", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "insert into source_of_ref(x) values (4)", + "TableName": "source_of_ref" + }, + "TablesUsed": [ + "main.source_of_ref" + ] + } + }, + { + "comment": "delete from reference table with another name - query send to source table", + "query": "delete from user.ref_with_source where col = 1", + "plan": { + "QueryType": "DELETE", + "Original": "delete from user.ref_with_source where col = 1", + "Instructions": { + "OperatorType": "Delete", + "Variant": "Unsharded", + "Keyspace": { + "Name": "main", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "delete from source_of_ref where col = 1", + "Table": "source_of_ref" + }, + "TablesUsed": [ + "main.source_of_ref" + ] + } + }, + { + "comment": "update from reference table with another name - query send to source table", + "query": "update user.ref_with_source set x = 4 where col = 1", + "plan": { + "QueryType": "UPDATE", + "Original": "update user.ref_with_source set x = 4 where col = 1", + "Instructions": { + "OperatorType": "Update", + "Variant": "Unsharded", + "Keyspace": { + "Name": "main", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "update source_of_ref set x = 4 where col = 1", + "Table": "source_of_ref" + }, + "TablesUsed": [ + "main.source_of_ref" + ] + } + }, + { + "comment": "insert from reference table with another name - query send to source table", + "query": "insert into user.ref_with_source(x) values(4)", + "plan": { + "QueryType": "INSERT", + "Original": "insert into user.ref_with_source(x) values(4)", + "Instructions": { + "OperatorType": "Insert", + "Variant": "Unsharded", + "Keyspace": { + "Name": "main", + "Sharded": false + }, + "TargetTabletType": "PRIMARY", + "Query": "insert into source_of_ref(x) values (4)", + "TableName": "source_of_ref" + }, + "TablesUsed": [ + "main.source_of_ref" + ] + } + }, + { + "comment": "select with join to reference table in sharded keyspace: should route shard-scoped", + "query": "select * from user.ref_with_source ref, `user`.`user` u where ref.id = u.ref_id and u.id = 2", + "plan": { + "QueryType": "SELECT", + "Original": "select * from user.ref_with_source ref, `user`.`user` u where ref.id = u.ref_id and u.id = 2", + "Instructions": { + "FieldQuery": "select * from ref_with_source as ref, `user` as u where 1 != 1", + "OperatorType": "Route", + "Variant": "EqualUnique", + "Vindex": "user_index", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "Query": "select * from ref_with_source as ref, `user` as u where u.id = 2 and ref.id = u.ref_id", + "Table": "`user`, ref_with_source", + "Values": [ + "2" + ] + }, + "TablesUsed": [ + "user.ref_with_source", + "user.user" + ] + } + }, + { + "comment": "select with join to reference table in unsharded keyspace: should route shard-scoped", + "query": "select * from source_of_ref ref, `user`.`user` u where ref.id = u.ref_id and u.id = 2", + "plan": { + "QueryType": "SELECT", + "Original": "select * from source_of_ref ref, `user`.`user` u where ref.id = u.ref_id and u.id = 2", + "Instructions": { + "FieldQuery": "select * from ref_with_source as ref, `user` as u where 1 != 1", + "OperatorType": "Route", + "Variant": "EqualUnique", + "Vindex": "user_index", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "Query": "select * from ref_with_source as ref, `user` as u where u.id = 2 and ref.id = u.ref_id", + "Table": "`user`, ref_with_source", + "Values": [ + "2" + ] + }, + "TablesUsed": [ + "user.ref_with_source", + "user.user" + ] + } } ]