From 776bcd940b71d295a2c7ed762582bc3aff7d3c0e Mon Sep 17 00:00:00 2001
From: disksing
Date: Mon, 8 Jan 2018 13:34:38 +0800
Subject: [PATCH] scheduler: fix hot region scheduler select store problem.
 (#898) (#906)

---
 server/schedulers/hot_region.go | 143 +++++++++++---------------------
 1 file changed, 50 insertions(+), 93 deletions(-)

diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go
index 18b419aa206..4ee306c5a02 100644
--- a/server/schedulers/hot_region.go
+++ b/server/schedulers/hot_region.go
@@ -235,23 +235,7 @@ func (h *balanceHotRegionsScheduler) calcScore(items []*core.RegionStat, cluster
 }
 
 func (h *balanceHotRegionsScheduler) balanceByPeer(cluster schedule.Cluster, storesStat core.StoreHotRegionsStat) (*core.RegionInfo, *metapb.Peer, *metapb.Peer) {
-	var (
-		maxReadBytes           uint64
-		srcStoreID             uint64
-		maxHotStoreRegionCount int
-	)
-
-	// get the srcStoreId
-	// We choose the store with the maximum hot region first;
-	// inside these stores, we choose the one with maximum written bytes.
-	for storeID, statistics := range storesStat {
-		count, readBytes := statistics.RegionsStat.Len(), statistics.TotalFlowBytes
-		if count >= 2 && (count > maxHotStoreRegionCount || (count == maxHotStoreRegionCount && readBytes > maxReadBytes)) {
-			maxHotStoreRegionCount = count
-			maxReadBytes = readBytes
-			srcStoreID = storeID
-		}
-	}
+	srcStoreID := h.selectSrcStore(storesStat)
 	if srcStoreID == 0 {
 		return nil, nil, nil
 	}
@@ -282,9 +266,8 @@ func (h *balanceHotRegionsScheduler) balanceByPeer(cluster schedule.Cluster, sto
 		destStoreIDs = append(destStoreIDs, store.GetId())
 	}
 
-	destStoreID = h.selectDestStoreByPeer(destStoreIDs, srcRegion, srcStoreID, storesStat)
+	destStoreID = h.selectDestStore(destStoreIDs, rs.FlowBytes, srcStoreID, storesStat)
 	if destStoreID != 0 {
-		srcRegion.ReadBytes = rs.FlowBytes
 		h.adjustBalanceLimit(srcStoreID, storesStat)
 
 		var srcPeer *metapb.Peer
@@ -314,64 +297,8 @@ func (h *balanceHotRegionsScheduler) balanceByPeer(cluster schedule.Cluster, sto
 	return nil, nil, nil
 }
 
-// selectDestStoreByPeer selects a target store to hold the region of the source region.
-// We choose a target store based on the hot region number and written bytes of this store.
-func (h *balanceHotRegionsScheduler) selectDestStoreByPeer(candidateStoreIDs []uint64, srcRegion *core.RegionInfo, srcStoreID uint64, storesStat core.StoreHotRegionsStat) uint64 {
-	sr := storesStat[srcStoreID]
-	srcReadBytes := sr.TotalFlowBytes
-	srcHotRegionsCount := sr.RegionsStat.Len()
-
-	var (
-		destStoreID  uint64
-		minReadBytes uint64 = math.MaxUint64
-	)
-	minRegionsCount := int(math.MaxInt32)
-	for _, storeID := range candidateStoreIDs {
-		if s, ok := storesStat[storeID]; ok {
-			if srcHotRegionsCount-s.RegionsStat.Len() > 1 && minRegionsCount > s.RegionsStat.Len() {
-				destStoreID = storeID
-				minReadBytes = s.TotalFlowBytes
-				minRegionsCount = s.RegionsStat.Len()
-				continue
-			}
-			if minRegionsCount == s.RegionsStat.Len() && minReadBytes > s.TotalFlowBytes &&
-				uint64(float64(srcReadBytes)*hotRegionScheduleFactor) > s.TotalFlowBytes+2*srcRegion.ReadBytes {
-				minReadBytes = s.TotalFlowBytes
-				destStoreID = storeID
-			}
-		} else {
-			destStoreID = storeID
-			break
-		}
-	}
-	return destStoreID
-}
-
 func (h *balanceHotRegionsScheduler) balanceByLeader(cluster schedule.Cluster, storesStat core.StoreHotRegionsStat) (*core.RegionInfo, *metapb.Peer) {
-	var (
-		maxReadBytes           uint64
-		srcStoreID             uint64
-		maxHotStoreRegionCount int
-	)
-
-	// select srcStoreId by leader
-	for storeID, statistics := range storesStat {
-		if statistics.RegionsStat.Len() < 2 {
-			continue
-		}
-
-		if maxHotStoreRegionCount < statistics.RegionsStat.Len() {
-			maxHotStoreRegionCount = statistics.RegionsStat.Len()
-			maxReadBytes = statistics.TotalFlowBytes
-			srcStoreID = storeID
-			continue
-		}
-
-		if maxHotStoreRegionCount == statistics.RegionsStat.Len() && maxReadBytes < statistics.TotalFlowBytes {
-			maxReadBytes = statistics.TotalFlowBytes
-			srcStoreID = storeID
-		}
-	}
+	srcStoreID := h.selectSrcStore(storesStat)
 	if srcStoreID == 0 {
 		return nil, nil
 	}
@@ -384,7 +311,16 @@ func (h *balanceHotRegionsScheduler) balanceByLeader(cluster schedule.Cluster, s
 			continue
 		}
 
-		destPeer := h.selectDestStoreByLeader(srcRegion, storesStat)
+		candidateStoreIDs := make([]uint64, 0, len(srcRegion.Peers)-1)
+		for id := range srcRegion.GetFollowers() {
+			candidateStoreIDs = append(candidateStoreIDs, id)
+		}
+		destStoreID := h.selectDestStore(candidateStoreIDs, rs.FlowBytes, srcStoreID, storesStat)
+		if destStoreID == 0 {
+			continue
+		}
+
+		destPeer := srcRegion.GetStorePeer(destStoreID)
 		if destPeer != nil {
 			h.adjustBalanceLimit(srcStoreID, storesStat)
 			return srcRegion, destPeer
@@ -393,35 +329,56 @@ func (h *balanceHotRegionsScheduler) balanceByLeader(cluster schedule.Cluster, s
 	return nil, nil
 }
 
-func (h *balanceHotRegionsScheduler) selectDestStoreByLeader(srcRegion *core.RegionInfo, storesStat core.StoreHotRegionsStat) *metapb.Peer {
-	sr := storesStat[srcRegion.Leader.GetStoreId()]
-	srcReadBytes := sr.TotalFlowBytes
+// Select the store to move hot regions from.
+// We choose the store with the maximum number of hot region first.
+// Inside these stores, we choose the one with maximum flow bytes.
+func (h *balanceHotRegionsScheduler) selectSrcStore(stats core.StoreHotRegionsStat) (srcStoreID uint64) {
+	var (
+		maxFlowBytes           uint64
+		maxHotStoreRegionCount int
+	)
+
+	for storeID, statistics := range stats {
+		count, flowBytes := statistics.RegionsStat.Len(), statistics.TotalFlowBytes
+		if count >= 2 && (count > maxHotStoreRegionCount || (count == maxHotStoreRegionCount && flowBytes > maxFlowBytes)) {
+			maxHotStoreRegionCount = count
+			maxFlowBytes = flowBytes
+			srcStoreID = storeID
+		}
+	}
+	return
+}
+
+// selectDestStore selects a target store to hold the region of the source region.
+// We choose a target store based on the hot region number and flow bytes of this store.
+func (h *balanceHotRegionsScheduler) selectDestStore(candidateStoreIDs []uint64, regionFlowBytes uint64, srcStoreID uint64, storesStat core.StoreHotRegionsStat) (destStoreID uint64) {
+	sr := storesStat[srcStoreID]
+	srcFlowBytes := sr.TotalFlowBytes
 	srcHotRegionsCount := sr.RegionsStat.Len()
 
 	var (
-		destPeer     *metapb.Peer
-		minReadBytes uint64 = math.MaxUint64
+		minFlowBytes    uint64 = math.MaxUint64
+		minRegionsCount        = int(math.MaxInt32)
 	)
-	minRegionsCount := int(math.MaxInt32)
-	for storeID, peer := range srcRegion.GetFollowers() {
+	for _, storeID := range candidateStoreIDs {
 		if s, ok := storesStat[storeID]; ok {
 			if srcHotRegionsCount-s.RegionsStat.Len() > 1 && minRegionsCount > s.RegionsStat.Len() {
-				destPeer = peer
-				minReadBytes = s.TotalFlowBytes
+				destStoreID = storeID
+				minFlowBytes = s.TotalFlowBytes
 				minRegionsCount = s.RegionsStat.Len()
 				continue
 			}
-			if minRegionsCount == s.RegionsStat.Len() && minReadBytes > s.TotalFlowBytes &&
-				uint64(float64(srcReadBytes)*hotRegionScheduleFactor) > s.TotalFlowBytes+2*srcRegion.ReadBytes {
-				minReadBytes = s.TotalFlowBytes
-				destPeer = peer
+			if minRegionsCount == s.RegionsStat.Len() && minFlowBytes > s.TotalFlowBytes &&
+				uint64(float64(srcFlowBytes)*hotRegionScheduleFactor) > s.TotalFlowBytes+2*regionFlowBytes {
+				minFlowBytes = s.TotalFlowBytes
+				destStoreID = storeID
			}
 		} else {
-			destPeer = peer
-			break
+			destStoreID = storeID
+			return
 		}
 	}
-	return destPeer
+	return
 }
 
 func (h *balanceHotRegionsScheduler) adjustBalanceLimit(storeID uint64, storesStat core.StoreHotRegionsStat) {
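For reference, here is a standalone sketch of the two selection heuristics this patch factors out, runnable outside pd. The `storeStat` struct, the plain-map statistics, and the `hotRegionScheduleFactor` value of 0.9 are stand-ins assumed for illustration; the real scheduler operates on `core.StoreHotRegionsStat`.

```go
package main

import (
	"fmt"
	"math"
)

// Assumed value for this sketch; pd defines its own hotRegionScheduleFactor.
const hotRegionScheduleFactor = 0.9

// storeStat is a cut-down stand-in for a store's hot region statistics.
type storeStat struct {
	regionCount    int    // number of hot regions on the store
	totalFlowBytes uint64 // total flow bytes of those regions
}

// selectSrcStore mirrors the patch: pick the store with the most hot
// regions, breaking ties by larger total flow bytes. Stores with fewer
// than two hot regions never qualify, so 0 means "no source found".
func selectSrcStore(stats map[uint64]storeStat) (srcStoreID uint64) {
	var maxFlowBytes uint64
	var maxCount int
	for id, s := range stats {
		if s.regionCount >= 2 && (s.regionCount > maxCount ||
			(s.regionCount == maxCount && s.totalFlowBytes > maxFlowBytes)) {
			maxCount, maxFlowBytes, srcStoreID = s.regionCount, s.totalFlowBytes, id
		}
	}
	return
}

// selectDestStore mirrors the patch: prefer the candidate with the fewest
// hot regions (at least two fewer than the source), break ties by smaller
// flow bytes, and accept a tie-break target only if taking the region
// still leaves it well below the source's load. A candidate with no hot
// region statistics at all is accepted immediately.
func selectDestStore(candidates []uint64, regionFlowBytes, srcStoreID uint64, stats map[uint64]storeStat) (destStoreID uint64) {
	src := stats[srcStoreID]
	minFlowBytes := uint64(math.MaxUint64)
	minCount := int(math.MaxInt32)
	for _, id := range candidates {
		s, ok := stats[id]
		if !ok {
			return id // store with no hot regions: best possible target
		}
		if src.regionCount-s.regionCount > 1 && s.regionCount < minCount {
			destStoreID, minFlowBytes, minCount = id, s.totalFlowBytes, s.regionCount
			continue
		}
		if s.regionCount == minCount && s.totalFlowBytes < minFlowBytes &&
			uint64(float64(src.totalFlowBytes)*hotRegionScheduleFactor) > s.totalFlowBytes+2*regionFlowBytes {
			destStoreID, minFlowBytes = id, s.totalFlowBytes
		}
	}
	return
}

func main() {
	stats := map[uint64]storeStat{
		1: {regionCount: 5, totalFlowBytes: 512 << 20},
		2: {regionCount: 3, totalFlowBytes: 256 << 20},
		3: {regionCount: 1, totalFlowBytes: 16 << 20},
	}
	src := selectSrcStore(stats) // store 1: most hot regions
	dst := selectDestStore([]uint64{2, 3}, 8<<20, src, stats)
	fmt.Printf("move a hot region from store %d to store %d\n", src, dst) // 1 -> 3
}
```

The point of the refactoring is visible here: `balanceByPeer` and `balanceByLeader` previously carried separate copies of the source-selection loop, and the leader path picked targets through its own `selectDestStoreByLeader`, so the two paths could drift. Routing both through one `selectSrcStore`/`selectDestStore` pair also removes the `srcRegion.ReadBytes = rs.FlowBytes` mutation: the destination check now receives the hot region's flow bytes (`rs.FlowBytes`) as an explicit `regionFlowBytes` parameter.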