Skip to content

Commit

Permalink
scheduler: fix hot region scheduler select store problem. (#898) (#906)
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing authored Jan 8, 2018
1 parent 94559ea commit 776bcd9
Showing 1 changed file with 50 additions and 93 deletions.
143 changes: 50 additions & 93 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,23 +235,7 @@ func (h *balanceHotRegionsScheduler) calcScore(items []*core.RegionStat, cluster
}

func (h *balanceHotRegionsScheduler) balanceByPeer(cluster schedule.Cluster, storesStat core.StoreHotRegionsStat) (*core.RegionInfo, *metapb.Peer, *metapb.Peer) {
var (
maxReadBytes uint64
srcStoreID uint64
maxHotStoreRegionCount int
)

// get the srcStoreId
// We choose the store with the maximum hot region first;
// inside these stores, we choose the one with maximum written bytes.
for storeID, statistics := range storesStat {
count, readBytes := statistics.RegionsStat.Len(), statistics.TotalFlowBytes
if count >= 2 && (count > maxHotStoreRegionCount || (count == maxHotStoreRegionCount && readBytes > maxReadBytes)) {
maxHotStoreRegionCount = count
maxReadBytes = readBytes
srcStoreID = storeID
}
}
srcStoreID := h.selectSrcStore(storesStat)
if srcStoreID == 0 {
return nil, nil, nil
}
Expand Down Expand Up @@ -282,9 +266,8 @@ func (h *balanceHotRegionsScheduler) balanceByPeer(cluster schedule.Cluster, sto
destStoreIDs = append(destStoreIDs, store.GetId())
}

destStoreID = h.selectDestStoreByPeer(destStoreIDs, srcRegion, srcStoreID, storesStat)
destStoreID = h.selectDestStore(destStoreIDs, rs.FlowBytes, srcStoreID, storesStat)
if destStoreID != 0 {
srcRegion.ReadBytes = rs.FlowBytes
h.adjustBalanceLimit(srcStoreID, storesStat)

var srcPeer *metapb.Peer
Expand Down Expand Up @@ -314,64 +297,8 @@ func (h *balanceHotRegionsScheduler) balanceByPeer(cluster schedule.Cluster, sto
return nil, nil, nil
}

// selectDestStoreByPeer selects a target store to hold the region of the source region.
// We choose a target store based on the hot region number and written bytes of this store.
func (h *balanceHotRegionsScheduler) selectDestStoreByPeer(candidateStoreIDs []uint64, srcRegion *core.RegionInfo, srcStoreID uint64, storesStat core.StoreHotRegionsStat) uint64 {
sr := storesStat[srcStoreID]
srcReadBytes := sr.TotalFlowBytes
srcHotRegionsCount := sr.RegionsStat.Len()

var (
destStoreID uint64
minReadBytes uint64 = math.MaxUint64
)
minRegionsCount := int(math.MaxInt32)
for _, storeID := range candidateStoreIDs {
if s, ok := storesStat[storeID]; ok {
if srcHotRegionsCount-s.RegionsStat.Len() > 1 && minRegionsCount > s.RegionsStat.Len() {
destStoreID = storeID
minReadBytes = s.TotalFlowBytes
minRegionsCount = s.RegionsStat.Len()
continue
}
if minRegionsCount == s.RegionsStat.Len() && minReadBytes > s.TotalFlowBytes &&
uint64(float64(srcReadBytes)*hotRegionScheduleFactor) > s.TotalFlowBytes+2*srcRegion.ReadBytes {
minReadBytes = s.TotalFlowBytes
destStoreID = storeID
}
} else {
destStoreID = storeID
break
}
}
return destStoreID
}

func (h *balanceHotRegionsScheduler) balanceByLeader(cluster schedule.Cluster, storesStat core.StoreHotRegionsStat) (*core.RegionInfo, *metapb.Peer) {
var (
maxReadBytes uint64
srcStoreID uint64
maxHotStoreRegionCount int
)

// select srcStoreId by leader
for storeID, statistics := range storesStat {
if statistics.RegionsStat.Len() < 2 {
continue
}

if maxHotStoreRegionCount < statistics.RegionsStat.Len() {
maxHotStoreRegionCount = statistics.RegionsStat.Len()
maxReadBytes = statistics.TotalFlowBytes
srcStoreID = storeID
continue
}

if maxHotStoreRegionCount == statistics.RegionsStat.Len() && maxReadBytes < statistics.TotalFlowBytes {
maxReadBytes = statistics.TotalFlowBytes
srcStoreID = storeID
}
}
srcStoreID := h.selectSrcStore(storesStat)
if srcStoreID == 0 {
return nil, nil
}
Expand All @@ -384,7 +311,16 @@ func (h *balanceHotRegionsScheduler) balanceByLeader(cluster schedule.Cluster, s
continue
}

destPeer := h.selectDestStoreByLeader(srcRegion, storesStat)
candidateStoreIDs := make([]uint64, 0, len(srcRegion.Peers)-1)
for id := range srcRegion.GetFollowers() {
candidateStoreIDs = append(candidateStoreIDs, id)
}
destStoreID := h.selectDestStore(candidateStoreIDs, rs.FlowBytes, srcStoreID, storesStat)
if destStoreID == 0 {
continue
}

destPeer := srcRegion.GetStorePeer(destStoreID)
if destPeer != nil {
h.adjustBalanceLimit(srcStoreID, storesStat)
return srcRegion, destPeer
Expand All @@ -393,35 +329,56 @@ func (h *balanceHotRegionsScheduler) balanceByLeader(cluster schedule.Cluster, s
return nil, nil
}

func (h *balanceHotRegionsScheduler) selectDestStoreByLeader(srcRegion *core.RegionInfo, storesStat core.StoreHotRegionsStat) *metapb.Peer {
sr := storesStat[srcRegion.Leader.GetStoreId()]
srcReadBytes := sr.TotalFlowBytes
// Select the store to move hot regions from.
// We choose the store with the maximum number of hot region first.
// Inside these stores, we choose the one with maximum flow bytes.
func (h *balanceHotRegionsScheduler) selectSrcStore(stats core.StoreHotRegionsStat) (srcStoreID uint64) {
var (
maxFlowBytes uint64
maxHotStoreRegionCount int
)

for storeID, statistics := range stats {
count, flowBytes := statistics.RegionsStat.Len(), statistics.TotalFlowBytes
if count >= 2 && (count > maxHotStoreRegionCount || (count == maxHotStoreRegionCount && flowBytes > maxFlowBytes)) {
maxHotStoreRegionCount = count
maxFlowBytes = flowBytes
srcStoreID = storeID
}
}
return
}

// selectDestStore selects a target store to hold the region of the source region.
// We choose a target store based on the hot region number and flow bytes of this store.
func (h *balanceHotRegionsScheduler) selectDestStore(candidateStoreIDs []uint64, regionFlowBytes uint64, srcStoreID uint64, storesStat core.StoreHotRegionsStat) (destStoreID uint64) {
sr := storesStat[srcStoreID]
srcFlowBytes := sr.TotalFlowBytes
srcHotRegionsCount := sr.RegionsStat.Len()

var (
destPeer *metapb.Peer
minReadBytes uint64 = math.MaxUint64
minFlowBytes uint64 = math.MaxUint64
minRegionsCount = int(math.MaxInt32)
)
minRegionsCount := int(math.MaxInt32)
for storeID, peer := range srcRegion.GetFollowers() {
for _, storeID := range candidateStoreIDs {
if s, ok := storesStat[storeID]; ok {
if srcHotRegionsCount-s.RegionsStat.Len() > 1 && minRegionsCount > s.RegionsStat.Len() {
destPeer = peer
minReadBytes = s.TotalFlowBytes
destStoreID = storeID
minFlowBytes = s.TotalFlowBytes
minRegionsCount = s.RegionsStat.Len()
continue
}
if minRegionsCount == s.RegionsStat.Len() && minReadBytes > s.TotalFlowBytes &&
uint64(float64(srcReadBytes)*hotRegionScheduleFactor) > s.TotalFlowBytes+2*srcRegion.ReadBytes {
minReadBytes = s.TotalFlowBytes
destPeer = peer
if minRegionsCount == s.RegionsStat.Len() && minFlowBytes > s.TotalFlowBytes &&
uint64(float64(srcFlowBytes)*hotRegionScheduleFactor) > s.TotalFlowBytes+2*regionFlowBytes {
minFlowBytes = s.TotalFlowBytes
destStoreID = storeID
}
} else {
destPeer = peer
break
destStoreID = storeID
return
}
}
return destPeer
return
}

func (h *balanceHotRegionsScheduler) adjustBalanceLimit(storeID uint64, storesStat core.StoreHotRegionsStat) {
Expand Down

0 comments on commit 776bcd9

Please sign in to comment.