Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move consistent_index forward when executing alarmList operation #14419

Merged
merged 1 commit into from
Sep 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1840,7 +1840,6 @@ func (s *EtcdServer) apply(
// applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
shouldApplyV3 := membership.ApplyV2storeOnly
applyV3Performed := false
var ar *apply.Result
index := s.consistIndex.ConsistentIndex()
if e.Index > index {
Expand All @@ -1850,7 +1849,8 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
defer func() {
// The txPostLockInsideApplyHook will not get called in some cases,
// in which we should move the consistent index forward directly.
if !applyV3Performed || (ar != nil && ar.Err != nil) {
newIndex := s.consistIndex.ConsistentIndex()
if newIndex < e.Index {
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
}
}()
Expand Down Expand Up @@ -1903,7 +1903,6 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
if !needResult && raftReq.Txn != nil {
removeNeedlessRangeReqs(raftReq.Txn)
}
applyV3Performed = true
ar = s.uberApply.Apply(&raftReq, shouldApplyV3)
}

Expand Down
25 changes: 25 additions & 0 deletions tests/common/alarm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,28 @@ func TestAlarm(t *testing.T) {
}
})
}

func TestAlarmlistOnMemberRestart(t *testing.T) {
testRunner.BeforeTest(t)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
clus := testRunner.NewCluster(ctx, t, config.ClusterConfig{
ClusterSize: 1,
QuotaBackendBytes: int64(13 * os.Getpagesize()),
SnapshotCount: 5,
})
defer clus.Close()

testutils.ExecuteUntil(ctx, t, func() {
for i := 0; i < 6; i++ {
if _, err := clus.Client().AlarmList(ctx); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}

clus.Members()[0].Stop()
if err := clus.Members()[0].Start(ctx); err != nil {
t.Fatalf("failed to start etcdserver: %v", err)
}
})
}
1 change: 1 addition & 0 deletions tests/framework/config/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ type ClusterConfig struct {
ClientTLS TLSConfig
QuotaBackendBytes int64
DisableStrictReconfigCheck bool
SnapshotCount int
}
3 changes: 2 additions & 1 deletion tests/framework/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (e e2eRunner) NewCluster(ctx context.Context, t testing.TB, cfg config.Clus
ClusterSize: cfg.ClusterSize,
QuotaBackendBytes: cfg.QuotaBackendBytes,
DisableStrictReconfigCheck: cfg.DisableStrictReconfigCheck,
SnapshotCount: cfg.SnapshotCount,
}
switch cfg.ClientTLS {
case config.NoTLS:
Expand Down Expand Up @@ -175,7 +176,7 @@ func (m e2eMember) Client() Client {
}

func (m e2eMember) Start(ctx context.Context) error {
return m.Restart(ctx)
return m.EtcdProcess.Start(ctx)
}

func (m e2eMember) Stop() {
Expand Down
4 changes: 2 additions & 2 deletions tests/framework/e2e/etcd_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func (ep *EtcdServerProcess) EndpointsV3() []string { return ep.EndpointsV2
func (ep *EtcdServerProcess) EndpointsMetrics() []string { return []string{ep.cfg.Murl} }

func (ep *EtcdServerProcess) Start(ctx context.Context) error {
ep.donec = make(chan struct{})
if ep.proc != nil {
panic("already started")
}
Expand All @@ -121,7 +122,6 @@ func (ep *EtcdServerProcess) Restart(ctx context.Context) error {
if err := ep.Stop(); err != nil {
return err
}
ep.donec = make(chan struct{})
err := ep.Start(ctx)
if err == nil {
ep.cfg.lg.Info("restarted server", zap.String("name", ep.cfg.Name))
Expand All @@ -135,10 +135,10 @@ func (ep *EtcdServerProcess) Stop() (err error) {
return nil
}
err = ep.proc.Stop()
ep.proc = nil
if err != nil {
return err
}
ep.proc = nil
<-ep.donec
ep.donec = make(chan struct{})
if ep.cfg.Purl.Scheme == "unix" || ep.cfg.Purl.Scheme == "unixs" {
Expand Down
10 changes: 6 additions & 4 deletions tests/framework/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ func (e integrationRunner) BeforeTest(t testing.TB) {

func (e integrationRunner) NewCluster(ctx context.Context, t testing.TB, cfg config.ClusterConfig) Cluster {
var err error
var integrationCfg integration.ClusterConfig
integrationCfg.Size = cfg.ClusterSize
integrationCfg := integration.ClusterConfig{
Size: cfg.ClusterSize,
QuotaBackendBytes: cfg.QuotaBackendBytes,
DisableStrictReconfigCheck: cfg.DisableStrictReconfigCheck,
SnapshotCount: uint64(cfg.SnapshotCount),
}
integrationCfg.ClientTLS, err = tlsInfo(t, cfg.ClientTLS)
integrationCfg.QuotaBackendBytes = cfg.QuotaBackendBytes
integrationCfg.DisableStrictReconfigCheck = cfg.DisableStrictReconfigCheck
if err != nil {
t.Fatalf("ClientTLS: %s", err)
}
Expand Down