Skip to content

Commit

Permalink
Simplify KVStore interaction with cindex thanks to hooks.
Browse files Browse the repository at this point in the history
  • Loading branch information
ptabor committed May 4, 2021
1 parent fe3254a commit 2dbecea
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 103 deletions.
2 changes: 1 addition & 1 deletion server/etcdserver/api/v3rpc/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
)

type KVGetter interface {
KV() mvcc.ConsistentWatchableKV
KV() mvcc.WatchableKV
}

type BackendGetter interface {
Expand Down
78 changes: 50 additions & 28 deletions server/etcdserver/cindex/cindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ var (
ConsistentIndexKeyName = []byte("consistent_index")
)

type Backend interface {
BatchTx() backend.BatchTx
}

// ConsistentIndexer is an interface that wraps the Get/Set/Save method for consistentIndex.
type ConsistentIndexer interface {

Expand All @@ -41,32 +45,38 @@ type ConsistentIndexer interface {
// It saves consistentIndex to the underlying stable storage.
UnsafeSave(tx backend.BatchTx)

// SetBatchTx set the available backend.BatchTx for ConsistentIndexer.
SetBatchTx(tx backend.BatchTx)
// SetBackend set the available backend.BatchTx for ConsistentIndexer.
SetBackend(be Backend)
}

// consistentIndex implements the ConsistentIndexer interface.
type consistentIndex struct {
tx backend.BatchTx
// consistentIndex represents the offset of an entry in a consistent replica log.
// it caches the "consistent_index" key's value. Accessed
// through atomics so must be 64-bit aligned.
// it caches the "consistent_index" key's value.
// Accessed through atomics so must be 64-bit aligned.
consistentIndex uint64
mutex sync.Mutex

// be is used for initial read consistentIndex
be Backend
// mutex is protecting be.
mutex sync.Mutex
}

func NewConsistentIndex(tx backend.BatchTx) ConsistentIndexer {
return &consistentIndex{tx: tx}
// NewConsistentIndex creates a new consistent index.
// If `be` is nil, it must be set (SetBackend) before first access using `ConsistentIndex()`.
func NewConsistentIndex(be Backend) ConsistentIndexer {
return &consistentIndex{be: be}
}

func (ci *consistentIndex) ConsistentIndex() uint64 {

if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 {
return index
}
ci.mutex.Lock()
defer ci.mutex.Unlock()
v := ReadConsistentIndex(ci.tx)

v := ReadConsistentIndex(ci.be.BatchTx())
atomic.StoreUint64(&ci.consistentIndex, v)
return v
}

Expand All @@ -76,18 +86,15 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64) {

func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
index := atomic.LoadUint64(&ci.consistentIndex)

if index == 0 {
// Never save 0 as it means that we didn't loaded the real index yet.
return
}
unsafeUpdateConsistentIndex(tx, index)
UnsafeUpdateConsistentIndex(tx, index, true)

This comment has been minimized.

Copy link
@serathius

serathius Apr 7, 2022

Member

This PR introduces merges CI update logic from apply and migrator. The only grow check introduced for migrator in #5391, was also enabled in apply. This is however not correct for apply as the UnsafeUpdateConsistentIndex will just exit as it would succeed, so the apply will be executed without updating the CI.

}

func (ci *consistentIndex) SetBatchTx(tx backend.BatchTx) {
func (ci *consistentIndex) SetBackend(be Backend) {
ci.mutex.Lock()
defer ci.mutex.Unlock()
ci.tx = tx
ci.be = be
// After the backend is changed, the first access should re-read it.
ci.SetConsistentIndex(0)
}

func NewFakeConsistentIndex(index uint64) ConsistentIndexer {
Expand All @@ -102,13 +109,21 @@ func (f *fakeConsistentIndex) SetConsistentIndex(index uint64) {
atomic.StoreUint64(&f.index, index)
}

func (f *fakeConsistentIndex) UnsafeSave(tx backend.BatchTx) {}
func (f *fakeConsistentIndex) SetBatchTx(tx backend.BatchTx) {}
func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
func (f *fakeConsistentIndex) SetBackend(_ Backend) {}

// UnsafeCreateMetaBucket creates the `meta` bucket (if it does not exists yet).
func UnsafeCreateMetaBucket(tx backend.BatchTx) {
tx.UnsafeCreateBucket(MetaBucketName)
}

// CreateMetaBucket creates the `meta` bucket (if it does not exists yet).
func CreateMetaBucket(tx backend.BatchTx) {
tx.Lock()
defer tx.Unlock()
tx.UnsafeCreateBucket(MetaBucketName)
}

// unsafeGetConsistentIndex loads consistent index from given transaction.
// returns 0 if the data are not found.
func unsafeReadConsistentIndex(tx backend.ReadTx) uint64 {
Expand All @@ -128,21 +143,28 @@ func ReadConsistentIndex(tx backend.ReadTx) uint64 {
return unsafeReadConsistentIndex(tx)
}

func unsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64) {
func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, onlyGrow bool) {
if index == 0 {
// Never save 0 as it means that we didn't loaded the real index yet.
return
}

if onlyGrow {
oldi := unsafeReadConsistentIndex(tx)
if index <= oldi {
return
}
}

bs := make([]byte, 8) // this is kept on stack (not heap) so its quick.
binary.BigEndian.PutUint64(bs, index)
// put the index into the underlying backend
// tx has been locked in TxnBegin, so there is no need to lock it again
tx.UnsafePut(MetaBucketName, ConsistentIndexKeyName, bs)
}

func UpdateConsistentIndex(tx backend.BatchTx, index uint64) {
func UpdateConsistentIndex(tx backend.BatchTx, index uint64, onlyGrow bool) {
tx.Lock()
defer tx.Unlock()

oldi := unsafeReadConsistentIndex(tx)
if index <= oldi {
return
}
unsafeUpdateConsistentIndex(tx, index)
UnsafeUpdateConsistentIndex(tx, index, onlyGrow)
}
8 changes: 4 additions & 4 deletions server/etcdserver/cindex/cindex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ import (
func TestConsistentIndex(t *testing.T) {

be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
ci := NewConsistentIndex(be.BatchTx())
ci := NewConsistentIndex(be)

tx := be.BatchTx()
if tx == nil {
t.Fatal("batch tx is nil")
}
tx.Lock()

UnsafeCreateMetaBucket(tx)
tx.Unlock()
be.ForceCommit()
Expand All @@ -51,14 +52,13 @@ func TestConsistentIndex(t *testing.T) {

b := backend.NewDefaultBackend(tmpPath)
defer b.Close()
ci.SetConsistentIndex(0)
ci.SetBatchTx(b.BatchTx())
ci.SetBackend(b)
index = ci.ConsistentIndex()
if index != r {
t.Errorf("expected %d,got %d", r, index)
}

ci = NewConsistentIndex(b.BatchTx())
ci = NewConsistentIndex(b)
index = ci.ConsistentIndex()
if index != r {
t.Errorf("expected %d,got %d", r, index)
Expand Down
7 changes: 3 additions & 4 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ type EtcdServer struct {
applyV3Internal applierV3Internal
applyWait wait.WaitTime

kv mvcc.ConsistentWatchableKV
kv mvcc.WatchableKV
lessor lease.Lessor
bemu sync.Mutex
be backend.Backend
Expand Down Expand Up @@ -1210,8 +1210,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
lg.Panic("failed to restore mvcc store", zap.Error(err))
}

s.consistIndex.SetConsistentIndex(s.kv.ConsistentIndex())
lg.Info("restored mvcc store")
lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))

// Closing old backend might block until all the txns
// on the backend are finished.
Expand Down Expand Up @@ -2522,7 +2521,7 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
}
}

func (s *EtcdServer) KV() mvcc.ConsistentWatchableKV { return s.kv }
func (s *EtcdServer) KV() mvcc.WatchableKV { return s.kv }
func (s *EtcdServer) Backend() backend.Backend {
s.bemu.Lock()
defer s.bemu.Unlock()
Expand Down
11 changes: 2 additions & 9 deletions server/lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
}

le.leaseMap[id] = l
l.persistTo(le.b, le.ci)
l.persistTo(le.b)

leaseTotalTTLs.Observe(float64(l.ttl))
leaseGranted.Inc()
Expand Down Expand Up @@ -341,10 +341,6 @@ func (le *lessor) Revoke(id LeaseID) error {
// kv deletion. Or we might end up with not executing the revoke or not
// deleting the keys if etcdserver fails in between.
le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID)))
// if len(keys) > 0, txn.End() will call ci.UnsafeSave function.
if le.ci != nil && len(keys) == 0 {
le.ci.UnsafeSave(le.b.BatchTx())
}

txn.End()

Expand Down Expand Up @@ -828,7 +824,7 @@ func (l *Lease) expired() bool {
return l.Remaining() <= 0
}

func (l *Lease) persistTo(b backend.Backend, ci cindex.ConsistentIndexer) {
func (l *Lease) persistTo(b backend.Backend) {
key := int64ToBytes(int64(l.ID))

lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl, RemainingTTL: l.remainingTTL}
Expand All @@ -839,9 +835,6 @@ func (l *Lease) persistTo(b backend.Backend, ci cindex.ConsistentIndexer) {

b.BatchTx().Lock()
b.BatchTx().UnsafePut(leaseBucketName, key, val)
if ci != nil {
ci.UnsafeSave(b.BatchTx())
}
b.BatchTx().Unlock()
}

Expand Down
11 changes: 0 additions & 11 deletions server/mvcc/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,3 @@ type Watchable interface {
// watch events happened or happening on the KV.
NewWatchStream() WatchStream
}

// ConsistentWatchableKV is a WatchableKV that understands the consistency
// algorithm and consistent index.
// If the consistent index of executing entry is not larger than the
// consistent index of ConsistentWatchableKV, all operations in
// this entry are skipped and return empty response.
type ConsistentWatchableKV interface {
WatchableKV
// ConsistentIndex returns the current consistent index of the KV.
ConsistentIndex() uint64
}
29 changes: 2 additions & 27 deletions server/mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ type store struct {
// mu read locks for txns and write locks for non-txn store changes.
mu sync.RWMutex

ci cindex.ConsistentIndexer

b backend.Backend
kvindex index

Expand All @@ -94,7 +92,7 @@ type store struct {

// NewStore returns a new store. It is useful to create a store inside
// mvcc pkg. It should only be used for testing externally.
func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) *store {
func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *store {
if lg == nil {
lg = zap.NewNop()
}
Expand All @@ -104,7 +102,6 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.Cons
s := &store{
cfg: cfg,
b: b,
ci: ci,
kvindex: newTreeIndex(lg),

le: le,
Expand Down Expand Up @@ -314,11 +311,6 @@ func init() {
func (s *store) Commit() {
s.mu.Lock()
defer s.mu.Unlock()

tx := s.b.BatchTx()
tx.Lock()
s.saveIndex(tx)
tx.Unlock()
s.b.ForceCommit()
}

Expand All @@ -342,8 +334,6 @@ func (s *store) Restore(b backend.Backend) error {

s.fifoSched = schedule.NewFIFOScheduler()
s.stopc = make(chan struct{})
s.ci.SetBatchTx(b.BatchTx())
s.ci.SetConsistentIndex(0)

return s.restore()
}
Expand Down Expand Up @@ -436,9 +426,7 @@ func (s *store) restore() error {

tx.Unlock()

s.lg.Info("kvstore restored",
zap.Uint64("consistent-index", s.ConsistentIndex()),
zap.Int64("current-rev", s.currentRev))
s.lg.Info("kvstore restored", zap.Int64("current-rev", s.currentRev))

if scheduledCompact != 0 {
if _, err := s.compactLockfree(scheduledCompact); err != nil {
Expand Down Expand Up @@ -533,19 +521,6 @@ func (s *store) Close() error {
return nil
}

func (s *store) saveIndex(tx backend.BatchTx) {
if s.ci != nil {
s.ci.UnsafeSave(tx)
}
}

func (s *store) ConsistentIndex() uint64 {
if s.ci != nil {
return s.ci.ConsistentIndex()
}
return 0
}

func (s *store) setupMetricsReporter() {
b := s.b
reportDbTotalSizeInBytesMu.Lock()
Expand Down
Loading

0 comments on commit 2dbecea

Please sign in to comment.