Merge pull request #130523 from cockroachdb/blathers/backport-staging-v23.2.11-129808

staging-v23.2.11: kv: prevent lease interval regression during expiration-to-epoch promotion
celiala authored Sep 12, 2024
2 parents c52ebc8 + 662bbcb commit bb17c96
Showing 9 changed files with 207 additions and 53 deletions.
10 changes: 10 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_lease_request.go
@@ -12,6 +12,7 @@ package batcheval

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
@@ -68,6 +69,15 @@ func RequestLease(
Requested: args.Lease,
}

// However, we verify that the current lease's sequence number and proposed
// timestamp match the provided PrevLease. This ensures that the validation
// here is consistent with the validation that was performed when the lease
// request was constructed.
if prevLease.Sequence != args.PrevLease.Sequence || !prevLease.ProposedTS.Equal(args.PrevLease.ProposedTS) {
rErr.Message = fmt.Sprintf("expected previous lease %s, found %s", args.PrevLease, prevLease)
return newFailedLeaseTrigger(false /* isTransfer */), rErr
}

// MIGRATION(tschottdorf): needed to apply Raft commands which got proposed
// before the StartStasis field was introduced.
newLease := args.Lease
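The new guard in RequestLease rejects a request whose view of the previous lease is stale. As a minimal, self-contained illustration of that check (the Lease struct and validatePrevLease helper below are simplified stand-ins for roachpb.Lease and the batcheval logic, not CockroachDB APIs):

package main

import (
	"fmt"
	"time"
)

// Lease is a simplified stand-in holding only the fields the check compares.
type Lease struct {
	Sequence   int64
	ProposedTS time.Time
}

// validatePrevLease rejects a lease request whose recorded previous lease no
// longer matches the lease currently in effect, keeping evaluation-time
// validation consistent with the validation done when the request was built.
func validatePrevLease(cur, reqPrev Lease) error {
	if cur.Sequence != reqPrev.Sequence || !cur.ProposedTS.Equal(reqPrev.ProposedTS) {
		return fmt.Errorf("expected previous lease %+v, found %+v", reqPrev, cur)
	}
	return nil
}

func main() {
	cur := Lease{Sequence: 7, ProposedTS: time.Unix(100, 0)}
	stale := Lease{Sequence: 6, ProposedTS: time.Unix(90, 0)}
	fmt.Println(validatePrevLease(cur, cur))   // <nil>: views agree
	fmt.Println(validatePrevLease(cur, stale)) // error: request built against a stale lease
}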
82 changes: 82 additions & 0 deletions pkg/kv/kvserver/client_lease_test.go
@@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -1636,3 +1637,84 @@ func TestLeaseRequestBumpsEpoch(t *testing.T) {
require.Greater(t, liveness.Epoch, prevLease.Epoch)
})
}

// TestLeaseRequestFromExpirationToEpochDoesNotRegressExpiration tests that a
// promotion from an expiration-based lease to an epoch-based lease does not
// permit the expiration time of the lease to regress. This is enforced by
// detecting cases where the leaseholder's liveness record's expiration trails
// its expiration-based lease's expiration and synchronously heartbeating the
// leaseholder's liveness record before promoting the lease.
func TestLeaseRequestFromExpirationToEpochDoesNotRegressExpiration(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, true) // override metamorphism

tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Settings: st,
},
})
defer tc.Stopper().Stop(ctx)

// Create scratch range.
key := tc.ScratchRange(t)
desc := tc.LookupRangeOrFatal(t, key)

// Pause n1's node liveness heartbeats, to allow its liveness expiration to
// fall behind.
l0 := tc.Server(0).NodeLiveness().(*liveness.NodeLiveness)
l0.PauseHeartbeatLoopForTest()
l, ok := l0.GetLiveness(tc.Server(0).NodeID())
require.True(t, ok)

// Make sure n1 has an expiration-based lease.
s0 := tc.GetFirstStoreFromServer(t, 0)
repl := s0.LookupReplica(desc.StartKey)
require.NotNil(t, repl)
expLease := repl.CurrentLeaseStatus(ctx)
require.True(t, expLease.IsValid())
require.Equal(t, roachpb.LeaseExpiration, expLease.Lease.Type())

// Wait for the expiration-based lease to have a later expiration than the
// expiration timestamp in n1's liveness record.
testutils.SucceedsSoon(t, func() error {
expLease = repl.CurrentLeaseStatus(ctx)
if expLease.Expiration().Less(l.Expiration.ToTimestamp()) {
return errors.Errorf("lease %v not extended beyond liveness %v", expLease, l)
}
return nil
})

// Enable epoch-based leases. This will cause automatic lease renewal to try
// to promote the expiration-based lease to an epoch-based lease.
//
// Since we have disabled the background node liveness heartbeat loop, it is
// critical that this lease promotion synchronously heartbeats node liveness
// before acquiring the epoch-based lease.
kvserver.ExpirationLeasesOnly.Override(ctx, &s0.ClusterSettings().SV, false)

// Wait for that lease promotion to occur.
var epochLease kvserverpb.LeaseStatus
testutils.SucceedsSoon(t, func() error {
epochLease = repl.CurrentLeaseStatus(ctx)
if epochLease.Lease.Type() != roachpb.LeaseEpoch {
return errors.Errorf("lease %v not upgraded to epoch-based", epochLease)
}
return nil
})

// Once the lease has been promoted to an epoch-based lease, the effective
// expiration (maintained indirectly in the liveness record) must be greater
// than that in the preceding expiration-based lease. If this were to regress,
// a non-cooperative lease failover to a third lease held by a different node
// could overlap in MVCC time with the first lease (i.e. its start time could
// precede expLease.Expiration), violating the lease disjointness property.
//
// If we disable the `expToEpochPromo` branch in replica_range_lease.go, this
// assertion fails.
require.True(t, expLease.Expiration().Less(epochLease.Expiration()))
}
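The disjointness property the test protects can be stated compactly: a lease and its successor must not overlap in MVCC time, so a successor's start time may never precede the previous lease's effective expiration. A hedged sketch of that invariant, using simplified intervals rather than the real kvserverpb.LeaseStatus:

package main

import (
	"fmt"
	"time"
)

// interval is a simplified lease interval [Start, Expiration).
type interval struct {
	Start, Expiration time.Time
}

// disjoint reports whether a successor lease can never overlap the previous
// lease in MVCC time: its start must not precede the previous expiration.
func disjoint(prev, next interval) bool {
	return !next.Start.Before(prev.Expiration)
}

func main() {
	t0 := time.Unix(0, 0)
	expBased := interval{Start: t0, Expiration: t0.Add(6 * time.Second)}

	// If the expiration-to-epoch promotion let the effective expiration regress
	// to 5s, a non-cooperative failover could grant a third lease starting at
	// 5s, overlapping the original lease (valid until 6s).
	regressed := interval{Start: t0.Add(5 * time.Second)}
	fmt.Println(disjoint(expBased, regressed)) // false: disjointness violated

	// With the synchronous liveness heartbeat before promotion, the effective
	// expiration only moves forward, so a successor cannot start before 6s.
	safe := interval{Start: t0.Add(7 * time.Second)}
	fmt.Println(disjoint(expBased, safe)) // true
}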
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/client_split_test.go
@@ -2828,6 +2828,12 @@ func TestStoreCapacityAfterSplit(t *testing.T) {
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Settings: st,
RaftConfig: base.RaftConfig{
// We plan to increment the manual clock by MinStatsDuration a few
// times below and would like for leases to not expire. Configure a
// longer lease duration to achieve this.
RangeLeaseDuration: 10 * replicastats.MinStatsDuration,
},
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
WallClock: manualClock,
@@ -2845,8 +2851,6 @@ func TestStoreCapacityAfterSplit(t *testing.T) {
key := tc.ScratchRange(t)
desc := tc.AddVotersOrFatal(t, key, tc.Target(1))
tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1))

tc.IncrClockForLeaseUpgrade(t, manualClock)
tc.WaitForLeaseUpgrade(ctx, t, desc)

cap, err := s.Capacity(ctx, false /* useCached */)
15 changes: 3 additions & 12 deletions pkg/kv/kvserver/liveness/liveness.go
@@ -709,7 +709,9 @@ var errNodeAlreadyLive = errors.New("node already live")
//
// If this method returns nil, the node's liveness has been extended,
// relative to the previous value. It may or may not still be alive
// when this method returns.
// when this method returns. It may also not have been extended as far
// as the livenessThreshold, because the caller may have raced with
// another heartbeater.
//
// On failure, this method returns ErrEpochIncremented, although this
// may not necessarily mean that the epoch was actually incremented.
@@ -847,17 +849,6 @@ func (nl *NodeLiveness) heartbeatInternal(
// expired while in flight, so maybe we don't have to care about
// that and only need to distinguish between same and different
// epochs in our return value.
//
// TODO(nvanbenschoten): Unlike the early return above, this doesn't
// guarantee that the resulting expiration is past minExpiration,
// only that it's different than our oldLiveness. Is that ok? It
// hasn't caused issues so far, but we might want to detect this
// case and retry, at least in the case of the liveness heartbeat
// loop. The downside of this is that a heartbeat that's intending
// to bump the expiration of a record out 9s into the future may
// return a success even if the expiration is only 5 seconds in the
// future. The next heartbeat will then start with only 0.5 seconds
// before expiration.
if actual.IsLive(nl.clock.Now()) && !incrementEpoch {
return errNodeAlreadyLive
}
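The doc-comment change above matters to callers: a nil return from Heartbeat only guarantees that the record advanced past the value passed in, not that it reached any particular threshold, so a caller that needs the expiration to cover a specific floor must re-read and retry. A hedged sketch of that caller-side pattern (the heartbeat and getLiveness function arguments are hypothetical stand-ins for NodeLiveness.Heartbeat and NodeLiveness.GetLiveness):

package main

import (
	"context"
	"fmt"
	"time"
)

// liveness is a simplified stand-in for livenesspb.Liveness.
type liveness struct {
	Expiration time.Time
}

// heartbeatUntilCovers retries until the node's liveness expiration is at
// least floor. A single successful heartbeat is not enough: a concurrent
// heartbeater may have won the race with a smaller extension.
func heartbeatUntilCovers(
	ctx context.Context,
	heartbeat func(context.Context, liveness) error,
	getLiveness func() (liveness, bool),
	cur liveness,
	floor time.Time,
) (liveness, error) {
	for {
		if err := heartbeat(ctx, cur); err != nil {
			return liveness{}, err
		}
		l, ok := getLiveness()
		if !ok {
			return liveness{}, fmt.Errorf("liveness record missing after heartbeat")
		}
		if !l.Expiration.Before(floor) {
			return l, nil // the record now covers the floor we need
		}
		cur = l // raced with a concurrent heartbeat; retry from the newer record
	}
}

func main() {
	// Fake liveness: each heartbeat extends the record by only 1s, so two
	// rounds are needed to cover a 2s floor.
	now := time.Unix(0, 0)
	rec := liveness{Expiration: now}
	heartbeat := func(context.Context, liveness) error {
		rec.Expiration = rec.Expiration.Add(1 * time.Second)
		return nil
	}
	getLiveness := func() (liveness, bool) { return rec, true }

	l, err := heartbeatUntilCovers(context.Background(), heartbeat, getLiveness, rec, now.Add(2*time.Second))
	fmt.Println(l.Expiration, err)
}

This is the same retry-until-covered pattern that requestLease applies in the new expToEpochPromo branch of replica_range_lease.go below.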
106 changes: 86 additions & 20 deletions pkg/kv/kvserver/replica_range_lease.go
@@ -233,7 +233,6 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
nextLeaseHolder roachpb.ReplicaDescriptor,
status kvserverpb.LeaseStatus,
startKey roachpb.Key,
transfer bool,
bypassSafetyChecks bool,
limiter *quotapool.IntPool,
) *leaseRequestHandle {
Expand All @@ -252,10 +251,20 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
nextLeaseHolder.ReplicaID, nextLease.Replica.ReplicaID))
}

acquisition := !status.Lease.OwnedBy(p.repl.store.StoreID())
extension := !transfer && !acquisition
// Who owns the previous and next lease?
prevLocal := status.Lease.OwnedBy(p.repl.store.StoreID())
nextLocal := nextLeaseHolder.StoreID == p.repl.store.StoreID()

// Assert that the lease acquisition, extension, or transfer is valid.
acquisition := !prevLocal && nextLocal
extension := prevLocal && nextLocal
transfer := prevLocal && !nextLocal
remote := !prevLocal && !nextLocal
_ = extension // not used, just documentation

if remote {
log.Fatalf(ctx, "cannot acquire/extend lease for remote replica: %v -> %v", status, nextLeaseHolder)
}
if acquisition {
// If this is a non-cooperative lease change (i.e. an acquisition), it
// is up to us to ensure that Lease.Start is greater than the end time
@@ -294,6 +303,7 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
ProposedTS: &status.Now,
}

var reqLeaseLiveness livenesspb.Liveness
if p.repl.shouldUseExpirationLeaseRLocked() ||
(transfer &&
TransferExpirationLeasesFirstEnabled.Get(&p.repl.store.ClusterSettings().SV)) {
@@ -324,6 +334,7 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
return llHandle
}
reqLease.Epoch = l.Epoch
reqLeaseLiveness = l.Liveness
}

var leaseReq kvpb.Request
@@ -354,7 +365,7 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
}
}

err := p.requestLeaseAsync(ctx, nextLeaseHolder, status, leaseReq, limiter)
err := p.requestLeaseAsync(ctx, status, reqLease, reqLeaseLiveness, leaseReq, limiter)
if err != nil {
if errors.Is(err, stop.ErrThrottled) {
llHandle.resolve(kvpb.NewError(err))
@@ -385,10 +396,14 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
//
// The status argument is used as the expected value for liveness operations.
// leaseReq must be consistent with the LeaseStatus.
//
// The reqLeaseLiveness argument is provided when reqLease is an epoch-based
// lease.
func (p *pendingLeaseRequest) requestLeaseAsync(
parentCtx context.Context,
nextLeaseHolder roachpb.ReplicaDescriptor,
status kvserverpb.LeaseStatus,
reqLease roachpb.Lease,
reqLeaseLiveness livenesspb.Liveness,
leaseReq kvpb.Request,
limiter *quotapool.IntPool,
) error {
@@ -424,7 +439,7 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
// RPC, but here we submit the request directly to the local replica.
growstack.Grow()

err := p.requestLease(ctx, nextLeaseHolder, status, leaseReq)
err := p.requestLease(ctx, status, reqLease, reqLeaseLiveness, leaseReq)
// Error will be handled below.

// We reset our state below regardless of whether we've gotten an error or
@@ -464,24 +479,68 @@ var logFailedHeartbeatOwnLiveness = log.Every(10 * time.Second)
// requestLease sends a synchronous transfer lease or lease request to the
// specified replica. It is only meant to be called from requestLeaseAsync,
// since it does not coordinate with other in-flight lease requests.
//
// The reqLeaseLiveness argument is provided when reqLease is an epoch-based
// lease.
func (p *pendingLeaseRequest) requestLease(
ctx context.Context,
nextLeaseHolder roachpb.ReplicaDescriptor,
status kvserverpb.LeaseStatus,
reqLease roachpb.Lease,
reqLeaseLiveness livenesspb.Liveness,
leaseReq kvpb.Request,
) error {
started := timeutil.Now()
defer func() {
p.repl.store.metrics.LeaseRequestLatency.RecordValue(timeutil.Since(started).Nanoseconds())
}()

nextLeaseHolder := reqLease.Replica
extension := status.OwnedBy(nextLeaseHolder.StoreID)

// If we are promoting an expiration-based lease to an epoch-based lease, we
// must make sure the expiration does not regress. We do this here because the
// expiration is stored directly in the lease for expiration-based leases but
// indirectly in the liveness record for epoch-based leases. To ensure this, we
// manually heartbeat our liveness record if necessary. This is expected to
// work because the liveness record interval and the expiration-based lease
// interval are the same.
expToEpochPromo := extension && status.Lease.Type() == roachpb.LeaseExpiration && reqLease.Type() == roachpb.LeaseEpoch
if expToEpochPromo && reqLeaseLiveness.Expiration.ToTimestamp().Less(status.Lease.GetExpiration()) {
curLiveness := reqLeaseLiveness
for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); {
err := p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, curLiveness)
if err != nil {
if logFailedHeartbeatOwnLiveness.ShouldLog() {
log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err)
}
return kvpb.NewNotLeaseHolderError(roachpb.Lease{}, p.repl.store.StoreID(), p.repl.Desc(),
fmt.Sprintf("failed to manipulate liveness record: %s", err))
}
// Check whether the liveness record expiration is now greater than the
// expiration of the lease we're promoting. If not, we may have raced with
// another liveness heartbeat which did not extend the liveness expiration
// far enough and we should try again.
l, ok := p.repl.store.cfg.NodeLiveness.GetLiveness(reqLeaseLiveness.NodeID)
if !ok {
return errors.NewAssertionErrorWithWrappedErrf(liveness.ErrRecordCacheMiss, "after heartbeat")
}
if l.Expiration.ToTimestamp().Less(status.Lease.GetExpiration()) {
log.Infof(ctx, "expiration of liveness record %s is not greater than "+
"expiration of the previous lease %s after liveness heartbeat, retrying...", l, status.Lease)
curLiveness = l.Liveness
continue
}
break
}
}

// If we're replacing an expired epoch-based lease, we must increment the
// epoch of the prior owner to invalidate its leases. If we were the owner,
// then we instead heartbeat to become live.
if status.Lease.Type() == roachpb.LeaseEpoch && status.State == kvserverpb.LeaseState_EXPIRED {
var err error
// If this replica is previous & next lease holder, manually heartbeat to become live.
if status.OwnedBy(nextLeaseHolder.StoreID) && p.repl.store.StoreID() == nextLeaseHolder.StoreID {
if extension {
if err = p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, status.Liveness); err != nil && logFailedHeartbeatOwnLiveness.ShouldLog() {
log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err)
}
@@ -882,7 +941,7 @@ func (r *Replica) requestLeaseLocked(
}
return r.mu.pendingLeaseRequest.InitOrJoinRequest(
ctx, repDesc, status, r.mu.state.Desc.StartKey.AsRawKey(),
false /* transfer */, false /* bypassSafetyChecks */, limiter)
false /* bypassSafetyChecks */, limiter)
}

// AdminTransferLease transfers the LeaderLease to another replica. Only the
@@ -976,7 +1035,7 @@ func (r *Replica) AdminTransferLease(
}

transfer = r.mu.pendingLeaseRequest.InitOrJoinRequest(ctx, nextLeaseHolder, status,
desc.StartKey.AsRawKey(), true /* transfer */, bypassSafetyChecks, nil /* limiter */)
desc.StartKey.AsRawKey(), bypassSafetyChecks, nil /* limiter */)
return nil, transfer, nil
}

@@ -1523,17 +1582,24 @@ func (r *Replica) shouldRequestLeaseRLocked(
// maybeSwitchLeaseType will synchronously renew a lease using the appropriate
// type if it is (or was) owned by this replica and has an incorrect type. This
// typically happens when changing kv.expiration_leases_only.enabled.
func (r *Replica) maybeSwitchLeaseType(ctx context.Context, st kvserverpb.LeaseStatus) *kvpb.Error {
if !st.OwnedBy(r.store.StoreID()) {
return nil
}
func (r *Replica) maybeSwitchLeaseType(ctx context.Context) *kvpb.Error {
llHandle := func() *leaseRequestHandle {
now := r.store.Clock().NowAsClockTimestamp()
// The lease status needs to be checked and requested under the same lock,
// to avoid an interleaving lease request changing the lease between the
// two.
r.mu.Lock()
defer r.mu.Unlock()

var llHandle *leaseRequestHandle
r.mu.Lock()
if !r.hasCorrectLeaseTypeRLocked(st.Lease) {
llHandle = r.requestLeaseLocked(ctx, st, nil /* limiter */)
}
r.mu.Unlock()
st := r.leaseStatusAtRLocked(ctx, now)
if !st.OwnedBy(r.store.StoreID()) {
return nil
}
if r.hasCorrectLeaseTypeRLocked(st.Lease) {
return nil
}
return r.requestLeaseLocked(ctx, st, nil /* limiter */)
}()

if llHandle != nil {
select {
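For readers following the InitOrJoinRequest change: the transfer parameter is gone, and the request kind is now derived from who owns the previous lease and who the next leaseholder is. A minimal sketch of that classification under simplified store IDs (not the real roachpb types); only acquisition, extension, and transfer are legal, and the remote case is asserted against, mirroring the new log.Fatalf:

package main

import "fmt"

// leaseChangeKind classifies a lease request by ownership of the previous and
// next lease, mirroring the prevLocal/nextLocal logic in InitOrJoinRequest.
type leaseChangeKind string

const (
	acquisition leaseChangeKind = "acquisition" // previous lease not ours, next lease ours
	extension   leaseChangeKind = "extension"   // previous and next lease both ours
	transfer    leaseChangeKind = "transfer"    // previous lease ours, next lease someone else's
	remote      leaseChangeKind = "remote"      // neither lease ours: invalid
)

func classify(prevOwner, next, local int32) leaseChangeKind {
	prevLocal := prevOwner == local
	nextLocal := next == local
	switch {
	case !prevLocal && nextLocal:
		return acquisition
	case prevLocal && nextLocal:
		return extension
	case prevLocal && !nextLocal:
		return transfer
	default:
		return remote
	}
}

func main() {
	const local = int32(1)
	fmt.Println(classify(2, 1, local)) // acquisition
	fmt.Println(classify(1, 1, local)) // extension
	fmt.Println(classify(1, 3, local)) // transfer
	fmt.Println(classify(2, 3, local)) // remote: the real code treats this as a programming error
}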