diff --git a/changelog/19.0/19.0.0/summary.md b/changelog/19.0/19.0.0/summary.md
index cde1608562a..dc65c3f7137 100644
--- a/changelog/19.0/19.0.0/summary.md
+++ b/changelog/19.0/19.0.0/summary.md
@@ -17,6 +17,7 @@
- [Multi Table Delete Support](#multi-table-delete)
- [`SHOW VSCHEMA KEYSPACES` Query](#show-vschema-keyspaces)
- [`FOREIGN_KEY_CHECKS` is now a Vitess Aware Variable](#fk-checks-vitess-aware)
+ - [Partial Multi-shard Commit Warnings](#partial-multi-shard-commit-warnings)
- **[Vttestserver](#vttestserver)**
- [`--vtcombo-bind-host` flag](#vtcombo-bind-host)
- **[Minor Changes](#minor-changes)**
@@ -64,7 +65,6 @@ Prior to 19.0 VTTablet reported how much time non-streaming executions spend wai
The build version (e.g., `19.0.0-SNAPSHOT`) has been added to `/debug/vars`, allowing users to programmatically inspect Vitess components' build version at runtime.
-
### Planned Reparent Shard
#### `--tolerable-replication-lag` Sub-flag
@@ -74,7 +74,6 @@ This feature is opt-in and not specifying this sub-flag makes Vitess ignore the
A new flag in VTOrc with the same name has been added to control the behaviour of the PlannedReparentShard calls that VTOrc issues.
-
### Query Compatibility
#### Multi Table Delete Support
@@ -107,6 +106,24 @@ mysql> show vschema keyspaces;
When VTGate receives a query to change the `FOREIGN_KEY_CHECKS` value for a session, instead of sending the value down to MySQL, VTGate now keeps track of the value and changes the queries by adding `SET_VAR(FOREIGN_KEY_CHECKS=On/Off)` style query optimizer hints wherever required.
+#### Partial Multi-shard Commit Warnings
+
+When using `multi` transaction mode (the default), it is possible for Vitess to successfully commit to one shard, but fail to commit to a subsequent shard, thus breaking the atomicity of a multi-shard transaction.
+
+In `v19.0`, VTGate reports partial-success commits in warnings, e.g.:
+
+```mysql
+mysql> commit;
+ERROR 1317 (70100): target: customer.-80.primary: vttablet: rpc error: code = Aborted desc = transaction 1703182545849001001: ended at 2023-12-21 14:07:41.515 EST (exceeded timeout: 30s) (CallerID: userData1)
+mysql> show warnings;
++---------+------+----------------------------------------------------------+
+| Level | Code | Message |
++---------+------+----------------------------------------------------------+
+| Warning | 301 | multi-db commit failed after committing to 1 shards: 80- |
++---------+------+----------------------------------------------------------+
+1 row in set, 1 warning (0.00 sec)
+```
+
### Vttestserver
#### `--vtcombo-bind-host` flag
diff --git a/go/mysql/sqlerror/constants.go b/go/mysql/sqlerror/constants.go
index 01de1b6d45c..fdec64588c1 100644
--- a/go/mysql/sqlerror/constants.go
+++ b/go/mysql/sqlerror/constants.go
@@ -34,7 +34,8 @@ func (e ErrorCode) ToString() string {
// See above reference for more information on each code.
const (
// Vitess specific errors, (100-999)
- ERNotReplica = ErrorCode(100)
+ ERNotReplica = ErrorCode(100)
+ ERNonAtomicCommit = ErrorCode(301)
// unknown
ERUnknownError = ErrorCode(1105)
diff --git a/go/vt/vtgate/tx_conn.go b/go/vt/vtgate/tx_conn.go
index 9170093c23e..246140b3711 100644
--- a/go/vt/vtgate/tx_conn.go
+++ b/go/vt/vtgate/tx_conn.go
@@ -19,8 +19,10 @@ package vtgate
import (
"context"
"fmt"
+ "strings"
"sync"
+ "vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/dtids"
"vitess.io/vitess/go/vt/log"
@@ -34,6 +36,10 @@ import (
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)
+// nonAtomicCommitWarnMaxShards limits the number of shard names reported in
+// non-atomic commit warnings.
+const nonAtomicCommitWarnMaxShards = 16
+
// TxConn is used for executing transactional requests.
type TxConn struct {
tabletGateway *TabletGateway
@@ -132,8 +138,27 @@ func (txc *TxConn) commitNormal(ctx context.Context, session *SafeSession) error
}
// Retain backward compatibility on commit order for the normal session.
- for _, shardSession := range session.ShardSessions {
+ for i, shardSession := range session.ShardSessions {
if err := txc.commitShard(ctx, shardSession, session.logging); err != nil {
+ if i > 0 {
+ nShards := i
+ elipsis := false
+ if i > nonAtomicCommitWarnMaxShards {
+ nShards = nonAtomicCommitWarnMaxShards
+ elipsis = true
+ }
+ sNames := make([]string, nShards, nShards+1 /*...*/)
+ for j := 0; j < nShards; j++ {
+ sNames[j] = session.ShardSessions[j].Target.Shard
+ }
+ if elipsis {
+ sNames = append(sNames, "...")
+ }
+ session.RecordWarning(&querypb.QueryWarning{
+ Code: uint32(sqlerror.ERNonAtomicCommit),
+ Message: fmt.Sprintf("multi-db commit failed after committing to %d shards: %s", i, strings.Join(sNames, ", ")),
+ })
+ }
_ = txc.Release(ctx, session)
return err
}
diff --git a/go/vt/vtgate/tx_conn_test.go b/go/vt/vtgate/tx_conn_test.go
index 3fc141c64ac..6ce863603f8 100644
--- a/go/vt/vtgate/tx_conn_test.go
+++ b/go/vt/vtgate/tx_conn_test.go
@@ -19,6 +19,7 @@ package vtgate
import (
"context"
"fmt"
+ "strconv"
"testing"
"github.com/stretchr/testify/assert"
@@ -27,6 +28,7 @@ import (
"github.com/stretchr/testify/require"
+ "vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/srvtopo"
@@ -41,6 +43,7 @@ import (
var queries = []*querypb.BoundQuery{{Sql: "query1"}}
var twoQueries = []*querypb.BoundQuery{{Sql: "query1"}, {Sql: "query1"}}
+var threeQueries = []*querypb.BoundQuery{{Sql: "query1"}, {Sql: "query1"}, {Sql: "query1"}}
func TestTxConnBegin(t *testing.T) {
ctx := utils.LeakCheckContext(t)
@@ -67,12 +70,13 @@ func TestTxConnBegin(t *testing.T) {
func TestTxConnCommitFailure(t *testing.T) {
ctx := utils.LeakCheckContext(t)
- sc, sbc0, sbc1, rss0, rss1, rss01 := newTestTxConnEnv(t, ctx, "TestTxConn")
+ sc, sbcs, rssm, rssa := newTestTxConnEnvNShards(t, ctx, "TestTxConn", 3)
sc.txConn.mode = vtgatepb.TransactionMode_MULTI
// Sequence the executes to ensure commit order
+
session := NewSafeSession(&vtgatepb.Session{InTransaction: true})
- sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false)
+ sc.ExecuteMultiShard(ctx, nil, rssm[0], queries, session, false, false)
wantSession := vtgatepb.Session{
InTransaction: true,
ShardSessions: []*vtgatepb.Session_ShardSession{{
@@ -82,11 +86,12 @@ func TestTxConnCommitFailure(t *testing.T) {
TabletType: topodatapb.TabletType_PRIMARY,
},
TransactionId: 1,
- TabletAlias: sbc0.Tablet().Alias,
+ TabletAlias: sbcs[0].Tablet().Alias,
}},
}
utils.MustMatch(t, &wantSession, session.Session, "Session")
- sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false)
+
+ sc.ExecuteMultiShard(ctx, nil, rssm[1], queries, session, false, false)
wantSession = vtgatepb.Session{
InTransaction: true,
ShardSessions: []*vtgatepb.Session_ShardSession{{
@@ -96,7 +101,7 @@ func TestTxConnCommitFailure(t *testing.T) {
TabletType: topodatapb.TabletType_PRIMARY,
},
TransactionId: 1,
- TabletAlias: sbc0.Tablet().Alias,
+ TabletAlias: sbcs[0].Tablet().Alias,
}, {
Target: &querypb.Target{
Keyspace: "TestTxConn",
@@ -104,23 +109,162 @@ func TestTxConnCommitFailure(t *testing.T) {
TabletType: topodatapb.TabletType_PRIMARY,
},
TransactionId: 1,
- TabletAlias: sbc1.Tablet().Alias,
+ TabletAlias: sbcs[1].Tablet().Alias,
}},
}
utils.MustMatch(t, &wantSession, session.Session, "Session")
- sbc1.MustFailCodes[vtrpcpb.Code_DEADLINE_EXCEEDED] = 1
+ sc.ExecuteMultiShard(ctx, nil, rssa, threeQueries, session, false, false)
+ wantSession = vtgatepb.Session{
+ InTransaction: true,
+ ShardSessions: []*vtgatepb.Session_ShardSession{{
+ Target: &querypb.Target{
+ Keyspace: "TestTxConn",
+ Shard: "0",
+ TabletType: topodatapb.TabletType_PRIMARY,
+ },
+ TransactionId: 1,
+ TabletAlias: sbcs[0].Tablet().Alias,
+ }, {
+ Target: &querypb.Target{
+ Keyspace: "TestTxConn",
+ Shard: "1",
+ TabletType: topodatapb.TabletType_PRIMARY,
+ },
+ TransactionId: 1,
+ TabletAlias: sbcs[1].Tablet().Alias,
+ }, {
+ Target: &querypb.Target{
+ Keyspace: "TestTxConn",
+ Shard: "2",
+ TabletType: topodatapb.TabletType_PRIMARY,
+ },
+ TransactionId: 1,
+ TabletAlias: sbcs[2].Tablet().Alias,
+ }},
+ }
+ utils.MustMatch(t, &wantSession, session.Session, "Session")
+
+ sbcs[2].MustFailCodes[vtrpcpb.Code_DEADLINE_EXCEEDED] = 1
expectErr := NewShardError(vterrors.New(
vtrpcpb.Code_DEADLINE_EXCEEDED,
fmt.Sprintf("%v error", vtrpcpb.Code_DEADLINE_EXCEEDED)),
- rss1[0].Target)
+ rssm[2][0].Target)
require.ErrorContains(t, sc.txConn.Commit(ctx, session), expectErr.Error())
- wantSession = vtgatepb.Session{}
+ wantSession = vtgatepb.Session{
+ Warnings: []*querypb.QueryWarning{
+ {
+ Code: uint32(sqlerror.ERNonAtomicCommit),
+ Message: "multi-db commit failed after committing to 2 shards: 0, 1",
+ },
+ },
+ }
utils.MustMatch(t, &wantSession, session.Session, "Session")
- assert.EqualValues(t, 1, sbc0.CommitCount.Load(), "sbc0.CommitCount")
- assert.EqualValues(t, 1, sbc1.CommitCount.Load(), "sbc1.CommitCount")
+ assert.EqualValues(t, 1, sbcs[0].CommitCount.Load(), "sbc0.CommitCount")
+}
+
+func TestTxConnCommitFailureAfterNonAtomicCommitMaxShards(t *testing.T) {
+ ctx := utils.LeakCheckContext(t)
+
+ sc, sbcs, rssm, _ := newTestTxConnEnvNShards(t, ctx, "TestTxConn", 18)
+ sc.txConn.mode = vtgatepb.TransactionMode_MULTI
+
+ // Sequence the executes to ensure commit order
+
+ session := NewSafeSession(&vtgatepb.Session{InTransaction: true})
+ wantSession := vtgatepb.Session{
+ InTransaction: true,
+ ShardSessions: []*vtgatepb.Session_ShardSession{},
+ }
+
+ for i := 0; i < 18; i++ {
+ sc.ExecuteMultiShard(ctx, nil, rssm[i], queries, session, false, false)
+ wantSession.ShardSessions = append(wantSession.ShardSessions, &vtgatepb.Session_ShardSession{
+ Target: &querypb.Target{
+ Keyspace: "TestTxConn",
+ Shard: rssm[i][0].Target.Shard,
+ TabletType: topodatapb.TabletType_PRIMARY,
+ },
+ TransactionId: 1,
+ TabletAlias: sbcs[i].Tablet().Alias,
+ })
+ utils.MustMatch(t, &wantSession, session.Session, "Session")
+ }
+
+ sbcs[17].MustFailCodes[vtrpcpb.Code_DEADLINE_EXCEEDED] = 1
+
+ expectErr := NewShardError(vterrors.New(
+ vtrpcpb.Code_DEADLINE_EXCEEDED,
+ fmt.Sprintf("%v error", vtrpcpb.Code_DEADLINE_EXCEEDED)),
+ rssm[17][0].Target)
+
+ require.ErrorContains(t, sc.txConn.Commit(ctx, session), expectErr.Error())
+ wantSession = vtgatepb.Session{
+ Warnings: []*querypb.QueryWarning{
+ {
+ Code: uint32(sqlerror.ERNonAtomicCommit),
+ Message: "multi-db commit failed after committing to 17 shards: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, ...",
+ },
+ },
+ }
+
+ utils.MustMatch(t, &wantSession, session.Session, "Session")
+ for i := 0; i < 17; i++ {
+ assert.EqualValues(t, 1, sbcs[i].CommitCount.Load(), fmt.Sprintf("sbc%d.CommitCount", i))
+ }
+}
+
+func TestTxConnCommitFailureBeforeNonAtomicCommitMaxShards(t *testing.T) {
+ ctx := utils.LeakCheckContext(t)
+
+ sc, sbcs, rssm, _ := newTestTxConnEnvNShards(t, ctx, "TestTxConn", 17)
+ sc.txConn.mode = vtgatepb.TransactionMode_MULTI
+
+ // Sequence the executes to ensure commit order
+
+ session := NewSafeSession(&vtgatepb.Session{InTransaction: true})
+ wantSession := vtgatepb.Session{
+ InTransaction: true,
+ ShardSessions: []*vtgatepb.Session_ShardSession{},
+ }
+
+ for i := 0; i < 17; i++ {
+ sc.ExecuteMultiShard(ctx, nil, rssm[i], queries, session, false, false)
+ wantSession.ShardSessions = append(wantSession.ShardSessions, &vtgatepb.Session_ShardSession{
+ Target: &querypb.Target{
+ Keyspace: "TestTxConn",
+ Shard: rssm[i][0].Target.Shard,
+ TabletType: topodatapb.TabletType_PRIMARY,
+ },
+ TransactionId: 1,
+ TabletAlias: sbcs[i].Tablet().Alias,
+ })
+ utils.MustMatch(t, &wantSession, session.Session, "Session")
+ }
+
+ sbcs[16].MustFailCodes[vtrpcpb.Code_DEADLINE_EXCEEDED] = 1
+
+ expectErr := NewShardError(vterrors.New(
+ vtrpcpb.Code_DEADLINE_EXCEEDED,
+ fmt.Sprintf("%v error", vtrpcpb.Code_DEADLINE_EXCEEDED)),
+ rssm[16][0].Target)
+
+ require.ErrorContains(t, sc.txConn.Commit(ctx, session), expectErr.Error())
+ wantSession = vtgatepb.Session{
+ Warnings: []*querypb.QueryWarning{
+ {
+ Code: uint32(sqlerror.ERNonAtomicCommit),
+ Message: "multi-db commit failed after committing to 16 shards: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15",
+ },
+ },
+ }
+
+ utils.MustMatch(t, &wantSession, session.Session, "Session")
+ for i := 0; i < 16; i++ {
+ assert.EqualValues(t, 1, sbcs[i].CommitCount.Load(), fmt.Sprintf("sbc%d.CommitCount", i))
+ }
}
func TestTxConnCommitSuccess(t *testing.T) {
@@ -1359,17 +1503,40 @@ func TestTxConnAccessModeReset(t *testing.T) {
func newTestTxConnEnv(t *testing.T, ctx context.Context, name string) (sc *ScatterConn, sbc0, sbc1 *sandboxconn.SandboxConn, rss0, rss1, rss01 []*srvtopo.ResolvedShard) {
t.Helper()
createSandbox(name)
+ sc, sbcs, rssl, rssa := newTestTxConnEnvNShards(t, ctx, name, 2)
+ return sc, sbcs[0], sbcs[1], rssl[0], rssl[1], rssa
+}
+
+func newTestTxConnEnvNShards(t *testing.T, ctx context.Context, name string, n int) (
+ sc *ScatterConn, sbcl []*sandboxconn.SandboxConn, rssl [][]*srvtopo.ResolvedShard, rssa []*srvtopo.ResolvedShard,
+) {
+ t.Helper()
+ createSandbox(name)
+
hc := discovery.NewFakeHealthCheck(nil)
sc = newTestScatterConn(ctx, hc, newSandboxForCells(ctx, []string{"aa"}), "aa")
- sbc0 = hc.AddTestTablet("aa", "0", 1, name, "0", topodatapb.TabletType_PRIMARY, true, 1, nil)
- sbc1 = hc.AddTestTablet("aa", "1", 1, name, "1", topodatapb.TabletType_PRIMARY, true, 1, nil)
+
+ sNames := make([]string, n)
+ for i := 0; i < n; i++ {
+ sNames[i] = strconv.FormatInt(int64(i), 10)
+ }
+
+ sbcl = make([]*sandboxconn.SandboxConn, len(sNames))
+ for i, sName := range sNames {
+ sbcl[i] = hc.AddTestTablet("aa", sName, int32(i)+1, name, sName, topodatapb.TabletType_PRIMARY, true, 1, nil)
+ }
+
res := srvtopo.NewResolver(newSandboxForCells(ctx, []string{"aa"}), sc.gateway, "aa")
- var err error
- rss0, err = res.ResolveDestination(ctx, name, topodatapb.TabletType_PRIMARY, key.DestinationShard("0"))
- require.NoError(t, err)
- rss1, err = res.ResolveDestination(ctx, name, topodatapb.TabletType_PRIMARY, key.DestinationShard("1"))
- require.NoError(t, err)
- rss01, err = res.ResolveDestination(ctx, name, topodatapb.TabletType_PRIMARY, key.DestinationShards([]string{"0", "1"}))
+
+ rssl = make([][]*srvtopo.ResolvedShard, len(sNames))
+ for i, sName := range sNames {
+ rss, err := res.ResolveDestination(ctx, name, topodatapb.TabletType_PRIMARY, key.DestinationShard(sName))
+ require.NoError(t, err)
+ rssl[i] = rss
+ }
+
+ rssa, err := res.ResolveDestination(ctx, name, topodatapb.TabletType_PRIMARY, key.DestinationShards(sNames))
require.NoError(t, err)
- return sc, sbc0, sbc1, rss0, rss1, rss01
+
+ return sc, sbcl, rssl, rssa
}