Skip to content

Commit

Permalink
Backport ignore old leader's leases revoking request
Browse files Browse the repository at this point in the history
Backport of PR etcd-io#16822, commits f7e488d,
67f1716,
and f7ff898.

Co-authored-by: Benjamin Wang <[email protected]>
Signed-off-by: Ivan Valdes <[email protected]>
  • Loading branch information
ivanvc and ahrtr committed Feb 20, 2024
1 parent 6e86445 commit 44d7858
Show file tree
Hide file tree
Showing 6 changed files with 239 additions and 7 deletions.
61 changes: 60 additions & 1 deletion etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,8 +622,19 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {

if srv.Cfg.EnableLeaseCheckpoint {
// setting checkpointer enables lease checkpoint feature.
srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) error {
if !srv.ensureLeadership() {
if lg := srv.getLogger(); lg != nil {
lg.Warn("Ignore the checkpoint request because current member isn't a leader",
zap.Uint64("local-member-id", uint64(srv.ID())))
} else {
plog.Warningf("Ignore the checkpoint request because current member %d isn't a leader", uint64(srv.ID()))
}
return lease.ErrNotPrimary
}

srv.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseCheckpoint: cp})
return nil
})
}

Expand Down Expand Up @@ -1098,7 +1109,23 @@ func (s *EtcdServer) run() {

func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) {
s.goAttach(func() {
// We shouldn't revoke any leases if current member isn't a leader,
// because the operation should only be performed by the leader. When
// the leader gets blocked on the raft loop, such as writing WAL entries,
// it can't process any events or messages from raft. It may think it
// is still the leader even the leader has already changed.
// Refer to https://github.com/etcd-io/etcd/issues/15247
lg := s.Logger()
if !s.ensureLeadership() {
if lg != nil {
lg.Warn("Ignore the lease revoking request because current member isn't a leader",
zap.Uint64("local-member-id", uint64(s.ID())))
} else {
plog.Warningf("Ignore the lease revoking request because current member %d isn't a leader", uint64(s.ID()))
}
return
}

// Increases throughput of expired leases deletion process through parallelization
c := make(chan struct{}, maxPendingRevokes)
for _, curLease := range leases {
Expand Down Expand Up @@ -1135,6 +1162,38 @@ func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) {
})
}

// ensureLeadership checks whether current member is still the leader.
func (s *EtcdServer) ensureLeadership() bool {
lg := s.Logger()

ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
defer cancel()
if err := s.linearizableReadNotify(ctx); err != nil {
if lg != nil {
lg.Warn("Failed to check current member's leadership", zap.Error(err))
} else {
plog.Warningf("Failed to check current member's leadership: %s", err)
}

return false
}

newLeaderId := s.raftStatus().Lead
if newLeaderId != uint64(s.ID()) {
if lg != nil {
lg.Warn("Current member isn't a leader",
zap.Uint64("local-member-id", uint64(s.ID())),
zap.Uint64("new-lead", newLeaderId))
} else {
plog.Warningf("Current member %d isn't a leader (new-lead=%d)", uint64(s.ID()), newLeaderId)
}

return false
}

return true
}

func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
s.applySnapshot(ep, apply)
s.applyEntries(ep, apply)
Expand Down
11 changes: 11 additions & 0 deletions etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,17 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)

func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
if s.isLeader() {
// If s.isLeader() returns true, but we fail to ensure the current
// member's leadership, there are a couple of possibilities:
// 1. current member gets stuck on writing WAL entries;
// 2. current member is in network isolation status;
// 3. current member isn't a leader anymore (possibly due to #1 above).
// In such case, we just return error to client, so that the client can
// switch to another member to continue the lease keep-alive operation.
if !s.ensureLeadership() {
return -1, lease.ErrNotPrimary
}

if err := s.waitAppliedIndex(); err != nil {
return 0, err
}
Expand Down
2 changes: 1 addition & 1 deletion integration/v3_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ func TestV3LeaseFailover(t *testing.T) {

// send keep alive to old leader until the old leader starts
// to drop lease request.
var expectedExp time.Time
expectedExp := time.Now().Add(5 * time.Second)
for {
if err = lac.Send(lreq); err != nil {
break
Expand Down
10 changes: 7 additions & 3 deletions lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type RangeDeleter func() TxnDelete

// Checkpointer permits checkpointing of lease remaining TTLs to the consensus log. Defined here to
// avoid circular dependency with mvcc.
type Checkpointer func(ctx context.Context, lc *pb.LeaseCheckpointRequest)
type Checkpointer func(ctx context.Context, lc *pb.LeaseCheckpointRequest) error

type LeaseID int64

Expand Down Expand Up @@ -424,7 +424,9 @@ func (le *lessor) Renew(id LeaseID) (int64, error) {
// By applying a RAFT entry only when the remainingTTL is already set, we limit the number
// of RAFT entries written per lease to a max of 2 per checkpoint interval.
if clearRemainingTTL {
le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}})
if err := le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}}); err != nil {
return -1, err
}
}

le.mu.Lock()
Expand Down Expand Up @@ -660,7 +662,9 @@ func (le *lessor) checkpointScheduledLeases() {
le.mu.Unlock()

if len(cps) != 0 {
le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps})
if err := le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps}); err != nil {
return
}
}
if len(cps) < maxLeaseCheckpointBatchSize {
return
Expand Down
6 changes: 4 additions & 2 deletions lease/lessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,11 @@ func TestLessorRenewWithCheckpointer(t *testing.T) {
defer os.RemoveAll(dir)

le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) error {
for _, cp := range cp.GetCheckpoints() {
le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL())
}
return nil
}
defer le.Stop()
// Set checkpointer
Expand Down Expand Up @@ -538,7 +539,7 @@ func TestLessorCheckpointScheduling(t *testing.T) {
defer le.Stop()
le.minLeaseTTL = 1
checkpointedC := make(chan struct{})
le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) {
le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) error {
close(checkpointedC)
if len(lc.Checkpoints) != 1 {
t.Errorf("expected 1 checkpoint but got %d", len(lc.Checkpoints))
Expand All @@ -547,6 +548,7 @@ func TestLessorCheckpointScheduling(t *testing.T) {
if c.Remaining_TTL != 1 {
t.Errorf("expected checkpoint to be called with Remaining_TTL=%d but got %d", 1, c.Remaining_TTL)
}
return nil
})
_, err := le.Grant(1, 2)
if err != nil {
Expand Down
156 changes: 156 additions & 0 deletions tests/e2e/v3_lease_no_proxy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright 2024 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !cluster_proxy

package e2e

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"

"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/pkg/testutil"
)

// TestLeaseRevoke_IgnoreOldLeader verifies that leases shouldn't be revoked
// by old leader.
// See the case 1 in https://github.com/etcd-io/etcd/issues/15247#issuecomment-1777862093.
func TestLeaseRevoke_IgnoreOldLeader(t *testing.T) {
testLeaseRevokeIssue(t, true)
}

// TestLeaseRevoke_ClientSwitchToOtherMember verifies that leases shouldn't
// be revoked by new leader.
// See the case 2 in https://github.com/etcd-io/etcd/issues/15247#issuecomment-1777862093.
func TestLeaseRevoke_ClientSwitchToOtherMember(t *testing.T) {
testLeaseRevokeIssue(t, false)
}

func testLeaseRevokeIssue(t *testing.T, connectToOneFollower bool) {
defer testutil.AfterTest(t)

ctx := context.Background()

t.Log("Starting a new etcd cluster")
epc, err := newEtcdProcessCluster(t, &etcdProcessClusterConfig{
clusterSize: 3,
goFailEnabled: true,
goFailClientTimeout: 40 * time.Second,
})
require.NoError(t, err)
defer func() {
if errC := epc.Close(); errC != nil {
t.Fatalf("error closing etcd processes (%v)", errC)
}
}()

leaderIdx := epc.WaitLeader(t)
t.Logf("Leader index: %d", leaderIdx)

epsForNormalOperations := epc.procs[(leaderIdx+2)%3].EndpointsGRPC()
t.Logf("Creating a client for normal operations: %v", epsForNormalOperations)
client, err := clientv3.New(clientv3.Config{Endpoints: epsForNormalOperations, DialTimeout: 3 * time.Second})
require.NoError(t, err)
defer client.Close()

var epsForLeaseKeepAlive []string
if connectToOneFollower {
epsForLeaseKeepAlive = epc.procs[(leaderIdx+1)%3].EndpointsGRPC()
} else {
epsForLeaseKeepAlive = epc.EndpointsGRPC()
}
t.Logf("Creating a client for the leaseKeepAlive operation: %v", epsForLeaseKeepAlive)
clientForKeepAlive, err := clientv3.New(clientv3.Config{Endpoints: epsForLeaseKeepAlive, DialTimeout: 3 * time.Second})
require.NoError(t, err)
defer clientForKeepAlive.Close()

resp, err := client.Status(ctx, epsForNormalOperations[0])
require.NoError(t, err)
oldLeaderId := resp.Leader

t.Log("Creating a new lease")
leaseRsp, err := client.Grant(ctx, 20)
require.NoError(t, err)

t.Log("Starting a goroutine to keep alive the lease")
doneC := make(chan struct{})
stopC := make(chan struct{})
startC := make(chan struct{}, 1)
go func() {
defer close(doneC)

respC, kerr := clientForKeepAlive.KeepAlive(ctx, leaseRsp.ID)
require.NoError(t, kerr)
// ensure we have received the first response from the server
<-respC
startC <- struct{}{}

for {
select {
case <-stopC:
return
case <-respC:
}
}
}()

t.Log("Wait for the keepAlive goroutine to get started")
<-startC

t.Log("Trigger the failpoint to simulate stalled writing")
err = epc.procs[leaderIdx].Failpoints().SetupHTTP(ctx, "raftBeforeSave", `sleep("30s")`)
require.NoError(t, err)

cctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
t.Logf("Waiting for a new leader to be elected, old leader index: %d, old leader ID: %d", leaderIdx, oldLeaderId)
executeUntil(cctx, t, func() {
for {
resp, err = client.Status(ctx, epsForNormalOperations[0])
if err == nil && resp.Leader != oldLeaderId {
t.Logf("A new leader has already been elected, new leader index: %d", resp.Leader)
return
}
time.Sleep(100 * time.Millisecond)
}
})
cancel()

t.Log("Writing a key/value pair")
_, err = client.Put(ctx, "foo", "bar")
require.NoError(t, err)

t.Log("Sleeping 30 seconds")
time.Sleep(30 * time.Second)

t.Log("Remove the failpoint 'raftBeforeSave'")
err = epc.procs[leaderIdx].Failpoints().DeactivateHTTP(ctx, "raftBeforeSave")
require.NoError(t, err)

// By default, etcd tries to revoke leases every 7 seconds.
t.Log("Sleeping 10 seconds")
time.Sleep(10 * time.Second)

t.Log("Confirming the lease isn't revoked")
leases, err := client.Leases(ctx)
require.NoError(t, err)
require.Equal(t, 1, len(leases.Leases))

t.Log("Waiting for the keepAlive goroutine to exit")
close(stopC)
<-doneC
}

0 comments on commit 44d7858

Please sign in to comment.