kvserver: clear rac2 token metrics prior to integration testing
`TestFlowControl.*V2` tests assert on exact counters. This can be
problematic if benign deltas occur while setting up the test, such as a
send queue forming when adding a new learner and then quickly being
resolved.

Clear the token metrics before commencing these tests to prevent
flakes that result from such setup deltas.

Fixes: #132642
Release note: None
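
To illustrate the pattern this commit applies — clear counters after cluster
setup so later assertions on exact values are not perturbed by benign
setup-time deltas — here is a minimal, self-contained Go sketch. The
`tokenCounter` type and test below are hypothetical stand-ins, not part of
the kvserver or rac2 packages touched by this commit.

```go
package kvserver_test

import (
	"sync/atomic"
	"testing"
)

// tokenCounter is a hypothetical stand-in for a flow-control token metric.
type tokenCounter struct {
	deducted atomic.Int64
}

func (c *tokenCounter) deduct(n int64) { c.deducted.Add(n) }
func (c *tokenCounter) clear()         { c.deducted.Store(0) }

// TestExactCountersAfterSetupReset sketches the flake-avoidance pattern:
// reset counters after setup, then assert on exact values.
func TestExactCountersAfterSetupReset(t *testing.T) {
	var c tokenCounter

	// Setup noise: a send queue briefly forming (e.g. while adding a
	// learner) bumps the counter by some nondeterministic amount.
	c.deduct(512)

	// Reset before the assertions of interest, analogous to the
	// resetV2TokenMetrics helper added in the diff below.
	c.clear()

	// The test proper issues its own writes...
	c.deduct(1 << 20)

	// ...and can now assert on an exact counter value without setup noise.
	if got := c.deducted.Load(); got != 1<<20 {
		t.Fatalf("expected exactly 1 MiB deducted, got %d", got)
	}
}
```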
kvoli committed Oct 18, 2024
1 parent 8ef2e0f commit f488847
Showing 3 changed files with 93 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -421,6 +421,7 @@ go_test(
"//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch",
"//pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb",
"//pkg/kv/kvserver/kvflowcontrol/node_rac2",
"//pkg/kv/kvserver/kvflowcontrol/rac2",
"//pkg/kv/kvserver/kvflowcontrol/replica_rac2",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/kvserverpb",
62 changes: 62 additions & 0 deletions pkg/kv/kvserver/flow_control_integration_test.go
@@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -2253,6 +2254,10 @@ func TestFlowControlBasicV2(t *testing.T) {
n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0))

h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
// Reset the token metrics, since a send queue may have instantly
// formed when adding one of the replicas, before being quickly
// drained.
h.resetV2TokenMetrics()

h.comment(`-- Flow token metrics, before issuing the 1MiB replicated write.`)
h.query(n1, v2FlowTokensQueryStr)
@@ -2342,6 +2347,10 @@ func TestFlowControlRangeSplitMergeV2(t *testing.T) {
require.NoError(t, err)

h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
// Reset the token metrics, since a send queue may have instantly
// formed when adding one of the replicas, before being quickly
// drained.
h.resetV2TokenMetrics()
h.log("sending put request to pre-split range")
h.put(ctx, k, 1<<20 /* 1MiB */, testFlowModeToPri(mode))
h.log("sent put request to pre-split range")
@@ -2464,6 +2473,10 @@ func TestFlowControlBlockedAdmissionV2(t *testing.T) {
require.NoError(t, err)
h.enableVerboseRaftMsgLoggingForRange(desc.RangeID)
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
// Reset the token metrics, since a send queue may have instantly
// formed when adding one of the replicas, before being quickly
// drained.
h.resetV2TokenMetrics()

h.comment(`-- (Issuing 5 1MiB, 3x replicated write that's not admitted.)`)
h.log("sending put requests")
@@ -2579,6 +2592,10 @@ func TestFlowControlAdmissionPostSplitMergeV2(t *testing.T) {
require.NoError(t, err)

h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
// Reset the token metrics, since a send queue may have instantly
// formed when adding one of the replicas, before being quickly
// drained.
h.resetV2TokenMetrics()

h.log("sending put request to pre-split range")
h.put(ctx, k, 1<<20 /* 1MiB */, testFlowModeToPri(mode))
@@ -2722,6 +2739,10 @@ func TestFlowControlCrashedNodeV2(t *testing.T) {
require.NoError(t, err)
tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(0))
h.waitForConnectedStreams(ctx, desc.RangeID, 2, 0 /* serverIdx */)
// Reset the token metrics, since a send queue may have instantly
// formed when adding one of the replicas, before being quickly
// drained.
h.resetV2TokenMetrics()

h.comment(`-- (Issuing 5x1MiB, 2x replicated writes that are not admitted.)`)
h.log("sending put requests")
@@ -2870,6 +2891,10 @@ func TestFlowControlRaftSnapshotV2(t *testing.T) {
repl := store.LookupReplica(roachpb.RKey(k))
require.NotNil(t, repl)
h.waitForConnectedStreams(ctx, repl.RangeID, 5, 0 /* serverIdx */)
// Reset the token metrics, since a send queue may have instantly
// formed when adding one of the replicas, before being quickly
// drained.
h.resetV2TokenMetrics()

// Set up a key to replicate across the cluster. We're going to modify this
// key and truncate the raft logs from that command after killing one of the
@@ -3085,6 +3110,10 @@ func TestFlowControlRaftMembershipV2(t *testing.T) {
desc, err := tc.LookupRange(k)
require.NoError(t, err)
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
// Reset the token metrics, since a send queue may have instantly
// formed when adding one of the replicas, before being quickly
// drained.
h.resetV2TokenMetrics()

h.comment(`-- (Issuing 1x1MiB, 3x replicated write that's not admitted.)`)
h.put(ctx, k, 1<<20 /* 1MiB */, testFlowModeToPri(mode))
@@ -3224,6 +3253,10 @@ func TestFlowControlRaftMembershipRemoveSelfV2(t *testing.T) {
// Make sure the lease is on n1 and that we're triply connected.
tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(0))
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
// Reset the token metrics, since a send queue may have instantly
// formed when adding one of the replicas, before being quickly
// drained.
h.resetV2TokenMetrics()

h.comment(`-- (Issuing 1x1MiB, 3x replicated write that's not admitted.)`)
h.put(ctx, k, 1<<20 /* 1MiB */, testFlowModeToPri(mode))
@@ -3353,6 +3386,10 @@ func TestFlowControlClassPrioritizationV2(t *testing.T) {
desc, err := tc.LookupRange(k)
require.NoError(t, err)
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
// Reset the token metrics, since a send queue may have instantly
// formed when adding one of the replicas, before being quickly
// drained.
h.resetV2TokenMetrics()

h.comment(`-- (Issuing 1x1MiB, 3x replicated elastic write that's not admitted.)`)
h.put(ctx, k, 1<<20 /* 1MiB */, testFlowModeToPri(mode))
@@ -3469,6 +3506,10 @@ func TestFlowControlUnquiescedRangeV2(t *testing.T) {
n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0))

h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
// Reset the token metrics, since a send queue may have instantly
// formed when adding one of the replicas, before being quickly
// drained.
h.resetV2TokenMetrics()

h.comment(`-- (Issuing 1x1MiB, 3x replicated elastic write that's not admitted.)`)
h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.BulkNormalPri)
@@ -3571,6 +3612,10 @@ func TestFlowControlTransferLeaseV2(t *testing.T) {
desc, err := tc.LookupRange(k)
require.NoError(t, err)
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
// Reset the token metrics, since a send queue may have instantly
// formed when adding one of the replicas, before being quickly
// drained.
h.resetV2TokenMetrics()

h.comment(`-- (Issuing 1x1MiB, 3x replicated write that's not admitted.)`)
h.put(ctx, k, 1<<20 /* 1MiB */, testFlowModeToPri(mode))
@@ -3664,6 +3709,10 @@ func TestFlowControlLeaderNotLeaseholderV2(t *testing.T) {
desc, err := tc.LookupRange(k)
require.NoError(t, err)
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
// Reset the token metrics, since a send queue may have instantly
// formed when adding one of the replicas, before being quickly
// drained.
h.resetV2TokenMetrics()

h.comment(`-- (Issuing 1x1MiB, 3x replicated write that's not admitted.)`)
h.put(ctx, k, 1<<20 /* 1MiB */, testFlowModeToPri(mode))
@@ -3780,6 +3829,10 @@ func TestFlowControlGranterAdmitOneByOneV2(t *testing.T) {
desc, err := tc.LookupRange(k)
require.NoError(t, err)
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
// Reset the token metrics, since a send queue may have instantly
// formed when adding one of the replicas, before being quickly
// drained.
h.resetV2TokenMetrics()

h.comment(`-- (Issuing 1024*1KiB, 3x replicated writes that are not admitted.)`)
h.log("sending put requests")
@@ -4865,6 +4918,15 @@ func (h *flowControlTestHelper) enableVerboseRaftMsgLoggingForRange(rangeID roac
}
}

func (h *flowControlTestHelper) resetV2TokenMetrics() {
for _, server := range h.tc.Servers {
require.NoError(h.t, server.GetStores().(*kvserver.Stores).VisitStores(func(s *kvserver.Store) error {
s.GetStoreConfig().KVFlowStreamTokenProvider.Metrics().(*rac2.TokenMetrics).TestingClear()
return nil
}))
}
}

// makeV2EnabledTestFileName is a utility function which returns an updated
// filename for the testdata file based on the v2EnabledWhenLeaderLevel.
func makeV2EnabledTestFileName(
30 changes: 30 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/metrics.go
@@ -197,6 +197,36 @@ func NewTokenMetrics() *TokenMetrics {
return m
}

// TestingClear is used in tests to reset the metrics.
func (m *TokenMetrics) TestingClear() {
for _, typ := range []TokenType{
EvalToken,
SendToken,
} {
for _, wc := range []admissionpb.WorkClass{
admissionpb.RegularWorkClass,
admissionpb.ElasticWorkClass,
} {
m.CounterMetrics[typ].Deducted[wc].Clear()
m.CounterMetrics[typ].Returned[wc].Clear()
m.CounterMetrics[typ].Unaccounted[wc].Clear()
m.CounterMetrics[typ].Disconnected[wc].Clear()
if typ == SendToken {
m.CounterMetrics[typ].SendQueue[0].ForceFlushDeducted.Clear()
for _, wc := range []admissionpb.WorkClass{
admissionpb.RegularWorkClass,
admissionpb.ElasticWorkClass,
} {
m.CounterMetrics[typ].SendQueue[0].PreventionDeducted[wc].Clear()
}
}
m.StreamMetrics[typ].Count[wc].Reset()
m.StreamMetrics[typ].BlockedCount[wc].Reset()
m.StreamMetrics[typ].TokensAvailable[wc].Reset()
}
}
}

type TokenCounterMetrics struct {
Deducted [admissionpb.NumWorkClasses]*metric.Counter
Returned [admissionpb.NumWorkClasses]*metric.Counter
