diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 99a2159d9932..5f039c52481b 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -1840,7 +1840,6 @@ func (s *EtcdServer) apply( // applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { shouldApplyV3 := membership.ApplyV2storeOnly - applyV3Performed := false var ar *apply.Result index := s.consistIndex.ConsistentIndex() if e.Index > index { @@ -1850,7 +1849,8 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { defer func() { // The txPostLockInsideApplyHook will not get called in some cases, // in which we should move the consistent index forward directly. - if !applyV3Performed || (ar != nil && ar.Err != nil) { + newIndex := s.consistIndex.ConsistentIndex() + if newIndex < e.Index { s.consistIndex.SetConsistentIndex(e.Index, e.Term) } }() @@ -1903,7 +1903,6 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { if !needResult && raftReq.Txn != nil { removeNeedlessRangeReqs(raftReq.Txn) } - applyV3Performed = true ar = s.uberApply.Apply(&raftReq, shouldApplyV3) } diff --git a/tests/common/alarm_test.go b/tests/common/alarm_test.go index 11784fc3b9f2..d9218277d4b6 100644 --- a/tests/common/alarm_test.go +++ b/tests/common/alarm_test.go @@ -104,3 +104,28 @@ func TestAlarm(t *testing.T) { } }) } + +func TestAlarmlistOnMemberRestart(t *testing.T) { + testRunner.BeforeTest(t) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + clus := testRunner.NewCluster(ctx, t, config.ClusterConfig{ + ClusterSize: 1, + QuotaBackendBytes: int64(13 * os.Getpagesize()), + SnapshotCount: 5, + }) + defer clus.Close() + + testutils.ExecuteUntil(ctx, t, func() { + for i := 0; i < 6; i++ { + if _, err := clus.Client().AlarmList(ctx); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } + + clus.Members()[0].Stop() + if err := clus.Members()[0].Start(ctx); err != nil { + t.Fatalf("failed to start etcdserver: %v", err) + } + }) +} diff --git a/tests/framework/config/cluster.go b/tests/framework/config/cluster.go index 22eeaae12cd6..daae23b66b85 100644 --- a/tests/framework/config/cluster.go +++ b/tests/framework/config/cluster.go @@ -32,4 +32,5 @@ type ClusterConfig struct { ClientTLS TLSConfig QuotaBackendBytes int64 DisableStrictReconfigCheck bool + SnapshotCount int } diff --git a/tests/framework/e2e.go b/tests/framework/e2e.go index e113249b641e..fa8af0a45240 100644 --- a/tests/framework/e2e.go +++ b/tests/framework/e2e.go @@ -47,6 +47,7 @@ func (e e2eRunner) NewCluster(ctx context.Context, t testing.TB, cfg config.Clus ClusterSize: cfg.ClusterSize, QuotaBackendBytes: cfg.QuotaBackendBytes, DisableStrictReconfigCheck: cfg.DisableStrictReconfigCheck, + SnapshotCount: cfg.SnapshotCount, } switch cfg.ClientTLS { case config.NoTLS: @@ -175,7 +176,7 @@ func (m e2eMember) Client() Client { } func (m e2eMember) Start(ctx context.Context) error { - return m.Restart(ctx) + return m.EtcdProcess.Start(ctx) } func (m e2eMember) Stop() { diff --git a/tests/framework/e2e/etcd_process.go b/tests/framework/e2e/etcd_process.go index ede5f3bb8a4a..197f3f4bd5ab 100644 --- a/tests/framework/e2e/etcd_process.go +++ b/tests/framework/e2e/etcd_process.go @@ -100,6 +100,7 @@ func (ep *EtcdServerProcess) EndpointsV3() []string { return ep.EndpointsV2 func (ep *EtcdServerProcess) EndpointsMetrics() []string { return []string{ep.cfg.Murl} } func (ep *EtcdServerProcess) Start(ctx context.Context) error { + ep.donec = make(chan struct{}) if ep.proc != nil { panic("already started") } @@ -121,7 +122,6 @@ func (ep *EtcdServerProcess) Restart(ctx context.Context) error { if err := ep.Stop(); err != nil { return err } - ep.donec = make(chan struct{}) err := ep.Start(ctx) if err == nil { ep.cfg.lg.Info("restarted server", zap.String("name", ep.cfg.Name)) @@ -135,10 +135,10 @@ func (ep *EtcdServerProcess) Stop() (err error) { return nil } err = ep.proc.Stop() + ep.proc = nil if err != nil { return err } - ep.proc = nil <-ep.donec ep.donec = make(chan struct{}) if ep.cfg.Purl.Scheme == "unix" || ep.cfg.Purl.Scheme == "unixs" { diff --git a/tests/framework/integration.go b/tests/framework/integration.go index 05b3bceb9d6f..84de68d4d488 100644 --- a/tests/framework/integration.go +++ b/tests/framework/integration.go @@ -44,11 +44,13 @@ func (e integrationRunner) BeforeTest(t testing.TB) { func (e integrationRunner) NewCluster(ctx context.Context, t testing.TB, cfg config.ClusterConfig) Cluster { var err error - var integrationCfg integration.ClusterConfig - integrationCfg.Size = cfg.ClusterSize + integrationCfg := integration.ClusterConfig{ + Size: cfg.ClusterSize, + QuotaBackendBytes: cfg.QuotaBackendBytes, + DisableStrictReconfigCheck: cfg.DisableStrictReconfigCheck, + SnapshotCount: uint64(cfg.SnapshotCount), + } integrationCfg.ClientTLS, err = tlsInfo(t, cfg.ClientTLS) - integrationCfg.QuotaBackendBytes = cfg.QuotaBackendBytes - integrationCfg.DisableStrictReconfigCheck = cfg.DisableStrictReconfigCheck if err != nil { t.Fatalf("ClientTLS: %s", err) }