From 44abfd0a8290109e41a37c7c5dee797ad5690863 Mon Sep 17 00:00:00 2001 From: RyanBlaney Date: Sun, 1 Dec 2024 14:13:54 -0800 Subject: [PATCH 1/6] Improved sequencing Documentation for the stats events and added tests to verify the order of server-side events. --- stats/stats.go | 56 +++-- stats/stats_test.go | 567 +++++++++++++++++++++++++++++++++----------- 2 files changed, 463 insertions(+), 160 deletions(-) diff --git a/stats/stats.go b/stats/stats.go index 6f20d2d54868..4410be7e1596 100644 --- a/stats/stats.go +++ b/stats/stats.go @@ -36,7 +36,38 @@ type RPCStats interface { IsClient() bool } +// InHeader is the first event handled in the RPC lifecycle. +// It contains stats when the header is received. +// This event marks the start of processing for incoming RPCs +// and must be completed before any other events occur. +type InHeader struct { + // Client is true if this InHeader is from client side. + Client bool + // WireLength is the wire length of header. + WireLength int + // Compression is the compression algorithm used for the RPC. + Compression string + // Header contains the header metadata received. + Header metadata.MD + + // The following fields are valid only if Client is false. + // FullMethod is the full RPC method string, i.e., /package.service/method. + FullMethod string + // RemoteAddr is the remote address of the corresponding connection. + RemoteAddr net.Addr + // LocalAddr is the local address of the corresponding connection. + LocalAddr net.Addr +} + +// IsClient indicates if the stats information is from client side. +func (s *InHeader) IsClient() bool { return s.Client } + +func (s *InHeader) isRPCStats() {} + // Begin contains stats when an RPC attempt begins. +// This event is called AFTER the InHeader event, as headers must +// be processed before the RPC lifecycle begins +// // FailFast is only valid if this Begin is from client side. type Begin struct { // Client is true if this Begin is from client side. @@ -98,31 +129,6 @@ func (s *InPayload) IsClient() bool { return s.Client } func (s *InPayload) isRPCStats() {} -// InHeader contains stats when a header is received. -type InHeader struct { - // Client is true if this InHeader is from client side. - Client bool - // WireLength is the wire length of header. - WireLength int - // Compression is the compression algorithm used for the RPC. - Compression string - // Header contains the header metadata received. - Header metadata.MD - - // The following fields are valid only if Client is false. - // FullMethod is the full RPC method string, i.e., /package.service/method. - FullMethod string - // RemoteAddr is the remote address of the corresponding connection. - RemoteAddr net.Addr - // LocalAddr is the local address of the corresponding connection. - LocalAddr net.Addr -} - -// IsClient indicates if the stats information is from client side. -func (s *InHeader) IsClient() bool { return s.Client } - -func (s *InHeader) isRPCStats() {} - // InTrailer contains stats when a trailer is received. type InTrailer struct { // Client is true if this InTrailer is from client side. diff --git a/stats/stats_test.go b/stats/stats_test.go index ec5ffa042f47..3c28642138d4 100644 --- a/stats/stats_test.go +++ b/stats/stats_test.go @@ -59,8 +59,10 @@ func init() { grpc.EnableTracing = false } -type connCtxKey struct{} -type rpcCtxKey struct{} +type ( + connCtxKey struct{} + rpcCtxKey struct{} +) var ( // For headers sent to server: @@ -81,6 +83,35 @@ var ( } // The id for which the service handler should return error. errorID int32 = 32202 + + // Ensure that Unary RPC server stats events are logged in the correct order. + expectedUnarySequence = []string{ + "ConnStats", + "InHeader", + "Begin", + "InPayload", + "OutHeader", + "OutPayload", + "OutTrailer", + "End", + } + + // Ensure that the sequence of server-side stats events for a Unary RPC + // matches the expected flow. + expectedClientStreamSequence = []string{ + "ConnStats", + "InHeader", + "Begin", + "OutHeader", + "InPayload", + "InPayload", + "InPayload", + "InPayload", + "InPayload", + "OutPayload", + "OutTrailer", + "End", + } ) func idToPayload(id int32) *testpb.Payload { @@ -119,12 +150,25 @@ type testServer struct { testgrpc.UnimplementedTestServiceServer } -func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { +func (s *testServer) UnaryCall( + ctx context.Context, + in *testpb.SimpleRequest, +) (*testpb.SimpleResponse, error) { if err := grpc.SendHeader(ctx, testHeaderMetadata); err != nil { - return nil, status.Errorf(status.Code(err), "grpc.SendHeader(_, %v) = %v, want ", testHeaderMetadata, err) + return nil, status.Errorf( + status.Code(err), + "grpc.SendHeader(_, %v) = %v, want ", + testHeaderMetadata, + err, + ) } if err := grpc.SetTrailer(ctx, testTrailerMetadata); err != nil { - return nil, status.Errorf(status.Code(err), "grpc.SetTrailer(_, %v) = %v, want ", testTrailerMetadata, err) + return nil, status.Errorf( + status.Code(err), + "grpc.SetTrailer(_, %v) = %v, want ", + testTrailerMetadata, + err, + ) } if id := payloadToID(in.Payload); id == errorID { @@ -136,7 +180,14 @@ func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (* func (s *testServer) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallServer) error { if err := stream.SendHeader(testHeaderMetadata); err != nil { - return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, testHeaderMetadata, err, nil) + return status.Errorf( + status.Code(err), + "%v.SendHeader(%v) = %v, want %v", + stream, + testHeaderMetadata, + err, + nil, + ) } stream.SetTrailer(testTrailerMetadata) for { @@ -159,9 +210,18 @@ func (s *testServer) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallSe } } -func (s *testServer) StreamingInputCall(stream testgrpc.TestService_StreamingInputCallServer) error { +func (s *testServer) StreamingInputCall( + stream testgrpc.TestService_StreamingInputCallServer, +) error { if err := stream.SendHeader(testHeaderMetadata); err != nil { - return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, testHeaderMetadata, err, nil) + return status.Errorf( + status.Code(err), + "%v.SendHeader(%v) = %v, want %v", + stream, + testHeaderMetadata, + err, + nil, + ) } stream.SetTrailer(testTrailerMetadata) for { @@ -180,9 +240,19 @@ func (s *testServer) StreamingInputCall(stream testgrpc.TestService_StreamingInp } } -func (s *testServer) StreamingOutputCall(in *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error { +func (s *testServer) StreamingOutputCall( + in *testpb.StreamingOutputCallRequest, + stream testgrpc.TestService_StreamingOutputCallServer, +) error { if err := stream.SendHeader(testHeaderMetadata); err != nil { - return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, testHeaderMetadata, err, nil) + return status.Errorf( + status.Code(err), + "%v.SendHeader(%v) = %v, want %v", + stream, + testHeaderMetadata, + err, + nil, + ) } stream.SetTrailer(testTrailerMetadata) @@ -326,7 +396,11 @@ func (te *test) doUnaryCall(c *rpcConfig) (*testpb.SimpleRequest, *testpb.Simple tCtx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - resp, err = tc.UnaryCall(metadata.NewOutgoingContext(tCtx, testMetadata), req, grpc.WaitForReady(!c.failfast)) + resp, err = tc.UnaryCall( + metadata.NewOutgoingContext(tCtx, testMetadata), + req, + grpc.WaitForReady(!c.failfast), + ) return req, resp, err } @@ -339,7 +413,10 @@ func (te *test) doFullDuplexCallRoundtrip(c *rpcConfig) ([]proto.Message, []prot tc := testgrpc.NewTestServiceClient(te.clientConn()) tCtx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - stream, err := tc.FullDuplexCall(metadata.NewOutgoingContext(tCtx, testMetadata), grpc.WaitForReady(!c.failfast)) + stream, err := tc.FullDuplexCall( + metadata.NewOutgoingContext(tCtx, testMetadata), + grpc.WaitForReady(!c.failfast), + ) if err != nil { return reqs, resps, err } @@ -371,7 +448,9 @@ func (te *test) doFullDuplexCallRoundtrip(c *rpcConfig) ([]proto.Message, []prot return reqs, resps, nil } -func (te *test) doClientStreamCall(c *rpcConfig) ([]proto.Message, *testpb.StreamingInputCallResponse, error) { +func (te *test) doClientStreamCall( + c *rpcConfig, +) ([]proto.Message, *testpb.StreamingInputCallResponse, error) { var ( reqs []proto.Message resp *testpb.StreamingInputCallResponse @@ -380,7 +459,10 @@ func (te *test) doClientStreamCall(c *rpcConfig) ([]proto.Message, *testpb.Strea tc := testgrpc.NewTestServiceClient(te.clientConn()) tCtx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - stream, err := tc.StreamingInputCall(metadata.NewOutgoingContext(tCtx, testMetadata), grpc.WaitForReady(!c.failfast)) + stream, err := tc.StreamingInputCall( + metadata.NewOutgoingContext(tCtx, testMetadata), + grpc.WaitForReady(!c.failfast), + ) if err != nil { return reqs, resp, err } @@ -401,7 +483,9 @@ func (te *test) doClientStreamCall(c *rpcConfig) ([]proto.Message, *testpb.Strea return reqs, resp, err } -func (te *test) doServerStreamCall(c *rpcConfig) (*testpb.StreamingOutputCallRequest, []proto.Message, error) { +func (te *test) doServerStreamCall( + c *rpcConfig, +) (*testpb.StreamingOutputCallRequest, []proto.Message, error) { var ( req *testpb.StreamingOutputCallRequest resps []proto.Message @@ -417,7 +501,11 @@ func (te *test) doServerStreamCall(c *rpcConfig) (*testpb.StreamingOutputCallReq req = &testpb.StreamingOutputCallRequest{Payload: idToPayload(startID)} tCtx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - stream, err := tc.StreamingOutputCall(metadata.NewOutgoingContext(tCtx, testMetadata), req, grpc.WaitForReady(!c.failfast)) + stream, err := tc.StreamingOutputCall( + metadata.NewOutgoingContext(tCtx, testMetadata), + req, + grpc.WaitForReady(!c.failfast), + ) if err != nil { return req, resps, err } @@ -512,7 +600,12 @@ func checkInHeader(t *testing.T, d *gotData, e *expectedData) { // expected headers keys have the expected header values. for key := range testHeaderMetadata { if !reflect.DeepEqual(st.Header.Get(key), testHeaderMetadata.Get(key)) { - t.Fatalf("st.Header[%s] = %v, want %v", key, st.Header.Get(key), testHeaderMetadata.Get(key)) + t.Fatalf( + "st.Header[%s] = %v, want %v", + key, + st.Header.Get(key), + testHeaderMetadata.Get(key), + ) } } } else { @@ -636,7 +729,12 @@ func checkOutHeader(t *testing.T, d *gotData, e *expectedData) { // expected headers keys have the expected header values. for key := range testMetadata { if !reflect.DeepEqual(st.Header.Get(key), testMetadata.Get(key)) { - t.Fatalf("st.Header[%s] = %v, want %v", key, st.Header.Get(key), testMetadata.Get(key)) + t.Fatalf( + "st.Header[%s] = %v, want %v", + key, + st.Header.Get(key), + testMetadata.Get(key), + ) } } @@ -786,8 +884,14 @@ func checkConnEnd(t *testing.T, d *gotData) { st.IsClient() // TODO remove this. } +type event struct { + eventType string + timestamp time.Time +} + type statshandler struct { mu sync.Mutex + events []event gotRPC []*gotData gotConn []*gotData } @@ -800,13 +904,41 @@ func (h *statshandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) conte return context.WithValue(ctx, rpcCtxKey{}, info) } +// recordEvent records an event in the statshandler along with a timestamp. +func (h *statshandler) recordEvent(eventType string) { + h.mu.Lock() + defer h.mu.Unlock() + h.events = append(h.events, event{eventType: eventType, timestamp: time.Now()}) +} + func (h *statshandler) HandleConn(ctx context.Context, s stats.ConnStats) { + h.recordEvent("ConnStats") + h.mu.Lock() defer h.mu.Unlock() h.gotConn = append(h.gotConn, &gotData{ctx, s.IsClient(), s}) } func (h *statshandler) HandleRPC(ctx context.Context, s stats.RPCStats) { + switch s.(type) { + case *stats.Begin: + h.recordEvent("Begin") + case *stats.InHeader: + h.recordEvent("InHeader") + case *stats.InPayload: + h.recordEvent("InPayload") + case *stats.OutHeader: + h.recordEvent("OutHeader") + case *stats.OutPayload: + h.recordEvent("OutPayload") + case *stats.InTrailer: + h.recordEvent("InTrailer") + case *stats.OutTrailer: + h.recordEvent("OutTrailer") + case *stats.End: + h.recordEvent("End") + } + h.mu.Lock() defer h.mu.Unlock() h.gotRPC = append(h.gotRPC, &gotData{ctx, s.IsClient(), s}) @@ -825,7 +957,12 @@ func checkConnStats(t *testing.T, got []*gotData) { checkConnEnd(t, got[len(got)-1]) } -func checkServerStats(t *testing.T, got []*gotData, expect *expectedData, checkFuncs []func(t *testing.T, d *gotData, e *expectedData)) { +func checkServerStats( + t *testing.T, + got []*gotData, + expect *expectedData, + checkFuncs []func(t *testing.T, d *gotData, e *expectedData), +) { if len(got) != len(checkFuncs) { for i, g := range got { t.Errorf(" - %v, %T", i, g.s) @@ -838,7 +975,12 @@ func checkServerStats(t *testing.T, got []*gotData, expect *expectedData, checkF } } -func testServerStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs []func(t *testing.T, d *gotData, e *expectedData)) { +func testServerStats( + t *testing.T, + tc *testConfig, + cc *rpcConfig, + checkFuncs []func(t *testing.T, d *gotData, e *expectedData), +) { h := &statshandler{} te := newTest(t, tc, nil, []stats.Handler{h}) te.startServer(&testServer{}) @@ -927,26 +1069,36 @@ func testServerStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs []f } func (s) TestServerStatsUnaryRPC(t *testing.T) { - testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: true, callType: unaryRPC}, []func(t *testing.T, d *gotData, e *expectedData){ - checkInHeader, - checkBegin, - checkInPayload, - checkOutHeader, - checkOutPayload, - checkOutTrailer, - checkEnd, - }) + testServerStats( + t, + &testConfig{compress: ""}, + &rpcConfig{success: true, callType: unaryRPC}, + []func(t *testing.T, d *gotData, e *expectedData){ + checkInHeader, + checkBegin, + checkInPayload, + checkOutHeader, + checkOutPayload, + checkOutTrailer, + checkEnd, + }, + ) } func (s) TestServerStatsUnaryRPCError(t *testing.T) { - testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: false, callType: unaryRPC}, []func(t *testing.T, d *gotData, e *expectedData){ - checkInHeader, - checkBegin, - checkInPayload, - checkOutHeader, - checkOutTrailer, - checkEnd, - }) + testServerStats( + t, + &testConfig{compress: ""}, + &rpcConfig{success: false, callType: unaryRPC}, + []func(t *testing.T, d *gotData, e *expectedData){ + checkInHeader, + checkBegin, + checkInPayload, + checkOutHeader, + checkOutTrailer, + checkEnd, + }, + ) } func (s) TestServerStatsClientStreamRPC(t *testing.T) { @@ -967,19 +1119,29 @@ func (s) TestServerStatsClientStreamRPC(t *testing.T) { checkOutTrailer, checkEnd, ) - testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: clientStreamRPC}, checkFuncs) + testServerStats( + t, + &testConfig{compress: "gzip"}, + &rpcConfig{count: count, success: true, callType: clientStreamRPC}, + checkFuncs, + ) } func (s) TestServerStatsClientStreamRPCError(t *testing.T) { count := 1 - testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: clientStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){ - checkInHeader, - checkBegin, - checkOutHeader, - checkInPayload, - checkOutTrailer, - checkEnd, - }) + testServerStats( + t, + &testConfig{compress: "gzip"}, + &rpcConfig{count: count, success: false, callType: clientStreamRPC}, + []func(t *testing.T, d *gotData, e *expectedData){ + checkInHeader, + checkBegin, + checkOutHeader, + checkInPayload, + checkOutTrailer, + checkEnd, + }, + ) } func (s) TestServerStatsServerStreamRPC(t *testing.T) { @@ -1000,19 +1162,29 @@ func (s) TestServerStatsServerStreamRPC(t *testing.T) { checkOutTrailer, checkEnd, ) - testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: serverStreamRPC}, checkFuncs) + testServerStats( + t, + &testConfig{compress: "gzip"}, + &rpcConfig{count: count, success: true, callType: serverStreamRPC}, + checkFuncs, + ) } func (s) TestServerStatsServerStreamRPCError(t *testing.T) { count := 5 - testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: serverStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){ - checkInHeader, - checkBegin, - checkInPayload, - checkOutHeader, - checkOutTrailer, - checkEnd, - }) + testServerStats( + t, + &testConfig{compress: "gzip"}, + &rpcConfig{count: count, success: false, callType: serverStreamRPC}, + []func(t *testing.T, d *gotData, e *expectedData){ + checkInHeader, + checkBegin, + checkInPayload, + checkOutHeader, + checkOutTrailer, + checkEnd, + }, + ) } func (s) TestServerStatsFullDuplexRPC(t *testing.T) { @@ -1033,19 +1205,29 @@ func (s) TestServerStatsFullDuplexRPC(t *testing.T) { checkOutTrailer, checkEnd, ) - testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: fullDuplexStreamRPC}, checkFuncs) + testServerStats( + t, + &testConfig{compress: "gzip"}, + &rpcConfig{count: count, success: true, callType: fullDuplexStreamRPC}, + checkFuncs, + ) } func (s) TestServerStatsFullDuplexRPCError(t *testing.T) { count := 5 - testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: fullDuplexStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){ - checkInHeader, - checkBegin, - checkOutHeader, - checkInPayload, - checkOutTrailer, - checkEnd, - }) + testServerStats( + t, + &testConfig{compress: "gzip"}, + &rpcConfig{count: count, success: false, callType: fullDuplexStreamRPC}, + []func(t *testing.T, d *gotData, e *expectedData){ + checkInHeader, + checkBegin, + checkOutHeader, + checkInPayload, + checkOutTrailer, + checkEnd, + }, + ) } type checkFuncWithCount struct { @@ -1053,7 +1235,12 @@ type checkFuncWithCount struct { c int // expected count } -func checkClientStats(t *testing.T, got []*gotData, expect *expectedData, checkFuncs map[int]*checkFuncWithCount) { +func checkClientStats( + t *testing.T, + got []*gotData, + expect *expectedData, + checkFuncs map[int]*checkFuncWithCount, +) { var expectLen int for _, v := range checkFuncs { expectLen += v.c @@ -1138,7 +1325,12 @@ func checkClientStats(t *testing.T, got []*gotData, expect *expectedData, checkF } } -func testClientStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs map[int]*checkFuncWithCount) { +func testClientStats( + t *testing.T, + tc *testConfig, + cc *rpcConfig, + checkFuncs map[int]*checkFuncWithCount, +) { h := &statshandler{} te := newTest(t, tc, []stats.Handler{h}, nil) te.startServer(&testServer{}) @@ -1231,101 +1423,141 @@ func testClientStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs map } func (s) TestClientStatsUnaryRPC(t *testing.T) { - testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: true, failfast: false, callType: unaryRPC}, map[int]*checkFuncWithCount{ - begin: {checkBegin, 1}, - outHeader: {checkOutHeader, 1}, - outPayload: {checkOutPayload, 1}, - inHeader: {checkInHeader, 1}, - inPayload: {checkInPayload, 1}, - inTrailer: {checkInTrailer, 1}, - end: {checkEnd, 1}, - }) + testClientStats( + t, + &testConfig{compress: ""}, + &rpcConfig{success: true, failfast: false, callType: unaryRPC}, + map[int]*checkFuncWithCount{ + begin: {checkBegin, 1}, + outHeader: {checkOutHeader, 1}, + outPayload: {checkOutPayload, 1}, + inHeader: {checkInHeader, 1}, + inPayload: {checkInPayload, 1}, + inTrailer: {checkInTrailer, 1}, + end: {checkEnd, 1}, + }, + ) } func (s) TestClientStatsUnaryRPCError(t *testing.T) { - testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: false, failfast: false, callType: unaryRPC}, map[int]*checkFuncWithCount{ - begin: {checkBegin, 1}, - outHeader: {checkOutHeader, 1}, - outPayload: {checkOutPayload, 1}, - inHeader: {checkInHeader, 1}, - inTrailer: {checkInTrailer, 1}, - end: {checkEnd, 1}, - }) + testClientStats( + t, + &testConfig{compress: ""}, + &rpcConfig{success: false, failfast: false, callType: unaryRPC}, + map[int]*checkFuncWithCount{ + begin: {checkBegin, 1}, + outHeader: {checkOutHeader, 1}, + outPayload: {checkOutPayload, 1}, + inHeader: {checkInHeader, 1}, + inTrailer: {checkInTrailer, 1}, + end: {checkEnd, 1}, + }, + ) } func (s) TestClientStatsClientStreamRPC(t *testing.T) { count := 5 - testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: clientStreamRPC}, map[int]*checkFuncWithCount{ - begin: {checkBegin, 1}, - outHeader: {checkOutHeader, 1}, - inHeader: {checkInHeader, 1}, - outPayload: {checkOutPayload, count}, - inTrailer: {checkInTrailer, 1}, - inPayload: {checkInPayload, 1}, - end: {checkEnd, 1}, - }) + testClientStats( + t, + &testConfig{compress: "gzip"}, + &rpcConfig{count: count, success: true, failfast: false, callType: clientStreamRPC}, + map[int]*checkFuncWithCount{ + begin: {checkBegin, 1}, + outHeader: {checkOutHeader, 1}, + inHeader: {checkInHeader, 1}, + outPayload: {checkOutPayload, count}, + inTrailer: {checkInTrailer, 1}, + inPayload: {checkInPayload, 1}, + end: {checkEnd, 1}, + }, + ) } func (s) TestClientStatsClientStreamRPCError(t *testing.T) { count := 1 - testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: clientStreamRPC}, map[int]*checkFuncWithCount{ - begin: {checkBegin, 1}, - outHeader: {checkOutHeader, 1}, - inHeader: {checkInHeader, 1}, - outPayload: {checkOutPayload, 1}, - inTrailer: {checkInTrailer, 1}, - end: {checkEnd, 1}, - }) + testClientStats( + t, + &testConfig{compress: "gzip"}, + &rpcConfig{count: count, success: false, failfast: false, callType: clientStreamRPC}, + map[int]*checkFuncWithCount{ + begin: {checkBegin, 1}, + outHeader: {checkOutHeader, 1}, + inHeader: {checkInHeader, 1}, + outPayload: {checkOutPayload, 1}, + inTrailer: {checkInTrailer, 1}, + end: {checkEnd, 1}, + }, + ) } func (s) TestClientStatsServerStreamRPC(t *testing.T) { count := 5 - testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: serverStreamRPC}, map[int]*checkFuncWithCount{ - begin: {checkBegin, 1}, - outHeader: {checkOutHeader, 1}, - outPayload: {checkOutPayload, 1}, - inHeader: {checkInHeader, 1}, - inPayload: {checkInPayload, count}, - inTrailer: {checkInTrailer, 1}, - end: {checkEnd, 1}, - }) + testClientStats( + t, + &testConfig{compress: "gzip"}, + &rpcConfig{count: count, success: true, failfast: false, callType: serverStreamRPC}, + map[int]*checkFuncWithCount{ + begin: {checkBegin, 1}, + outHeader: {checkOutHeader, 1}, + outPayload: {checkOutPayload, 1}, + inHeader: {checkInHeader, 1}, + inPayload: {checkInPayload, count}, + inTrailer: {checkInTrailer, 1}, + end: {checkEnd, 1}, + }, + ) } func (s) TestClientStatsServerStreamRPCError(t *testing.T) { count := 5 - testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: serverStreamRPC}, map[int]*checkFuncWithCount{ - begin: {checkBegin, 1}, - outHeader: {checkOutHeader, 1}, - outPayload: {checkOutPayload, 1}, - inHeader: {checkInHeader, 1}, - inTrailer: {checkInTrailer, 1}, - end: {checkEnd, 1}, - }) + testClientStats( + t, + &testConfig{compress: "gzip"}, + &rpcConfig{count: count, success: false, failfast: false, callType: serverStreamRPC}, + map[int]*checkFuncWithCount{ + begin: {checkBegin, 1}, + outHeader: {checkOutHeader, 1}, + outPayload: {checkOutPayload, 1}, + inHeader: {checkInHeader, 1}, + inTrailer: {checkInTrailer, 1}, + end: {checkEnd, 1}, + }, + ) } func (s) TestClientStatsFullDuplexRPC(t *testing.T) { count := 5 - testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: fullDuplexStreamRPC}, map[int]*checkFuncWithCount{ - begin: {checkBegin, 1}, - outHeader: {checkOutHeader, 1}, - outPayload: {checkOutPayload, count}, - inHeader: {checkInHeader, 1}, - inPayload: {checkInPayload, count}, - inTrailer: {checkInTrailer, 1}, - end: {checkEnd, 1}, - }) + testClientStats( + t, + &testConfig{compress: "gzip"}, + &rpcConfig{count: count, success: true, failfast: false, callType: fullDuplexStreamRPC}, + map[int]*checkFuncWithCount{ + begin: {checkBegin, 1}, + outHeader: {checkOutHeader, 1}, + outPayload: {checkOutPayload, count}, + inHeader: {checkInHeader, 1}, + inPayload: {checkInPayload, count}, + inTrailer: {checkInTrailer, 1}, + end: {checkEnd, 1}, + }, + ) } func (s) TestClientStatsFullDuplexRPCError(t *testing.T) { count := 5 - testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: fullDuplexStreamRPC}, map[int]*checkFuncWithCount{ - begin: {checkBegin, 1}, - outHeader: {checkOutHeader, 1}, - outPayload: {checkOutPayload, 1}, - inHeader: {checkInHeader, 1}, - inTrailer: {checkInTrailer, 1}, - end: {checkEnd, 1}, - }) + testClientStats( + t, + &testConfig{compress: "gzip"}, + &rpcConfig{count: count, success: false, failfast: false, callType: fullDuplexStreamRPC}, + map[int]*checkFuncWithCount{ + begin: {checkBegin, 1}, + outHeader: {checkOutHeader, 1}, + outPayload: {checkOutPayload, 1}, + inHeader: {checkInHeader, 1}, + inTrailer: {checkInTrailer, 1}, + end: {checkEnd, 1}, + }, + ) } func (s) TestTags(t *testing.T) { @@ -1496,7 +1728,9 @@ func (s) TestStatsHandlerCallsServerIsRegisteredMethod(t *testing.T) { t.Errorf("DoesNotExistCall should not be a registered method according to server") } if isRegisteredMethod(server, "/unknownService/UnaryCall") { - t.Errorf("/unknownService/UnaryCall should not be a registered method according to server") + t.Errorf( + "/unknownService/UnaryCall should not be a registered method according to server", + ) } wg.Done() return ctx @@ -1519,3 +1753,66 @@ func (s) TestStatsHandlerCallsServerIsRegisteredMethod(t *testing.T) { } wg.Wait() } + +// TestServerStatsUnaryRPCEventSequence tests that the sequence of server-side stats +// events for a Unary RPC matches the expected flow. +func (s) TestServerStatsUnaryRPCEventSequence(t *testing.T) { + h := &statshandler{} + te := newTest(t, &testConfig{compress: ""}, nil, []stats.Handler{h}) + te.startServer(&testServer{}) + defer te.tearDown() + + _, _, err := te.doUnaryCall(&rpcConfig{success: true, callType: unaryRPC}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Allow time for events to propagate + time.Sleep(50 * time.Millisecond) + + // Verify sequence + h.mu.Lock() + defer h.mu.Unlock() + verifyEventSequence(t, h.events, expectedUnarySequence) +} + +// TestServerStatsClientStreamEventSequence tests that the sequence of server-side +// stats events for a Client Stream RPC matches the expected flow. +func (s) TestServerStatsClientStreamEventSequence(t *testing.T) { + h := &statshandler{} + te := newTest(t, &testConfig{compress: "gzip"}, nil, []stats.Handler{h}) + te.startServer(&testServer{}) + defer te.tearDown() + + _, _, err := te.doClientStreamCall( + &rpcConfig{count: 5, success: true, callType: clientStreamRPC}, + ) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + time.Sleep(50 * time.Millisecond) + + h.mu.Lock() + defer h.mu.Unlock() + verifyEventSequence(t, h.events, expectedClientStreamSequence) +} + +// verifyEventSequence verifies that a sequence of recorded events matches +// the expected sequence. +func verifyEventSequence(t *testing.T, got []event, expected []string) { + if len(got) != len(expected) { + t.Fatalf("Event count mismatch. Got: %d, Expected: %d", len(got), len(expected)) + } + + for i, e := range got { + if e.eventType != expected[i] { + t.Errorf( + "Unexpected event at position %d. Got: %s, Expected: %s", + i, + e.eventType, + expected[i], + ) + } + } +} From 10f6726b122d981b89ff75dae15f23b4d64c9aa3 Mon Sep 17 00:00:00 2001 From: RyanBlaney Date: Mon, 2 Dec 2024 13:47:33 -0800 Subject: [PATCH 2/6] Fix formatting issues according to Google style guide --- stats/stats_test.go | 50 ++++++++------------------------------------- 1 file changed, 9 insertions(+), 41 deletions(-) diff --git a/stats/stats_test.go b/stats/stats_test.go index 3c28642138d4..f0998d841dee 100644 --- a/stats/stats_test.go +++ b/stats/stats_test.go @@ -150,10 +150,7 @@ type testServer struct { testgrpc.UnimplementedTestServiceServer } -func (s *testServer) UnaryCall( - ctx context.Context, - in *testpb.SimpleRequest, -) (*testpb.SimpleResponse, error) { +func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { if err := grpc.SendHeader(ctx, testHeaderMetadata); err != nil { return nil, status.Errorf( status.Code(err), @@ -210,9 +207,7 @@ func (s *testServer) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallSe } } -func (s *testServer) StreamingInputCall( - stream testgrpc.TestService_StreamingInputCallServer, -) error { +func (s *testServer) StreamingInputCall(stream testgrpc.TestService_StreamingInputCallServer) error { if err := stream.SendHeader(testHeaderMetadata); err != nil { return status.Errorf( status.Code(err), @@ -240,10 +235,7 @@ func (s *testServer) StreamingInputCall( } } -func (s *testServer) StreamingOutputCall( - in *testpb.StreamingOutputCallRequest, - stream testgrpc.TestService_StreamingOutputCallServer, -) error { +func (s *testServer) StreamingOutputCall(in *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error { if err := stream.SendHeader(testHeaderMetadata); err != nil { return status.Errorf( status.Code(err), @@ -448,9 +440,7 @@ func (te *test) doFullDuplexCallRoundtrip(c *rpcConfig) ([]proto.Message, []prot return reqs, resps, nil } -func (te *test) doClientStreamCall( - c *rpcConfig, -) ([]proto.Message, *testpb.StreamingInputCallResponse, error) { +func (te *test) doClientStreamCall(c *rpcConfig) ([]proto.Message, *testpb.StreamingInputCallResponse, error) { var ( reqs []proto.Message resp *testpb.StreamingInputCallResponse @@ -483,9 +473,7 @@ func (te *test) doClientStreamCall( return reqs, resp, err } -func (te *test) doServerStreamCall( - c *rpcConfig, -) (*testpb.StreamingOutputCallRequest, []proto.Message, error) { +func (te *test) doServerStreamCall(c *rpcConfig) (*testpb.StreamingOutputCallRequest, []proto.Message, error) { var ( req *testpb.StreamingOutputCallRequest resps []proto.Message @@ -957,12 +945,7 @@ func checkConnStats(t *testing.T, got []*gotData) { checkConnEnd(t, got[len(got)-1]) } -func checkServerStats( - t *testing.T, - got []*gotData, - expect *expectedData, - checkFuncs []func(t *testing.T, d *gotData, e *expectedData), -) { +func checkServerStats(t *testing.T, got []*gotData, expect *expectedData, checkFuncs []func(t *testing.T, d *gotData, e *expectedData)) { if len(got) != len(checkFuncs) { for i, g := range got { t.Errorf(" - %v, %T", i, g.s) @@ -975,12 +958,7 @@ func checkServerStats( } } -func testServerStats( - t *testing.T, - tc *testConfig, - cc *rpcConfig, - checkFuncs []func(t *testing.T, d *gotData, e *expectedData), -) { +func testServerStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs []func(t *testing.T, d *gotData, e *expectedData)) { h := &statshandler{} te := newTest(t, tc, nil, []stats.Handler{h}) te.startServer(&testServer{}) @@ -1235,12 +1213,7 @@ type checkFuncWithCount struct { c int // expected count } -func checkClientStats( - t *testing.T, - got []*gotData, - expect *expectedData, - checkFuncs map[int]*checkFuncWithCount, -) { +func checkClientStats( t *testing.T, got []*gotData, expect *expectedData, checkFuncs map[int]*checkFuncWithCount) { var expectLen int for _, v := range checkFuncs { expectLen += v.c @@ -1325,12 +1298,7 @@ func checkClientStats( } } -func testClientStats( - t *testing.T, - tc *testConfig, - cc *rpcConfig, - checkFuncs map[int]*checkFuncWithCount, -) { +func testClientStats( t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs map[int]*checkFuncWithCount) { h := &statshandler{} te := newTest(t, tc, []stats.Handler{h}, nil) te.startServer(&testServer{}) From 92ec34170d8a46e7cb6d7ba78e68fd9e8e14b722 Mon Sep 17 00:00:00 2001 From: RyanBlaney Date: Mon, 2 Dec 2024 13:50:15 -0800 Subject: [PATCH 3/6] Removed unnecessary time stamps from event tracing --- stats/stats_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/stats/stats_test.go b/stats/stats_test.go index f0998d841dee..fc9c8a2985b6 100644 --- a/stats/stats_test.go +++ b/stats/stats_test.go @@ -874,7 +874,6 @@ func checkConnEnd(t *testing.T, d *gotData) { type event struct { eventType string - timestamp time.Time } type statshandler struct { @@ -896,7 +895,7 @@ func (h *statshandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) conte func (h *statshandler) recordEvent(eventType string) { h.mu.Lock() defer h.mu.Unlock() - h.events = append(h.events, event{eventType: eventType, timestamp: time.Now()}) + h.events = append(h.events, event{eventType: eventType}) } func (h *statshandler) HandleConn(ctx context.Context, s stats.ConnStats) { From 71adf5e36cafbe75a2d288487f825f22bfd6d5fe Mon Sep 17 00:00:00 2001 From: RyanBlaney Date: Mon, 2 Dec 2024 20:56:31 -0800 Subject: [PATCH 4/6] Cleared up difference between client side and server side stat events. Added a server stats Server Stream RPC test. --- stats/stats.go | 23 +++++++++++++++++------ stats/stats_test.go | 46 +++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 61 insertions(+), 8 deletions(-) diff --git a/stats/stats.go b/stats/stats.go index 4410be7e1596..72288cfede1a 100644 --- a/stats/stats.go +++ b/stats/stats.go @@ -36,10 +36,15 @@ type RPCStats interface { IsClient() bool } -// InHeader is the first event handled in the RPC lifecycle. -// It contains stats when the header is received. -// This event marks the start of processing for incoming RPCs -// and must be completed before any other events occur. +// InHeader contain stats when the header is received. +// First event in the server side event sequence. +// Follows last OutPayload for server side events. +// +// Server Stats Example Event Ordering: +// *InHeader* -> Begin -> InPayload(s) -> OutHeader -> OutPayload(s) -> OutTrailer -> End +// +// Client Stats Example Event Ordering: +// Begin -> OutHeader -> OutPayload(s) -> *InHeader* -> InTrailer -> InPayload(s) -> End type InHeader struct { // Client is true if this InHeader is from client side. Client bool @@ -65,8 +70,14 @@ func (s *InHeader) IsClient() bool { return s.Client } func (s *InHeader) isRPCStats() {} // Begin contains stats when an RPC attempt begins. -// This event is called AFTER the InHeader event, as headers must -// be processed before the RPC lifecycle begins +// First event in the client-side event sequence. +// Follows InHeader for server-side events. +// +// Server Stats Example Event Ordering: +// InHeader -> *Begin* -> InPayload(s) -> OutHeader -> OutPayload(s) -> OutTrailer -> End +// +// Client Stats Example Event Ordering: +// *Begin* -> OutHeader -> OutPayload(s) -> InHeader -> InTrailer -> InPayload(s) -> End // // FailFast is only valid if this Begin is from client side. type Begin struct { diff --git a/stats/stats_test.go b/stats/stats_test.go index fc9c8a2985b6..d6231eeb1f9b 100644 --- a/stats/stats_test.go +++ b/stats/stats_test.go @@ -84,6 +84,7 @@ var ( // The id for which the service handler should return error. errorID int32 = 32202 + // Server Stats // Ensure that Unary RPC server stats events are logged in the correct order. expectedUnarySequence = []string{ "ConnStats", @@ -96,8 +97,9 @@ var ( "End", } - // Ensure that the sequence of server-side stats events for a Unary RPC - // matches the expected flow. + // Server Stats + // Ensure that the sequence of server-side stats events for a + // client stream RPC matches the expected flow. expectedClientStreamSequence = []string{ "ConnStats", "InHeader", @@ -112,6 +114,24 @@ var ( "OutTrailer", "End", } + + // Server Stats Test + // Ensure that the sequence of server-side stats events for a + // server stream RPC matches the expected flow. + expectedServerStreamSequence = []string{ + "ConnStats", + "InHeader", + "Begin", + "InPayload", + "OutHeader", + "OutPayload", + "OutPayload", + "OutPayload", + "OutPayload", + "OutPayload", + "OutTrailer", + "End", + } ) func idToPayload(id int32) *testpb.Payload { @@ -1765,6 +1785,28 @@ func (s) TestServerStatsClientStreamEventSequence(t *testing.T) { verifyEventSequence(t, h.events, expectedClientStreamSequence) } +// TestServerStatsClientStreamEventSequence tests that the sequence of server-side +// stats events for a Client Stream RPC matches the expected flow. +func (s) TestServerStatsServerStreamEventSequence(t *testing.T) { + h := &statshandler{} + te := newTest(t, &testConfig{compress: "gzip"}, nil, []stats.Handler{h}) + te.startServer(&testServer{}) + defer te.tearDown() + + _, _, err := te.doServerStreamCall( + &rpcConfig{count: 5, success: true, callType: serverStreamRPC}, + ) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + time.Sleep(50 * time.Millisecond) + + h.mu.Lock() + defer h.mu.Unlock() + verifyEventSequence(t, h.events, expectedServerStreamSequence) +} + // verifyEventSequence verifies that a sequence of recorded events matches // the expected sequence. func verifyEventSequence(t *testing.T, got []event, expected []string) { From 8622eabb0c8044330e7bac840dbb2aed452b3267 Mon Sep 17 00:00:00 2001 From: RyanBlaney Date: Wed, 11 Dec 2024 14:23:48 -0800 Subject: [PATCH 5/6] Fixed documentation style and whitespace between comments. Put the order of the InHeader and Begin structs to their original place in the stats.go file. --- stats/stats.go | 72 +++++++++++++++++++-------------------------- stats/stats_test.go | 18 +----------- 2 files changed, 32 insertions(+), 58 deletions(-) diff --git a/stats/stats.go b/stats/stats.go index 72288cfede1a..e589f0f23321 100644 --- a/stats/stats.go +++ b/stats/stats.go @@ -36,48 +36,9 @@ type RPCStats interface { IsClient() bool } -// InHeader contain stats when the header is received. -// First event in the server side event sequence. -// Follows last OutPayload for server side events. -// -// Server Stats Example Event Ordering: -// *InHeader* -> Begin -> InPayload(s) -> OutHeader -> OutPayload(s) -> OutTrailer -> End -// -// Client Stats Example Event Ordering: -// Begin -> OutHeader -> OutPayload(s) -> *InHeader* -> InTrailer -> InPayload(s) -> End -type InHeader struct { - // Client is true if this InHeader is from client side. - Client bool - // WireLength is the wire length of header. - WireLength int - // Compression is the compression algorithm used for the RPC. - Compression string - // Header contains the header metadata received. - Header metadata.MD - - // The following fields are valid only if Client is false. - // FullMethod is the full RPC method string, i.e., /package.service/method. - FullMethod string - // RemoteAddr is the remote address of the corresponding connection. - RemoteAddr net.Addr - // LocalAddr is the local address of the corresponding connection. - LocalAddr net.Addr -} - -// IsClient indicates if the stats information is from client side. -func (s *InHeader) IsClient() bool { return s.Client } - -func (s *InHeader) isRPCStats() {} - // Begin contains stats when an RPC attempt begins. -// First event in the client-side event sequence. -// Follows InHeader for server-side events. -// -// Server Stats Example Event Ordering: -// InHeader -> *Begin* -> InPayload(s) -> OutHeader -> OutPayload(s) -> OutTrailer -> End -// -// Client Stats Example Event Ordering: -// *Begin* -> OutHeader -> OutPayload(s) -> InHeader -> InTrailer -> InPayload(s) -> End +// This event is called AFTER the InHeader event, as headers must +// be processed before the RPC lifecycle begins. // // FailFast is only valid if this Begin is from client side. type Begin struct { @@ -140,6 +101,35 @@ func (s *InPayload) IsClient() bool { return s.Client } func (s *InPayload) isRPCStats() {} +// InHeader contain stats when the header is received. +// +// First event in the server side event sequence. +// Follows last OutPayload for server side events. +type InHeader struct { + // Client is true if this InHeader is from client side. + Client bool + // WireLength is the wire length of header. + WireLength int + // Compression is the compression algorithm used for the RPC. + Compression string + // Header contains the header metadata received. + Header metadata.MD + + // The following fields are valid only if Client is false. + // FullMethod is the full RPC method string, i.e., /package.service/method. + FullMethod string + // RemoteAddr is the remote address of the corresponding connection. + RemoteAddr net.Addr + // LocalAddr is the local address of the corresponding connection. + LocalAddr net.Addr +} + +// IsClient indicates if the stats information is from client side. +func (s *InHeader) IsClient() bool { return s.Client } + +func (s *InHeader) isRPCStats() {} + + // InTrailer contains stats when a trailer is received. type InTrailer struct { // Client is true if this InTrailer is from client side. diff --git a/stats/stats_test.go b/stats/stats_test.go index d6231eeb1f9b..f6c77cfd4c1d 100644 --- a/stats/stats_test.go +++ b/stats/stats_test.go @@ -83,7 +83,6 @@ var ( } // The id for which the service handler should return error. errorID int32 = 32202 - // Server Stats // Ensure that Unary RPC server stats events are logged in the correct order. expectedUnarySequence = []string{ @@ -96,7 +95,6 @@ var ( "OutTrailer", "End", } - // Server Stats // Ensure that the sequence of server-side stats events for a // client stream RPC matches the expected flow. @@ -114,7 +112,6 @@ var ( "OutTrailer", "End", } - // Server Stats Test // Ensure that the sequence of server-side stats events for a // server stream RPC matches the expected flow. @@ -197,14 +194,7 @@ func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (* func (s *testServer) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallServer) error { if err := stream.SendHeader(testHeaderMetadata); err != nil { - return status.Errorf( - status.Code(err), - "%v.SendHeader(%v) = %v, want %v", - stream, - testHeaderMetadata, - err, - nil, - ) + return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, testHeaderMetadata, err, nil) } stream.SetTrailer(testTrailerMetadata) for { @@ -1623,12 +1613,10 @@ func (s) TestMultipleClientStatsHandler(t *testing.T) { h.mu.Unlock() time.Sleep(10 * time.Millisecond) } - // Each RPC generates 6 stats events on the client-side, times 2 StatsHandler if len(h.gotRPC) != 12 { t.Fatalf("h.gotRPC: unexpected amount of RPCStats: %v != %v", len(h.gotRPC), 12) } - // Each connection generates 4 conn events on the client-side, times 2 StatsHandler if len(h.gotConn) != 4 { t.Fatalf("h.gotConn: unexpected amount of ConnStats: %v != %v", len(h.gotConn), 4) @@ -1669,12 +1657,10 @@ func (s) TestMultipleServerStatsHandler(t *testing.T) { h.mu.Unlock() time.Sleep(10 * time.Millisecond) } - // Each RPC generates 6 stats events on the server-side, times 2 StatsHandler if len(h.gotRPC) != 12 { t.Fatalf("h.gotRPC: unexpected amount of RPCStats: %v != %v", len(h.gotRPC), 12) } - // Each connection generates 4 conn events on the server-side, times 2 StatsHandler if len(h.gotConn) != 4 { t.Fatalf("h.gotConn: unexpected amount of ConnStats: %v != %v", len(h.gotConn), 4) @@ -1753,10 +1739,8 @@ func (s) TestServerStatsUnaryRPCEventSequence(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - // Allow time for events to propagate time.Sleep(50 * time.Millisecond) - // Verify sequence h.mu.Lock() defer h.mu.Unlock() From f291e58ad8d8a75f9403e22b6ecb6295ad7418a3 Mon Sep 17 00:00:00 2001 From: RyanBlaney Date: Wed, 11 Dec 2024 14:30:30 -0800 Subject: [PATCH 6/6] Fixed return wrapping. Added note about StartServer using deprecated opts. --- stats/stats_test.go | 34 ++++++---------------------------- 1 file changed, 6 insertions(+), 28 deletions(-) diff --git a/stats/stats_test.go b/stats/stats_test.go index f6c77cfd4c1d..36af9e66b20f 100644 --- a/stats/stats_test.go +++ b/stats/stats_test.go @@ -169,20 +169,10 @@ type testServer struct { func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { if err := grpc.SendHeader(ctx, testHeaderMetadata); err != nil { - return nil, status.Errorf( - status.Code(err), - "grpc.SendHeader(_, %v) = %v, want ", - testHeaderMetadata, - err, - ) + return nil, status.Errorf(status.Code(err), "grpc.SendHeader(_, %v) = %v, want ", testHeaderMetadata, err) } if err := grpc.SetTrailer(ctx, testTrailerMetadata); err != nil { - return nil, status.Errorf( - status.Code(err), - "grpc.SetTrailer(_, %v) = %v, want ", - testTrailerMetadata, - err, - ) + return nil, status.Errorf(status.Code(err), "grpc.SetTrailer(_, %v) = %v, want ", testTrailerMetadata, err) } if id := payloadToID(in.Payload); id == errorID { @@ -219,14 +209,7 @@ func (s *testServer) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallSe func (s *testServer) StreamingInputCall(stream testgrpc.TestService_StreamingInputCallServer) error { if err := stream.SendHeader(testHeaderMetadata); err != nil { - return status.Errorf( - status.Code(err), - "%v.SendHeader(%v) = %v, want %v", - stream, - testHeaderMetadata, - err, - nil, - ) + return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, testHeaderMetadata, err, nil) } stream.SetTrailer(testTrailerMetadata) for { @@ -247,14 +230,7 @@ func (s *testServer) StreamingInputCall(stream testgrpc.TestService_StreamingInp func (s *testServer) StreamingOutputCall(in *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error { if err := stream.SendHeader(testHeaderMetadata); err != nil { - return status.Errorf( - status.Code(err), - "%v.SendHeader(%v) = %v, want %v", - stream, - testHeaderMetadata, - err, - nil, - ) + return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, testHeaderMetadata, err, nil) } stream.SetTrailer(testTrailerMetadata) @@ -314,6 +290,8 @@ func newTest(t *testing.T, tc *testConfig, chs []stats.Handler, shs []stats.Hand // startServer starts a gRPC server listening. Callers should defer a // call to te.tearDown to clean up. +// +// Uses deprecated opts rpc.(RPCCompressor, RPCDecompressor, WithBlock, Dial) func (te *test) startServer(ts testgrpc.TestServiceServer) { te.testServer = ts lis, err := net.Listen("tcp", "localhost:0")