Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Take replication lag into account while selecting primary candidate #14634

Merged
merged 7 commits into from
Dec 5, 2023
16 changes: 8 additions & 8 deletions go/vt/vtctl/reparentutil/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ func ChooseNewPrimary(
tb := tablet.Tablet
errorGroup.Go(func() error {
// find and store the positions for the tablet
pos, err := findPositionForTablet(groupCtx, tb, logger, tmc, waitReplicasTimeout)
pos, replLag, err := findPositionAndLagForTablet(groupCtx, tb, logger, tmc, waitReplicasTimeout)
mu.Lock()
defer mu.Unlock()
if err == nil {
if err == nil && waitReplicasTimeout >= replLag {
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
validTablets = append(validTablets, tb)
tabletPositions = append(tabletPositions, pos)
}
Expand All @@ -127,9 +127,9 @@ func ChooseNewPrimary(
return validTablets[0].Alias, nil
}

// findPositionForTablet processes the replication position for a single tablet and
// findPositionAndLagForTablet processes the replication position and lag for a single tablet and
// returns them. It is safe to call from multiple goroutines.
func findPositionForTablet(ctx context.Context, tablet *topodatapb.Tablet, logger logutil.Logger, tmc tmclient.TabletManagerClient, waitTimeout time.Duration) (replication.Position, error) {
func findPositionAndLagForTablet(ctx context.Context, tablet *topodatapb.Tablet, logger logutil.Logger, tmc tmclient.TabletManagerClient, waitTimeout time.Duration) (replication.Position, time.Duration, error) {
logger.Infof("getting replication position from %v", topoproto.TabletAliasString(tablet.Alias))

ctx, cancel := context.WithTimeout(ctx, waitTimeout)
Expand All @@ -140,10 +140,10 @@ func findPositionForTablet(ctx context.Context, tablet *topodatapb.Tablet, logge
sqlErr, isSQLErr := sqlerror.NewSQLErrorFromError(err).(*sqlerror.SQLError)
if isSQLErr && sqlErr != nil && sqlErr.Number() == sqlerror.ERNotReplica {
logger.Warningf("no replication statue from %v, using empty gtid set", topoproto.TabletAliasString(tablet.Alias))
return replication.Position{}, nil
return replication.Position{}, 0, nil
}
logger.Warningf("failed to get replication status from %v, ignoring tablet: %v", topoproto.TabletAliasString(tablet.Alias), err)
return replication.Position{}, err
return replication.Position{}, 0, err
}

// Use the relay log position if available, otherwise use the executed GTID set (binary log position).
Expand All @@ -154,10 +154,10 @@ func findPositionForTablet(ctx context.Context, tablet *topodatapb.Tablet, logge
pos, err := replication.DecodePosition(positionString)
if err != nil {
logger.Warningf("cannot decode replica position %v for tablet %v, ignoring tablet: %v", positionString, topoproto.TabletAliasString(tablet.Alias), err)
return replication.Position{}, err
return replication.Position{}, 0, err
}

return pos, nil
return pos, time.Second * time.Duration(status.ReplicationLagSeconds), nil
}

// FindCurrentPrimary returns the current primary tablet of a shard, if any. The
Expand Down
74 changes: 70 additions & 4 deletions go/vt/vtctl/reparentutil/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,65 @@ func TestChooseNewPrimary(t *testing.T) {
},
shouldErr: false,
},
{
name: "found a replica - ignore one with replication lag",
tmc: &chooseNewPrimaryTestTMClient{
// zone1-101 is behind zone1-102
replicationStatuses: map[string]*replicationdatapb.Status{
"zone1-0000000101": {
Position: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1",
},
"zone1-0000000102": {
Position: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5",
ReplicationLagSeconds: 232,
},
},
},
shardInfo: topo.NewShardInfo("testkeyspace", "-", &topodatapb.Shard{
PrimaryAlias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 100,
},
}, nil),
tabletMap: map[string]*topo.TabletInfo{
"primary": {
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 100,
},
Type: topodatapb.TabletType_PRIMARY,
},
},
"replica1": {
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 101,
},
Type: topodatapb.TabletType_REPLICA,
},
},
"replica2": {
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 102,
},
Type: topodatapb.TabletType_REPLICA,
},
},
},
avoidPrimaryAlias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 0,
},
expected: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 101,
},
shouldErr: false,
},
{
name: "found a replica - more advanced relay log position",
tmc: &chooseNewPrimaryTestTMClient{
Expand Down Expand Up @@ -465,6 +524,7 @@ func TestFindPositionForTablet(t *testing.T) {
tmc *testutil.TabletManagerClient
tablet *topodatapb.Tablet
expectedPosition string
expectedLag time.Duration
expectedErr string
}{
{
Expand All @@ -476,7 +536,8 @@ func TestFindPositionForTablet(t *testing.T) {
}{
"zone1-0000000100": {
Position: &replicationdatapb.Status{
Position: "MySQL56/3e11fa47-71ca-11e1-9e33-c80aa9429562:1-5",
Position: "MySQL56/3e11fa47-71ca-11e1-9e33-c80aa9429562:1-5",
ReplicationLagSeconds: 201,
},
},
},
Expand All @@ -487,6 +548,7 @@ func TestFindPositionForTablet(t *testing.T) {
Uid: 100,
},
},
expectedLag: 201 * time.Second,
expectedPosition: "MySQL56/3e11fa47-71ca-11e1-9e33-c80aa9429562:1-5",
}, {
name: "no replication status",
Expand All @@ -506,6 +568,7 @@ func TestFindPositionForTablet(t *testing.T) {
Uid: 100,
},
},
expectedLag: 0,
expectedPosition: "",
}, {
name: "relay log",
Expand All @@ -516,8 +579,9 @@ func TestFindPositionForTablet(t *testing.T) {
}{
"zone1-0000000100": {
Position: &replicationdatapb.Status{
Position: "unused",
RelayLogPosition: "MySQL56/3e11fa47-71ca-11e1-9e33-c80aa9429562:1-5",
Position: "unused",
RelayLogPosition: "MySQL56/3e11fa47-71ca-11e1-9e33-c80aa9429562:1-5",
ReplicationLagSeconds: 291,
},
},
},
Expand All @@ -528,6 +592,7 @@ func TestFindPositionForTablet(t *testing.T) {
Uid: 100,
},
},
expectedLag: 291 * time.Second,
expectedPosition: "MySQL56/3e11fa47-71ca-11e1-9e33-c80aa9429562:1-5",
}, {
name: "error in parsing position",
Expand Down Expand Up @@ -555,14 +620,15 @@ func TestFindPositionForTablet(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
pos, err := findPositionForTablet(ctx, test.tablet, logger, test.tmc, 10*time.Second)
pos, lag, err := findPositionAndLagForTablet(ctx, test.tablet, logger, test.tmc, 10*time.Second)
if test.expectedErr != "" {
require.EqualError(t, err, test.expectedErr)
return
}
require.NoError(t, err)
posString := replication.EncodePosition(pos)
require.Equal(t, test.expectedPosition, posString)
require.Equal(t, test.expectedLag, lag)
})
}
}
Expand Down
Loading