Skip to content

Commit

Permalink
move consistent_index forward when executing alarmList operation
Browse files Browse the repository at this point in the history
The alarm list is the only exception that doesn't move consistent_index
forward. The reproduction steps are as simple as,

```
etcd --snapshot-count=5 &
for i in {1..6}; do etcdctl  alarm list; done
kill -9 <etcd_pid>
etcd
```

Signed-off-by: Benjamin Wang <[email protected]>
  • Loading branch information
ahrtr committed Sep 5, 2022
1 parent 5707147 commit c2ed9ec
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 9 deletions.
5 changes: 2 additions & 3 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1840,7 +1840,6 @@ func (s *EtcdServer) apply(
// applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
shouldApplyV3 := membership.ApplyV2storeOnly
applyV3Performed := false
var ar *apply.Result
index := s.consistIndex.ConsistentIndex()
if e.Index > index {
Expand All @@ -1850,7 +1849,8 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
defer func() {
// The txPostLockInsideApplyHook will not get called in some cases,
// in which we should move the consistent index forward directly.
if !applyV3Performed || (ar != nil && ar.Err != nil) {
newIndex := s.consistIndex.ConsistentIndex()
if newIndex < e.Index {
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
}
}()
Expand Down Expand Up @@ -1903,7 +1903,6 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
if !needResult && raftReq.Txn != nil {
removeNeedlessRangeReqs(raftReq.Txn)
}
applyV3Performed = true
ar = s.uberApply.Apply(&raftReq, shouldApplyV3)
}

Expand Down
25 changes: 25 additions & 0 deletions tests/common/alarm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,28 @@ func TestAlarm(t *testing.T) {
}
})
}

func TestAlarmlistOnMemberRestart(t *testing.T) {
testRunner.BeforeTest(t)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
clus := testRunner.NewCluster(ctx, t, config.ClusterConfig{
ClusterSize: 1,
QuotaBackendBytes: int64(13 * os.Getpagesize()),
SnapshotCount: 5,
})
defer clus.Close()

testutils.ExecuteUntil(ctx, t, func() {
for i := 0; i < 6; i++ {
if _, err := clus.Client().AlarmList(ctx); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}

clus.Members()[0].Stop()
if err := clus.Members()[0].Start(ctx); err != nil {
t.Fatalf("failed to start etcdserver: %v", err)
}
})
}
1 change: 1 addition & 0 deletions tests/framework/config/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,5 @@ type ClusterConfig struct {
PeerTLS TLSConfig
ClientTLS TLSConfig
QuotaBackendBytes int64
SnapshotCount int
}
3 changes: 2 additions & 1 deletion tests/framework/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func (e e2eRunner) NewCluster(ctx context.Context, t testing.TB, cfg config.Clus
InitialToken: "new",
ClusterSize: cfg.ClusterSize,
QuotaBackendBytes: cfg.QuotaBackendBytes,
SnapshotCount: cfg.SnapshotCount,
}
switch cfg.ClientTLS {
case config.NoTLS:
Expand Down Expand Up @@ -174,7 +175,7 @@ func (m e2eMember) Client() Client {
}

func (m e2eMember) Start(ctx context.Context) error {
return m.Restart(ctx)
return m.EtcdProcess.Start(ctx)
}

func (m e2eMember) Stop() {
Expand Down
4 changes: 2 additions & 2 deletions tests/framework/e2e/etcd_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func (ep *EtcdServerProcess) EndpointsV3() []string { return ep.EndpointsV2
func (ep *EtcdServerProcess) EndpointsMetrics() []string { return []string{ep.cfg.Murl} }

func (ep *EtcdServerProcess) Start(ctx context.Context) error {
ep.donec = make(chan struct{})
if ep.proc != nil {
panic("already started")
}
Expand All @@ -121,7 +122,6 @@ func (ep *EtcdServerProcess) Restart(ctx context.Context) error {
if err := ep.Stop(); err != nil {
return err
}
ep.donec = make(chan struct{})
err := ep.Start(ctx)
if err == nil {
ep.cfg.lg.Info("restarted server", zap.String("name", ep.cfg.Name))
Expand All @@ -135,10 +135,10 @@ func (ep *EtcdServerProcess) Stop() (err error) {
return nil
}
err = ep.proc.Stop()
ep.proc = nil
if err != nil {
return err
}
ep.proc = nil
<-ep.donec
ep.donec = make(chan struct{})
if ep.cfg.Purl.Scheme == "unix" || ep.cfg.Purl.Scheme == "unixs" {
Expand Down
8 changes: 5 additions & 3 deletions tests/framework/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,12 @@ func (e integrationRunner) BeforeTest(t testing.TB) {

func (e integrationRunner) NewCluster(ctx context.Context, t testing.TB, cfg config.ClusterConfig) Cluster {
var err error
var integrationCfg integration.ClusterConfig
integrationCfg.Size = cfg.ClusterSize
integrationCfg := integration.ClusterConfig{
Size: cfg.ClusterSize,
QuotaBackendBytes: cfg.QuotaBackendBytes,
SnapshotCount: uint64(cfg.SnapshotCount),
}
integrationCfg.ClientTLS, err = tlsInfo(t, cfg.ClientTLS)
integrationCfg.QuotaBackendBytes = cfg.QuotaBackendBytes
if err != nil {
t.Fatalf("ClientTLS: %s", err)
}
Expand Down

0 comments on commit c2ed9ec

Please sign in to comment.