From 12cff656786c632d4891c1c088805adc2f0bb5bb Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Mon, 12 Apr 2021 19:55:05 +0200 Subject: [PATCH] Simplify lease management after cindex update is moved to 'hooks'. --- etcdctl/snapshot/v3_snapshot.go | 4 +++- server/etcdserver/server.go | 15 +++++---------- server/lease/leasehttp/http_test.go | 6 +++--- server/lease/lessor.go | 9 +++------ server/lease/lessor_bench_test.go | 2 +- server/lease/lessor_test.go | 30 ++++++++++++++--------------- 6 files changed, 30 insertions(+), 36 deletions(-) diff --git a/etcdctl/snapshot/v3_snapshot.go b/etcdctl/snapshot/v3_snapshot.go index 7c8b159f8977..df4530c709da 100644 --- a/etcdctl/snapshot/v3_snapshot.go +++ b/etcdctl/snapshot/v3_snapshot.go @@ -361,7 +361,7 @@ func (s *v3Manager) saveDB() error { ci.SetConsistentIndex(uint64(commit)) // a lessor never timeouts leases - lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64}, ci) + lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64}) defer lessor.Stop() mvs := mvcc.NewStore(s.lg, be, lessor, ci, mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32}) @@ -379,6 +379,8 @@ func (s *v3Manager) saveDB() error { // todo: add back new members when we start to deprecate old snap file. btx.UnsafeForEach([]byte("members_removed"), del) + ci.UnsafeSave(btx) + // trigger write-out of new consistent index txn.End() diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index a6cbba516a75..dbd685da9528 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -552,16 +552,11 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases. // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers. - srv.lessor = lease.NewLessor( - srv.Logger(), - srv.be, - lease.LessorConfig{ - MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())), - CheckpointInterval: cfg.LeaseCheckpointInterval, - ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(), - }, - srv.consistIndex, - ) + srv.lessor = lease.NewLessor(srv.Logger(), srv.be, lease.LessorConfig{ + MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())), + CheckpointInterval: cfg.LeaseCheckpointInterval, + ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(), + }) tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken, func(index uint64) <-chan struct{} { diff --git a/server/lease/leasehttp/http_test.go b/server/lease/leasehttp/http_test.go index 1a0ca486512c..ada3d3a2e2ad 100644 --- a/server/lease/leasehttp/http_test.go +++ b/server/lease/leasehttp/http_test.go @@ -31,7 +31,7 @@ func TestRenewHTTP(t *testing.T) { be, _ := betesting.NewTmpBackend(t, time.Hour, 10000) defer betesting.Close(t, be) - le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}, nil) + le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}) le.Promote(time.Second) l, err := le.Grant(1, int64(5)) if err != nil { @@ -55,7 +55,7 @@ func TestTimeToLiveHTTP(t *testing.T) { be, _ := betesting.NewTmpBackend(t, time.Hour, 10000) defer betesting.Close(t, be) - le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}, nil) + le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}) le.Promote(time.Second) l, err := le.Grant(1, int64(5)) if err != nil { @@ -96,7 +96,7 @@ func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) { be, _ := betesting.NewTmpBackend(t, time.Hour, 10000) defer betesting.Close(t, be) - le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}, nil) + le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}) le.Promote(time.Second) l, err := le.Grant(1, int64(5)) if err != nil { diff --git a/server/lease/lessor.go b/server/lease/lessor.go index 7a1544e9395c..5dba54db02e1 100644 --- a/server/lease/lessor.go +++ b/server/lease/lessor.go @@ -25,7 +25,6 @@ import ( "time" pb "go.etcd.io/etcd/api/v3/etcdserverpb" - "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/lease/leasepb" "go.etcd.io/etcd/server/v3/mvcc/backend" "go.uber.org/zap" @@ -182,7 +181,6 @@ type lessor struct { checkpointInterval time.Duration // the interval to check if the expired lease is revoked expiredLeaseRetryInterval time.Duration - ci cindex.ConsistentIndexer } type LessorConfig struct { @@ -191,11 +189,11 @@ type LessorConfig struct { ExpiredLeasesRetryInterval time.Duration } -func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig, ci cindex.ConsistentIndexer) Lessor { - return newLessor(lg, b, cfg, ci) +func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor { + return newLessor(lg, b, cfg) } -func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig, ci cindex.ConsistentIndexer) *lessor { +func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor { checkpointInterval := cfg.CheckpointInterval expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval if checkpointInterval == 0 { @@ -218,7 +216,6 @@ func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig, ci cindex.Co stopC: make(chan struct{}), doneC: make(chan struct{}), lg: lg, - ci: ci, } l.initAndRecover() diff --git a/server/lease/lessor_bench_test.go b/server/lease/lessor_bench_test.go index 46b702de33b4..06feec810c52 100644 --- a/server/lease/lessor_bench_test.go +++ b/server/lease/lessor_bench_test.go @@ -68,7 +68,7 @@ func setUp(t testing.TB) (le *lessor, tearDown func()) { be, _ := betesting.NewDefaultTmpBackend(t) // MinLeaseTTL is negative, so we can grant expired lease in benchmark. // ExpiredLeasesRetryInterval should small, so benchmark of findExpired will recheck expired lease. - le = newLessor(lg, be, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond}, nil) + le = newLessor(lg, be, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond}) le.SetRangeDeleter(func() TxnDelete { ftd := &FakeTxnDelete{be.BatchTx()} ftd.Lock() diff --git a/server/lease/lessor_test.go b/server/lease/lessor_test.go index 61b930929657..c6cb0518e845 100644 --- a/server/lease/lessor_test.go +++ b/server/lease/lessor_test.go @@ -45,7 +45,7 @@ func TestLessorGrant(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() le.Promote(0) @@ -107,7 +107,7 @@ func TestLeaseConcurrentKeys(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) }) @@ -156,7 +156,7 @@ func TestLessorRevoke(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() var fd *fakeDeleter le.SetRangeDeleter(func() TxnDelete { @@ -209,7 +209,7 @@ func TestLessorRenew(t *testing.T) { defer be.Close() defer os.RemoveAll(dir) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() le.Promote(0) @@ -242,7 +242,7 @@ func TestLessorRenewWithCheckpointer(t *testing.T) { defer be.Close() defer os.RemoveAll(dir) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) { for _, cp := range cp.GetCheckpoints() { le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL()) @@ -291,7 +291,7 @@ func TestLessorRenewExtendPileup(t *testing.T) { dir, be := NewTestBackend(t) defer os.RemoveAll(dir) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) ttl := int64(10) for i := 1; i <= leaseRevokeRate*10; i++ { if _, err := le.Grant(LeaseID(2*i), ttl); err != nil { @@ -310,7 +310,7 @@ func TestLessorRenewExtendPileup(t *testing.T) { bcfg.Path = filepath.Join(dir, "be") be = backend.New(bcfg) defer be.Close() - le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() // extend after recovery should extend expiration on lease pile-up @@ -340,7 +340,7 @@ func TestLessorDetach(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) }) @@ -381,7 +381,7 @@ func TestLessorRecover(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() l1, err1 := le.Grant(1, 10) l2, err2 := le.Grant(2, 20) @@ -390,7 +390,7 @@ func TestLessorRecover(t *testing.T) { } // Create a new lessor with the same backend - nle := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + nle := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) defer nle.Stop() nl1 := nle.Lookup(l1.ID) if nl1 == nil || nl1.ttl != l1.ttl { @@ -411,7 +411,7 @@ func TestLessorExpire(t *testing.T) { testMinTTL := int64(1) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}) defer le.Stop() le.Promote(1 * time.Second) @@ -464,7 +464,7 @@ func TestLessorExpireAndDemote(t *testing.T) { testMinTTL := int64(1) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}) defer le.Stop() le.Promote(1 * time.Second) @@ -513,7 +513,7 @@ func TestLessorMaxTTL(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() _, err := le.Grant(1, MaxLeaseTTL+1) @@ -529,7 +529,7 @@ func TestLessorCheckpointScheduling(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second}) le.minLeaseTTL = 1 checkpointedC := make(chan struct{}) le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) { @@ -564,7 +564,7 @@ func TestLessorCheckpointsRestoredOnPromote(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() l, err := le.Grant(1, 10) if err != nil {