diff --git a/go/test/endtoend/vtgate/misc_test.go b/go/test/endtoend/vtgate/misc_test.go index 83c41fd7183..f0c063b1caf 100644 --- a/go/test/endtoend/vtgate/misc_test.go +++ b/go/test/endtoend/vtgate/misc_test.go @@ -17,9 +17,13 @@ limitations under the License. package vtgate import ( + "context" "fmt" + "sync/atomic" "testing" + "time" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/test/endtoend/utils" @@ -336,6 +340,52 @@ func TestFlush(t *testing.T) { utils.Exec(t, conn, "flush local tables t1, t2") } +// TestFlushLock tests that ftwrl and unlock tables should unblock other session connections to execute the query. +func TestFlushLock(t *testing.T) { + conn, closer := start(t) + defer closer() + + // replica: fail it + utils.Exec(t, conn, "use @replica") + _, err := utils.ExecAllowError(t, conn, "flush tables ks.t1, ks.t2 with read lock") + require.ErrorContains(t, err, "VT09012: FLUSH statement with REPLICA tablet not allowed") + + // primary: should work + utils.Exec(t, conn, "use @primary") + utils.Exec(t, conn, "flush tables ks.t1, ks.t2 with read lock") + + var cnt atomic.Int32 + go func() { + ctx := context.Background() + conn2, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + defer conn2.Close() + + cnt.Add(1) + utils.Exec(t, conn2, "select * from ks.t1 for update") + cnt.Add(1) + }() + for cnt.Load() == 0 { + } + // added sleep to let the query execute inside the go routine, which should be blocked. + time.Sleep(1 * time.Second) + require.EqualValues(t, 1, cnt.Load()) + + // unlock it + utils.Exec(t, conn, "unlock tables") + + // now wait for go routine to complete. + timeout := time.After(3 * time.Second) + for cnt.Load() != 2 { + select { + case <-timeout: + t.Fatalf("test timeout waiting for select query to complete") + default: + + } + } +} + func TestShowVariables(t *testing.T) { conn, closer := start(t) defer closer() diff --git a/go/vt/vtgate/engine/send.go b/go/vt/vtgate/engine/send.go index 1a95d8f93fa..83ba960af9e 100644 --- a/go/vt/vtgate/engine/send.go +++ b/go/vt/vtgate/engine/send.go @@ -54,6 +54,8 @@ type Send struct { // MultishardAutocommit specifies that a multishard transaction query can autocommit MultishardAutocommit bool + ReservedConnectionNeeded bool + noInputs } @@ -88,19 +90,12 @@ func (s *Send) GetTableName() string { func (s *Send) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { ctx, cancelFunc := addQueryTimeout(ctx, vcursor, 0) defer cancelFunc() - rss, _, err := vcursor.ResolveDestinations(ctx, s.Keyspace.Name, nil, []key.Destination{s.TargetDestination}) + + rss, err := s.checkAndReturnShards(ctx, vcursor) if err != nil { return nil, err } - if !s.Keyspace.Sharded && len(rss) != 1 { - return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Keyspace does not have exactly one shard: %v", rss) - } - - if s.SingleShardOnly && len(rss) != 1 { - return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Unexpected error, DestinationKeyspaceID mapping to multiple shards: %s, got: %v", s.Query, s.TargetDestination) - } - queries := make([]*querypb.BoundQuery, len(rss)) for i, rs := range rss { bv := bindVars @@ -123,6 +118,26 @@ func (s *Send) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[str return result, nil } +func (s *Send) checkAndReturnShards(ctx context.Context, vcursor VCursor) ([]*srvtopo.ResolvedShard, error) { + rss, _, err := vcursor.ResolveDestinations(ctx, s.Keyspace.Name, nil, []key.Destination{s.TargetDestination}) + if err != nil { + return nil, err + } + + if !s.Keyspace.Sharded && len(rss) != 1 { + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Keyspace does not have exactly one shard: %v", rss) + } + + if s.SingleShardOnly && len(rss) != 1 { + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Unexpected error, DestinationKeyspaceID mapping to multiple shards: %s, got: %v", s.Query, s.TargetDestination) + } + + if s.ReservedConnectionNeeded { + vcursor.Session().NeedsReservedConn() + } + return rss, nil +} + func (s *Send) canAutoCommit(vcursor VCursor, rss []*srvtopo.ResolvedShard) bool { if s.IsDML { return (len(rss) == 1 || s.MultishardAutocommit) && vcursor.AutocommitApproval() @@ -140,19 +155,11 @@ func copyBindVars(in map[string]*querypb.BindVariable) map[string]*querypb.BindV // TryStreamExecute implements Primitive interface func (s *Send) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { - rss, _, err := vcursor.ResolveDestinations(ctx, s.Keyspace.Name, nil, []key.Destination{s.TargetDestination}) + rss, err := s.checkAndReturnShards(ctx, vcursor) if err != nil { return err } - if !s.Keyspace.Sharded && len(rss) != 1 { - return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Keyspace does not have exactly one shard: %v", rss) - } - - if s.SingleShardOnly && len(rss) != 1 { - return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Unexpected error, DestinationKeyspaceID mapping to multiple shards: %s, got: %v", s.Query, s.TargetDestination) - } - multiBindVars := make([]map[string]*querypb.BindVariable, len(rss)) for i, rs := range rss { bv := bindVars @@ -178,20 +185,13 @@ func (s *Send) GetFields(ctx context.Context, vcursor VCursor, bindVars map[stri func (s *Send) description() PrimitiveDescription { other := map[string]any{ - "Query": s.Query, - "Table": s.GetTableName(), - } - if s.IsDML { - other["IsDML"] = true - } - if s.SingleShardOnly { - other["SingleShardOnly"] = true - } - if s.ShardNameNeeded { - other["ShardNameNeeded"] = true - } - if s.MultishardAutocommit { - other["MultishardAutocommit"] = true + "Query": s.Query, + "Table": s.GetTableName(), + "IsDML": s.IsDML, + "SingleShardOnly": s.SingleShardOnly, + "ShardNameNeeded": s.ShardNameNeeded, + "MultishardAutocommit": s.MultishardAutocommit, + "ReservedConnectionNeeded": s.ReservedConnectionNeeded, } return PrimitiveDescription{ OperatorType: "Send", diff --git a/go/vt/vtgate/engine/unlock.go b/go/vt/vtgate/engine/unlock.go new file mode 100644 index 00000000000..5addbb957fa --- /dev/null +++ b/go/vt/vtgate/engine/unlock.go @@ -0,0 +1,79 @@ +/* +Copyright 2020 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package engine + +import ( + "context" + + "vitess.io/vitess/go/sqltypes" + querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/vterrors" +) + +var _ Primitive = (*Unlock)(nil) + +// Unlock primitive will execute unlock tables to all connections in the session. +type Unlock struct { + noTxNeeded + noInputs +} + +const unlockTables = "unlock tables" + +func (u *Unlock) RouteType() string { + return "UNLOCK" +} + +func (u *Unlock) GetKeyspaceName() string { + return "" +} + +func (u *Unlock) GetTableName() string { + return "" +} + +func (u *Unlock) GetFields(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) { + return nil, vterrors.VT13001("GetFields should not be called for unlock tables") +} + +func (u *Unlock) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { + rss := vcursor.Session().ShardSession() + + if len(rss) == 0 { + return &sqltypes.Result{}, nil + } + bqs := make([]*querypb.BoundQuery, len(rss)) + for i := 0; i < len(rss); i++ { + bqs[i] = &querypb.BoundQuery{Sql: unlockTables} + } + qr, errs := vcursor.ExecuteMultiShard(ctx, u, rss, bqs, true, false) + return qr, vterrors.Aggregate(errs) +} + +func (u *Unlock) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { + qr, err := u.TryExecute(ctx, vcursor, bindVars, wantfields) + if err != nil { + return err + } + return callback(qr) +} + +func (u *Unlock) description() PrimitiveDescription { + return PrimitiveDescription{ + OperatorType: "UnlockTables", + } +} diff --git a/go/vt/vtgate/planbuilder/builder.go b/go/vt/vtgate/planbuilder/builder.go index ffa316d4b71..4c1bfc0c547 100644 --- a/go/vt/vtgate/planbuilder/builder.go +++ b/go/vt/vtgate/planbuilder/builder.go @@ -382,18 +382,12 @@ func buildFlushOptions(stmt *sqlparser.Flush, vschema plancontext.VSchema) (*pla dest = key.DestinationAllShards{} } - tc := &tableCollector{} - for _, tbl := range stmt.TableNames { - tc.addASTTable(keyspace.Name, tbl) - } - return newPlanResult(&engine.Send{ - Keyspace: keyspace, - TargetDestination: dest, - Query: sqlparser.String(stmt), - IsDML: false, - SingleShardOnly: false, - }, tc.getTables()...), nil + Keyspace: keyspace, + TargetDestination: dest, + Query: sqlparser.String(stmt), + ReservedConnectionNeeded: stmt.WithLock, + }), nil } func buildFlushTables(stmt *sqlparser.Flush, vschema plancontext.VSchema) (*planResult, error) { @@ -441,9 +435,10 @@ func buildFlushTables(stmt *sqlparser.Flush, vschema plancontext.VSchema) (*plan if len(tablesMap) == 1 { for sendDest, tables := range tablesMap { return newPlanResult(&engine.Send{ - Keyspace: sendDest.ks, - TargetDestination: sendDest.dest, - Query: sqlparser.String(newFlushStmt(stmt, tables)), + Keyspace: sendDest.ks, + TargetDestination: sendDest.dest, + Query: sqlparser.String(newFlushStmt(stmt, tables)), + ReservedConnectionNeeded: stmt.WithLock, }, tc.getTables()...), nil } } @@ -455,9 +450,10 @@ func buildFlushTables(stmt *sqlparser.Flush, vschema plancontext.VSchema) (*plan var sources []engine.Primitive for _, sendDest := range keys { plan := &engine.Send{ - Keyspace: sendDest.ks, - TargetDestination: sendDest.dest, - Query: sqlparser.String(newFlushStmt(stmt, tablesMap[sendDest])), + Keyspace: sendDest.ks, + TargetDestination: sendDest.dest, + Query: sqlparser.String(newFlushStmt(stmt, tablesMap[sendDest])), + ReservedConnectionNeeded: stmt.WithLock, } sources = append(sources, plan) } diff --git a/go/vt/vtgate/planbuilder/locktables.go b/go/vt/vtgate/planbuilder/locktables.go index 9c3a5fa44e9..e8776d13e65 100644 --- a/go/vt/vtgate/planbuilder/locktables.go +++ b/go/vt/vtgate/planbuilder/locktables.go @@ -33,6 +33,5 @@ func buildLockPlan(stmt sqlparser.Statement, _ *sqlparser.ReservedVars, _ planco // buildUnlockPlan plans lock tables statement. func buildUnlockPlan(stmt sqlparser.Statement, _ *sqlparser.ReservedVars, _ plancontext.VSchema) (*planResult, error) { - log.Warningf("Unlock Tables statement is ignored: %v", stmt) - return newPlanResult(engine.NewRowsPrimitive(make([][]sqltypes.Value, 0), make([]*querypb.Field, 0))), nil + return newPlanResult(&engine.Unlock{}), nil } diff --git a/go/vt/vtgate/planbuilder/testdata/flush_cases.json b/go/vt/vtgate/planbuilder/testdata/flush_cases.json index 8298c6de649..26a1f218c8d 100644 --- a/go/vt/vtgate/planbuilder/testdata/flush_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/flush_cases.json @@ -33,7 +33,8 @@ "Sharded": false }, "TargetDestination": "AllShards()", - "Query": "flush local tables with read lock" + "Query": "flush local tables with read lock", + "ReservedConnectionNeeded": true } } }, @@ -53,5 +54,42 @@ "Query": "flush local hosts, logs" } } + }, + { + "comment": "Flush statement with multiple tables in different keyspace with read lock", + "query": "flush tables user.music, main.unsharded with read lock", + "plan": { + "QueryType": "FLUSH", + "Original": "flush tables user.music, main.unsharded with read lock", + "Instructions": { + "OperatorType": "Concatenate", + "Inputs": [ + { + "OperatorType": "Send", + "Keyspace": { + "Name": "main", + "Sharded": false + }, + "TargetDestination": "AllShards()", + "Query": "flush tables unsharded with read lock", + "ReservedConnectionNeeded": true + }, + { + "OperatorType": "Send", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "TargetDestination": "AllShards()", + "Query": "flush tables music with read lock", + "ReservedConnectionNeeded": true + } + ] + }, + "TablesUsed": [ + "main.unsharded", + "user.music" + ] + } } ] diff --git a/go/vt/vtgate/planbuilder/testdata/flush_cases_no_default_keyspace.json b/go/vt/vtgate/planbuilder/testdata/flush_cases_no_default_keyspace.json index a3370a74f5d..7afd090ba21 100644 --- a/go/vt/vtgate/planbuilder/testdata/flush_cases_no_default_keyspace.json +++ b/go/vt/vtgate/planbuilder/testdata/flush_cases_no_default_keyspace.json @@ -15,7 +15,8 @@ "Sharded": false }, "TargetDestination": "AllShards()", - "Query": "flush local tables unsharded_a with read lock" + "Query": "flush local tables unsharded_a with read lock", + "ReservedConnectionNeeded": true }, { "OperatorType": "Send", @@ -24,7 +25,8 @@ "Sharded": true }, "TargetDestination": "AllShards()", - "Query": "flush local tables `user`, user_extra with read lock" + "Query": "flush local tables `user`, user_extra with read lock", + "ReservedConnectionNeeded": true } ] }, @@ -105,7 +107,8 @@ "Sharded": false }, "TargetDestination": "AllShards()", - "Query": "flush tables a with read lock" + "Query": "flush tables a with read lock", + "ReservedConnectionNeeded": true }, "TablesUsed": [ "main.a" @@ -128,7 +131,8 @@ "Sharded": false }, "TargetDestination": "AllShards()", - "Query": "flush local tables unsharded_a with read lock" + "Query": "flush local tables unsharded_a with read lock", + "ReservedConnectionNeeded": true }, { "OperatorType": "Send", @@ -137,7 +141,8 @@ "Sharded": false }, "TargetDestination": "AllShards()", - "Query": "flush local tables unsharded_tab with read lock" + "Query": "flush local tables unsharded_tab with read lock", + "ReservedConnectionNeeded": true }, { "OperatorType": "Send", @@ -146,7 +151,8 @@ "Sharded": true }, "TargetDestination": "AllShards()", - "Query": "flush local tables `user`, user_extra with read lock" + "Query": "flush local tables `user`, user_extra with read lock", + "ReservedConnectionNeeded": true } ] }, diff --git a/go/vt/vtgate/planbuilder/testdata/lock_cases.json b/go/vt/vtgate/planbuilder/testdata/lock_cases.json index 568b066fa22..2490424a1ec 100644 --- a/go/vt/vtgate/planbuilder/testdata/lock_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/lock_cases.json @@ -97,7 +97,7 @@ "QueryType": "UNLOCK_TABLES", "Original": "unlock tables", "Instructions": { - "OperatorType": "Rows" + "OperatorType": "UnlockTables" } } }, diff --git a/go/vt/vttablet/endtoend/reserve_test.go b/go/vt/vttablet/endtoend/reserve_test.go index 63b87f9c00f..d3fb685dd49 100644 --- a/go/vt/vttablet/endtoend/reserve_test.go +++ b/go/vt/vttablet/endtoend/reserve_test.go @@ -28,8 +28,6 @@ import ( "vitess.io/vitess/go/vt/vttablet/endtoend/framework" ) -//TODO: Add Counter checks in all the tests. - func TestMultipleReserveHaveDifferentConnection(t *testing.T) { framework.Server.Config().EnableSettingsPool = false defer func() { @@ -1190,3 +1188,23 @@ func TestReserveQueryTimeout(t *testing.T) { assert.NoError(t, client.Release()) } + +// TestReserveFlushTables checks that `flush table with read lock` works only with reserve api. +func TestReserveFlushTables(t *testing.T) { + client := framework.NewClient() + + _, err := client.Execute("flush tables with read lock", nil) + assert.ErrorContains(t, err, "Flush not allowed without reserved connection") + + _, err = client.Execute("unlock tables", nil) + assert.ErrorContains(t, err, "unlock tables should be executed with an existing connection") + + _, err = client.ReserveExecute("flush tables with read lock", nil, nil) + assert.NoError(t, err) + + _, err = client.Execute("unlock tables", nil) + assert.NoError(t, err) + + assert.NoError(t, + client.Release()) +} diff --git a/go/vt/vttablet/tabletserver/planbuilder/builder.go b/go/vt/vttablet/tabletserver/planbuilder/builder.go index 3cae292b593..4594f6350f6 100644 --- a/go/vt/vttablet/tabletserver/planbuilder/builder.go +++ b/go/vt/vttablet/tabletserver/planbuilder/builder.go @@ -219,3 +219,21 @@ func analyzeDDL(stmt sqlparser.DDLStatement) (*Plan, error) { } return &Plan{PlanID: PlanDDL, FullQuery: fullQuery, FullStmt: stmt, NeedsReservedConn: stmt.IsTemporary()}, nil } + +func analyzeFlush(stmt *sqlparser.Flush, tables map[string]*schema.Table) (*Plan, error) { + plan := &Plan{PlanID: PlanFlush, FullQuery: GenerateFullQuery(stmt)} + + for _, tbl := range stmt.TableNames { + if schemaTbl, ok := tables[tbl.Name.String()]; ok { + plan.AllTables = append(plan.AllTables, schemaTbl) + } + } + if len(plan.AllTables) == 1 { + plan.Table = plan.AllTables[0] + } + + if stmt.WithLock { + plan.NeedsReservedConn = true + } + return plan, nil +} diff --git a/go/vt/vttablet/tabletserver/planbuilder/permission.go b/go/vt/vttablet/tabletserver/planbuilder/permission.go index a9d772f2931..d6069c6adf3 100644 --- a/go/vt/vttablet/tabletserver/planbuilder/permission.go +++ b/go/vt/vttablet/tabletserver/planbuilder/permission.go @@ -65,7 +65,8 @@ func BuildPermissions(stmt sqlparser.Statement) []Permission { case *sqlparser.Analyze: permissions = buildTableNamePermissions(node.Table, tableacl.WRITER, permissions) case *sqlparser.OtherAdmin, *sqlparser.CallProc, *sqlparser.Begin, *sqlparser.Commit, *sqlparser.Rollback, - *sqlparser.Load, *sqlparser.Savepoint, *sqlparser.Release, *sqlparser.SRollback, *sqlparser.Set, *sqlparser.Show, sqlparser.Explain: + *sqlparser.Load, *sqlparser.Savepoint, *sqlparser.Release, *sqlparser.SRollback, *sqlparser.Set, *sqlparser.Show, sqlparser.Explain, + *sqlparser.UnlockTables: // no op default: panic(fmt.Errorf("BUG: unexpected statement type: %T", node)) diff --git a/go/vt/vttablet/tabletserver/planbuilder/plan.go b/go/vt/vttablet/tabletserver/planbuilder/plan.go index c4a8f905607..6f491692241 100644 --- a/go/vt/vttablet/tabletserver/planbuilder/plan.go +++ b/go/vt/vttablet/tabletserver/planbuilder/plan.go @@ -246,7 +246,9 @@ func Build(statement sqlparser.Statement, tables map[string]*schema.Table, dbNam case *sqlparser.Load: plan, err = &Plan{PlanID: PlanLoad}, nil case *sqlparser.Flush: - plan, err = &Plan{PlanID: PlanFlush, FullQuery: GenerateFullQuery(stmt)}, nil + plan, err = analyzeFlush(stmt, tables) + case *sqlparser.UnlockTables: + plan, err = &Plan{PlanID: PlanUnlockTables}, nil case *sqlparser.CallProc: plan, err = &Plan{PlanID: PlanCallProc, FullQuery: GenerateFullQuery(stmt)}, nil default: diff --git a/go/vt/vttablet/tabletserver/planbuilder/testdata/exec_cases.txt b/go/vt/vttablet/tabletserver/planbuilder/testdata/exec_cases.txt index d5bd99ca7a0..1e9dbf6ad12 100644 --- a/go/vt/vttablet/tabletserver/planbuilder/testdata/exec_cases.txt +++ b/go/vt/vttablet/tabletserver/planbuilder/testdata/exec_cases.txt @@ -939,6 +939,25 @@ options:PassthroughDMLs "FullQuery": "flush tables a, b" } +# flush statement with read lock +"flush tables a,b with read lock" +{ + "PlanID": "Flush", + "TableName": "", + "Permissions": [ + { + "TableName": "a", + "Role": 2 + }, + { + "TableName": "b", + "Role": 2 + } + ], + "FullQuery": "flush tables a, b with read lock", + "NeedsReservedConn": true +} + # call proc "call getAllTheThings()" { diff --git a/go/vt/vttablet/tabletserver/query_engine.go b/go/vt/vttablet/tabletserver/query_engine.go index c31b65be8dd..0afe76d14ce 100644 --- a/go/vt/vttablet/tabletserver/query_engine.go +++ b/go/vt/vttablet/tabletserver/query_engine.go @@ -109,7 +109,7 @@ func (ep *TabletPlan) IsValid(hasReservedCon, hasSysSettings bool) error { func isValid(planType planbuilder.PlanType, hasReservedCon bool, hasSysSettings bool) error { switch planType { - case planbuilder.PlanSelectLockFunc, planbuilder.PlanDDL: + case planbuilder.PlanSelectLockFunc, planbuilder.PlanDDL, planbuilder.PlanFlush: if hasReservedCon { return nil } diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 735562c536f..e53bdaec754 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -207,6 +207,8 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) { return qre.execShowThrottledApps() case p.PlanShowThrottlerStatus: return qre.execShowThrottlerStatus() + case p.PlanUnlockTables: + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "unlock tables should be executed with an existing connection") case p.PlanSet: if qre.setting == nil { return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "[BUG] %s not allowed without setting connection", qre.query) @@ -279,7 +281,7 @@ func (qre *QueryExecutor) txConnExec(conn *StatefulConnection) (*sqltypes.Result return qre.txFetch(conn, true) case p.PlanUpdateLimit, p.PlanDeleteLimit: return qre.execDMLLimit(conn) - case p.PlanOtherRead, p.PlanOtherAdmin, p.PlanFlush: + case p.PlanOtherRead, p.PlanOtherAdmin, p.PlanFlush, p.PlanUnlockTables: return qre.execStatefulConn(conn, qre.query, true) case p.PlanSavepoint, p.PlanRelease, p.PlanSRollback: return qre.execStatefulConn(conn, qre.query, true)