Skip to content

Commit

Permalink
Modify distributed transaction commit flow (#16468)
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal authored Jul 30, 2024
1 parent b88c62f commit 3d104d0
Show file tree
Hide file tree
Showing 7 changed files with 252 additions and 103 deletions.
5 changes: 3 additions & 2 deletions go/mysql/sqlerror/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ func (e ErrorCode) ToString() string {
// See above reference for more information on each code.
const (
// Vitess specific errors, (100-999)
ERNotReplica = ErrorCode(100)
ERNonAtomicCommit = ErrorCode(301)
ERNotReplica = ErrorCode(100)
ERNonAtomicCommit = ErrorCode(301)
ERInAtomicRecovery = ErrorCode(302)

// unknown
ERUnknownError = ErrorCode(1105)
Expand Down
97 changes: 85 additions & 12 deletions go/test/endtoend/transaction/twopc/twopc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/callerid"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"
)

// TestDTCommit tests distributed transaction commit for insert, update and delete operations
Expand Down Expand Up @@ -580,6 +582,10 @@ func TestDTResolveAfterMMCommit(t *testing.T) {
_, err = conn.Execute(newCtx, "commit", nil)
require.ErrorContains(t, err, "Fail After MM commit")

testWarningAndTransactionStatus(t, conn,
"distributed transaction ID failed during metadata manager commit; transaction will be committed/rollbacked based on the state on recovery",
false, "COMMIT", "ks:40-80,ks:-40")

// Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM.
tableMap := make(map[string][]*querypb.Field)
dtMap := make(map[string]string)
Expand Down Expand Up @@ -656,6 +662,10 @@ func TestDTResolveAfterRMPrepare(t *testing.T) {
_, err = conn.Execute(newCtx, "commit", nil)
require.ErrorContains(t, err, "Fail After RM prepared")

testWarningAndTransactionStatus(t, conn,
"distributed transaction ID failed during transaction prepare phase; prepare transaction rollback attempted; conclude on recovery",
true /* transaction concluded */, "", "")

// Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM.
tableMap := make(map[string][]*querypb.Field)
dtMap := make(map[string]string)
Expand Down Expand Up @@ -714,6 +724,10 @@ func TestDTResolveDuringRMPrepare(t *testing.T) {
_, err = conn.Execute(newCtx, "commit", nil)
require.ErrorContains(t, err, "Fail During RM prepare")

testWarningAndTransactionStatus(t, conn,
"distributed transaction ID failed during transaction prepare phase; prepare transaction rollback attempted; conclude on recovery",
true, "", "")

// Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM.
tableMap := make(map[string][]*querypb.Field)
dtMap := make(map[string]string)
Expand Down Expand Up @@ -776,6 +790,10 @@ func TestDTResolveDuringRMCommit(t *testing.T) {
_, err = conn.Execute(newCtx, "commit", nil)
require.ErrorContains(t, err, "Fail During RM commit")

testWarningAndTransactionStatus(t, conn,
"distributed transaction ID failed during resource manager commit; transaction will be committed on recovery",
false, "COMMIT", "ks:40-80,ks:-40")

// Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM.
tableMap := make(map[string][]*querypb.Field)
dtMap := make(map[string]string)
Expand Down Expand Up @@ -851,18 +869,9 @@ func TestDTResolveAfterTransactionRecord(t *testing.T) {
_, err = conn.Execute(newCtx, "commit", nil)
require.ErrorContains(t, err, "Fail After TR created")

t.Run("ReadTransactionState", func(t *testing.T) {
errStr := err.Error()
indx := strings.Index(errStr, "Fail")
require.Greater(t, indx, 0)
dtid := errStr[0 : indx-2]
res, err := conn.Execute(context.Background(), fmt.Sprintf(`show transaction status for '%v'`, dtid), nil)
require.NoError(t, err)
resStr := fmt.Sprintf("%v", res.Rows)
require.Contains(t, resStr, `[[VARCHAR("ks:80-`)
require.Contains(t, resStr, `VARCHAR("PREPARE") DATETIME("`)
require.Contains(t, resStr, `+0000 UTC") VARCHAR("ks:40-80")]]`)
})
testWarningAndTransactionStatus(t, conn,
"distributed transaction ID failed during transaction record creation; rollback attempted; conclude on recovery",
false, "PREPARE", "ks:40-80")

// Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM.
tableMap := make(map[string][]*querypb.Field)
Expand All @@ -882,3 +891,67 @@ func TestDTResolveAfterTransactionRecord(t *testing.T) {
assert.Equal(t, expectations, logTable,
"mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations))
}

// warn is the decoded form of one result row, as produced by toWarn.
// In this file it is filled from the rows returned by `show warnings`.
type warn struct {
	// level is the string value of the first column.
	level string
	// code is the second column converted to an unsigned 16-bit integer.
	code uint16
	// msg is the string value of the third column.
	msg string
}

// toWarn decodes a result row into a warn value: column 0 becomes the level,
// column 1 the numeric code and column 2 the message.
// The row must have at least three columns; shorter rows cause an index panic.
func toWarn(row sqltypes.Row) warn {
	var w warn
	// The conversion error is intentionally discarded: this helper has no way to
	// report it, and callers compare the resulting code against an expected value.
	w.code, _ = row[1].ToUint16()
	w.level = row[0].ToString()
	w.msg = row[2].ToString()
	return w
}

// txStatus is the decoded form of one result row, as produced by toTxStatus.
// In this file it is filled from the first row returned by
// `show transaction status for '<dtid>'`.
type txStatus struct {
	// dtid is the string value of the first column (the distributed transaction ID).
	dtid string
	// state is the string value of the second column, e.g. "PREPARE" or "COMMIT".
	state string
	// rTime is the string value of the third column.
	// NOTE(review): presumably the transaction record timestamp — confirm.
	rTime string
	// participants is the string value of the fourth column, e.g. "ks:40-80,ks:-40".
	participants string
}

// toTxStatus decodes a result row into a txStatus value by taking the string
// form of its first four columns, in order: dtid, state, rTime, participants.
// The row must have at least four columns; shorter rows cause an index panic.
func toTxStatus(row sqltypes.Row) txStatus {
	var st txStatus
	st.dtid = row[0].ToString()
	st.state = row[1].ToString()
	st.rTime = row[2].ToString()
	st.participants = row[3].ToString()
	return st
}

// testWarningAndTransactionStatus checks the warning left on the session after a
// failed distributed transaction commit, then the transaction status reported for it.
//
// It runs `show warnings` on conn and requires exactly one row whose level is
// "Warning", whose code is 302 and whose message contains warnMsg. The text before
// the first space of that message is taken as the DTID and used in
// `show transaction status for '<dtid>'`.
//
// When txConcluded is true the status result must be empty. Otherwise the result
// must contain a row, and its first row must match the DTID, txState and
// txParticipants.
func testWarningAndTransactionStatus(t *testing.T, conn *vtgateconn.VTGateSession, warnMsg string,
	txConcluded bool, txState string, txParticipants string) {
	t.Helper()

	qr, err := conn.Execute(context.Background(), "show warnings", nil)
	require.NoError(t, err)
	require.Len(t, qr.Rows, 1)

	// validate warning output
	w := toWarn(qr.Rows[0])
	assert.Equal(t, "Warning", w.level)
	assert.EqualValues(t, 302, w.code)
	assert.Contains(t, w.msg, warnMsg)

	// extract transaction ID: it is the leading, space-terminated token of the message.
	// indx must be strictly positive so that the extracted DTID is never empty.
	indx := strings.Index(w.msg, " ")
	require.Greater(t, indx, 0)
	dtid := w.msg[:indx]

	qr, err = conn.Execute(context.Background(), fmt.Sprintf(`show transaction status for '%v'`, dtid), nil)
	require.NoError(t, err)

	// validate transaction status
	if txConcluded {
		require.Empty(t, qr.Rows)
		return
	}

	// Guard the index below: without it an unexpectedly empty result would panic
	// with an index-out-of-range instead of producing a readable test failure.
	require.NotEmpty(t, qr.Rows, "expected a transaction status row for dtid %s", dtid)
	tx := toTxStatus(qr.Rows[0])
	assert.Equal(t, dtid, tx.dtid)
	assert.Equal(t, txState, tx.state)
	assert.Equal(t, txParticipants, tx.participants)
}
48 changes: 48 additions & 0 deletions go/vt/vtgate/debug_2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,52 @@ limitations under the License.

package vtgate

import (
"context"

"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
)

// DebugTwoPc is true in this build. The checkTestFailure documentation below names
// this value as the condition under which 2PC failures are simulated for testing.
const DebugTwoPc = true

// checkTestFailure is used to simulate failures in 2PC flow for testing when DebugTwoPc is true.
//
// The effective caller ID principal carried by ctx selects the failure to inject.
// An error is returned only when that principal equals expectCaller and is one of
// the failure points handled below; in every other case nil is returned. The two
// shard-scoped failure points additionally require target to be shard "-40".
// Every injected error is logged and carries the code vtrpcpb.Code_INTERNAL.
func checkTestFailure(ctx context.Context, expectCaller string, target *querypb.Target) error {
	callerID := callerid.EffectiveCallerIDFromContext(ctx)
	if callerID == nil || callerID.GetPrincipal() != expectCaller {
		return nil
	}
	switch callerID.Principal {
	case "TRCreated_FailNow":
		log.Errorf("Fail After TR created")
		// no commit decision is made. Transaction should be rolled back.
		return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail After TR created")
	case "RMPrepare_-40_FailNow":
		// GetShard is used instead of a direct field access so that a nil target
		// is treated as "not shard -40" rather than causing a nil dereference.
		if target.GetShard() != "-40" {
			return nil
		}
		log.Errorf("Fail During RM prepare")
		// no commit decision is made. Transaction should be rolled back.
		return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail During RM prepare")
	case "RMPrepared_FailNow":
		log.Errorf("Fail After RM prepared")
		// no commit decision is made. Transaction should be rolled back.
		return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail After RM prepared")
	case "MMCommitted_FailNow":
		log.Errorf("Fail After MM commit")
		// commit decision is made. Transaction should be committed.
		return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail After MM commit")
	case "RMCommit_-40_FailNow":
		// Same nil-tolerant shard check as for the prepare failure point above.
		if target.GetShard() != "-40" {
			return nil
		}
		log.Errorf("Fail During RM commit")
		// commit decision is made. Transaction should be committed.
		return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail During RM commit")
	default:
		return nil
	}
}
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/transaction_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (t *TransactionStatus) TryExecute(ctx context.Context, vcursor VCursor, bin
if wantfields {
res.Fields = t.getFields()
}
if transactionState != nil {
if transactionState != nil && transactionState.Dtid != "" {
var participantString []string
for _, participant := range transactionState.Participants {
participantString = append(participantString, fmt.Sprintf("%s:%s", participant.Keyspace, participant.Shard))
Expand Down
10 changes: 10 additions & 0 deletions go/vt/vtgate/production.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ limitations under the License.

package vtgate

import (
"context"

querypb "vitess.io/vitess/go/vt/proto/query"
)

// This file defines debug constants that are always false.
// This file is used for building production code.
// We use go build directives to include a file that defines the constant to true
Expand All @@ -26,3 +32,7 @@ package vtgate
// production performance.

// DebugTwoPc is the debug constant for 2PC; in this file, which is used for
// building production code, it is always false.
const DebugTwoPc = false

// checkTestFailure is the no-op variant used in this file, which is built for
// production code: it ignores all of its arguments and always returns nil, so
// no failure is ever reported from here.
func checkTestFailure(_ context.Context, _ string, _ *querypb.Target) error {
	return nil
}
Loading

0 comments on commit 3d104d0

Please sign in to comment.