From 961a9786e7a76b98069c36a63048edb0f28b8a5d Mon Sep 17 00:00:00 2001 From: Max Englander Date: Thu, 11 Jan 2024 06:42:29 -0500 Subject: [PATCH] vtgate: record warning for partially successful cross-shard commits (#14848) Signed-off-by: Max Englander --- changelog/19.0/19.0.0/summary.md | 21 +++- go/mysql/sqlerror/constants.go | 3 +- go/vt/vtgate/tx_conn.go | 27 +++- go/vt/vtgate/tx_conn_test.go | 207 ++++++++++++++++++++++++++++--- 4 files changed, 234 insertions(+), 24 deletions(-) diff --git a/changelog/19.0/19.0.0/summary.md b/changelog/19.0/19.0.0/summary.md index cde1608562a..dc65c3f7137 100644 --- a/changelog/19.0/19.0.0/summary.md +++ b/changelog/19.0/19.0.0/summary.md @@ -17,6 +17,7 @@ - [Multi Table Delete Support](#multi-table-delete) - [`SHOW VSCHEMA KEYSPACES` Query](#show-vschema-keyspaces) - [`FOREIGN_KEY_CHECKS` is now a Vitess Aware Variable](#fk-checks-vitess-aware) + - [Partial Multi-shard Commit Warnings](#partial-multi-shard-commit-warnings) - **[Vttestserver](#vttestserver)** - [`--vtcombo-bind-host` flag](#vtcombo-bind-host) - **[Minor Changes](#minor-changes)** @@ -64,7 +65,6 @@ Prior to 19.0 VTTablet reported how much time non-streaming executions spend wai The build version (e.g., `19.0.0-SNAPSHOT`) has been added to `/debug/vars`, allowing users to programmatically inspect Vitess components' build version at runtime. - ### Planned Reparent Shard #### `--tolerable-replication-lag` Sub-flag @@ -74,7 +74,6 @@ This feature is opt-in and not specifying this sub-flag makes Vitess ignore the A new flag in VTOrc with the same name has been added to control the behaviour of the PlannedReparentShard calls that VTOrc issues. 
- ### Query Compatibility #### Multi Table Delete Support @@ -107,6 +106,24 @@ mysql> show vschema keyspaces; When VTGate receives a query to change the `FOREIGN_KEY_CHECKS` value for a session, instead of sending the value down to MySQL, VTGate now keeps track of the value and changes the queries by adding `SET_VAR(FOREIGN_KEY_CHECKS=On/Off)` style query optimizer hints wherever required. +#### Partial Multi-shard Commit Warnings + +When using `multi` transaction mode (the default), it is possible for Vitess to successfully commit to one shard, but fail to commit to a subsequent shard, thus breaking the atomicity of a multi-shard transaction. + +In `v19.0`, when a commit fails after one or more shards have already committed, VTGate still returns the commit error and also records a warning (code `301`) naming the shards that were committed (at most 16 shard names are listed), e.g.: + +```mysql +mysql> commit; +ERROR 1317 (70100): target: customer.-80.primary: vttablet: rpc error: code = Aborted desc = transaction 1703182545849001001: ended at 2023-12-21 14:07:41.515 EST (exceeded timeout: 30s) (CallerID: userData1) +mysql> show warnings; ++---------+------+----------------------------------------------------------+ +| Level | Code | Message | ++---------+------+----------------------------------------------------------+ +| Warning | 301 | multi-db commit failed after committing to 1 shards: 80- | ++---------+------+----------------------------------------------------------+ +1 row in set, 1 warning (0.00 sec) +``` + ### Vttestserver #### `--vtcombo-bind-host` flag diff --git a/go/mysql/sqlerror/constants.go b/go/mysql/sqlerror/constants.go index 01de1b6d45c..fdec64588c1 100644 --- a/go/mysql/sqlerror/constants.go +++ b/go/mysql/sqlerror/constants.go @@ -34,7 +34,8 @@ func (e ErrorCode) ToString() string { // See above reference for more information on each code. 
const ( // Vitess specific errors, (100-999) - ERNotReplica = ErrorCode(100) + ERNotReplica = ErrorCode(100) + ERNonAtomicCommit = ErrorCode(301) // unknown ERUnknownError = ErrorCode(1105) diff --git a/go/vt/vtgate/tx_conn.go b/go/vt/vtgate/tx_conn.go index 9170093c23e..246140b3711 100644 --- a/go/vt/vtgate/tx_conn.go +++ b/go/vt/vtgate/tx_conn.go @@ -19,8 +19,10 @@ package vtgate import ( "context" "fmt" + "strings" "sync" + "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/dtids" "vitess.io/vitess/go/vt/log" @@ -34,6 +36,10 @@ import ( vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) +// nonAtomicCommitWarnMaxShards limits the number of shard names reported in +// non-atomic commit warnings. +const nonAtomicCommitWarnMaxShards = 16 + // TxConn is used for executing transactional requests. type TxConn struct { tabletGateway *TabletGateway @@ -132,8 +138,27 @@ func (txc *TxConn) commitNormal(ctx context.Context, session *SafeSession) error } // Retain backward compatibility on commit order for the normal session. 
- for _, shardSession := range session.ShardSessions { + for i, shardSession := range session.ShardSessions { if err := txc.commitShard(ctx, shardSession, session.logging); err != nil { + if i > 0 { + nShards := i + elipsis := false + if i > nonAtomicCommitWarnMaxShards { + nShards = nonAtomicCommitWarnMaxShards + elipsis = true + } + sNames := make([]string, nShards, nShards+1 /*...*/) + for j := 0; j < nShards; j++ { + sNames[j] = session.ShardSessions[j].Target.Shard + } + if elipsis { + sNames = append(sNames, "...") + } + session.RecordWarning(&querypb.QueryWarning{ + Code: uint32(sqlerror.ERNonAtomicCommit), + Message: fmt.Sprintf("multi-db commit failed after committing to %d shards: %s", i, strings.Join(sNames, ", ")), + }) + } _ = txc.Release(ctx, session) return err } diff --git a/go/vt/vtgate/tx_conn_test.go b/go/vt/vtgate/tx_conn_test.go index 3fc141c64ac..6ce863603f8 100644 --- a/go/vt/vtgate/tx_conn_test.go +++ b/go/vt/vtgate/tx_conn_test.go @@ -19,6 +19,7 @@ package vtgate import ( "context" "fmt" + "strconv" "testing" "github.com/stretchr/testify/assert" @@ -27,6 +28,7 @@ import ( "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/srvtopo" @@ -41,6 +43,7 @@ import ( var queries = []*querypb.BoundQuery{{Sql: "query1"}} var twoQueries = []*querypb.BoundQuery{{Sql: "query1"}, {Sql: "query1"}} +var threeQueries = []*querypb.BoundQuery{{Sql: "query1"}, {Sql: "query1"}, {Sql: "query1"}} func TestTxConnBegin(t *testing.T) { ctx := utils.LeakCheckContext(t) @@ -67,12 +70,13 @@ func TestTxConnBegin(t *testing.T) { func TestTxConnCommitFailure(t *testing.T) { ctx := utils.LeakCheckContext(t) - sc, sbc0, sbc1, rss0, rss1, rss01 := newTestTxConnEnv(t, ctx, "TestTxConn") + sc, sbcs, rssm, rssa := newTestTxConnEnvNShards(t, ctx, "TestTxConn", 3) sc.txConn.mode = vtgatepb.TransactionMode_MULTI // Sequence the executes to ensure commit order + session 
:= NewSafeSession(&vtgatepb.Session{InTransaction: true}) - sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rssm[0], queries, session, false, false) wantSession := vtgatepb.Session{ InTransaction: true, ShardSessions: []*vtgatepb.Session_ShardSession{{ @@ -82,11 +86,12 @@ func TestTxConnCommitFailure(t *testing.T) { TabletType: topodatapb.TabletType_PRIMARY, }, TransactionId: 1, - TabletAlias: sbc0.Tablet().Alias, + TabletAlias: sbcs[0].Tablet().Alias, }}, } utils.MustMatch(t, &wantSession, session.Session, "Session") - sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false) + + sc.ExecuteMultiShard(ctx, nil, rssm[1], queries, session, false, false) wantSession = vtgatepb.Session{ InTransaction: true, ShardSessions: []*vtgatepb.Session_ShardSession{{ @@ -96,7 +101,7 @@ func TestTxConnCommitFailure(t *testing.T) { TabletType: topodatapb.TabletType_PRIMARY, }, TransactionId: 1, - TabletAlias: sbc0.Tablet().Alias, + TabletAlias: sbcs[0].Tablet().Alias, }, { Target: &querypb.Target{ Keyspace: "TestTxConn", @@ -104,23 +109,162 @@ func TestTxConnCommitFailure(t *testing.T) { TabletType: topodatapb.TabletType_PRIMARY, }, TransactionId: 1, - TabletAlias: sbc1.Tablet().Alias, + TabletAlias: sbcs[1].Tablet().Alias, }}, } utils.MustMatch(t, &wantSession, session.Session, "Session") - sbc1.MustFailCodes[vtrpcpb.Code_DEADLINE_EXCEEDED] = 1 + sc.ExecuteMultiShard(ctx, nil, rssa, threeQueries, session, false, false) + wantSession = vtgatepb.Session{ + InTransaction: true, + ShardSessions: []*vtgatepb.Session_ShardSession{{ + Target: &querypb.Target{ + Keyspace: "TestTxConn", + Shard: "0", + TabletType: topodatapb.TabletType_PRIMARY, + }, + TransactionId: 1, + TabletAlias: sbcs[0].Tablet().Alias, + }, { + Target: &querypb.Target{ + Keyspace: "TestTxConn", + Shard: "1", + TabletType: topodatapb.TabletType_PRIMARY, + }, + TransactionId: 1, + TabletAlias: sbcs[1].Tablet().Alias, + }, { + Target: &querypb.Target{ + 
Keyspace: "TestTxConn", + Shard: "2", + TabletType: topodatapb.TabletType_PRIMARY, + }, + TransactionId: 1, + TabletAlias: sbcs[2].Tablet().Alias, + }}, + } + utils.MustMatch(t, &wantSession, session.Session, "Session") + + sbcs[2].MustFailCodes[vtrpcpb.Code_DEADLINE_EXCEEDED] = 1 expectErr := NewShardError(vterrors.New( vtrpcpb.Code_DEADLINE_EXCEEDED, fmt.Sprintf("%v error", vtrpcpb.Code_DEADLINE_EXCEEDED)), - rss1[0].Target) + rssm[2][0].Target) require.ErrorContains(t, sc.txConn.Commit(ctx, session), expectErr.Error()) - wantSession = vtgatepb.Session{} + wantSession = vtgatepb.Session{ + Warnings: []*querypb.QueryWarning{ + { + Code: uint32(sqlerror.ERNonAtomicCommit), + Message: "multi-db commit failed after committing to 2 shards: 0, 1", + }, + }, + } utils.MustMatch(t, &wantSession, session.Session, "Session") - assert.EqualValues(t, 1, sbc0.CommitCount.Load(), "sbc0.CommitCount") - assert.EqualValues(t, 1, sbc1.CommitCount.Load(), "sbc1.CommitCount") + assert.EqualValues(t, 1, sbcs[0].CommitCount.Load(), "sbc0.CommitCount") +} + +func TestTxConnCommitFailureAfterNonAtomicCommitMaxShards(t *testing.T) { + ctx := utils.LeakCheckContext(t) + + sc, sbcs, rssm, _ := newTestTxConnEnvNShards(t, ctx, "TestTxConn", 18) + sc.txConn.mode = vtgatepb.TransactionMode_MULTI + + // Sequence the executes to ensure commit order + + session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) + wantSession := vtgatepb.Session{ + InTransaction: true, + ShardSessions: []*vtgatepb.Session_ShardSession{}, + } + + for i := 0; i < 18; i++ { + sc.ExecuteMultiShard(ctx, nil, rssm[i], queries, session, false, false) + wantSession.ShardSessions = append(wantSession.ShardSessions, &vtgatepb.Session_ShardSession{ + Target: &querypb.Target{ + Keyspace: "TestTxConn", + Shard: rssm[i][0].Target.Shard, + TabletType: topodatapb.TabletType_PRIMARY, + }, + TransactionId: 1, + TabletAlias: sbcs[i].Tablet().Alias, + }) + utils.MustMatch(t, &wantSession, session.Session, "Session") + } + + 
sbcs[17].MustFailCodes[vtrpcpb.Code_DEADLINE_EXCEEDED] = 1 + + expectErr := NewShardError(vterrors.New( + vtrpcpb.Code_DEADLINE_EXCEEDED, + fmt.Sprintf("%v error", vtrpcpb.Code_DEADLINE_EXCEEDED)), + rssm[17][0].Target) + + require.ErrorContains(t, sc.txConn.Commit(ctx, session), expectErr.Error()) + wantSession = vtgatepb.Session{ + Warnings: []*querypb.QueryWarning{ + { + Code: uint32(sqlerror.ERNonAtomicCommit), + Message: "multi-db commit failed after committing to 17 shards: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, ...", + }, + }, + } + + utils.MustMatch(t, &wantSession, session.Session, "Session") + for i := 0; i < 17; i++ { + assert.EqualValues(t, 1, sbcs[i].CommitCount.Load(), fmt.Sprintf("sbc%d.CommitCount", i)) + } +} + +func TestTxConnCommitFailureBeforeNonAtomicCommitMaxShards(t *testing.T) { + ctx := utils.LeakCheckContext(t) + + sc, sbcs, rssm, _ := newTestTxConnEnvNShards(t, ctx, "TestTxConn", 17) + sc.txConn.mode = vtgatepb.TransactionMode_MULTI + + // Sequence the executes to ensure commit order + + session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) + wantSession := vtgatepb.Session{ + InTransaction: true, + ShardSessions: []*vtgatepb.Session_ShardSession{}, + } + + for i := 0; i < 17; i++ { + sc.ExecuteMultiShard(ctx, nil, rssm[i], queries, session, false, false) + wantSession.ShardSessions = append(wantSession.ShardSessions, &vtgatepb.Session_ShardSession{ + Target: &querypb.Target{ + Keyspace: "TestTxConn", + Shard: rssm[i][0].Target.Shard, + TabletType: topodatapb.TabletType_PRIMARY, + }, + TransactionId: 1, + TabletAlias: sbcs[i].Tablet().Alias, + }) + utils.MustMatch(t, &wantSession, session.Session, "Session") + } + + sbcs[16].MustFailCodes[vtrpcpb.Code_DEADLINE_EXCEEDED] = 1 + + expectErr := NewShardError(vterrors.New( + vtrpcpb.Code_DEADLINE_EXCEEDED, + fmt.Sprintf("%v error", vtrpcpb.Code_DEADLINE_EXCEEDED)), + rssm[16][0].Target) + + require.ErrorContains(t, sc.txConn.Commit(ctx, session), expectErr.Error()) + 
wantSession = vtgatepb.Session{ + Warnings: []*querypb.QueryWarning{ + { + Code: uint32(sqlerror.ERNonAtomicCommit), + Message: "multi-db commit failed after committing to 16 shards: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15", + }, + }, + } + + utils.MustMatch(t, &wantSession, session.Session, "Session") + for i := 0; i < 16; i++ { + assert.EqualValues(t, 1, sbcs[i].CommitCount.Load(), fmt.Sprintf("sbc%d.CommitCount", i)) + } } func TestTxConnCommitSuccess(t *testing.T) { @@ -1359,17 +1503,40 @@ func TestTxConnAccessModeReset(t *testing.T) { func newTestTxConnEnv(t *testing.T, ctx context.Context, name string) (sc *ScatterConn, sbc0, sbc1 *sandboxconn.SandboxConn, rss0, rss1, rss01 []*srvtopo.ResolvedShard) { t.Helper() createSandbox(name) + sc, sbcs, rssl, rssa := newTestTxConnEnvNShards(t, ctx, name, 2) + return sc, sbcs[0], sbcs[1], rssl[0], rssl[1], rssa +} + +func newTestTxConnEnvNShards(t *testing.T, ctx context.Context, name string, n int) ( + sc *ScatterConn, sbcl []*sandboxconn.SandboxConn, rssl [][]*srvtopo.ResolvedShard, rssa []*srvtopo.ResolvedShard, +) { + t.Helper() + createSandbox(name) + hc := discovery.NewFakeHealthCheck(nil) sc = newTestScatterConn(ctx, hc, newSandboxForCells(ctx, []string{"aa"}), "aa") - sbc0 = hc.AddTestTablet("aa", "0", 1, name, "0", topodatapb.TabletType_PRIMARY, true, 1, nil) - sbc1 = hc.AddTestTablet("aa", "1", 1, name, "1", topodatapb.TabletType_PRIMARY, true, 1, nil) + + sNames := make([]string, n) + for i := 0; i < n; i++ { + sNames[i] = strconv.FormatInt(int64(i), 10) + } + + sbcl = make([]*sandboxconn.SandboxConn, len(sNames)) + for i, sName := range sNames { + sbcl[i] = hc.AddTestTablet("aa", sName, int32(i)+1, name, sName, topodatapb.TabletType_PRIMARY, true, 1, nil) + } + res := srvtopo.NewResolver(newSandboxForCells(ctx, []string{"aa"}), sc.gateway, "aa") - var err error - rss0, err = res.ResolveDestination(ctx, name, topodatapb.TabletType_PRIMARY, key.DestinationShard("0")) - require.NoError(t, err) - 
rss1, err = res.ResolveDestination(ctx, name, topodatapb.TabletType_PRIMARY, key.DestinationShard("1")) - require.NoError(t, err) - rss01, err = res.ResolveDestination(ctx, name, topodatapb.TabletType_PRIMARY, key.DestinationShards([]string{"0", "1"})) + + rssl = make([][]*srvtopo.ResolvedShard, len(sNames)) + for i, sName := range sNames { + rss, err := res.ResolveDestination(ctx, name, topodatapb.TabletType_PRIMARY, key.DestinationShard(sName)) + require.NoError(t, err) + rssl[i] = rss + } + + rssa, err := res.ResolveDestination(ctx, name, topodatapb.TabletType_PRIMARY, key.DestinationShards(sNames)) require.NoError(t, err) - return sc, sbc0, sbc1, rss0, rss1, rss01 + + return sc, sbcl, rssl, rssa }