Skip to content

Commit

Permalink
Distributed Transaction: Action on commit prepared or redo prepared f…
Browse files Browse the repository at this point in the history
…ailure (vitessio#16803)

Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal authored Sep 24, 2024
1 parent 670e141 commit 83b37b8
Show file tree
Hide file tree
Showing 12 changed files with 444 additions and 87 deletions.
1 change: 1 addition & 0 deletions go/vt/sidecardb/schema/twopc/redo_state.sql
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ CREATE TABLE IF NOT EXISTS redo_state(
dtid varbinary(512) NOT NULL,
state bigint NOT NULL,
time_created bigint NOT NULL,
message text,
primary key(dtid)
) ENGINE = InnoDB CHARSET = utf8mb4
2 changes: 1 addition & 1 deletion go/vt/vttablet/endtoend/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ var tableACLConfig = `{
},
{
"name": "vitess_twopc",
"table_names_or_prefixes": ["dt_state"],
"table_names_or_prefixes": ["dt_state", "redo_state"],
"readers": ["dev"],
"writers": ["dev"],
"admins": ["dev"]
Expand Down
100 changes: 100 additions & 0 deletions go/vt/vttablet/endtoend/twopc/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package endtoend

import (
"context"
"flag"
"fmt"
"os"
"testing"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/vttablet/endtoend/framework"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttest"

vttestpb "vitess.io/vitess/go/vt/proto/vttest"
)

var (
connParams mysql.ConnParams
connAppDebugParams mysql.ConnParams
cluster vttest.LocalCluster
)

func TestMain(m *testing.M) {
flag.Parse() // Do not remove this comment, import into google3 depends on it
tabletenv.Init()

exitCode := func() int {
// Launch MySQL.
// We need a Keyspace in the topology, so the DbName is set.
// We need a Shard too, so the database 'vttest' is created.
cfg := vttest.Config{
Topology: &vttestpb.VTTestTopology{
Keyspaces: []*vttestpb.Keyspace{
{
Name: "vttest",
Shards: []*vttestpb.Shard{
{
Name: "0",
DbNameOverride: "vttest",
},
},
},
},
},
OnlyMySQL: true,
Charset: "utf8mb4_general_ci",
}
if err := cfg.InitSchemas("vttest", testSchema, nil); err != nil {
fmt.Fprintf(os.Stderr, "InitSchemas failed: %v\n", err)
return 1
}
defer os.RemoveAll(cfg.SchemaDir)
cluster = vttest.LocalCluster{
Config: cfg,
}
if err := cluster.Setup(); err != nil {
fmt.Fprintf(os.Stderr, "could not launch mysql: %v\n", err)
return 1
}

defer cluster.TearDown()

connParams = cluster.MySQLConnParams()
connAppDebugParams = cluster.MySQLAppDebugConnParams()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

config := tabletenv.NewDefaultConfig()
config.TwoPCEnable = true
config.TwoPCAbandonAge = 1
err := framework.StartCustomServer(ctx, connParams, connAppDebugParams, cluster.DbName(), config)
if err != nil {
fmt.Fprintf(os.Stderr, "%v", err)
return 1
}
defer framework.StopServer()

return m.Run()
}()
os.Exit(exitCode)
}

var testSchema = `create table vitess_test(intval int default 0 primary key);`
109 changes: 109 additions & 0 deletions go/vt/vttablet/endtoend/twopc/prepare_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package endtoend

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/vttablet/endtoend/framework"
)

// TestCommitPreparedFailNonRetryable tests the case where the commit_prepared fails trying to acquire update lock.
// The transaction updates to failed state.
func TestCommitPreparedFailNonRetryable(t *testing.T) {
dbaConnector := framework.Server.Config().DB.DbaWithDB()
conn, err := dbaConnector.Connect(context.Background())
require.NoError(t, err)
defer conn.Close()

_, err = conn.ExecuteFetch("set global innodb_lock_wait_timeout = 1", 1, false)
require.NoError(t, err)
defer conn.ExecuteFetch("set global innodb_lock_wait_timeout = default", 1, false)

client := framework.NewClient()
defer client.RollbackPrepared("bb", client.TransactionID())

_, err = client.BeginExecute(`insert into vitess_test (intval) values(50)`, nil, nil)
require.NoError(t, err)
err = client.Prepare("bb")
require.NoError(t, err)

client2 := framework.NewClient()
_, err = client2.BeginExecute(`select * from _vt.redo_state where dtid = 'bb' for update`, nil, nil)
require.NoError(t, err)

ch := make(chan any)
go func() {
err := client.CommitPrepared("bb")
ch <- nil
require.ErrorContains(t, err, "commit_prepared")
}()
time.Sleep(1500 * time.Millisecond)

client2.Release()
<-ch

qr, err := client2.Execute("select dtid, state, message from _vt.redo_state where dtid = 'bb'", nil)
require.NoError(t, err)
require.Equal(t, `[[VARBINARY("bb") INT64(0) TEXT("Lock wait timeout exceeded; try restarting transaction (errno 1205) (sqlstate HY000) during query: delete from _vt.redo_state where dtid = 'bb'")]]`, fmt.Sprintf("%v", qr.Rows))
}

// TestCommitPreparedFailRetryable tests the case where the commit_prepared fails when the query is killed.
// The transaction remains in the prepare state.
func TestCommitPreparedFailRetryable(t *testing.T) {
client := framework.NewClient()
defer client.RollbackPrepared("aa", client.TransactionID())

_, err := client.BeginExecute(`insert into vitess_test (intval) values(40)`, nil, nil)
require.NoError(t, err)
connRes, err := client.Execute(`select connection_id()`, nil)
require.NoError(t, err)
err = client.Prepare("aa")
require.NoError(t, err)

client2 := framework.NewClient()
_, err = client2.BeginExecute(`select * from _vt.redo_state where dtid = 'aa' for update`, nil, nil)
require.NoError(t, err)

ch := make(chan any)
go func() {
err := client.CommitPrepared("aa")
ch <- nil
require.ErrorContains(t, err, "commit_prepared")
}()
time.Sleep(100 * time.Millisecond)

dbaConnector := framework.Server.Config().DB.DbaWithDB()
conn, err := dbaConnector.Connect(context.Background())
require.NoError(t, err)
defer conn.Close()

_, err = conn.ExecuteFetch(fmt.Sprintf("kill query %s", connRes.Rows[0][0].ToString()), 1, false)
require.NoError(t, err)

client2.Release()
<-ch

qr, err := client2.Execute("select dtid, state, message from _vt.redo_state where dtid = 'aa'", nil)
require.NoError(t, err)
require.Equal(t, `[[VARBINARY("aa") INT64(1) TEXT("Query execution was interrupted (errno 1317) (sqlstate 70100) during query: delete from _vt.redo_state where dtid = 'aa'")]]`, fmt.Sprintf("%v", qr.Rows))
}
29 changes: 1 addition & 28 deletions go/vt/vttablet/tabletserver/dt_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ func (dte *DTExecutor) CommitPrepared(dtid string) (err error) {
ctx := trace.CopySpan(context.Background(), dte.ctx)
defer func() {
if err != nil {
dte.markFailed(ctx, dtid)
log.Warningf("failed to commit the prepared transaction '%s' with error: %v", dtid, err)
dte.te.checkErrorAndMarkFailed(ctx, dtid, err, "TwopcCommit")
}
dte.te.txPool.RollbackAndRelease(ctx, conn)
}()
Expand All @@ -172,33 +172,6 @@ func (dte *DTExecutor) CommitPrepared(dtid string) (err error) {
return nil
}

// markFailed does the necessary work to mark a CommitPrepared
// as failed. It marks the dtid as failed in the prepared pool,
// increments the InternalErros counter, and also changes the
// state of the transaction in the redo log as failed. If the
// state change does not succeed, it just logs the event.
// The function uses the passed in context that has no timeout
// instead of DTExecutor's context.
func (dte *DTExecutor) markFailed(ctx context.Context, dtid string) {
dte.te.env.Stats().InternalErrors.Add("TwopcCommit", 1)
dte.te.preparedPool.SetFailed(dtid)
conn, _, _, err := dte.te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
if err != nil {
log.Errorf("markFailed: Begin failed for dtid %s: %v", dtid, err)
return
}
defer dte.te.txPool.RollbackAndRelease(ctx, conn)

if err = dte.te.twoPC.UpdateRedo(ctx, conn, dtid, RedoStateFailed); err != nil {
log.Errorf("markFailed: UpdateRedo failed for dtid %s: %v", dtid, err)
return
}

if _, err = dte.te.txPool.Commit(ctx, conn); err != nil {
log.Errorf("markFailed: Commit failed for dtid %s: %v", dtid, err)
}
}

// RollbackPrepared rolls back a prepared transaction. This function handles
// the case of an incomplete prepare.
//
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vttablet/tabletserver/dt_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"time"

"vitess.io/vitess/go/event/syslogger"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/rules"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
Expand Down Expand Up @@ -718,6 +719,7 @@ func newTestTxExecutor(t *testing.T, ctx context.Context) (txe *DTExecutor, tsv
db.AddQuery("delete from _vt.redo_state where dtid = 'aa'", &sqltypes.Result{})
db.AddQuery("delete from _vt.redo_statement where dtid = 'aa'", &sqltypes.Result{})
db.AddQuery("update test_table set `name` = 2 where pk = 1 limit 10001", &sqltypes.Result{})
db.AddRejectedQuery("bogus", sqlerror.NewSQLError(sqlerror.ERUnknownError, sqlerror.SSUnknownSQLState, "bogus query"))
return &DTExecutor{
ctx: ctx,
logStats: logStats,
Expand Down
11 changes: 10 additions & 1 deletion go/vt/vttablet/tabletserver/tabletserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,14 @@ func TestTabletServerRedoLogIsKeptBetweenRestarts(t *testing.T) {
{Type: sqltypes.Uint64},
{Type: sqltypes.Uint64},
{Type: sqltypes.VarBinary},
{Type: sqltypes.Text},
},
Rows: [][]sqltypes.Value{{
sqltypes.NewVarBinary("dtid0"),
sqltypes.NewInt64(RedoStatePrepared),
sqltypes.NewVarBinary(""),
sqltypes.NewVarBinary("update test_table set `name` = 2 where pk = 1 limit 10001"),
sqltypes.NULL,
}},
})
turnOnTxEngine()
Expand All @@ -255,22 +257,26 @@ func TestTabletServerRedoLogIsKeptBetweenRestarts(t *testing.T) {
{Type: sqltypes.Uint64},
{Type: sqltypes.Uint64},
{Type: sqltypes.VarBinary},
{Type: sqltypes.Text},
},
Rows: [][]sqltypes.Value{{
sqltypes.NewVarBinary("bogus"),
sqltypes.NewInt64(RedoStatePrepared),
sqltypes.NewVarBinary(""),
sqltypes.NewVarBinary("bogus"),
sqltypes.NULL,
}, {
sqltypes.NewVarBinary("a:b:10"),
sqltypes.NewInt64(RedoStatePrepared),
sqltypes.NewVarBinary(""),
sqltypes.NewVarBinary("update test_table set `name` = 2 where pk = 1 limit 10001"),
sqltypes.NULL,
}, {
sqltypes.NewVarBinary("a:b:20"),
sqltypes.NewInt64(RedoStateFailed),
sqltypes.NewVarBinary(""),
sqltypes.NewVarBinary("unused"),
sqltypes.TestValue(sqltypes.Text, "deadlock detected, transaction rolled back"),
}},
})
turnOnTxEngine()
Expand All @@ -280,7 +286,10 @@ func TestTabletServerRedoLogIsKeptBetweenRestarts(t *testing.T) {
Sql: "update test_table set `name` = 2 where pk = 1 limit 10001",
Tables: []string{"test_table"}}}
utils.MustMatch(t, want, got, "Prepared queries")
wantFailed := map[string]error{"a:b:20": errPrepFailed}
wantFailed := map[string]error{
"bogus": errPrepFailed, // The query is rejected by database so added to failed list.
"a:b:20": errPrepFailed, // The DTID is already in failed state.
}
utils.MustMatch(t, tsv.te.preparedPool.reserved, wantFailed, fmt.Sprintf("Failed dtids: %v, want %v", tsv.te.preparedPool.reserved, wantFailed))
// Verify last id got adjusted.
assert.EqualValues(t, 20, tsv.te.txPool.scp.lastID.Load(), "tsv.te.txPool.lastID.Get()")
Expand Down
18 changes: 10 additions & 8 deletions go/vt/vttablet/tabletserver/twopc.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const (
// DTStateRollback represents the ROLLBACK state for dt_state.
DTStateRollback = querypb.TransactionState_ROLLBACK

readAllRedo = `select t.dtid, t.state, t.time_created, s.statement
readAllRedo = `select t.dtid, t.state, t.time_created, s.statement, t.message
from %s.redo_state t
join %s.redo_statement s on t.dtid = s.dtid
order by t.dtid, s.id`
Expand Down Expand Up @@ -109,8 +109,8 @@ func (tpc *TwoPC) initializeQueries() {
"insert into %s.redo_statement(dtid, id, statement) values %a",
dbname, ":vals")
tpc.updateRedoTx = sqlparser.BuildParsedQuery(
"update %s.redo_state set state = %a where dtid = %a",
dbname, ":state", ":dtid")
"update %s.redo_state set state = %a, message = %a where dtid = %a",
dbname, ":state", ":message", ":dtid")
tpc.deleteRedoTx = sqlparser.BuildParsedQuery(
"delete from %s.redo_state where dtid = %a",
dbname, ":dtid")
Expand Down Expand Up @@ -200,10 +200,11 @@ func (tpc *TwoPC) SaveRedo(ctx context.Context, conn *StatefulConnection, dtid s
}

// UpdateRedo changes the state of the redo log for the dtid.
func (tpc *TwoPC) UpdateRedo(ctx context.Context, conn *StatefulConnection, dtid string, state int) error {
func (tpc *TwoPC) UpdateRedo(ctx context.Context, conn *StatefulConnection, dtid string, state int, message string) error {
bindVars := map[string]*querypb.BindVariable{
"dtid": sqltypes.StringBindVariable(dtid),
"state": sqltypes.Int64BindVariable(int64(state)),
"dtid": sqltypes.StringBindVariable(dtid),
"state": sqltypes.Int64BindVariable(int64(state)),
"message": sqltypes.StringBindVariable(message),
}
_, err := tpc.exec(ctx, conn, tpc.updateRedoTx, bindVars)
return err
Expand Down Expand Up @@ -244,8 +245,9 @@ func (tpc *TwoPC) ReadAllRedo(ctx context.Context) (prepared, failed []*tx.Prepa
// which is harmless.
tm, _ := row[2].ToCastInt64()
curTx = &tx.PreparedTx{
Dtid: dtid,
Time: time.Unix(0, tm),
Dtid: dtid,
Time: time.Unix(0, tm),
Message: row[4].ToString(),
}
st, err := row[1].ToCastInt64()
if err != nil {
Expand Down
Loading

0 comments on commit 83b37b8

Please sign in to comment.