From cc840336f0f36139e49eb69c4b53340492cd6803 Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Sat, 3 Sep 2022 03:03:09 +0800 Subject: [PATCH] move consistent_index forward when executing alarmList operation The alarm list is the only exception that doesn't move consistent_index forward. The reproduction steps are as simple as, ``` etcd --snapshot-count=5 & for i in {1..6}; do etcdctl alarm list; done kill -9 etcd ``` Signed-off-by: Benjamin Wang --- server/etcdserver/server.go | 5 ++--- tests/common/alarm_test.go | 25 +++++++++++++++++++++++++ tests/framework/config/cluster.go | 1 + tests/framework/e2e.go | 3 ++- tests/framework/e2e/etcd_process.go | 4 ++-- tests/framework/integration.go | 10 ++++++---- 6 files changed, 38 insertions(+), 10 deletions(-) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 99a2159d993..5f039c52481 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -1840,7 +1840,6 @@ func (s *EtcdServer) apply( // applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { shouldApplyV3 := membership.ApplyV2storeOnly - applyV3Performed := false var ar *apply.Result index := s.consistIndex.ConsistentIndex() if e.Index > index { @@ -1850,7 +1849,8 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { defer func() { // The txPostLockInsideApplyHook will not get called in some cases, // in which we should move the consistent index forward directly. - if !applyV3Performed || (ar != nil && ar.Err != nil) { + newIndex := s.consistIndex.ConsistentIndex() + if newIndex < e.Index { s.consistIndex.SetConsistentIndex(e.Index, e.Term) } }() @@ -1903,7 +1903,6 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { if !needResult && raftReq.Txn != nil { removeNeedlessRangeReqs(raftReq.Txn) } - applyV3Performed = true ar = s.uberApply.Apply(&raftReq, shouldApplyV3) } diff --git a/tests/common/alarm_test.go b/tests/common/alarm_test.go index 11784fc3b9f..d9218277d4b 100644 --- a/tests/common/alarm_test.go +++ b/tests/common/alarm_test.go @@ -104,3 +104,28 @@ func TestAlarm(t *testing.T) { } }) } + +func TestAlarmlistOnMemberRestart(t *testing.T) { + testRunner.BeforeTest(t) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + clus := testRunner.NewCluster(ctx, t, config.ClusterConfig{ + ClusterSize: 1, + QuotaBackendBytes: int64(13 * os.Getpagesize()), + SnapshotCount: 5, + }) + defer clus.Close() + + testutils.ExecuteUntil(ctx, t, func() { + for i := 0; i < 6; i++ { + if _, err := clus.Client().AlarmList(ctx); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } + + clus.Members()[0].Stop() + if err := clus.Members()[0].Start(ctx); err != nil { + t.Fatalf("failed to start etcdserver: %v", err) + } + }) +} diff --git a/tests/framework/config/cluster.go b/tests/framework/config/cluster.go index 22eeaae12cd..0af2cc1e8c3 100644 --- a/tests/framework/config/cluster.go +++ b/tests/framework/config/cluster.go @@ -32,4 +32,5 @@ type ClusterConfig struct { ClientTLS TLSConfig QuotaBackendBytes int64 DisableStrictReconfigCheck bool + SnapshotCount int } diff --git a/tests/framework/e2e.go b/tests/framework/e2e.go index e113249b641..da694091612 100644 --- a/tests/framework/e2e.go +++ b/tests/framework/e2e.go @@ -47,6 +47,7 @@ func (e e2eRunner) NewCluster(ctx context.Context, t testing.TB, cfg config.Clus ClusterSize: cfg.ClusterSize, QuotaBackendBytes: cfg.QuotaBackendBytes, DisableStrictReconfigCheck: cfg.DisableStrictReconfigCheck, + SnapshotCount: cfg.SnapshotCount, } switch cfg.ClientTLS { case config.NoTLS: @@ -175,7 +176,7 @@ func (m e2eMember) Client() Client { } func (m e2eMember) Start(ctx context.Context) error { - return m.Restart(ctx) + return m.EtcdProcess.Start(ctx) } func (m e2eMember) Stop() { diff --git a/tests/framework/e2e/etcd_process.go b/tests/framework/e2e/etcd_process.go index ede5f3bb8a4..197f3f4bd5a 100644 --- a/tests/framework/e2e/etcd_process.go +++ b/tests/framework/e2e/etcd_process.go @@ -100,6 +100,7 @@ func (ep *EtcdServerProcess) EndpointsV3() []string { return ep.EndpointsV2 func (ep *EtcdServerProcess) EndpointsMetrics() []string { return []string{ep.cfg.Murl} } func (ep *EtcdServerProcess) Start(ctx context.Context) error { + ep.donec = make(chan struct{}) if ep.proc != nil { panic("already started") } @@ -121,7 +122,6 @@ func (ep *EtcdServerProcess) Restart(ctx context.Context) error { if err := ep.Stop(); err != nil { return err } - ep.donec = make(chan struct{}) err := ep.Start(ctx) if err == nil { ep.cfg.lg.Info("restarted server", zap.String("name", ep.cfg.Name)) @@ -135,10 +135,10 @@ func (ep *EtcdServerProcess) Stop() (err error) { return nil } err = ep.proc.Stop() + ep.proc = nil if err != nil { return err } - ep.proc = nil <-ep.donec ep.donec = make(chan struct{}) if ep.cfg.Purl.Scheme == "unix" || ep.cfg.Purl.Scheme == "unixs" { diff --git a/tests/framework/integration.go b/tests/framework/integration.go index 05b3bceb9d6..5787415e228 100644 --- a/tests/framework/integration.go +++ b/tests/framework/integration.go @@ -44,11 +44,13 @@ func (e integrationRunner) BeforeTest(t testing.TB) { func (e integrationRunner) NewCluster(ctx context.Context, t testing.TB, cfg config.ClusterConfig) Cluster { var err error - var integrationCfg integration.ClusterConfig - integrationCfg.Size = cfg.ClusterSize + integrationCfg := integration.ClusterConfig{ + Size: cfg.ClusterSize, + QuotaBackendBytes: cfg.QuotaBackendBytes, + DisableStrictReconfigCheck: cfg.DisableStrictReconfigCheck, + SnapshotCount: uint64(cfg.SnapshotCount), + } integrationCfg.ClientTLS, err = tlsInfo(t, cfg.ClientTLS) - integrationCfg.QuotaBackendBytes = cfg.QuotaBackendBytes - integrationCfg.DisableStrictReconfigCheck = cfg.DisableStrictReconfigCheck if err != nil { t.Fatalf("ClientTLS: %s", err) }