From 9f11b16b2dcab5922bed18b818b84fc1113f7e06 Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Fri, 9 Apr 2021 14:18:18 +0200 Subject: [PATCH 1/8] backend: Hooks interface & implementation. --- server/mvcc/backend/backend.go | 8 ++++++++ server/mvcc/backend/batch_tx.go | 7 +++++++ server/mvcc/backend/hooks.go | 36 +++++++++++++++++++++++++++++++++ 3 files changed, 51 insertions(+) create mode 100644 server/mvcc/backend/hooks.go diff --git a/server/mvcc/backend/backend.go b/server/mvcc/backend/backend.go index 6c7037c244e..84f35d14121 100644 --- a/server/mvcc/backend/backend.go +++ b/server/mvcc/backend/backend.go @@ -104,6 +104,8 @@ type backend struct { stopc chan struct{} donec chan struct{} + hooks Hooks + lg *zap.Logger } @@ -124,6 +126,9 @@ type BackendConfig struct { UnsafeNoFsync bool `json:"unsafe-no-fsync"` // Mlock prevents backend database file to be swapped Mlock bool + + // Hooks are getting executed during lifecycle of Backend's transactions. + Hooks Hooks } func DefaultBackendConfig() BackendConfig { @@ -189,6 +194,9 @@ func newBackend(bcfg BackendConfig) *backend { lg: bcfg.Logger, } b.batchTx = newBatchTxBuffered(b) + // We set it after newBatchTxBuffered to skip the 'empty' commit. + b.hooks = bcfg.Hooks + go b.run() return b } diff --git a/server/mvcc/backend/batch_tx.go b/server/mvcc/backend/batch_tx.go index d4bc8c68428..74107b445e5 100644 --- a/server/mvcc/backend/batch_tx.go +++ b/server/mvcc/backend/batch_tx.go @@ -109,6 +109,7 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo t.backend.lg.Fatal( "failed to find a bucket", zap.String("bucket-name", string(bucketName)), + zap.Stack("stack"), ) } if seq { @@ -133,6 +134,7 @@ func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][] t.backend.lg.Fatal( "failed to find a bucket", zap.String("bucket-name", string(bucketName)), + zap.Stack("stack"), ) } return unsafeRange(bucket.Cursor(), key, endKey, limit) @@ -167,6 +169,7 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) { t.backend.lg.Fatal( "failed to find a bucket", zap.String("bucket-name", string(bucketName)), + zap.Stack("stack"), ) } err := bucket.Delete(key) @@ -283,6 +286,10 @@ func (t *batchTxBuffered) CommitAndStop() { } func (t *batchTxBuffered) commit(stop bool) { + if t.backend.hooks != nil { + t.backend.hooks.OnPreCommitUnsafe(t) + } + // all read txs must be closed to acquire boltdb commit rwlock t.backend.readTx.Lock() t.unsafeCommit(stop) diff --git a/server/mvcc/backend/hooks.go b/server/mvcc/backend/hooks.go new file mode 100644 index 00000000000..9750828ef7b --- /dev/null +++ b/server/mvcc/backend/hooks.go @@ -0,0 +1,36 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package backend + +type HookFunc func(tx BatchTx) + +// Hooks allow to add additional logic executed during transaction lifetime. +type Hooks interface { + // OnPreCommitUnsafe is executed before Commit of transactions. + // The given transaction is already locked. + OnPreCommitUnsafe(tx BatchTx) +} + +type hooks struct { + onPreCommitUnsafe HookFunc +} + +func (h hooks) OnPreCommitUnsafe(tx BatchTx) { + h.onPreCommitUnsafe(tx) +} + +func NewHooks(onPreCommitUnsafe HookFunc) Hooks { + return hooks{onPreCommitUnsafe: onPreCommitUnsafe} +} From d53d2db1e28d9aca9ccac5b48a472adde68ffc0f Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Fri, 9 Apr 2021 18:43:13 +0200 Subject: [PATCH 2/8] Tests: Backend hooks support. --- server/mvcc/backend/hooks_test.go | 138 ++++++++++++++++++++++++++++++ 1 file changed, 138 insertions(+) create mode 100644 server/mvcc/backend/hooks_test.go diff --git a/server/mvcc/backend/hooks_test.go b/server/mvcc/backend/hooks_test.go new file mode 100644 index 00000000000..dbf68f910be --- /dev/null +++ b/server/mvcc/backend/hooks_test.go @@ -0,0 +1,138 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package backend_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.etcd.io/etcd/server/v3/mvcc/backend" + betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing" +) + +var ( + bucket = []byte("bucket") + key = []byte("key") +) + +func TestBackendPreCommitHook(t *testing.T) { + be := newTestHooksBackend(t, backend.DefaultBackendConfig()) + + tx := be.BatchTx() + prepareBuckenAndKey(tx) + tx.Commit() + + // Empty commit. + tx.Commit() + + write(tx, []byte("foo"), []byte("bar")) + + assert.Equal(t, ">cc", getCommitsKey(t, be), "expected 2 explict commits") + tx.Commit() + assert.Equal(t, ">ccc", getCommitsKey(t, be), "expected 3 explict commits") +} + +func TestBackendAutoCommitLimitHook(t *testing.T) { + cfg := backend.DefaultBackendConfig() + cfg.BatchLimit = 3 + be := newTestHooksBackend(t, cfg) + + tx := be.BatchTx() + prepareBuckenAndKey(tx) // writes 2 entries. + + for i := 3; i <= 9; i++ { + write(tx, []byte("i"), []byte{byte(i)}) + } + + assert.Equal(t, ">ccc", getCommitsKey(t, be)) +} + +func write(tx backend.BatchTx, k, v []byte) { + tx.Lock() + defer tx.Unlock() + tx.UnsafePut(bucket, k, v) +} + +func TestBackendAutoCommitBatchIntervalHook(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + cfg := backend.DefaultBackendConfig() + cfg.BatchInterval = 10 * time.Millisecond + be := newTestHooksBackend(t, cfg) + tx := be.BatchTx() + prepareBuckenAndKey(tx) + + // Edits trigger an auto-commit + waitUntil(ctx, t, func() bool { return getCommitsKey(t, be) == ">c" }) + + time.Sleep(time.Second) + // No additional auto-commits, as there were no more edits + assert.Equal(t, ">c", getCommitsKey(t, be)) + + write(tx, []byte("foo"), []byte("bar1")) + + waitUntil(ctx, t, func() bool { return getCommitsKey(t, be) == ">cc" }) + + write(tx, []byte("foo"), []byte("bar1")) + + waitUntil(ctx, t, func() bool { return getCommitsKey(t, be) == ">ccc" }) +} + +func waitUntil(ctx context.Context, t testing.TB, f func() bool) { + for !f() { + select { + case <-ctx.Done(): + t.Fatalf("Context cancelled/timedout without condition met: %v", ctx.Err()) + default: + } + time.Sleep(10 * time.Millisecond) + } +} + +func prepareBuckenAndKey(tx backend.BatchTx) { + tx.Lock() + defer tx.Unlock() + tx.UnsafeCreateBucket(bucket) + tx.UnsafePut(bucket, key, []byte(">")) +} + +func newTestHooksBackend(t testing.TB, baseConfig backend.BackendConfig) backend.Backend { + cfg := baseConfig + cfg.Hooks = backend.NewHooks(func(tx backend.BatchTx) { + k, v := tx.UnsafeRange(bucket, key, nil, 1) + t.Logf("OnPreCommit executed: %v %v", string(k[0]), string(v[0])) + assert.Len(t, k, 1) + assert.Len(t, v, 1) + tx.UnsafePut(bucket, key, append(v[0], byte('c'))) + }) + + be, _ := betesting.NewTmpBackendFromCfg(t, cfg) + t.Cleanup(func() { + betesting.Close(t, be) + }) + return be +} + +func getCommitsKey(t testing.TB, be backend.Backend) string { + rtx := be.BatchTx() + rtx.Lock() + defer rtx.Unlock() + _, v := rtx.UnsafeRange(bucket, key, nil, 1) + assert.Len(t, v, 1) + return string(v[0]) +} From 50051675f9740a4561e13bc5f00a89982b5202ad Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Mon, 12 Apr 2021 17:28:39 +0200 Subject: [PATCH 3/8] Integrate backend::hooks with consistent_index. Every transaction committed to backend is writing most recent consistent_index. Makes sure that even automatically trigger commits of batch-transactions stays "really" consistent a.d. the most recent WAL log index applied. --- server/etcdserver/backend.go | 19 +++++++++---------- server/etcdserver/cindex/cindex.go | 1 + server/etcdserver/server.go | 26 +++++++++++++++++++++++--- 3 files changed, 33 insertions(+), 13 deletions(-) diff --git a/server/etcdserver/backend.go b/server/etcdserver/backend.go index 2e738c1963a..120d0124f53 100644 --- a/server/etcdserver/backend.go +++ b/server/etcdserver/backend.go @@ -28,7 +28,7 @@ import ( "go.uber.org/zap" ) -func newBackend(cfg config.ServerConfig) backend.Backend { +func newBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend { bcfg := backend.DefaultBackendConfig() bcfg.Path = cfg.BackendPath() bcfg.UnsafeNoFsync = cfg.UnsafeNoFsync @@ -51,12 +51,12 @@ func newBackend(cfg config.ServerConfig) backend.Backend { bcfg.MmapSize = uint64(cfg.QuotaBackendBytes + cfg.QuotaBackendBytes/10) } bcfg.Mlock = cfg.ExperimentalMemoryMlock - + bcfg.Hooks = hooks return backend.New(bcfg) } // openSnapshotBackend renames a snapshot db to the current etcd db and opens it. -func openSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot) (backend.Backend, error) { +func openSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot, hooks backend.Hooks) (backend.Backend, error) { snapPath, err := ss.DBFilePath(snapshot.Metadata.Index) if err != nil { return nil, fmt.Errorf("failed to find database snapshot file (%v)", err) @@ -64,16 +64,16 @@ func openSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot if err := os.Rename(snapPath, cfg.BackendPath()); err != nil { return nil, fmt.Errorf("failed to rename database snapshot file (%v)", err) } - return openBackend(cfg), nil + return openBackend(cfg, hooks), nil } // openBackend returns a backend using the current etcd db. -func openBackend(cfg config.ServerConfig) backend.Backend { +func openBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend { fn := cfg.BackendPath() now, beOpened := time.Now(), make(chan backend.Backend) go func() { - beOpened <- newBackend(cfg) + beOpened <- newBackend(cfg, hooks) }() select { @@ -96,15 +96,14 @@ func openBackend(cfg config.ServerConfig) backend.Backend { // before updating the backend db after persisting raft snapshot to disk, // violating the invariant snapshot.Metadata.Index < db.consistentIndex. In this // case, replace the db with the snapshot db sent by the leader. -func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool) (backend.Backend, error) { +func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks backend.Hooks) (backend.Backend, error) { consistentIndex := uint64(0) if beExist { - ci := cindex.NewConsistentIndex(oldbe.BatchTx()) - consistentIndex = ci.ConsistentIndex() + consistentIndex = cindex.ReadConsistentIndex(oldbe.BatchTx()) } if snapshot.Metadata.Index <= consistentIndex { return oldbe, nil } oldbe.Close() - return openSnapshotBackend(cfg, snap.New(cfg.Logger, cfg.SnapDir()), snapshot) + return openSnapshotBackend(cfg, snap.New(cfg.Logger, cfg.SnapDir()), snapshot, hooks) } diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go index 25bbeb8727c..73e96fd70c4 100644 --- a/server/etcdserver/cindex/cindex.go +++ b/server/etcdserver/cindex/cindex.go @@ -76,6 +76,7 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64) { func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) { index := atomic.LoadUint64(&ci.consistentIndex) + if index == 0 { // Never save 0 as it means that we didn't loaded the real index yet. return diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index d5acb2da1e7..f7da9b4f156 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -260,6 +260,7 @@ type EtcdServer struct { lessor lease.Lessor bemu sync.Mutex be backend.Backend + beHooks backend.Hooks authStore auth.AuthStore alarmStore *v3alarm.AlarmStore @@ -294,6 +295,17 @@ type EtcdServer struct { *AccessController } +type backendHooks struct { + indexer cindex.ConsistentIndexer + lg *zap.Logger +} + +func (bh *backendHooks) OnPreCommitUnsafe(tx backend.BatchTx) { + if bh.indexer != nil { + bh.indexer.UnsafeSave(tx) + } +} + // NewServer creates a new EtcdServer from the supplied configuration. The // configuration is considered static for the lifetime of the EtcdServer. func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { @@ -345,7 +357,9 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { bepath := cfg.BackendPath() beExist := fileutil.Exist(bepath) - be := openBackend(cfg) + + beHooks := &backendHooks{lg: cfg.Logger} + be := openBackend(cfg, beHooks) defer func() { if err != nil { @@ -463,7 +477,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))), ) - if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist); err != nil { + if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil { cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err)) } s1, s2 := be.Size(), be.SizeInUse() @@ -537,6 +551,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster) srv.be = be + srv.beHooks = beHooks minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases. @@ -563,6 +578,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { return nil, err } srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit}) + + // Backend should contain 'meta' bucket going forward. + beHooks.indexer = srv.consistIndex + kvindex := srv.consistIndex.ConsistentIndex() srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex)) if beExist { @@ -1170,7 +1189,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { // wait for raftNode to persist snapshot onto the disk <-apply.notifyc - newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot) + newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot, s.beHooks) if err != nil { lg.Panic("failed to open snapshot backend", zap.Error(err)) } @@ -2117,6 +2136,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { return } s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq)) + if raftReq.V2 != nil { req := (*RequestV2)(raftReq.V2) s.w.Trigger(req.ID, s.applyV2Request(req)) From fe3254aee3099c70e8d0bc9c910a1bfd7ab19cb6 Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Mon, 12 Apr 2021 19:10:13 +0200 Subject: [PATCH 4/8] Remove explicit authStore->ConsistencyIndex updates, as they are taken care by hook. --- server/auth/store.go | 23 +---------------------- server/auth/store_test.go | 16 ++++++++-------- server/etcdserver/server.go | 2 +- server/etcdserver/server_test.go | 20 ++++++++++---------- 4 files changed, 20 insertions(+), 41 deletions(-) diff --git a/server/auth/store.go b/server/auth/store.go index 965c3e86d1b..19dd7e738ad 100644 --- a/server/auth/store.go +++ b/server/auth/store.go @@ -29,7 +29,6 @@ import ( "go.etcd.io/etcd/api/v3/authpb" pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" - "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/mvcc/backend" "go.uber.org/zap" @@ -215,7 +214,6 @@ type authStore struct { tokenProvider TokenProvider bcryptCost int // the algorithm cost / strength for hashing auth passwords - ci cindex.ConsistentIndexer } func (as *authStore) AuthEnable() error { @@ -266,7 +264,6 @@ func (as *authStore) AuthDisable() { tx.Lock() tx.UnsafePut(authBucketName, enableFlagKey, authDisabled) as.commitRevision(tx) - as.saveConsistentIndex(tx) tx.Unlock() b.ForceCommit() @@ -424,7 +421,6 @@ func (as *authStore) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, putUser(as.lg, tx, newUser) as.commitRevision(tx) - as.saveConsistentIndex(tx) as.lg.Info("added a user", zap.String("user-name", r.Name)) return &pb.AuthUserAddResponse{}, nil @@ -448,7 +444,6 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete delUser(tx, r.Name) as.commitRevision(tx) - as.saveConsistentIndex(tx) as.invalidateCachedPerm(r.Name) as.tokenProvider.invalidateUser(r.Name) @@ -491,7 +486,6 @@ func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*p putUser(as.lg, tx, updatedUser) as.commitRevision(tx) - as.saveConsistentIndex(tx) as.invalidateCachedPerm(r.Name) as.tokenProvider.invalidateUser(r.Name) @@ -540,7 +534,6 @@ func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUser as.invalidateCachedPerm(r.User) as.commitRevision(tx) - as.saveConsistentIndex(tx) as.lg.Info( "granted a role to a user", @@ -619,7 +612,6 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs as.invalidateCachedPerm(r.Name) as.commitRevision(tx) - as.saveConsistentIndex(tx) as.lg.Info( "revoked a role from a user", @@ -690,7 +682,6 @@ func (as *authStore) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest) as.clearCachedPerm() as.commitRevision(tx) - as.saveConsistentIndex(tx) as.lg.Info( "revoked a permission on range", @@ -742,7 +733,6 @@ func (as *authStore) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDelete } as.commitRevision(tx) - as.saveConsistentIndex(tx) as.lg.Info("deleted a role", zap.String("role-name", r.Role)) return &pb.AuthRoleDeleteResponse{}, nil @@ -769,7 +759,6 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, putRole(as.lg, tx, newRole) as.commitRevision(tx) - as.saveConsistentIndex(tx) as.lg.Info("created a role", zap.String("role-name", r.Name)) return &pb.AuthRoleAddResponse{}, nil @@ -829,7 +818,6 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) ( as.clearCachedPerm() as.commitRevision(tx) - as.saveConsistentIndex(tx) as.lg.Info( "granted/updated a permission to a user", @@ -1021,7 +1009,7 @@ func (as *authStore) IsAuthEnabled() bool { } // NewAuthStore creates a new AuthStore. -func NewAuthStore(lg *zap.Logger, be backend.Backend, ci cindex.ConsistentIndexer, tp TokenProvider, bcryptCost int) *authStore { +func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCost int) *authStore { if lg == nil { lg = zap.NewNop() } @@ -1056,7 +1044,6 @@ func NewAuthStore(lg *zap.Logger, be backend.Backend, ci cindex.ConsistentIndexe revision: getRevision(tx), lg: lg, be: be, - ci: ci, enabled: enabled, rangePermCache: make(map[string]*unifiedRangePermissions), tokenProvider: tp, @@ -1317,14 +1304,6 @@ func (as *authStore) BcryptCost() int { return as.bcryptCost } -func (as *authStore) saveConsistentIndex(tx backend.BatchTx) { - if as.ci != nil { - as.ci.UnsafeSave(tx) - } else { - as.lg.Error("failed to save consistentIndex,consistentIndexer is nil") - } -} - func (as *authStore) setupMetricsReporter() { reportCurrentAuthRevMu.Lock() reportCurrentAuthRev = func() float64 { diff --git a/server/auth/store_test.go b/server/auth/store_test.go index 53ed3469f40..c530ffe8861 100644 --- a/server/auth/store_test.go +++ b/server/auth/store_test.go @@ -52,7 +52,7 @@ func TestNewAuthStoreRevision(t *testing.T) { if err != nil { t.Fatal(err) } - as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost) + as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost) err = enableAuthAndCreateRoot(as) if err != nil { t.Fatal(err) @@ -64,7 +64,7 @@ func TestNewAuthStoreRevision(t *testing.T) { // no changes to commit b2 := backend.NewDefaultBackend(tPath) defer b2.Close() - as = NewAuthStore(zap.NewExample(), b2, nil, tp, bcrypt.MinCost) + as = NewAuthStore(zap.NewExample(), b2, tp, bcrypt.MinCost) defer as.Close() new := as.Revision() @@ -85,7 +85,7 @@ func TestNewAuthStoreBcryptCost(t *testing.T) { invalidCosts := [2]int{bcrypt.MinCost - 1, bcrypt.MaxCost + 1} for _, invalidCost := range invalidCosts { - as := NewAuthStore(zap.NewExample(), b, nil, tp, invalidCost) + as := NewAuthStore(zap.NewExample(), b, tp, invalidCost) defer as.Close() if as.BcryptCost() != bcrypt.DefaultCost { t.Fatalf("expected DefaultCost when bcryptcost is invalid") @@ -105,7 +105,7 @@ func setupAuthStore(t *testing.T) (store *authStore, teardownfunc func(t *testin if err != nil { t.Fatal(err) } - as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost) + as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost) err = enableAuthAndCreateRoot(as) if err != nil { t.Fatal(err) @@ -657,7 +657,7 @@ func TestAuthInfoFromCtxRace(t *testing.T) { if err != nil { t.Fatal(err) } - as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost) + as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost) defer as.Close() donec := make(chan struct{}) @@ -730,7 +730,7 @@ func TestRecoverFromSnapshot(t *testing.T) { if err != nil { t.Fatal(err) } - as2 := NewAuthStore(zap.NewExample(), as.be, nil, tp, bcrypt.MinCost) + as2 := NewAuthStore(zap.NewExample(), as.be, tp, bcrypt.MinCost) defer as2.Close() if !as2.IsAuthEnabled() { @@ -811,7 +811,7 @@ func TestRolesOrder(t *testing.T) { if err != nil { t.Fatal(err) } - as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost) + as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost) defer as.Close() err = enableAuthAndCreateRoot(as) if err != nil { @@ -867,7 +867,7 @@ func testAuthInfoFromCtxWithRoot(t *testing.T, opts string) { if err != nil { t.Fatal(err) } - as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost) + as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost) defer as.Close() if err = enableAuthAndCreateRoot(as); err != nil { diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index f7da9b4f156..9ea22aa35dc 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -598,7 +598,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } } - srv.authStore = auth.NewAuthStore(srv.Logger(), srv.be, srv.consistIndex, tp, int(cfg.BcryptCost)) + srv.authStore = auth.NewAuthStore(srv.Logger(), srv.be, tp, int(cfg.BcryptCost)) newSrv := srv // since srv == nil in defer if srv is returned as nil defer func() { diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 50e8533a97e..d8df5dfaa31 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -989,9 +989,9 @@ func TestSnapshot(t *testing.T) { lg: zap.NewExample(), r: *r, v2store: st, - consistIndex: cindex.NewConsistentIndex(be.BatchTx()), + consistIndex: cindex.NewConsistentIndex(be), } - srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, srv.consistIndex, mvcc.StoreConfig{}) + srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, mvcc.StoreConfig{}) srv.be = be ch := make(chan struct{}, 2) @@ -1074,11 +1074,11 @@ func TestSnapshotOrdering(t *testing.T) { snapshotter: snap.New(zap.NewExample(), snapdir), cluster: cl, SyncTicker: &time.Ticker{}, - consistIndex: cindex.NewConsistentIndex(be.BatchTx()), + consistIndex: cindex.NewConsistentIndex(be), } s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster} - s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, s.consistIndex, mvcc.StoreConfig{}) + s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, mvcc.StoreConfig{}) s.be = be s.start() @@ -1148,11 +1148,11 @@ func TestTriggerSnap(t *testing.T) { v2store: st, reqIDGen: idutil.NewGenerator(0, time.Time{}), SyncTicker: &time.Ticker{}, - consistIndex: cindex.NewConsistentIndex(be.BatchTx()), + consistIndex: cindex.NewConsistentIndex(be), } srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster} - srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, srv.consistIndex, mvcc.StoreConfig{}) + srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, mvcc.StoreConfig{}) srv.be = be srv.start() @@ -1227,11 +1227,11 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { snapshotter: snap.New(zap.NewExample(), testdir), cluster: cl, SyncTicker: &time.Ticker{}, - consistIndex: cindex.NewConsistentIndex(be.BatchTx()), + consistIndex: cindex.NewConsistentIndex(be), } s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster} - s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, s.consistIndex, mvcc.StoreConfig{}) + s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, mvcc.StoreConfig{}) s.be = be s.start() @@ -1562,7 +1562,7 @@ func TestPublishV3(t *testing.T) { w: w, reqIDGen: idutil.NewGenerator(0, time.Time{}), SyncTicker: &time.Ticker{}, - authStore: auth.NewAuthStore(lg, be, nil, nil, 0), + authStore: auth.NewAuthStore(lg, be, nil, 0), be: be, ctx: ctx, cancel: cancel, @@ -1633,7 +1633,7 @@ func TestPublishV3Retry(t *testing.T) { cluster: &membership.RaftCluster{}, reqIDGen: idutil.NewGenerator(0, time.Time{}), SyncTicker: &time.Ticker{}, - authStore: auth.NewAuthStore(lg, be, nil, nil, 0), + authStore: auth.NewAuthStore(lg, be, nil, 0), be: be, ctx: ctx, cancel: cancel, From 2dbecea5b26282b5ded675b5ba5922976340784f Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Mon, 12 Apr 2021 19:49:58 +0200 Subject: [PATCH 5/8] Simplify KVStore interaction with cindex thanks to hooks. --- server/etcdserver/api/v3rpc/maintenance.go | 2 +- server/etcdserver/cindex/cindex.go | 78 ++++++++++++++-------- server/etcdserver/cindex/cindex_test.go | 8 +-- server/etcdserver/server.go | 7 +- server/lease/lessor.go | 11 +-- server/mvcc/kv.go | 11 --- server/mvcc/kvstore.go | 29 +------- server/mvcc/kvstore_bench_test.go | 31 +++++---- server/mvcc/kvstore_txn.go | 1 - server/mvcc/watchable_store.go | 9 ++- 10 files changed, 84 insertions(+), 103 deletions(-) diff --git a/server/etcdserver/api/v3rpc/maintenance.go b/server/etcdserver/api/v3rpc/maintenance.go index dcacbf4978c..38cc9137163 100644 --- a/server/etcdserver/api/v3rpc/maintenance.go +++ b/server/etcdserver/api/v3rpc/maintenance.go @@ -34,7 +34,7 @@ import ( ) type KVGetter interface { - KV() mvcc.ConsistentWatchableKV + KV() mvcc.WatchableKV } type BackendGetter interface { diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go index 73e96fd70c4..6f8661b6d2a 100644 --- a/server/etcdserver/cindex/cindex.go +++ b/server/etcdserver/cindex/cindex.go @@ -28,6 +28,10 @@ var ( ConsistentIndexKeyName = []byte("consistent_index") ) +type Backend interface { + BatchTx() backend.BatchTx +} + // ConsistentIndexer is an interface that wraps the Get/Set/Save method for consistentIndex. type ConsistentIndexer interface { @@ -41,32 +45,38 @@ type ConsistentIndexer interface { // It saves consistentIndex to the underlying stable storage. UnsafeSave(tx backend.BatchTx) - // SetBatchTx set the available backend.BatchTx for ConsistentIndexer. - SetBatchTx(tx backend.BatchTx) + // SetBackend set the available backend.BatchTx for ConsistentIndexer. + SetBackend(be Backend) } // consistentIndex implements the ConsistentIndexer interface. type consistentIndex struct { - tx backend.BatchTx // consistentIndex represents the offset of an entry in a consistent replica log. - // it caches the "consistent_index" key's value. Accessed - // through atomics so must be 64-bit aligned. + // it caches the "consistent_index" key's value. + // Accessed through atomics so must be 64-bit aligned. consistentIndex uint64 - mutex sync.Mutex + + // be is used for initial read consistentIndex + be Backend + // mutex is protecting be. + mutex sync.Mutex } -func NewConsistentIndex(tx backend.BatchTx) ConsistentIndexer { - return &consistentIndex{tx: tx} +// NewConsistentIndex creates a new consistent index. +// If `be` is nil, it must be set (SetBackend) before first access using `ConsistentIndex()`. +func NewConsistentIndex(be Backend) ConsistentIndexer { + return &consistentIndex{be: be} } func (ci *consistentIndex) ConsistentIndex() uint64 { - if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 { return index } ci.mutex.Lock() defer ci.mutex.Unlock() - v := ReadConsistentIndex(ci.tx) + + v := ReadConsistentIndex(ci.be.BatchTx()) + atomic.StoreUint64(&ci.consistentIndex, v) return v } @@ -76,18 +86,15 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64) { func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) { index := atomic.LoadUint64(&ci.consistentIndex) - - if index == 0 { - // Never save 0 as it means that we didn't loaded the real index yet. - return - } - unsafeUpdateConsistentIndex(tx, index) + UnsafeUpdateConsistentIndex(tx, index, true) } -func (ci *consistentIndex) SetBatchTx(tx backend.BatchTx) { +func (ci *consistentIndex) SetBackend(be Backend) { ci.mutex.Lock() defer ci.mutex.Unlock() - ci.tx = tx + ci.be = be + // After the backend is changed, the first access should re-read it. + ci.SetConsistentIndex(0) } func NewFakeConsistentIndex(index uint64) ConsistentIndexer { @@ -102,13 +109,21 @@ func (f *fakeConsistentIndex) SetConsistentIndex(index uint64) { atomic.StoreUint64(&f.index, index) } -func (f *fakeConsistentIndex) UnsafeSave(tx backend.BatchTx) {} -func (f *fakeConsistentIndex) SetBatchTx(tx backend.BatchTx) {} +func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {} +func (f *fakeConsistentIndex) SetBackend(_ Backend) {} +// UnsafeCreateMetaBucket creates the `meta` bucket (if it does not exists yet). func UnsafeCreateMetaBucket(tx backend.BatchTx) { tx.UnsafeCreateBucket(MetaBucketName) } +// CreateMetaBucket creates the `meta` bucket (if it does not exists yet). +func CreateMetaBucket(tx backend.BatchTx) { + tx.Lock() + defer tx.Unlock() + tx.UnsafeCreateBucket(MetaBucketName) +} + // unsafeGetConsistentIndex loads consistent index from given transaction. // returns 0 if the data are not found. func unsafeReadConsistentIndex(tx backend.ReadTx) uint64 { @@ -128,7 +143,19 @@ func ReadConsistentIndex(tx backend.ReadTx) uint64 { return unsafeReadConsistentIndex(tx) } -func unsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64) { +func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, onlyGrow bool) { + if index == 0 { + // Never save 0 as it means that we didn't loaded the real index yet. + return + } + + if onlyGrow { + oldi := unsafeReadConsistentIndex(tx) + if index <= oldi { + return + } + } + bs := make([]byte, 8) // this is kept on stack (not heap) so its quick. binary.BigEndian.PutUint64(bs, index) // put the index into the underlying backend @@ -136,13 +163,8 @@ func unsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64) { tx.UnsafePut(MetaBucketName, ConsistentIndexKeyName, bs) } -func UpdateConsistentIndex(tx backend.BatchTx, index uint64) { +func UpdateConsistentIndex(tx backend.BatchTx, index uint64, onlyGrow bool) { tx.Lock() defer tx.Unlock() - - oldi := unsafeReadConsistentIndex(tx) - if index <= oldi { - return - } - unsafeUpdateConsistentIndex(tx, index) + UnsafeUpdateConsistentIndex(tx, index, onlyGrow) } diff --git a/server/etcdserver/cindex/cindex_test.go b/server/etcdserver/cindex/cindex_test.go index eb577b8fd64..c500260d38a 100644 --- a/server/etcdserver/cindex/cindex_test.go +++ b/server/etcdserver/cindex/cindex_test.go @@ -27,13 +27,14 @@ import ( func TestConsistentIndex(t *testing.T) { be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10) - ci := NewConsistentIndex(be.BatchTx()) + ci := NewConsistentIndex(be) tx := be.BatchTx() if tx == nil { t.Fatal("batch tx is nil") } tx.Lock() + UnsafeCreateMetaBucket(tx) tx.Unlock() be.ForceCommit() @@ -51,14 +52,13 @@ func TestConsistentIndex(t *testing.T) { b := backend.NewDefaultBackend(tmpPath) defer b.Close() - ci.SetConsistentIndex(0) - ci.SetBatchTx(b.BatchTx()) + ci.SetBackend(b) index = ci.ConsistentIndex() if index != r { t.Errorf("expected %d,got %d", r, index) } - ci = NewConsistentIndex(b.BatchTx()) + ci = NewConsistentIndex(b) index = ci.ConsistentIndex() if index != r { t.Errorf("expected %d,got %d", r, index) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 9ea22aa35dc..2a76dca2185 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -256,7 +256,7 @@ type EtcdServer struct { applyV3Internal applierV3Internal applyWait wait.WaitTime - kv mvcc.ConsistentWatchableKV + kv mvcc.WatchableKV lessor lease.Lessor bemu sync.Mutex be backend.Backend @@ -1210,8 +1210,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { lg.Panic("failed to restore mvcc store", zap.Error(err)) } - s.consistIndex.SetConsistentIndex(s.kv.ConsistentIndex()) - lg.Info("restored mvcc store") + lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex())) // Closing old backend might block until all the txns // on the backend are finished. @@ -2522,7 +2521,7 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error { } } -func (s *EtcdServer) KV() mvcc.ConsistentWatchableKV { return s.kv } +func (s *EtcdServer) KV() mvcc.WatchableKV { return s.kv } func (s *EtcdServer) Backend() backend.Backend { s.bemu.Lock() defer s.bemu.Unlock() diff --git a/server/lease/lessor.go b/server/lease/lessor.go index a12591e46ef..7a1544e9395 100644 --- a/server/lease/lessor.go +++ b/server/lease/lessor.go @@ -294,7 +294,7 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) { } le.leaseMap[id] = l - l.persistTo(le.b, le.ci) + l.persistTo(le.b) leaseTotalTTLs.Observe(float64(l.ttl)) leaseGranted.Inc() @@ -341,10 +341,6 @@ func (le *lessor) Revoke(id LeaseID) error { // kv deletion. Or we might end up with not executing the revoke or not // deleting the keys if etcdserver fails in between. le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID))) - // if len(keys) > 0, txn.End() will call ci.UnsafeSave function. - if le.ci != nil && len(keys) == 0 { - le.ci.UnsafeSave(le.b.BatchTx()) - } txn.End() @@ -828,7 +824,7 @@ func (l *Lease) expired() bool { return l.Remaining() <= 0 } -func (l *Lease) persistTo(b backend.Backend, ci cindex.ConsistentIndexer) { +func (l *Lease) persistTo(b backend.Backend) { key := int64ToBytes(int64(l.ID)) lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl, RemainingTTL: l.remainingTTL} @@ -839,9 +835,6 @@ func (l *Lease) persistTo(b backend.Backend, ci cindex.ConsistentIndexer) { b.BatchTx().Lock() b.BatchTx().UnsafePut(leaseBucketName, key, val) - if ci != nil { - ci.UnsafeSave(b.BatchTx()) - } b.BatchTx().Unlock() } diff --git a/server/mvcc/kv.go b/server/mvcc/kv.go index b8cd982da6e..15ad263288a 100644 --- a/server/mvcc/kv.go +++ b/server/mvcc/kv.go @@ -139,14 +139,3 @@ type Watchable interface { // watch events happened or happening on the KV. NewWatchStream() WatchStream } - -// ConsistentWatchableKV is a WatchableKV that understands the consistency -// algorithm and consistent index. -// If the consistent index of executing entry is not larger than the -// consistent index of ConsistentWatchableKV, all operations in -// this entry are skipped and return empty response. -type ConsistentWatchableKV interface { - WatchableKV - // ConsistentIndex returns the current consistent index of the KV. - ConsistentIndex() uint64 -} diff --git a/server/mvcc/kvstore.go b/server/mvcc/kvstore.go index ac79b07b1c5..df39301136a 100644 --- a/server/mvcc/kvstore.go +++ b/server/mvcc/kvstore.go @@ -69,8 +69,6 @@ type store struct { // mu read locks for txns and write locks for non-txn store changes. mu sync.RWMutex - ci cindex.ConsistentIndexer - b backend.Backend kvindex index @@ -94,7 +92,7 @@ type store struct { // NewStore returns a new store. It is useful to create a store inside // mvcc pkg. It should only be used for testing externally. -func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) *store { +func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *store { if lg == nil { lg = zap.NewNop() } @@ -104,7 +102,6 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.Cons s := &store{ cfg: cfg, b: b, - ci: ci, kvindex: newTreeIndex(lg), le: le, @@ -314,11 +311,6 @@ func init() { func (s *store) Commit() { s.mu.Lock() defer s.mu.Unlock() - - tx := s.b.BatchTx() - tx.Lock() - s.saveIndex(tx) - tx.Unlock() s.b.ForceCommit() } @@ -342,8 +334,6 @@ func (s *store) Restore(b backend.Backend) error { s.fifoSched = schedule.NewFIFOScheduler() s.stopc = make(chan struct{}) - s.ci.SetBatchTx(b.BatchTx()) - s.ci.SetConsistentIndex(0) return s.restore() } @@ -436,9 +426,7 @@ func (s *store) restore() error { tx.Unlock() - s.lg.Info("kvstore restored", - zap.Uint64("consistent-index", s.ConsistentIndex()), - zap.Int64("current-rev", s.currentRev)) + s.lg.Info("kvstore restored", zap.Int64("current-rev", s.currentRev)) if scheduledCompact != 0 { if _, err := s.compactLockfree(scheduledCompact); err != nil { @@ -533,19 +521,6 @@ func (s *store) Close() error { return nil } -func (s *store) saveIndex(tx backend.BatchTx) { - if s.ci != nil { - s.ci.UnsafeSave(tx) - } -} - -func (s *store) ConsistentIndex() uint64 { - if s.ci != nil { - return s.ci.ConsistentIndex() - } - return 0 -} - func (s *store) setupMetricsReporter() { b := s.b reportDbTotalSizeInBytesMu.Lock() diff --git a/server/mvcc/kvstore_bench_test.go b/server/mvcc/kvstore_bench_test.go index 910bacf3050..9ea70dad414 100644 --- a/server/mvcc/kvstore_bench_test.go +++ b/server/mvcc/kvstore_bench_test.go @@ -18,6 +18,7 @@ import ( "context" "testing" + "github.com/stretchr/testify/assert" "go.etcd.io/etcd/pkg/v3/traceutil" "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/lease" @@ -28,7 +29,7 @@ import ( func BenchmarkStorePut(b *testing.B) { be, tmpPath := betesting.NewDefaultTmpBackend(b) - s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{}) + s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, StoreConfig{}) defer cleanup(s, be, tmpPath) // arbitrary number of bytes @@ -47,7 +48,7 @@ func BenchmarkStoreRangeKey100(b *testing.B) { benchmarkStoreRange(b, 100) } func benchmarkStoreRange(b *testing.B, n int) { be, tmpPath := betesting.NewDefaultTmpBackend(b) - s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{}) + s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, StoreConfig{}) defer cleanup(s, be, tmpPath) // 64 byte key/val @@ -73,26 +74,30 @@ func benchmarkStoreRange(b *testing.B, n int) { } func BenchmarkConsistentIndex(b *testing.B) { - be, tmpPath := betesting.NewDefaultTmpBackend(b) - s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{}) - defer cleanup(s, be, tmpPath) + be, _ := betesting.NewDefaultTmpBackend(b) + ci := cindex.NewConsistentIndex(be) + defer betesting.Close(b, be) + + // This will force the index to be reread from scratch on each call. + ci.SetConsistentIndex(0) - tx := s.b.BatchTx() + tx := be.BatchTx() tx.Lock() - s.saveIndex(tx) + cindex.UnsafeCreateMetaBucket(tx) + ci.UnsafeSave(tx) tx.Unlock() b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - s.ConsistentIndex() + ci.ConsistentIndex() } } // BenchmarkStoreTxnPutUpdate is same as above, but instead updates single key func BenchmarkStorePutUpdate(b *testing.B) { be, tmpPath := betesting.NewDefaultTmpBackend(b) - s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{}) + s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, StoreConfig{}) defer cleanup(s, be, tmpPath) // arbitrary number of bytes @@ -110,7 +115,7 @@ func BenchmarkStorePutUpdate(b *testing.B) { // some synchronization operations, such as mutex locking. func BenchmarkStoreTxnPut(b *testing.B) { be, tmpPath := betesting.NewDefaultTmpBackend(b) - s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{}) + s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, StoreConfig{}) defer cleanup(s, be, tmpPath) // arbitrary number of bytes @@ -130,7 +135,7 @@ func BenchmarkStoreTxnPut(b *testing.B) { // benchmarkStoreRestore benchmarks the restore operation func benchmarkStoreRestore(revsPerKey int, b *testing.B) { be, tmpPath := betesting.NewDefaultTmpBackend(b) - s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{}) + s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, StoreConfig{}) // use closure to capture 's' to pick up the reassignment defer func() { cleanup(s, be, tmpPath) }() @@ -146,11 +151,11 @@ func benchmarkStoreRestore(revsPerKey int, b *testing.B) { txn.End() } } - s.Close() + assert.NoError(b, s.Close()) b.ReportAllocs() b.ResetTimer() - s = NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{}) + s = NewStore(zap.NewExample(), be, &lease.FakeLessor{}, StoreConfig{}) } func BenchmarkStoreRestoreRevs1(b *testing.B) { diff --git a/server/mvcc/kvstore_txn.go b/server/mvcc/kvstore_txn.go index 870c710ec24..155a13d0730 100644 --- a/server/mvcc/kvstore_txn.go +++ b/server/mvcc/kvstore_txn.go @@ -104,7 +104,6 @@ func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 { func (tw *storeTxnWrite) End() { // only update index if the txn modifies the mvcc state. if len(tw.changes) != 0 { - tw.s.saveIndex(tw.tx) // hold revMu lock to prevent new read txns from opening until writeback. tw.s.revMu.Lock() tw.s.currentRev++ diff --git a/server/mvcc/watchable_store.go b/server/mvcc/watchable_store.go index ff5bdb78492..63529ed672e 100644 --- a/server/mvcc/watchable_store.go +++ b/server/mvcc/watchable_store.go @@ -20,7 +20,6 @@ import ( "go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/pkg/v3/traceutil" - "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/lease" "go.etcd.io/etcd/server/v3/mvcc/backend" @@ -70,16 +69,16 @@ type watchableStore struct { // cancel operations. type cancelFunc func() -func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) ConsistentWatchableKV { - return newWatchableStore(lg, b, le, ci, cfg) +func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) WatchableKV { + return newWatchableStore(lg, b, le, cfg) } -func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) *watchableStore { +func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore { if lg == nil { lg = zap.NewNop() } s := &watchableStore{ - store: NewStore(lg, b, le, ci, cfg), + store: NewStore(lg, b, le, cfg), victimc: make(chan struct{}, 1), unsynced: newWatcherGroup(), synced: newWatcherGroup(), From 2fb6f0a74bfb85a5b2699f566b1113842dfadff4 Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Wed, 28 Apr 2021 08:49:40 +0200 Subject: [PATCH 6/8] Simplify lease management after cindex update is moved to 'hooks'. --- server/etcdserver/server.go | 34 ++++++++++++----------------- server/lease/leasehttp/http_test.go | 6 ++--- server/lease/lessor.go | 9 +++----- server/lease/lessor_bench_test.go | 2 +- server/lease/lessor_test.go | 30 ++++++++++++------------- 5 files changed, 36 insertions(+), 45 deletions(-) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 2a76dca2185..e5c3dc1a2d5 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -301,9 +301,7 @@ type backendHooks struct { } func (bh *backendHooks) OnPreCommitUnsafe(tx backend.BatchTx) { - if bh.indexer != nil { - bh.indexer.UnsafeSave(tx) - } + bh.indexer.UnsafeSave(tx) } // NewServer creates a new EtcdServer from the supplied configuration. The @@ -358,8 +356,11 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { bepath := cfg.BackendPath() beExist := fileutil.Exist(bepath) - beHooks := &backendHooks{lg: cfg.Logger} + ci := cindex.NewConsistentIndex(nil) + beHooks := &backendHooks{lg: cfg.Logger, indexer: ci} be := openBackend(cfg, beHooks) + ci.SetBackend(be) + cindex.CreateMetaBucket(be.BatchTx()) defer func() { if err != nil { @@ -543,7 +544,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { peerRt: prt, reqIDGen: idutil.NewGenerator(uint16(id), time.Now()), AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist}, - consistIndex: cindex.NewConsistentIndex(be.BatchTx()), + consistIndex: ci, firstCommitInTermC: make(chan struct{}), } serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1) @@ -556,16 +557,11 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases. // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers. - srv.lessor = lease.NewLessor( - srv.Logger(), - srv.be, - lease.LessorConfig{ - MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())), - CheckpointInterval: cfg.LeaseCheckpointInterval, - ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(), - }, - srv.consistIndex, - ) + srv.lessor = lease.NewLessor(srv.Logger(), srv.be, lease.LessorConfig{ + MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())), + CheckpointInterval: cfg.LeaseCheckpointInterval, + ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(), + }) tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken, func(index uint64) <-chan struct{} { @@ -577,12 +573,9 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { cfg.Logger.Warn("failed to create token provider", zap.Error(err)) return nil, err } - srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit}) - - // Backend should contain 'meta' bucket going forward. - beHooks.indexer = srv.consistIndex + srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit}) - kvindex := srv.consistIndex.ConsistentIndex() + kvindex := ci.ConsistentIndex() srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex)) if beExist { // TODO: remove kvindex != 0 checking when we do not expect users to upgrade @@ -1210,6 +1203,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { lg.Panic("failed to restore mvcc store", zap.Error(err)) } + s.consistIndex.SetBackend(newbe) lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex())) // Closing old backend might block until all the txns diff --git a/server/lease/leasehttp/http_test.go b/server/lease/leasehttp/http_test.go index 1a0ca486512..ada3d3a2e2a 100644 --- a/server/lease/leasehttp/http_test.go +++ b/server/lease/leasehttp/http_test.go @@ -31,7 +31,7 @@ func TestRenewHTTP(t *testing.T) { be, _ := betesting.NewTmpBackend(t, time.Hour, 10000) defer betesting.Close(t, be) - le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}, nil) + le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}) le.Promote(time.Second) l, err := le.Grant(1, int64(5)) if err != nil { @@ -55,7 +55,7 @@ func TestTimeToLiveHTTP(t *testing.T) { be, _ := betesting.NewTmpBackend(t, time.Hour, 10000) defer betesting.Close(t, be) - le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}, nil) + le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}) le.Promote(time.Second) l, err := le.Grant(1, int64(5)) if err != nil { @@ -96,7 +96,7 @@ func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) { be, _ := betesting.NewTmpBackend(t, time.Hour, 10000) defer betesting.Close(t, be) - le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}, nil) + le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}) le.Promote(time.Second) l, err := le.Grant(1, int64(5)) if err != nil { diff --git a/server/lease/lessor.go b/server/lease/lessor.go index 7a1544e9395..5dba54db02e 100644 --- a/server/lease/lessor.go +++ b/server/lease/lessor.go @@ -25,7 +25,6 @@ import ( "time" pb "go.etcd.io/etcd/api/v3/etcdserverpb" - "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/lease/leasepb" "go.etcd.io/etcd/server/v3/mvcc/backend" "go.uber.org/zap" @@ -182,7 +181,6 @@ type lessor struct { checkpointInterval time.Duration // the interval to check if the expired lease is revoked expiredLeaseRetryInterval time.Duration - ci cindex.ConsistentIndexer } type LessorConfig struct { @@ -191,11 +189,11 @@ type LessorConfig struct { ExpiredLeasesRetryInterval time.Duration } -func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig, ci cindex.ConsistentIndexer) Lessor { - return newLessor(lg, b, cfg, ci) +func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor { + return newLessor(lg, b, cfg) } -func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig, ci cindex.ConsistentIndexer) *lessor { +func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor { checkpointInterval := cfg.CheckpointInterval expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval if checkpointInterval == 0 { @@ -218,7 +216,6 @@ func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig, ci cindex.Co stopC: make(chan struct{}), doneC: make(chan struct{}), lg: lg, - ci: ci, } l.initAndRecover() diff --git a/server/lease/lessor_bench_test.go b/server/lease/lessor_bench_test.go index 46b702de33b..06feec810c5 100644 --- a/server/lease/lessor_bench_test.go +++ b/server/lease/lessor_bench_test.go @@ -68,7 +68,7 @@ func setUp(t testing.TB) (le *lessor, tearDown func()) { be, _ := betesting.NewDefaultTmpBackend(t) // MinLeaseTTL is negative, so we can grant expired lease in benchmark. // ExpiredLeasesRetryInterval should small, so benchmark of findExpired will recheck expired lease. - le = newLessor(lg, be, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond}, nil) + le = newLessor(lg, be, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond}) le.SetRangeDeleter(func() TxnDelete { ftd := &FakeTxnDelete{be.BatchTx()} ftd.Lock() diff --git a/server/lease/lessor_test.go b/server/lease/lessor_test.go index 61b93092965..c6cb0518e84 100644 --- a/server/lease/lessor_test.go +++ b/server/lease/lessor_test.go @@ -45,7 +45,7 @@ func TestLessorGrant(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() le.Promote(0) @@ -107,7 +107,7 @@ func TestLeaseConcurrentKeys(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) }) @@ -156,7 +156,7 @@ func TestLessorRevoke(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() var fd *fakeDeleter le.SetRangeDeleter(func() TxnDelete { @@ -209,7 +209,7 @@ func TestLessorRenew(t *testing.T) { defer be.Close() defer os.RemoveAll(dir) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() le.Promote(0) @@ -242,7 +242,7 @@ func TestLessorRenewWithCheckpointer(t *testing.T) { defer be.Close() defer os.RemoveAll(dir) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) { for _, cp := range cp.GetCheckpoints() { le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL()) @@ -291,7 +291,7 @@ func TestLessorRenewExtendPileup(t *testing.T) { dir, be := NewTestBackend(t) defer os.RemoveAll(dir) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) ttl := int64(10) for i := 1; i <= leaseRevokeRate*10; i++ { if _, err := le.Grant(LeaseID(2*i), ttl); err != nil { @@ -310,7 +310,7 @@ func TestLessorRenewExtendPileup(t *testing.T) { bcfg.Path = filepath.Join(dir, "be") be = backend.New(bcfg) defer be.Close() - le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() // extend after recovery should extend expiration on lease pile-up @@ -340,7 +340,7 @@ func TestLessorDetach(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) }) @@ -381,7 +381,7 @@ func TestLessorRecover(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() l1, err1 := le.Grant(1, 10) l2, err2 := le.Grant(2, 20) @@ -390,7 +390,7 @@ func TestLessorRecover(t *testing.T) { } // Create a new lessor with the same backend - nle := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + nle := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) defer nle.Stop() nl1 := nle.Lookup(l1.ID) if nl1 == nil || nl1.ttl != l1.ttl { @@ -411,7 +411,7 @@ func TestLessorExpire(t *testing.T) { testMinTTL := int64(1) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}) defer le.Stop() le.Promote(1 * time.Second) @@ -464,7 +464,7 @@ func TestLessorExpireAndDemote(t *testing.T) { testMinTTL := int64(1) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}) defer le.Stop() le.Promote(1 * time.Second) @@ -513,7 +513,7 @@ func TestLessorMaxTTL(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() _, err := le.Grant(1, MaxLeaseTTL+1) @@ -529,7 +529,7 @@ func TestLessorCheckpointScheduling(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second}) le.minLeaseTTL = 1 checkpointedC := make(chan struct{}) le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) { @@ -564,7 +564,7 @@ func TestLessorCheckpointsRestoredOnPromote(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}, nil) + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() l, err := le.Grant(1, 10) if err != nil { From a78d072b9a2ec5561a4301f19981e18bc5a0e140 Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Tue, 4 May 2021 15:38:11 +0200 Subject: [PATCH 7/8] Simplify KVstore dependency on cindex. --- etcdctl/ctlv2/command/backup_command.go | 4 +- etcdctl/snapshot/v3_snapshot.go | 5 +-- server/mvcc/kv_test.go | 38 +++++++++---------- server/mvcc/kvstore_compaction_test.go | 6 +-- server/mvcc/kvstore_test.go | 20 +++++----- server/mvcc/watchable_store_bench_test.go | 11 +++--- server/mvcc/watchable_store_test.go | 27 +++++++------ server/mvcc/watcher_bench_test.go | 2 +- server/mvcc/watcher_test.go | 16 ++++---- .../integration/clientv3/maintenance_test.go | 2 +- tests/integration/v3_alarm_test.go | 3 +- tools/benchmark/cmd/mvcc.go | 2 +- 12 files changed, 65 insertions(+), 71 deletions(-) diff --git a/etcdctl/ctlv2/command/backup_command.go b/etcdctl/ctlv2/command/backup_command.go index c8e1fe540ad..e3ec04eead3 100644 --- a/etcdctl/ctlv2/command/backup_command.go +++ b/etcdctl/ctlv2/command/backup_command.go @@ -322,9 +322,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, desired *desiredCl tx.Lock() defer tx.Unlock() cindex.UnsafeCreateMetaBucket(tx) - ci := cindex.NewConsistentIndex(tx) - ci.SetConsistentIndex(idx) - ci.UnsafeSave(tx) + cindex.UnsafeUpdateConsistentIndex(tx, idx, false) } else { // Thanks to translateWAL not moving entries, but just replacing them with // 'empty', there is no need to update the consistency index. diff --git a/etcdctl/snapshot/v3_snapshot.go b/etcdctl/snapshot/v3_snapshot.go index 0fc0e1497e9..e726a59f9aa 100644 --- a/etcdctl/snapshot/v3_snapshot.go +++ b/etcdctl/snapshot/v3_snapshot.go @@ -478,8 +478,7 @@ func (s *v3Manager) saveWALAndSnap() (*raftpb.HardState, error) { func (s *v3Manager) updateCIndex(commit uint64) error { be := backend.NewDefaultBackend(s.outDbPath()) defer be.Close() - ci := cindex.NewConsistentIndex(be.BatchTx()) - ci.SetConsistentIndex(commit) - ci.UnsafeSave(be.BatchTx()) + + cindex.UpdateConsistentIndex(be.BatchTx(), commit, false) return nil } diff --git a/server/mvcc/kv_test.go b/server/mvcc/kv_test.go index 19d6539d833..524f72e6900 100644 --- a/server/mvcc/kv_test.go +++ b/server/mvcc/kv_test.go @@ -79,7 +79,7 @@ func TestKVTxnRange(t *testing.T) { testKVRange(t, txnRangeFunc) } func testKVRange(t *testing.T, f rangeFunc) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) defer cleanup(s, b, tmpPath) kvs := put3TestKVs(s) @@ -145,7 +145,7 @@ func TestKVTxnRangeRev(t *testing.T) { testKVRangeRev(t, txnRangeFunc) } func testKVRangeRev(t *testing.T, f rangeFunc) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) defer cleanup(s, b, tmpPath) kvs := put3TestKVs(s) @@ -181,7 +181,7 @@ func TestKVTxnRangeBadRev(t *testing.T) { testKVRangeBadRev(t, txnRangeFunc) } func testKVRangeBadRev(t *testing.T, f rangeFunc) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) defer cleanup(s, b, tmpPath) put3TestKVs(s) @@ -214,7 +214,7 @@ func TestKVTxnRangeLimit(t *testing.T) { testKVRangeLimit(t, txnRangeFunc) } func testKVRangeLimit(t *testing.T, f rangeFunc) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) defer cleanup(s, b, tmpPath) kvs := put3TestKVs(s) @@ -259,7 +259,7 @@ func TestKVTxnPutMultipleTimes(t *testing.T) { testKVPutMultipleTimes(t, txnPutF func testKVPutMultipleTimes(t *testing.T, f putFunc) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) defer cleanup(s, b, tmpPath) for i := 0; i < 10; i++ { @@ -321,7 +321,7 @@ func testKVDeleteRange(t *testing.T, f deleteRangeFunc) { for i, tt := range tests { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) s.Put([]byte("foo"), []byte("bar"), lease.NoLease) s.Put([]byte("foo1"), []byte("bar1"), lease.NoLease) @@ -341,7 +341,7 @@ func TestKVTxnDeleteMultipleTimes(t *testing.T) { testKVDeleteMultipleTimes(t, t func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) defer cleanup(s, b, tmpPath) s.Put([]byte("foo"), []byte("bar"), lease.NoLease) @@ -362,7 +362,7 @@ func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) { // test that range, put, delete on single key in sequence repeatedly works correctly. func TestKVOperationInSequence(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) defer cleanup(s, b, tmpPath) for i := 0; i < 10; i++ { @@ -409,7 +409,7 @@ func TestKVOperationInSequence(t *testing.T) { func TestKVTxnBlockWriteOperations(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) tests := []func(){ func() { s.Put([]byte("foo"), nil, lease.NoLease) }, @@ -443,7 +443,7 @@ func TestKVTxnBlockWriteOperations(t *testing.T) { func TestKVTxnNonBlockRange(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) defer cleanup(s, b, tmpPath) txn := s.Write(traceutil.TODO()) @@ -464,7 +464,7 @@ func TestKVTxnNonBlockRange(t *testing.T) { // test that txn range, put, delete on single key in sequence repeatedly works correctly. func TestKVTxnOperationInSequence(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) defer cleanup(s, b, tmpPath) for i := 0; i < 10; i++ { @@ -514,7 +514,7 @@ func TestKVTxnOperationInSequence(t *testing.T) { func TestKVCompactReserveLastValue(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) defer cleanup(s, b, tmpPath) s.Put([]byte("foo"), []byte("bar0"), 1) @@ -568,7 +568,7 @@ func TestKVCompactReserveLastValue(t *testing.T) { func TestKVCompactBad(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) defer cleanup(s, b, tmpPath) s.Put([]byte("foo"), []byte("bar0"), lease.NoLease) @@ -601,7 +601,7 @@ func TestKVHash(t *testing.T) { for i := 0; i < len(hashes); i++ { var err error b, tmpPath := betesting.NewDefaultTmpBackend(t) - kv := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + kv := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) kv.Put([]byte("foo0"), []byte("bar0"), lease.NoLease) kv.Put([]byte("foo1"), []byte("bar0"), lease.NoLease) hashes[i], _, err = kv.Hash() @@ -639,7 +639,7 @@ func TestKVRestore(t *testing.T) { } for i, tt := range tests { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) tt(s) var kvss [][]mvccpb.KeyValue for k := int64(0); k < 10; k++ { @@ -651,7 +651,7 @@ func TestKVRestore(t *testing.T) { s.Close() // ns should recover the the previous state from backend. - ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) if keysRestore := readGaugeInt(keysGauge); keysBefore != keysRestore { t.Errorf("#%d: got %d key count, expected %d", i, keysRestore, keysBefore) @@ -683,7 +683,7 @@ func readGaugeInt(g prometheus.Gauge) int { func TestKVSnapshot(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) defer cleanup(s, b, tmpPath) wkvs := put3TestKVs(s) @@ -703,7 +703,7 @@ func TestKVSnapshot(t *testing.T) { } f.Close() - ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) defer ns.Close() r, err := ns.Range(context.TODO(), []byte("a"), []byte("z"), RangeOptions{}) if err != nil { @@ -719,7 +719,7 @@ func TestKVSnapshot(t *testing.T) { func TestWatchableKVWatch(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() diff --git a/server/mvcc/kvstore_compaction_test.go b/server/mvcc/kvstore_compaction_test.go index 559fc07c5a2..0c65c6ec19d 100644 --- a/server/mvcc/kvstore_compaction_test.go +++ b/server/mvcc/kvstore_compaction_test.go @@ -67,7 +67,7 @@ func TestScheduleCompaction(t *testing.T) { } for i, tt := range tests { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) tx := s.b.BatchTx() tx.Lock() @@ -101,7 +101,7 @@ func TestScheduleCompaction(t *testing.T) { func TestCompactAllAndRestore(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) defer os.Remove(tmpPath) s0.Put([]byte("foo"), []byte("bar"), lease.NoLease) @@ -127,7 +127,7 @@ func TestCompactAllAndRestore(t *testing.T) { t.Fatal(err) } - s1 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s1 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) if s1.Rev() != rev { t.Errorf("rev = %v, want %v", s1.Rev(), rev) } diff --git a/server/mvcc/kvstore_test.go b/server/mvcc/kvstore_test.go index 10fa500535b..256968583d4 100644 --- a/server/mvcc/kvstore_test.go +++ b/server/mvcc/kvstore_test.go @@ -43,7 +43,7 @@ import ( func TestStoreRev(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) - s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) defer s.Close() for i := 1; i <= 3; i++ { @@ -426,7 +426,7 @@ func TestRestoreDelete(t *testing.T) { defer func() { restoreChunkKeys = oldChunk }() b, _ := betesting.NewDefaultTmpBackend(t) - s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) keys := make(map[string]struct{}) for i := 0; i < 20; i++ { @@ -451,7 +451,7 @@ func TestRestoreDelete(t *testing.T) { } s.Close() - s = NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s = NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) defer s.Close() for i := 0; i < 20; i++ { ks := fmt.Sprintf("foo-%d", i) @@ -473,7 +473,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) { tests := []string{"recreate", "restore"} for _, test := range tests { b, _ := betesting.NewDefaultTmpBackend(t) - s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) s0.Put([]byte("foo"), []byte("bar"), lease.NoLease) s0.Put([]byte("foo"), []byte("bar1"), lease.NoLease) @@ -492,7 +492,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) { var s *store switch test { case "recreate": - s = NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s = NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) case "restore": s0.Restore(b) s = s0 @@ -534,7 +534,7 @@ type hashKVResult struct { // TestHashKVWhenCompacting ensures that HashKV returns correct hash when compacting. func TestHashKVWhenCompacting(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) defer os.Remove(tmpPath) rev := 10000 @@ -602,7 +602,7 @@ func TestHashKVWhenCompacting(t *testing.T) { // correct hash value with latest revision. func TestHashKVZeroRevision(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) defer os.Remove(tmpPath) rev := 10000 @@ -635,7 +635,7 @@ func TestTxnPut(t *testing.T) { vals := createBytesSlice(bytesN, sliceN) b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) defer cleanup(s, b, tmpPath) for i := 0; i < sliceN; i++ { @@ -651,7 +651,7 @@ func TestTxnPut(t *testing.T) { // TestConcurrentReadNotBlockingWrite ensures Read does not blocking Write after its creation func TestConcurrentReadNotBlockingWrite(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) defer os.Remove(tmpPath) // write something to read later @@ -720,7 +720,7 @@ func TestConcurrentReadTxAndWrite(t *testing.T) { mu sync.Mutex // mu protects committedKVs ) b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) defer os.Remove(tmpPath) var wg sync.WaitGroup diff --git a/server/mvcc/watchable_store_bench_test.go b/server/mvcc/watchable_store_bench_test.go index 9642b69e9b8..0cdc09e3ba0 100644 --- a/server/mvcc/watchable_store_bench_test.go +++ b/server/mvcc/watchable_store_bench_test.go @@ -20,7 +20,6 @@ import ( "testing" "go.etcd.io/etcd/pkg/v3/traceutil" - "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/lease" betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing" @@ -29,7 +28,7 @@ import ( func BenchmarkWatchableStorePut(b *testing.B) { be, tmpPath := betesting.NewDefaultTmpBackend(b) - s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{}) + s := New(zap.NewExample(), be, &lease.FakeLessor{}, StoreConfig{}) defer cleanup(s, be, tmpPath) // arbitrary number of bytes @@ -49,7 +48,7 @@ func BenchmarkWatchableStorePut(b *testing.B) { // some synchronization operations, such as mutex locking. func BenchmarkWatchableStoreTxnPut(b *testing.B) { be, tmpPath := betesting.NewDefaultTmpBackend(b) - s := New(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{}) + s := New(zap.NewExample(), be, &lease.FakeLessor{}, StoreConfig{}) defer cleanup(s, be, tmpPath) // arbitrary number of bytes @@ -80,7 +79,7 @@ func BenchmarkWatchableStoreWatchPutUnsync(b *testing.B) { func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) { be, tmpPath := betesting.NewDefaultTmpBackend(b) - s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, StoreConfig{}) defer cleanup(s, be, tmpPath) k := []byte("testkey") @@ -123,7 +122,7 @@ func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) { // we should put to simulate the real-world use cases. func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { be, tmpPath := betesting.NewDefaultTmpBackend(b) - s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{}) + s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, StoreConfig{}) // manually create watchableStore instead of newWatchableStore // because newWatchableStore periodically calls syncWatchersLoop @@ -180,7 +179,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { func BenchmarkWatchableStoreSyncedCancel(b *testing.B) { be, tmpPath := betesting.NewDefaultTmpBackend(b) - s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, StoreConfig{}) defer func() { s.store.Close() diff --git a/server/mvcc/watchable_store_test.go b/server/mvcc/watchable_store_test.go index c0f566f2d65..bc09a4a0366 100644 --- a/server/mvcc/watchable_store_test.go +++ b/server/mvcc/watchable_store_test.go @@ -25,7 +25,6 @@ import ( "go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/pkg/v3/traceutil" - "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/lease" betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing" "go.uber.org/zap" @@ -33,7 +32,7 @@ import ( func TestWatch(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) defer func() { s.store.Close() @@ -55,7 +54,7 @@ func TestWatch(t *testing.T) { func TestNewWatcherCancel(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) defer func() { s.store.Close() @@ -87,7 +86,7 @@ func TestCancelUnsynced(t *testing.T) { // method to sync watchers in unsynced map. We want to keep watchers // in unsynced to test if syncWatchers works as expected. s := &watchableStore{ - store: NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}), + store: NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}), unsynced: newWatcherGroup(), // to make the test not crash from assigning to nil map. @@ -142,7 +141,7 @@ func TestSyncWatchers(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) s := &watchableStore{ - store: NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}), + store: NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}), unsynced: newWatcherGroup(), synced: newWatcherGroup(), } @@ -225,7 +224,7 @@ func TestSyncWatchers(t *testing.T) { // TestWatchCompacted tests a watcher that watches on a compacted revision. func TestWatchCompacted(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) defer func() { s.store.Close() @@ -262,7 +261,7 @@ func TestWatchCompacted(t *testing.T) { func TestWatchFutureRev(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) defer func() { s.store.Close() @@ -303,7 +302,7 @@ func TestWatchRestore(t *testing.T) { test := func(delay time.Duration) func(t *testing.T) { return func(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, cindex.NewConsistentIndex(b.BatchTx()), StoreConfig{}) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) defer cleanup(s, b, tmpPath) testKey := []byte("foo") @@ -311,7 +310,7 @@ func TestWatchRestore(t *testing.T) { rev := s.Put(testKey, testValue, lease.NoLease) newBackend, newPath := betesting.NewDefaultTmpBackend(t) - newStore := newWatchableStore(zap.NewExample(), newBackend, &lease.FakeLessor{}, cindex.NewConsistentIndex(newBackend.BatchTx()), StoreConfig{}) + newStore := newWatchableStore(zap.NewExample(), newBackend, &lease.FakeLessor{}, StoreConfig{}) defer cleanup(newStore, newBackend, newPath) w := newStore.NewWatchStream() @@ -349,11 +348,11 @@ func TestWatchRestore(t *testing.T) { // 5. choose the watcher from step 1, without panic func TestWatchRestoreSyncedWatcher(t *testing.T) { b1, b1Path := betesting.NewDefaultTmpBackend(t) - s1 := newWatchableStore(zap.NewExample(), b1, &lease.FakeLessor{}, cindex.NewConsistentIndex(b1.BatchTx()), StoreConfig{}) + s1 := newWatchableStore(zap.NewExample(), b1, &lease.FakeLessor{}, StoreConfig{}) defer cleanup(s1, b1, b1Path) b2, b2Path := betesting.NewDefaultTmpBackend(t) - s2 := newWatchableStore(zap.NewExample(), b2, &lease.FakeLessor{}, cindex.NewConsistentIndex(b2.BatchTx()), StoreConfig{}) + s2 := newWatchableStore(zap.NewExample(), b2, &lease.FakeLessor{}, StoreConfig{}) defer cleanup(s2, b2, b2Path) testKey, testValue := []byte("foo"), []byte("bar") @@ -400,7 +399,7 @@ func TestWatchRestoreSyncedWatcher(t *testing.T) { // TestWatchBatchUnsynced tests batching on unsynced watchers func TestWatchBatchUnsynced(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) oldMaxRevs := watchBatchMaxRevs defer func() { @@ -534,7 +533,7 @@ func TestWatchVictims(t *testing.T) { oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) defer func() { s.store.Close() @@ -612,7 +611,7 @@ func TestWatchVictims(t *testing.T) { // canceling its watches. func TestStressWatchCancelClose(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) defer func() { s.store.Close() diff --git a/server/mvcc/watcher_bench_test.go b/server/mvcc/watcher_bench_test.go index efd9011f76b..5a90a46ebb6 100644 --- a/server/mvcc/watcher_bench_test.go +++ b/server/mvcc/watcher_bench_test.go @@ -26,7 +26,7 @@ import ( func BenchmarkKVWatcherMemoryUsage(b *testing.B) { be, tmpPath := betesting.NewDefaultTmpBackend(b) - watchable := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{}) + watchable := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, StoreConfig{}) defer cleanup(watchable, be, tmpPath) diff --git a/server/mvcc/watcher_test.go b/server/mvcc/watcher_test.go index 3b598969d01..bbada4ed5dc 100644 --- a/server/mvcc/watcher_test.go +++ b/server/mvcc/watcher_test.go @@ -32,7 +32,7 @@ import ( // and the watched event attaches the correct watchID. func TestWatcherWatchID(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -82,7 +82,7 @@ func TestWatcherWatchID(t *testing.T) { func TestWatcherRequestsCustomID(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -119,7 +119,7 @@ func TestWatcherRequestsCustomID(t *testing.T) { // and returns events with matching prefixes. func TestWatcherWatchPrefix(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -193,7 +193,7 @@ func TestWatcherWatchPrefix(t *testing.T) { // does not create watcher, which panics when canceling in range tree. func TestWatcherWatchWrongRange(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -213,7 +213,7 @@ func TestWatcherWatchWrongRange(t *testing.T) { func TestWatchDeleteRange(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) defer func() { s.store.Close() @@ -252,7 +252,7 @@ func TestWatchDeleteRange(t *testing.T) { // with given id inside watchStream. func TestWatchStreamCancelWatcherByID(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -295,7 +295,7 @@ func TestWatcherRequestProgress(t *testing.T) { // method to sync watchers in unsynced map. We want to keep watchers // in unsynced to test if syncWatchers works as expected. s := &watchableStore{ - store: NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}), + store: NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}), unsynced: newWatcherGroup(), synced: newWatcherGroup(), } @@ -344,7 +344,7 @@ func TestWatcherRequestProgress(t *testing.T) { func TestWatcherWatchWithFilter(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() diff --git a/tests/integration/clientv3/maintenance_test.go b/tests/integration/clientv3/maintenance_test.go index 586e0d6edcd..dfef11a6c49 100644 --- a/tests/integration/clientv3/maintenance_test.go +++ b/tests/integration/clientv3/maintenance_test.go @@ -149,7 +149,7 @@ func TestMaintenanceSnapshotErrorInflight(t *testing.T) { clus.Members[0].Stop(t) dpath := filepath.Join(clus.Members[0].DataDir, "member", "snap", "db") b := backend.NewDefaultBackend(dpath) - s := mvcc.NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, nil, mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32}) + s := mvcc.NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32}) rev := 100000 for i := 2; i <= rev; i++ { s.Put([]byte(fmt.Sprintf("%10d", i)), bytes.Repeat([]byte("a"), 1024), lease.NoLease) diff --git a/tests/integration/v3_alarm_test.go b/tests/integration/v3_alarm_test.go index 9f1f744f885..0151dc27fcb 100644 --- a/tests/integration/v3_alarm_test.go +++ b/tests/integration/v3_alarm_test.go @@ -25,7 +25,6 @@ import ( pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" "go.etcd.io/etcd/pkg/v3/traceutil" - "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/mvcc" "go.etcd.io/etcd/server/v3/mvcc/backend" "go.uber.org/zap/zaptest" @@ -167,7 +166,7 @@ func TestV3CorruptAlarm(t *testing.T) { clus.Members[0].Stop(t) fp := filepath.Join(clus.Members[0].DataDir, "member", "snap", "db") be := backend.NewDefaultBackend(fp) - s := mvcc.NewStore(zaptest.NewLogger(t), be, nil, cindex.NewFakeConsistentIndex(13), mvcc.StoreConfig{}) + s := mvcc.NewStore(zaptest.NewLogger(t), be, nil, mvcc.StoreConfig{}) // NOTE: cluster_proxy mode with namespacing won't set 'k', but namespace/'k'. s.Put([]byte("abc"), []byte("def"), 0) s.Put([]byte("xyz"), []byte("123"), 0) diff --git a/tools/benchmark/cmd/mvcc.go b/tools/benchmark/cmd/mvcc.go index 5bdef979751..fd7d6aee010 100644 --- a/tools/benchmark/cmd/mvcc.go +++ b/tools/benchmark/cmd/mvcc.go @@ -38,7 +38,7 @@ func initMVCC() { bcfg := backend.DefaultBackendConfig() bcfg.Path, bcfg.BatchInterval, bcfg.BatchLimit = "mvcc-bench", time.Duration(batchInterval)*time.Millisecond, batchLimit be := backend.New(bcfg) - s = mvcc.NewStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, mvcc.StoreConfig{}) + s = mvcc.NewStore(zap.NewExample(), be, &lease.FakeLessor{}, mvcc.StoreConfig{}) os.Remove("mvcc-bench") // boltDB has an opened fd, so removing the file is ok } From f1123d696c9f772119c04b9dee72b5b1e050f57c Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Tue, 4 May 2021 16:51:59 +0200 Subject: [PATCH 8/8] fixup! Unify shared code (and constants) with cindex package. --- etcdctl/ctlv3/command/migrate_command.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etcdctl/ctlv3/command/migrate_command.go b/etcdctl/ctlv3/command/migrate_command.go index deba3c2ffba..deb23fb4c2b 100644 --- a/etcdctl/ctlv3/command/migrate_command.go +++ b/etcdctl/ctlv3/command/migrate_command.go @@ -92,7 +92,7 @@ func migrateCommandFunc(cmd *cobra.Command, args []string) { }() readKeys(reader, be) - cindex.UpdateConsistentIndex(be.BatchTx(), index) + cindex.UpdateConsistentIndex(be.BatchTx(), index, true) err := <-errc if err != nil { fmt.Println("failed to transform keys")