Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vtgate: record warning for partially successful cross-shard commits #14848

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions changelog/19.0/19.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
- [Multi Table Delete Support](#multi-table-delete)
- [`SHOW VSCHEMA KEYSPACES` Query](#show-vschema-keyspaces)
- [`FOREIGN_KEY_CHECKS` is now a Vitess Aware Variable](#fk-checks-vitess-aware)
- [Partial Multi-shard Commit Warnings](#partial-multi-shard-commit-warnings)
- **[Vttestserver](#vttestserver)**
- [`--vtcombo-bind-host` flag](#vtcombo-bind-host)
- **[Minor Changes](#minor-changes)**
Expand Down Expand Up @@ -64,7 +65,6 @@ Prior to 19.0 VTTablet reported how much time non-streaming executions spend wai

The build version (e.g., `19.0.0-SNAPSHOT`) has been added to `/debug/vars`, allowing users to programmatically inspect Vitess components' build version at runtime.


### <a id="planned-reparent-shard"/>Planned Reparent Shard

#### <a id="tolerable-repl-lag"/>`--tolerable-replication-lag` Sub-flag
Expand All @@ -74,7 +74,6 @@ This feature is opt-in and not specifying this sub-flag makes Vitess ignore the

A new flag in VTOrc with the same name has been added to control the behaviour of the PlannedReparentShard calls that VTOrc issues.


### <a id="query-compatibility"/>Query Compatibility

#### <a id="multi-table-delete"/> Multi Table Delete Support
Expand Down Expand Up @@ -107,6 +106,24 @@ mysql> show vschema keyspaces;

When VTGate receives a query to change the `FOREIGN_KEY_CHECKS` value for a session, instead of sending the value down to MySQL, VTGate now keeps track of the value and changes the queries by adding `SET_VAR(FOREIGN_KEY_CHECKS=On/Off)` style query optimizer hints wherever required.

#### <a id="partial-multi-shard-commit-warnings"/>Partial Multi-shard Commit Warnings

When using `multi` transaction mode (the default), it is possible for Vitess to successfully commit to one shard, but fail to commit to a subsequent shard, thus breaking the atomicity of a multi-shard transaction.

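Starting in `v19.0`, when a multi-shard commit fails after one or more shards have already been committed, VTGate records a warning that lists the shards that were successfully committed, e.g.: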

```mysql
mysql> commit;
ERROR 1317 (70100): target: customer.-80.primary: vttablet: rpc error: code = Aborted desc = transaction 1703182545849001001: ended at 2023-12-21 14:07:41.515 EST (exceeded timeout: 30s) (CallerID: userData1)
mysql> show warnings;
+---------+------+----------------------------------------------------------+
| Level | Code | Message |
+---------+------+----------------------------------------------------------+
| Warning | 301 | multi-db commit failed after committing to 1 shards: 80- |
+---------+------+----------------------------------------------------------+
1 row in set, 1 warning (0.00 sec)
```

### <a id="vttestserver"/>Vttestserver

#### <a id="vtcombo-bind-host"/>`--vtcombo-bind-host` flag
Expand Down
3 changes: 2 additions & 1 deletion go/mysql/sqlerror/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ func (e ErrorCode) ToString() string {
// See above reference for more information on each code.
const (
// Vitess specific errors, (100-999)
ERNotReplica = ErrorCode(100)
ERNotReplica = ErrorCode(100)
ERNonAtomicCommit = ErrorCode(301)

// unknown
ERUnknownError = ErrorCode(1105)
Expand Down
27 changes: 26 additions & 1 deletion go/vt/vtgate/tx_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package vtgate
import (
"context"
"fmt"
"strings"
"sync"

"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/dtids"
"vitess.io/vitess/go/vt/log"
Expand All @@ -34,6 +36,10 @@ import (
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// nonAtomicCommitWarnMaxShards limits the number of shard names reported in
// non-atomic commit warnings. When more shards than this were committed before
// the failing one, only the first nonAtomicCommitWarnMaxShards shard names are
// listed and a trailing "..." entry marks the truncation.
const nonAtomicCommitWarnMaxShards = 16

// TxConn is used for executing transactional requests.
type TxConn struct {
tabletGateway *TabletGateway
Expand Down Expand Up @@ -132,8 +138,27 @@ func (txc *TxConn) commitNormal(ctx context.Context, session *SafeSession) error
}

// Retain backward compatibility on commit order for the normal session.
for _, shardSession := range session.ShardSessions {
for i, shardSession := range session.ShardSessions {
if err := txc.commitShard(ctx, shardSession, session.logging); err != nil {
if i > 0 {
nShards := i
elipsis := false
if i > nonAtomicCommitWarnMaxShards {
nShards = nonAtomicCommitWarnMaxShards
elipsis = true
}
sNames := make([]string, nShards, nShards+1 /*...*/)
for j := 0; j < nShards; j++ {
sNames[j] = session.ShardSessions[j].Target.Shard
}
if elipsis {
sNames = append(sNames, "...")
}
Comment on lines +154 to +156
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a test where this is covered?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gladly, added

session.RecordWarning(&querypb.QueryWarning{
Code: uint32(sqlerror.ERNonAtomicCommit),
Message: fmt.Sprintf("multi-db commit failed after committing to %d shards: %s", i, strings.Join(sNames, ", ")),
})
}
_ = txc.Release(ctx, session)
return err
}
Expand Down
207 changes: 187 additions & 20 deletions go/vt/vtgate/tx_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package vtgate
import (
"context"
"fmt"
"strconv"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -27,6 +28,7 @@ import (

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/srvtopo"
Expand All @@ -41,6 +43,7 @@ import (

var queries = []*querypb.BoundQuery{{Sql: "query1"}}
var twoQueries = []*querypb.BoundQuery{{Sql: "query1"}, {Sql: "query1"}}
var threeQueries = []*querypb.BoundQuery{{Sql: "query1"}, {Sql: "query1"}, {Sql: "query1"}}

func TestTxConnBegin(t *testing.T) {
ctx := utils.LeakCheckContext(t)
Expand All @@ -67,12 +70,13 @@ func TestTxConnBegin(t *testing.T) {
func TestTxConnCommitFailure(t *testing.T) {
ctx := utils.LeakCheckContext(t)

sc, sbc0, sbc1, rss0, rss1, rss01 := newTestTxConnEnv(t, ctx, "TestTxConn")
sc, sbcs, rssm, rssa := newTestTxConnEnvNShards(t, ctx, "TestTxConn", 3)
sc.txConn.mode = vtgatepb.TransactionMode_MULTI

// Sequence the executes to ensure commit order

session := NewSafeSession(&vtgatepb.Session{InTransaction: true})
sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false)
sc.ExecuteMultiShard(ctx, nil, rssm[0], queries, session, false, false)
wantSession := vtgatepb.Session{
InTransaction: true,
ShardSessions: []*vtgatepb.Session_ShardSession{{
Expand All @@ -82,11 +86,12 @@ func TestTxConnCommitFailure(t *testing.T) {
TabletType: topodatapb.TabletType_PRIMARY,
},
TransactionId: 1,
TabletAlias: sbc0.Tablet().Alias,
TabletAlias: sbcs[0].Tablet().Alias,
}},
}
utils.MustMatch(t, &wantSession, session.Session, "Session")
sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false)

sc.ExecuteMultiShard(ctx, nil, rssm[1], queries, session, false, false)
wantSession = vtgatepb.Session{
InTransaction: true,
ShardSessions: []*vtgatepb.Session_ShardSession{{
Expand All @@ -96,31 +101,170 @@ func TestTxConnCommitFailure(t *testing.T) {
TabletType: topodatapb.TabletType_PRIMARY,
},
TransactionId: 1,
TabletAlias: sbc0.Tablet().Alias,
TabletAlias: sbcs[0].Tablet().Alias,
}, {
Target: &querypb.Target{
Keyspace: "TestTxConn",
Shard: "1",
TabletType: topodatapb.TabletType_PRIMARY,
},
TransactionId: 1,
TabletAlias: sbc1.Tablet().Alias,
TabletAlias: sbcs[1].Tablet().Alias,
}},
}
utils.MustMatch(t, &wantSession, session.Session, "Session")

sbc1.MustFailCodes[vtrpcpb.Code_DEADLINE_EXCEEDED] = 1
sc.ExecuteMultiShard(ctx, nil, rssa, threeQueries, session, false, false)
wantSession = vtgatepb.Session{
InTransaction: true,
ShardSessions: []*vtgatepb.Session_ShardSession{{
Target: &querypb.Target{
Keyspace: "TestTxConn",
Shard: "0",
TabletType: topodatapb.TabletType_PRIMARY,
},
TransactionId: 1,
TabletAlias: sbcs[0].Tablet().Alias,
}, {
Target: &querypb.Target{
Keyspace: "TestTxConn",
Shard: "1",
TabletType: topodatapb.TabletType_PRIMARY,
},
TransactionId: 1,
TabletAlias: sbcs[1].Tablet().Alias,
}, {
Target: &querypb.Target{
Keyspace: "TestTxConn",
Shard: "2",
TabletType: topodatapb.TabletType_PRIMARY,
},
TransactionId: 1,
TabletAlias: sbcs[2].Tablet().Alias,
}},
}
utils.MustMatch(t, &wantSession, session.Session, "Session")

sbcs[2].MustFailCodes[vtrpcpb.Code_DEADLINE_EXCEEDED] = 1

expectErr := NewShardError(vterrors.New(
vtrpcpb.Code_DEADLINE_EXCEEDED,
fmt.Sprintf("%v error", vtrpcpb.Code_DEADLINE_EXCEEDED)),
rss1[0].Target)
rssm[2][0].Target)

require.ErrorContains(t, sc.txConn.Commit(ctx, session), expectErr.Error())
wantSession = vtgatepb.Session{}
wantSession = vtgatepb.Session{
Warnings: []*querypb.QueryWarning{
{
Code: uint32(sqlerror.ERNonAtomicCommit),
Message: "multi-db commit failed after committing to 2 shards: 0, 1",
},
},
}
utils.MustMatch(t, &wantSession, session.Session, "Session")
assert.EqualValues(t, 1, sbc0.CommitCount.Load(), "sbc0.CommitCount")
assert.EqualValues(t, 1, sbc1.CommitCount.Load(), "sbc1.CommitCount")
assert.EqualValues(t, 1, sbcs[0].CommitCount.Load(), "sbc0.CommitCount")
}

// TestTxConnCommitFailureAfterNonAtomicCommitMaxShards opens a transaction on
// 18 shards, makes the commit on the last one fail, and checks that the
// recorded non-atomic commit warning reports 17 committed shards while listing
// only the first 16 shard names followed by "...".
func TestTxConnCommitFailureAfterNonAtomicCommitMaxShards(t *testing.T) {
	const (
		shardCount = 18
		failingIdx = shardCount - 1
	)

	ctx := utils.LeakCheckContext(t)

	sc, sbcs, rssm, _ := newTestTxConnEnvNShards(t, ctx, "TestTxConn", shardCount)
	sc.txConn.mode = vtgatepb.TransactionMode_MULTI

	session := NewSafeSession(&vtgatepb.Session{InTransaction: true})
	wantSession := vtgatepb.Session{
		InTransaction: true,
		ShardSessions: []*vtgatepb.Session_ShardSession{},
	}

	// Execute against one shard at a time so the order of the shard sessions,
	// and therefore the commit order, is fixed.
	for i := 0; i < shardCount; i++ {
		sc.ExecuteMultiShard(ctx, nil, rssm[i], queries, session, false, false)

		expected := &vtgatepb.Session_ShardSession{
			Target: &querypb.Target{
				Keyspace:   "TestTxConn",
				Shard:      rssm[i][0].Target.Shard,
				TabletType: topodatapb.TabletType_PRIMARY,
			},
			TransactionId: 1,
			TabletAlias:   sbcs[i].Tablet().Alias,
		}
		wantSession.ShardSessions = append(wantSession.ShardSessions, expected)
		utils.MustMatch(t, &wantSession, session.Session, "Session")
	}

	// Arm the last shard's sandbox connection with a DEADLINE_EXCEEDED failure.
	sbcs[failingIdx].MustFailCodes[vtrpcpb.Code_DEADLINE_EXCEEDED] = 1

	expectErr := NewShardError(
		vterrors.New(
			vtrpcpb.Code_DEADLINE_EXCEEDED,
			fmt.Sprintf("%v error", vtrpcpb.Code_DEADLINE_EXCEEDED),
		),
		rssm[failingIdx][0].Target,
	)

	commitErr := sc.txConn.Commit(ctx, session)
	require.ErrorContains(t, commitErr, expectErr.Error())

	// After the failed commit the session is expected to hold only the warning.
	wantSession = vtgatepb.Session{
		Warnings: []*querypb.QueryWarning{{
			Code:    uint32(sqlerror.ERNonAtomicCommit),
			Message: "multi-db commit failed after committing to 17 shards: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, ...",
		}},
	}
	utils.MustMatch(t, &wantSession, session.Session, "Session")

	// Every shard ahead of the failing one must have seen exactly one commit.
	for i := 0; i < failingIdx; i++ {
		assert.EqualValues(t, 1, sbcs[i].CommitCount.Load(), fmt.Sprintf("sbc%d.CommitCount", i))
	}
}

// TestTxConnCommitFailureBeforeNonAtomicCommitMaxShards opens a transaction on
// 17 shards, makes the commit on the last one fail, and checks that the
// recorded non-atomic commit warning lists all 16 committed shard names with
// no trailing "..." entry.
func TestTxConnCommitFailureBeforeNonAtomicCommitMaxShards(t *testing.T) {
	const (
		shardCount = 17
		failingIdx = shardCount - 1
	)

	ctx := utils.LeakCheckContext(t)

	sc, sbcs, rssm, _ := newTestTxConnEnvNShards(t, ctx, "TestTxConn", shardCount)
	sc.txConn.mode = vtgatepb.TransactionMode_MULTI

	session := NewSafeSession(&vtgatepb.Session{InTransaction: true})
	wantSession := vtgatepb.Session{
		InTransaction: true,
		ShardSessions: []*vtgatepb.Session_ShardSession{},
	}

	// Execute against one shard at a time so the order of the shard sessions,
	// and therefore the commit order, is fixed.
	for i := 0; i < shardCount; i++ {
		sc.ExecuteMultiShard(ctx, nil, rssm[i], queries, session, false, false)

		expected := &vtgatepb.Session_ShardSession{
			Target: &querypb.Target{
				Keyspace:   "TestTxConn",
				Shard:      rssm[i][0].Target.Shard,
				TabletType: topodatapb.TabletType_PRIMARY,
			},
			TransactionId: 1,
			TabletAlias:   sbcs[i].Tablet().Alias,
		}
		wantSession.ShardSessions = append(wantSession.ShardSessions, expected)
		utils.MustMatch(t, &wantSession, session.Session, "Session")
	}

	// Arm the last shard's sandbox connection with a DEADLINE_EXCEEDED failure.
	sbcs[failingIdx].MustFailCodes[vtrpcpb.Code_DEADLINE_EXCEEDED] = 1

	expectErr := NewShardError(
		vterrors.New(
			vtrpcpb.Code_DEADLINE_EXCEEDED,
			fmt.Sprintf("%v error", vtrpcpb.Code_DEADLINE_EXCEEDED),
		),
		rssm[failingIdx][0].Target,
	)

	commitErr := sc.txConn.Commit(ctx, session)
	require.ErrorContains(t, commitErr, expectErr.Error())

	// After the failed commit the session is expected to hold only the warning.
	wantSession = vtgatepb.Session{
		Warnings: []*querypb.QueryWarning{{
			Code:    uint32(sqlerror.ERNonAtomicCommit),
			Message: "multi-db commit failed after committing to 16 shards: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15",
		}},
	}
	utils.MustMatch(t, &wantSession, session.Session, "Session")

	// Every shard ahead of the failing one must have seen exactly one commit.
	for i := 0; i < failingIdx; i++ {
		assert.EqualValues(t, 1, sbcs[i].CommitCount.Load(), fmt.Sprintf("sbc%d.CommitCount", i))
	}
}

func TestTxConnCommitSuccess(t *testing.T) {
Expand Down Expand Up @@ -1359,17 +1503,40 @@ func TestTxConnAccessModeReset(t *testing.T) {
// newTestTxConnEnv builds a two-shard test environment by delegating to
// newTestTxConnEnvNShards. It returns the scatter connection, the sandbox
// connection for each of the two shards, the resolved shards for each shard
// individually, and the resolved shards for both shards together.
func newTestTxConnEnv(t *testing.T, ctx context.Context, name string) (sc *ScatterConn, sbc0, sbc1 *sandboxconn.SandboxConn, rss0, rss1, rss01 []*srvtopo.ResolvedShard) {
	t.Helper()
	// NOTE(review): newTestTxConnEnvNShards also calls createSandbox(name), so
	// this call looks redundant — confirm before removing.
	createSandbox(name)

	var (
		conns    []*sandboxconn.SandboxConn
		perShard [][]*srvtopo.ResolvedShard
	)
	sc, conns, perShard, rss01 = newTestTxConnEnvNShards(t, ctx, name, 2)

	sbc0, sbc1 = conns[0], conns[1]
	rss0, rss1 = perShard[0], perShard[1]
	return sc, sbc0, sbc1, rss0, rss1, rss01
}

func newTestTxConnEnvNShards(t *testing.T, ctx context.Context, name string, n int) (
sc *ScatterConn, sbcl []*sandboxconn.SandboxConn, rssl [][]*srvtopo.ResolvedShard, rssa []*srvtopo.ResolvedShard,
) {
t.Helper()
createSandbox(name)

hc := discovery.NewFakeHealthCheck(nil)
sc = newTestScatterConn(ctx, hc, newSandboxForCells(ctx, []string{"aa"}), "aa")
sbc0 = hc.AddTestTablet("aa", "0", 1, name, "0", topodatapb.TabletType_PRIMARY, true, 1, nil)
sbc1 = hc.AddTestTablet("aa", "1", 1, name, "1", topodatapb.TabletType_PRIMARY, true, 1, nil)

sNames := make([]string, n)
for i := 0; i < n; i++ {
sNames[i] = strconv.FormatInt(int64(i), 10)
}

sbcl = make([]*sandboxconn.SandboxConn, len(sNames))
for i, sName := range sNames {
sbcl[i] = hc.AddTestTablet("aa", sName, int32(i)+1, name, sName, topodatapb.TabletType_PRIMARY, true, 1, nil)
}

res := srvtopo.NewResolver(newSandboxForCells(ctx, []string{"aa"}), sc.gateway, "aa")
var err error
rss0, err = res.ResolveDestination(ctx, name, topodatapb.TabletType_PRIMARY, key.DestinationShard("0"))
require.NoError(t, err)
rss1, err = res.ResolveDestination(ctx, name, topodatapb.TabletType_PRIMARY, key.DestinationShard("1"))
require.NoError(t, err)
rss01, err = res.ResolveDestination(ctx, name, topodatapb.TabletType_PRIMARY, key.DestinationShards([]string{"0", "1"}))

rssl = make([][]*srvtopo.ResolvedShard, len(sNames))
for i, sName := range sNames {
rss, err := res.ResolveDestination(ctx, name, topodatapb.TabletType_PRIMARY, key.DestinationShard(sName))
require.NoError(t, err)
rssl[i] = rss
}

rssa, err := res.ResolveDestination(ctx, name, topodatapb.TabletType_PRIMARY, key.DestinationShards(sNames))
require.NoError(t, err)
return sc, sbc0, sbc1, rss0, rss1, rss01

return sc, sbcl, rssl, rssa
}
Loading