diff --git a/go/test/endtoend/transaction/twopc/stress/main_test.go b/go/test/endtoend/transaction/twopc/stress/main_test.go index 782ffe1ec12..76cd05df50a 100644 --- a/go/test/endtoend/transaction/twopc/stress/main_test.go +++ b/go/test/endtoend/transaction/twopc/stress/main_test.go @@ -73,6 +73,8 @@ func TestMain(m *testing.M) { "--twopc_enable", "--twopc_abandon_age", "1", "--migration_check_interval", "2s", + "--onterm_timeout", "1s", + "--onclose_timeout", "1s", ) // Start keyspace @@ -123,8 +125,6 @@ func start(t *testing.T) (*mysql.Conn, func()) { func cleanup(t *testing.T) { cluster.PanicHandler(t) - - utils.ClearOutTable(t, vtParams, "twopc_fuzzer_insert") - utils.ClearOutTable(t, vtParams, "twopc_fuzzer_update") utils.ClearOutTable(t, vtParams, "twopc_t1") + utils.ClearOutTable(t, vtParams, "twopc_settings") } diff --git a/go/test/endtoend/transaction/twopc/stress/schema.sql b/go/test/endtoend/transaction/twopc/stress/schema.sql index 5173166bfd4..81f28d21340 100644 --- a/go/test/endtoend/transaction/twopc/stress/schema.sql +++ b/go/test/endtoend/transaction/twopc/stress/schema.sql @@ -1,20 +1,11 @@ -create table twopc_fuzzer_update ( +create table twopc_t1 ( id bigint, col bigint, primary key (id) ) Engine=InnoDB; -create table twopc_fuzzer_insert ( - id bigint, - updateSet bigint, - threadId bigint, - col bigint auto_increment, - key(col), - primary key (id, col) -) Engine=InnoDB; - -create table twopc_t1 ( +create table twopc_settings ( id bigint, - col bigint, + col varchar(50), primary key (id) ) Engine=InnoDB; diff --git a/go/test/endtoend/transaction/twopc/stress/stress_test.go b/go/test/endtoend/transaction/twopc/stress/stress_test.go index acb8146975b..05a89b780dc 100644 --- a/go/test/endtoend/transaction/twopc/stress/stress_test.go +++ b/go/test/endtoend/transaction/twopc/stress/stress_test.go @@ -42,6 +42,109 @@ import ( "vitess.io/vitess/go/vt/schema" ) +var ( + // idVals are the primary key values to use while creating insert queries that ensures all the three shards get an insert. + idVals = [3]int{ + 4, // 4 maps to 0x20 and ends up in the first shard (-40) + 6, // 6 maps to 0x60 and ends up in the second shard (40-80) + 9, // 9 maps to 0x90 and ends up in the third shard (80-) + } +) + +func TestSettings(t *testing.T) { + testcases := []struct { + name string + commitDelayTime string + queries []string + verifyFunc func(t *testing.T, vtParams *mysql.ConnParams) + }{ + { + name: "No settings changes", + commitDelayTime: "5", + queries: append([]string{"begin"}, getMultiShardInsertQueries()...), + verifyFunc: func(t *testing.T, vtParams *mysql.ConnParams) { + // There is nothing to verify. + }, + }, + { + name: "Settings changes before begin", + commitDelayTime: "5", + queries: append( + append([]string{`set @@time_zone="+10:30"`, "begin"}, getMultiShardInsertQueries()...), + "insert into twopc_settings(id, col) values(9, now())"), + verifyFunc: func(t *testing.T, vtParams *mysql.ConnParams) { + // We can check that the time_zone setting was taken into account by checking the diff with the time by using a different time_zone. + ctx := context.Background() + conn, err := mysql.Connect(ctx, vtParams) + require.NoError(t, err) + defer conn.Close() + utils.Exec(t, conn, `set @@time_zone="+7:00"`) + utils.AssertMatches(t, conn, `select HOUR(TIMEDIFF((select col from twopc_settings where id = 9),now()))`, `[[INT64(3)]]`) + }, + }, + { + name: "Settings changes during transaction", + commitDelayTime: "5", + queries: append( + append([]string{"begin"}, getMultiShardInsertQueries()...), + `set @@time_zone="+10:30"`, + "insert into twopc_settings(id, col) values(9, now())"), + verifyFunc: func(t *testing.T, vtParams *mysql.ConnParams) { + // We can check that the time_zone setting was taken into account by checking the diff with the time by using a different time_zone. + ctx := context.Background() + conn, err := mysql.Connect(ctx, vtParams) + require.NoError(t, err) + defer conn.Close() + utils.Exec(t, conn, `set @@time_zone="+7:00"`) + utils.AssertMatches(t, conn, `select HOUR(TIMEDIFF((select col from twopc_settings where id = 9),now()))`, `[[INT64(3)]]`) + }, + }, + { + name: "Settings changes before begin and during transaction", + commitDelayTime: "5", + queries: append( + append([]string{`set @@time_zone="+10:30"`, "begin"}, getMultiShardInsertQueries()...), + "insert into twopc_settings(id, col) values(9, now())", + `set @@time_zone="+7:00"`, + "insert into twopc_settings(id, col) values(25, now())"), + verifyFunc: func(t *testing.T, vtParams *mysql.ConnParams) { + // We can check that the time_zone setting was taken into account by checking the diff with the time by using a different time_zone. + ctx := context.Background() + conn, err := mysql.Connect(ctx, vtParams) + require.NoError(t, err) + defer conn.Close() + utils.AssertMatches(t, conn, `select HOUR(TIMEDIFF((select col from twopc_settings where id = 9),(select col from twopc_settings where id = 25)))`, `[[INT64(3)]]`) + }, + }, + } + + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + // Reparent all the shards to first tablet being the primary. + reparentToFirstTablet(t) + // cleanup all the old data. + conn, closer := start(t) + defer closer() + defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitShard) + defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitTime) + var wg sync.WaitGroup + runMultiShardCommitWithDelay(t, conn, tt.commitDelayTime, &wg, tt.queries) + // Allow enough time for the commit to have started. + time.Sleep(1 * time.Second) + // Run the vttablet restart to ensure that the transaction needs to be redone. + err := vttabletRestartShard3(t) + require.NoError(t, err) + // Wait for the commit to have returned. We don't actually check for an error in the commit because the user might receive an error. + // But since we are waiting in CommitPrepared, the decision to commit the transaction should have already been taken. + wg.Wait() + // Wair for the data in the table to see that the transaction was committed. + twopcutil.WaitForResults(t, &vtParams, "select id, col from twopc_t1 where col = 4 order by id", `[[INT64(4) INT64(4)] [INT64(6) INT64(4)] [INT64(9) INT64(4)]]`, 30*time.Second) + tt.verifyFunc(t, &vtParams) + }) + } + +} + // TestDisruptions tests that atomic transactions persevere through various disruptions. func TestDisruptions(t *testing.T) { testcases := []struct { @@ -112,30 +215,10 @@ func TestDisruptions(t *testing.T) { // cleanup all the old data. conn, closer := start(t) defer closer() - // Start an atomic transaction. - utils.Exec(t, conn, "begin") - // Insert rows such that they go to all the three shards. Given that we have sharded the table `twopc_t1` on reverse_bits - // it is very easy to figure out what value will end up in which shard. - idVals := []int{4, 6, 9} - for _, val := range idVals { - utils.Exec(t, conn, fmt.Sprintf("insert into twopc_t1(id, col) values(%d, 4)", val)) - } - // We want to delay the commit on one of the shards to simulate slow commits on a shard. - twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitShard, "80-") defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitShard) - twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitTime, tt.commitDelayTime) defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitTime) - // We will execute a commit in a go routine, because we know it will take some time to complete. - // While the commit is ongoing, we would like to run the disruption. var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - _, err := utils.ExecAllowError(t, conn, "commit") - if err != nil { - log.Errorf("Error in commit - %v", err) - } - }() + runMultiShardCommitWithDelay(t, conn, tt.commitDelayTime, &wg, append([]string{"begin"}, getMultiShardInsertQueries()...)) // Allow enough time for the commit to have started. time.Sleep(1 * time.Second) writeCtx, writeCancel := context.WithCancel(context.Background()) @@ -167,6 +250,38 @@ func TestDisruptions(t *testing.T) { } } +// getMultiShardInsertQueries gets the queries that will cause one insert on all the shards. +func getMultiShardInsertQueries() []string { + var queries []string + // Insert rows such that they go to all the three shards. Given that we have sharded the table `twopc_t1` on reverse_bits + // it is very easy to figure out what value will end up in which shard. + for _, val := range idVals { + queries = append(queries, fmt.Sprintf("insert into twopc_t1(id, col) values(%d, 4)", val)) + } + return queries +} + +// runMultiShardCommitWithDelay runs a multi shard commit and configures it to wait for a certain amount of time in the commit phase. +func runMultiShardCommitWithDelay(t *testing.T, conn *mysql.Conn, commitDelayTime string, wg *sync.WaitGroup, queries []string) { + // Run all the queries to start the transaction. + for _, query := range queries { + utils.Exec(t, conn, query) + } + // We want to delay the commit on one of the shards to simulate slow commits on a shard. + twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitShard, "80-") + twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitTime, commitDelayTime) + // We will execute a commit in a go routine, because we know it will take some time to complete. + // While the commit is ongoing, we would like to run the disruption. + wg.Add(1) + go func() { + defer wg.Done() + _, err := utils.ExecAllowError(t, conn, "commit") + if err != nil { + log.Errorf("Error in commit - %v", err) + } + }() +} + func mergeShards(t *testing.T) error { return twopcutil.RunReshard(t, clusterInstance, "TestDisruptions", keyspaceName, "40-80,80-", "40-") } @@ -234,7 +349,9 @@ func ersShard3(t *testing.T) error { func vttabletRestartShard3(t *testing.T) error { shard := clusterInstance.Keyspaces[0].Shards[2] tablet := shard.Vttablets[0] - return tablet.RestartOnlyTablet() + _ = tablet.VttabletProcess.TearDownWithTimeout(2 * time.Second) + tablet.VttabletProcess.ServingStatus = "SERVING" + return tablet.VttabletProcess.Setup() } // mysqlRestartShard3 restarts MySQL on the first tablet of the third shard. diff --git a/go/test/endtoend/transaction/twopc/stress/vschema.json b/go/test/endtoend/transaction/twopc/stress/vschema.json index 415b5958f54..6873b233e61 100644 --- a/go/test/endtoend/transaction/twopc/stress/vschema.json +++ b/go/test/endtoend/transaction/twopc/stress/vschema.json @@ -6,15 +6,7 @@ } }, "tables": { - "twopc_fuzzer_update": { - "column_vindexes": [ - { - "column": "id", - "name": "reverse_bits" - } - ] - }, - "twopc_fuzzer_insert": { + "twopc_t1": { "column_vindexes": [ { "column": "id", @@ -22,7 +14,7 @@ } ] }, - "twopc_t1": { + "twopc_settings": { "column_vindexes": [ { "column": "id", diff --git a/go/test/endtoend/transaction/twopc/twopc_test.go b/go/test/endtoend/transaction/twopc/twopc_test.go index dda655613de..742aa832cfe 100644 --- a/go/test/endtoend/transaction/twopc/twopc_test.go +++ b/go/test/endtoend/transaction/twopc/twopc_test.go @@ -61,6 +61,7 @@ func TestDTCommit(t *testing.T) { utils.Exec(t, conn, "begin") utils.Exec(t, conn, "insert into twopc_user(id, name) values(7,'foo')") utils.Exec(t, conn, "insert into twopc_user(id, name) values(8,'bar')") + utils.Exec(t, conn, `set @@time_zone="+10:30"`) utils.Exec(t, conn, "insert into twopc_user(id, name) values(9,'baz')") utils.Exec(t, conn, "insert into twopc_user(id, name) values(10,'apa')") utils.Exec(t, conn, "commit") @@ -89,12 +90,16 @@ func TestDTCommit(t *testing.T) { "delete:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", }, "ks.redo_statement:-40": { - "insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]", - "delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]", + "insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"set @@time_zone = '+10:30'\")]", + "insert:[VARCHAR(\"dtid-1\") INT64(2) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"set @@time_zone = '+10:30'\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(2) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]", }, "ks.redo_statement:40-80": { "insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]", + "insert:[VARCHAR(\"dtid-1\") INT64(2) BLOB(\"set @@time_zone = '+10:30'\")]", "delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(2) BLOB(\"set @@time_zone = '+10:30'\")]", }, "ks.twopc_user:-40": { `insert:[INT64(10) VARCHAR("apa")]`, @@ -132,8 +137,10 @@ func TestDTCommit(t *testing.T) { "delete:[VARCHAR(\"dtid-2\") VARCHAR(\"PREPARE\")]", }, "ks.redo_statement:40-80": { - "insert:[VARCHAR(\"dtid-2\") INT64(1) BLOB(\"update twopc_user set `name` = 'newfoo' where id = 8 limit 10001 /* INT64 */\")]", - "delete:[VARCHAR(\"dtid-2\") INT64(1) BLOB(\"update twopc_user set `name` = 'newfoo' where id = 8 limit 10001 /* INT64 */\")]", + "insert:[VARCHAR(\"dtid-2\") INT64(1) BLOB(\"set @@time_zone = '+10:30'\")]", + "insert:[VARCHAR(\"dtid-2\") INT64(2) BLOB(\"update twopc_user set `name` = 'newfoo' where id = 8 limit 10001 /* INT64 */\")]", + "delete:[VARCHAR(\"dtid-2\") INT64(1) BLOB(\"set @@time_zone = '+10:30'\")]", + "delete:[VARCHAR(\"dtid-2\") INT64(2) BLOB(\"update twopc_user set `name` = 'newfoo' where id = 8 limit 10001 /* INT64 */\")]", }, "ks.twopc_user:40-80": {"update:[INT64(8) VARCHAR(\"newfoo\")]"}, "ks.twopc_user:80-": {"update:[INT64(7) VARCHAR(\"newfoo\")]"}, @@ -163,8 +170,10 @@ func TestDTCommit(t *testing.T) { "delete:[VARCHAR(\"dtid-3\") VARCHAR(\"PREPARE\")]", }, "ks.redo_statement:-40": { - "insert:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"delete from twopc_user where id = 10 limit 10001 /* INT64 */\")]", - "delete:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"delete from twopc_user where id = 10 limit 10001 /* INT64 */\")]", + "insert:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"set @@time_zone = '+10:30'\")]", + "insert:[VARCHAR(\"dtid-3\") INT64(2) BLOB(\"delete from twopc_user where id = 10 limit 10001 /* INT64 */\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"set @@time_zone = '+10:30'\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(2) BLOB(\"delete from twopc_user where id = 10 limit 10001 /* INT64 */\")]", }, "ks.twopc_user:-40": {"delete:[INT64(10) VARCHAR(\"apa\")]"}, "ks.twopc_user:80-": {"delete:[INT64(9) VARCHAR(\"baz\")]"}, diff --git a/go/test/endtoend/transaction/twopc/utils/utils.go b/go/test/endtoend/transaction/twopc/utils/utils.go index 14f3214ae00..9d0adb57e3c 100644 --- a/go/test/endtoend/transaction/twopc/utils/utils.go +++ b/go/test/endtoend/transaction/twopc/utils/utils.go @@ -88,6 +88,8 @@ func ClearOutTable(t *testing.T, vtParams mysql.ConnParams, tableName string) { // WriteTestCommunicationFile writes the content to the file with the given name. // We use these files to coordinate with the vttablets running in the debug mode. func WriteTestCommunicationFile(t *testing.T, fileName string, content string) { + // Delete the file just to make sure it doesn't exist before we write to it. + DeleteFile(fileName) err := os.WriteFile(path.Join(os.Getenv("VTDATAROOT"), fileName), []byte(content), 0644) require.NoError(t, err) } diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index 043c45cf62a..3732a37d1d1 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -42,6 +42,11 @@ import ( "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/vt/callerid" "vitess.io/vitess/go/vt/discovery" + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vtgate/buffer" @@ -50,12 +55,6 @@ import ( "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vtgate/vschemaacl" "vitess.io/vitess/go/vt/vtgate/vtgateservice" - - querypb "vitess.io/vitess/go/vt/proto/query" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" - vschemapb "vitess.io/vitess/go/vt/proto/vschema" - vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) func TestExecutorResultsExceeded(t *testing.T) { @@ -2794,14 +2793,13 @@ func TestExecutorPrepareExecute(t *testing.T) { require.Error(t, err) } -// TestExecutorRejectTwoPC test all the unsupported cases for multi-shard atomic commit. -func TestExecutorRejectTwoPC(t *testing.T) { +// TestExecutorSettingsInTwoPC tests that settings are supported for multi-shard atomic commit. +func TestExecutorSettingsInTwoPC(t *testing.T) { executor, sbc1, sbc2, _, ctx := createExecutorEnv(t) tcases := []struct { - sqls []string - testRes []*sqltypes.Result - - expErr string + sqls []string + testRes []*sqltypes.Result + expectedQueries [][]string }{ { sqls: []string{ @@ -2810,8 +2808,72 @@ func TestExecutorRejectTwoPC(t *testing.T) { `insert into user_extra(user_id) values (2)`, `insert into user_extra(user_id) values (3)`, }, - expErr: "VT12001: unsupported: atomic distributed transaction commit with system settings", - }, { + testRes: []*sqltypes.Result{ + sqltypes.MakeTestResult(sqltypes.MakeTestFields("id", "varchar"), + "+08:00"), + }, + expectedQueries: [][]string{ + { + "select '+08:00' from dual where @@time_zone != '+08:00'", + "set @@time_zone = '+08:00'", + "set @@time_zone = '+08:00'", + "insert into user_extra(user_id) values (1)", + "insert into user_extra(user_id) values (2)", + }, + { + "set @@time_zone = '+08:00'", + "insert into user_extra(user_id) values (3)", + }, + }, + }, + } + + for _, tcase := range tcases { + t.Run(fmt.Sprintf("%v", tcase.sqls), func(t *testing.T) { + sbc1.SetResults(tcase.testRes) + sbc2.SetResults(tcase.testRes) + + // create a new session + session := NewSafeSession(&vtgatepb.Session{ + TargetString: KsTestSharded, + TransactionMode: vtgatepb.TransactionMode_TWOPC, + EnableSystemSettings: true, + }) + + // start transaction + _, err := executor.Execute(ctx, nil, "TestExecutorSettingsInTwoPC", session, "begin", nil) + require.NoError(t, err) + + // execute queries + for _, sql := range tcase.sqls { + _, err = executor.Execute(ctx, nil, "TestExecutorSettingsInTwoPC", session, sql, nil) + require.NoError(t, err) + } + + // commit 2pc + _, err = executor.Execute(ctx, nil, "TestExecutorSettingsInTwoPC", session, "commit", nil) + require.NoError(t, err) + + queriesRecvd, err := sbc1.GetFinalQueries() + require.NoError(t, err) + assert.EqualValues(t, tcase.expectedQueries[0], queriesRecvd) + queriesRecvd, err = sbc2.GetFinalQueries() + require.NoError(t, err) + assert.EqualValues(t, tcase.expectedQueries[1], queriesRecvd) + }) + } +} + +// TestExecutorRejectTwoPC test all the unsupported cases for multi-shard atomic commit. +func TestExecutorRejectTwoPC(t *testing.T) { + executor, sbc1, sbc2, _, ctx := createExecutorEnv(t) + tcases := []struct { + sqls []string + testRes []*sqltypes.Result + + expErr string + }{ + { sqls: []string{ `update t1 set unq_col = 1 where id = 1`, `update t1 set unq_col = 1 where id = 3`, diff --git a/go/vt/vtgate/tx_conn.go b/go/vt/vtgate/tx_conn.go index f7dc472df2e..eba362e82f9 100644 --- a/go/vt/vtgate/tx_conn.go +++ b/go/vt/vtgate/tx_conn.go @@ -280,9 +280,6 @@ func (txc *TxConn) checkValidCondition(session *SafeSession) error { if len(session.PreSessions) != 0 || len(session.PostSessions) != 0 { return vterrors.VT12001("atomic distributed transaction commit with consistent lookup vindex") } - if session.GetInReservedConn() { - return vterrors.VT12001("atomic distributed transaction commit with system settings") - } return nil } diff --git a/go/vt/vttablet/sandboxconn/sandboxconn.go b/go/vt/vttablet/sandboxconn/sandboxconn.go index ae956e5e249..6fbf0032b2a 100644 --- a/go/vt/vttablet/sandboxconn/sandboxconn.go +++ b/go/vt/vttablet/sandboxconn/sandboxconn.go @@ -172,6 +172,28 @@ func (sbc *SandboxConn) GetQueries() []*querypb.BoundQuery { return sbc.Queries } +// GetFinalQueries gets the final queries as strings from sandboxconn. +func (sbc *SandboxConn) GetFinalQueries() ([]string, error) { + if sbc.queriesRequireLocking { + sbc.queriesMu.Lock() + defer sbc.queriesMu.Unlock() + } + var queries []string + for _, q := range sbc.Queries { + stmt, err := sbc.parser.Parse(q.Sql) + if err != nil { + return nil, err + } + pq := sqlparser.NewParsedQuery(stmt) + query, err := pq.GenerateQuery(q.BindVariables, nil) + if err != nil { + return nil, err + } + queries = append(queries, query) + } + return queries, nil +} + // ClearQueries clears the Queries in sandboxconn. func (sbc *SandboxConn) ClearQueries() { if sbc.queriesRequireLocking { diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index d06953b3241..abf296c0583 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -167,9 +167,15 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) { } defer conn.Unlock() if qre.setting != nil { - if err = conn.ApplySetting(qre.ctx, qre.setting); err != nil { + applied, err := conn.ApplySetting(qre.ctx, qre.setting) + if err != nil { return nil, vterrors.Wrap(err, "failed to execute system setting on the connection") } + // If we have applied the settings on the connection, then we should record the query detail. + // This is required for redoing the transaction in case of a failure. + if applied { + conn.TxProperties().RecordQueryDetail(qre.setting.ApplyQuery(), nil) + } } return qre.txConnExec(conn) } @@ -278,8 +284,10 @@ func (qre *QueryExecutor) execAsTransaction(f func(conn *StatefulConnection) (*s func (qre *QueryExecutor) txConnExec(conn *StatefulConnection) (*sqltypes.Result, error) { switch qre.plan.PlanID { - case p.PlanInsert, p.PlanUpdate, p.PlanDelete, p.PlanSet: + case p.PlanInsert, p.PlanUpdate, p.PlanDelete: return qre.txFetch(conn, true) + case p.PlanSet: + return qre.txFetch(conn, false) case p.PlanInsertMessage: qre.bindVars["#time_now"] = sqltypes.Int64BindVariable(time.Now().UnixNano()) return qre.txFetch(conn, true) @@ -382,7 +390,7 @@ func (qre *QueryExecutor) Stream(callback StreamCallback) error { } defer txConn.Unlock() if qre.setting != nil { - if err = txConn.ApplySetting(qre.ctx, qre.setting); err != nil { + if _, err = txConn.ApplySetting(qre.ctx, qre.setting); err != nil { return vterrors.Wrap(err, "failed to execute system setting on the connection") } } diff --git a/go/vt/vttablet/tabletserver/stateful_connection.go b/go/vt/vttablet/tabletserver/stateful_connection.go index 91d51677241..10fc763984f 100644 --- a/go/vt/vttablet/tabletserver/stateful_connection.go +++ b/go/vt/vttablet/tabletserver/stateful_connection.go @@ -313,11 +313,12 @@ func (sc *StatefulConnection) getUsername() string { return callerid.GetUsername(sc.reservedProps.ImmediateCaller) } -func (sc *StatefulConnection) ApplySetting(ctx context.Context, setting *smartconnpool.Setting) error { +// ApplySetting returns whether the settings where applied or not. It also returns an error, if encountered. +func (sc *StatefulConnection) ApplySetting(ctx context.Context, setting *smartconnpool.Setting) (bool, error) { if sc.dbConn.Conn.Setting() == setting { - return nil + return false, nil } - return sc.dbConn.Conn.ApplySetting(ctx, setting) + return true, sc.dbConn.Conn.ApplySetting(ctx, setting) } func (sc *StatefulConnection) resetExpiryTime() { diff --git a/go/vt/vttablet/tabletserver/tx_engine.go b/go/vt/vttablet/tabletserver/tx_engine.go index d581fb79ae4..1bd7c85b1f3 100644 --- a/go/vt/vttablet/tabletserver/tx_engine.go +++ b/go/vt/vttablet/tabletserver/tx_engine.go @@ -31,6 +31,7 @@ import ( querypb "vitess.io/vitess/go/vt/proto/query" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/servenv" + "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" @@ -480,9 +481,24 @@ func (te *TxEngine) prepareTx(ctx context.Context, preparedTx *tx.PreparedTx) (f } }() + // We need to check whether the first query is a SET query or not. + // If it is then we need to run it before we begin the transaction because + // some connection settings can't be modified after a transaction has started + // For example - + // mysql> begin; + // Query OK, 0 rows affected (0.00 sec) + // mysql> set @@transaction_isolation="read-committed"; + // ERROR 1568 (25001): Transaction characteristics can't be changed while a transaction is in progress. + var settingsQuery string + firstQuery := preparedTx.Queries[0] + if sqlparser.Preview(firstQuery) == sqlparser.StmtSet { + settingsQuery = firstQuery + preparedTx.Queries = preparedTx.Queries[1:] + } + // We need to redo the prepared transactions using a dba user because MySQL might still be in read only mode. var conn *StatefulConnection - if conn, err = te.beginNewDbaConnection(ctx); err != nil { + if conn, err = te.beginNewDbaConnection(ctx, settingsQuery); err != nil { return } @@ -707,12 +723,19 @@ func (te *TxEngine) Release(connID int64) error { // beginNewDbaConnection gets a new dba connection and starts a transaction in it. // This should only be used to redo prepared transactions. All the other writes should use the normal pool. -func (te *TxEngine) beginNewDbaConnection(ctx context.Context) (*StatefulConnection, error) { +func (te *TxEngine) beginNewDbaConnection(ctx context.Context, settingsQuery string) (*StatefulConnection, error) { dbConn, err := connpool.NewConn(ctx, te.env.Config().DB.DbaWithDB(), nil, nil, te.env) if err != nil { return nil, err } + // If we have a settings query that we need to apply, we do that before starting the transaction. + if settingsQuery != "" { + if _, err = dbConn.ExecOnce(ctx, settingsQuery, 1, false); err != nil { + return nil, err + } + } + sc := &StatefulConnection{ dbConn: &connpool.PooledConn{ Conn: dbConn, diff --git a/go/vt/vttablet/tabletserver/tx_engine_test.go b/go/vt/vttablet/tabletserver/tx_engine_test.go index be2531f1a41..10af5974c34 100644 --- a/go/vt/vttablet/tabletserver/tx_engine_test.go +++ b/go/vt/vttablet/tabletserver/tx_engine_test.go @@ -721,3 +721,90 @@ func TestIsTwoPCAllowed(t *testing.T) { }) } } + +// TestPrepareTx tests prepareTx function in transaction engine. +func TestPrepareTx(t *testing.T) { + testcases := []struct { + name string + preparedTx *tx.PreparedTx + requireFailure bool + errWanted string + queryLogWanted string + }{ + { + name: "Success", + preparedTx: &tx.PreparedTx{ + Queries: []string{ + "insert into vitess_test (intval) values(40)", + "set @@time_zone='+10:30'", + "insert into vitess_test (intval) values(20)", + }, + }, + requireFailure: false, + errWanted: "", + queryLogWanted: "use `fakesqldb`;begin;insert into vitess_test (intval) values(40);set @@time_zone='+10:30';insert into vitess_test (intval) values(20)", + }, + { + name: "Unretryable failure during query", + preparedTx: &tx.PreparedTx{ + Queries: []string{ + "insert into vitess_test (intval) values(40)", + "failing query", + "insert into vitess_test (intval) values(20)", + }, + }, + requireFailure: true, + errWanted: "(errno 1105) (sqlstate HY000)", + }, + { + name: "Retryable failure during query", + preparedTx: &tx.PreparedTx{ + Queries: []string{ + "insert into vitess_test (intval) values(40)", + "retryable query", + "insert into vitess_test (intval) values(20)", + }, + }, + requireFailure: false, + errWanted: "Retryable error (errno 2002) (sqlstate HY000)", + }, + { + name: "Success - Settings query in the beginning", + preparedTx: &tx.PreparedTx{ + Queries: []string{ + "set @@time_zone='+10:30'", + "insert into vitess_test (intval) values(40)", + "insert into vitess_test (intval) values(20)", + }, + }, + requireFailure: false, + errWanted: "", + queryLogWanted: "use `fakesqldb`;set @@time_zone='+10:30';begin;insert into vitess_test (intval) values(40);insert into vitess_test (intval) values(20)", + }, + } + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + db := setUpQueryExecutorTest(t) + defer db.Close() + db.AddQueryPattern(".*", &sqltypes.Result{}) + db.AddRejectedQuery("failing query", assert.AnError) + db.AddRejectedQuery("retryable query", sqlerror.NewSQLError(sqlerror.CRConnectionError, "", "Retryable error")) + cfg := tabletenv.NewDefaultConfig() + cfg.DB = newDBConfigs(db) + cfg.TwoPCEnable = true + cfg.TwoPCAbandonAge = 200 + te := NewTxEngine(tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "TabletServerTest"), nil) + te.AcceptReadWrite() + db.ResetQueryLog() + failed, err := te.prepareTx(context.Background(), tt.preparedTx) + require.EqualValues(t, tt.requireFailure, failed) + if tt.errWanted != "" { + require.ErrorContains(t, err, tt.errWanted) + return + } + require.NoError(t, err) + require.EqualValues(t, 1, len(te.preparedPool.conns)) + require.EqualValues(t, tt.queryLogWanted, db.QueryLog()) + }) + } +} diff --git a/go/vt/vttablet/tabletserver/tx_pool.go b/go/vt/vttablet/tabletserver/tx_pool.go index 6d1f1dec3c2..ca8a0ea34b2 100644 --- a/go/vt/vttablet/tabletserver/tx_pool.go +++ b/go/vt/vttablet/tabletserver/tx_pool.go @@ -268,6 +268,11 @@ func (tp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions, re conn.Release(tx.ConnInitFail) return nil, "", "", err } + // If we have applied any settings on the connection, then we need to record the query + // in case we need to redo the transaction because of a failure. + if setting != nil { + conn.TxProperties().RecordQueryDetail(setting.ApplyQuery(), nil) + } return conn, sql, sessionStateChanges, nil }