diff --git a/lease/lessor.go b/lease/lessor.go index 2a53be3226f..8f2e3a7579d 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -45,8 +45,8 @@ var ( leaseBucketName = []byte("lease") - // maximum number of leases to revoke per second; configurable for tests - leaseRevokeRate = 1000 + // default number of leases to revoke per second; configurable for tests + defaultLeaseRevokeRate = 1000 // maximum number of lease checkpoints recorded to the consensus log per second; configurable for tests leaseCheckpointRate = 1000 @@ -173,6 +173,9 @@ type lessor struct { // requests for shorter TTLs are extended to the minimum TTL. minLeaseTTL int64 + // maximum number of leases to revoke per second + leaseRevokeRate int + expiredC chan []*Lease // stopC is a channel whose closure indicates that the lessor should be stopped. stopC chan struct{} @@ -201,6 +204,8 @@ type LessorConfig struct { CheckpointInterval time.Duration ExpiredLeasesRetryInterval time.Duration CheckpointPersist bool + + leaseRevokeRate int } func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) Lessor { @@ -210,12 +215,16 @@ func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorCon func newLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) *lessor { checkpointInterval := cfg.CheckpointInterval expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval + leaseRevokeRate := cfg.leaseRevokeRate if checkpointInterval == 0 { checkpointInterval = defaultLeaseCheckpointInterval } if expiredLeaseRetryInterval == 0 { expiredLeaseRetryInterval = defaultExpiredleaseRetryInterval } + if leaseRevokeRate == 0 { + leaseRevokeRate = defaultLeaseRevokeRate + } l := &lessor{ leaseMap: make(map[LeaseID]*Lease), itemMap: make(map[LeaseItem]LeaseID), @@ -223,6 +232,7 @@ func newLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorCon leaseCheckpointHeap: make(LeaseQueue, 0), b: b, minLeaseTTL: cfg.MinLeaseTTL, + leaseRevokeRate: leaseRevokeRate, checkpointInterval: checkpointInterval, expiredLeaseRetryInterval: expiredLeaseRetryInterval, checkpointPersist: cfg.CheckpointPersist, @@ -475,7 +485,7 @@ func (le *lessor) Promote(extend time.Duration) { le.scheduleCheckpointIfNeeded(l) } - if len(le.leaseMap) < leaseRevokeRate { + if len(le.leaseMap) < le.leaseRevokeRate { // no possibility of lease pile-up return } @@ -489,7 +499,7 @@ func (le *lessor) Promote(extend time.Duration) { expires := 0 // have fewer expires than the total revoke rate so piled up leases // don't consume the entire revoke limit - targetExpiresPerSecond := (3 * leaseRevokeRate) / 4 + targetExpiresPerSecond := (3 * le.leaseRevokeRate) / 4 for _, l := range leases { remaining := l.Remaining() if remaining > nextWindow { @@ -628,7 +638,7 @@ func (le *lessor) revokeExpiredLeases() { var ls []*Lease // rate limit - revokeLimit := leaseRevokeRate / 2 + revokeLimit := le.leaseRevokeRate / 2 le.mu.RLock() if le.isPrimary() { diff --git a/lease/lessor_test.go b/lease/lessor_test.go index 5280be0751e..36eab0b6118 100644 --- a/lease/lessor_test.go +++ b/lease/lessor_test.go @@ -289,17 +289,15 @@ func TestLessorRenewWithCheckpointer(t *testing.T) { // TestLessorRenewExtendPileup ensures Lessor extends leases on promotion if too many // expire at the same time. func TestLessorRenewExtendPileup(t *testing.T) { - oldRevokeRate := leaseRevokeRate - defer func() { leaseRevokeRate = oldRevokeRate }() + leaseRevokeRate := 10 lg := zap.NewNop() - leaseRevokeRate = 10 dir, be := NewTestBackend(t) defer os.RemoveAll(dir) - le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL, leaseRevokeRate: leaseRevokeRate}) ttl := int64(10) - for i := 1; i <= leaseRevokeRate*10; i++ { + for i := 1; i <= le.leaseRevokeRate*10; i++ { if _, err := le.Grant(LeaseID(2*i), ttl); err != nil { t.Fatal(err) } @@ -316,7 +314,7 @@ func TestLessorRenewExtendPileup(t *testing.T) { bcfg.Path = filepath.Join(dir, "be") be = backend.New(bcfg) defer be.Close() - le = newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) + le = newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL, leaseRevokeRate: leaseRevokeRate}) defer le.Stop() // extend after recovery should extend expiration on lease pile-up @@ -331,11 +329,11 @@ func TestLessorRenewExtendPileup(t *testing.T) { for i := ttl; i < ttl+20; i++ { c := windowCounts[i] - if c > leaseRevokeRate { - t.Errorf("expected at most %d expiring at %ds, got %d", leaseRevokeRate, i, c) + if c > le.leaseRevokeRate { + t.Errorf("expected at most %d expiring at %ds, got %d", le.leaseRevokeRate, i, c) } - if c < leaseRevokeRate/2 { - t.Errorf("expected at least %d expiring at %ds, got %d", leaseRevokeRate/2, i, c) + if c < le.leaseRevokeRate/2 { + t.Errorf("expected at least %d expiring at %ds, got %d", le.leaseRevokeRate/2, i, c) } } }