From d7215590214ac4720d729a2fd4c8c59fa6bb39e4 Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Wed, 28 Apr 2021 08:49:40 +0200 Subject: [PATCH] Simplify lease management after cindex update is moved to 'hooks'. --- server/etcdserver/server.go | 34 ++++++++++++----------------- server/lease/leasehttp/http_test.go | 6 ++--- server/lease/lessor.go | 9 +++----- server/lease/lessor_bench_test.go | 2 +- server/lease/lessor_test.go | 30 ++++++++++++------------- 5 files changed, 36 insertions(+), 45 deletions(-) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 2a76dca21859..e5c3dc1a2d5b 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -301,9 +301,7 @@ type backendHooks struct { } func (bh *backendHooks) OnPreCommitUnsafe(tx backend.BatchTx) { - if bh.indexer != nil { - bh.indexer.UnsafeSave(tx) - } + bh.indexer.UnsafeSave(tx) } // NewServer creates a new EtcdServer from the supplied configuration. The @@ -358,8 +356,11 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { bepath := cfg.BackendPath() beExist := fileutil.Exist(bepath) - beHooks := &backendHooks{lg: cfg.Logger} + ci := cindex.NewConsistentIndex(nil) + beHooks := &backendHooks{lg: cfg.Logger, indexer: ci} be := openBackend(cfg, beHooks) + ci.SetBackend(be) + cindex.CreateMetaBucket(be.BatchTx()) defer func() { if err != nil { @@ -543,7 +544,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { peerRt: prt, reqIDGen: idutil.NewGenerator(uint16(id), time.Now()), AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist}, - consistIndex: cindex.NewConsistentIndex(be.BatchTx()), + consistIndex: ci, firstCommitInTermC: make(chan struct{}), } serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1) @@ -556,16 +557,11 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases. // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers. - srv.lessor = lease.NewLessor( - srv.Logger(), - srv.be, - lease.LessorConfig{ - MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())), - CheckpointInterval: cfg.LeaseCheckpointInterval, - ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(), - }, - srv.consistIndex, - ) + srv.lessor = lease.NewLessor(srv.Logger(), srv.be, lease.LessorConfig{ + MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())), + CheckpointInterval: cfg.LeaseCheckpointInterval, + ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(), + }) tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken, func(index uint64) <-chan struct{} { @@ -577,12 +573,9 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { cfg.Logger.Warn("failed to create token provider", zap.Error(err)) return nil, err } - srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit}) - - // Backend should contain 'meta' bucket going forward. - beHooks.indexer = srv.consistIndex + srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit}) - kvindex := srv.consistIndex.ConsistentIndex() + kvindex := ci.ConsistentIndex() srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex)) if beExist { // TODO: remove kvindex != 0 checking when we do not expect users to upgrade @@ -1210,6 +1203,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { lg.Panic("failed to restore mvcc store", zap.Error(err)) } + s.consistIndex.SetBackend(newbe) lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex())) // Closing old backend might block until all the txns diff --git a/server/lease/leasehttp/http_test.go b/server/lease/leasehttp/http_test.go index 1a0ca486512c..ada3d3a2e2ad 100644 --- a/server/lease/leasehttp/http_test.go +++ b/server/lease/leasehttp/http_test.go @@ -31,7 +31,7 @@ func TestRenewHTTP(t *testing.T) { be, _ := betesting.NewTmpBackend(t, time.Hour, 10000) defer betesting.Close(t, be) - le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}, nil) + le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}) le.Promote(time.Second) l, err := le.Grant(1, int64(5)) if err != nil { @@ -55,7 +55,7 @@ func TestTimeToLiveHTTP(t *testing.T) { be, _ := betesting.NewTmpBackend(t, time.Hour, 10000) defer betesting.Close(t, be) - le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}, nil) + le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}) le.Promote(time.Second) l, err := le.Grant(1, int64(5)) if err != nil { @@ -96,7 +96,7 @@ func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) { be, _ := betesting.NewTmpBackend(t, time.Hour, 10000) defer betesting.Close(t, be) - le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}, nil) + le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}) le.Promote(time.Second) l, err := le.Grant(1, int64(5)) if err != nil { diff --git a/server/lease/lessor.go b/server/lease/lessor.go index 7a1544e9395c..5dba54db02e1 100644 --- a/server/lease/lessor.go +++ b/server/lease/lessor.go @@ -25,7 +25,6 @@ import ( "time" pb "go.etcd.io/etcd/api/v3/etcdserverpb" - "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/lease/leasepb" "go.etcd.io/etcd/server/v3/mvcc/backend" "go.uber.org/zap" @@ -182,7 +181,6 @@ type lessor struct { checkpointInterval time.Duration // the interval to check if the expired lease is revoked expiredLeaseRetryInterval time.Duration - ci cindex.ConsistentIndexer } type LessorConfig struct { @@ -191,11 +189,11 @@ type LessorConfig struct { ExpiredLeasesRetryInterval time.Duration } -func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig, ci cindex.ConsistentIndexer) Lessor { - return newLessor(lg, b, cfg, ci) +func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor { + return newLessor(lg, b, cfg) } -func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig, ci cindex.ConsistentIndexer) *lessor { +func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor { checkpointInterval := cfg.CheckpointInterval expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval if checkpointInterval == 0 { @@ -218,7 +216,6 @@ func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig, ci cindex.Co stopC: make(chan struct{}), doneC: make(chan struct{}), lg: lg, - ci: ci, } l.initAndRecover() diff --git a/server/lease/lessor_bench_test.go b/server/lease/lessor_bench_test.go index 46b702de33b4..06feec810c52 100644 --- a/server/lease/lessor_bench_test.go +++ b/server/lease/lessor_bench_test.go @@ -68,7 +68,7 @@ func setUp(t testing.TB) (le *lessor, tearDown func()) { be, _ := betesting.NewDefaultTmpBackend(t) // MinLeaseTTL is negative, so we can grant expired lease in benchmark. // ExpiredLeasesRetryInterval should small, so benchmark of findExpired will recheck expired lease. - le = newLessor(lg, be, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond}, nil) + le = newLessor(lg, be, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond}) le.SetRangeDeleter(func() TxnDelete { ftd := &FakeTxnDelete{be.BatchTx()} ftd.Lock() diff --git a/server/lease/lessor_test.go b/server/lease/lessor_test.go index 61b930929657..c6cb0518e845 100644 --- a/server/lease/lessor_test.go +++ b/server/lease/lessor_test.go @@ -45,7 +45,7 @@ func TestLessorGrant(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() le.Promote(0) @@ -107,7 +107,7 @@ func TestLeaseConcurrentKeys(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) }) @@ -156,7 +156,7 @@ func TestLessorRevoke(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() var fd *fakeDeleter le.SetRangeDeleter(func() TxnDelete { @@ -209,7 +209,7 @@ func TestLessorRenew(t *testing.T) { defer be.Close() defer os.RemoveAll(dir) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() le.Promote(0) @@ -242,7 +242,7 @@ func TestLessorRenewWithCheckpointer(t *testing.T) { defer be.Close() defer os.RemoveAll(dir) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) { for _, cp := range cp.GetCheckpoints() { le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL()) @@ -291,7 +291,7 @@ func TestLessorRenewExtendPileup(t *testing.T) { dir, be := NewTestBackend(t) defer os.RemoveAll(dir) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) ttl := int64(10) for i := 1; i <= leaseRevokeRate*10; i++ { if _, err := le.Grant(LeaseID(2*i), ttl); err != nil { @@ -310,7 +310,7 @@ func TestLessorRenewExtendPileup(t *testing.T) { bcfg.Path = filepath.Join(dir, "be") be = backend.New(bcfg) defer be.Close() - le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() // extend after recovery should extend expiration on lease pile-up @@ -340,7 +340,7 @@ func TestLessorDetach(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) }) @@ -381,7 +381,7 @@ func TestLessorRecover(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() l1, err1 := le.Grant(1, 10) l2, err2 := le.Grant(2, 20) @@ -390,7 +390,7 @@ func TestLessorRecover(t *testing.T) { } // Create a new lessor with the same backend - nle := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + nle := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) defer nle.Stop() nl1 := nle.Lookup(l1.ID) if nl1 == nil || nl1.ttl != l1.ttl { @@ -411,7 +411,7 @@ func TestLessorExpire(t *testing.T) { testMinTTL := int64(1) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}) defer le.Stop() le.Promote(1 * time.Second) @@ -464,7 +464,7 @@ func TestLessorExpireAndDemote(t *testing.T) { testMinTTL := int64(1) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}) defer le.Stop() le.Promote(1 * time.Second) @@ -513,7 +513,7 @@ func TestLessorMaxTTL(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() _, err := le.Grant(1, MaxLeaseTTL+1) @@ -529,7 +529,7 @@ func TestLessorCheckpointScheduling(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second}) le.minLeaseTTL = 1 checkpointedC := make(chan struct{}) le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) { @@ -564,7 +564,7 @@ func TestLessorCheckpointsRestoredOnPromote(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() l, err := le.Grant(1, 10) if err != nil {