diff --git a/pkg/kv/kvserver/consistency_queue.go b/pkg/kv/kvserver/consistency_queue.go
index 59ad7056fe93..44615a1e72b4 100644
--- a/pkg/kv/kvserver/consistency_queue.go
+++ b/pkg/kv/kvserver/consistency_queue.go
@@ -125,11 +125,7 @@ func (q *consistencyQueue) shouldQueue(
 			return repl.getQueueLastProcessed(ctx, q.name)
 		},
 		isNodeAvailable: func(nodeID roachpb.NodeID) bool {
-			if repl.store.cfg.NodeLiveness != nil {
-				return repl.store.cfg.NodeLiveness.GetNodeVitalityFromCache(nodeID).IsLive(livenesspb.ConsistencyQueue)
-			}
-			// Some tests run without a NodeLiveness configured.
-			return true
+			return repl.store.cfg.NodeLiveness.GetNodeVitalityFromCache(nodeID).IsLive(livenesspb.ConsistencyQueue)
 		},
 		disableLastProcessedCheck: repl.store.cfg.TestingKnobs.DisableLastProcessedCheck,
 		interval:                  q.interval(),
diff --git a/pkg/kv/kvserver/liveness/cache.go b/pkg/kv/kvserver/liveness/cache.go
index f424d0da46a8..a0d8fe5515c9 100644
--- a/pkg/kv/kvserver/liveness/cache.go
+++ b/pkg/kv/kvserver/liveness/cache.go
@@ -49,6 +49,7 @@ type Cache struct {
 	clock      *hlc.Clock
 	nodeDialer *nodedialer.Dialer
 	st         *cluster.Settings
+	isTestMode bool
 	mu         struct {
 		syncutil.RWMutex
 		// lastNodeUpdate stores timestamps of StoreDescriptor updates in Gossip.
@@ -214,6 +215,18 @@ func (c *Cache) self() (_ Record, ok bool) {
 // includes the raw, encoded value that the database has for this liveness
 // record in addition to the decoded liveness proto.
 func (c *Cache) getLiveness(nodeID roachpb.NodeID) (_ Record, ok bool) {
+	// In test mode, create a node vitality that is always live.
+	if c.isTestMode {
+		now := c.clock.Now()
+		return Record{
+			Liveness: livenesspb.Liveness{
+				NodeID:     nodeID,
+				Epoch:      1,
+				Expiration: now.AddDuration(time.Minute).ToLegacyTimestamp(),
+				Draining:   false,
+				Membership: livenesspb.MembershipStatus_ACTIVE,
+			}}, true
+	}
 	c.mu.RLock()
 	defer c.mu.RUnlock()
 	if l, ok := c.mu.nodes[nodeID]; ok {
diff --git a/pkg/kv/kvserver/liveness/liveness.go b/pkg/kv/kvserver/liveness/liveness.go
index 8bf0ec034fa9..c6ce9ce379fb 100644
--- a/pkg/kv/kvserver/liveness/liveness.go
+++ b/pkg/kv/kvserver/liveness/liveness.go
@@ -20,6 +20,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/server/telemetry"
 	"github.com/cockroachdb/cockroach/pkg/settings"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	diskStorage "github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/util/buildutil"
 	"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
@@ -314,6 +315,37 @@ type NodeLivenessOptions struct {
 	Cache                 *Cache
 }
 
+// NewTestNodeLiveness creates a NodeLiveness for testing. It will always return
+// true when asked if a node is live and doesn't depend on gossip.
+func NewTestNodeLiveness(
+	stopper *stop.Stopper, clock *hlc.Clock, st *cluster.Settings,
+) *NodeLiveness {
+	cache := Cache{
+		clock:      clock,
+		nodeDialer: nil,
+		st:         st,
+		isTestMode: true,
+		gossip:     &gossip.Gossip{},
+	}
+
+	return &NodeLiveness{
+		ambientCtx:            log.AmbientContext{},
+		stopper:               stopper,
+		clock:                 clock,
+		storage:               nil, // storage should never be accessed.
+		livenessThreshold:     2 * time.Minute,
+		renewalDuration:       time.Minute,
+		selfSem:               make(chan struct{}, 1),
+		otherSem:              make(chan struct{}, 1),
+		heartbeatToken:        make(chan struct{}, 1),
+		onNodeDecommissioned:  nil,
+		onNodeDecommissioning: nil,
+		engines:               []diskStorage.Engine{},
+		onSelfHeartbeat:       nil,
+		cache:                 &cache,
+	}
+}
+
 // NewNodeLiveness returns a new instance of NodeLiveness configured
 // with the specified gossip instance.
 func NewNodeLiveness(opts NodeLivenessOptions) *NodeLiveness {
diff --git a/pkg/kv/kvserver/replica_gc_queue.go b/pkg/kv/kvserver/replica_gc_queue.go
index 24376896fcd2..645d28d3acd7 100644
--- a/pkg/kv/kvserver/replica_gc_queue.go
+++ b/pkg/kv/kvserver/replica_gc_queue.go
@@ -149,11 +149,6 @@ func replicaIsSuspect(repl *Replica) bool {
 		return true
 	}
 
-	// NodeLiveness can be nil in tests/benchmarks.
-	if repl.store.cfg.NodeLiveness == nil {
-		return false
-	}
-
 	// If a replica doesn't have an active raft group, we should check whether
 	// or not the node is active. If not, we should consider the replica suspect
 	// because it has probably already been removed from its raft group but
diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go
index 258315c38752..4d9facb8ae80 100644
--- a/pkg/kv/kvserver/replica_range_lease.go
+++ b/pkg/kv/kvserver/replica_range_lease.go
@@ -796,8 +796,7 @@ func (r *Replica) GetRangeLeaseDuration() time.Duration {
 // shouldUseExpirationLeaseRLocked. We can merge these once there are no more
 // callers: when expiration leases don't quiesce and are always eagerly renewed.
 func (r *Replica) requiresExpirationLeaseRLocked() bool {
-	return r.store.cfg.NodeLiveness == nil ||
-		r.shMu.state.Desc.StartKey.Less(roachpb.RKey(keys.NodeLivenessKeyMax))
+	return r.shMu.state.Desc.StartKey.Less(roachpb.RKey(keys.NodeLivenessKeyMax))
 }
 
 // shouldUseExpirationLeaseRLocked returns true if this range should be using an
diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go
index 69e433280913..299972d64078 100644
--- a/pkg/kv/kvserver/replicate_queue.go
+++ b/pkg/kv/kvserver/replicate_queue.go
@@ -602,11 +602,9 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica
 			updateFn()
 		})
 	}
-	if nl := store.cfg.NodeLiveness; nl != nil { // node liveness is nil for some unittests
-		nl.RegisterCallback(func(_ livenesspb.Liveness) {
-			updateFn()
-		})
-	}
+	store.cfg.NodeLiveness.RegisterCallback(func(_ livenesspb.Liveness) {
+		updateFn()
+	})
 
 	return rq
 }
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index 6484b104f8ea..6c1dbded4599 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -2329,9 +2329,7 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
 
 	// Register a callback to unquiesce any ranges with replicas on a
 	// node transitioning from non-live to live.
-	if s.cfg.NodeLiveness != nil {
-		s.cfg.NodeLiveness.RegisterCallback(s.nodeIsLiveCallback)
-	}
+	s.cfg.NodeLiveness.RegisterCallback(s.nodeIsLiveCallback)
 
 	// SystemConfigProvider can be nil during some tests.
 	if scp := s.cfg.SystemConfigProvider; scp != nil {
diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go
index 79977d9d2846..e3aca39025fa 100644
--- a/pkg/kv/kvserver/store_raft.go
+++ b/pkg/kv/kvserver/store_raft.go
@@ -805,9 +805,7 @@ func (s *Store) raftTickLoop(ctx context.Context) {
 		select {
 		case <-ticker.C:
 			// Update the liveness map.
-			if s.cfg.NodeLiveness != nil {
-				s.updateLivenessMap()
-			}
+			s.updateLivenessMap()
 			s.updateIOThresholdMap()
 
 			s.unquiescedReplicas.Lock()
diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go
index 7fc556807cef..221bc278f894 100644
--- a/pkg/kv/kvserver/store_test.go
+++ b/pkg/kv/kvserver/store_test.go
@@ -38,6 +38,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
@@ -273,6 +274,7 @@ func createTestStoreWithoutStart(
 	}, ds)
 	require.Nil(t, cfg.DB)
 	cfg.DB = kv.NewDB(cfg.AmbientCtx, txnCoordSenderFactory, cfg.Clock, stopper)
+	cfg.NodeLiveness = liveness.NewTestNodeLiveness(stopper, cfg.Clock, cfg.Settings)
 	store := NewStore(ctx, *cfg, eng, nodeDesc)
 	storeSender.Sender = store
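
With NodeLiveness now assumed non-nil throughout kvserver, tests that assemble a
StoreConfig by hand need to install the always-live stub, as createTestStoreWithoutStart
does in the hunk above. A minimal sketch of that wiring follows; it assumes a test that
already has ctx, a StoreConfig cfg with Clock and Settings populated, an engine eng, and
a node descriptor nodeDesc from the surrounding harness, none of which are introduced by
this patch:

	// The stopper owns the background work started by the test store.
	stopper := stop.NewStopper()
	defer stopper.Stop(ctx)
	// NewTestNodeLiveness backs the liveness cache with isTestMode, so every
	// vitality lookup reports the node as live without consulting gossip.
	cfg.NodeLiveness = liveness.NewTestNodeLiveness(stopper, cfg.Clock, cfg.Settings)
	store := NewStore(ctx, *cfg, eng, nodeDesc)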