diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 0539cff60..4ce8b76f4 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -14,8 +14,8 @@ jobs: strategy: matrix: go: - - ^1.20 - ^1.21 + - ^1.22 - ^1 steps: diff --git a/README.md b/README.md index 33b12e5ab..eb7acce75 100644 --- a/README.md +++ b/README.md @@ -121,7 +121,7 @@ Golang compatibility matrix: | 1.18 | 0.16.0-patch2 | | 1.20 | 0.16.1 | -Overall rule of thumb is that carbonapi supports last 2 major go versions. E.x. at this moment Go 1.19 and 1.18 are supported. +Overall rule of thumb is that carbonapi supports last 2 major go versions. E.x. at this moment Go 1.22 and 1.21 are supported. You can verify current versions that are being tested in [CI Configuration](https://github.com/go-graphite/carbonapi/blob/main/.github/workflows/tests.yml#L14). @@ -156,18 +156,19 @@ Internal Metrics ---------------------------------- The internal metrics are configured inside the [graphite](https://github.com/go-graphite/carbonapi/blob/main/doc/configuration.md#graphite) subsection and sent to your destinated host on an specified interval. The metrics are: -cache_items - if caching is enabled, this metric will contain many metrics are stored in cache -cache_size - configured query cache size in bytes -request_cache_hits - how many requests were served from cache. (this is for requests to /render endpoint) -request_cache_misses - how many requests were not in cache. (this is for requests to /render endpoint) -request_cache_overhead_ns - how much time in ns it took to talk to cache (that is useful to assess if cache actually helps you in terms of latency) (this is for requests to /render endpoint) -find_requests - requests server by endpoint /metrics/find -requests - requests served by endpoint /render -requests_in_XX_to_XX - request response times in percentiles -timeouts - number of timeouts while fetching from backend -backend_cache_hits - how many requests were not read from backend -backend_cache_misses - how many requests were not found in the backend - +| Metric Name | Description | +| ----------- | ----------- | +| cache_items | if caching is enabled, this metric will contain many metrics are stored in cache | +| cache_size | configured query cache size in bytes | +| request_cache_hits | how many requests were served from cache. (this is for requests to /render endpoint) | +| request_cache_misses | how many requests were not in cache. (this is for requests to /render endpoint) | +| request_cache_overhead_ns | how much time in ns it took to talk to cache (that is useful to assess if cache actually helps you in terms of latency) (this is for |requests to /render endpoint) +| find_requests | requests server by endpoint /metrics/find | +| requests | requests served by endpoint /render | +| requests_in_XX_to_XX | request response times in percentiles | +| timeouts | number of timeouts while fetching from backend | +| backend_cache_hits | how many requests were not read from backend | +| backend_cache_misses | how many requests were not found in the backend | OSX Build Notes --------------- diff --git a/cmd/carbonapi/config/config.go b/cmd/carbonapi/config/config.go index 122665b46..39019e3c6 100644 --- a/cmd/carbonapi/config/config.go +++ b/cmd/carbonapi/config/config.go @@ -107,7 +107,8 @@ type ConfigType struct { TruncateTimeMap map[time.Duration]time.Duration `mapstructure:"truncateTime"` TruncateTime []DurationTruncate `mapstructure:"-" json:"-"` // produce from TruncateTimeMap and sort in reverse order - CombineMultipleTargetsInOne bool `mapstructure:"combineMultipleTargetsInOne"` + MaxQueryLength uint64 `mapstructure:"maxQueryLength"` + CombineMultipleTargetsInOne bool `mapstructure:"combineMultipleTargetsInOne"` ResponseCache cache.BytesCache `mapstructure:"-" json:"-"` BackendCache cache.BytesCache `mapstructure:"-" json:"-"` diff --git a/cmd/carbonapi/config/init.go b/cmd/carbonapi/config/init.go index a1b7a738d..4293ae492 100644 --- a/cmd/carbonapi/config/init.go +++ b/cmd/carbonapi/config/init.go @@ -330,7 +330,7 @@ func createCache(logger *zap.Logger, cacheName string, cacheConfig *CacheConfig) } } -func SetUpViper(logger *zap.Logger, configPath *string, viperPrefix string) { +func SetUpViper(logger *zap.Logger, configPath *string, exactConfig bool, viperPrefix string) { if *configPath != "" { b, err := os.ReadFile(*configPath) if err != nil { @@ -386,24 +386,30 @@ func SetUpViper(logger *zap.Logger, configPath *string, viperPrefix string) { viper.SetDefault("upstreams.internalRoutingCache", "600s") viper.SetDefault("upstreams.buckets", 10) viper.SetDefault("upstreams.sumBuckets", false) - viper.SetDefault("upstreams.bucketsWeight", []int64{}) - viper.SetDefault("upstreams.bucketsNames", []string{}) + viper.SetDefault("upstreams.bucketsWidth", []int64{}) + viper.SetDefault("upstreams.bucketsLabels", []string{}) viper.SetDefault("upstreams.slowLogThreshold", "1s") - viper.SetDefault("upstreams.timeouts.global", "10s") - viper.SetDefault("upstreams.timeouts.afterStarted", "2s") + viper.SetDefault("upstreams.timeouts.find", "2s") + viper.SetDefault("upstreams.timeouts.render", "10s") viper.SetDefault("upstreams.timeouts.connect", "200ms") - viper.SetDefault("upstreams.concurrencyLimit", 0) + viper.SetDefault("upstreams.concurrencyLimitPerServer", 0) viper.SetDefault("upstreams.keepAliveInterval", "30s") viper.SetDefault("upstreams.maxIdleConnsPerHost", 100) viper.SetDefault("upstreams.scaleToCommonStep", true) - viper.SetDefault("upstreams.graphite09compat", false) + viper.SetDefault("graphite09compat", false) viper.SetDefault("expireDelaySec", 600) viper.SetDefault("useCachingDNSResolver", false) viper.SetDefault("logger", map[string]string{}) viper.SetDefault("combineMultipleTargetsInOne", false) viper.AutomaticEnv() - err := viper.Unmarshal(&Config) + var err error + if exactConfig { + err = viper.UnmarshalExact(&Config) + } else { + err = viper.Unmarshal(&Config) + } + if err != nil { logger.Fatal("failed to parse config", zap.Error(err), diff --git a/cmd/carbonapi/http/expand_handler.go b/cmd/carbonapi/http/expand_handler.go index df09abe62..89e048e6c 100644 --- a/cmd/carbonapi/http/expand_handler.go +++ b/cmd/carbonapi/http/expand_handler.go @@ -76,6 +76,12 @@ func expandHandler(w http.ResponseWriter, r *http.Request) { return } + if queryLengthLimitExceeded(query, config.Config.MaxQueryLength) { + setError(w, &accessLogDetails, "query length limit exceeded", http.StatusBadRequest, uid.String()) + logAsError = true + return + } + var pv3Request pbv3.MultiGlobRequest pv3Request.Metrics = query pv3Request.StartTime = from64 diff --git a/cmd/carbonapi/http/find_handlers.go b/cmd/carbonapi/http/find_handlers.go index e4141bf4c..7b7349326 100644 --- a/cmd/carbonapi/http/find_handlers.go +++ b/cmd/carbonapi/http/find_handlers.go @@ -221,6 +221,12 @@ func findHandler(w http.ResponseWriter, r *http.Request) { return } + if queryLengthLimitExceeded(query, config.Config.MaxQueryLength) { + setError(w, &accessLogDetails, "query length limit exceeded", http.StatusBadRequest, uid.String()) + logAsError = true + return + } + if format == completerFormat { var replacer = strings.NewReplacer("/", ".") for i := range query { diff --git a/cmd/carbonapi/http/helper.go b/cmd/carbonapi/http/helper.go index 8d239b216..d35f5a56b 100644 --- a/cmd/carbonapi/http/helper.go +++ b/cmd/carbonapi/http/helper.go @@ -282,3 +282,23 @@ func timestampTruncate(ts int64, duration time.Duration, durations []config.Dura } return ts } + +func setError(w http.ResponseWriter, accessLogDetails *carbonapipb.AccessLogDetails, msg string, status int, carbonapiUUID string) { + w.Header().Set(ctxHeaderUUID, carbonapiUUID) + http.Error(w, http.StatusText(status)+": "+msg, status) + accessLogDetails.Reason = msg + accessLogDetails.HTTPCode = int32(status) +} + +func queryLengthLimitExceeded(query []string, maxLength uint64) bool { + if maxLength > 0 { + var queryLengthSum uint64 = 0 + for _, q := range query { + queryLengthSum += uint64(len(q)) + } + if queryLengthSum > maxLength { + return true + } + } + return false +} diff --git a/cmd/carbonapi/http/helper_test.go b/cmd/carbonapi/http/helper_test.go index 0f1342b44..1d2261f09 100644 --- a/cmd/carbonapi/http/helper_test.go +++ b/cmd/carbonapi/http/helper_test.go @@ -2,6 +2,7 @@ package http import ( "fmt" + "strings" "testing" "time" @@ -68,3 +69,55 @@ func Test_timestampTruncate(t *testing.T) { }) } } + +func Test_queryLengthLimitExceeded(t *testing.T) { + tests := []struct { + query []string + maxLength uint64 + want bool + }{ + { + query: []string{"a.b.c.d.e", "a.a.a.a.a.a.a.a.a.a.a.a"}, + maxLength: 20, + want: true, + }, + { + query: []string{"a.b.c", "a.b.d"}, + maxLength: 10, + want: false, + }, + { + query: []string{"a.b.c", "a.b.d"}, + maxLength: 9, + want: true, + }, + { + query: []string{"a.b.c.d.e", "a.a.a.a.a.a.a.a.a.a.a.a"}, + maxLength: 0, + want: false, + }, + { + query: []string{"a.b.b.c.*", "a.d.e", "a.b.c", "a.b.e", "a.a.a.b", "a.f.s.x.w.g"}, + maxLength: 30, + want: true, + }, + { + query: []string{"a.b.c.d.e", "a.b.c.d.f", "b.a.*"}, + maxLength: 10000, + want: false, + }, + { + query: []string{}, + maxLength: 0, + want: false, + }, + } + + for _, tt := range tests { + t.Run(fmt.Sprintf("queryLengthLimitExceeded([%s], %d) -> %t", strings.Join(tt.query, ", "), tt.maxLength, tt.want), func(t *testing.T) { + if got := queryLengthLimitExceeded(tt.query, tt.maxLength); got != tt.want { + t.Errorf("queryLengthLimitExceeded() = %t, want %t", got, tt.want) + } + }) + } +} diff --git a/cmd/carbonapi/http/main_test.go b/cmd/carbonapi/http/main_test.go index c14458238..4e68fb987 100644 --- a/cmd/carbonapi/http/main_test.go +++ b/cmd/carbonapi/http/main_test.go @@ -114,7 +114,7 @@ func init() { logger := zapwriter.Logger("main") cfgFile := "" - config.SetUpViper(logger, &cfgFile, "CARBONAPI_") + config.SetUpViper(logger, &cfgFile, false, "CARBONAPI_") config.Config.Upstreams.Backends = []string{"dummy"} config.SetUpConfigUpstreams(logger) config.SetUpConfig(logger, "(test)") diff --git a/cmd/carbonapi/http/render_handler.go b/cmd/carbonapi/http/render_handler.go index 56ae44738..c82ba1333 100644 --- a/cmd/carbonapi/http/render_handler.go +++ b/cmd/carbonapi/http/render_handler.go @@ -42,13 +42,6 @@ func cleanupParams(r *http.Request) { r.Form.Del("_t") // Used by jquery.graphite.js } -func setError(w http.ResponseWriter, accessLogDetails *carbonapipb.AccessLogDetails, msg string, status int, carbonapiUUID string) { - w.Header().Set(ctxHeaderUUID, carbonapiUUID) - http.Error(w, http.StatusText(status)+": "+msg, status) - accessLogDetails.Reason = msg - accessLogDetails.HTTPCode = int32(status) -} - func getCacheTimeout(logger *zap.Logger, r *http.Request, now32, until32 int64, duration time.Duration, cacheConfig *config.CacheConfig) int32 { if tstr := r.FormValue("cacheTimeout"); tstr != "" { t, err := strconv.Atoi(tstr) @@ -232,6 +225,12 @@ func renderHandler(w http.ResponseWriter, r *http.Request) { } } + if queryLengthLimitExceeded(targets, config.Config.MaxQueryLength) { + setError(w, accessLogDetails, "total target length limit exceeded", http.StatusBadRequest, uid.String()) + logAsError = true + return + } + if useCache { tc := time.Now() response, err := config.Config.ResponseCache.Get(responseCacheKey) diff --git a/cmd/carbonapi/http/tags_handler.go b/cmd/carbonapi/http/tags_handler.go index 1fbcca3c3..2663f37d3 100644 --- a/cmd/carbonapi/http/tags_handler.go +++ b/cmd/carbonapi/http/tags_handler.go @@ -80,6 +80,12 @@ func tagHandler(w http.ResponseWriter, r *http.Request) { q.Del("pretty") rawQuery := q.Encode() + if queryLengthLimitExceeded(r.Form["query"], config.Config.MaxQueryLength) { + setError(w, accessLogDetails, "query length limit exceeded", http.StatusBadRequest, uuid.String()) + logAsError = true + return + } + // TODO(civil): Implement caching var res []string if strings.HasSuffix(r.URL.Path, "tags") || strings.HasSuffix(r.URL.Path, "tags/") { diff --git a/cmd/carbonapi/main.go b/cmd/carbonapi/main.go index cb2712f8a..6e391fa35 100644 --- a/cmd/carbonapi/main.go +++ b/cmd/carbonapi/main.go @@ -35,6 +35,7 @@ func main() { configPath := flag.String("config", "", "Path to the `config file`.") checkConfig := flag.Bool("check-config", false, "Check config file and exit.") + exactConfig := flag.Bool("exact-config", false, "Ensure that all config params are contained in the target struct.") envPrefix := flag.String("envprefix", "CARBONAPI", "Prefix for environment variables override") if *envPrefix == "(empty)" { *envPrefix = "" @@ -43,7 +44,7 @@ func main() { logger.Warn("empty prefix is not recommended due to possible collisions with OS environment variables") } flag.Parse() - config.SetUpViper(logger, configPath, *envPrefix) + config.SetUpViper(logger, configPath, *exactConfig, *envPrefix) if *checkConfig { os.Exit(0) } diff --git a/cmd/mockbackend/testcases/pr817/carbonapi.yaml b/cmd/mockbackend/testcases/pr817/carbonapi.yaml new file mode 100644 index 000000000..368baaa66 --- /dev/null +++ b/cmd/mockbackend/testcases/pr817/carbonapi.yaml @@ -0,0 +1,61 @@ +listen: "localhost:8081" +expvar: + enabled: true + pprofEnabled: false + listen: "" +concurency: 1000 +notFoundStatusCode: 200 +cache: + type: "mem" + size_mb: 0 + defaultTimeoutSec: 60 +cpus: 0 +tz: "" +maxBatchSize: 500 +maxQueryLength: 20 +combineMultipleTargetsInOne: true +graphite: + host: "" + interval: "60s" + prefix: "carbon.api" + pattern: "{prefix}.{fqdn}" +idleConnections: 10 +pidFile: "" +upstreams: + buckets: 10 + timeouts: + find: "2s" + render: "10s" + connect: "200ms" + concurrencyLimitPerServer: 100 + keepAliveInterval: "30s" + maxIdleConnsPerHost: 100 + doMultipleRequestsIfSplit: true + backendsv2: + backends: + - + groupName: "mock-001" + protocol: "auto" + lbMethod: "all" + maxTries: 3 + maxBatchSize: 500 + keepAliveInterval: "10s" + concurrencyLimit: 0 + forceAttemptHTTP2: true + maxIdleConnsPerHost: 1000 + doMultipleRequestsIfSplit: true + timeouts: + find: "15s" + render: "50s" + connect: "200ms" + servers: + - "http://127.0.0.1:9070" + graphite09compat: false +expireDelaySec: 10 +logger: + - logger: "" + file: "stderr" + level: "debug" + encoding: "console" + encodingTime: "iso8601" + encodingDuration: "seconds" diff --git a/cmd/mockbackend/testcases/pr817/pr817.yaml b/cmd/mockbackend/testcases/pr817/pr817.yaml new file mode 100644 index 000000000..0330930dc --- /dev/null +++ b/cmd/mockbackend/testcases/pr817/pr817.yaml @@ -0,0 +1,94 @@ +version: "v1" +test: + apps: + - name: "carbonapi" + binary: "./carbonapi" + args: + - "-config" + - "./cmd/mockbackend/testcases/pr817/carbonapi.yaml" + queries: + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/render?target=a.a.a.a.a.a.a.a.a.a.a.a.a.a.a.a.a.a.a.a.b&format=json" + expectedResponse: + httpCode: 400 + contentType: "text/plain; charset=utf-8" + emptyBody: true + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/metrics/find?query=a.a.a.a.a.a.a.a.a.a.a.a.a.a.a.a.a.a.a.a.*&format=json" + expectedResponse: + httpCode: 400 + contentType: "text/plain; charset=utf-8" + emptyBody: true + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/metrics/expand?query=a.a.a.a.a.a.a.a.a.a.a.a.a.a.a.a.a.a.a.a.b&format=json" + expectedResponse: + httpCode: 400 + contentType: "text/plain; charset=utf-8" + emptyBody: true + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/tags/autoComplete/tags?query=a.a.a.a.a.a.a.a.a.a.a.a.a.a.a.a.a.a.a.a.b" + expectedResponse: + httpCode: 400 + contentType: "text/plain; charset=utf-8" + emptyBody: true + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/tags/autoComplete/values?query=a.a.a.a.a.a.a.a.a.a.a.a.a.a.a.a.a.a.a.a.b" + expectedResponse: + httpCode: 400 + contentType: "text/plain; charset=utf-8" + emptyBody: true + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/render/?target=a.b.c&target=a.b.d&format=json" + expectedResponse: + httpCode: 200 + contentType: "application/json" + expectedResults: + - metrics: + - target: "a.b.c" + datapoints: [[0,1],[1,2],[2,3],[2,4],[3,5]] + - target: "a.b.d" + datapoints: [[31,1],[10,2],[4,3],[7,4],[3,5]] + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/metrics/find?query=a.b.*&format=json" + expectedResponse: + httpCode: 200 + contentType: "application/json" + # - endpoint: "http://127.0.0.1:8081" + # type: "GET" + # URL: "/metrics/expand?query=a.*&format=json" + # expectedResponse: + # httpCode: 200 + # contentType: "application/json" + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/tags/autoComplete/tags?query=a.b.c" + expectedResponse: + httpCode: 200 + contentType: "application/json" + - endpoint: "http://127.0.0.1:8081" + type: "GET" + URL: "/tags/autoComplete/values?query=a.b.c" + expectedResponse: + httpCode: 200 + contentType: "application/json" + +listeners: + - address: ":9070" + expressions: + "a.b.c": + pathExpression: "a.b.c" + data: + - metricName: "a.b.c" + values: [0,1,2,2,3] + "a.b.d": + pathExpression: "a.b.d" + data: + - metricName: "a.b.d" + values: [31,10,4,7,3] diff --git a/expr/expr.go b/expr/expr.go index 7d8bea278..aaf8d284f 100644 --- a/expr/expr.go +++ b/expr/expr.go @@ -18,7 +18,9 @@ import ( type evaluator struct{} func (eval evaluator) Fetch(ctx context.Context, exprs []parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) (map[parser.MetricRequest][]*types.MetricData, error) { - config.Config.Limiter.Enter() + if err := config.Config.Limiter.Enter(ctx); err != nil { + return nil, err + } defer config.Config.Limiter.Leave() multiFetchRequest := pb.MultiFetchRequest{} diff --git a/expr/functions/transformNull/function.go b/expr/functions/transformNull/function.go index 0ebbac84c..678151d98 100644 --- a/expr/functions/transformNull/function.go +++ b/expr/functions/transformNull/function.go @@ -10,6 +10,7 @@ import ( "github.com/go-graphite/carbonapi/expr/helper" "github.com/go-graphite/carbonapi/expr/interfaces" + "github.com/go-graphite/carbonapi/expr/tags" "github.com/go-graphite/carbonapi/expr/types" "github.com/go-graphite/carbonapi/pkg/parser" ) @@ -42,7 +43,7 @@ func (f *transformNull) Do(ctx context.Context, e parser.Expr, from, until int64 if err != nil { return nil, err } - defaultOnAbsent, err := e.GetBoolNamedOrPosArgDefault("defaultOnAbsent", 2, false) + defaultOnAbsent, err := e.GetBoolNamedOrPosArgDefault("defaultOnAbsent", 3, false) if err != nil { return nil, err } @@ -88,7 +89,8 @@ func (f *transformNull) Do(ctx context.Context, e parser.Expr, from, until int64 name = "transformNull(" + a.Name + ")" } - r := a.CopyName(name) + r := a.CopyLink() + r.Name = name r.Values = make([]float64, len(a.Values)) r.Tags["transformNull"] = defvStr @@ -109,15 +111,18 @@ func (f *transformNull) Do(ctx context.Context, e parser.Expr, from, until int64 if len(arg) == 0 && defaultOnAbsent { values := []float64{defv, defv} step := until - from + name := e.ToString() + tags := tags.ExtractTags(types.ExtractName(name)) + tags["transformNull"] = defvStr results = append(results, &types.MetricData{ FetchResponse: pbv3.FetchResponse{ - Name: e.ToString(), + Name: name, StartTime: from, StopTime: from + step*int64(len(values)), StepTime: step, Values: values, }, - Tags: map[string]string{"name": e.ToString()}, + Tags: tags, }) } return results, nil diff --git a/expr/functions/transformNull/function_test.go b/expr/functions/transformNull/function_test.go index 59a92fbe5..d548b5a7f 100644 --- a/expr/functions/transformNull/function_test.go +++ b/expr/functions/transformNull/function_test.go @@ -32,7 +32,7 @@ func TestTransformNull(t *testing.T) { {"metric1", 0, 1}: {types.MakeMetricData("metric1", []float64{1, math.NaN(), math.NaN(), 3, 4, 12}, 1, now)}, }, []*types.MetricData{types.MakeMetricData("transformNull(metric1)", - []float64{1, 0, 0, 3, 4, 12}, 1, now).SetTag("transformNull", "0").SetNameTag("transformNull(metric1)")}, + []float64{1, 0, 0, 3, 4, 12}, 1, now).SetTag("transformNull", "0").SetNameTag("metric1")}, }, { `transformNull(metric1, default=5)`, @@ -40,7 +40,7 @@ func TestTransformNull(t *testing.T) { {"metric1", 0, 1}: {types.MakeMetricData("metric1", []float64{1, math.NaN(), math.NaN(), 3, 4, 12}, 1, now)}, }, []*types.MetricData{types.MakeMetricData("transformNull(metric1,5)", - []float64{1, 5, 5, 3, 4, 12}, 1, now).SetTag("transformNull", "5").SetNameTag("transformNull(metric1,5)")}, + []float64{1, 5, 5, 3, 4, 12}, 1, now).SetTag("transformNull", "5").SetNameTag("metric1")}, }, { `transformNull(metric1, default=5, referenceSeries=metric2.*)`, @@ -51,13 +51,22 @@ func TestTransformNull(t *testing.T) { types.MakeMetricData("metric2.bar", []float64{1, math.NaN(), math.NaN(), 3, 4, 12}, 1, now)}, }, []*types.MetricData{types.MakeMetricData("transformNull(metric1,5)", - []float64{1, 5, math.NaN(), 5, 4, 12}, 1, now).SetTag("transformNull", "5").SetNameTag("transformNull(metric1,5)")}, + []float64{1, 5, math.NaN(), 5, 4, 12}, 1, now).SetTag("transformNull", "5").SetNameTag("metric1")}, }, { `transformNull(metric1, default=5, defaultOnAbsent=True)`, map[parser.MetricRequest][]*types.MetricData{}, []*types.MetricData{types.MakeMetricData("transformNull(metric1, default=5, defaultOnAbsent=True)", - []float64{5, 5}, 1, 0).SetNameTag(`transformNull(metric1, default=5, defaultOnAbsent=True)`)}, + []float64{5, 5}, 1, 0).SetTag("transformNull", "5").SetNameTag(`metric1`)}, + }, + // tagged metric + { + `transformNull(seriesByTag('name=metric1'), 0)`, + map[parser.MetricRequest][]*types.MetricData{ + {"seriesByTag('name=metric1')", 0, 1}: {types.MakeMetricData("metric1;env=prod", []float64{1, math.NaN(), math.NaN(), 3, 4, 12}, 1, now)}, + }, + []*types.MetricData{types.MakeMetricData("transformNull(metric1;env=prod,0)", + []float64{1, 0, 0, 3, 4, 12}, 1, now).SetTag("transformNull", "0").SetNameTag("metric1").SetTag("env", "prod")}, }, } diff --git a/expr/types/types.go b/expr/types/types.go index 9df402930..3f5fb764c 100644 --- a/expr/types/types.go +++ b/expr/types/types.go @@ -427,12 +427,17 @@ func (r *MetricData) Copy(includeValues bool) *MetricData { } } +func CopyLink(tags map[string]string) map[string]string { + newTags := make(map[string]string) + for k, v := range tags { + newTags[k] = v + } + return newTags +} + // CopyLink returns the copy of MetricData, Values not copied and link from parent. Tags map are copied func (r *MetricData) CopyLink() *MetricData { - tags := make(map[string]string) - for k, v := range r.Tags { - tags[k] = v - } + tags := CopyLink(r.Tags) return &MetricData{ FetchResponse: pb.FetchResponse{ diff --git a/limiter/simple.go b/limiter/simple.go index 4a65f8cdb..2c96fb92e 100644 --- a/limiter/simple.go +++ b/limiter/simple.go @@ -1,9 +1,27 @@ package limiter +import "context" + type SimpleLimiter chan struct{} -func (l SimpleLimiter) Enter() { l <- struct{}{} } -func (l SimpleLimiter) Leave() { <-l } +func (l SimpleLimiter) Enter(ctx context.Context) error { + if l == nil { + return nil + } + + select { + case l <- struct{}{}: + return nil + case <-ctx.Done(): + return ErrTimeout + } +} + +func (l SimpleLimiter) Leave() { + if l != nil { + <-l + } +} func NewSimpleLimiter(l int) SimpleLimiter { return make(chan struct{}, l) diff --git a/zipper/broadcast/broadcast_group.go b/zipper/broadcast/broadcast_group.go index 35efed554..fb80ed3f4 100644 --- a/zipper/broadcast/broadcast_group.go +++ b/zipper/broadcast/broadcast_group.go @@ -571,8 +571,9 @@ func (bg *BroadcastGroup) doFind(ctx context.Context, logger *zap.Logger, backen var err merry.Error r.Response, r.Stats, err = backend.Find(ctx, request) r.AddError(err) + // TODO: Add a separate logger that would log full response logger.Debug("fetched response", - zap.Any("response", r), + zap.Any("response_length", len(r)), ) resCh <- r }