Skip to content

Commit

Permalink
server: Fix NPE in spanStatsFanOut
Browse files Browse the repository at this point in the history
An NPE surfaced when issuing a span stats request on an unfinalized
(mixed-version) cluster.

To fix this, checks are added to guard against potential nil
responses and against references to non-existent map entries for a
requested span's stats.

Fixes: cockroachdb#132130
Release note (bug fix): Fixes a bug where a span stats request on a
mixed-version cluster resulted in an NPE.
  • Loading branch information
kyle-a-wong committed Oct 12, 2024
1 parent 30dbb17 commit abac489
Show file tree
Hide file tree
Showing 3 changed files with 282 additions and 26 deletions.
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,7 @@ go_test(
"//pkg/sql/tablemetadatacache/util",
"//pkg/storage",
"//pkg/storage/disk",
"//pkg/storage/enginepb",
"//pkg/storage/fs",
"//pkg/testutils",
"//pkg/testutils/datapathutils",
Expand Down
68 changes: 42 additions & 26 deletions pkg/server/span_stats_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ func (s *systemStatusServer) spanStatsFanOut(
res.SpanToStats[sp.String()] = &roachpb.SpanStats{}
}

responses := make(map[string]struct{})

spansPerNode, err := s.getSpansPerNode(ctx, req)
if err != nil {
return nil, err
Expand Down Expand Up @@ -127,7 +125,37 @@ func (s *systemStatusServer) spanStatsFanOut(
return resp, err
}

responseFn := func(nodeID roachpb.NodeID, resp interface{}) {
errorFn := func(nodeID roachpb.NodeID, err error) {
log.Errorf(ctx, nodeErrorMsgPlaceholder, nodeID, err)
errorMessage := fmt.Sprintf("%v", err)
res.Errors = append(res.Errors, errorMessage)
}

timeout := SpanStatsNodeTimeout.Get(&s.st.SV)
if err := iterateNodes(
ctx,
s.serverIterator,
s.stopper,
"iterating nodes for span stats",
timeout,
smartDial,
nodeFn,
collectSpanStatsResponses(ctx, res),
errorFn,
); err != nil {
return nil, err
}

return res, nil
}

// collectSpanStatsResponses takes a *roachpb.SpanStatsResponse and creates a closure around it
// to provide as a callback to fanned out SpanStats requests.
func collectSpanStatsResponses(
ctx context.Context, res *roachpb.SpanStatsResponse,
) func(nodeID roachpb.NodeID, resp interface{}) {
responses := make(map[string]struct{})
return func(nodeID roachpb.NodeID, resp interface{}) {
// Noop if nil response (returned from skipped node).
if resp == nil {
return
Expand All @@ -143,6 +171,17 @@ func (s *systemStatusServer) spanStatsFanOut(
// MVCC stats from the leaseholder, MVCC stats are taken from the node that responded first.
// See #108779.
for spanStr, spanStats := range nodeResponse.SpanToStats {
if spanStats == nil {
log.Errorf(ctx, "Span stats for %s from node response is nil", spanStr)
continue
}

_, ok := res.SpanToStats[spanStr]
if !ok {
log.Warningf(ctx, "Received Span not in original request: %s", spanStr)
res.SpanToStats[spanStr] = &roachpb.SpanStats{}
}

// Accumulate physical values across all replicas:
res.SpanToStats[spanStr].ApproximateTotalStats.Add(spanStats.TotalStats)
res.SpanToStats[spanStr].ApproximateDiskBytes += spanStats.ApproximateDiskBytes
Expand All @@ -160,29 +199,6 @@ func (s *systemStatusServer) spanStatsFanOut(
}
}
}

errorFn := func(nodeID roachpb.NodeID, err error) {
log.Errorf(ctx, nodeErrorMsgPlaceholder, nodeID, err)
errorMessage := fmt.Sprintf("%v", err)
res.Errors = append(res.Errors, errorMessage)
}

timeout := SpanStatsNodeTimeout.Get(&s.st.SV)
if err := iterateNodes(
ctx,
s.serverIterator,
s.stopper,
"iterating nodes for span stats",
timeout,
smartDial,
nodeFn,
responseFn,
errorFn,
); err != nil {
return nil, err
}

return res, nil
}

func (s *systemStatusServer) getLocalStats(
Expand Down
239 changes: 239 additions & 0 deletions pkg/server/span_stats_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -102,3 +103,241 @@ func TestSpanStatsBatching(t *testing.T) {
}

}

// TestCollectSpanStatsResponses exercises the callback returned by
// collectSpanStatsResponses. Each case seeds a response with placeholder
// entries, feeds it a sequence of per-node responses, and then checks that
// every span listed in the expected response is present with matching stats.
//
// Only spans named in expectedRes are verified; additional entries in the
// collected response are not treated as failures.
func TestCollectSpanStatsResponses(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	type nodeResponse struct {
		nodeID roachpb.NodeID
		resp   *roachpb.SpanStatsResponse
	}
	type testCase struct {
		// name identifies the scenario in subtest output.
		name string
		// actualRes is the response the callback accumulates into.
		actualRes *roachpb.SpanStatsResponse
		// nodeResponses are delivered to the callback in order.
		nodeResponses []nodeResponse
		// expectedRes lists the spans (and their stats) that must be present
		// in actualRes once every node response has been processed.
		expectedRes *roachpb.SpanStatsResponse
	}

	var testCases []testCase

	// test case 1 - each node reports a different span
	tc1 := testCase{
		name:      "disjoint spans per node",
		actualRes: createSpanStatsResponse("span1", "span2"),
		nodeResponses: []nodeResponse{
			{nodeID: 1, resp: &roachpb.SpanStatsResponse{
				SpanToStats: map[string]*roachpb.SpanStats{
					"span1": {
						RangeCount:           1,
						ApproximateDiskBytes: 1,
						RemoteFileBytes:      1,
						ExternalFileBytes:    1,
						StoreIDs:             []roachpb.StoreID{1},
						ReplicaCount:         1,
					},
				},
			}},
			{nodeID: 2, resp: &roachpb.SpanStatsResponse{
				SpanToStats: map[string]*roachpb.SpanStats{
					"span2": {
						RangeCount:           1,
						ApproximateDiskBytes: 1,
						RemoteFileBytes:      1,
						ExternalFileBytes:    1,
						StoreIDs:             []roachpb.StoreID{2},
						ReplicaCount:         1,
					},
				},
			}},
		},
		expectedRes: &roachpb.SpanStatsResponse{
			SpanToStats: map[string]*roachpb.SpanStats{
				"span1": {
					RangeCount:           1,
					ApproximateDiskBytes: 1,
					RemoteFileBytes:      1,
					ExternalFileBytes:    1,
					StoreIDs:             []roachpb.StoreID{1},
					ReplicaCount:         1,
				},
				"span2": {
					RangeCount:           1,
					ApproximateDiskBytes: 1,
					RemoteFileBytes:      1,
					ExternalFileBytes:    1,
					StoreIDs:             []roachpb.StoreID{2},
					ReplicaCount:         1,
				},
			},
		},
	}
	testCases = append(testCases, tc1)

	// test case 2 - span is replicated in node 1 and 2
	//
	// The expectation encodes that byte counters and ApproximateTotalStats are
	// summed across both responses, while TotalStats, RangeCount and
	// ReplicaCount keep the values from the first response delivered.
	totalStats1 := enginepb.MVCCStats{LiveBytes: 1}
	totalStats2 := enginepb.MVCCStats{LiveBytes: 2}
	expectedApproxTotalStats := enginepb.MVCCStats{}
	expectedApproxTotalStats.Add(totalStats1)
	expectedApproxTotalStats.Add(totalStats2)
	tc2 := testCase{
		name:      "span replicated on two nodes",
		actualRes: createSpanStatsResponse("span1", "span2"),
		nodeResponses: []nodeResponse{
			{nodeID: 1, resp: &roachpb.SpanStatsResponse{
				SpanToStats: map[string]*roachpb.SpanStats{
					"span1": {
						TotalStats:           totalStats1,
						RangeCount:           1,
						ApproximateDiskBytes: 1,
						RemoteFileBytes:      1,
						ExternalFileBytes:    1,
						StoreIDs:             []roachpb.StoreID{1, 2},
						ReplicaCount:         1,
					},
				},
			}},
			{nodeID: 2, resp: &roachpb.SpanStatsResponse{
				SpanToStats: map[string]*roachpb.SpanStats{
					"span1": {
						TotalStats:           totalStats2,
						RangeCount:           1,
						ApproximateDiskBytes: 1,
						RemoteFileBytes:      1,
						ExternalFileBytes:    1,
						StoreIDs:             []roachpb.StoreID{2},
						ReplicaCount:         1,
					},
				},
			}},
		},
		expectedRes: &roachpb.SpanStatsResponse{
			SpanToStats: map[string]*roachpb.SpanStats{
				"span1": {
					TotalStats:            totalStats1,
					ApproximateTotalStats: expectedApproxTotalStats,
					RangeCount:            1,
					ApproximateDiskBytes:  2,
					RemoteFileBytes:       2,
					ExternalFileBytes:     2,
					StoreIDs:              []roachpb.StoreID{1, 2},
					ReplicaCount:          1,
				},
			},
		},
	}
	testCases = append(testCases, tc2)

	// test case 3 - node response spanToStats value is nil for span2
	//
	// The nil entry must be ignored, leaving span2's placeholder untouched.
	tc3 := testCase{
		name:      "nil span stats in node response",
		actualRes: createSpanStatsResponse("span1", "span2"),
		nodeResponses: []nodeResponse{
			{nodeID: 1, resp: &roachpb.SpanStatsResponse{
				SpanToStats: map[string]*roachpb.SpanStats{
					"span1": {
						RangeCount:           1,
						ApproximateDiskBytes: 1,
						RemoteFileBytes:      1,
						ExternalFileBytes:    1,
						StoreIDs:             []roachpb.StoreID{1},
						ReplicaCount:         1,
					},
				},
			}},
			{nodeID: 2, resp: &roachpb.SpanStatsResponse{
				SpanToStats: map[string]*roachpb.SpanStats{
					"span2": nil,
				},
			}},
		},
		expectedRes: &roachpb.SpanStatsResponse{
			SpanToStats: map[string]*roachpb.SpanStats{
				"span1": {
					RangeCount:           1,
					ApproximateDiskBytes: 1,
					RemoteFileBytes:      1,
					ExternalFileBytes:    1,
					StoreIDs:             []roachpb.StoreID{1},
					ReplicaCount:         1,
				},
				"span2": {},
			},
		},
	}
	testCases = append(testCases, tc3)

	// test case 4 - node response contains span not in original response struct
	//
	// The unknown span must be added to the collected response rather than
	// dereferencing a missing map entry.
	tc4 := testCase{
		name:      "span not in original request",
		actualRes: createSpanStatsResponse("span1"),
		nodeResponses: []nodeResponse{
			{nodeID: 1, resp: &roachpb.SpanStatsResponse{
				SpanToStats: map[string]*roachpb.SpanStats{
					"span1": {
						RangeCount:           1,
						ApproximateDiskBytes: 1,
						RemoteFileBytes:      1,
						ExternalFileBytes:    1,
						StoreIDs:             []roachpb.StoreID{1},
						ReplicaCount:         1,
					},
				},
			}},
			{nodeID: 2, resp: &roachpb.SpanStatsResponse{
				SpanToStats: map[string]*roachpb.SpanStats{
					"span2": {
						RangeCount:           1,
						ApproximateDiskBytes: 1,
						RemoteFileBytes:      1,
						ExternalFileBytes:    1,
						StoreIDs:             []roachpb.StoreID{2},
						ReplicaCount:         1,
					},
				},
			}},
		},
		expectedRes: &roachpb.SpanStatsResponse{
			SpanToStats: map[string]*roachpb.SpanStats{
				"span1": {
					RangeCount:           1,
					ApproximateDiskBytes: 1,
					RemoteFileBytes:      1,
					ExternalFileBytes:    1,
					StoreIDs:             []roachpb.StoreID{1},
					ReplicaCount:         1,
				},
				"span2": {
					RangeCount:           1,
					ApproximateDiskBytes: 1,
					RemoteFileBytes:      1,
					ExternalFileBytes:    1,
					StoreIDs:             []roachpb.StoreID{2},
					ReplicaCount:         1,
				},
			},
		},
	}
	testCases = append(testCases, tc4)

	ctx := context.Background()
	for _, tc := range testCases {
		// Run each scenario as a named subtest so a failure identifies the
		// case. The subtest runs synchronously, so capturing tc is safe.
		t.Run(tc.name, func(t *testing.T) {
			cb := collectSpanStatsResponses(ctx, tc.actualRes)
			for _, nodeResp := range tc.nodeResponses {
				cb(nodeResp.nodeID, nodeResp.resp)
			}

			for spanStr, spanStats := range tc.expectedRes.SpanToStats {
				actualSpanStat, ok := tc.actualRes.SpanToStats[spanStr]
				require.True(t, ok, "span %q missing from collected response", spanStr)
				require.NotNil(t, actualSpanStat, "span %q has nil stats", spanStr)
				require.Equal(t, spanStats.TotalStats, actualSpanStat.TotalStats)
				// Previously unchecked: the accumulated MVCC stats that case 2
				// builds an explicit expectation for.
				require.Equal(t, spanStats.ApproximateTotalStats, actualSpanStat.ApproximateTotalStats)
				require.Equal(t, spanStats.RangeCount, actualSpanStat.RangeCount)
				require.Equal(t, spanStats.ApproximateDiskBytes, actualSpanStat.ApproximateDiskBytes)
				require.Equal(t, spanStats.RemoteFileBytes, actualSpanStat.RemoteFileBytes)
				require.Equal(t, spanStats.ExternalFileBytes, actualSpanStat.ExternalFileBytes)
				require.Equal(t, spanStats.StoreIDs, actualSpanStat.StoreIDs)
				require.Equal(t, spanStats.ReplicaCount, actualSpanStat.ReplicaCount)
			}
		})
	}
}

// createSpanStatsResponse returns a SpanStatsResponse whose SpanToStats map
// contains one empty, non-nil SpanStats entry per supplied span string. With
// no arguments the map is still allocated, just empty.
func createSpanStatsResponse(spanStrs ...string) *roachpb.SpanStatsResponse {
	statsBySpan := make(map[string]*roachpb.SpanStats)
	for i := range spanStrs {
		statsBySpan[spanStrs[i]] = &roachpb.SpanStats{}
	}
	return &roachpb.SpanStatsResponse{SpanToStats: statsBySpan}
}

0 comments on commit abac489

Please sign in to comment.