Skip to content

Commit

Permalink
fix: insert on duplicate key update missing BindVars (#14728)
Browse files Browse the repository at this point in the history
Co-authored-by: wlx5575 <[email protected]>
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
2 people authored and harshit-gangal committed Dec 12, 2023
1 parent 870675b commit 7396b80
Showing 8 changed files with 318 additions and 100 deletions.
10 changes: 10 additions & 0 deletions go/test/endtoend/vtgate/misc_test.go
Original file line number Diff line number Diff line change
@@ -27,6 +27,16 @@ import (
"github.com/stretchr/testify/require"
)

func TestInsertOnDuplicateKey(t *testing.T) {
conn, closer := start(t)
defer closer()

utils.Exec(t, conn, "insert into t11(id, sharding_key, col1, col2, col3) values(1, 2, 'a', 1, 2)")
utils.Exec(t, conn, "insert into t11(id, sharding_key, col1, col2, col3) values(1, 2, 'a', 1, 2) on duplicate key update id=10;")
utils.AssertMatches(t, conn, "select id, sharding_key from t11 where id=10", "[[INT64(10) INT64(2)]]")

}

func TestInsertNeg(t *testing.T) {
conn, closer := start(t)
defer closer()
10 changes: 10 additions & 0 deletions go/test/endtoend/vtgate/schema.sql
Original file line number Diff line number Diff line change
@@ -155,3 +155,13 @@ create table t10_id_to_keyspace_id_idx
keyspace_id varbinary(10),
primary key (id)
) Engine = InnoDB;

create table t11
(
id bigint,
sharding_key bigint,
col1 varchar(50),
col2 int,
col3 int,
primary key (id)
) Engine = InnoDB;
23 changes: 18 additions & 5 deletions go/test/endtoend/vtgate/vschema.json
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@

{
"sharded": true,
"vindexes": {
"unicode_loose_xxhash" : {
"unicode_loose_xxhash": {
"type": "unicode_loose_xxhash"
},
"unicode_loose_md5" : {
"unicode_loose_md5": {
"type": "unicode_loose_md5"
},
"hash": {
@@ -159,7 +158,10 @@
"name": "hash"
},
{
"columns": ["id2", "id1"],
"columns": [
"id2",
"id1"
],
"name": "t4_id2_vdx"
}
]
@@ -179,7 +181,10 @@
"name": "hash"
},
{
"columns": ["id2", "id1"],
"columns": [
"id2",
"id1"
],
"name": "t6_id2_vdx"
}
]
@@ -301,6 +306,14 @@
"name": "hash"
}
]
},
"t11": {
"column_vindexes": [
{
"column": "sharding_key",
"name": "hash"
}
]
}
}
}
9 changes: 7 additions & 2 deletions go/vt/vtgate/engine/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 21 additions & 16 deletions go/vt/vtgate/engine/insert.go
Original file line number Diff line number Diff line change
@@ -77,7 +77,7 @@ type (
// Prefix, Mid and Suffix are for sharded insert plans.
Prefix string
Mid sqlparser.Values
Suffix string
Suffix sqlparser.OnDup

// Option to override the standard behavior and allow a multi-shard insert
// to use single round trip autocommit.
@@ -134,7 +134,7 @@ func NewInsert(
table *vindexes.Table,
prefix string,
mid sqlparser.Values,
suffix string,
suffix sqlparser.OnDup,
) *Insert {
ins := &Insert{
Opcode: opcode,
@@ -345,7 +345,7 @@ func (ins *Insert) getInsertQueryForUnsharded(result *sqltypes.Result, bindVars
}
mids = append(mids, row)
}
return ins.Prefix + sqlparser.String(mids) + ins.Suffix
return ins.Prefix + sqlparser.String(mids) + sqlparser.String(ins.Suffix)
}

func (ins *Insert) execInsertSharded(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
@@ -467,7 +467,7 @@ func (ins *Insert) getInsertSelectQueries(
mids = append(mids, row)
}
}
rewritten := ins.Prefix + sqlparser.String(mids) + ins.Suffix
rewritten := ins.Prefix + sqlparser.String(mids) + sqlparser.String(ins.Suffix)
queries[i] = &querypb.BoundQuery{
Sql: rewritten,
BindVariables: bvs,
@@ -764,25 +764,30 @@ func (ins *Insert) getInsertShardedRoute(
for _, indexValue := range indexesPerRss[i] {
index, _ := strconv.ParseInt(string(indexValue.Value), 0, 64)
if keyspaceIDs[index] != nil {
walkFunc := func(node sqlparser.SQLNode) (kontinue bool, err error) {
if arg, ok := node.(*sqlparser.Argument); ok {
bv, exists := bindVars[arg.Name]
if !exists {
return false, vterrors.VT03026(arg.Name)
}
shardBindVars[arg.Name] = bv
}
return true, nil
}
mids = append(mids, sqlparser.String(ins.Mid[index]))
for _, expr := range ins.Mid[index] {
err = sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) {
if arg, ok := node.(*sqlparser.Argument); ok {
bv, exists := bindVars[arg.Name]
if !exists {
return false, vterrors.VT03026(arg.Name)
}
shardBindVars[arg.Name] = bv
}
return true, nil
}, expr, nil)
err = sqlparser.Walk(walkFunc, expr, nil)
if err != nil {
return nil, nil, err
}
}
err = sqlparser.Walk(walkFunc, ins.Suffix, nil)
if err != nil {
return nil, nil, err
}
}
}
rewritten := ins.Prefix + strings.Join(mids, ",") + ins.Suffix
rewritten := ins.Prefix + strings.Join(mids, ",") + sqlparser.String(ins.Suffix)
queries[i] = &querypb.BoundQuery{
Sql: rewritten,
BindVariables: shardBindVars,
@@ -1005,7 +1010,7 @@ func (ins *Insert) description() PrimitiveDescription {
mids := slice.Map(ins.Mid, func(from sqlparser.ValTuple) string {
return sqlparser.String(from)
})
shardQuery := fmt.Sprintf("%s%s%s", ins.Prefix, strings.Join(mids, ", "), ins.Suffix)
shardQuery := fmt.Sprintf("%s%s%s", ins.Prefix, strings.Join(mids, ", "), sqlparser.String(ins.Suffix))
if shardQuery != ins.Query {
other["ShardedQuery"] = shardQuery
}
Loading

0 comments on commit 7396b80

Please sign in to comment.