Skip to content

Commit

Permalink
feat(render): return error on partial targets fetch
Browse files Browse the repository at this point in the history
  • Loading branch information
msaf1980 committed Jan 19, 2024
1 parent 220c62b commit fcfbdd2
Show file tree
Hide file tree
Showing 21 changed files with 470 additions and 162 deletions.
9 changes: 6 additions & 3 deletions cmd/carbonapi/http/render_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,9 @@ func renderHandler(w http.ResponseWriter, r *http.Request) {
result, err := expr.FetchAndEvalExp(ctx, exp, from32, until32, values)
if err != nil {
errors[target] = merry.Wrap(err)
// if config.Config.Upstreams.RequireSuccessAll {
// break
// }
}

results = append(results, result...)
Expand All @@ -348,18 +351,18 @@ func renderHandler(w http.ResponseWriter, r *http.Request) {
var body []byte

returnCode := http.StatusOK
if len(results) == 0 {
if len(results) == 0 || (len(errors) > 0 && config.Config.Upstreams.RequireSuccessAll) {
// Obtain error code from the errors
// In case we have only "Not Found" errors, result should be 404
// Otherwise it should be 500
returnCode, errMsgs := helper.MergeHttpErrorMap(errors)
logger.Debug("error response or no response", zap.Strings("error", errMsgs))
// Allow override status code for 404-not-found replies.
if returnCode == 404 {
if returnCode == http.StatusNotFound {
returnCode = config.Config.NotFoundStatusCode
}

if returnCode == 400 || returnCode == http.StatusForbidden || returnCode >= 500 {
if returnCode == http.StatusBadRequest || returnCode == http.StatusNotFound || returnCode == http.StatusForbidden || returnCode >= 500 {
setError(w, accessLogDetails, strings.Join(errMsgs, ","), returnCode, uid.String())
logAsError = true
return
Expand Down
2 changes: 1 addition & 1 deletion cmd/mockbackend/e2etesting.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,11 +308,11 @@ func e2eTest(logger *zap.Logger, noapp, breakOnError bool) bool {

for _, t := range cfg.Test.Queries {
failures := doTest(logger, &t)

if len(failures) != 0 {
failed = true
logger.Error("test failed",
zap.Errors("failures", failures),
zap.String("url", t.URL), zap.String("type", t.Type), zap.String("body", t.Body),
)
for _, v := range runningApps {
if !v.IsRunning() {
Expand Down
1 change: 1 addition & 0 deletions cmd/mockbackend/http_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
)

type Response struct {
Code int `yaml:"code"`
ReplyDelayMS int `yaml:"replyDelayMS"`
PathExpression string `yaml:"pathExpression"`
Data []Metric `yaml:"data"`
Expand Down
2 changes: 1 addition & 1 deletion cmd/mockbackend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type MainConfig struct {

type Listener struct {
Address string `yaml:"address"`
Code int `yaml:"httpCode"`
Code int `yaml:"httpCode"` // global responce code
ShuffleResults bool `yaml:"shuffleResults"`
EmptyBody bool `yaml:"emptyBody"`
Expressions map[string]Response `yaml:"expressions"`
Expand Down
153 changes: 80 additions & 73 deletions cmd/mockbackend/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,89 +91,96 @@ func (cfg *listener) renderHandler(wr http.ResponseWriter, req *http.Request) {
Metrics: []carbonapi_v3_pb.FetchResponse{},
}

newCfg := Listener{
Code: cfg.Code,
EmptyBody: cfg.EmptyBody,
Expressions: copyMap(cfg.Expressions),
}

httpCode := http.StatusOK
for _, target := range targets {
response, ok := newCfg.Expressions[target]
if !ok {
wr.WriteHeader(http.StatusNotFound)
_, _ = wr.Write([]byte("Not found"))
return
}
if response.ReplyDelayMS > 0 {
delay := time.Duration(response.ReplyDelayMS) * time.Millisecond
logger.Info("will add extra delay",
zap.Duration("delay", delay),
)
time.Sleep(delay)
}
for _, m := range response.Data {
step := m.Step
if step == 0 {
step = 1
}
startTime := m.StartTime
if startTime == 0 {
startTime = step
}
isAbsent := make([]bool, 0, len(m.Values))
protov2Values := make([]float64, 0, len(m.Values))
for i := range m.Values {
if math.IsNaN(m.Values[i]) {
isAbsent = append(isAbsent, true)
protov2Values = append(protov2Values, 0.0)
} else {
isAbsent = append(isAbsent, false)
protov2Values = append(protov2Values, m.Values[i])
}
if response, ok := cfg.Expressions[target]; ok {
if response.ReplyDelayMS > 0 {
delay := time.Duration(response.ReplyDelayMS) * time.Millisecond
logger.Info("will add extra delay",
zap.Duration("delay", delay),
)
time.Sleep(delay)
}
fr2 := carbonapi_v2_pb.FetchResponse{
Name: m.MetricName,
StartTime: int32(startTime),
StopTime: int32(startTime + step*len(protov2Values)),
StepTime: int32(step),
Values: protov2Values,
IsAbsent: isAbsent,
if response.Code > 0 && response.Code != http.StatusOK {
httpCode = response.Code
}
if httpCode == http.StatusOK {
for _, m := range response.Data {
step := m.Step
if step == 0 {
step = 1
}
startTime := m.StartTime
if startTime == 0 {
startTime = step
}
isAbsent := make([]bool, 0, len(m.Values))
protov2Values := make([]float64, 0, len(m.Values))
for i := range m.Values {
if math.IsNaN(m.Values[i]) {
isAbsent = append(isAbsent, true)
protov2Values = append(protov2Values, 0.0)
} else {
isAbsent = append(isAbsent, false)
protov2Values = append(protov2Values, m.Values[i])
}
}
fr2 := carbonapi_v2_pb.FetchResponse{
Name: m.MetricName,
StartTime: int32(startTime),
StopTime: int32(startTime + step*len(protov2Values)),
StepTime: int32(step),
Values: protov2Values,
IsAbsent: isAbsent,
}

fr3 := carbonapi_v3_pb.FetchResponse{
Name: m.MetricName,
PathExpression: target,
ConsolidationFunc: "avg",
StartTime: int64(startTime),
StopTime: int64(startTime + step*len(m.Values)),
StepTime: int64(step),
XFilesFactor: 0,
HighPrecisionTimestamps: false,
Values: m.Values,
RequestStartTime: 1,
RequestStopTime: int64(startTime + step*len(m.Values)),
}
fr3 := carbonapi_v3_pb.FetchResponse{
Name: m.MetricName,
PathExpression: target,
ConsolidationFunc: "avg",
StartTime: int64(startTime),
StopTime: int64(startTime + step*len(m.Values)),
StepTime: int64(step),
XFilesFactor: 0,
HighPrecisionTimestamps: false,
Values: m.Values,
RequestStartTime: 1,
RequestStopTime: int64(startTime + step*len(m.Values)),
}

multiv2.Metrics = append(multiv2.Metrics, fr2)
multiv3.Metrics = append(multiv3.Metrics, fr3)
multiv2.Metrics = append(multiv2.Metrics, fr2)
multiv3.Metrics = append(multiv3.Metrics, fr3)
}
}
}
}

if cfg.Listener.ShuffleResults {
rand.Shuffle(len(multiv2.Metrics), func(i, j int) {
multiv2.Metrics[i], multiv2.Metrics[j] = multiv2.Metrics[j], multiv2.Metrics[i]
})
rand.Shuffle(len(multiv3.Metrics), func(i, j int) {
multiv3.Metrics[i], multiv3.Metrics[j] = multiv3.Metrics[j], multiv3.Metrics[i]
})
}
if httpCode == http.StatusOK {
if len(multiv2.Metrics) == 0 {
wr.WriteHeader(http.StatusNotFound)
_, _ = wr.Write([]byte("Not found"))
return
}

contentType, d := cfg.marshalResponse(wr, logger, format, multiv3, multiv2)
if d == nil {
return
if cfg.Listener.ShuffleResults {
rand.Shuffle(len(multiv2.Metrics), func(i, j int) {
multiv2.Metrics[i], multiv2.Metrics[j] = multiv2.Metrics[j], multiv2.Metrics[i]
})
rand.Shuffle(len(multiv3.Metrics), func(i, j int) {
multiv3.Metrics[i], multiv3.Metrics[j] = multiv3.Metrics[j], multiv3.Metrics[i]
})
}

contentType, d := cfg.marshalResponse(wr, logger, format, multiv3, multiv2)
if d == nil {
return
}
wr.Header().Set("Content-Type", contentType)
_, _ = wr.Write(d)
} else {
wr.WriteHeader(httpCode)
_, _ = wr.Write([]byte(http.StatusText(httpCode)))
}
wr.Header().Set("Content-Type", contentType)
_, _ = wr.Write(d)
}

func (cfg *listener) marshalResponse(wr http.ResponseWriter, logger *zap.Logger, format responseFormat, multiv3 carbonapi_v3_pb.MultiFetchResponse, multiv2 carbonapi_v2_pb.MultiFetchResponse) (string, []byte) {
Expand Down
57 changes: 57 additions & 0 deletions cmd/mockbackend/testcases/render_error/carbonapi.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
listen: "localhost:8081"
expvar:
enabled: true
pprofEnabled: false
listen: ""
concurency: 1000
notFoundStatusCode: 200
cache:
type: "mem"
size_mb: 0
defaultTimeoutSec: 60
cpus: 0
tz: ""
maxBatchSize: 0
graphite:
host: ""
interval: "60s"
prefix: "carbon.api"
pattern: "{prefix}.{fqdn}"
idleConnections: 10
pidFile: ""
upstreams:
buckets: 10
timeouts:
find: "2s"
render: "5s"
connect: "200ms"
concurrencyLimitPerServer: 0
keepAliveInterval: "30s"
maxIdleConnsPerHost: 100
backendsv2:
backends:
-
groupName: "mock-001"
protocol: "auto"
lbMethod: "all"
maxTries: 1
maxBatchSize: 0
keepAliveInterval: "10s"
concurrencyLimit: 0
forceAttemptHTTP2: true
maxIdleConnsPerHost: 1000
timeouts:
find: "3s"
render: "5s"
connect: "200ms"
servers:
- "http://127.0.0.1:9070"
graphite09compat: false
expireDelaySec: 10
logger:
- logger: ""
file: "stderr"
level: "debug"
encoding: "console"
encodingTime: "iso8601"
encodingDuration: "seconds"
99 changes: 99 additions & 0 deletions cmd/mockbackend/testcases/render_error/render_error.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
version: "v1"
test:
apps:
- name: "carbonapi"
binary: "./carbonapi"
args:
- "-config"
- "./cmd/mockbackend/testcases/render_error/carbonapi.yaml"
queries:
- endpoint: "http://127.0.0.1:8081"
type: "GET"
URL: "/render/?target=a&format=json"
expectedResponse:
httpCode: 200
contentType: "application/json"
expectedResults:
- metrics:
- target: "a"
datapoints: [[0,1],[1,2],[2,3],[2,4],[3,5]]

# empty
- endpoint: "http://127.0.0.1:8081"
type: "GET"
URL: "/render/?target=b&format=json"
expectedResponse:
httpCode: 200
contentType: "application/json"

- endpoint: "http://127.0.0.1:8081"
type: "GET"
URL: "/render/?target=a&target=b&format=json"
expectedResponse:
httpCode: 200
contentType: "application/json"
expectedResults:
- metrics:
- target: "a"
datapoints: [[0,1],[1,2],[2,3],[2,4],[3,5]]

# timeout
- endpoint: "http://127.0.0.1:8081"
type: "GET"
URL: "/render/?target=c&format=json"
expectedResponse:
httpCode: 503
contentType: "text/plain; charset=utf-8"

# 503
- endpoint: "http://127.0.0.1:8081"
type: "GET"
URL: "/render/?target=d&format=json"
expectedResponse:
httpCode: 503
contentType: "text/plain; charset=utf-8"

# partial success
- endpoint: "http://127.0.0.1:8081"
type: "GET"
URL: "/render/?target=a&target=d&format=json"
expectedResponse:
httpCode: 200
contentType: "application/json"
expectedResults:
- metrics:
- target: "a"
datapoints: [[0,1],[1,2],[2,3],[2,4],[3,5]]

# partial success
- endpoint: "http://127.0.0.1:8081"
type: "GET"
URL: "/render/?target=divideSeries(a,d)&format=json"
expectedResponse:
httpCode: 200
contentType: "application/json"
expectedResults:
- metrics:
- target: "divideSeries(a,MISSING)"
datapoints: [[nan,1],[nan,2],[nan,3],[nan,4],[nan,5]]

listeners:
- address: ":9070"
expressions:
"a":
pathExpression: "a"
data:
- metricName: "a"
values: [0,1,2,2,3]

# timeout
"c":
pathExpression: "c"
emptyBody: true
code: 404
replyDelayMS: 7000

"d":
pathExpression: "d"
emptyBody: true
code: 503
Loading

0 comments on commit fcfbdd2

Please sign in to comment.