From c075e7d2f8d97bc9257b8d608585c7295f9be0a8 Mon Sep 17 00:00:00 2001 From: Zach Leslie Date: Tue, 14 Nov 2023 13:42:49 +0000 Subject: [PATCH] Update dskit dependency (#3137) * Update dskit * Tidy * Update serverless go.mod --- cmd/tempo-serverless/cloud-run/go.mod | 2 +- cmd/tempo-serverless/cloud-run/go.sum | 4 +- cmd/tempo-serverless/lambda/go.mod | 2 +- cmd/tempo-serverless/lambda/go.sum | 4 +- go.mod | 2 +- go.sum | 4 +- .../grafana/dskit/concurrency/buffer.go | 7 + .../dskit/grpcclient/instrumentation.go | 6 +- .../grafana/dskit/grpcutil/cancel.go | 25 --- .../grafana/dskit/grpcutil/status.go | 70 +++++++++ .../grafana/dskit/httpgrpc/httpgrpc.go | 32 +--- .../dskit/middleware/grpc_instrumentation.go | 146 ++++++++++++------ .../grafana/dskit/middleware/zero_response.go | 132 ++++++++++++++++ vendor/github.com/grafana/dskit/ring/batch.go | 107 +++++++------ .../github.com/grafana/dskit/server/server.go | 22 ++- vendor/modules.txt | 2 +- 16 files changed, 407 insertions(+), 160 deletions(-) delete mode 100644 vendor/github.com/grafana/dskit/grpcutil/cancel.go create mode 100644 vendor/github.com/grafana/dskit/grpcutil/status.go create mode 100644 vendor/github.com/grafana/dskit/middleware/zero_response.go diff --git a/cmd/tempo-serverless/cloud-run/go.mod b/cmd/tempo-serverless/cloud-run/go.mod index 1f1ae041dea..34dc5477fe9 100644 --- a/cmd/tempo-serverless/cloud-run/go.mod +++ b/cmd/tempo-serverless/cloud-run/go.mod @@ -62,7 +62,7 @@ require ( github.com/googleapis/gax-go/v2 v2.12.0 // indirect github.com/gorilla/handlers v1.5.1 // indirect github.com/gorilla/mux v1.8.0 // indirect - github.com/grafana/dskit v0.0.0-20231031132813-52f4e8d82d59 // indirect + github.com/grafana/dskit v0.0.0-20231113165309-40e91edb6190 // indirect github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect diff --git a/cmd/tempo-serverless/cloud-run/go.sum b/cmd/tempo-serverless/cloud-run/go.sum index 1fb66f5b6a1..41309eead35 100644 --- a/cmd/tempo-serverless/cloud-run/go.sum +++ b/cmd/tempo-serverless/cloud-run/go.sum @@ -275,8 +275,8 @@ github.com/gorilla/handlers v1.5.1 h1:9lRY6j8DEeeBT10CvO9hGW0gmky0BprnvDI5vfhUHH github.com/gorilla/handlers v1.5.1/go.mod h1:t8XrUpc4KVXb7HGyJ4/cEnwQiaxrX/hz1Zv/4g96P1Q= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= -github.com/grafana/dskit v0.0.0-20231031132813-52f4e8d82d59 h1:tWbF2UD8HgvpqyxV60zmUDDzJI2gybMJxw4/RY/UNTo= -github.com/grafana/dskit v0.0.0-20231031132813-52f4e8d82d59/go.mod h1:8dsy5tQOkeNQyjXpm5mQsbCu3H5uzeBD35MzRQFznKU= +github.com/grafana/dskit v0.0.0-20231113165309-40e91edb6190 h1:uphjpuVv+q/D9vdBfQyGBueeZlwgep7CFS7ik+YP/Xo= +github.com/grafana/dskit v0.0.0-20231113165309-40e91edb6190/go.mod h1:8dsy5tQOkeNQyjXpm5mQsbCu3H5uzeBD35MzRQFznKU= github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586 h1:/of8Z8taCPftShATouOrBVy6GaTTjgQd/VfNiZp/VXQ= github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI= diff --git a/cmd/tempo-serverless/lambda/go.mod b/cmd/tempo-serverless/lambda/go.mod index 1ae2c6094c7..8ca67df3202 100644 --- a/cmd/tempo-serverless/lambda/go.mod +++ b/cmd/tempo-serverless/lambda/go.mod @@ -65,7 +65,7 @@ require ( github.com/googleapis/gax-go/v2 v2.12.0 // indirect github.com/gorilla/handlers v1.5.1 // indirect github.com/gorilla/mux v1.8.0 // indirect - github.com/grafana/dskit v0.0.0-20231031132813-52f4e8d82d59 // indirect + github.com/grafana/dskit v0.0.0-20231113165309-40e91edb6190 // indirect github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect diff --git a/cmd/tempo-serverless/lambda/go.sum b/cmd/tempo-serverless/lambda/go.sum index c910e9091b7..ee021d8b9da 100644 --- a/cmd/tempo-serverless/lambda/go.sum +++ b/cmd/tempo-serverless/lambda/go.sum @@ -279,8 +279,8 @@ github.com/gorilla/handlers v1.5.1 h1:9lRY6j8DEeeBT10CvO9hGW0gmky0BprnvDI5vfhUHH github.com/gorilla/handlers v1.5.1/go.mod h1:t8XrUpc4KVXb7HGyJ4/cEnwQiaxrX/hz1Zv/4g96P1Q= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= -github.com/grafana/dskit v0.0.0-20231031132813-52f4e8d82d59 h1:tWbF2UD8HgvpqyxV60zmUDDzJI2gybMJxw4/RY/UNTo= -github.com/grafana/dskit v0.0.0-20231031132813-52f4e8d82d59/go.mod h1:8dsy5tQOkeNQyjXpm5mQsbCu3H5uzeBD35MzRQFznKU= +github.com/grafana/dskit v0.0.0-20231113165309-40e91edb6190 h1:uphjpuVv+q/D9vdBfQyGBueeZlwgep7CFS7ik+YP/Xo= +github.com/grafana/dskit v0.0.0-20231113165309-40e91edb6190/go.mod h1:8dsy5tQOkeNQyjXpm5mQsbCu3H5uzeBD35MzRQFznKU= github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586 h1:/of8Z8taCPftShATouOrBVy6GaTTjgQd/VfNiZp/VXQ= github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI= diff --git a/go.mod b/go.mod index 168c2f3a7b7..553391d6eb8 100644 --- a/go.mod +++ b/go.mod @@ -26,7 +26,7 @@ require ( github.com/google/go-cmp v0.5.9 github.com/google/uuid v1.3.1 github.com/gorilla/mux v1.8.0 - github.com/grafana/dskit v0.0.0-20231031132813-52f4e8d82d59 + github.com/grafana/dskit v0.0.0-20231113165309-40e91edb6190 github.com/grafana/e2e v0.1.1 github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 github.com/hashicorp/go-hclog v1.5.0 diff --git a/go.sum b/go.sum index f5c5538b2af..6329fc56892 100644 --- a/go.sum +++ b/go.sum @@ -511,8 +511,8 @@ github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+ github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/grafana/dskit v0.0.0-20231031132813-52f4e8d82d59 h1:tWbF2UD8HgvpqyxV60zmUDDzJI2gybMJxw4/RY/UNTo= -github.com/grafana/dskit v0.0.0-20231031132813-52f4e8d82d59/go.mod h1:8dsy5tQOkeNQyjXpm5mQsbCu3H5uzeBD35MzRQFznKU= +github.com/grafana/dskit v0.0.0-20231113165309-40e91edb6190 h1:uphjpuVv+q/D9vdBfQyGBueeZlwgep7CFS7ik+YP/Xo= +github.com/grafana/dskit v0.0.0-20231113165309-40e91edb6190/go.mod h1:8dsy5tQOkeNQyjXpm5mQsbCu3H5uzeBD35MzRQFznKU= github.com/grafana/e2e v0.1.1 h1:/b6xcv5BtoBnx8cZnCiey9DbjEc8z7gXHO5edoeRYxc= github.com/grafana/e2e v0.1.1/go.mod h1:RpNLgae5VT+BUHvPE+/zSypmOXKwEu4t+tnEMS1ATaE= github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586 h1:/of8Z8taCPftShATouOrBVy6GaTTjgQd/VfNiZp/VXQ= diff --git a/vendor/github.com/grafana/dskit/concurrency/buffer.go b/vendor/github.com/grafana/dskit/concurrency/buffer.go index 623b9a70761..b8da4423f10 100644 --- a/vendor/github.com/grafana/dskit/concurrency/buffer.go +++ b/vendor/github.com/grafana/dskit/concurrency/buffer.go @@ -24,3 +24,10 @@ func (sb *SyncBuffer) String() string { return sb.buf.String() } + +func (sb *SyncBuffer) Reset() { + sb.mu.Lock() + defer sb.mu.Unlock() + + sb.buf.Reset() +} diff --git a/vendor/github.com/grafana/dskit/grpcclient/instrumentation.go b/vendor/github.com/grafana/dskit/grpcclient/instrumentation.go index 4a10ce48d27..280f02180c3 100644 --- a/vendor/github.com/grafana/dskit/grpcclient/instrumentation.go +++ b/vendor/github.com/grafana/dskit/grpcclient/instrumentation.go @@ -9,14 +9,14 @@ import ( "github.com/grafana/dskit/middleware" ) -func Instrument(requestDuration *prometheus.HistogramVec) ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) { +func Instrument(requestDuration *prometheus.HistogramVec, instrumentationLabelOptions ...middleware.InstrumentationOption) ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) { return []grpc.UnaryClientInterceptor{ otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), middleware.ClientUserHeaderInterceptor, - middleware.UnaryClientInstrumentInterceptor(requestDuration), + middleware.UnaryClientInstrumentInterceptor(requestDuration, instrumentationLabelOptions...), }, []grpc.StreamClientInterceptor{ otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()), middleware.StreamClientUserHeaderInterceptor, - middleware.StreamClientInstrumentInterceptor(requestDuration), + middleware.StreamClientInstrumentInterceptor(requestDuration, instrumentationLabelOptions...), } } diff --git a/vendor/github.com/grafana/dskit/grpcutil/cancel.go b/vendor/github.com/grafana/dskit/grpcutil/cancel.go deleted file mode 100644 index b1d369d2a3e..00000000000 --- a/vendor/github.com/grafana/dskit/grpcutil/cancel.go +++ /dev/null @@ -1,25 +0,0 @@ -// Provenance-includes-location: https://github.com/weaveworks/common/blob/main/grpc/cancel.go -// Provenance-includes-license: Apache-2.0 -// Provenance-includes-copyright: Weaveworks Ltd. - -package grpcutil - -import ( - "context" - "errors" - - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -// IsCanceled checks whether an error comes from an operation being canceled -func IsCanceled(err error) bool { - if errors.Is(err, context.Canceled) { - return true - } - s, ok := status.FromError(err) - if ok && s.Code() == codes.Canceled { - return true - } - return false -} diff --git a/vendor/github.com/grafana/dskit/grpcutil/status.go b/vendor/github.com/grafana/dskit/grpcutil/status.go new file mode 100644 index 00000000000..a9e9aab249a --- /dev/null +++ b/vendor/github.com/grafana/dskit/grpcutil/status.go @@ -0,0 +1,70 @@ +package grpcutil + +import ( + "context" + "errors" + + "github.com/gogo/status" + "google.golang.org/grpc/codes" + grpcstatus "google.golang.org/grpc/status" +) + +// ErrorToStatus returns a *github.com/gogo/status.Status representation of err. +// +// - If err implements the method `GRPCStatus() *google.golang.org/grpc/status.Status` and +// `GRPCStatus()` does not return nil, or if err wraps a type satisfying this, Status from +// `GRPCStatus()` is converted to gogo Status, and returned. In that case, ok is true. +// +// - If err is or GRPCStatus() returns nil, a nil Status is returned and ok is false. +// +// - Otherwise, err is an error not compatible with this function. In this +// case, a nil Status is returned and ok is false. +func ErrorToStatus(err error) (*status.Status, bool) { + if err == nil { + return nil, false + } + type grpcStatus interface{ GRPCStatus() *grpcstatus.Status } + var gs grpcStatus + if errors.As(err, &gs) { + st := gs.GRPCStatus() + if st == nil { + return nil, false + } + return status.FromGRPCStatus(st), true + } + return nil, false +} + +// ErrorToStatusCode extracts gRPC status code from error and returns it. +// +// - If err is nil, codes.OK is returned. +// +// - If err implements (or wraps error that implements) the method +// `GRPCStatus() *google.golang.org/grpc/status.Status`, and +// `GRPCStatus()` returns a non-nil status, code from the status +// is returned. +// +// - Otherwise code.Unknown is returned. +func ErrorToStatusCode(err error) codes.Code { + if err == nil { + return codes.OK + } + type grpcStatus interface{ GRPCStatus() *grpcstatus.Status } + var gs grpcStatus + if errors.As(err, &gs) { + st := gs.GRPCStatus() + if st != nil { + return st.Code() + } + } + return codes.Unknown +} + +// IsCanceled checks whether an error comes from an operation being canceled. +func IsCanceled(err error) bool { + if errors.Is(err, context.Canceled) { + return true + } + statusCode := ErrorToStatusCode(err) + return statusCode == codes.Canceled +} diff --git a/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go b/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go index 83ea023f3b8..213c261b700 100644 --- a/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go +++ b/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go @@ -6,16 +6,15 @@ package httpgrpc import ( "context" - "errors" "fmt" "github.com/go-kit/log/level" - "google.golang.org/grpc/metadata" - grpcstatus "google.golang.org/grpc/status" - spb "github.com/gogo/googleapis/google/rpc" "github.com/gogo/protobuf/types" "github.com/gogo/status" + "google.golang.org/grpc/metadata" + + "github.com/grafana/dskit/grpcutil" "github.com/grafana/dskit/log" ) @@ -46,7 +45,7 @@ func ErrorFromHTTPResponse(resp *HTTPResponse) error { // HTTPResponseFromError converts a grpc error into an HTTP response func HTTPResponseFromError(err error) (*HTTPResponse, bool) { - s, ok := statusFromError(err) + s, ok := grpcutil.ErrorToStatus(err) if !ok { return nil, false } @@ -65,29 +64,6 @@ func HTTPResponseFromError(err error) (*HTTPResponse, bool) { return &resp, true } -// statusFromError tries to cast the given error into status.Status. -// If the given error, or any error from its tree are a status.Status, -// that status.Status and the outcome true are returned. -// Otherwise, nil and the outcome false are returned. -// This implementation differs from status.FromError() because the -// latter checks only if the given error can be cast to status.Status, -// and doesn't check other errors in the given error's tree. -func statusFromError(err error) (*status.Status, bool) { - if err == nil { - return nil, false - } - type grpcStatus interface{ GRPCStatus() *grpcstatus.Status } - var gs grpcStatus - if errors.As(err, &gs) { - st := gs.GRPCStatus() - if st == nil { - return nil, false - } - return status.FromGRPCStatus(st), true - } - return nil, false -} - const ( MetadataMethod = "httpgrpc-method" MetadataURL = "httpgrpc-url" diff --git a/vendor/github.com/grafana/dskit/middleware/grpc_instrumentation.go b/vendor/github.com/grafana/dskit/middleware/grpc_instrumentation.go index 70069fa36fa..e4052b8ed05 100644 --- a/vendor/github.com/grafana/dskit/middleware/grpc_instrumentation.go +++ b/vendor/github.com/grafana/dskit/middleware/grpc_instrumentation.go @@ -6,6 +6,7 @@ package middleware import ( "context" + "errors" "io" "strconv" "time" @@ -13,72 +14,69 @@ import ( "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "github.com/grafana/dskit/grpcutil" - "github.com/grafana/dskit/httpgrpc" "github.com/grafana/dskit/instrument" ) -func observe(ctx context.Context, hist *prometheus.HistogramVec, method string, err error, duration time.Duration) { - respStatus := "success" - if err != nil { - if errResp, ok := httpgrpc.HTTPResponseFromError(err); ok { - respStatus = strconv.Itoa(int(errResp.Code)) - } else if grpcutil.IsCanceled(err) { - respStatus = "cancel" - } else { - respStatus = "error" - } - } - instrument.ObserveWithExemplar(ctx, hist.WithLabelValues(gRPC, method, respStatus, "false"), duration.Seconds()) +func observe(ctx context.Context, hist *prometheus.HistogramVec, method string, err error, duration time.Duration, instrumentLabel instrumentationLabel) { + instrument.ObserveWithExemplar(ctx, hist.WithLabelValues(gRPC, method, instrumentLabel.getInstrumentationLabel(err), "false"), duration.Seconds()) } // UnaryServerInstrumentInterceptor instruments gRPC requests for errors and latency. -func UnaryServerInstrumentInterceptor(hist *prometheus.HistogramVec) grpc.UnaryServerInterceptor { +func UnaryServerInstrumentInterceptor(hist *prometheus.HistogramVec, instrumentationOptions ...InstrumentationOption) grpc.UnaryServerInterceptor { + instrumentationLabel := applyInstrumentationOptions(false, instrumentationOptions...) return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { begin := time.Now() resp, err := handler(ctx, req) - observe(ctx, hist, info.FullMethod, err, time.Since(begin)) + observe(ctx, hist, info.FullMethod, err, time.Since(begin), instrumentationLabel) return resp, err } } // StreamServerInstrumentInterceptor instruments gRPC requests for errors and latency. -func StreamServerInstrumentInterceptor(hist *prometheus.HistogramVec) grpc.StreamServerInterceptor { +func StreamServerInstrumentInterceptor(hist *prometheus.HistogramVec, instrumentationOptions ...InstrumentationOption) grpc.StreamServerInterceptor { + instrumentationLabel := applyInstrumentationOptions(false, instrumentationOptions...) return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { begin := time.Now() err := handler(srv, ss) - observe(ss.Context(), hist, info.FullMethod, err, time.Since(begin)) + observe(ss.Context(), hist, info.FullMethod, err, time.Since(begin), instrumentationLabel) return err } } // UnaryClientInstrumentInterceptor records duration of gRPC requests client side. -func UnaryClientInstrumentInterceptor(metric *prometheus.HistogramVec) grpc.UnaryClientInterceptor { +func UnaryClientInstrumentInterceptor(metric *prometheus.HistogramVec, instrumentationOptions ...InstrumentationOption) grpc.UnaryClientInterceptor { + // we enforce masking of HTTP statuses. + instrumentationLabel := applyInstrumentationOptions(true, instrumentationOptions...) return func(ctx context.Context, method string, req, resp interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { start := time.Now() err := invoker(ctx, method, req, resp, cc, opts...) - metric.WithLabelValues(method, errorCode(err)).Observe(time.Since(start).Seconds()) + metric.WithLabelValues(method, instrumentationLabel.getInstrumentationLabel(err)).Observe(time.Since(start).Seconds()) return err } } // StreamClientInstrumentInterceptor records duration of streaming gRPC requests client side. -func StreamClientInstrumentInterceptor(metric *prometheus.HistogramVec) grpc.StreamClientInterceptor { +func StreamClientInstrumentInterceptor(metric *prometheus.HistogramVec, instrumentationOptions ...InstrumentationOption) grpc.StreamClientInterceptor { + // we enforce masking of HTTP statuses. + instrumentationLabel := applyInstrumentationOptions(true, instrumentationOptions...) return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption, ) (grpc.ClientStream, error) { start := time.Now() stream, err := streamer(ctx, desc, cc, method, opts...) s := &instrumentedClientStream{ - metric: metric, - start: start, - method: method, - serverStreams: desc.ServerStreams, - finished: atomic.NewBool(false), - finishedChan: make(chan struct{}), - stream: stream, + metric: metric, + start: start, + method: method, + serverStreams: desc.ServerStreams, + finished: atomic.NewBool(false), + finishedChan: make(chan struct{}), + stream: stream, + instrumentationLabel: instrumentationLabel, } s.awaitCompletion(ctx) return s, err @@ -87,13 +85,14 @@ func StreamClientInstrumentInterceptor(metric *prometheus.HistogramVec) grpc.Str // This implementation is heavily inspired by github.com/opentracing-contrib/go-grpc's openTracingClientStream. type instrumentedClientStream struct { - metric *prometheus.HistogramVec - start time.Time - method string - serverStreams bool - finished *atomic.Bool - finishedChan chan struct{} - stream grpc.ClientStream + metric *prometheus.HistogramVec + start time.Time + method string + serverStreams bool + finished *atomic.Bool + finishedChan chan struct{} + stream grpc.ClientStream + instrumentationLabel instrumentationLabel } func (s *instrumentedClientStream) Trailer() metadata.MD { @@ -122,7 +121,7 @@ func (s *instrumentedClientStream) finish(err error) { close(s.finishedChan) - s.metric.WithLabelValues(s.method, errorCode(err)).Observe(time.Since(s.start).Seconds()) + s.metric.WithLabelValues(s.method, s.instrumentationLabel.getInstrumentationLabel(err)).Observe(time.Since(s.start).Seconds()) } func (s *instrumentedClientStream) SendMsg(m interface{}) error { @@ -173,18 +172,75 @@ func (s *instrumentedClientStream) CloseSend() error { return err } -// errorCode converts an error into an error code string. -func errorCode(err error) string { - if err == nil { - return "2xx" +type InstrumentationOption func(*instrumentationLabel) + +var ( + // ReportGRPCStatusOption is an InstrumentationOption that is used for enabling gRPC status codes to be used + // in instrumentation labels. + ReportGRPCStatusOption InstrumentationOption = func(instrumentationLabel *instrumentationLabel) { + instrumentationLabel.reportGRPCStatus = true + } +) + +func applyInstrumentationOptions(maskHTTPStatuses bool, options ...InstrumentationOption) instrumentationLabel { + instrumentationLabel := instrumentationLabel{maskHTTPStatus: maskHTTPStatuses} + for _, opt := range options { + opt(&instrumentationLabel) + } + return instrumentationLabel +} + +type instrumentationLabel struct { + reportGRPCStatus bool + maskHTTPStatus bool +} + +// getInstrumentationLabel converts an error into an error code string by applying the configurations +// contained in this instrumentationLabel object. +func (i *instrumentationLabel) getInstrumentationLabel(err error) string { + statusCode := errorToStatusCode(err) + return i.statusCodeToString(statusCode) +} + +func (i *instrumentationLabel) statusCodeToString(statusCode codes.Code) string { + if isHTTPStatusCode(statusCode) { + statusFamily := int(statusCode / 100) + if i.maskHTTPStatus { + return strconv.Itoa(statusFamily) + "xx" + } + return strconv.Itoa(int(statusCode)) } - if errResp, ok := httpgrpc.HTTPResponseFromError(err); ok { - statusFamily := int(errResp.Code / 100) - return strconv.Itoa(statusFamily) + "xx" - } else if grpcutil.IsCanceled(err) { + if i.reportGRPCStatus { + return statusCode.String() + } + + if statusCode == codes.OK { + if i.maskHTTPStatus { + return "2xx" + } + return "success" + } + + if statusCode == codes.Canceled { return "cancel" - } else { - return "error" } + + return "error" +} + +func errorToStatusCode(err error) codes.Code { + if err == nil { + return codes.OK + } + + if errors.Is(err, context.Canceled) { + return codes.Canceled + } + + return grpcutil.ErrorToStatusCode(err) +} + +func isHTTPStatusCode(statusCode codes.Code) bool { + return int(statusCode) >= 100 && int(statusCode) < 600 } diff --git a/vendor/github.com/grafana/dskit/middleware/zero_response.go b/vendor/github.com/grafana/dskit/middleware/zero_response.go new file mode 100644 index 00000000000..1bb4ecc8d1f --- /dev/null +++ b/vendor/github.com/grafana/dskit/middleware/zero_response.go @@ -0,0 +1,132 @@ +package middleware + +import ( + "errors" + "net" + "os" + "regexp" + "strconv" + "sync" + + "github.com/go-kit/log" + "go.uber.org/atomic" +) + +// NewZeroResponseListener returns a Listener that logs all connections that encountered io timeout on reads, and were closed before sending any response. +func NewZeroResponseListener(list net.Listener, log log.Logger) net.Listener { + return &zeroResponseListener{ + Listener: list, + log: log, + bufPool: sync.Pool{ + New: func() interface{} { return &bufHolder{buf: make([]byte, 0, requestBufSize)} }, + }, + } +} + +// Wrap a slice in a struct, so we can store a pointer in sync.Pool +type bufHolder struct { + buf []byte +} + +// Size of buffer for read data. We log this eventually. +const requestBufSize = 512 + +type zeroResponseListener struct { + net.Listener + log log.Logger + bufPool sync.Pool // pool of &bufHolder. +} + +func (zl *zeroResponseListener) Accept() (net.Conn, error) { + conn, err := zl.Listener.Accept() + if err != nil { + return nil, err + } + bh := zl.bufPool.Get().(*bufHolder) + bh.buf = bh.buf[:0] + return &zeroResponseConn{Conn: conn, log: zl.log, bufHolder: bh, returnPool: &zl.bufPool}, nil +} + +type zeroResponseConn struct { + net.Conn + + log log.Logger + once sync.Once + returnPool *sync.Pool + + bufHolderMux sync.Mutex + bufHolder *bufHolder // Buffer with first requestBufSize bytes from connection. Set to nil as soon as data is written to the connection. + + lastReadErrIsDeadlineExceeded atomic.Bool +} + +func (zc *zeroResponseConn) Read(b []byte) (n int, err error) { + n, err = zc.Conn.Read(b) + if err != nil && errors.Is(err, os.ErrDeadlineExceeded) { + zc.lastReadErrIsDeadlineExceeded.Store(true) + } else { + zc.lastReadErrIsDeadlineExceeded.Store(false) + } + + // Store first requestBufSize read bytes on connection into the buffer for logging. + if n > 0 { + zc.bufHolderMux.Lock() + defer zc.bufHolderMux.Unlock() + + if zc.bufHolder != nil { + rem := requestBufSize - len(zc.bufHolder.buf) // how much space is in our buffer. + if rem > n { + rem = n + } + if rem > 0 { + zc.bufHolder.buf = append(zc.bufHolder.buf, b[:rem]...) + } + } + } + return +} + +func (zc *zeroResponseConn) Write(b []byte) (n int, err error) { + n, err = zc.Conn.Write(b) + if n > 0 { + zc.bufHolderMux.Lock() + if zc.bufHolder != nil { + zc.returnPool.Put(zc.bufHolder) + zc.bufHolder = nil + } + zc.bufHolderMux.Unlock() + } + return +} + +var authRegexp = regexp.MustCompile(`((?i)\r\nauthorization:\s+)(\S+\s+)(\S+)`) + +func (zc *zeroResponseConn) Close() error { + err := zc.Conn.Close() + + zc.once.Do(func() { + zc.bufHolderMux.Lock() + defer zc.bufHolderMux.Unlock() + + // If buffer was already returned, it means there was some data written on the connection, nothing to do. + if zc.bufHolder == nil { + return + } + + // If we didn't write anything to this connection, and we've got timeout while reading data, it looks like + // slow a slow client failing to send a request to us. + if !zc.lastReadErrIsDeadlineExceeded.Load() { + return + } + + b := zc.bufHolder.buf + b = authRegexp.ReplaceAll(b, []byte("${1}${2}***")) // Replace value in Authorization header with ***. + + _ = zc.log.Log("msg", "read timeout, connection closed with no response", "read", strconv.Quote(string(b)), "remote", zc.RemoteAddr().String()) + + zc.returnPool.Put(zc.bufHolder) + zc.bufHolder = nil + }) + + return err +} diff --git a/vendor/github.com/grafana/dskit/ring/batch.go b/vendor/github.com/grafana/dskit/ring/batch.go index fa627445ed2..11e511e0c62 100644 --- a/vendor/github.com/grafana/dskit/ring/batch.go +++ b/vendor/github.com/grafana/dskit/ring/batch.go @@ -8,7 +8,8 @@ import ( "sync" "go.uber.org/atomic" - "google.golang.org/grpc/status" + + grpcUtils "github.com/grafana/dskit/grpcutil" ) type batchTracker struct { @@ -25,38 +26,53 @@ type instance struct { } type itemTracker struct { - minSuccess int - maxFailures int - succeeded atomic.Int32 - failed4xx atomic.Int32 - failed5xx atomic.Int32 - remaining atomic.Int32 - err atomic.Error + minSuccess int + maxFailures int + succeeded atomic.Int32 + failedClient atomic.Int32 + failedServer atomic.Int32 + remaining atomic.Int32 + err atomic.Error } -func (i *itemTracker) recordError(err error) int32 { +func (i *itemTracker) recordError(err error, isClientError func(error) bool) int32 { i.err.Store(err) - if s, ok := status.FromError(err); ok && s.Code()/100 == 4 { - return i.failed4xx.Inc() + if isClientError(err) { + return i.failedClient.Inc() } + return i.failedServer.Inc() +} + +func isHTTPStatus4xx(err error) bool { + code := grpcUtils.ErrorToStatusCode(err) + return code/100 == 4 +} - return i.failed5xx.Inc() +// DoBatch is a special case of DoBatchWithClientError where errors +// containing HTTP status code 4xx are treated as client errors. +func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callback func(InstanceDesc, []int) error, cleanup func()) error { + return DoBatchWithClientError(ctx, op, r, keys, callback, cleanup, isHTTPStatus4xx) } -// DoBatch request against a set of keys in the ring, handling replication and -// failures. For example if we want to write N items where they may all -// hit different instances, and we want them all replicated R ways with -// quorum writes, we track the relationship between batch RPCs and the items -// within them. +// DoBatchWithClientError request against a set of keys in the ring, +// handling replication and failures. For example if we want to write +// N items where they may all hit different instances, and we want them +// all replicated R ways with quorum writes, we track the relationship +// between batch RPCs and the items within them. // -// Callback is passed the instance to target, and the indexes of the keys +// callback() is passed the instance to target, and the indexes of the keys // to send to that instance. // -// cleanup() is always called, either on an error before starting the batches or after they all finish. +// cleanup() is always called, either on an error before starting the batches +// or after they all finish. // -// Not implemented as a method on Ring so we can test separately. -func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callback func(InstanceDesc, []int) error, cleanup func()) error { +// isClientError() classifies errors returned by `callback()` into client or +// server errors. See `batchTracker.record()` function for details about how +// errors are combined into final error returned by DoBatchWithClientError. +// +// Not implemented as a method on Ring, so we can test separately. +func DoBatchWithClientError(ctx context.Context, op Operation, r ReadRing, keys []uint32, callback func(InstanceDesc, []int) error, cleanup func(), isClientError func(error) bool) error { if r.InstancesCount() <= 0 { cleanup() return fmt.Errorf("DoBatch: InstancesCount <= 0") @@ -106,7 +122,7 @@ func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callb for _, i := range instances { go func(i instance) { err := callback(i.desc, i.indexes) - tracker.record(i.itemTrackers, err) + tracker.record(i.itemTrackers, err, isClientError) wg.Done() }(i) } @@ -128,35 +144,36 @@ func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callb } } -func (b *batchTracker) record(itemTrackers []*itemTracker, err error) { +func (b *batchTracker) record(itemTrackers []*itemTracker, err error, isClientError func(error) bool) { // If we reach the required number of successful puts on this item, then decrement the // number of pending items by one. // // The use of atomic increments here is needed as: // * rpcsPending and rpcsFailed guarantee only a single goroutine will write to either channel - // * succeeded, failed4xx, failed5xx and remaining guarantee that the "return decision" is made atomically + // * succeeded, failedClient, failedServer and remaining guarantee that the "return decision" is made atomically // avoiding race condition - for i := range itemTrackers { + for _, it := range itemTrackers { if err != nil { // Track the number of errors by error family, and if it exceeds maxFailures // shortcut the waiting rpc. - errCount := itemTrackers[i].recordError(err) + errCount := it.recordError(err, isClientError) // We should return an error if we reach the maxFailure (quorum) on a given error family OR - // we don't have any remaining instances to try. + // we don't have any remaining instances to try. In the following we use ClientError and ServerError + // to denote errors, for which isClientError() returns true and false respectively. // - // Ex: 2xx, 4xx, 5xx -> return 5xx - // Ex: 4xx, 4xx, _ -> return 4xx - // Ex: 5xx, _, 5xx -> return 5xx + // Ex: Success, ClientError, ServerError -> return ServerError + // Ex: ClientError, ClientError, Success -> return ClientError + // Ex: ServerError, Success, ServerError -> return ServerError // - // The reason for searching for quorum in 4xx and 5xx errors separately is to give a more accurate - // response to the initial request. So if a quorum of instances rejects the request with 4xx, then the request should be rejected - // even if less-than-quorum instances indicated a failure to process the request (via 5xx). + // The reason for searching for quorum in ClientError and ServerError errors separately is to give a more accurate + // response to the initial request. So if a quorum of instances rejects the request with ClientError, then the request should be rejected + // even if less-than-quorum instances indicated a failure to process the request (via ServerError). // The speculation is that had the unavailable instances been available, - // they would have rejected the request with a 4xx as well. - // Conversely, if a quorum of instances failed to process the request via 5xx and less-than-quorum - // instances rejected it with 4xx, then we do not have quorum to reject the request as a 4xx. Instead, - // we return the last 5xx error for debuggability. - if errCount > int32(itemTrackers[i].maxFailures) || itemTrackers[i].remaining.Dec() == 0 { + // they would have rejected the request with a ClientError as well. + // Conversely, if a quorum of instances failed to process the request via ServerError and less-than-quorum + // instances rejected it with ClientError, then we do not have quorum to reject the request as a ClientError. Instead, + // we return the last ServerError error for debuggability. + if errCount > int32(it.maxFailures) || it.remaining.Dec() == 0 { if b.rpcsFailed.Inc() == 1 { b.err <- err } @@ -164,7 +181,8 @@ func (b *batchTracker) record(itemTrackers []*itemTracker, err error) { } else { // If we successfully process items in minSuccess instances, // then wake up the waiting rpc, so it can return early. - if itemTrackers[i].succeeded.Inc() >= int32(itemTrackers[i].minSuccess) { + succeeded := it.succeeded.Inc() + if succeeded == int32(it.minSuccess) { if b.rpcsPending.Dec() == 0 { b.done <- struct{}{} } @@ -172,11 +190,12 @@ func (b *batchTracker) record(itemTrackers []*itemTracker, err error) { } // If we successfully called this particular instance, but we don't have any remaining instances to try, - // and we failed to call minSuccess instances, then we need to return the last error - // Ex: 4xx, 5xx, 2xx - if itemTrackers[i].remaining.Dec() == 0 { - if b.rpcsFailed.Inc() == 1 { - b.err <- itemTrackers[i].err.Load() + // and we failed to call minSuccess instances, then we need to return the last error. + if succeeded < int32(it.minSuccess) { + if it.remaining.Dec() == 0 { + if b.rpcsFailed.Inc() == 1 { + b.err <- it.err.Load() + } } } } diff --git a/vendor/github.com/grafana/dskit/server/server.go b/vendor/github.com/grafana/dskit/server/server.go index 157642c648e..2b54283df7f 100644 --- a/vendor/github.com/grafana/dskit/server/server.go +++ b/vendor/github.com/grafana/dskit/server/server.go @@ -92,9 +92,10 @@ type Config struct { HTTPTLSConfig TLSConfig `yaml:"http_tls_config"` GRPCTLSConfig TLSConfig `yaml:"grpc_tls_config"` - RegisterInstrumentation bool `yaml:"register_instrumentation"` - ExcludeRequestInLog bool `yaml:"-"` - DisableRequestSuccessLog bool `yaml:"-"` + RegisterInstrumentation bool `yaml:"register_instrumentation"` + ReportGRPCCodesInInstrumentationLabel bool `yaml:"report_grpc_codes_in_instrumentation_label_enabled"` + ExcludeRequestInLog bool `yaml:"-"` + DisableRequestSuccessLog bool `yaml:"-"` ServerGracefulShutdownTimeout time.Duration `yaml:"graceful_shutdown_timeout"` HTTPServerReadTimeout time.Duration `yaml:"http_server_read_timeout"` @@ -102,6 +103,8 @@ type Config struct { HTTPServerWriteTimeout time.Duration `yaml:"http_server_write_timeout"` HTTPServerIdleTimeout time.Duration `yaml:"http_server_idle_timeout"` + HTTPLogClosedConnectionsWithoutResponse bool `yaml:"http_log_closed_connections_without_response_enabled"` + GRPCOptions []grpc.ServerOption `yaml:"-"` GRPCMiddleware []grpc.UnaryServerInterceptor `yaml:"-"` GRPCStreamMiddleware []grpc.StreamServerInterceptor `yaml:"-"` @@ -168,11 +171,13 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.GRPCListenPort, "server.grpc-listen-port", 9095, "gRPC server listen port.") f.IntVar(&cfg.GRPCConnLimit, "server.grpc-conn-limit", 0, "Maximum number of simultaneous grpc connections, <=0 to disable") f.BoolVar(&cfg.RegisterInstrumentation, "server.register-instrumentation", true, "Register the intrumentation handlers (/metrics etc).") + f.BoolVar(&cfg.ReportGRPCCodesInInstrumentationLabel, "server.report-grpc-codes-in-instrumentation-label-enabled", false, "If set to true, gRPC statuses will be reported in instrumentation labels with their string representations. Otherwise, they will be reported as \"error\".") f.DurationVar(&cfg.ServerGracefulShutdownTimeout, "server.graceful-shutdown-timeout", 30*time.Second, "Timeout for graceful shutdowns") f.DurationVar(&cfg.HTTPServerReadTimeout, "server.http-read-timeout", 30*time.Second, "Read timeout for entire HTTP request, including headers and body.") f.DurationVar(&cfg.HTTPServerReadHeaderTimeout, "server.http-read-header-timeout", 0, "Read timeout for HTTP request headers. If set to 0, value of -server.http-read-timeout is used.") f.DurationVar(&cfg.HTTPServerWriteTimeout, "server.http-write-timeout", 30*time.Second, "Write timeout for HTTP server") f.DurationVar(&cfg.HTTPServerIdleTimeout, "server.http-idle-timeout", 120*time.Second, "Idle timeout for HTTP server") + f.BoolVar(&cfg.HTTPLogClosedConnectionsWithoutResponse, "server.http-log-closed-connections-without-response-enabled", false, "Log closed connections that did not receive any response, most likely because client didn't send any request within timeout.") f.IntVar(&cfg.GRPCServerMaxRecvMsgSize, "server.grpc-max-recv-msg-size-bytes", 4*1024*1024, "Limit on the size of a gRPC message this server can receive (bytes).") f.IntVar(&cfg.GRPCServerMaxSendMsgSize, "server.grpc-max-send-msg-size-bytes", 4*1024*1024, "Limit on the size of a gRPC message this server can send (bytes).") f.UintVar(&cfg.GRPCServerMaxConcurrentStreams, "server.grpc-max-concurrent-streams", 100, "Limit on the number of concurrent streams for gRPC calls per client connection (0 = unlimited)") @@ -261,6 +266,9 @@ func newServer(cfg Config, metrics *Metrics) (*Server, error) { return nil, err } httpListener = middleware.CountingListener(httpListener, metrics.TCPConnections.WithLabelValues("http")) + if cfg.HTTPLogClosedConnectionsWithoutResponse { + httpListener = middleware.NewZeroResponseListener(httpListener, level.Warn(logger)) + } metrics.TCPConnectionsLimit.WithLabelValues("http").Set(float64(cfg.HTTPConnLimit)) if cfg.HTTPConnLimit > 0 { @@ -348,17 +356,21 @@ func newServer(cfg Config, metrics *Metrics) (*Server, error) { WithRequest: !cfg.ExcludeRequestInLog, DisableRequestSuccessLog: cfg.DisableRequestSuccessLog, } + var reportGRPCStatusesOptions []middleware.InstrumentationOption + if cfg.ReportGRPCCodesInInstrumentationLabel { + reportGRPCStatusesOptions = []middleware.InstrumentationOption{middleware.ReportGRPCStatusOption} + } grpcMiddleware := []grpc.UnaryServerInterceptor{ serverLog.UnaryServerInterceptor, otgrpc.OpenTracingServerInterceptor(opentracing.GlobalTracer()), - middleware.UnaryServerInstrumentInterceptor(metrics.RequestDuration), + middleware.UnaryServerInstrumentInterceptor(metrics.RequestDuration, reportGRPCStatusesOptions...), } grpcMiddleware = append(grpcMiddleware, cfg.GRPCMiddleware...) grpcStreamMiddleware := []grpc.StreamServerInterceptor{ serverLog.StreamServerInterceptor, otgrpc.OpenTracingStreamServerInterceptor(opentracing.GlobalTracer()), - middleware.StreamServerInstrumentInterceptor(metrics.RequestDuration), + middleware.StreamServerInstrumentInterceptor(metrics.RequestDuration, reportGRPCStatusesOptions...), } grpcStreamMiddleware = append(grpcStreamMiddleware, cfg.GRPCStreamMiddleware...) diff --git a/vendor/modules.txt b/vendor/modules.txt index 8b57a1c5d16..ada12eca287 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -475,7 +475,7 @@ github.com/gorilla/mux # github.com/gorilla/websocket v1.5.0 ## explicit; go 1.12 github.com/gorilla/websocket -# github.com/grafana/dskit v0.0.0-20231031132813-52f4e8d82d59 +# github.com/grafana/dskit v0.0.0-20231113165309-40e91edb6190 ## explicit; go 1.20 github.com/grafana/dskit/backoff github.com/grafana/dskit/concurrency