Skip to content

Commit

Permalink
kvserver: create node liveness in tests
Browse files Browse the repository at this point in the history
Previously node liveness wasn't constructed in tests. This commit adds
it to allow removing a number of nil checks.

Epic: none

Release note: None
  • Loading branch information
andrewbaptist committed Nov 14, 2024
1 parent 8dd01a7 commit d34af36
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 23 deletions.
6 changes: 1 addition & 5 deletions pkg/kv/kvserver/consistency_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,7 @@ func (q *consistencyQueue) shouldQueue(
return repl.getQueueLastProcessed(ctx, q.name)
},
isNodeAvailable: func(nodeID roachpb.NodeID) bool {
if repl.store.cfg.NodeLiveness != nil {
return repl.store.cfg.NodeLiveness.GetNodeVitalityFromCache(nodeID).IsLive(livenesspb.ConsistencyQueue)
}
// Some tests run without a NodeLiveness configured.
return true
return repl.store.cfg.NodeLiveness.GetNodeVitalityFromCache(nodeID).IsLive(livenesspb.ConsistencyQueue)
},
disableLastProcessedCheck: repl.store.cfg.TestingKnobs.DisableLastProcessedCheck,
interval: q.interval(),
Expand Down
13 changes: 13 additions & 0 deletions pkg/kv/kvserver/liveness/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Cache struct {
clock *hlc.Clock
nodeDialer *nodedialer.Dialer
st *cluster.Settings
isTestMode bool
mu struct {
syncutil.RWMutex
// lastNodeUpdate stores timestamps of StoreDescriptor updates in Gossip.
Expand Down Expand Up @@ -214,6 +215,18 @@ func (c *Cache) self() (_ Record, ok bool) {
// includes the raw, encoded value that the database has for this liveness
// record in addition to the decoded liveness proto.
func (c *Cache) getLiveness(nodeID roachpb.NodeID) (_ Record, ok bool) {
// In test mode, create a node vitality that is always live.
if c.isTestMode {
now := c.clock.Now()
return Record{
Liveness: livenesspb.Liveness{
NodeID: nodeID,
Epoch: 1,
Expiration: now.AddDuration(time.Minute).ToLegacyTimestamp(),
Draining: false,
Membership: livenesspb.MembershipStatus_ACTIVE,
}}, true
}
c.mu.RLock()
defer c.mu.RUnlock()
if l, ok := c.mu.nodes[nodeID]; ok {
Expand Down
32 changes: 32 additions & 0 deletions pkg/kv/kvserver/liveness/liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
diskStorage "github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
Expand Down Expand Up @@ -314,6 +315,37 @@ type NodeLivenessOptions struct {
Cache *Cache
}

// NewTestNodeLiveness creates a NodeLiveness for testing. It will always return
// true when asked if a node is live and doesn't depend on gossip.
func NewTestNodeLiveness(
stopper *stop.Stopper, clock *hlc.Clock, st *cluster.Settings,
) *NodeLiveness {
cache := Cache{
clock: clock,
nodeDialer: nil,
st: st,
isTestMode: true,
gossip: &gossip.Gossip{},
}

return &NodeLiveness{
ambientCtx: log.AmbientContext{},
stopper: stopper,
clock: clock,
storage: nil, // storage should never be accessed.
livenessThreshold: 2 * time.Minute,
renewalDuration: time.Minute,
selfSem: make(chan struct{}, 1),
otherSem: make(chan struct{}, 1),
heartbeatToken: make(chan struct{}, 1),
onNodeDecommissioned: nil,
onNodeDecommissioning: nil,
engines: []diskstorage.Engine{},
onSelfHeartbeat: nil,
cache: &cache,
}
}

// NewNodeLiveness returns a new instance of NodeLiveness configured
// with the specified gossip instance.
func NewNodeLiveness(opts NodeLivenessOptions) *NodeLiveness {
Expand Down
5 changes: 0 additions & 5 deletions pkg/kv/kvserver/replica_gc_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,6 @@ func replicaIsSuspect(repl *Replica) bool {
return true
}

// NodeLiveness can be nil in tests/benchmarks.
if repl.store.cfg.NodeLiveness == nil {
return false
}

// If a replica doesn't have an active raft group, we should check whether
// or not the node is active. If not, we should consider the replica suspect
// because it has probably already been removed from its raft group but
Expand Down
3 changes: 1 addition & 2 deletions pkg/kv/kvserver/replica_range_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,8 +796,7 @@ func (r *Replica) GetRangeLeaseDuration() time.Duration {
// shouldUseExpirationLeaseRLocked. We can merge these once there are no more
// callers: when expiration leases don't quiesce and are always eagerly renewed.
func (r *Replica) requiresExpirationLeaseRLocked() bool {
return r.store.cfg.NodeLiveness == nil ||
r.shMu.state.Desc.StartKey.Less(roachpb.RKey(keys.NodeLivenessKeyMax))
return r.shMu.state.Desc.StartKey.Less(roachpb.RKey(keys.NodeLivenessKeyMax))
}

// shouldUseExpirationLeaseRLocked returns true if this range should be using an
Expand Down
8 changes: 3 additions & 5 deletions pkg/kv/kvserver/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,11 +602,9 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica
updateFn()
})
}
if nl := store.cfg.NodeLiveness; nl != nil { // node liveness is nil for some unittests
nl.RegisterCallback(func(_ livenesspb.Liveness) {
updateFn()
})
}
store.cfg.NodeLiveness.RegisterCallback(func(_ livenesspb.Liveness) {
updateFn()
})

return rq
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2329,9 +2329,7 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {

// Register a callback to unquiesce any ranges with replicas on a
// node transitioning from non-live to live.
if s.cfg.NodeLiveness != nil {
s.cfg.NodeLiveness.RegisterCallback(s.nodeIsLiveCallback)
}
s.cfg.NodeLiveness.RegisterCallback(s.nodeIsLiveCallback)

// SystemConfigProvider can be nil during some tests.
if scp := s.cfg.SystemConfigProvider; scp != nil {
Expand Down
4 changes: 1 addition & 3 deletions pkg/kv/kvserver/store_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -805,9 +805,7 @@ func (s *Store) raftTickLoop(ctx context.Context) {
select {
case <-ticker.C:
// Update the liveness map.
if s.cfg.NodeLiveness != nil {
s.updateLivenessMap()
}
s.updateLivenessMap()
s.updateIOThresholdMap()

s.unquiescedReplicas.Lock()
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
Expand Down Expand Up @@ -273,6 +274,7 @@ func createTestStoreWithoutStart(
}, ds)
require.Nil(t, cfg.DB)
cfg.DB = kv.NewDB(cfg.AmbientCtx, txnCoordSenderFactory, cfg.Clock, stopper)
cfg.NodeLiveness = liveness.NewTestNodeLiveness(stopper, cfg.Clock, cfg.Settings)
store := NewStore(ctx, *cfg, eng, nodeDesc)
storeSender.Sender = store

Expand Down

0 comments on commit d34af36

Please sign in to comment.