diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index e925f47d450..a6e46761ac4 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -1771,7 +1771,7 @@ func (s *EtcdServer) apply( // set the consistent index of current executing entry if e.Index > s.consistIndex.ConsistentIndex() { - s.consistIndex.SetConsistentIndex(e.Index, e.Term) + s.setUnlockCallback(e.Index, e.Term) shouldApplyV3 = membership.ApplyBoth } @@ -1795,13 +1795,21 @@ func (s *EtcdServer) apply( return appliedt, appliedi, shouldStop } +func (s *EtcdServer) setUnlockCallback(index, term uint64) { + if s.be != nil { + s.be.SetOnPreUnlockFunc(func() { + s.consistIndex.SetConsistentIndex(index, term) + }) + } +} + // applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { shouldApplyV3 := membership.ApplyV2storeOnly index := s.consistIndex.ConsistentIndex() if e.Index > index { // set the consistent index of current executing entry - s.consistIndex.SetConsistentIndex(e.Index, e.Term) + s.setUnlockCallback(e.Index, e.Term) shouldApplyV3 = membership.ApplyBoth } s.lg.Debug("apply entry normal", diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 07636c6e468..1e048de5f81 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -686,10 +686,8 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { }} _, appliedi, _ := srv.apply(ents, &raftpb.ConfState{}) + assert.Equal(t, uint64(2), appliedi) consistIndex := srv.consistIndex.ConsistentIndex() - if consistIndex != appliedi { - t.Fatalf("consistIndex = %v, want %v", consistIndex, appliedi) - } t.Run("verify-backend", func(t *testing.T) { tx := be.BatchTx() @@ -698,9 +696,9 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { srv.beHooks.OnPreCommitUnsafe(tx) assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *schema.UnsafeConfStateFromBackend(lg, tx)) }) - rindex, rterm := schema.ReadConsistentIndex(be.BatchTx()) + rindex, _ := schema.ReadConsistentIndex(be.BatchTx()) assert.Equal(t, consistIndex, rindex) - assert.Equal(t, uint64(4), rterm) + } func realisticRaftNode(lg *zap.Logger) *raftNode { diff --git a/server/storage/backend/backend.go b/server/storage/backend/backend.go index c558ecacd68..b95ec68322a 100644 --- a/server/storage/backend/backend.go +++ b/server/storage/backend/backend.go @@ -67,6 +67,10 @@ type Backend interface { Defrag() error ForceCommit() Close() error + + // SetOnPreUnlockFunc sets a callback function which is called before unlocking the batchTx. + SetOnPreUnlockFunc(func()) + OnPreUnlock() } type Snapshot interface { @@ -119,9 +123,27 @@ type backend struct { hooks Hooks + muUnlock sync.Mutex + onUnlock func() + lg *zap.Logger } +func (b *backend) SetOnPreUnlockFunc(f func()) { + b.muUnlock.Lock() + defer b.muUnlock.Unlock() + b.onUnlock = f +} + +func (b *backend) OnPreUnlock() { + b.muUnlock.Lock() + defer b.muUnlock.Unlock() + if b.onUnlock != nil { + b.onUnlock() + b.onUnlock = nil + } +} + type BackendConfig struct { // Path is the file path to the backend file. Path string diff --git a/server/storage/backend/batch_tx.go b/server/storage/backend/batch_tx.go index b2b0ad7cbf0..fc74d533599 100644 --- a/server/storage/backend/batch_tx.go +++ b/server/storage/backend/batch_tx.go @@ -68,6 +68,11 @@ func (t *batchTx) Lock() { } func (t *batchTx) Unlock() { + t.backend.OnPreUnlock() + t.SimpleUnlock() +} + +func (t *batchTx) SimpleUnlock() { if t.pending >= t.backend.batchLimit { t.commit(false) } @@ -290,16 +295,28 @@ func (t *batchTxBuffered) Unlock() { t.batchTx.Unlock() } +func (t *batchTxBuffered) SimpleUnlock() { + if t.pending != 0 { + t.backend.readTx.Lock() // blocks txReadBuffer for writing. + t.buf.writeback(&t.backend.readTx.buf) + t.backend.readTx.Unlock() + if t.pending >= t.backend.batchLimit { + t.commit(false) + } + } + t.batchTx.SimpleUnlock() +} + func (t *batchTxBuffered) Commit() { t.Lock() t.commit(false) - t.Unlock() + t.SimpleUnlock() } func (t *batchTxBuffered) CommitAndStop() { t.Lock() t.commit(true) - t.Unlock() + t.SimpleUnlock() } func (t *batchTxBuffered) commit(stop bool) { diff --git a/server/storage/mvcc/kvstore_test.go b/server/storage/mvcc/kvstore_test.go index c200c0d9273..500822daf1a 100644 --- a/server/storage/mvcc/kvstore_test.go +++ b/server/storage/mvcc/kvstore_test.go @@ -916,6 +916,8 @@ func (b *fakeBackend) Snapshot() backend.Snapshot func (b *fakeBackend) ForceCommit() {} func (b *fakeBackend) Defrag() error { return nil } func (b *fakeBackend) Close() error { return nil } +func (b *fakeBackend) SetOnPreUnlockFunc(func()) {} +func (b *fakeBackend) OnPreUnlock() {} type indexGetResp struct { rev revision