Skip to content

Commit

Permalink
Backoff on inconsistent state
Browse files Browse the repository at this point in the history
Signed-off-by: narcsfz <[email protected]>
  • Loading branch information
5antelope committed Nov 29, 2021
1 parent 297227d commit 4225733
Show file tree
Hide file tree
Showing 10 changed files with 354 additions and 129 deletions.
2 changes: 1 addition & 1 deletion go/vt/vtgr/controller/diagnose.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (shard *GRShard) Diagnose(ctx context.Context) (DiagnoseType, error) {
shard.shardStatusCollector.recordDiagnoseResult(diagnoseResult)
shard.populateVTGRStatusLocked()
if diagnoseResult != DiagnoseTypeHealthy {
shard.logger.Warningf(`VTGR diagnose shard as unhealthy for %s/%s: result=%v | last_result=%v | instances=%v | primary=%v | primary_tablet=%v | problematics=%v | unreachables=%v | SQL group=%v`,
shard.logger.Warningf(`VTGR diagnose shard as unhealthy for %s/%s: result=%v, last_result=%v, instances=%v, primary=%v, primary_tablet=%v, problematics=%v, unreachables=%v,\n%v`,
shard.KeyspaceShard.Keyspace, shard.KeyspaceShard.Shard,
shard.shardStatusCollector.status.DiagnoseResult,
shard.lastDiagnoseResult,
Expand Down
181 changes: 108 additions & 73 deletions go/vt/vtgr/controller/diagnose_test.go

Large diffs are not rendered by default.

57 changes: 50 additions & 7 deletions go/vt/vtgr/controller/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ limitations under the License.
package controller

import (
"flag"
"fmt"
"math"
"sort"
"strings"
"sync"
Expand All @@ -31,22 +33,31 @@ import (
var (
groupOnlineSize = stats.NewGaugesWithMultiLabels("MysqlGroupOnlineSize", "Online MySQL server in the group", []string{"Keyspace", "Shard"})
isLostQuorum = stats.NewGaugesWithMultiLabels("MysqlGroupLostQuorum", "If MySQL group lost quorum", []string{"Keyspace", "Shard"})

heartbeatThreshold = flag.Int("group_heartbeat_threshold", 0, "Group heartbeat staleness threshold. Need to set together with -enable_heartbeat_check")
)

// SQLGroup contains views from all the nodes within the shard
type SQLGroup struct {
views []*db.GroupView
resolvedView *ResolvedView
logger *log.Logger
size int
singlePrimary bool
statsTags []string
views []*db.GroupView
resolvedView *ResolvedView
logger *log.Logger
size int
singlePrimary bool
heartbeatStalenessThreshold int
statsTags []string
sync.Mutex
}

// NewSQLGroup creates a new SQLGroup
func NewSQLGroup(size int, singlePrimary bool, keyspace, shard string) *SQLGroup {
return &SQLGroup{size: size, singlePrimary: singlePrimary, statsTags: []string{keyspace, shard}, logger: log.NewVTGRLogger(keyspace, shard)}
return &SQLGroup{
size: size,
singlePrimary: singlePrimary,
statsTags: []string{keyspace, shard},
logger: log.NewVTGRLogger(keyspace, shard),
heartbeatStalenessThreshold: *heartbeatThreshold,
}
}

// ResolvedView is the resolved view
Expand Down Expand Up @@ -205,6 +216,14 @@ func (group *SQLGroup) Resolve() error {
func (group *SQLGroup) resolveLocked() error {
rv := &ResolvedView{logger: group.logger}
group.resolvedView = rv
// a node that is not in the group might be outlier with big lag
// iterate over all views to get global minStalenessResult first
minStalenessResult := math.MaxInt32
for _, view := range group.views {
if view.HeartbeatStaleness < minStalenessResult {
minStalenessResult = view.HeartbeatStaleness
}
}
m := make(map[inst.InstanceKey]db.GroupMember)
for _, view := range group.views {
if rv.groupName == "" && view.GroupName != "" {
Expand Down Expand Up @@ -233,6 +252,30 @@ func (group *SQLGroup) resolveLocked() error {
if st.State == memberState && st.Role == memberRole && st.ReadOnly == isReadOnly {
continue
}
// Members in a group should eventually converge on a state
// if there is a partition, then a node should be removed from
// a group. If we a node is reported as ONLINE together with
// some other staet, we back off if we see a node with diverged state
if memberState != db.UNKNOWNSTATE &&
st.State != db.UNKNOWNSTATE &&
st.State != memberState &&
(st.State == db.ONLINE || memberState == db.ONLINE) {
group.logger.Warningf("found inconsistent member state for %v: %v vs %v", instance.Hostname, st.State, memberState)
if group.heartbeatStalenessThreshold != 0 &&
// Check minStalenessResult among the group is not math.MaxInt32
// which means at least one node returns the lag from _vt.heartbeat table
// otherwise we don't trigger backoff on inconsistent state
minStalenessResult != math.MaxInt32 &&
minStalenessResult >= group.heartbeatStalenessThreshold {
group.logger.Warningf("ErrGroupBackoffError by staled heartbeat check %v", minStalenessResult)
var sb strings.Builder
for _, view := range group.views {
sb.WriteString(fmt.Sprintf("%v staleness=%v\n", view.MySQLHost, view.HeartbeatStaleness))
}
group.logger.Warningf("%v", sb.String())
return db.ErrGroupBackoffError
}
}
m[instance] = db.GroupMember{
HostName: instance.Hostname,
Port: instance.Port,
Expand Down
105 changes: 104 additions & 1 deletion go/vt/vtgr/controller/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package controller

import (
"math"
"testing"

"vitess.io/vitess/go/vt/vtgr/log"
Expand Down Expand Up @@ -199,7 +200,7 @@ func TestNetworkPartition(t *testing.T) {
group.recordView(v2)
group.recordView(v3)
err := group.Resolve()
assert.Errorf(t, err, "group backoff error")
assert.EqualErrorf(t, err, "group backoff error", err.Error())
rv := group.resolvedView
assert.Equal(t, "group", rv.groupName)
assert.Equal(t, map[inst.InstanceKey]db.GroupMember{
Expand All @@ -209,6 +210,108 @@ func TestNetworkPartition(t *testing.T) {
}, rv.view)
}

func TestInconsistentState(t *testing.T) {
group := NewSQLGroup(3, true, "ks", "0")
v1 := db.NewGroupView("v1", "host1", 10)
v1.GroupName = "group"
v1.HeartbeatStaleness = 11
v1.UnresolvedMembers = []*db.GroupMember{
db.NewGroupMember("ONLINE", "PRIMARY", "host1", 10, false),
db.NewGroupMember("ONLINE", "SECONDARY", "host2", 10, true),
db.NewGroupMember("ONLINE", "SECONDARY", "host3", 10, true),
}
v2 := db.NewGroupView("v2", "host2", 10)
v2.GroupName = "group"
v2.HeartbeatStaleness = 11
v2.UnresolvedMembers = []*db.GroupMember{
db.NewGroupMember("OFFLINE", "", "host2", 10, true),
}
v3 := db.NewGroupView("v3", "host3", 10)
v3.GroupName = "group"
v3.HeartbeatStaleness = 13
v3.UnresolvedMembers = []*db.GroupMember{
db.NewGroupMember("OFFLINE", "", "host3", 10, true),
}
group.recordView(v1)
group.recordView(v2)
group.recordView(v3)
group.heartbeatStalenessThreshold = 10
err := group.Resolve()
assert.EqualErrorf(t, err, "group backoff error", err.Error())
rv := group.resolvedView
assert.Equal(t, "group", rv.groupName)
assert.Nil(t, rv.view)
}

func TestInconsistentStateWithInvalidStaleResult(t *testing.T) {
group := NewSQLGroup(3, true, "ks", "0")
v1 := db.NewGroupView("v1", "host1", 10)
v1.GroupName = "group"
v1.HeartbeatStaleness = math.MaxInt32
v1.UnresolvedMembers = []*db.GroupMember{
db.NewGroupMember("ONLINE", "PRIMARY", "host1", 10, false),
db.NewGroupMember("ONLINE", "SECONDARY", "host2", 10, true),
db.NewGroupMember("ONLINE", "SECONDARY", "host3", 10, true),
}
v2 := db.NewGroupView("v2", "host2", 10)
v2.GroupName = "group"
v2.HeartbeatStaleness = math.MaxInt32
v2.UnresolvedMembers = []*db.GroupMember{
db.NewGroupMember("OFFLINE", "", "host2", 10, true),
}
v3 := db.NewGroupView("v3", "host3", 10)
v3.GroupName = "group"
v3.HeartbeatStaleness = math.MaxInt32
v3.UnresolvedMembers = []*db.GroupMember{
db.NewGroupMember("OFFLINE", "", "host3", 10, true),
}
group.recordView(v1)
group.recordView(v2)
group.recordView(v3)
group.heartbeatStalenessThreshold = 10
err := group.Resolve()
// Same setup as TestInconsistentState but because HeartbeatStaleness are all MaxInt32
// the backoff is not triggered
assert.NoError(t, err)
rv := group.resolvedView
assert.Equal(t, "group", rv.groupName)
}

func TestInconsistentUnknownState(t *testing.T) {
group := NewSQLGroup(3, true, "ks", "0")
v1 := db.NewGroupView("v1", "host1", 10)
v1.GroupName = "group"
v1.UnresolvedMembers = []*db.GroupMember{
db.NewGroupMember("ONLINE", "PRIMARY", "host1", 10, false),
db.NewGroupMember("RECOVERING", "SECONDARY", "host2", 10, true),
db.NewGroupMember("ONLINE", "SECONDARY", "host3", 10, true),
}
v2 := db.NewGroupView("v2", "host2", 10)
v2.GroupName = "group"
v2.UnresolvedMembers = []*db.GroupMember{
db.NewGroupMember("", "", "host2", 10, true),
}
v3 := db.NewGroupView("v3", "host3", 10)
v3.GroupName = "group"
v3.UnresolvedMembers = []*db.GroupMember{
db.NewGroupMember("ONLINE", "SECONDARY", "host3", 10, true),
}
group.recordView(v1)
group.recordView(v2)
group.recordView(v3)
err := group.Resolve()
// host 2 reports itself with empty state
// therefore we shouldn't raise error even with inconsistent state
assert.NoError(t, err)
rv := group.resolvedView
assert.Equal(t, "group", rv.groupName)
assert.Equal(t, map[inst.InstanceKey]db.GroupMember{
{Hostname: "host1", Port: 10}: {HostName: "host1", Port: 10, Role: db.PRIMARY, State: db.ONLINE, ReadOnly: false},
{Hostname: "host2", Port: 10}: {HostName: "host2", Port: 10, Role: db.SECONDARY, State: db.RECOVERING, ReadOnly: true},
{Hostname: "host3", Port: 10}: {HostName: "host3", Port: 10, Role: db.SECONDARY, State: db.ONLINE, ReadOnly: true},
}, rv.view)
}

func TestIsBootstrapInProcess(t *testing.T) {
group := NewSQLGroup(3, true, "ks", "0")
v1 := db.NewGroupView("v1", "host1", 10)
Expand Down
26 changes: 13 additions & 13 deletions go/vt/vtgr/controller/refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ var (
// grInstance represents an instance that's running MySQL GR
// it wraps a InstanceKey plus some tablet related information
type grInstance struct {
instanceKey *inst.InstanceKey
tablet *topodatapb.Tablet
primaryTimeStamp time.Time
alias string
instanceKey *inst.InstanceKey
tablet *topodatapb.Tablet
masterTimeStamp time.Time
alias string
}

// GRTopo is VTGR wrapper for topo server
Expand Down Expand Up @@ -215,14 +215,14 @@ func (shard *GRShard) refreshPrimaryShard(ctx context.Context) (string, error) {
// findPrimaryFromLocalCell iterates through the replicas stored in grShard and returns
// the one that's marked as primary
func (shard *GRShard) findPrimaryFromLocalCell() string {
var latestPrimaryTimestamp time.Time
var latestMasterTimestamp time.Time
var primaryInstance *grInstance
for _, instance := range shard.instances {
if instance.tablet.Type == topodatapb.TabletType_PRIMARY {
if instance.tablet.Type == topodatapb.TabletType_MASTER {
// It is possible that there are more than one master in topo server
// we should compare timestamp to pick the latest one
if latestPrimaryTimestamp.Before(instance.primaryTimeStamp) {
latestPrimaryTimestamp = instance.primaryTimeStamp
if latestMasterTimestamp.Before(instance.masterTimeStamp) {
latestMasterTimestamp = instance.masterTimeStamp
primaryInstance = instance
}
}
Expand All @@ -240,7 +240,7 @@ func parseTabletInfos(tablets map[string]*topo.TabletInfo) []*grInstance {
var newReplicas []*grInstance
for alias, tabletInfo := range tablets {
tablet := tabletInfo.Tablet
// Only monitor primary, replica and ronly tablet types
// Only monitor master, replica and ronly tablet types
switch tablet.Type {
case topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY:
// mysql hostname and port might be empty here if tablet is not running
Expand All @@ -250,10 +250,10 @@ func parseTabletInfos(tablets map[string]*topo.TabletInfo) []*grInstance {
Port: int(tablet.MysqlPort),
}
grInstance := grInstance{
instanceKey: &instanceKey,
tablet: tablet,
primaryTimeStamp: logutil.ProtoToTime(tablet.PrimaryTermStartTime),
alias: alias,
instanceKey: &instanceKey,
tablet: tablet,
masterTimeStamp: logutil.ProtoToTime(tablet.PrimaryTermStartTime),
alias: alias,
}
newReplicas = append(newReplicas, &grInstance)
}
Expand Down
10 changes: 5 additions & 5 deletions go/vt/vtgr/controller/refresh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestRefreshTabletsInShard(t *testing.T) {
})
assert.Equal(t, testHost, instances[0].tablet.Hostname)
assert.Equal(t, int32(testPort0), instances[0].tablet.MysqlPort)
assert.Equal(t, topodatapb.TabletType_PRIMARY, instances[0].tablet.Type)
assert.Equal(t, topodatapb.TabletType_MASTER, instances[0].tablet.Type)
// host 3 is missing mysql host but we still put it in the instances list here
assert.Equal(t, testHost, instances[1].instanceKey.Hostname)
assert.Equal(t, int32(0), instances[1].tablet.MysqlPort)
Expand Down Expand Up @@ -139,11 +139,11 @@ func TestLockRelease(t *testing.T) {
assert.EqualError(t, err, "lost topology lock; aborting: shard ks/0 is not locked (no lockInfo in map)")
}

func buildTabletInfo(id uint32, host string, mysqlPort int, ttype topodatapb.TabletType, primaryTermTime time.Time) *topo.TabletInfo {
return buildTabletInfoWithCell(id, host, "test_cell", mysqlPort, ttype, primaryTermTime)
func buildTabletInfo(id uint32, host string, mysqlPort int, ttype topodatapb.TabletType, masterTermTime time.Time) *topo.TabletInfo {
return buildTabletInfoWithCell(id, host, "test_cell", mysqlPort, ttype, masterTermTime)
}

func buildTabletInfoWithCell(id uint32, host, cell string, mysqlPort int, ttype topodatapb.TabletType, primaryTermTime time.Time) *topo.TabletInfo {
func buildTabletInfoWithCell(id uint32, host, cell string, mysqlPort int, ttype topodatapb.TabletType, masterTermTime time.Time) *topo.TabletInfo {
alias := &topodatapb.TabletAlias{Cell: cell, Uid: id}
return &topo.TabletInfo{Tablet: &topodatapb.Tablet{
Alias: alias,
Expand All @@ -153,7 +153,7 @@ func buildTabletInfoWithCell(id uint32, host, cell string, mysqlPort int, ttype
Keyspace: "ks",
Shard: "0",
Type: ttype,
PrimaryTermStartTime: logutil.TimeToProto(primaryTermTime),
PrimaryTermStartTime: logutil.TimeToProto(masterTermTime),
Tags: map[string]string{"hostname": fmt.Sprintf("host_%d", id)},
}}
}
20 changes: 12 additions & 8 deletions go/vt/vtgr/controller/repair.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (shard *GRShard) stopAndRebootstrap(ctx context.Context) error {
return shard.dbAgent.RebootstrapGroupLocked(candidate.instanceKey, uuid)
}

func (shard *GRShard) getGTIDSetFromAll(skipPrimary bool) (*groupGTIDRecorder, *concurrency.AllErrorRecorder, error) {
func (shard *GRShard) getGTIDSetFromAll(skipMaster bool) (*groupGTIDRecorder, *concurrency.AllErrorRecorder, error) {
if len(shard.instances) == 0 {
return nil, nil, fmt.Errorf("%v has 0 instance", formatKeyspaceShard(shard.KeyspaceShard))
}
Expand All @@ -315,9 +315,9 @@ func (shard *GRShard) getGTIDSetFromAll(skipPrimary bool) (*groupGTIDRecorder, *
primary := shard.findShardPrimaryTablet()
var mysqlPrimaryHost string
var mysqlPrimaryPort int
// skipPrimary is true when we manual failover or if there is a unreachalbe primary tablet
// skipMaster is true when we manual failover or if there is a unreachalbe primary tablet
// in both case, there should be a reconciled primary tablet
if skipPrimary && primary != nil {
if skipMaster && primary != nil {
status := shard.sqlGroup.GetStatus(primary.instanceKey)
mysqlPrimaryHost, mysqlPrimaryPort = status.HostName, status.Port
shard.logger.Infof("Found primary instance from MySQL on %v", mysqlPrimaryHost)
Expand All @@ -328,7 +328,7 @@ func (shard *GRShard) getGTIDSetFromAll(skipPrimary bool) (*groupGTIDRecorder, *
// that is unreachable
errorRecorder := shard.forAllInstances(func(instance *grInstance, wg *sync.WaitGroup, er concurrency.ErrorRecorder) {
defer wg.Done()
if skipPrimary && instance.instanceKey.Hostname == mysqlPrimaryHost && instance.instanceKey.Port == mysqlPrimaryPort {
if skipMaster && instance.instanceKey.Hostname == mysqlPrimaryHost && instance.instanceKey.Port == mysqlPrimaryPort {
shard.logger.Infof("Skip %v to failover to a non-primary node", mysqlPrimaryHost)
return
}
Expand Down Expand Up @@ -407,7 +407,7 @@ func (shard *GRShard) findFailoverCandidate(ctx context.Context) (*grInstance, e
return !shard.shardStatusCollector.isUnreachable(instance)
})
if err != nil {
shard.logger.Errorf("Failed to find failover candidate by GTID after forAllInstances: %v", err)
shard.logger.Errorf("Failed to find failover candidate by GTID after fanout: %v", err)
return nil, err
}
if candidate == nil {
Expand Down Expand Up @@ -460,9 +460,9 @@ func (shard *GRShard) fixPrimaryTabletLocked(ctx context.Context) error {
if err := shard.checkShardLocked(ctx); err != nil {
return err
}
err := shard.tmc.ChangeType(ctx, candidate.tablet, topodatapb.TabletType_PRIMARY)
err := shard.tmc.ChangeType(ctx, candidate.tablet, topodatapb.TabletType_MASTER)
if err != nil {
return fmt.Errorf("failed to change type to primary on %v: %v", candidate.alias, err)
return fmt.Errorf("failed to change type to master on %v: %v", candidate.alias, err)
}
shard.logger.Infof("Successfully make %v the primary tablet", candidate.alias)
return nil
Expand Down Expand Up @@ -653,11 +653,15 @@ func (shard *GRShard) failoverLocked(ctx context.Context) error {
return err
}
shard.logger.Infof("Successfully failover MySQL to %v for %v", candidate.instanceKey.Hostname, formatKeyspaceShard(shard.KeyspaceShard))
if !shard.isActive.Get() {
shard.logger.Infof("Skip vttablet failover on an inactive shard %v", formatKeyspaceShard(shard.KeyspaceShard))
return nil
}
// Make sure we still hold the topo server lock before moving on
if err := shard.checkShardLocked(ctx); err != nil {
return err
}
err = shard.tmc.ChangeType(ctx, candidate.tablet, topodatapb.TabletType_PRIMARY)
err = shard.tmc.ChangeType(ctx, candidate.tablet, topodatapb.TabletType_MASTER)
if err != nil {
shard.logger.Errorf("Failed to failover Vitess %v", candidate.alias)
return err
Expand Down
Loading

0 comments on commit 4225733

Please sign in to comment.