Skip to content

Commit

Permalink
vtgate: record warning for partially successful cross-shard commits (#…
Browse files Browse the repository at this point in the history
…14848)

Signed-off-by: Max Englander <[email protected]>
  • Loading branch information
maxenglander authored Jan 11, 2024
1 parent edce68b commit 961a978
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 24 deletions.
21 changes: 19 additions & 2 deletions changelog/19.0/19.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
- [Multi Table Delete Support](#multi-table-delete)
- [`SHOW VSCHEMA KEYSPACES` Query](#show-vschema-keyspaces)
- [`FOREIGN_KEY_CHECKS` is now a Vitess Aware Variable](#fk-checks-vitess-aware)
- [Partial Multi-shard Commit Warnings](#partial-multi-shard-commit-warnings)
- **[Vttestserver](#vttestserver)**
- [`--vtcombo-bind-host` flag](#vtcombo-bind-host)
- **[Minor Changes](#minor-changes)**
Expand Down Expand Up @@ -64,7 +65,6 @@ Prior to 19.0 VTTablet reported how much time non-streaming executions spend wai

The build version (e.g., `19.0.0-SNAPSHOT`) has been added to `/debug/vars`, allowing users to programmatically inspect Vitess components' build version at runtime.


### <a id="planned-reparent-shard"/>Planned Reparent Shard

#### <a id="tolerable-repl-lag"/>`--tolerable-replication-lag` Sub-flag
Expand All @@ -74,7 +74,6 @@ This feature is opt-in and not specifying this sub-flag makes Vitess ignore the

A new flag in VTOrc with the same name has been added to control the behaviour of the PlannedReparentShard calls that VTOrc issues.


### <a id="query-compatibility"/>Query Compatibility

#### <a id="multi-table-delete"/> Multi Table Delete Support
Expand Down Expand Up @@ -107,6 +106,24 @@ mysql> show vschema keyspaces;

When VTGate receives a query to change the `FOREIGN_KEY_CHECKS` value for a session, instead of sending the value down to MySQL, VTGate now keeps track of the value and changes the queries by adding `SET_VAR(FOREIGN_KEY_CHECKS=On/Off)` style query optimizer hints wherever required.

#### <a id="partial-multi-shard-commit-warnings"/>Partial Multi-shard Commit Warnings

When using `multi` transaction mode (the default), it is possible for Vitess to successfully commit to one shard, but fail to commit to a subsequent shard, thus breaking the atomicity of a multi-shard transaction.

In `v19.0`, VTGate reports partial-success commits in warnings, e.g.:

```mysql
mysql> commit;
ERROR 1317 (70100): target: customer.-80.primary: vttablet: rpc error: code = Aborted desc = transaction 1703182545849001001: ended at 2023-12-21 14:07:41.515 EST (exceeded timeout: 30s) (CallerID: userData1)
mysql> show warnings;
+---------+------+----------------------------------------------------------+
| Level | Code | Message |
+---------+------+----------------------------------------------------------+
| Warning | 301 | multi-db commit failed after committing to 1 shards: 80- |
+---------+------+----------------------------------------------------------+
1 row in set, 1 warning (0.00 sec)
```

### <a id="vttestserver"/>Vttestserver

#### <a id="vtcombo-bind-host"/>`--vtcombo-bind-host` flag
Expand Down
3 changes: 2 additions & 1 deletion go/mysql/sqlerror/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ func (e ErrorCode) ToString() string {
// See above reference for more information on each code.
const (
// Vitess specific errors, (100-999)
ERNotReplica = ErrorCode(100)
ERNotReplica = ErrorCode(100)
ERNonAtomicCommit = ErrorCode(301)

// unknown
ERUnknownError = ErrorCode(1105)
Expand Down
27 changes: 26 additions & 1 deletion go/vt/vtgate/tx_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package vtgate
import (
"context"
"fmt"
"strings"
"sync"

"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/dtids"
"vitess.io/vitess/go/vt/log"
Expand All @@ -34,6 +36,10 @@ import (
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// nonAtomicCommitWarnMaxShards limits the number of shard names reported in
// non-atomic commit warnings.
const nonAtomicCommitWarnMaxShards = 16

// TxConn is used for executing transactional requests.
type TxConn struct {
tabletGateway *TabletGateway
Expand Down Expand Up @@ -132,8 +138,27 @@ func (txc *TxConn) commitNormal(ctx context.Context, session *SafeSession) error
}

// Retain backward compatibility on commit order for the normal session.
for _, shardSession := range session.ShardSessions {
for i, shardSession := range session.ShardSessions {
if err := txc.commitShard(ctx, shardSession, session.logging); err != nil {
if i > 0 {
nShards := i
elipsis := false
if i > nonAtomicCommitWarnMaxShards {
nShards = nonAtomicCommitWarnMaxShards
elipsis = true
}
sNames := make([]string, nShards, nShards+1 /*...*/)
for j := 0; j < nShards; j++ {
sNames[j] = session.ShardSessions[j].Target.Shard
}
if elipsis {
sNames = append(sNames, "...")
}
session.RecordWarning(&querypb.QueryWarning{
Code: uint32(sqlerror.ERNonAtomicCommit),
Message: fmt.Sprintf("multi-db commit failed after committing to %d shards: %s", i, strings.Join(sNames, ", ")),
})
}
_ = txc.Release(ctx, session)
return err
}
Expand Down
207 changes: 187 additions & 20 deletions go/vt/vtgate/tx_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package vtgate
import (
"context"
"fmt"
"strconv"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -27,6 +28,7 @@ import (

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/srvtopo"
Expand All @@ -41,6 +43,7 @@ import (

var queries = []*querypb.BoundQuery{{Sql: "query1"}}
var twoQueries = []*querypb.BoundQuery{{Sql: "query1"}, {Sql: "query1"}}
var threeQueries = []*querypb.BoundQuery{{Sql: "query1"}, {Sql: "query1"}, {Sql: "query1"}}

func TestTxConnBegin(t *testing.T) {
ctx := utils.LeakCheckContext(t)
Expand All @@ -67,12 +70,13 @@ func TestTxConnBegin(t *testing.T) {
func TestTxConnCommitFailure(t *testing.T) {
ctx := utils.LeakCheckContext(t)

sc, sbc0, sbc1, rss0, rss1, rss01 := newTestTxConnEnv(t, ctx, "TestTxConn")
sc, sbcs, rssm, rssa := newTestTxConnEnvNShards(t, ctx, "TestTxConn", 3)
sc.txConn.mode = vtgatepb.TransactionMode_MULTI

// Sequence the executes to ensure commit order

session := NewSafeSession(&vtgatepb.Session{InTransaction: true})
sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false)
sc.ExecuteMultiShard(ctx, nil, rssm[0], queries, session, false, false)
wantSession := vtgatepb.Session{
InTransaction: true,
ShardSessions: []*vtgatepb.Session_ShardSession{{
Expand All @@ -82,11 +86,12 @@ func TestTxConnCommitFailure(t *testing.T) {
TabletType: topodatapb.TabletType_PRIMARY,
},
TransactionId: 1,
TabletAlias: sbc0.Tablet().Alias,
TabletAlias: sbcs[0].Tablet().Alias,
}},
}
utils.MustMatch(t, &wantSession, session.Session, "Session")
sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false)

sc.ExecuteMultiShard(ctx, nil, rssm[1], queries, session, false, false)
wantSession = vtgatepb.Session{
InTransaction: true,
ShardSessions: []*vtgatepb.Session_ShardSession{{
Expand All @@ -96,31 +101,170 @@ func TestTxConnCommitFailure(t *testing.T) {
TabletType: topodatapb.TabletType_PRIMARY,
},
TransactionId: 1,
TabletAlias: sbc0.Tablet().Alias,
TabletAlias: sbcs[0].Tablet().Alias,
}, {
Target: &querypb.Target{
Keyspace: "TestTxConn",
Shard: "1",
TabletType: topodatapb.TabletType_PRIMARY,
},
TransactionId: 1,
TabletAlias: sbc1.Tablet().Alias,
TabletAlias: sbcs[1].Tablet().Alias,
}},
}
utils.MustMatch(t, &wantSession, session.Session, "Session")

sbc1.MustFailCodes[vtrpcpb.Code_DEADLINE_EXCEEDED] = 1
sc.ExecuteMultiShard(ctx, nil, rssa, threeQueries, session, false, false)
wantSession = vtgatepb.Session{
InTransaction: true,
ShardSessions: []*vtgatepb.Session_ShardSession{{
Target: &querypb.Target{
Keyspace: "TestTxConn",
Shard: "0",
TabletType: topodatapb.TabletType_PRIMARY,
},
TransactionId: 1,
TabletAlias: sbcs[0].Tablet().Alias,
}, {
Target: &querypb.Target{
Keyspace: "TestTxConn",
Shard: "1",
TabletType: topodatapb.TabletType_PRIMARY,
},
TransactionId: 1,
TabletAlias: sbcs[1].Tablet().Alias,
}, {
Target: &querypb.Target{
Keyspace: "TestTxConn",
Shard: "2",
TabletType: topodatapb.TabletType_PRIMARY,
},
TransactionId: 1,
TabletAlias: sbcs[2].Tablet().Alias,
}},
}
utils.MustMatch(t, &wantSession, session.Session, "Session")

sbcs[2].MustFailCodes[vtrpcpb.Code_DEADLINE_EXCEEDED] = 1

expectErr := NewShardError(vterrors.New(
vtrpcpb.Code_DEADLINE_EXCEEDED,
fmt.Sprintf("%v error", vtrpcpb.Code_DEADLINE_EXCEEDED)),
rss1[0].Target)
rssm[2][0].Target)

require.ErrorContains(t, sc.txConn.Commit(ctx, session), expectErr.Error())
wantSession = vtgatepb.Session{}
wantSession = vtgatepb.Session{
Warnings: []*querypb.QueryWarning{
{
Code: uint32(sqlerror.ERNonAtomicCommit),
Message: "multi-db commit failed after committing to 2 shards: 0, 1",
},
},
}
utils.MustMatch(t, &wantSession, session.Session, "Session")
assert.EqualValues(t, 1, sbc0.CommitCount.Load(), "sbc0.CommitCount")
assert.EqualValues(t, 1, sbc1.CommitCount.Load(), "sbc1.CommitCount")
assert.EqualValues(t, 1, sbcs[0].CommitCount.Load(), "sbc0.CommitCount")
}

func TestTxConnCommitFailureAfterNonAtomicCommitMaxShards(t *testing.T) {
ctx := utils.LeakCheckContext(t)

sc, sbcs, rssm, _ := newTestTxConnEnvNShards(t, ctx, "TestTxConn", 18)
sc.txConn.mode = vtgatepb.TransactionMode_MULTI

// Sequence the executes to ensure commit order

session := NewSafeSession(&vtgatepb.Session{InTransaction: true})
wantSession := vtgatepb.Session{
InTransaction: true,
ShardSessions: []*vtgatepb.Session_ShardSession{},
}

for i := 0; i < 18; i++ {
sc.ExecuteMultiShard(ctx, nil, rssm[i], queries, session, false, false)
wantSession.ShardSessions = append(wantSession.ShardSessions, &vtgatepb.Session_ShardSession{
Target: &querypb.Target{
Keyspace: "TestTxConn",
Shard: rssm[i][0].Target.Shard,
TabletType: topodatapb.TabletType_PRIMARY,
},
TransactionId: 1,
TabletAlias: sbcs[i].Tablet().Alias,
})
utils.MustMatch(t, &wantSession, session.Session, "Session")
}

sbcs[17].MustFailCodes[vtrpcpb.Code_DEADLINE_EXCEEDED] = 1

expectErr := NewShardError(vterrors.New(
vtrpcpb.Code_DEADLINE_EXCEEDED,
fmt.Sprintf("%v error", vtrpcpb.Code_DEADLINE_EXCEEDED)),
rssm[17][0].Target)

require.ErrorContains(t, sc.txConn.Commit(ctx, session), expectErr.Error())
wantSession = vtgatepb.Session{
Warnings: []*querypb.QueryWarning{
{
Code: uint32(sqlerror.ERNonAtomicCommit),
Message: "multi-db commit failed after committing to 17 shards: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, ...",
},
},
}

utils.MustMatch(t, &wantSession, session.Session, "Session")
for i := 0; i < 17; i++ {
assert.EqualValues(t, 1, sbcs[i].CommitCount.Load(), fmt.Sprintf("sbc%d.CommitCount", i))
}
}

func TestTxConnCommitFailureBeforeNonAtomicCommitMaxShards(t *testing.T) {
ctx := utils.LeakCheckContext(t)

sc, sbcs, rssm, _ := newTestTxConnEnvNShards(t, ctx, "TestTxConn", 17)
sc.txConn.mode = vtgatepb.TransactionMode_MULTI

// Sequence the executes to ensure commit order

session := NewSafeSession(&vtgatepb.Session{InTransaction: true})
wantSession := vtgatepb.Session{
InTransaction: true,
ShardSessions: []*vtgatepb.Session_ShardSession{},
}

for i := 0; i < 17; i++ {
sc.ExecuteMultiShard(ctx, nil, rssm[i], queries, session, false, false)
wantSession.ShardSessions = append(wantSession.ShardSessions, &vtgatepb.Session_ShardSession{
Target: &querypb.Target{
Keyspace: "TestTxConn",
Shard: rssm[i][0].Target.Shard,
TabletType: topodatapb.TabletType_PRIMARY,
},
TransactionId: 1,
TabletAlias: sbcs[i].Tablet().Alias,
})
utils.MustMatch(t, &wantSession, session.Session, "Session")
}

sbcs[16].MustFailCodes[vtrpcpb.Code_DEADLINE_EXCEEDED] = 1

expectErr := NewShardError(vterrors.New(
vtrpcpb.Code_DEADLINE_EXCEEDED,
fmt.Sprintf("%v error", vtrpcpb.Code_DEADLINE_EXCEEDED)),
rssm[16][0].Target)

require.ErrorContains(t, sc.txConn.Commit(ctx, session), expectErr.Error())
wantSession = vtgatepb.Session{
Warnings: []*querypb.QueryWarning{
{
Code: uint32(sqlerror.ERNonAtomicCommit),
Message: "multi-db commit failed after committing to 16 shards: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15",
},
},
}

utils.MustMatch(t, &wantSession, session.Session, "Session")
for i := 0; i < 16; i++ {
assert.EqualValues(t, 1, sbcs[i].CommitCount.Load(), fmt.Sprintf("sbc%d.CommitCount", i))
}
}

func TestTxConnCommitSuccess(t *testing.T) {
Expand Down Expand Up @@ -1359,17 +1503,40 @@ func TestTxConnAccessModeReset(t *testing.T) {
func newTestTxConnEnv(t *testing.T, ctx context.Context, name string) (sc *ScatterConn, sbc0, sbc1 *sandboxconn.SandboxConn, rss0, rss1, rss01 []*srvtopo.ResolvedShard) {
t.Helper()
createSandbox(name)
sc, sbcs, rssl, rssa := newTestTxConnEnvNShards(t, ctx, name, 2)
return sc, sbcs[0], sbcs[1], rssl[0], rssl[1], rssa
}

func newTestTxConnEnvNShards(t *testing.T, ctx context.Context, name string, n int) (
sc *ScatterConn, sbcl []*sandboxconn.SandboxConn, rssl [][]*srvtopo.ResolvedShard, rssa []*srvtopo.ResolvedShard,
) {
t.Helper()
createSandbox(name)

hc := discovery.NewFakeHealthCheck(nil)
sc = newTestScatterConn(ctx, hc, newSandboxForCells(ctx, []string{"aa"}), "aa")
sbc0 = hc.AddTestTablet("aa", "0", 1, name, "0", topodatapb.TabletType_PRIMARY, true, 1, nil)
sbc1 = hc.AddTestTablet("aa", "1", 1, name, "1", topodatapb.TabletType_PRIMARY, true, 1, nil)

sNames := make([]string, n)
for i := 0; i < n; i++ {
sNames[i] = strconv.FormatInt(int64(i), 10)
}

sbcl = make([]*sandboxconn.SandboxConn, len(sNames))
for i, sName := range sNames {
sbcl[i] = hc.AddTestTablet("aa", sName, int32(i)+1, name, sName, topodatapb.TabletType_PRIMARY, true, 1, nil)
}

res := srvtopo.NewResolver(newSandboxForCells(ctx, []string{"aa"}), sc.gateway, "aa")
var err error
rss0, err = res.ResolveDestination(ctx, name, topodatapb.TabletType_PRIMARY, key.DestinationShard("0"))
require.NoError(t, err)
rss1, err = res.ResolveDestination(ctx, name, topodatapb.TabletType_PRIMARY, key.DestinationShard("1"))
require.NoError(t, err)
rss01, err = res.ResolveDestination(ctx, name, topodatapb.TabletType_PRIMARY, key.DestinationShards([]string{"0", "1"}))

rssl = make([][]*srvtopo.ResolvedShard, len(sNames))
for i, sName := range sNames {
rss, err := res.ResolveDestination(ctx, name, topodatapb.TabletType_PRIMARY, key.DestinationShard(sName))
require.NoError(t, err)
rssl[i] = rss
}

rssa, err := res.ResolveDestination(ctx, name, topodatapb.TabletType_PRIMARY, key.DestinationShards(sNames))
require.NoError(t, err)
return sc, sbc0, sbc1, rss0, rss1, rss01

return sc, sbcl, rssl, rssa
}

0 comments on commit 961a978

Please sign in to comment.