Skip to content

Commit

Permalink
fix: flush tables with read lock to run only with reserved connection (
Browse files Browse the repository at this point in the history
…#14720)

Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal authored Dec 8, 2023
1 parent 08b2c8b commit 27f1ac2
Show file tree
Hide file tree
Showing 15 changed files with 293 additions and 65 deletions.
50 changes: 50 additions & 0 deletions go/test/endtoend/vtgate/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ limitations under the License.
package vtgate

import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/test/endtoend/utils"

Expand Down Expand Up @@ -336,6 +340,52 @@ func TestFlush(t *testing.T) {
utils.Exec(t, conn, "flush local tables t1, t2")
}

// TestFlushLock tests that ftwrl and unlock tables should unblock other session connections to execute the query.
func TestFlushLock(t *testing.T) {
conn, closer := start(t)
defer closer()

// replica: fail it
utils.Exec(t, conn, "use @replica")
_, err := utils.ExecAllowError(t, conn, "flush tables ks.t1, ks.t2 with read lock")
require.ErrorContains(t, err, "VT09012: FLUSH statement with REPLICA tablet not allowed")

// primary: should work
utils.Exec(t, conn, "use @primary")
utils.Exec(t, conn, "flush tables ks.t1, ks.t2 with read lock")

var cnt atomic.Int32
go func() {
ctx := context.Background()
conn2, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn2.Close()

cnt.Add(1)
utils.Exec(t, conn2, "select * from ks.t1 for update")
cnt.Add(1)
}()
for cnt.Load() == 0 {
}
// added sleep to let the query execute inside the go routine, which should be blocked.
time.Sleep(1 * time.Second)
require.EqualValues(t, 1, cnt.Load())

// unlock it
utils.Exec(t, conn, "unlock tables")

// now wait for go routine to complete.
timeout := time.After(3 * time.Second)
for cnt.Load() != 2 {
select {
case <-timeout:
t.Fatalf("test timeout waiting for select query to complete")
default:

}
}
}

func TestShowVariables(t *testing.T) {
conn, closer := start(t)
defer closer()
Expand Down
64 changes: 32 additions & 32 deletions go/vt/vtgate/engine/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type Send struct {
// MultishardAutocommit specifies that a multishard transaction query can autocommit
MultishardAutocommit bool

ReservedConnectionNeeded bool

noInputs
}

Expand Down Expand Up @@ -88,19 +90,12 @@ func (s *Send) GetTableName() string {
func (s *Send) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
ctx, cancelFunc := addQueryTimeout(ctx, vcursor, 0)
defer cancelFunc()
rss, _, err := vcursor.ResolveDestinations(ctx, s.Keyspace.Name, nil, []key.Destination{s.TargetDestination})

rss, err := s.checkAndReturnShards(ctx, vcursor)
if err != nil {
return nil, err
}

if !s.Keyspace.Sharded && len(rss) != 1 {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Keyspace does not have exactly one shard: %v", rss)
}

if s.SingleShardOnly && len(rss) != 1 {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Unexpected error, DestinationKeyspaceID mapping to multiple shards: %s, got: %v", s.Query, s.TargetDestination)
}

queries := make([]*querypb.BoundQuery, len(rss))
for i, rs := range rss {
bv := bindVars
Expand All @@ -123,6 +118,26 @@ func (s *Send) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[str
return result, nil
}

func (s *Send) checkAndReturnShards(ctx context.Context, vcursor VCursor) ([]*srvtopo.ResolvedShard, error) {
rss, _, err := vcursor.ResolveDestinations(ctx, s.Keyspace.Name, nil, []key.Destination{s.TargetDestination})
if err != nil {
return nil, err
}

if !s.Keyspace.Sharded && len(rss) != 1 {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Keyspace does not have exactly one shard: %v", rss)
}

if s.SingleShardOnly && len(rss) != 1 {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Unexpected error, DestinationKeyspaceID mapping to multiple shards: %s, got: %v", s.Query, s.TargetDestination)
}

if s.ReservedConnectionNeeded {
vcursor.Session().NeedsReservedConn()
}
return rss, nil
}

func (s *Send) canAutoCommit(vcursor VCursor, rss []*srvtopo.ResolvedShard) bool {
if s.IsDML {
return (len(rss) == 1 || s.MultishardAutocommit) && vcursor.AutocommitApproval()
Expand All @@ -140,19 +155,11 @@ func copyBindVars(in map[string]*querypb.BindVariable) map[string]*querypb.BindV

// TryStreamExecute implements Primitive interface
func (s *Send) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
rss, _, err := vcursor.ResolveDestinations(ctx, s.Keyspace.Name, nil, []key.Destination{s.TargetDestination})
rss, err := s.checkAndReturnShards(ctx, vcursor)
if err != nil {
return err
}

if !s.Keyspace.Sharded && len(rss) != 1 {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Keyspace does not have exactly one shard: %v", rss)
}

if s.SingleShardOnly && len(rss) != 1 {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Unexpected error, DestinationKeyspaceID mapping to multiple shards: %s, got: %v", s.Query, s.TargetDestination)
}

multiBindVars := make([]map[string]*querypb.BindVariable, len(rss))
for i, rs := range rss {
bv := bindVars
Expand All @@ -178,20 +185,13 @@ func (s *Send) GetFields(ctx context.Context, vcursor VCursor, bindVars map[stri

func (s *Send) description() PrimitiveDescription {
other := map[string]any{
"Query": s.Query,
"Table": s.GetTableName(),
}
if s.IsDML {
other["IsDML"] = true
}
if s.SingleShardOnly {
other["SingleShardOnly"] = true
}
if s.ShardNameNeeded {
other["ShardNameNeeded"] = true
}
if s.MultishardAutocommit {
other["MultishardAutocommit"] = true
"Query": s.Query,
"Table": s.GetTableName(),
"IsDML": s.IsDML,
"SingleShardOnly": s.SingleShardOnly,
"ShardNameNeeded": s.ShardNameNeeded,
"MultishardAutocommit": s.MultishardAutocommit,
"ReservedConnectionNeeded": s.ReservedConnectionNeeded,
}
return PrimitiveDescription{
OperatorType: "Send",
Expand Down
79 changes: 79 additions & 0 deletions go/vt/vtgate/engine/unlock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
Copyright 2020 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package engine

import (
"context"

"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/vterrors"
)

var _ Primitive = (*Unlock)(nil)

// Unlock primitive will execute unlock tables to all connections in the session.
type Unlock struct {
noTxNeeded
noInputs
}

const unlockTables = "unlock tables"

func (u *Unlock) RouteType() string {
return "UNLOCK"
}

func (u *Unlock) GetKeyspaceName() string {
return ""
}

func (u *Unlock) GetTableName() string {
return ""
}

func (u *Unlock) GetFields(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
return nil, vterrors.VT13001("GetFields should not be called for unlock tables")
}

func (u *Unlock) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
rss := vcursor.Session().ShardSession()

if len(rss) == 0 {
return &sqltypes.Result{}, nil
}
bqs := make([]*querypb.BoundQuery, len(rss))
for i := 0; i < len(rss); i++ {
bqs[i] = &querypb.BoundQuery{Sql: unlockTables}
}
qr, errs := vcursor.ExecuteMultiShard(ctx, u, rss, bqs, true, false)
return qr, vterrors.Aggregate(errs)
}

func (u *Unlock) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
qr, err := u.TryExecute(ctx, vcursor, bindVars, wantfields)
if err != nil {
return err
}
return callback(qr)
}

func (u *Unlock) description() PrimitiveDescription {
return PrimitiveDescription{
OperatorType: "UnlockTables",
}
}
30 changes: 13 additions & 17 deletions go/vt/vtgate/planbuilder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,18 +382,12 @@ func buildFlushOptions(stmt *sqlparser.Flush, vschema plancontext.VSchema) (*pla
dest = key.DestinationAllShards{}
}

tc := &tableCollector{}
for _, tbl := range stmt.TableNames {
tc.addASTTable(keyspace.Name, tbl)
}

return newPlanResult(&engine.Send{
Keyspace: keyspace,
TargetDestination: dest,
Query: sqlparser.String(stmt),
IsDML: false,
SingleShardOnly: false,
}, tc.getTables()...), nil
Keyspace: keyspace,
TargetDestination: dest,
Query: sqlparser.String(stmt),
ReservedConnectionNeeded: stmt.WithLock,
}), nil
}

func buildFlushTables(stmt *sqlparser.Flush, vschema plancontext.VSchema) (*planResult, error) {
Expand Down Expand Up @@ -441,9 +435,10 @@ func buildFlushTables(stmt *sqlparser.Flush, vschema plancontext.VSchema) (*plan
if len(tablesMap) == 1 {
for sendDest, tables := range tablesMap {
return newPlanResult(&engine.Send{
Keyspace: sendDest.ks,
TargetDestination: sendDest.dest,
Query: sqlparser.String(newFlushStmt(stmt, tables)),
Keyspace: sendDest.ks,
TargetDestination: sendDest.dest,
Query: sqlparser.String(newFlushStmt(stmt, tables)),
ReservedConnectionNeeded: stmt.WithLock,
}, tc.getTables()...), nil
}
}
Expand All @@ -455,9 +450,10 @@ func buildFlushTables(stmt *sqlparser.Flush, vschema plancontext.VSchema) (*plan
var sources []engine.Primitive
for _, sendDest := range keys {
plan := &engine.Send{
Keyspace: sendDest.ks,
TargetDestination: sendDest.dest,
Query: sqlparser.String(newFlushStmt(stmt, tablesMap[sendDest])),
Keyspace: sendDest.ks,
TargetDestination: sendDest.dest,
Query: sqlparser.String(newFlushStmt(stmt, tablesMap[sendDest])),
ReservedConnectionNeeded: stmt.WithLock,
}
sources = append(sources, plan)
}
Expand Down
3 changes: 1 addition & 2 deletions go/vt/vtgate/planbuilder/locktables.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,5 @@ func buildLockPlan(stmt sqlparser.Statement, _ *sqlparser.ReservedVars, _ planco

// buildUnlockPlan plans lock tables statement.
func buildUnlockPlan(stmt sqlparser.Statement, _ *sqlparser.ReservedVars, _ plancontext.VSchema) (*planResult, error) {
log.Warningf("Unlock Tables statement is ignored: %v", stmt)
return newPlanResult(engine.NewRowsPrimitive(make([][]sqltypes.Value, 0), make([]*querypb.Field, 0))), nil
return newPlanResult(&engine.Unlock{}), nil
}
40 changes: 39 additions & 1 deletion go/vt/vtgate/planbuilder/testdata/flush_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
"Sharded": false
},
"TargetDestination": "AllShards()",
"Query": "flush local tables with read lock"
"Query": "flush local tables with read lock",
"ReservedConnectionNeeded": true
}
}
},
Expand All @@ -53,5 +54,42 @@
"Query": "flush local hosts, logs"
}
}
},
{
"comment": "Flush statement with multiple tables in different keyspace with read lock",
"query": "flush tables user.music, main.unsharded with read lock",
"plan": {
"QueryType": "FLUSH",
"Original": "flush tables user.music, main.unsharded with read lock",
"Instructions": {
"OperatorType": "Concatenate",
"Inputs": [
{
"OperatorType": "Send",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"TargetDestination": "AllShards()",
"Query": "flush tables unsharded with read lock",
"ReservedConnectionNeeded": true
},
{
"OperatorType": "Send",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"TargetDestination": "AllShards()",
"Query": "flush tables music with read lock",
"ReservedConnectionNeeded": true
}
]
},
"TablesUsed": [
"main.unsharded",
"user.music"
]
}
}
]
Loading

0 comments on commit 27f1ac2

Please sign in to comment.