From 43f9e3d6a87bccfbfb616ef56a51e0bea1216aba Mon Sep 17 00:00:00 2001 From: Max Englander Date: Thu, 21 Dec 2023 12:56:46 -0500 Subject: [PATCH 1/4] vtgate: record warning for partially successful cross-shard commits Signed-off-by: Max Englander --- go/mysql/sqlerror/constants.go | 3 ++- go/vt/vtgate/tx_conn.go | 9 ++++++++- go/vt/vtgate/tx_conn_test.go | 10 +++++++++- 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/go/mysql/sqlerror/constants.go b/go/mysql/sqlerror/constants.go index 01de1b6d45c..fdec64588c1 100644 --- a/go/mysql/sqlerror/constants.go +++ b/go/mysql/sqlerror/constants.go @@ -34,7 +34,8 @@ func (e ErrorCode) ToString() string { // See above reference for more information on each code. const ( // Vitess specific errors, (100-999) - ERNotReplica = ErrorCode(100) + ERNotReplica = ErrorCode(100) + ERNonAtomicCommit = ErrorCode(301) // unknown ERUnknownError = ErrorCode(1105) diff --git a/go/vt/vtgate/tx_conn.go b/go/vt/vtgate/tx_conn.go index 9170093c23e..5d8444645f2 100644 --- a/go/vt/vtgate/tx_conn.go +++ b/go/vt/vtgate/tx_conn.go @@ -21,6 +21,7 @@ import ( "fmt" "sync" + "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/dtids" "vitess.io/vitess/go/vt/log" @@ -132,9 +133,15 @@ func (txc *TxConn) commitNormal(ctx context.Context, session *SafeSession) error } // Retain backward compatibility on commit order for the normal session. - for _, shardSession := range session.ShardSessions { + for i, shardSession := range session.ShardSessions { if err := txc.commitShard(ctx, shardSession, session.logging); err != nil { _ = txc.Release(ctx, session) + if i > 0 { + session.RecordWarning(&querypb.QueryWarning{ + Code: uint32(sqlerror.ERNonAtomicCommit), + Message: fmt.Sprintf("multi-db commit failed after committing to %d shards", i), + }) + } return err } } diff --git a/go/vt/vtgate/tx_conn_test.go b/go/vt/vtgate/tx_conn_test.go index 3fc141c64ac..96424469d7b 100644 --- a/go/vt/vtgate/tx_conn_test.go +++ b/go/vt/vtgate/tx_conn_test.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/srvtopo" @@ -117,7 +118,14 @@ func TestTxConnCommitFailure(t *testing.T) { rss1[0].Target) require.ErrorContains(t, sc.txConn.Commit(ctx, session), expectErr.Error()) - wantSession = vtgatepb.Session{} + wantSession = vtgatepb.Session{ + Warnings: []*querypb.QueryWarning{ + { + Code: uint32(sqlerror.ERNonAtomicCommit), + Message: "multi-db commit failed after committing to 1 shards", + }, + }, + } utils.MustMatch(t, &wantSession, session.Session, "Session") assert.EqualValues(t, 1, sbc0.CommitCount.Load(), "sbc0.CommitCount") assert.EqualValues(t, 1, sbc1.CommitCount.Load(), "sbc1.CommitCount") From 530ceb93392003ddb7ff675fd46f540aa69e3097 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Thu, 21 Dec 2023 14:27:17 -0500 Subject: [PATCH 2/4] add note to changelog Signed-off-by: Max Englander --- changelog/19.0/19.0.0/summary.md | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/changelog/19.0/19.0.0/summary.md b/changelog/19.0/19.0.0/summary.md index eafea7b163e..0ef904a55d8 100644 --- a/changelog/19.0/19.0.0/summary.md +++ b/changelog/19.0/19.0.0/summary.md @@ -13,6 +13,7 @@ - [Build Version in `/debug/vars`](#build-version-in-debug-vars) - **[VTGate](#vtgate)** - [`FOREIGN_KEY_CHECKS` is now a Vitess Aware Variable](#fk-checks-vitess-aware) + - [Partial Multi-shard Commit Warnings](#partial-multi-shard-commit-warnings) - **[Vttestserver](#vttestserver)** - [`--vtcombo-bind-host` flag](#vtcombo-bind-host) - **[Query Compatibility](#query-compatibility)** @@ -64,6 +65,24 @@ The build version (e.g., `19.0.0-SNAPSHOT`) has been added to `/debug/vars`, all When VTGate receives a query to change the `FOREIGN_KEY_CHECKS` value for a session, instead of sending the value down to MySQL, VTGate now keeps track of the value and changes the queries by adding `SET_VAR(FOREIGN_KEY_CHECKS=On/Off)` style query optimizer hints wherever required. +#### Partial Multi-shard Commit Warnings + +When using `multi` transaction mode (the default), it is possible for Vitess to successfully commit to one shard, but fail to commit to a subsequent shard, thus breaking the atomicity of a multi-shard transaction. + +In `v19.0`, VTGate reports partial-success commits in warnings, e.g.: + +```mysql +mysql> commit; +ERROR 1317 (70100): target: customer.-80.primary: vttablet: rpc error: code = Aborted desc = transaction 1703182545849001001: ended at 2023-12-21 14:07:41.515 EST (exceeded timeout: 30s) (CallerID: userData1) +mysql> show warnings; ++---------+------+-----------------------------------------------------+ +| Level | Code | Message | ++---------+------+-----------------------------------------------------+ +| Warning | 301 | multi-db commit failed after committing to 1 shards | ++---------+------+-----------------------------------------------------+ +1 row in set, 1 warning (0.00 sec) +``` + ### Vttestserver #### `--vtcombo-bind-host` flag From babd9ae77d868ebe13df082bb24e030382d3d027 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Tue, 9 Jan 2024 11:54:14 -0500 Subject: [PATCH 3/4] cr: report successfully committed shards in non-atomic commit warning Signed-off-by: Max Englander --- changelog/19.0/19.0.0/summary.md | 10 ++-- go/vt/vtgate/tx_conn.go | 22 +++++++- go/vt/vtgate/tx_conn_test.go | 97 +++++++++++++++++++++++++------- 3 files changed, 102 insertions(+), 27 deletions(-) diff --git a/changelog/19.0/19.0.0/summary.md b/changelog/19.0/19.0.0/summary.md index 0ef904a55d8..fe9849ba9b3 100644 --- a/changelog/19.0/19.0.0/summary.md +++ b/changelog/19.0/19.0.0/summary.md @@ -75,11 +75,11 @@ In `v19.0`, VTGate reports partial-success commits in warnings, e.g.: mysql> commit; ERROR 1317 (70100): target: customer.-80.primary: vttablet: rpc error: code = Aborted desc = transaction 1703182545849001001: ended at 2023-12-21 14:07:41.515 EST (exceeded timeout: 30s) (CallerID: userData1) mysql> show warnings; -+---------+------+-----------------------------------------------------+ -| Level | Code | Message | -+---------+------+-----------------------------------------------------+ -| Warning | 301 | multi-db commit failed after committing to 1 shards | -+---------+------+-----------------------------------------------------+ ++---------+------+----------------------------------------------------------+ +| Level | Code | Message | ++---------+------+----------------------------------------------------------+ +| Warning | 301 | multi-db commit failed after committing to 1 shards: -80 | ++---------+------+----------------------------------------------------------+ 1 row in set, 1 warning (0.00 sec) ``` diff --git a/go/vt/vtgate/tx_conn.go b/go/vt/vtgate/tx_conn.go index 5d8444645f2..246140b3711 100644 --- a/go/vt/vtgate/tx_conn.go +++ b/go/vt/vtgate/tx_conn.go @@ -19,6 +19,7 @@ package vtgate import ( "context" "fmt" + "strings" "sync" "vitess.io/vitess/go/mysql/sqlerror" @@ -35,6 +36,10 @@ import ( vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) +// nonAtomicCommitWarnMaxShards limits the number of shard names reported in +// non-atomic commit warnings. +const nonAtomicCommitWarnMaxShards = 16 + // TxConn is used for executing transactional requests. type TxConn struct { tabletGateway *TabletGateway @@ -135,13 +140,26 @@ func (txc *TxConn) commitNormal(ctx context.Context, session *SafeSession) error // Retain backward compatibility on commit order for the normal session. for i, shardSession := range session.ShardSessions { if err := txc.commitShard(ctx, shardSession, session.logging); err != nil { - _ = txc.Release(ctx, session) if i > 0 { + nShards := i + elipsis := false + if i > nonAtomicCommitWarnMaxShards { + nShards = nonAtomicCommitWarnMaxShards + elipsis = true + } + sNames := make([]string, nShards, nShards+1 /*...*/) + for j := 0; j < nShards; j++ { + sNames[j] = session.ShardSessions[j].Target.Shard + } + if elipsis { + sNames = append(sNames, "...") + } session.RecordWarning(&querypb.QueryWarning{ Code: uint32(sqlerror.ERNonAtomicCommit), - Message: fmt.Sprintf("multi-db commit failed after committing to %d shards", i), + Message: fmt.Sprintf("multi-db commit failed after committing to %d shards: %s", i, strings.Join(sNames, ", ")), }) } + _ = txc.Release(ctx, session) return err } } diff --git a/go/vt/vtgate/tx_conn_test.go b/go/vt/vtgate/tx_conn_test.go index 96424469d7b..e773179f1f6 100644 --- a/go/vt/vtgate/tx_conn_test.go +++ b/go/vt/vtgate/tx_conn_test.go @@ -19,6 +19,7 @@ package vtgate import ( "context" "fmt" + "strconv" "testing" "github.com/stretchr/testify/assert" @@ -42,6 +43,7 @@ import ( var queries = []*querypb.BoundQuery{{Sql: "query1"}} var twoQueries = []*querypb.BoundQuery{{Sql: "query1"}, {Sql: "query1"}} +var threeQueries = []*querypb.BoundQuery{{Sql: "query1"}, {Sql: "query1"}, {Sql: "query1"}} func TestTxConnBegin(t *testing.T) { ctx := utils.LeakCheckContext(t) @@ -68,12 +70,13 @@ func TestTxConnBegin(t *testing.T) { func TestTxConnCommitFailure(t *testing.T) { ctx := utils.LeakCheckContext(t) - sc, sbc0, sbc1, rss0, rss1, rss01 := newTestTxConnEnv(t, ctx, "TestTxConn") + sc, sbcs, rssm, rssa := newTestTxConnEnvNShards(t, ctx, "TestTxConn", 3) sc.txConn.mode = vtgatepb.TransactionMode_MULTI // Sequence the executes to ensure commit order + session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rssm[0], queries, session, false, false) wantSession := vtgatepb.Session{ InTransaction: true, ShardSessions: []*vtgatepb.Session_ShardSession{{ @@ -83,11 +86,12 @@ func TestTxConnCommitFailure(t *testing.T) { TabletType: topodatapb.TabletType_PRIMARY, }, TransactionId: 1, - TabletAlias: sbc0.Tablet().Alias, + TabletAlias: sbcs[0].Tablet().Alias, }}, } utils.MustMatch(t, &wantSession, session.Session, "Session") - sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false) + + sc.ExecuteMultiShard(ctx, nil, rssm[1], queries, session, false, false) wantSession = vtgatepb.Session{ InTransaction: true, ShardSessions: []*vtgatepb.Session_ShardSession{{ @@ -97,7 +101,7 @@ func TestTxConnCommitFailure(t *testing.T) { TabletType: topodatapb.TabletType_PRIMARY, }, TransactionId: 1, - TabletAlias: sbc0.Tablet().Alias, + TabletAlias: sbcs[0].Tablet().Alias, }, { Target: &querypb.Target{ Keyspace: "TestTxConn", @@ -105,30 +109,60 @@ func TestTxConnCommitFailure(t *testing.T) { TabletType: topodatapb.TabletType_PRIMARY, }, TransactionId: 1, - TabletAlias: sbc1.Tablet().Alias, + TabletAlias: sbcs[1].Tablet().Alias, + }}, + } + utils.MustMatch(t, &wantSession, session.Session, "Session") + + sc.ExecuteMultiShard(ctx, nil, rssa, threeQueries, session, false, false) + wantSession = vtgatepb.Session{ + InTransaction: true, + ShardSessions: []*vtgatepb.Session_ShardSession{{ + Target: &querypb.Target{ + Keyspace: "TestTxConn", + Shard: "0", + TabletType: topodatapb.TabletType_PRIMARY, + }, + TransactionId: 1, + TabletAlias: sbcs[0].Tablet().Alias, + }, { + Target: &querypb.Target{ + Keyspace: "TestTxConn", + Shard: "1", + TabletType: topodatapb.TabletType_PRIMARY, + }, + TransactionId: 1, + TabletAlias: sbcs[1].Tablet().Alias, + }, { + Target: &querypb.Target{ + Keyspace: "TestTxConn", + Shard: "2", + TabletType: topodatapb.TabletType_PRIMARY, + }, + TransactionId: 1, + TabletAlias: sbcs[2].Tablet().Alias, }}, } utils.MustMatch(t, &wantSession, session.Session, "Session") - sbc1.MustFailCodes[vtrpcpb.Code_DEADLINE_EXCEEDED] = 1 + sbcs[2].MustFailCodes[vtrpcpb.Code_DEADLINE_EXCEEDED] = 1 expectErr := NewShardError(vterrors.New( vtrpcpb.Code_DEADLINE_EXCEEDED, fmt.Sprintf("%v error", vtrpcpb.Code_DEADLINE_EXCEEDED)), - rss1[0].Target) + rssm[2][0].Target) require.ErrorContains(t, sc.txConn.Commit(ctx, session), expectErr.Error()) wantSession = vtgatepb.Session{ Warnings: []*querypb.QueryWarning{ { Code: uint32(sqlerror.ERNonAtomicCommit), - Message: "multi-db commit failed after committing to 1 shards", + Message: "multi-db commit failed after committing to 2 shards: 0, 1", }, }, } utils.MustMatch(t, &wantSession, session.Session, "Session") - assert.EqualValues(t, 1, sbc0.CommitCount.Load(), "sbc0.CommitCount") - assert.EqualValues(t, 1, sbc1.CommitCount.Load(), "sbc1.CommitCount") + assert.EqualValues(t, 1, sbcs[0].CommitCount.Load(), "sbc0.CommitCount") } func TestTxConnCommitSuccess(t *testing.T) { @@ -1367,17 +1401,40 @@ func TestTxConnAccessModeReset(t *testing.T) { func newTestTxConnEnv(t *testing.T, ctx context.Context, name string) (sc *ScatterConn, sbc0, sbc1 *sandboxconn.SandboxConn, rss0, rss1, rss01 []*srvtopo.ResolvedShard) { t.Helper() createSandbox(name) + sc, sbcs, rssl, rssa := newTestTxConnEnvNShards(t, ctx, name, 2) + return sc, sbcs[0], sbcs[1], rssl[0], rssl[1], rssa +} + +func newTestTxConnEnvNShards(t *testing.T, ctx context.Context, name string, n int) ( + sc *ScatterConn, sbcl []*sandboxconn.SandboxConn, rssl [][]*srvtopo.ResolvedShard, rssa []*srvtopo.ResolvedShard, +) { + t.Helper() + createSandbox(name) + hc := discovery.NewFakeHealthCheck(nil) sc = newTestScatterConn(ctx, hc, newSandboxForCells(ctx, []string{"aa"}), "aa") - sbc0 = hc.AddTestTablet("aa", "0", 1, name, "0", topodatapb.TabletType_PRIMARY, true, 1, nil) - sbc1 = hc.AddTestTablet("aa", "1", 1, name, "1", topodatapb.TabletType_PRIMARY, true, 1, nil) + + sNames := make([]string, n) + for i := 0; i < n; i++ { + sNames[i] = strconv.FormatInt(int64(i), 10) + } + + sbcl = make([]*sandboxconn.SandboxConn, len(sNames)) + for i, sName := range sNames { + sbcl[i] = hc.AddTestTablet("aa", sName, int32(i)+1, name, sName, topodatapb.TabletType_PRIMARY, true, 1, nil) + } + res := srvtopo.NewResolver(newSandboxForCells(ctx, []string{"aa"}), sc.gateway, "aa") - var err error - rss0, err = res.ResolveDestination(ctx, name, topodatapb.TabletType_PRIMARY, key.DestinationShard("0")) - require.NoError(t, err) - rss1, err = res.ResolveDestination(ctx, name, topodatapb.TabletType_PRIMARY, key.DestinationShard("1")) - require.NoError(t, err) - rss01, err = res.ResolveDestination(ctx, name, topodatapb.TabletType_PRIMARY, key.DestinationShards([]string{"0", "1"})) + + rssl = make([][]*srvtopo.ResolvedShard, len(sNames)) + for i, sName := range sNames { + rss, err := res.ResolveDestination(ctx, name, topodatapb.TabletType_PRIMARY, key.DestinationShard(sName)) + require.NoError(t, err) + rssl[i] = rss + } + + rssa, err := res.ResolveDestination(ctx, name, topodatapb.TabletType_PRIMARY, key.DestinationShards(sNames)) require.NoError(t, err) - return sc, sbc0, sbc1, rss0, rss1, rss01 + + return sc, sbcl, rssl, rssa } From 56ab355f4c4704138cc99677a4facaaa985c6392 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Wed, 10 Jan 2024 08:15:37 -0500 Subject: [PATCH 4/4] cr: test shard truncation after non atomic commit warn max shards Signed-off-by: Max Englander --- go/vt/vtgate/tx_conn_test.go | 102 +++++++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/go/vt/vtgate/tx_conn_test.go b/go/vt/vtgate/tx_conn_test.go index e773179f1f6..6ce863603f8 100644 --- a/go/vt/vtgate/tx_conn_test.go +++ b/go/vt/vtgate/tx_conn_test.go @@ -165,6 +165,108 @@ func TestTxConnCommitFailure(t *testing.T) { assert.EqualValues(t, 1, sbcs[0].CommitCount.Load(), "sbc0.CommitCount") } +func TestTxConnCommitFailureAfterNonAtomicCommitMaxShards(t *testing.T) { + ctx := utils.LeakCheckContext(t) + + sc, sbcs, rssm, _ := newTestTxConnEnvNShards(t, ctx, "TestTxConn", 18) + sc.txConn.mode = vtgatepb.TransactionMode_MULTI + + // Sequence the executes to ensure commit order + + session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) + wantSession := vtgatepb.Session{ + InTransaction: true, + ShardSessions: []*vtgatepb.Session_ShardSession{}, + } + + for i := 0; i < 18; i++ { + sc.ExecuteMultiShard(ctx, nil, rssm[i], queries, session, false, false) + wantSession.ShardSessions = append(wantSession.ShardSessions, &vtgatepb.Session_ShardSession{ + Target: &querypb.Target{ + Keyspace: "TestTxConn", + Shard: rssm[i][0].Target.Shard, + TabletType: topodatapb.TabletType_PRIMARY, + }, + TransactionId: 1, + TabletAlias: sbcs[i].Tablet().Alias, + }) + utils.MustMatch(t, &wantSession, session.Session, "Session") + } + + sbcs[17].MustFailCodes[vtrpcpb.Code_DEADLINE_EXCEEDED] = 1 + + expectErr := NewShardError(vterrors.New( + vtrpcpb.Code_DEADLINE_EXCEEDED, + fmt.Sprintf("%v error", vtrpcpb.Code_DEADLINE_EXCEEDED)), + rssm[17][0].Target) + + require.ErrorContains(t, sc.txConn.Commit(ctx, session), expectErr.Error()) + wantSession = vtgatepb.Session{ + Warnings: []*querypb.QueryWarning{ + { + Code: uint32(sqlerror.ERNonAtomicCommit), + Message: "multi-db commit failed after committing to 17 shards: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, ...", + }, + }, + } + + utils.MustMatch(t, &wantSession, session.Session, "Session") + for i := 0; i < 17; i++ { + assert.EqualValues(t, 1, sbcs[i].CommitCount.Load(), fmt.Sprintf("sbc%d.CommitCount", i)) + } +} + +func TestTxConnCommitFailureBeforeNonAtomicCommitMaxShards(t *testing.T) { + ctx := utils.LeakCheckContext(t) + + sc, sbcs, rssm, _ := newTestTxConnEnvNShards(t, ctx, "TestTxConn", 17) + sc.txConn.mode = vtgatepb.TransactionMode_MULTI + + // Sequence the executes to ensure commit order + + session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) + wantSession := vtgatepb.Session{ + InTransaction: true, + ShardSessions: []*vtgatepb.Session_ShardSession{}, + } + + for i := 0; i < 17; i++ { + sc.ExecuteMultiShard(ctx, nil, rssm[i], queries, session, false, false) + wantSession.ShardSessions = append(wantSession.ShardSessions, &vtgatepb.Session_ShardSession{ + Target: &querypb.Target{ + Keyspace: "TestTxConn", + Shard: rssm[i][0].Target.Shard, + TabletType: topodatapb.TabletType_PRIMARY, + }, + TransactionId: 1, + TabletAlias: sbcs[i].Tablet().Alias, + }) + utils.MustMatch(t, &wantSession, session.Session, "Session") + } + + sbcs[16].MustFailCodes[vtrpcpb.Code_DEADLINE_EXCEEDED] = 1 + + expectErr := NewShardError(vterrors.New( + vtrpcpb.Code_DEADLINE_EXCEEDED, + fmt.Sprintf("%v error", vtrpcpb.Code_DEADLINE_EXCEEDED)), + rssm[16][0].Target) + + require.ErrorContains(t, sc.txConn.Commit(ctx, session), expectErr.Error()) + wantSession = vtgatepb.Session{ + Warnings: []*querypb.QueryWarning{ + { + Code: uint32(sqlerror.ERNonAtomicCommit), + Message: "multi-db commit failed after committing to 16 shards: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15", + }, + }, + } + + utils.MustMatch(t, &wantSession, session.Session, "Session") + for i := 0; i < 16; i++ { + assert.EqualValues(t, 1, sbcs[i].CommitCount.Load(), fmt.Sprintf("sbc%d.CommitCount", i)) + } +} + func TestTxConnCommitSuccess(t *testing.T) { ctx := utils.LeakCheckContext(t)