Skip to content

Commit

Permalink
Revamp metrics to account the status code (#320)
Browse files Browse the repository at this point in the history
  • Loading branch information
jianoaix authored Mar 19, 2024
1 parent 54fce30 commit eead2c3
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 41 deletions.
33 changes: 15 additions & 18 deletions disperser/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/reflection"
)

Expand Down Expand Up @@ -97,7 +98,6 @@ func NewDispersalServer(
}

func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_DisperseBlobAuthenticatedServer) error {

// Process disperse_request
in, err := stream.Recv()
if err != nil {
Expand All @@ -112,7 +112,7 @@ func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_Disperse
blob, err := s.validateRequestAndGetBlob(stream.Context(), request.DisperseRequest)
if err != nil {
for _, quorumID := range request.DisperseRequest.CustomQuorumNumbers {
s.metrics.HandleFailedRequest(fmt.Sprint(quorumID), len(request.DisperseRequest.GetData()), "DisperseBlob")
s.metrics.HandleFailedRequest(codes.InvalidArgument.String(), fmt.Sprint(quorumID), len(request.DisperseRequest.GetData()), "DisperseBlob")
}
return api.NewInvalidArgError(err.Error())
}
Expand Down Expand Up @@ -182,11 +182,10 @@ func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_Disperse
}

func (s *DispersalServer) DisperseBlob(ctx context.Context, req *pb.DisperseBlobRequest) (*pb.DisperseBlobReply, error) {

blob, err := s.validateRequestAndGetBlob(ctx, req)
if err != nil {
for _, quorumID := range req.CustomQuorumNumbers {
s.metrics.HandleFailedRequest(fmt.Sprint(quorumID), len(req.GetData()), "DisperseBlob")
s.metrics.HandleFailedRequest(codes.InvalidArgument.String(), fmt.Sprint(quorumID), len(req.GetData()), "DisperseBlob")
}
return nil, api.NewInvalidArgError(err.Error())
}
Expand Down Expand Up @@ -218,7 +217,7 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut
if err != nil {
for _, param := range securityParams {
quorumId := string(param.QuorumID)
s.metrics.HandleFailedRequest(quorumId, blobSize, "DisperseBlob")
s.metrics.HandleFailedRequest(codes.InvalidArgument.String(), quorumId, blobSize, "DisperseBlob")
}
return nil, api.NewInvalidArgError(err.Error())
}
Expand All @@ -228,15 +227,10 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut
if s.ratelimiter != nil {
err := s.checkRateLimitsAndAddRates(ctx, blob, origin, authenticatedAddress)
if err != nil {
for _, param := range securityParams {
quorumId := string(param.QuorumID)
if errors.Is(err, errSystemBlobRateLimit) || errors.Is(err, errSystemThroughputRateLimit) {
s.metrics.HandleSystemRateLimitedRequest(quorumId, blobSize, "DisperseBlob")
} else if errors.Is(err, errAccountBlobRateLimit) || errors.Is(err, errAccountThroughputRateLimit) {
s.metrics.HandleAccountRateLimitedRequest(quorumId, blobSize, "DisperseBlob")
} else {
s.metrics.HandleFailedRequest(quorumId, blobSize, "DisperseBlob")
}
rateLimited := errors.Is(err, errSystemBlobRateLimit) || errors.Is(err, errSystemThroughputRateLimit) || errors.Is(err, errAccountBlobRateLimit) || errors.Is(err, errAccountThroughputRateLimit)
if !rateLimited {
s.metrics.HandleFailedRequest(codes.Internal.String(), "", blobSize, "DisperseBlob")
return nil, api.NewInternalError(err.Error())
}
return nil, api.NewResourceExhaustedError(err.Error())
}
Expand Down Expand Up @@ -332,6 +326,7 @@ func (s *DispersalServer) checkRateLimitsAndAddRates(ctx context.Context, blob *
for i, param := range blob.RequestHeader.SecurityParams {

rates, ok := s.rateConfig.QuorumRateInfos[param.QuorumID]
quorumId := string(param.QuorumID)
if !ok {
return fmt.Errorf("no configured rate exists for quorum %d", param.QuorumID)
}
Expand All @@ -357,6 +352,7 @@ func (s *DispersalServer) checkRateLimitsAndAddRates(ctx context.Context, blob *
}
if !allowed {
s.logger.Warn("system byte ratelimit exceeded", "systemQuorumKey", systemQuorumKey, "rate", rates.TotalUnauthThroughput)
s.metrics.HandleSystemRateLimitedRequest(quorumId, blobSize, "DisperseBlob")
return errSystemThroughputRateLimit
}

Expand All @@ -367,6 +363,7 @@ func (s *DispersalServer) checkRateLimitsAndAddRates(ctx context.Context, blob *
}
if !allowed {
s.logger.Warn("system blob ratelimit exceeded", "systemQuorumKey", systemQuorumKey, "rate", float32(rates.TotalUnauthBlobRate)/blobRateMultiplier)
s.metrics.HandleSystemRateLimitedRequest(quorumId, blobSize, "DisperseBlob")
return errSystemBlobRateLimit
}

Expand All @@ -379,6 +376,7 @@ func (s *DispersalServer) checkRateLimitsAndAddRates(ctx context.Context, blob *
}
if !allowed {
s.logger.Warn("account byte ratelimit exceeded", "accountQuorumKey", accountQuorumKey, "rate", accountRates.Throughput)
s.metrics.HandleAccountRateLimitedRequest(quorumId, blobSize, "DisperseBlob")
return errAccountThroughputRateLimit
}

Expand All @@ -389,6 +387,7 @@ func (s *DispersalServer) checkRateLimitsAndAddRates(ctx context.Context, blob *
}
if !allowed {
s.logger.Warn("account blob ratelimit exceeded", "accountQuorumKey", accountQuorumKey, "rate", float32(accountRates.BlobRate)/blobRateMultiplier)
s.metrics.HandleAccountRateLimitedRequest(quorumId, blobSize, "DisperseBlob")
return errAccountBlobRateLimit
}

Expand Down Expand Up @@ -519,17 +518,15 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob
blobMetadata, err := s.blobStore.GetMetadataInBatch(ctx, batchHeaderHash32, blobIndex)
if err != nil {
s.logger.Error("Failed to retrieve blob metadata", "err", err)
s.metrics.IncrementFailedBlobRequestNum("", "RetrieveBlob")

// TODO: we need to distinguish NOT_FOUND from actual internal error.
s.metrics.IncrementFailedBlobRequestNum(codes.Internal.String(), "", "RetrieveBlob")
return nil, api.NewInternalError("failed to get blob metadata, please retry")
}

data, err := s.blobStore.GetBlobContent(ctx, blobMetadata.BlobHash)
if err != nil {
s.logger.Error("Failed to retrieve blob", "err", err)
s.metrics.HandleFailedRequest("", len(data), "RetrieveBlob")

s.metrics.HandleFailedRequest(codes.Internal.String(), "", len(data), "RetrieveBlob")
return nil, api.NewInternalError("failed to get blob data, please retry")
}

Expand Down
44 changes: 25 additions & 19 deletions disperser/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc/codes"
)

type MetricsConfig struct {
Expand Down Expand Up @@ -48,7 +49,7 @@ func NewMetrics(httpPort string, logger logging.Logger) *Metrics {
Name: "requests_total",
Help: "the number of blob requests",
},
[]string{"status", "quorum", "method"},
[]string{"status_code", "status", "quorum", "method"},
),
BlobSize: promauto.With(reg).NewGaugeVec(
prometheus.GaugeOpts{
Expand Down Expand Up @@ -82,9 +83,10 @@ func (g *Metrics) ObserveLatency(method string, latencyMs float64) {
// IncrementSuccessfulBlobRequestNum increments the number of successful blob requests
func (g *Metrics) IncrementSuccessfulBlobRequestNum(quorum string, method string) {
g.NumBlobRequests.With(prometheus.Labels{
"status": "success",
"quorum": quorum,
"method": method,
"status_code": codes.OK.String(),
"status": "success",
"quorum": quorum,
"method": method,
}).Inc()
}

Expand All @@ -99,17 +101,18 @@ func (g *Metrics) HandleSuccessfulRequest(quorum string, blobBytes int, method s
}

// IncrementFailedBlobRequestNum increments the number of failed blob requests
func (g *Metrics) IncrementFailedBlobRequestNum(quorum string, method string) {
func (g *Metrics) IncrementFailedBlobRequestNum(statusCode string, quorum string, method string) {
g.NumBlobRequests.With(prometheus.Labels{
"status": "failed",
"quorum": quorum,
"method": method,
"status_code": statusCode,
"status": "failed",
"quorum": quorum,
"method": method,
}).Inc()
}

// HandleFailedRequest updates the number of failed requests and the size of the blob
func (g *Metrics) HandleFailedRequest(quorum string, blobBytes int, method string) {
g.IncrementFailedBlobRequestNum(quorum, method)
func (g *Metrics) HandleFailedRequest(statusCode string, quorum string, blobBytes int, method string) {
g.IncrementFailedBlobRequestNum(statusCode, quorum, method)
g.BlobSize.With(prometheus.Labels{
"status": "failed",
"quorum": quorum,
Expand All @@ -120,9 +123,10 @@ func (g *Metrics) HandleFailedRequest(quorum string, blobBytes int, method strin
// HandleBlobStoreFailedRequest updates the number of requests failed to store blob and the size of the blob
func (g *Metrics) HandleBlobStoreFailedRequest(quorum string, blobBytes int, method string) {
g.NumBlobRequests.With(prometheus.Labels{
"status": StoreBlobFailure,
"quorum": quorum,
"method": method,
"status_code": codes.Internal.String(),
"status": StoreBlobFailure,
"quorum": quorum,
"method": method,
}).Inc()
g.BlobSize.With(prometheus.Labels{
"status": StoreBlobFailure,
Expand All @@ -134,9 +138,10 @@ func (g *Metrics) HandleBlobStoreFailedRequest(quorum string, blobBytes int, met
// HandleSystemRateLimitedRequest updates the number of system rate limited requests and the size of the blob
func (g *Metrics) HandleSystemRateLimitedRequest(quorum string, blobBytes int, method string) {
g.NumBlobRequests.With(prometheus.Labels{
"status": SystemRateLimitedFailure,
"quorum": quorum,
"method": method,
"status_code": codes.ResourceExhausted.String(),
"status": SystemRateLimitedFailure,
"quorum": quorum,
"method": method,
}).Inc()
g.BlobSize.With(prometheus.Labels{
"status": SystemRateLimitedFailure,
Expand All @@ -148,9 +153,10 @@ func (g *Metrics) HandleSystemRateLimitedRequest(quorum string, blobBytes int, m
// HandleAccountRateLimitedRequest updates the number of account rate limited requests and the size of the blob
func (g *Metrics) HandleAccountRateLimitedRequest(quorum string, blobBytes int, method string) {
g.NumBlobRequests.With(prometheus.Labels{
"status": AccountRateLimitedFailure,
"quorum": quorum,
"method": method,
"status_code": codes.ResourceExhausted.String(),
"status": AccountRateLimitedFailure,
"quorum": quorum,
"method": method,
}).Inc()
g.BlobSize.With(prometheus.Labels{
"status": AccountRateLimitedFailure,
Expand Down
2 changes: 1 addition & 1 deletion operators/churner/churner.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (c *churner) ProcessChurnRequest(ctx context.Context, operatorToRegisterAdd
for _, quorumID := range churnRequest.QuorumIDs {
for _, quorumIDAlreadyRegisteredFor := range quorumIDsAlreadyRegisteredFor {
if quorumIDAlreadyRegisteredFor == quorumID {
return nil, errors.New("operator is already registered in quorum")
return nil, api.NewInvalidArgError("operator is already registered in quorum")
}
}
}
Expand Down
23 changes: 22 additions & 1 deletion operators/churner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc/codes"
)

type FailReason string

// Note: failure reason constants must be maintained in sync with statusCodeMap.
const (
FailReasonRateLimitExceeded FailReason = "rate_limit_exceeded" // Rate limited: per operator rate limiting
FailReasonInsufficientStakeToRegister FailReason = "insufficient_stake_to_register" // Operator doesn't have enough stake to be registered
Expand All @@ -25,6 +27,18 @@ const (
FailReasonInvalidRequest FailReason = "invalid_request" // Invalid request: request is malformed
)

// Note: statusCodeMap must be maintained in sync with failure reason constants.
var statusCodeMap map[FailReason]string = map[FailReason]string{
FailReasonRateLimitExceeded: codes.ResourceExhausted.String(),
FailReasonInsufficientStakeToRegister: codes.InvalidArgument.String(),
FailReasonInsufficientStakeToChurn: codes.InvalidArgument.String(),
FailReasonQuorumIdOutOfRange: codes.InvalidArgument.String(),
FailReasonPrevApprovalNotExpired: codes.ResourceExhausted.String(),
FailReasonInvalidSignature: codes.InvalidArgument.String(),
FailReasonProcessChurnRequestFailed: codes.Internal.String(),
FailReasonInvalidRequest: codes.InvalidArgument.String(),
}

type MetricsConfig struct {
HTTPPort string
EnableMetrics bool
Expand Down Expand Up @@ -87,8 +101,15 @@ func (g *Metrics) IncrementSuccessfulRequestNum(method string) {

// IncrementFailedRequestNum increments the number of failed requests
func (g *Metrics) IncrementFailedRequestNum(method string, reason FailReason) {
code, ok := statusCodeMap[reason]
if !ok {
g.logger.Error("cannot map failure reason to status code", "failure reason", reason)
// Treat this as an internal server error. This is a conservative approach to
// handle a negligence of mapping from failure reason to status code.
code = codes.Internal.String()
}
g.NumRequests.With(prometheus.Labels{
"status": "failed",
"status": code,
"reason": string(reason),
"method": method,
}).Inc()
Expand Down
3 changes: 1 addition & 2 deletions operators/churner/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ func (s *Server) Start(metricsConfig MetricsConfig) error {
}

func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnReply, error) {

err := s.validateChurnRequest(ctx, req)
if err != nil {
s.metrics.IncrementFailedRequestNum("Churn", FailReasonInvalidRequest)
Expand Down Expand Up @@ -92,10 +91,10 @@ func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnRepl

response, err := s.churner.ProcessChurnRequest(ctx, operatorToRegisterAddress, request)
if err != nil {
s.metrics.IncrementFailedRequestNum("Churn", FailReasonProcessChurnRequestFailed)
if _, ok := status.FromError(err); ok {
return nil, err
}
s.metrics.IncrementFailedRequestNum("Churn", FailReasonProcessChurnRequestFailed)
return nil, api.NewInternalError(fmt.Sprintf("failed to process churn request: %s", err.Error()))
}

Expand Down

0 comments on commit eead2c3

Please sign in to comment.