Skip to content

Commit

Permalink
*: LeaseTimeToLive returns error if leader changed
Browse files Browse the repository at this point in the history
The old leader demotes lessor and all the leases' expire time will be
updated. Instead of returning incorrect remaining TTL, we should return
errors to force client retry.

Signed-off-by: Wei Fu <[email protected]>
  • Loading branch information
fuweid committed Mar 26, 2024
1 parent e4448c4 commit d3bb6f6
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 2 deletions.
12 changes: 12 additions & 0 deletions server/etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,9 @@ func (s *EtcdServer) leaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR
if err := s.waitAppliedIndex(); err != nil {
return nil, err
}

// gofail: var beforeLookupWhenLeaseTimeToLive struct{}

// primary; timetolive directly from leader
le := s.lessor.Lookup(lease.LeaseID(r.ID))
if le == nil {
Expand All @@ -372,6 +375,15 @@ func (s *EtcdServer) leaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR
}
resp.Keys = kbs
}

// The leasor could be demoted if leader changed during lookup.
// We should return error to force retry instead of returning
// incorrect remaining TTL.
if le.Demoted() {
// NOTE: lease.ErrNotPrimary is not retryable error for
// client. Instead, uses ErrLeaderChanged.
return nil, errors.ErrLeaderChanged
}
return resp, nil
}

Expand Down
7 changes: 7 additions & 0 deletions server/lease/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ func (l *Lease) forever() {
l.expiry = forever
}

// Demoted returns true if the lease's expiry has been reset to forever.
func (l *Lease) Demoted() bool {
l.expiryMu.Lock()
defer l.expiryMu.Unlock()
return l.expiry == forever
}

// Keys returns all the keys attached to the lease.
func (l *Lease) Keys() []string {
l.mu.RLock()
Expand Down
11 changes: 11 additions & 0 deletions server/lease/leasehttp/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout)
return
}

// gofail: var beforeLookupWhenForwardLeaseTimeToLive struct{}

l := h.l.Lookup(lease.LeaseID(lreq.LeaseTimeToLiveRequest.ID))
if l == nil {
http.Error(w, lease.ErrLeaseNotFound.Error(), http.StatusNotFound)
Expand All @@ -126,6 +129,14 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
resp.LeaseTimeToLiveResponse.Keys = kbs
}

// The leasor could be demoted if leader changed during lookup.
// We should return error to force retry instead of returning
// incorrect remaining TTL.
if l.Demoted() {
http.Error(w, lease.ErrNotPrimary.Error(), http.StatusInternalServerError)
return
}

v, err = resp.Marshal()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand Down
76 changes: 76 additions & 0 deletions tests/integration/v3_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
Expand All @@ -30,8 +32,10 @@ import (
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/pkg/v3/testutil"
clientv3 "go.etcd.io/etcd/client/v3"
framecfg "go.etcd.io/etcd/tests/v3/framework/config"
"go.etcd.io/etcd/tests/v3/framework/integration"
gofail "go.etcd.io/gofail/runtime"
)

// TestV3LeasePromote ensures the newly elected leader can promote itself
Expand Down Expand Up @@ -1046,6 +1050,78 @@ func TestV3LeaseRecoverKeyWithMutipleLease(t *testing.T) {
}
}

func TestV3LeaseTimeToLiveWithLeaderChanged(t *testing.T) {
t.Run("normal", func(subT *testing.T) {
testV3LeaseTimeToLiveWithLeaderChanged(subT, "beforeLookupWhenLeaseTimeToLive")
})

t.Run("forward", func(subT *testing.T) {
testV3LeaseTimeToLiveWithLeaderChanged(subT, "beforeLookupWhenForwardLeaseTimeToLive")
})
}

func testV3LeaseTimeToLiveWithLeaderChanged(t *testing.T, fpName string) {
if len(gofail.List()) == 0 {
t.Skip("please run 'make gofail-enable' before running the test")
}

integration.BeforeTest(t)

clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

oldLeadIdx := clus.WaitLeader(t)
followerIdx := (oldLeadIdx + 1) % 3

followerMemberID := clus.Members[followerIdx].ID()

oldLeadC := clus.Client(oldLeadIdx)

leaseResp, err := oldLeadC.Grant(ctx, 100)
require.NoError(t, err)

require.NoError(t, gofail.Enable(fpName, `sleep("3s")`))
t.Cleanup(func() {
terr := gofail.Disable(fpName)
if terr != nil && terr != gofail.ErrDisabled {
t.Fatalf("failed to disable %s: %v", fpName, terr)
}
})

readyCh := make(chan struct{})
errCh := make(chan error, 1)

var targetC *clientv3.Client
switch fpName {
case "beforeLookupWhenLeaseTimeToLive":
targetC = oldLeadC
case "beforeLookupWhenForwardLeaseTimeToLive":
targetC = clus.Client((oldLeadIdx + 2) % 3)
default:
t.Fatalf("unsupported %s failpoint", fpName)
}

go func() {
<-readyCh
time.Sleep(1 * time.Second)

_, merr := oldLeadC.MoveLeader(ctx, uint64(followerMemberID))
assert.NoError(t, gofail.Disable(fpName))
errCh <- merr
}()

close(readyCh)

ttlResp, err := targetC.TimeToLive(ctx, leaseResp.ID)
require.NoError(t, err)
require.GreaterOrEqual(t, int64(100), ttlResp.TTL)

require.NoError(t, <-errCh)
}

// acquireLeaseAndKey creates a new lease and creates an attached key.
func acquireLeaseAndKey(clus *integration.Cluster, key string) (int64, error) {
// create lease
Expand Down
4 changes: 2 additions & 2 deletions tests/robustness/makefile.mk
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ GOFAIL_VERSION = $(shell cd tools/mod && go list -m -f {{.Version}} go.etcd.io/g

.PHONY: gofail-enable
gofail-enable: install-gofail
gofail enable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/
gofail enable server/etcdserver/ server/lease/leasehttp server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/
cd ./server && go get go.etcd.io/gofail@${GOFAIL_VERSION}
cd ./etcdutl && go get go.etcd.io/gofail@${GOFAIL_VERSION}
cd ./etcdctl && go get go.etcd.io/gofail@${GOFAIL_VERSION}
cd ./tests && go get go.etcd.io/gofail@${GOFAIL_VERSION}

.PHONY: gofail-disable
gofail-disable: install-gofail
gofail disable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/
gofail disable server/etcdserver/ server/lease/leasehttp server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/
cd ./server && go mod tidy
cd ./etcdutl && go mod tidy
cd ./etcdctl && go mod tidy
Expand Down

0 comments on commit d3bb6f6

Please sign in to comment.