diff --git a/server/cache.go b/server/cache.go
index e75a6b2fb63..6bda3a3a26b 100644
--- a/server/cache.go
+++ b/server/cache.go
@@ -507,6 +507,8 @@ func (c *clusterInfo) collectMetrics() {
 	defer c.RUnlock()
 	c.regionStats.Collect()
 	c.labelLevelStats.Collect()
+	// collect hot cache metrics
+	c.HotCache.CollectMetrics(c.Stores)
 }
 
 func (c *clusterInfo) GetRegionStatsByType(typ regionStatisticType) []*core.RegionInfo {
diff --git a/server/coordinator.go b/server/coordinator.go
index f116f9add65..959c6b45aa2 100644
--- a/server/coordinator.go
+++ b/server/coordinator.go
@@ -167,7 +167,7 @@ func (c *coordinator) checkRegion(region *core.RegionInfo) bool {
 			ToStore: p.GetStoreId(),
 			PeerID:  p.GetId(),
 		}
-		op := schedule.NewOperator("promoteLearner", region.GetId(), schedule.OpRegion, step)
+		op := schedule.NewOperator("promoteLearner", region.GetId(), region.GetRegionEpoch(), schedule.OpRegion, step)
 		if c.addOperator(op) {
 			return true
 		}
@@ -188,7 +188,7 @@ func (c *coordinator) checkRegion(region *core.RegionInfo) bool {
 	if c.limiter.OperatorCount(schedule.OpMerge) < c.cluster.GetMergeScheduleLimit() {
 		if op1, op2 := c.mergeChecker.Check(region); op1 != nil && op2 != nil {
 			// make sure two operators can add successfully altogether
-			if c.addOperators(op1, op2) {
+			if c.addOperator(op1, op2) {
 				return true
 			}
 		}
@@ -365,8 +365,6 @@ func (c *coordinator) collectHotSpotMetrics() {
 		}
 	}
 
-	// collect hot cache metrics
-	c.cluster.HotCache.CollectMetrics(c.cluster.Stores)
 }
 
 func (c *coordinator) shouldRun() bool {
@@ -430,11 +428,7 @@ func (c *coordinator) runScheduler(s *scheduleController) {
 			}
 			opInfluence := schedule.NewOpInfluence(c.getOperators(), c.cluster)
 			if op := s.Schedule(c.cluster, opInfluence); op != nil {
-				if len(op) == 1 {
-					c.addOperator(op[0])
-				} else {
-					c.addOperators(op...)
-				}
+				c.addOperator(op...)
 			}
 
 		case <-s.Ctx().Done():
@@ -449,14 +443,9 @@ func (c *coordinator) addOperatorLocked(op *schedule.Operator) bool {
 
 	log.Infof("[region %v] add operator: %s", regionID, op)
 
-	// If the new operator passed in has higher priorities than the old one,
-	// then replace the old operator.
+	// If there is an old operator, replace it. The priority should be checked
+	// already.
 	if old, ok := c.operators[regionID]; ok {
-		if !isHigherPriorityOperator(op, old) {
-			log.Infof("[region %v] cancel add operator, old: %s", regionID, old)
-			operatorCounter.WithLabelValues(op.Desc(), "canceled").Inc()
-			return false
-		}
 		log.Infof("[region %v] replace old operator: %s", regionID, old)
 		operatorCounter.WithLabelValues(old.Desc(), "replaced").Inc()
 		c.removeOperatorLocked(old)
@@ -475,20 +464,12 @@ func (c *coordinator) addOperatorLocked(op *schedule.Operator) bool {
 	return true
 }
 
-func (c *coordinator) addOperator(op *schedule.Operator) bool {
-	c.Lock()
-	defer c.Unlock()
-
-	return c.addOperatorLocked(op)
-}
-
-func (c *coordinator) addOperators(ops ...*schedule.Operator) bool {
+func (c *coordinator) addOperator(ops ...*schedule.Operator) bool {
 	c.Lock()
 	defer c.Unlock()
 
 	for _, op := range ops {
-		if old := c.operators[op.RegionID()]; old != nil && !isHigherPriorityOperator(op, old) {
-			log.Infof("[region %v] cancel add operators, old: %s", op.RegionID(), old)
+		if !c.checkAddOperator(op) {
 			operatorCounter.WithLabelValues(op.Desc(), "canceled").Inc()
 			return false
 		}
@@ -500,6 +481,23 @@ func (c *coordinator) addOperators(ops ...*schedule.Operator) bool {
 	return true
 }
 
+func (c *coordinator) checkAddOperator(op *schedule.Operator) bool {
+	region := c.cluster.GetRegion(op.RegionID())
+	if region == nil {
+		log.Debugf("[region %v] region not found, cancel add operator", op.RegionID())
+		return false
+	}
+	if region.GetRegionEpoch().GetVersion() != op.RegionEpoch().GetVersion() || region.GetRegionEpoch().GetConfVer() != op.RegionEpoch().GetConfVer() {
+		log.Debugf("[region %v] region epoch not match, %v vs %v, cancel add operator", op.RegionID(), region.GetRegionEpoch(), op.RegionEpoch())
+		return false
+	}
+	if old := c.operators[op.RegionID()]; old != nil && !isHigherPriorityOperator(op, old) {
+		log.Debugf("[region %v] already have operator %s, cancel add operator", op.RegionID(), old)
+		return false
+	}
+	return true
+}
+
 func isHigherPriorityOperator(new, old *schedule.Operator) bool {
 	return new.GetPriorityLevel() < old.GetPriorityLevel()
 }
diff --git a/server/coordinator_test.go b/server/coordinator_test.go
index a7f3c2c02c3..d5080b7a8ea 100644
--- a/server/coordinator_test.go
+++ b/server/coordinator_test.go
@@ -15,6 +15,7 @@ package server
 
 import (
 	"fmt"
+	"math/rand"
 	"time"
 
 	. "github.com/pingcap/check"
@@ -28,8 +29,8 @@ import (
 	"github.com/pingcap/pd/server/schedulers"
 )
 
-func newTestOperator(regionID uint64, kind schedule.OperatorKind) *schedule.Operator {
-	return schedule.NewOperator("test", regionID, kind)
+func newTestOperator(regionID uint64, regionEpoch *metapb.RegionEpoch, kind schedule.OperatorKind) *schedule.Operator {
+	return schedule.NewOperator("test", regionID, regionEpoch, kind)
 }
 
 func newTestScheduleConfig() (*ScheduleConfig, *scheduleOption) {
@@ -53,9 +54,10 @@ func newTestClusterInfo(opt *scheduleOption) *testClusterInfo {
 
 func newTestRegionMeta(regionID uint64) *metapb.Region {
 	return &metapb.Region{
-		Id:       regionID,
-		StartKey: []byte(fmt.Sprintf("%20d", regionID)),
-		EndKey:   []byte(fmt.Sprintf("%20d", regionID+1)),
+		Id:          regionID,
+		StartKey:    []byte(fmt.Sprintf("%20d", regionID)),
+		EndKey:      []byte(fmt.Sprintf("%20d", regionID+1)),
+		RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1},
 	}
 }
 
@@ -134,13 +136,15 @@ func (s *testCoordinatorSuite) TestBasic(c *C) {
 	co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
 	l := co.limiter
 
-	op1 := newTestOperator(1, schedule.OpLeader)
+	tc.addLeaderRegion(1, 1)
+
+	op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), schedule.OpLeader)
 	co.addOperator(op1)
 	c.Assert(l.OperatorCount(op1.Kind()), Equals, uint64(1))
 	c.Assert(co.getOperator(1).RegionID(), Equals, op1.RegionID())
 
 	// Region 1 already has an operator, cannot add another one.
-	op2 := newTestOperator(1, schedule.OpRegion)
+	op2 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), schedule.OpRegion)
 	co.addOperator(op2)
 	c.Assert(l.OperatorCount(op2.Kind()), Equals, uint64(0))
 
@@ -234,6 +238,29 @@ func dispatchHeartbeat(c *C, co *coordinator, region *core.RegionInfo, stream *m
 	co.dispatch(region)
 }
 
+func (s *testCoordinatorSuite) TestCollectMetrics(c *C) {
+	_, opt := newTestScheduleConfig()
+	tc := newTestClusterInfo(opt)
+	hbStreams := newHeartbeatStreams(tc.getClusterID())
+	defer hbStreams.Close()
+
+	co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
+	co.run()
+	// Make sure there are no problems with concurrent writes and reads.
+	for i := 0; i <= 10; i++ {
+		go func(i int) {
+			for j := 0; j < 10000; j++ {
+				tc.addRegionStore(uint64(i%5), rand.Intn(200))
+			}
+		}(i)
+	}
+	for i := 0; i < 1000; i++ {
+		co.collectHotSpotMetrics()
+		co.collectSchedulerMetrics()
+		co.cluster.collectMetrics()
+	}
+}
+
 func (s *testCoordinatorSuite) TestCheckRegion(c *C) {
 	cfg, opt := newTestScheduleConfig()
 	cfg.EnableRaftLearner = true
@@ -615,21 +642,21 @@ func (s *testScheduleLimiterSuite) TestOperatorCount(c *C) {
 
 	operators := make(map[uint64]*schedule.Operator)
 
-	operators[1] = newTestOperator(1, schedule.OpLeader)
+	operators[1] = newTestOperator(1, nil, schedule.OpLeader)
 	l.UpdateCounts(operators)
 	c.Assert(l.OperatorCount(schedule.OpLeader), Equals, uint64(1)) // 1:leader
-	operators[2] = newTestOperator(2, schedule.OpLeader)
+	operators[2] = newTestOperator(2, nil, schedule.OpLeader)
 	l.UpdateCounts(operators)
 	c.Assert(l.OperatorCount(schedule.OpLeader), Equals, uint64(2)) // 1:leader, 2:leader
 	delete(operators, 1)
 	l.UpdateCounts(operators)
 	c.Assert(l.OperatorCount(schedule.OpLeader), Equals, uint64(1)) // 2:leader
 
-	operators[1] = newTestOperator(1, schedule.OpRegion)
+	operators[1] = newTestOperator(1, nil, schedule.OpRegion)
 	l.UpdateCounts(operators)
 	c.Assert(l.OperatorCount(schedule.OpRegion), Equals, uint64(1)) // 1:region 2:leader
 	c.Assert(l.OperatorCount(schedule.OpLeader), Equals, uint64(1))
-	operators[2] = newTestOperator(2, schedule.OpRegion)
+	operators[2] = newTestOperator(2, nil, schedule.OpRegion)
 	l.UpdateCounts(operators)
 	c.Assert(l.OperatorCount(schedule.OpRegion), Equals, uint64(2)) // 1:region 2:region
 	c.Assert(l.OperatorCount(schedule.OpLeader), Equals, uint64(0))
@@ -657,6 +684,9 @@ func (s *testScheduleControllerSuite) TestController(c *C) {
 	hbStreams := newHeartbeatStreams(tc.getClusterID())
 	defer hbStreams.Close()
 
+	tc.addLeaderRegion(1, 1)
+	tc.addLeaderRegion(2, 2)
+
 	co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
 	scheduler, err := schedule.CreateScheduler("balance-leader", co.limiter)
 	c.Assert(err, IsNil)
@@ -676,11 +706,11 @@ func (s *testScheduleControllerSuite) TestController(c *C) {
 	lb.limit = 2
 	// count = 0
 	c.Assert(sc.AllowSchedule(), IsTrue)
-	op1 := newTestOperator(1, schedule.OpLeader)
+	op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), schedule.OpLeader)
 	c.Assert(co.addOperator(op1), IsTrue)
 	// count = 1
 	c.Assert(sc.AllowSchedule(), IsTrue)
-	op2 := newTestOperator(2, schedule.OpLeader)
+	op2 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), schedule.OpLeader)
 	c.Assert(co.addOperator(op2), IsTrue)
 	// count = 2
 	c.Assert(sc.AllowSchedule(), IsFalse)
@@ -689,7 +719,7 @@ func (s *testScheduleControllerSuite) TestController(c *C) {
 	c.Assert(sc.AllowSchedule(), IsTrue)
 
 	// add a PriorityKind operator will remove old operator
-	op3 := newTestOperator(2, schedule.OpHotRegion)
+	op3 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), schedule.OpHotRegion)
 	op3.SetPriorityLevel(core.HighPriority)
 	c.Assert(co.addOperator(op1), IsTrue)
 	c.Assert(sc.AllowSchedule(), IsFalse)
@@ -700,10 +730,28 @@ func (s *testScheduleControllerSuite) TestController(c *C) {
 	// add a admin operator will remove old operator
 	c.Assert(co.addOperator(op2), IsTrue)
 	c.Assert(sc.AllowSchedule(), IsFalse)
-	op4 := newTestOperator(2, schedule.OpAdmin)
+	op4 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), schedule.OpAdmin)
 	op4.SetPriorityLevel(core.HighPriority)
 	c.Assert(co.addOperator(op4), IsTrue)
 	c.Assert(sc.AllowSchedule(), IsTrue)
+	co.removeOperator(op4)
+
+	// test wrong region id.
+	op5 := newTestOperator(3, &metapb.RegionEpoch{}, schedule.OpHotRegion)
+	c.Assert(co.addOperator(op5), IsFalse)
+
+	// test wrong region epoch.
+	co.removeOperator(op1)
+	epoch := &metapb.RegionEpoch{
+		Version: tc.GetRegion(1).GetRegionEpoch().GetVersion() + 1,
+		ConfVer: tc.GetRegion(1).GetRegionEpoch().GetConfVer(),
+	}
+	op6 := newTestOperator(1, epoch, schedule.OpLeader)
+	c.Assert(co.addOperator(op6), IsFalse)
+	epoch.Version--
+	op6 = newTestOperator(1, epoch, schedule.OpLeader)
+	c.Assert(co.addOperator(op6), IsTrue)
+	co.removeOperator(op6)
 }
 
 func (s *testScheduleControllerSuite) TestInterval(c *C) {
diff --git a/server/handler.go b/server/handler.go
index 8f875f6a714..4ffe132bdcf 100644
--- a/server/handler.go
+++ b/server/handler.go
@@ -307,7 +307,7 @@ func (h *Handler) AddTransferLeaderOperator(regionID uint64, storeID uint64) err
 	}
 
 	step := schedule.TransferLeader{FromStore: region.Leader.GetStoreId(), ToStore: newLeader.GetStoreId()}
-	op := schedule.NewOperator("adminTransferLeader", regionID, schedule.OpAdmin|schedule.OpLeader, step)
+	op := schedule.NewOperator("adminTransferLeader", regionID, region.GetRegionEpoch(), schedule.OpAdmin|schedule.OpLeader, step)
 	if ok := c.addOperator(op); !ok {
 		return errors.Trace(errAddOperator)
 	}
@@ -358,7 +358,7 @@ func (h *Handler) AddTransferRegionOperator(regionID uint64, storeIDs map[uint64
 		steps = append(steps, schedule.RemovePeer{FromStore: peer.GetStoreId()})
 	}
 
-	op := schedule.NewOperator("adminMoveRegion", regionID, schedule.OpAdmin|schedule.OpRegion, steps...)
+	op := schedule.NewOperator("adminMoveRegion", regionID, region.GetRegionEpoch(), schedule.OpAdmin|schedule.OpRegion, steps...)
 	if ok := c.addOperator(op); !ok {
 		return errors.Trace(errAddOperator)
 	}
@@ -432,7 +432,7 @@ func (h *Handler) AddAddPeerOperator(regionID uint64, toStoreID uint64) error {
 			schedule.AddPeer{ToStore: toStoreID, PeerID: newPeer.GetId()},
 		}
 	}
-	op := schedule.NewOperator("adminAddPeer", regionID, schedule.OpAdmin|schedule.OpRegion, steps...)
+	op := schedule.NewOperator("adminAddPeer", regionID, region.GetRegionEpoch(), schedule.OpAdmin|schedule.OpRegion, steps...)
 	if ok := c.addOperator(op); !ok {
 		return errors.Trace(errAddOperator)
 	}
@@ -499,7 +499,7 @@ func (h *Handler) AddMergeRegionOperator(regionID uint64, targetID uint64) error
 	if err != nil {
 		return errors.Trace(err)
 	}
-	if ok := c.addOperators(op1, op2); !ok {
+	if ok := c.addOperator(op1, op2); !ok {
 		return errors.Trace(ErrAddOperator)
 	}
 	return nil
@@ -518,7 +518,7 @@ func (h *Handler) AddSplitRegionOperator(regionID uint64) error {
 	}
 
 	step := schedule.SplitRegion{StartKey: region.StartKey, EndKey: region.EndKey}
-	op := schedule.NewOperator("adminSplitRegion", regionID, schedule.OpAdmin, step)
+	op := schedule.NewOperator("adminSplitRegion", regionID, region.GetRegionEpoch(), schedule.OpAdmin, step)
 	if ok := c.addOperator(op); !ok {
 		return errors.Trace(errAddOperator)
 	}
diff --git a/server/schedule/operator.go b/server/schedule/operator.go
index ad3182abe96..00d04787289 100644
--- a/server/schedule/operator.go
+++ b/server/schedule/operator.go
@@ -238,6 +238,7 @@ func (sr SplitRegion) Influence(opInfluence OpInfluence, region *core.RegionInfo
 type Operator struct {
 	desc        string
 	regionID    uint64
+	regionEpoch *metapb.RegionEpoch
 	kind        OperatorKind
 	steps       []OperatorStep
 	currentStep int32
@@ -247,20 +248,21 @@ type Operator struct {
 }
 
 // NewOperator creates a new operator.
-func NewOperator(desc string, regionID uint64, kind OperatorKind, steps ...OperatorStep) *Operator {
+func NewOperator(desc string, regionID uint64, regionEpoch *metapb.RegionEpoch, kind OperatorKind, steps ...OperatorStep) *Operator {
 	return &Operator{
-		desc:       desc,
-		regionID:   regionID,
-		kind:       kind,
-		steps:      steps,
-		createTime: time.Now(),
-		stepTime:   time.Now().UnixNano(),
-		level:      core.NormalPriority,
+		desc:        desc,
+		regionID:    regionID,
+		regionEpoch: regionEpoch,
+		kind:        kind,
+		steps:       steps,
+		createTime:  time.Now(),
+		stepTime:    time.Now().UnixNano(),
+		level:       core.NormalPriority,
 	}
 }
 
 func (o *Operator) String() string {
-	s := fmt.Sprintf("%s (kind:%s, region:%v, createAt:%s, currentStep:%v, steps:%+v) ", o.desc, o.kind, o.regionID, o.createTime, atomic.LoadInt32(&o.currentStep), o.steps)
+	s := fmt.Sprintf("%s (kind:%s, region:%v(%v,%v), createAt:%s, currentStep:%v, steps:%+v) ", o.desc, o.kind, o.regionID, o.regionEpoch.GetVersion(), o.regionEpoch.GetConfVer(), o.createTime, atomic.LoadInt32(&o.currentStep), o.steps)
 	if o.IsTimeout() {
 		s = s + "timeout"
 	}
@@ -295,6 +297,11 @@ func (o *Operator) RegionID() uint64 {
 	return o.regionID
 }
 
+// RegionEpoch returns the region's epoch that is attached to the operator.
+func (o *Operator) RegionEpoch() *metapb.RegionEpoch {
+	return o.regionEpoch
+}
+
 // Kind returns operator's kind.
 func (o *Operator) Kind() OperatorKind {
 	return o.kind
@@ -412,7 +419,7 @@ func (o *Operator) History() []OperatorHistory {
 // CreateRemovePeerOperator creates an Operator that removes a peer from region.
 func CreateRemovePeerOperator(desc string, cluster Cluster, kind OperatorKind, region *core.RegionInfo, storeID uint64) *Operator {
 	removeKind, steps := removePeerSteps(cluster, region, storeID)
-	return NewOperator(desc, region.GetId(), removeKind|kind, steps...)
+	return NewOperator(desc, region.GetId(), region.GetRegionEpoch(), removeKind|kind, steps...)
 }
 
 // CreateMovePeerOperator creates an Operator that replaces an old peer with a new peer.
@@ -430,7 +437,7 @@ func CreateMovePeerOperator(desc string, cluster Cluster, region *core.RegionInf
 		}
 	}
 	steps = append(st, steps...)
-	return NewOperator(desc, region.GetId(), removeKind|kind|OpRegion, steps...)
+	return NewOperator(desc, region.GetId(), region.GetRegionEpoch(), removeKind|kind|OpRegion, steps...)
 }
 
 // removePeerSteps returns the steps to safely remove a peer. It prevents removing leader by transfer its leadership first.
@@ -463,8 +470,8 @@ func CreateMergeRegionOperator(desc string, cluster Cluster, source *core.Region
 		IsPassive: false,
 	})
 
-	op1 := NewOperator(desc, source.GetId(), kinds|kind, steps...)
-	op2 := NewOperator(desc, target.GetId(), kind, MergeRegion{
+	op1 := NewOperator(desc, source.GetId(), source.GetRegionEpoch(), kinds|kind, steps...)
+	op2 := NewOperator(desc, target.GetId(), target.GetRegionEpoch(), kind, MergeRegion{
 		FromRegion: source.Region,
 		ToRegion:   target.Region,
 		IsPassive:  true,
diff --git a/server/schedule/operator_test.go b/server/schedule/operator_test.go
index 28139a3dfec..e344e7d5ba9 100644
--- a/server/schedule/operator_test.go
+++ b/server/schedule/operator_test.go
@@ -58,7 +58,7 @@ func (s *testOperatorSuite) TestOperatorStep(c *C) {
 }
 
 func (s *testOperatorSuite) newTestOperator(regionID uint64, kind OperatorKind, steps ...OperatorStep) *Operator {
-	return NewOperator("testOperator", regionID, OpAdmin|kind, steps...)
+	return NewOperator("testOperator", regionID, &metapb.RegionEpoch{}, OpAdmin|kind, steps...)
 }
 
 func (s *testOperatorSuite) checkSteps(c *C, op *Operator, steps []OperatorStep) {
diff --git a/server/schedule/region_scatterer.go b/server/schedule/region_scatterer.go
index 318e476489c..a5c5b4f2022 100644
--- a/server/schedule/region_scatterer.go
+++ b/server/schedule/region_scatterer.go
@@ -130,7 +130,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo) *Operator {
 	if len(steps) == 0 {
 		return nil
 	}
-	return NewOperator("scatter-region", region.GetId(), kind, steps...)
+	return NewOperator("scatter-region", region.GetId(), region.GetRegionEpoch(), kind, steps...)
 }
 
 func (r *RegionScatterer) selectPeerToReplace(stores map[uint64]*core.StoreInfo, region *core.RegionInfo, oldPeer *metapb.Peer) *metapb.Peer {
diff --git a/server/schedule/replica_checker.go b/server/schedule/replica_checker.go
index e5db0b4b2af..f5c48f8e4eb 100644
--- a/server/schedule/replica_checker.go
+++ b/server/schedule/replica_checker.go
@@ -74,7 +74,7 @@ func (r *ReplicaChecker) Check(region *core.RegionInfo) *Operator {
 		}
 	}
 	checkerCounter.WithLabelValues("replica_checker", "new_operator").Inc()
-	return NewOperator("makeUpReplica", region.GetId(), OpReplica|OpRegion, steps...)
+	return NewOperator("makeUpReplica", region.GetId(), region.GetRegionEpoch(), OpReplica|OpRegion, steps...)
 }
 
 // when add learner peer, the number of peer will exceed max replicas for a wille,
diff --git a/server/schedulers/adjacent_region.go b/server/schedulers/adjacent_region.go
index 2032ac4f1e4..6daf8a29076 100644
--- a/server/schedulers/adjacent_region.go
+++ b/server/schedulers/adjacent_region.go
@@ -256,7 +256,7 @@ func (l *balanceAdjacentRegionScheduler) disperseLeader(cluster schedule.Cluster
 		return nil
 	}
 	step := schedule.TransferLeader{FromStore: before.Leader.GetStoreId(), ToStore: target.GetId()}
-	op := schedule.NewOperator("balance-adjacent-leader", before.GetId(), schedule.OpAdjacent|schedule.OpLeader, step)
+	op := schedule.NewOperator("balance-adjacent-leader", before.GetId(), before.GetRegionEpoch(), schedule.OpAdjacent|schedule.OpLeader, step)
 	op.SetPriorityLevel(core.LowPriority)
 	schedulerCounter.WithLabelValues(l.GetName(), "adjacent_leader").Inc()
 	return op
diff --git a/server/schedulers/balance_leader.go b/server/schedulers/balance_leader.go
index 3269f004fc3..423e2d46012 100644
--- a/server/schedulers/balance_leader.go
+++ b/server/schedulers/balance_leader.go
@@ -170,6 +170,6 @@ func (l *balanceLeaderScheduler) createOperator(region *core.RegionInfo, source,
 	balanceLeaderCounter.WithLabelValues("move_leader", fmt.Sprintf("store%d-out", source.GetId())).Inc()
 	balanceLeaderCounter.WithLabelValues("move_leader", fmt.Sprintf("store%d-in", target.GetId())).Inc()
 	step := schedule.TransferLeader{FromStore: region.Leader.GetStoreId(), ToStore: target.GetId()}
-	op := schedule.NewOperator("balance-leader", region.GetId(), schedule.OpBalance|schedule.OpLeader, step)
+	op := schedule.NewOperator("balance-leader", region.GetId(), region.GetRegionEpoch(), schedule.OpBalance|schedule.OpLeader, step)
 	return []*schedule.Operator{op}
 }
diff --git a/server/schedulers/evict_leader.go b/server/schedulers/evict_leader.go
index b1d4d77f01c..3509ebde266 100644
--- a/server/schedulers/evict_leader.go
+++ b/server/schedulers/evict_leader.go
@@ -94,7 +94,7 @@ func (s *evictLeaderScheduler) Schedule(cluster schedule.Cluster, opInfluence sc
 	}
 	schedulerCounter.WithLabelValues(s.GetName(), "new_operator").Inc()
 	step := schedule.TransferLeader{FromStore: region.Leader.GetStoreId(), ToStore: target.GetId()}
-	op := schedule.NewOperator("evict-leader", region.GetId(), schedule.OpLeader, step)
+	op := schedule.NewOperator("evict-leader", region.GetId(), region.GetRegionEpoch(), schedule.OpLeader, step)
 	op.SetPriorityLevel(core.HighPriority)
 	return []*schedule.Operator{op}
 }
diff --git a/server/schedulers/grant_leader.go b/server/schedulers/grant_leader.go
index aad4a797423..0b7d154dd1a 100644
--- a/server/schedulers/grant_leader.go
+++ b/server/schedulers/grant_leader.go
@@ -84,7 +84,7 @@ func (s *grantLeaderScheduler) Schedule(cluster schedule.Cluster, opInfluence sc
 	}
 	schedulerCounter.WithLabelValues(s.GetName(), "new_operator").Inc()
 	step := schedule.TransferLeader{FromStore: region.Leader.GetStoreId(), ToStore: s.storeID}
-	op := schedule.NewOperator("grant-leader", region.GetId(), schedule.OpLeader, step)
+	op := schedule.NewOperator("grant-leader", region.GetId(), region.GetRegionEpoch(), schedule.OpLeader, step)
 	op.SetPriorityLevel(core.HighPriority)
 	return []*schedule.Operator{op}
 }
diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go
index e4f455e3909..5013078b27d 100644
--- a/server/schedulers/hot_region.go
+++ b/server/schedulers/hot_region.go
@@ -148,7 +148,7 @@ func (h *balanceHotRegionsScheduler) balanceHotReadRegions(cluster schedule.Clus
 		if srcRegion != nil {
 			schedulerCounter.WithLabelValues(h.GetName(), "move_leader").Inc()
 			step := schedule.TransferLeader{FromStore: srcRegion.Leader.GetStoreId(), ToStore: newLeader.GetStoreId()}
-			return []*schedule.Operator{schedule.NewOperator("transferHotReadLeader", srcRegion.GetId(), schedule.OpHotRegion|schedule.OpLeader, step)}
+			return []*schedule.Operator{schedule.NewOperator("transferHotReadLeader", srcRegion.GetId(), srcRegion.GetRegionEpoch(), schedule.OpHotRegion|schedule.OpLeader, step)}
 		}
 
 		// balance by peer
@@ -180,7 +180,7 @@ func (h *balanceHotRegionsScheduler) balanceHotWriteRegions(cluster schedule.Clu
 		if srcRegion != nil {
 			schedulerCounter.WithLabelValues(h.GetName(), "move_leader").Inc()
 			step := schedule.TransferLeader{FromStore: srcRegion.Leader.GetStoreId(), ToStore: newLeader.GetStoreId()}
-			return []*schedule.Operator{schedule.NewOperator("transferHotWriteLeader", srcRegion.GetId(), schedule.OpHotRegion|schedule.OpLeader, step)}
+			return []*schedule.Operator{schedule.NewOperator("transferHotWriteLeader", srcRegion.GetId(), srcRegion.GetRegionEpoch(), schedule.OpHotRegion|schedule.OpLeader, step)}
 		}
 	}
 }
diff --git a/server/schedulers/label.go b/server/schedulers/label.go
index e8199595782..fe412c05970 100644
--- a/server/schedulers/label.go
+++ b/server/schedulers/label.go
@@ -90,7 +90,7 @@ func (s *labelScheduler) Schedule(cluster schedule.Cluster, opInfluence schedule
 
 			schedulerCounter.WithLabelValues(s.GetName(), "new_operator").Inc()
 			step := schedule.TransferLeader{FromStore: id, ToStore: target.GetId()}
-			op := schedule.NewOperator("label-reject-leader", region.GetId(), schedule.OpLeader, step)
+			op := schedule.NewOperator("label-reject-leader", region.GetId(), region.GetRegionEpoch(), schedule.OpLeader, step)
 			return []*schedule.Operator{op}
 		}
 	}
diff --git a/server/schedulers/shuffle_leader.go b/server/schedulers/shuffle_leader.go
index bf00946ad73..eba421ec3dd 100644
--- a/server/schedulers/shuffle_leader.go
+++ b/server/schedulers/shuffle_leader.go
@@ -76,7 +76,7 @@ func (s *shuffleLeaderScheduler) Schedule(cluster schedule.Cluster, opInfluence
 	}
 	schedulerCounter.WithLabelValues(s.GetName(), "new_operator").Inc()
 	step := schedule.TransferLeader{FromStore: region.Leader.GetStoreId(), ToStore: targetStore.GetId()}
-	op := schedule.NewOperator("shuffleLeader", region.GetId(), schedule.OpAdmin|schedule.OpLeader, step)
+	op := schedule.NewOperator("shuffleLeader", region.GetId(), region.GetRegionEpoch(), schedule.OpAdmin|schedule.OpLeader, step)
 	op.SetPriorityLevel(core.HighPriority)
 	return []*schedule.Operator{op}
 }