Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport: Vindexes: Pass context in consistent lookup handleDup (#14653) #136

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions go/vt/vtgate/vindexes/consistent_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,7 @@ func (lu *clCommon) handleDup(ctx context.Context, vcursor VCursor, values []sql
return err
}
// Lock the target row using normal transaction priority.
// TODO: context needs to be passed on.
qr, err = vcursor.ExecuteKeyspaceID(context.Background(), lu.keyspace, existingksid, lu.lockOwnerQuery, bindVars, false /* rollbackOnError */, false /* autocommit */)
qr, err = vcursor.ExecuteKeyspaceID(ctx, lu.keyspace, existingksid, lu.lockOwnerQuery, bindVars, false /* rollbackOnError */, false /* autocommit */)
if err != nil {
return err
}
Expand Down
90 changes: 66 additions & 24 deletions go/vt/vtgate/vindexes/consistent_lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ func TestConsistentLookupMap(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup", false)
vc := &loggingVCursor{}
vc.AddResult(makeTestResultLookup([]int{2, 2}), nil)
ctx := newTestContext()

got, err := lookup.Map(context.Background(), vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)})
got, err := lookup.Map(ctx, vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)})
require.NoError(t, err)
want := []key.Destination{
key.DestinationKeyspaceIDs([][]byte{
Expand All @@ -94,10 +95,11 @@ func TestConsistentLookupMap(t *testing.T) {
vc.verifyLog(t, []string{
"ExecutePre select fromc1, toc from t where fromc1 in ::fromc1 [{fromc1 }] false",
})
vc.verifyContext(t, ctx)

// Test query fail.
vc.AddResult(nil, fmt.Errorf("execute failed"))
_, err = lookup.Map(context.Background(), vc, []sqltypes.Value{sqltypes.NewInt64(1)})
_, err = lookup.Map(ctx, vc, []sqltypes.Value{sqltypes.NewInt64(1)})
wantErr := "lookup.Map: execute failed"
if err == nil || err.Error() != wantErr {
t.Errorf("lookup(query fail) err: %v, want %s", err, wantErr)
Expand Down Expand Up @@ -126,8 +128,9 @@ func TestConsistentLookupUniqueMap(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup_unique", false)
vc := &loggingVCursor{}
vc.AddResult(makeTestResultLookup([]int{0, 1}), nil)
ctx := newTestContext()

got, err := lookup.Map(context.Background(), vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)})
got, err := lookup.Map(ctx, vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)})
require.NoError(t, err)
want := []key.Destination{
key.DestinationNone{},
Expand All @@ -139,10 +142,11 @@ func TestConsistentLookupUniqueMap(t *testing.T) {
vc.verifyLog(t, []string{
"ExecutePre select fromc1, toc from t where fromc1 in ::fromc1 [{fromc1 }] false",
})
vc.verifyContext(t, ctx)

// More than one result is invalid
vc.AddResult(makeTestResultLookup([]int{2}), nil)
_, err = lookup.Map(context.Background(), vc, []sqltypes.Value{sqltypes.NewInt64(1)})
_, err = lookup.Map(ctx, vc, []sqltypes.Value{sqltypes.NewInt64(1)})
wanterr := "Lookup.Map: unexpected multiple results from vindex t: INT64(1)"
if err == nil || err.Error() != wanterr {
t.Errorf("lookup(query fail) err: %v, want %s", err, wanterr)
Expand Down Expand Up @@ -171,8 +175,9 @@ func TestConsistentLookupMapAbsent(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup", false)
vc := &loggingVCursor{}
vc.AddResult(makeTestResultLookup([]int{0, 0}), nil)
ctx := newTestContext()

got, err := lookup.Map(context.Background(), vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)})
got, err := lookup.Map(ctx, vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)})
require.NoError(t, err)
want := []key.Destination{
key.DestinationNone{},
Expand All @@ -184,32 +189,35 @@ func TestConsistentLookupMapAbsent(t *testing.T) {
vc.verifyLog(t, []string{
"ExecutePre select fromc1, toc from t where fromc1 in ::fromc1 [{fromc1 }] false",
})
vc.verifyContext(t, ctx)
}

func TestConsistentLookupVerify(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup", false)
vc := &loggingVCursor{}
vc.AddResult(makeTestResult(1), nil)
vc.AddResult(makeTestResult(1), nil)
ctx := newTestContext()

_, err := lookup.Verify(context.Background(), vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}, [][]byte{[]byte("test1"), []byte("test2")})
_, err := lookup.Verify(ctx, vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}, [][]byte{[]byte("test1"), []byte("test2")})
require.NoError(t, err)
vc.verifyLog(t, []string{
"ExecutePre select fromc1 from t where fromc1 = :fromc1 and toc = :toc [{fromc1 1} {toc test1}] false",
"ExecutePre select fromc1 from t where fromc1 = :fromc1 and toc = :toc [{fromc1 2} {toc test2}] false",
})
vc.verifyContext(t, ctx)

// Test query fail.
vc.AddResult(nil, fmt.Errorf("execute failed"))
_, err = lookup.Verify(context.Background(), vc, []sqltypes.Value{sqltypes.NewInt64(1)}, [][]byte{[]byte("\x16k@\xb4J\xbaK\xd6")})
_, err = lookup.Verify(ctx, vc, []sqltypes.Value{sqltypes.NewInt64(1)}, [][]byte{[]byte("\x16k@\xb4J\xbaK\xd6")})
want := "lookup.Verify: execute failed"
if err == nil || err.Error() != want {
t.Errorf("lookup(query fail) err: %v, want %s", err, want)
}

// Test write_only.
lookup = createConsistentLookup(t, "consistent_lookup", true)
got, err := lookup.Verify(context.Background(), nil, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}, [][]byte{[]byte(""), []byte("")})
got, err := lookup.Verify(ctx, nil, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}, [][]byte{[]byte(""), []byte("")})
require.NoError(t, err)
wantBools := []bool{true, true}
if !reflect.DeepEqual(got, wantBools) {
Expand All @@ -221,8 +229,9 @@ func TestConsistentLookupCreateSimple(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup", false)
vc := &loggingVCursor{}
vc.AddResult(&sqltypes.Result{}, nil)
ctx := newTestContext()

if err := lookup.(Lookup).Create(context.Background(), vc, [][]sqltypes.Value{{
if err := lookup.(Lookup).Create(ctx, vc, [][]sqltypes.Value{{
sqltypes.NewInt64(1),
sqltypes.NewInt64(2),
}, {
Expand All @@ -234,6 +243,7 @@ func TestConsistentLookupCreateSimple(t *testing.T) {
vc.verifyLog(t, []string{
"ExecutePre insert into t(fromc1, fromc2, toc) values(:fromc1_0, :fromc2_0, :toc_0), (:fromc1_1, :fromc2_1, :toc_1) [{fromc1_0 1} {fromc1_1 3} {fromc2_0 2} {fromc2_1 4} {toc_0 test1} {toc_1 test2}] true",
})
vc.verifyContext(t, ctx)
}

func TestConsistentLookupCreateThenRecreate(t *testing.T) {
Expand All @@ -242,8 +252,9 @@ func TestConsistentLookupCreateThenRecreate(t *testing.T) {
vc.AddResult(nil, mysql.NewSQLError(mysql.ERDupEntry, mysql.SSConstraintViolation, "Duplicate entry"))
vc.AddResult(&sqltypes.Result{}, nil)
vc.AddResult(&sqltypes.Result{}, nil)
ctx := newTestContext()

if err := lookup.(Lookup).Create(context.Background(), vc, [][]sqltypes.Value{{
if err := lookup.(Lookup).Create(ctx, vc, [][]sqltypes.Value{{
sqltypes.NewInt64(1),
sqltypes.NewInt64(2),
}}, [][]byte{[]byte("test1")}, false); err != nil {
Expand All @@ -254,6 +265,7 @@ func TestConsistentLookupCreateThenRecreate(t *testing.T) {
"ExecutePre select toc from t where fromc1 = :fromc1 and fromc2 = :fromc2 for update [{fromc1 1} {fromc2 2} {toc test1}] false",
"ExecutePre insert into t(fromc1, fromc2, toc) values(:fromc1, :fromc2, :toc) [{fromc1 1} {fromc2 2} {toc test1}] true",
})
vc.verifyContext(t, ctx)
}

func TestConsistentLookupCreateThenUpdate(t *testing.T) {
Expand All @@ -263,8 +275,9 @@ func TestConsistentLookupCreateThenUpdate(t *testing.T) {
vc.AddResult(makeTestResult(1), nil)
vc.AddResult(&sqltypes.Result{}, nil)
vc.AddResult(&sqltypes.Result{}, nil)
ctx := newTestContext()

if err := lookup.(Lookup).Create(context.Background(), vc, [][]sqltypes.Value{{
if err := lookup.(Lookup).Create(ctx, vc, [][]sqltypes.Value{{
sqltypes.NewInt64(1),
sqltypes.NewInt64(2),
}}, [][]byte{[]byte("test1")}, false); err != nil {
Expand All @@ -276,6 +289,7 @@ func TestConsistentLookupCreateThenUpdate(t *testing.T) {
"ExecuteKeyspaceID select fc1 from `dot.t1` where fc1 = :fromc1 and fc2 = :fromc2 lock in share mode [{fromc1 1} {fromc2 2} {toc test1}] false",
"ExecutePre update t set toc=:toc where fromc1 = :fromc1 and fromc2 = :fromc2 [{fromc1 1} {fromc2 2} {toc test1}] true",
})
vc.verifyContext(t, ctx)
}

func TestConsistentLookupCreateThenSkipUpdate(t *testing.T) {
Expand All @@ -285,8 +299,9 @@ func TestConsistentLookupCreateThenSkipUpdate(t *testing.T) {
vc.AddResult(makeTestResult(1), nil)
vc.AddResult(&sqltypes.Result{}, nil)
vc.AddResult(&sqltypes.Result{}, nil)
ctx := newTestContext()

if err := lookup.(Lookup).Create(context.Background(), vc, [][]sqltypes.Value{{
if err := lookup.(Lookup).Create(ctx, vc, [][]sqltypes.Value{{
sqltypes.NewInt64(1),
sqltypes.NewInt64(2),
}}, [][]byte{[]byte("1")}, false); err != nil {
Expand All @@ -297,6 +312,7 @@ func TestConsistentLookupCreateThenSkipUpdate(t *testing.T) {
"ExecutePre select toc from t where fromc1 = :fromc1 and fromc2 = :fromc2 for update [{fromc1 1} {fromc2 2} {toc 1}] false",
"ExecuteKeyspaceID select fc1 from `dot.t1` where fc1 = :fromc1 and fc2 = :fromc2 lock in share mode [{fromc1 1} {fromc2 2} {toc 1}] false",
})
vc.verifyContext(t, ctx)
}

func TestConsistentLookupCreateThenDupkey(t *testing.T) {
Expand All @@ -306,8 +322,9 @@ func TestConsistentLookupCreateThenDupkey(t *testing.T) {
vc.AddResult(makeTestResult(1), nil)
vc.AddResult(makeTestResult(1), nil)
vc.AddResult(&sqltypes.Result{}, nil)
ctx := newTestContext()

err := lookup.(Lookup).Create(context.Background(), vc, [][]sqltypes.Value{{
err := lookup.(Lookup).Create(ctx, vc, [][]sqltypes.Value{{
sqltypes.NewInt64(1),
sqltypes.NewInt64(2),
}}, [][]byte{[]byte("test1")}, false)
Expand All @@ -318,14 +335,16 @@ func TestConsistentLookupCreateThenDupkey(t *testing.T) {
"ExecutePre select toc from t where fromc1 = :fromc1 and fromc2 = :fromc2 for update [{fromc1 1} {fromc2 2} {toc test1}] false",
"ExecuteKeyspaceID select fc1 from `dot.t1` where fc1 = :fromc1 and fc2 = :fromc2 lock in share mode [{fromc1 1} {fromc2 2} {toc test1}] false",
})
vc.verifyContext(t, ctx)
}

func TestConsistentLookupCreateNonDupError(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup", false)
vc := &loggingVCursor{}
vc.AddResult(nil, errors.New("general error"))
ctx := newTestContext()

err := lookup.(Lookup).Create(context.Background(), vc, [][]sqltypes.Value{{
err := lookup.(Lookup).Create(ctx, vc, [][]sqltypes.Value{{
sqltypes.NewInt64(1),
sqltypes.NewInt64(2),
}}, [][]byte{[]byte("test1")}, false)
Expand All @@ -336,15 +355,17 @@ func TestConsistentLookupCreateNonDupError(t *testing.T) {
vc.verifyLog(t, []string{
"ExecutePre insert into t(fromc1, fromc2, toc) values(:fromc1_0, :fromc2_0, :toc_0) [{fromc1_0 1} {fromc2_0 2} {toc_0 test1}] true",
})
vc.verifyContext(t, ctx)
}

func TestConsistentLookupCreateThenBadRows(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup", false)
vc := &loggingVCursor{}
vc.AddResult(nil, vterrors.New(vtrpcpb.Code_ALREADY_EXISTS, "(errno 1062) (sqlstate 23000) Duplicate entry"))
vc.AddResult(makeTestResult(2), nil)
ctx := newTestContext()

err := lookup.(Lookup).Create(context.Background(), vc, [][]sqltypes.Value{{
err := lookup.(Lookup).Create(ctx, vc, [][]sqltypes.Value{{
sqltypes.NewInt64(1),
sqltypes.NewInt64(2),
}}, [][]byte{[]byte("test1")}, false)
Expand All @@ -356,14 +377,16 @@ func TestConsistentLookupCreateThenBadRows(t *testing.T) {
"ExecutePre insert into t(fromc1, fromc2, toc) values(:fromc1_0, :fromc2_0, :toc_0) [{fromc1_0 1} {fromc2_0 2} {toc_0 test1}] true",
"ExecutePre select toc from t where fromc1 = :fromc1 and fromc2 = :fromc2 for update [{fromc1 1} {fromc2 2} {toc test1}] false",
})
vc.verifyContext(t, ctx)
}

func TestConsistentLookupDelete(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup", false)
vc := &loggingVCursor{}
vc.AddResult(&sqltypes.Result{}, nil)
ctx := newTestContext()

if err := lookup.(Lookup).Delete(context.Background(), vc, [][]sqltypes.Value{{
if err := lookup.(Lookup).Delete(ctx, vc, [][]sqltypes.Value{{
sqltypes.NewInt64(1),
sqltypes.NewInt64(2),
}}, []byte("test")); err != nil {
Expand All @@ -372,15 +395,17 @@ func TestConsistentLookupDelete(t *testing.T) {
vc.verifyLog(t, []string{
"ExecutePost delete from t where fromc1 = :fromc1 and fromc2 = :fromc2 and toc = :toc [{fromc1 1} {fromc2 2} {toc test}] true",
})
vc.verifyContext(t, ctx)
}

func TestConsistentLookupUpdate(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup", false)
vc := &loggingVCursor{}
vc.AddResult(&sqltypes.Result{}, nil)
vc.AddResult(&sqltypes.Result{}, nil)
ctx := newTestContext()

if err := lookup.(Lookup).Update(context.Background(), vc, []sqltypes.Value{
if err := lookup.(Lookup).Update(ctx, vc, []sqltypes.Value{
sqltypes.NewInt64(1),
sqltypes.NewInt64(2),
}, []byte("test"), []sqltypes.Value{
Expand All @@ -393,6 +418,7 @@ func TestConsistentLookupUpdate(t *testing.T) {
"ExecutePost delete from t where fromc1 = :fromc1 and fromc2 = :fromc2 and toc = :toc [{fromc1 1} {fromc2 2} {toc test}] true",
"ExecutePre insert into t(fromc1, fromc2, toc) values(:fromc1_0, :fromc2_0, :toc_0) [{fromc1_0 3} {fromc2_0 4} {toc_0 test}] true",
})
vc.verifyContext(t, ctx)
}

func TestConsistentLookupNoUpdate(t *testing.T) {
Expand Down Expand Up @@ -469,13 +495,19 @@ func createConsistentLookup(t *testing.T, name string, writeOnly bool) SingleCol
return l.(SingleColumn)
}

func newTestContext() context.Context {
type testContextKey string // keep static checks from complaining about built-in types as context keys
return context.WithValue(context.Background(), (testContextKey)("test"), "foo")
}

var _ VCursor = (*loggingVCursor)(nil)

type loggingVCursor struct {
results []*sqltypes.Result
errors []error
index int
log []string
results []*sqltypes.Result
errors []error
index int
log []string
contexts []context.Context
}

func (vc *loggingVCursor) LookupRowLockShardSession() vtgatepb.CommitOrder {
Expand Down Expand Up @@ -508,14 +540,14 @@ func (vc *loggingVCursor) Execute(ctx context.Context, method string, query stri
case vtgatepb.CommitOrder_AUTOCOMMIT:
name = "ExecuteAutocommit"
}
return vc.execute(name, query, bindvars, rollbackOnError)
return vc.execute(ctx, name, query, bindvars, rollbackOnError)
}

func (vc *loggingVCursor) ExecuteKeyspaceID(ctx context.Context, keyspace string, ksid []byte, query string, bindVars map[string]*querypb.BindVariable, rollbackOnError, autocommit bool) (*sqltypes.Result, error) {
return vc.execute("ExecuteKeyspaceID", query, bindVars, rollbackOnError)
return vc.execute(ctx, "ExecuteKeyspaceID", query, bindVars, rollbackOnError)
}

func (vc *loggingVCursor) execute(method string, query string, bindvars map[string]*querypb.BindVariable, rollbackOnError bool) (*sqltypes.Result, error) {
func (vc *loggingVCursor) execute(ctx context.Context, method string, query string, bindvars map[string]*querypb.BindVariable, rollbackOnError bool) (*sqltypes.Result, error) {
if vc.index >= len(vc.results) {
return nil, fmt.Errorf("ran out of results to return: %s", query)
}
Expand All @@ -525,6 +557,7 @@ func (vc *loggingVCursor) execute(method string, query string, bindvars map[stri
}
sort.Slice(bvl, func(i, j int) bool { return bvl[i].Name < bvl[j].Name })
vc.log = append(vc.log, fmt.Sprintf("%s %s %v %v", method, query, bvl, rollbackOnError))
vc.contexts = append(vc.contexts, ctx)
idx := vc.index
vc.index++
if vc.errors[idx] != nil {
Expand All @@ -548,6 +581,15 @@ func (vc *loggingVCursor) verifyLog(t *testing.T, want []string) {
}
}

func (vc *loggingVCursor) verifyContext(t *testing.T, want context.Context) {
t.Helper()
for i, got := range vc.contexts {
if got != want {
t.Errorf("context(%d):\ngot: %v\nwant: %v", i, got, want)
}
}
}

// create lookup result with one to one mapping
func makeTestResult(numRows int) *sqltypes.Result {
result := &sqltypes.Result{
Expand Down
Loading