Skip to content

Commit

Permalink
Merge pull request etcd-io#12795 from wpedrak/resend-read-index-on-fi…
Browse files Browse the repository at this point in the history
…rst-commit-in-term

etcdserver: resend ReadIndex request on empty apply request
  • Loading branch information
ptabor authored Apr 9, 2021
2 parents bad0b4d + 08ea9cb commit 7f97dfd
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 21 deletions.
56 changes: 36 additions & 20 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,6 @@ type EtcdServer struct {
peerRt http.RoundTripper
reqIDGen *idutil.Generator

// forceVersionC is used to force the version monitor loop
// to detect the cluster version immediately.
forceVersionC chan struct{}

// wgMu blocks concurrent waitgroup mutation while server stopping
wgMu sync.RWMutex
// wg is used to wait for the goroutines that depend on the server state
Expand All @@ -291,6 +287,9 @@ type EtcdServer struct {
leadTimeMu sync.RWMutex
leadElectedTime time.Time

firstCommitInTermMu sync.RWMutex
firstCommitInTermC chan struct{}

*AccessController
}

Expand Down Expand Up @@ -517,17 +516,17 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
storage: NewStorage(w, ss),
},
),
id: id,
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
cluster: cl,
stats: sstats,
lstats: lstats,
SyncTicker: time.NewTicker(500 * time.Millisecond),
peerRt: prt,
reqIDGen: idutil.NewGenerator(uint16(id), time.Now()),
forceVersionC: make(chan struct{}),
AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
consistIndex: cindex.NewConsistentIndex(be.BatchTx()),
id: id,
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
cluster: cl,
stats: sstats,
lstats: lstats,
SyncTicker: time.NewTicker(500 * time.Millisecond),
peerRt: prt,
reqIDGen: idutil.NewGenerator(uint16(id), time.Now()),
AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
consistIndex: cindex.NewConsistentIndex(be.BatchTx()),
firstCommitInTermC: make(chan struct{}),
}
serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1)

Expand Down Expand Up @@ -1770,6 +1769,16 @@ func (s *EtcdServer) LeaderChangedNotify() <-chan struct{} {
return s.leaderChanged
}

// FirstCommitInTermNotify returns a channel that is closed once the first
// entry of a new term has been committed. A new leader needs that commit
// before it can answer read-only requests with linearizable semantics.
//
// The channel is replaced after every notification, so callers that want to
// wait for the next notification must call this method again.
func (s *EtcdServer) FirstCommitInTermNotify() <-chan struct{} {
	s.firstCommitInTermMu.RLock()
	current := s.firstCommitInTermC
	s.firstCommitInTermMu.RUnlock()
	return current
}

// RaftStatusGetter represents etcd server and Raft progress.
type RaftStatusGetter interface {
ID() types.ID
Expand Down Expand Up @@ -2068,10 +2077,8 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
// The raft state machine may generate a no-op entry on leader confirmation.
// Skip it up front to avoid potential bugs in the future.
if len(e.Data) == 0 {
select {
case s.forceVersionC <- struct{}{}:
default:
}
s.notifyAboutFirstCommitInTerm()

// promote lessor when the local member is leader and finished
// applying all entries from the last term.
if s.isLeader() {
Expand Down Expand Up @@ -2140,6 +2147,15 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
})
}

// notifyAboutFirstCommitInTerm signals everyone waiting on the current
// first-commit notifier by closing it, after installing a fresh channel for
// subsequent waiters.
func (s *EtcdServer) notifyAboutFirstCommitInTerm() {
	var stale chan struct{}
	fresh := make(chan struct{})

	// Swap the channels under the write lock so that readers observe either
	// the old notifier or the new one, never a partially updated state.
	s.firstCommitInTermMu.Lock()
	stale, s.firstCommitInTermC = s.firstCommitInTermC, fresh
	s.firstCommitInTermMu.Unlock()

	// The close happens after the lock has been released.
	close(stale)
}

// applyConfChange applies a ConfChange to the server. It is only
// invoked with a ConfChange that has already passed through Raft
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) {
Expand Down Expand Up @@ -2319,7 +2335,7 @@ func (s *EtcdServer) ClusterVersion() *semver.Version {
func (s *EtcdServer) monitorVersions() {
for {
select {
case <-s.forceVersionC:
case <-s.FirstCommitInTermNotify():
case <-time.After(monitorVersionInterval):
case <-s.stopping:
return
Expand Down
11 changes: 11 additions & 0 deletions server/etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,8 @@ func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{},
retryTimer := time.NewTimer(readIndexRetryTime)
defer retryTimer.Stop()

firstCommitInTermNotifier := s.FirstCommitInTermNotify()

for {
select {
case rs := <-s.r.readStateC:
Expand All @@ -800,6 +802,15 @@ func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{},
readIndexFailed.Inc()
// return a retryable error.
return 0, ErrLeaderChanged
case <-firstCommitInTermNotifier:
firstCommitInTermNotifier = s.FirstCommitInTermNotify()
lg.Info("first commit in current term: resending ReadIndex request")
err := s.sendReadIndex(requestId)
if err != nil {
return 0, err
}
retryTimer.Reset(readIndexRetryTime)
continue
case <-retryTimer.C:
lg.Warn(
"waiting for ReadIndex response took too long, retrying",
Expand Down
1 change: 1 addition & 0 deletions tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ require (
go.etcd.io/etcd/server/v3 v3.5.0-alpha.0
go.uber.org/zap v1.16.0
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
google.golang.org/grpc v1.36.1
gopkg.in/yaml.v2 v2.3.0
Expand Down
1 change: 1 addition & 0 deletions tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
82 changes: 81 additions & 1 deletion tests/integration/v3_leadership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ package integration

import (
"context"
"fmt"
"strings"
"testing"
"time"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"golang.org/x/sync/errgroup"
)

func TestMoveLeader(t *testing.T) { testMoveLeader(t, true) }
Expand All @@ -36,7 +38,7 @@ func testMoveLeader(t *testing.T, auto bool) {
oldLeadIdx := clus.WaitLeader(t)
oldLeadID := uint64(clus.Members[oldLeadIdx].s.ID())

// ensure followers go through leader transition while learship transfer
// ensure followers go through leader transition while leadership transfer
idc := make(chan uint64)
stopc := make(chan struct{})
defer close(stopc)
Expand Down Expand Up @@ -179,3 +181,81 @@ func TestTransferLeadershipWithLearner(t *testing.T) {
t.Error("timed out waiting for leader transition")
}
}

// TestFirstCommitNotification verifies that, after leadership is moved to
// another member, the channel each member handed out through
// FirstCommitInTermNotify before the transfer has been closed by the time the
// member has caught up with the new leader's applied index.
func TestFirstCommitNotification(t *testing.T) {
	BeforeTest(t)
	clusterSize := 3
	cluster := NewClusterV3(t, &ClusterConfig{Size: clusterSize})
	defer cluster.Terminate(t)

	oldLeaderIdx := cluster.WaitLeader(t)
	oldLeaderClient := cluster.Client(oldLeaderIdx)

	newLeaderIdx := (oldLeaderIdx + 1) % clusterSize
	newLeaderID := uint64(cluster.Members[newLeaderIdx].ID())

	// Grab the notifiers before the transfer so that they are the channels
	// expected to be closed by the first commit in the new term.
	notifiers := make(map[int]<-chan struct{}, clusterSize)
	for i, clusterMember := range cluster.Members {
		notifiers[i] = clusterMember.s.FirstCommitInTermNotify()
	}

	// Nothing below is meaningful if the transfer itself failed, so abort
	// instead of collecting misleading follow-up errors.
	if _, err := oldLeaderClient.MoveLeader(context.Background(), newLeaderID); err != nil {
		t.Fatalf("got error during leadership transfer: %v", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// MoveLeader may return before the new leader has applied the first
	// entry of its term. Write through the new leader so that its applied
	// index sampled below is guaranteed to be past that entry; otherwise
	// members could pass the catch-up wait before being notified.
	if _, err := cluster.Client(newLeaderIdx).Put(ctx, "foo", "bar"); err != nil {
		t.Fatalf("failed to put key after leadership transfer: %v", err)
	}

	leaderAppliedIndex := cluster.Members[newLeaderIdx].s.AppliedIndex()

	group, groupContext := errgroup.WithContext(ctx)

	for i, notifier := range notifiers {
		// Shadow the loop variables so each goroutine gets its own copy.
		member, notifier := cluster.Members[i], notifier
		group.Go(func() error {
			return checkFirstCommitNotification(groupContext, member, leaderAppliedIndex, notifier)
		})
	}

	if err := group.Wait(); err != nil {
		t.Error(err)
	}
}

// checkFirstCommitNotification waits until member has applied at least
// leaderAppliedIndex and then checks, without blocking, that notifier has
// been closed.
//
// It returns the context's error if ctx is done while waiting, an error if a
// value was received from notifier or if notifier is not readable yet, and
// nil when notifier is closed.
func checkFirstCommitNotification(
	ctx context.Context,
	member *member,
	leaderAppliedIndex uint64,
	notifier <-chan struct{},
) error {
	// Poll the member's applied index until it catches up with the leader,
	// giving up as soon as the context is done.
	for member.s.AppliedIndex() < leaderAppliedIndex {
		if err := ctx.Err(); err != nil {
			return err
		}
		time.Sleep(100 * time.Millisecond)
	}

	// Non-blocking receive: only a closed channel counts as a notification.
	select {
	case msg, open := <-notifier:
		if !open {
			return nil
		}
		return fmt.Errorf(
			"member with ID %d got message via notifier, msg: %v",
			member.ID(),
			msg,
		)
	default:
		return fmt.Errorf(
			"notification was not triggered, member ID: %d",
			member.ID(),
		)
	}
}

0 comments on commit 7f97dfd

Please sign in to comment.