From 83b37b8e8fbc2353dd511f0e1223271e5879556c Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Tue, 24 Sep 2024 17:57:52 +0530 Subject: [PATCH] Distributed Transaction: Action on commit prepared or redo prepared failure (#16803) Signed-off-by: Harshit Gangal --- go/vt/sidecardb/schema/twopc/redo_state.sql | 1 + go/vt/vttablet/endtoend/main_test.go | 2 +- go/vt/vttablet/endtoend/twopc/main_test.go | 100 ++++++++++++ go/vt/vttablet/endtoend/twopc/prepare_test.go | 109 +++++++++++++ go/vt/vttablet/tabletserver/dt_executor.go | 29 +--- .../vttablet/tabletserver/dt_executor_test.go | 2 + .../tabletserver/tabletserver_test.go | 11 +- go/vt/vttablet/tabletserver/twopc.go | 18 ++- go/vt/vttablet/tabletserver/twopc_test.go | 32 ++-- go/vt/vttablet/tabletserver/tx/twopc.go | 1 + go/vt/vttablet/tabletserver/tx_engine.go | 151 +++++++++++++----- go/vt/vttablet/tabletserver/tx_engine_test.go | 75 +++++++++ 12 files changed, 444 insertions(+), 87 deletions(-) create mode 100644 go/vt/vttablet/endtoend/twopc/main_test.go create mode 100644 go/vt/vttablet/endtoend/twopc/prepare_test.go diff --git a/go/vt/sidecardb/schema/twopc/redo_state.sql b/go/vt/sidecardb/schema/twopc/redo_state.sql index 975124e0320..58e250e435e 100644 --- a/go/vt/sidecardb/schema/twopc/redo_state.sql +++ b/go/vt/sidecardb/schema/twopc/redo_state.sql @@ -18,5 +18,6 @@ CREATE TABLE IF NOT EXISTS redo_state( dtid varbinary(512) NOT NULL, state bigint NOT NULL, time_created bigint NOT NULL, + message text, primary key(dtid) ) ENGINE = InnoDB CHARSET = utf8mb4 diff --git a/go/vt/vttablet/endtoend/main_test.go b/go/vt/vttablet/endtoend/main_test.go index 1284f790b93..eedd893f3eb 100644 --- a/go/vt/vttablet/endtoend/main_test.go +++ b/go/vt/vttablet/endtoend/main_test.go @@ -343,7 +343,7 @@ var tableACLConfig = `{ }, { "name": "vitess_twopc", - "table_names_or_prefixes": ["dt_state"], + "table_names_or_prefixes": ["dt_state", "redo_state"], "readers": ["dev"], "writers": ["dev"], "admins": ["dev"] diff --git a/go/vt/vttablet/endtoend/twopc/main_test.go b/go/vt/vttablet/endtoend/twopc/main_test.go new file mode 100644 index 00000000000..090751503d4 --- /dev/null +++ b/go/vt/vttablet/endtoend/twopc/main_test.go @@ -0,0 +1,100 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package endtoend + +import ( + "context" + "flag" + "fmt" + "os" + "testing" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/vt/vttablet/endtoend/framework" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttest" + + vttestpb "vitess.io/vitess/go/vt/proto/vttest" +) + +var ( + connParams mysql.ConnParams + connAppDebugParams mysql.ConnParams + cluster vttest.LocalCluster +) + +func TestMain(m *testing.M) { + flag.Parse() // Do not remove this comment, import into google3 depends on it + tabletenv.Init() + + exitCode := func() int { + // Launch MySQL. + // We need a Keyspace in the topology, so the DbName is set. + // We need a Shard too, so the database 'vttest' is created. + cfg := vttest.Config{ + Topology: &vttestpb.VTTestTopology{ + Keyspaces: []*vttestpb.Keyspace{ + { + Name: "vttest", + Shards: []*vttestpb.Shard{ + { + Name: "0", + DbNameOverride: "vttest", + }, + }, + }, + }, + }, + OnlyMySQL: true, + Charset: "utf8mb4_general_ci", + } + if err := cfg.InitSchemas("vttest", testSchema, nil); err != nil { + fmt.Fprintf(os.Stderr, "InitSchemas failed: %v\n", err) + return 1 + } + defer os.RemoveAll(cfg.SchemaDir) + cluster = vttest.LocalCluster{ + Config: cfg, + } + if err := cluster.Setup(); err != nil { + fmt.Fprintf(os.Stderr, "could not launch mysql: %v\n", err) + return 1 + } + + defer cluster.TearDown() + + connParams = cluster.MySQLConnParams() + connAppDebugParams = cluster.MySQLAppDebugConnParams() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + config := tabletenv.NewDefaultConfig() + config.TwoPCEnable = true + config.TwoPCAbandonAge = 1 + err := framework.StartCustomServer(ctx, connParams, connAppDebugParams, cluster.DbName(), config) + if err != nil { + fmt.Fprintf(os.Stderr, "%v", err) + return 1 + } + defer framework.StopServer() + + return m.Run() + }() + os.Exit(exitCode) +} + +var testSchema = `create table vitess_test(intval int default 0 primary key);` diff --git a/go/vt/vttablet/endtoend/twopc/prepare_test.go b/go/vt/vttablet/endtoend/twopc/prepare_test.go new file mode 100644 index 00000000000..ae07356d6dc --- /dev/null +++ b/go/vt/vttablet/endtoend/twopc/prepare_test.go @@ -0,0 +1,109 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package endtoend + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/vt/vttablet/endtoend/framework" +) + +// TestCommitPreparedFailNonRetryable tests the case where the commit_prepared fails trying to acquire update lock. +// The transaction updates to failed state. +func TestCommitPreparedFailNonRetryable(t *testing.T) { + dbaConnector := framework.Server.Config().DB.DbaWithDB() + conn, err := dbaConnector.Connect(context.Background()) + require.NoError(t, err) + defer conn.Close() + + _, err = conn.ExecuteFetch("set global innodb_lock_wait_timeout = 1", 1, false) + require.NoError(t, err) + defer conn.ExecuteFetch("set global innodb_lock_wait_timeout = default", 1, false) + + client := framework.NewClient() + defer client.RollbackPrepared("bb", client.TransactionID()) + + _, err = client.BeginExecute(`insert into vitess_test (intval) values(50)`, nil, nil) + require.NoError(t, err) + err = client.Prepare("bb") + require.NoError(t, err) + + client2 := framework.NewClient() + _, err = client2.BeginExecute(`select * from _vt.redo_state where dtid = 'bb' for update`, nil, nil) + require.NoError(t, err) + + ch := make(chan any) + go func() { + err := client.CommitPrepared("bb") + ch <- nil + require.ErrorContains(t, err, "commit_prepared") + }() + time.Sleep(1500 * time.Millisecond) + + client2.Release() + <-ch + + qr, err := client2.Execute("select dtid, state, message from _vt.redo_state where dtid = 'bb'", nil) + require.NoError(t, err) + require.Equal(t, `[[VARBINARY("bb") INT64(0) TEXT("Lock wait timeout exceeded; try restarting transaction (errno 1205) (sqlstate HY000) during query: delete from _vt.redo_state where dtid = 'bb'")]]`, fmt.Sprintf("%v", qr.Rows)) +} + +// TestCommitPreparedFailRetryable tests the case where the commit_prepared fails when the query is killed. +// The transaction remains in the prepare state. +func TestCommitPreparedFailRetryable(t *testing.T) { + client := framework.NewClient() + defer client.RollbackPrepared("aa", client.TransactionID()) + + _, err := client.BeginExecute(`insert into vitess_test (intval) values(40)`, nil, nil) + require.NoError(t, err) + connRes, err := client.Execute(`select connection_id()`, nil) + require.NoError(t, err) + err = client.Prepare("aa") + require.NoError(t, err) + + client2 := framework.NewClient() + _, err = client2.BeginExecute(`select * from _vt.redo_state where dtid = 'aa' for update`, nil, nil) + require.NoError(t, err) + + ch := make(chan any) + go func() { + err := client.CommitPrepared("aa") + ch <- nil + require.ErrorContains(t, err, "commit_prepared") + }() + time.Sleep(100 * time.Millisecond) + + dbaConnector := framework.Server.Config().DB.DbaWithDB() + conn, err := dbaConnector.Connect(context.Background()) + require.NoError(t, err) + defer conn.Close() + + _, err = conn.ExecuteFetch(fmt.Sprintf("kill query %s", connRes.Rows[0][0].ToString()), 1, false) + require.NoError(t, err) + + client2.Release() + <-ch + + qr, err := client2.Execute("select dtid, state, message from _vt.redo_state where dtid = 'aa'", nil) + require.NoError(t, err) + require.Equal(t, `[[VARBINARY("aa") INT64(1) TEXT("Query execution was interrupted (errno 1317) (sqlstate 70100) during query: delete from _vt.redo_state where dtid = 'aa'")]]`, fmt.Sprintf("%v", qr.Rows)) +} diff --git a/go/vt/vttablet/tabletserver/dt_executor.go b/go/vt/vttablet/tabletserver/dt_executor.go index edeec50a54d..b14e4d65d16 100644 --- a/go/vt/vttablet/tabletserver/dt_executor.go +++ b/go/vt/vttablet/tabletserver/dt_executor.go @@ -157,8 +157,8 @@ func (dte *DTExecutor) CommitPrepared(dtid string) (err error) { ctx := trace.CopySpan(context.Background(), dte.ctx) defer func() { if err != nil { - dte.markFailed(ctx, dtid) log.Warningf("failed to commit the prepared transaction '%s' with error: %v", dtid, err) + dte.te.checkErrorAndMarkFailed(ctx, dtid, err, "TwopcCommit") } dte.te.txPool.RollbackAndRelease(ctx, conn) }() @@ -172,33 +172,6 @@ func (dte *DTExecutor) CommitPrepared(dtid string) (err error) { return nil } -// markFailed does the necessary work to mark a CommitPrepared -// as failed. It marks the dtid as failed in the prepared pool, -// increments the InternalErros counter, and also changes the -// state of the transaction in the redo log as failed. If the -// state change does not succeed, it just logs the event. -// The function uses the passed in context that has no timeout -// instead of DTExecutor's context. -func (dte *DTExecutor) markFailed(ctx context.Context, dtid string) { - dte.te.env.Stats().InternalErrors.Add("TwopcCommit", 1) - dte.te.preparedPool.SetFailed(dtid) - conn, _, _, err := dte.te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) - if err != nil { - log.Errorf("markFailed: Begin failed for dtid %s: %v", dtid, err) - return - } - defer dte.te.txPool.RollbackAndRelease(ctx, conn) - - if err = dte.te.twoPC.UpdateRedo(ctx, conn, dtid, RedoStateFailed); err != nil { - log.Errorf("markFailed: UpdateRedo failed for dtid %s: %v", dtid, err) - return - } - - if _, err = dte.te.txPool.Commit(ctx, conn); err != nil { - log.Errorf("markFailed: Commit failed for dtid %s: %v", dtid, err) - } -} - // RollbackPrepared rolls back a prepared transaction. This function handles // the case of an incomplete prepare. // diff --git a/go/vt/vttablet/tabletserver/dt_executor_test.go b/go/vt/vttablet/tabletserver/dt_executor_test.go index b13a2a60b67..bc497b070e6 100644 --- a/go/vt/vttablet/tabletserver/dt_executor_test.go +++ b/go/vt/vttablet/tabletserver/dt_executor_test.go @@ -28,6 +28,7 @@ import ( "time" "vitess.io/vitess/go/event/syslogger" + "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/vt/vtenv" "vitess.io/vitess/go/vt/vttablet/tabletserver/rules" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" @@ -718,6 +719,7 @@ func newTestTxExecutor(t *testing.T, ctx context.Context) (txe *DTExecutor, tsv db.AddQuery("delete from _vt.redo_state where dtid = 'aa'", &sqltypes.Result{}) db.AddQuery("delete from _vt.redo_statement where dtid = 'aa'", &sqltypes.Result{}) db.AddQuery("update test_table set `name` = 2 where pk = 1 limit 10001", &sqltypes.Result{}) + db.AddRejectedQuery("bogus", sqlerror.NewSQLError(sqlerror.ERUnknownError, sqlerror.SSUnknownSQLState, "bogus query")) return &DTExecutor{ ctx: ctx, logStats: logStats, diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go index 6e589acb907..97d948a4d0a 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -229,12 +229,14 @@ func TestTabletServerRedoLogIsKeptBetweenRestarts(t *testing.T) { {Type: sqltypes.Uint64}, {Type: sqltypes.Uint64}, {Type: sqltypes.VarBinary}, + {Type: sqltypes.Text}, }, Rows: [][]sqltypes.Value{{ sqltypes.NewVarBinary("dtid0"), sqltypes.NewInt64(RedoStatePrepared), sqltypes.NewVarBinary(""), sqltypes.NewVarBinary("update test_table set `name` = 2 where pk = 1 limit 10001"), + sqltypes.NULL, }}, }) turnOnTxEngine() @@ -255,22 +257,26 @@ func TestTabletServerRedoLogIsKeptBetweenRestarts(t *testing.T) { {Type: sqltypes.Uint64}, {Type: sqltypes.Uint64}, {Type: sqltypes.VarBinary}, + {Type: sqltypes.Text}, }, Rows: [][]sqltypes.Value{{ sqltypes.NewVarBinary("bogus"), sqltypes.NewInt64(RedoStatePrepared), sqltypes.NewVarBinary(""), sqltypes.NewVarBinary("bogus"), + sqltypes.NULL, }, { sqltypes.NewVarBinary("a:b:10"), sqltypes.NewInt64(RedoStatePrepared), sqltypes.NewVarBinary(""), sqltypes.NewVarBinary("update test_table set `name` = 2 where pk = 1 limit 10001"), + sqltypes.NULL, }, { sqltypes.NewVarBinary("a:b:20"), sqltypes.NewInt64(RedoStateFailed), sqltypes.NewVarBinary(""), sqltypes.NewVarBinary("unused"), + sqltypes.TestValue(sqltypes.Text, "deadlock detected, transaction rolled back"), }}, }) turnOnTxEngine() @@ -280,7 +286,10 @@ func TestTabletServerRedoLogIsKeptBetweenRestarts(t *testing.T) { Sql: "update test_table set `name` = 2 where pk = 1 limit 10001", Tables: []string{"test_table"}}} utils.MustMatch(t, want, got, "Prepared queries") - wantFailed := map[string]error{"a:b:20": errPrepFailed} + wantFailed := map[string]error{ + "bogus": errPrepFailed, // The query is rejected by database so added to failed list. + "a:b:20": errPrepFailed, // The DTID is already in failed state. + } utils.MustMatch(t, tsv.te.preparedPool.reserved, wantFailed, fmt.Sprintf("Failed dtids: %v, want %v", tsv.te.preparedPool.reserved, wantFailed)) // Verify last id got adjusted. assert.EqualValues(t, 20, tsv.te.txPool.scp.lastID.Load(), "tsv.te.txPool.lastID.Get()") diff --git a/go/vt/vttablet/tabletserver/twopc.go b/go/vt/vttablet/tabletserver/twopc.go index 868ffff2b3d..5eff30ce07e 100644 --- a/go/vt/vttablet/tabletserver/twopc.go +++ b/go/vt/vttablet/tabletserver/twopc.go @@ -47,7 +47,7 @@ const ( // DTStateRollback represents the ROLLBACK state for dt_state. DTStateRollback = querypb.TransactionState_ROLLBACK - readAllRedo = `select t.dtid, t.state, t.time_created, s.statement + readAllRedo = `select t.dtid, t.state, t.time_created, s.statement, t.message from %s.redo_state t join %s.redo_statement s on t.dtid = s.dtid order by t.dtid, s.id` @@ -109,8 +109,8 @@ func (tpc *TwoPC) initializeQueries() { "insert into %s.redo_statement(dtid, id, statement) values %a", dbname, ":vals") tpc.updateRedoTx = sqlparser.BuildParsedQuery( - "update %s.redo_state set state = %a where dtid = %a", - dbname, ":state", ":dtid") + "update %s.redo_state set state = %a, message = %a where dtid = %a", + dbname, ":state", ":message", ":dtid") tpc.deleteRedoTx = sqlparser.BuildParsedQuery( "delete from %s.redo_state where dtid = %a", dbname, ":dtid") @@ -200,10 +200,11 @@ func (tpc *TwoPC) SaveRedo(ctx context.Context, conn *StatefulConnection, dtid s } // UpdateRedo changes the state of the redo log for the dtid. -func (tpc *TwoPC) UpdateRedo(ctx context.Context, conn *StatefulConnection, dtid string, state int) error { +func (tpc *TwoPC) UpdateRedo(ctx context.Context, conn *StatefulConnection, dtid string, state int, message string) error { bindVars := map[string]*querypb.BindVariable{ - "dtid": sqltypes.StringBindVariable(dtid), - "state": sqltypes.Int64BindVariable(int64(state)), + "dtid": sqltypes.StringBindVariable(dtid), + "state": sqltypes.Int64BindVariable(int64(state)), + "message": sqltypes.StringBindVariable(message), } _, err := tpc.exec(ctx, conn, tpc.updateRedoTx, bindVars) return err @@ -244,8 +245,9 @@ func (tpc *TwoPC) ReadAllRedo(ctx context.Context) (prepared, failed []*tx.Prepa // which is harmless. tm, _ := row[2].ToCastInt64() curTx = &tx.PreparedTx{ - Dtid: dtid, - Time: time.Unix(0, tm), + Dtid: dtid, + Time: time.Unix(0, tm), + Message: row[4].ToString(), } st, err := row[1].ToCastInt64() if err != nil { diff --git a/go/vt/vttablet/tabletserver/twopc_test.go b/go/vt/vttablet/tabletserver/twopc_test.go index 7375a4f2e7c..0dc10266e8c 100644 --- a/go/vt/vttablet/tabletserver/twopc_test.go +++ b/go/vt/vttablet/tabletserver/twopc_test.go @@ -66,12 +66,14 @@ func TestReadAllRedo(t *testing.T) { {Type: sqltypes.Int64}, {Type: sqltypes.Int64}, {Type: sqltypes.VarChar}, + {Type: sqltypes.Text}, }, Rows: [][]sqltypes.Value{{ sqltypes.NewVarBinary("dtid0"), sqltypes.NewInt64(RedoStatePrepared), sqltypes.NewVarBinary("1"), sqltypes.NewVarBinary("stmt01"), + sqltypes.NULL, }}, }) prepared, failed, err = tpc.ReadAllRedo(ctx) @@ -96,17 +98,20 @@ func TestReadAllRedo(t *testing.T) { {Type: sqltypes.Int64}, {Type: sqltypes.Int64}, {Type: sqltypes.VarChar}, + {Type: sqltypes.Text}, }, Rows: [][]sqltypes.Value{{ sqltypes.NewVarBinary("dtid0"), sqltypes.NewInt64(RedoStatePrepared), sqltypes.NewVarBinary("1"), sqltypes.NewVarBinary("stmt01"), + sqltypes.NULL, }, { sqltypes.NewVarBinary("dtid0"), sqltypes.NewInt64(RedoStatePrepared), sqltypes.NewVarBinary("1"), sqltypes.NewVarBinary("stmt02"), + sqltypes.NULL, }}, }) prepared, failed, err = tpc.ReadAllRedo(ctx) @@ -131,22 +136,26 @@ func TestReadAllRedo(t *testing.T) { {Type: sqltypes.Int64}, {Type: sqltypes.Int64}, {Type: sqltypes.VarChar}, + {Type: sqltypes.Text}, }, Rows: [][]sqltypes.Value{{ sqltypes.NewVarBinary("dtid0"), sqltypes.NewInt64(RedoStatePrepared), sqltypes.NewVarBinary("1"), sqltypes.NewVarBinary("stmt01"), + sqltypes.NULL, }, { sqltypes.NewVarBinary("dtid0"), sqltypes.NewInt64(RedoStatePrepared), sqltypes.NewVarBinary("1"), sqltypes.NewVarBinary("stmt02"), + sqltypes.NULL, }, { sqltypes.NewVarBinary("dtid1"), sqltypes.NewInt64(RedoStatePrepared), sqltypes.NewVarBinary("1"), sqltypes.NewVarBinary("stmt11"), + sqltypes.NULL, }}, }) prepared, failed, err = tpc.ReadAllRedo(ctx) @@ -175,37 +184,44 @@ func TestReadAllRedo(t *testing.T) { {Type: sqltypes.Int64}, {Type: sqltypes.Int64}, {Type: sqltypes.VarChar}, + {Type: sqltypes.Text}, }, Rows: [][]sqltypes.Value{{ sqltypes.NewVarBinary("dtid0"), sqltypes.NewInt64(RedoStatePrepared), sqltypes.NewVarBinary("1"), sqltypes.NewVarBinary("stmt01"), + sqltypes.NULL, }, { sqltypes.NewVarBinary("dtid0"), sqltypes.NewInt64(RedoStatePrepared), sqltypes.NewVarBinary("1"), sqltypes.NewVarBinary("stmt02"), + sqltypes.NULL, }, { sqltypes.NewVarBinary("dtid1"), - sqltypes.NewVarBinary("Failed"), + sqltypes.NewInt64(RedoStateFailed), sqltypes.NewVarBinary("1"), sqltypes.NewVarBinary("stmt11"), + sqltypes.TestValue(sqltypes.Text, "error1"), }, { sqltypes.NewVarBinary("dtid2"), - sqltypes.NewVarBinary("Failed"), + sqltypes.NewInt64(RedoStateFailed), sqltypes.NewVarBinary("1"), sqltypes.NewVarBinary("stmt21"), + sqltypes.TestValue(sqltypes.Text, "error2"), }, { sqltypes.NewVarBinary("dtid2"), - sqltypes.NewVarBinary("Failed"), + sqltypes.NewInt64(RedoStateFailed), sqltypes.NewVarBinary("1"), sqltypes.NewVarBinary("stmt22"), + sqltypes.TestValue(sqltypes.Text, "error2"), }, { sqltypes.NewVarBinary("dtid3"), sqltypes.NewInt64(RedoStatePrepared), sqltypes.NewVarBinary("1"), sqltypes.NewVarBinary("stmt31"), + sqltypes.NULL, }}, }) prepared, failed, err = tpc.ReadAllRedo(ctx) @@ -221,21 +237,19 @@ func TestReadAllRedo(t *testing.T) { Queries: []string{"stmt31"}, Time: time.Unix(0, 1), }} - if !reflect.DeepEqual(prepared, want) { - t.Errorf("ReadAllRedo: %s, want %s", jsonStr(prepared), jsonStr(want)) - } + utils.MustMatch(t, want, prepared) wantFailed := []*tx.PreparedTx{{ Dtid: "dtid1", Queries: []string{"stmt11"}, Time: time.Unix(0, 1), + Message: "error1", }, { Dtid: "dtid2", Queries: []string{"stmt21", "stmt22"}, Time: time.Unix(0, 1), + Message: "error2", }} - if !reflect.DeepEqual(failed, wantFailed) { - t.Errorf("ReadAllRedo failed): %s, want %s", jsonStr(failed), jsonStr(wantFailed)) - } + utils.MustMatch(t, wantFailed, failed) } func TestReadAllTransactions(t *testing.T) { diff --git a/go/vt/vttablet/tabletserver/tx/twopc.go b/go/vt/vttablet/tabletserver/tx/twopc.go index 56cfbd1a51f..6412fc53b4d 100644 --- a/go/vt/vttablet/tabletserver/tx/twopc.go +++ b/go/vt/vttablet/tabletserver/tx/twopc.go @@ -36,4 +36,5 @@ type PreparedTx struct { Dtid string Queries []string Time time.Time + Message string } diff --git a/go/vt/vttablet/tabletserver/tx_engine.go b/go/vt/vttablet/tabletserver/tx_engine.go index d7f2d55b18a..549851790ed 100644 --- a/go/vt/vttablet/tabletserver/tx_engine.go +++ b/go/vt/vttablet/tabletserver/tx_engine.go @@ -22,10 +22,10 @@ import ( "sync" "time" + "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/pools/smartconnpool" "vitess.io/vitess/go/timer" "vitess.io/vitess/go/trace" - "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/dtids" "vitess.io/vitess/go/vt/log" querypb "vitess.io/vitess/go/vt/proto/query" @@ -411,58 +411,129 @@ func (te *TxEngine) shutdownLocked() { // to ensure there are no future collisions. func (te *TxEngine) prepareFromRedo() error { ctx := tabletenv.LocalContext() - var allErr concurrency.AllErrorRecorder - prepared, failed, err := te.twoPC.ReadAllRedo(ctx) - if err != nil { - return err + + prepared, failed, readErr := te.twoPC.ReadAllRedo(ctx) + if readErr != nil { + return readErr } - maxid := int64(0) -outer: + var ( + maxID = int64(0) + preparedCounter = 0 + failedCounter = len(failed) + allErrs []error + ) + + // While going through the prepared transaction. + // We will extract the transaction ID from the dtid and + // update the last transaction ID to max value to avoid any collision with the new transactions. + for _, preparedTx := range prepared { - txid, err := dtids.TransactionID(preparedTx.Dtid) - if err != nil { - log.Errorf("Error extracting transaction ID from dtid: %v", err) - } - if txid > maxid { - maxid = txid + txID, _ := dtids.TransactionID(preparedTx.Dtid) + if txID > maxID { + maxID = txID } - // We need to redo the prepared transactions using a dba user because MySQL might still be in read only mode. - conn, err := te.beginNewDbaConnection(ctx) + + prepFailed, err := te.prepareTx(ctx, preparedTx) if err != nil { - allErr.RecordError(vterrors.Wrapf(err, "dtid - %v", preparedTx.Dtid)) - continue - } - for _, stmt := range preparedTx.Queries { - conn.TxProperties().RecordQuery(stmt, te.env.Environment().Parser()) - _, err := conn.Exec(ctx, stmt, 1, false) - if err != nil { - allErr.RecordError(vterrors.Wrapf(err, "dtid - %v", preparedTx.Dtid)) - te.txPool.RollbackAndRelease(ctx, conn) - continue outer + allErrs = append(allErrs, vterrors.Wrapf(err, "dtid - %v", preparedTx.Dtid)) + if prepFailed { + failedCounter++ } - } - // We should not use the external Prepare because - // we don't want to write again to the redo log. - err = te.preparedPool.Put(conn, preparedTx.Dtid) - if err != nil { - allErr.RecordError(vterrors.Wrapf(err, "dtid - %v", preparedTx.Dtid)) - continue + } else { + preparedCounter++ } } + for _, preparedTx := range failed { - txid, err := dtids.TransactionID(preparedTx.Dtid) + txID, _ := dtids.TransactionID(preparedTx.Dtid) + if txID > maxID { + maxID = txID + } + te.preparedPool.SetFailed(preparedTx.Dtid) + } + + te.txPool.AdjustLastID(maxID) + log.Infof("TwoPC: Prepared %d transactions, and registered %d failures.", preparedCounter, failedCounter) + return vterrors.Aggregate(allErrs) +} + +func (te *TxEngine) prepareTx(ctx context.Context, preparedTx *tx.PreparedTx) (failed bool, err error) { + defer func() { if err != nil { - log.Errorf("Error extracting transaction ID from dtid: %v", err) + failed = te.checkErrorAndMarkFailed(ctx, preparedTx.Dtid, err, "TwopcPrepareRedo") } - if txid > maxid { - maxid = txid + }() + + // We need to redo the prepared transactions using a dba user because MySQL might still be in read only mode. + var conn *StatefulConnection + if conn, err = te.beginNewDbaConnection(ctx); err != nil { + return + } + + for _, stmt := range preparedTx.Queries { + conn.TxProperties().RecordQuery(stmt, te.env.Environment().Parser()) + if _, err = conn.Exec(ctx, stmt, 1, false); err != nil { + te.txPool.RollbackAndRelease(ctx, conn) + return } - te.preparedPool.SetFailed(preparedTx.Dtid) } - te.txPool.AdjustLastID(maxid) - log.Infof("TwoPC: Prepared %d transactions, and registered %d failures.", len(prepared), len(failed)) - return allErr.Error() + // We should not use the external Prepare because + // we don't want to write again to the redo log. + err = te.preparedPool.Put(conn, preparedTx.Dtid) + return +} + +// checkErrorAndMarkFailed check that the error is retryable or non-retryable error. +// If it is a non-retryable error than it marks the dtid as failed in the prepared pool, +// increments the InternalErrors counter, and also changes the state of the transaction in the redo log as failed. +func (te *TxEngine) checkErrorAndMarkFailed(ctx context.Context, dtid string, receivedErr error, metricName string) (fail bool) { + state := RedoStateFailed + if isRetryableError(receivedErr) { + log.Infof("retryable error for dtid: %s", dtid) + state = RedoStatePrepared + } else { + fail = true + te.env.Stats().InternalErrors.Add(metricName, 1) + te.preparedPool.SetFailed(dtid) + } + + // Update the state of the transaction in the redo log. + // Retryable Error: Update the message with error message. + // Non-retryable Error: Along with message, update the state as RedoStateFailed. + conn, _, _, err := te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) + if err != nil { + log.Errorf("markFailed: Begin failed for dtid %s: %v", dtid, err) + return + } + defer te.txPool.RollbackAndRelease(ctx, conn) + + if err = te.twoPC.UpdateRedo(ctx, conn, dtid, state, receivedErr.Error()); err != nil { + log.Errorf("markFailed: UpdateRedo failed for dtid %s: %v", dtid, err) + return + } + + if _, err = te.txPool.Commit(ctx, conn); err != nil { + log.Errorf("markFailed: Commit failed for dtid %s: %v", dtid, err) + } + return +} + +func isRetryableError(err error) bool { + switch vterrors.Code(err) { + case vtrpcpb.Code_OK, + vtrpcpb.Code_DEADLINE_EXCEEDED, + vtrpcpb.Code_CANCELED, + vtrpcpb.Code_UNAVAILABLE: + return true + case vtrpcpb.Code_UNKNOWN: + // If the error is unknown, convert to SQL Error. + sqlErr := sqlerror.NewSQLErrorFromError(err) + // Connection errors are retryable + return sqlerror.IsConnErr(sqlErr) + default: + return false + } } // shutdownTransactions rolls back all open transactions that are idol. diff --git a/go/vt/vttablet/tabletserver/tx_engine_test.go b/go/vt/vttablet/tabletserver/tx_engine_test.go index 95057d754fb..43916dab3c2 100644 --- a/go/vt/vttablet/tabletserver/tx_engine_test.go +++ b/go/vt/vttablet/tabletserver/tx_engine_test.go @@ -25,7 +25,10 @@ import ( "testing" "time" + "vitess.io/vitess/go/mysql/sqlerror" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vtenv" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletserver/tx" "github.com/stretchr/testify/assert" @@ -603,3 +606,75 @@ func TestTxEngineFailReserve(t *testing.T) { require.Error(t, err) assert.Zero(t, connID) } + +func TestCheckReceivedError(t *testing.T) { + db := setUpQueryExecutorTest(t) + defer db.Close() + cfg := tabletenv.NewDefaultConfig() + cfg.DB = newDBConfigs(db) + env := tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "TabletServerTest") + env.Config().TwoPCEnable = true + env.Config().TwoPCAbandonAge = 5 + te := NewTxEngine(env, nil) + te.AcceptReadWrite() + + tcases := []struct { + receivedErr error + retryable bool + expQuery string + }{{ + receivedErr: vterrors.New(vtrpcpb.Code_DEADLINE_EXCEEDED, "deadline exceeded"), + retryable: true, + expQuery: `update _vt.redo_state set state = 1, message = 'deadline exceeded' where dtid = 'aa'`, + }, { + receivedErr: vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "invalid argument"), + retryable: false, + expQuery: `update _vt.redo_state set state = 0, message = 'invalid argument' where dtid = 'aa'`, + }, { + receivedErr: sqlerror.NewSQLError(sqlerror.ERLockDeadlock, sqlerror.SSLockDeadlock, "Deadlock found when trying to get lock; try restarting transaction"), + retryable: false, + expQuery: `update _vt.redo_state set state = 0, message = 'Deadlock found when trying to get lock; try restarting transaction (errno 1213) (sqlstate 40001)' where dtid = 'aa'`, + }, { + receivedErr: context.DeadlineExceeded, + retryable: true, + expQuery: `update _vt.redo_state set state = 1, message = 'context deadline exceeded' where dtid = 'aa'`, + }, { + receivedErr: context.Canceled, + retryable: true, + expQuery: `update _vt.redo_state set state = 1, message = 'context canceled' where dtid = 'aa'`, + }, { + receivedErr: sqlerror.NewSQLError(sqlerror.CRServerLost, sqlerror.SSUnknownSQLState, "Lost connection to MySQL server during query"), + retryable: true, + expQuery: `update _vt.redo_state set state = 1, message = 'Lost connection to MySQL server during query (errno 2013) (sqlstate HY000)' where dtid = 'aa'`, + }, { + receivedErr: sqlerror.NewSQLError(sqlerror.CRMalformedPacket, sqlerror.SSUnknownSQLState, "Malformed packet"), + retryable: false, + expQuery: `update _vt.redo_state set state = 0, message = 'Malformed packet (errno 2027) (sqlstate HY000)' where dtid = 'aa'`, + }, { + receivedErr: sqlerror.NewSQLError(sqlerror.CRServerGone, sqlerror.SSUnknownSQLState, "Server has gone away"), + retryable: true, + expQuery: `update _vt.redo_state set state = 1, message = 'Server has gone away (errno 2006) (sqlstate HY000)' where dtid = 'aa'`, + }, { + receivedErr: vterrors.New(vtrpcpb.Code_ABORTED, "Row count exceeded"), + retryable: false, + expQuery: `update _vt.redo_state set state = 0, message = 'Row count exceeded' where dtid = 'aa'`, + }, { + receivedErr: errors.New("(errno 2013) (sqlstate HY000) lost connection"), + retryable: true, + expQuery: `update _vt.redo_state set state = 1, message = '(errno 2013) (sqlstate HY000) lost connection' where dtid = 'aa'`, + }} + + for _, tc := range tcases { + t.Run(tc.receivedErr.Error(), func(t *testing.T) { + if tc.expQuery != "" { + db.AddQuery(tc.expQuery, &sqltypes.Result{}) + } + nonRetryable := te.checkErrorAndMarkFailed(context.Background(), "aa", tc.receivedErr, "") + require.NotEqual(t, tc.retryable, nonRetryable) + if !tc.retryable { + require.Equal(t, errPrepFailed, te.preparedPool.reserved["aa"]) + } + delete(te.preparedPool.reserved, "aa") + }) + } +}