Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expr: refactor for avoid global evaluator usage #818

Merged
merged 1 commit into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions cmd/carbonapi/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
"time"

"github.com/go-graphite/carbonapi/cache"
"github.com/go-graphite/carbonapi/cmd/carbonapi/interfaces"
"github.com/go-graphite/carbonapi/expr"
"github.com/go-graphite/carbonapi/expr/interfaces"
"github.com/go-graphite/carbonapi/limiter"
"github.com/go-graphite/carbonapi/pkg/tlsconfig"
zipperCfg "github.com/go-graphite/carbonapi/zipper/config"
zipper "github.com/go-graphite/carbonapi/zipper/interfaces"
zipperTypes "github.com/go-graphite/carbonapi/zipper/types"

"github.com/lomik/zapwriter"
Expand Down Expand Up @@ -116,10 +118,12 @@ type ConfigType struct {
DefaultTimeZone *time.Location `mapstructure:"-" json:"-"`

// ZipperInstance is API entry to carbonzipper
ZipperInstance interfaces.CarbonZipper `mapstructure:"-" json:"-"`
ZipperInstance zipper.CarbonZipper `mapstructure:"-" json:"-"`

// Limiter limits concurrent zipper requests
Limiter limiter.SimpleLimiter `mapstructure:"-" json:"-"`

Evaluator interfaces.Evaluator `mapstructure:"-" json:"-"`
}

// skipcq: CRT-P0003
Expand All @@ -132,6 +136,12 @@ func (c ConfigType) String() string {
}
}

func (c *ConfigType) SetZipper(zipper zipper.CarbonZipper) (err error) {
c.ZipperInstance = zipper
c.Evaluator, err = expr.NewEvaluator(c.Limiter, c.ZipperInstance)
return
}

var Config = ConfigType{
ExtrapolateExperiment: false,
Buckets: 10,
Expand Down
2 changes: 1 addition & 1 deletion cmd/carbonapi/http/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func init() {
config.Config.Upstreams.Backends = []string{"dummy"}
config.SetUpConfigUpstreams(logger)
config.SetUpConfig(logger, "(test)")
config.Config.ZipperInstance = newMockCarbonZipper()
config.Config.SetZipper(newMockCarbonZipper())
emptyStringList := make([]string, 0)
InitHandlers(emptyStringList, emptyStringList)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/carbonapi/http/render_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func renderHandler(w http.ResponseWriter, r *http.Request) {

ApiMetrics.RenderRequests.Add(1)

result, errs := expr.FetchAndEvalExprs(ctx, exprs, from32, until32, values)
result, errs := expr.FetchAndEvalExprs(ctx, config.Config.Evaluator, exprs, from32, until32, values)
if errs != nil {
errors = errs
}
Expand All @@ -324,7 +324,7 @@ func renderHandler(w http.ResponseWriter, r *http.Request) {

ApiMetrics.RenderRequests.Add(1)

result, err := expr.FetchAndEvalExp(ctx, exp, from32, until32, values)
result, err := expr.FetchAndEvalExp(ctx, config.Config.Evaluator, exp, from32, until32, values)
if err != nil {
errors[target] = merry.Wrap(err)
}
Expand Down
6 changes: 5 additions & 1 deletion cmd/carbonapi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ func main() {
dns.UseDNSCache(config.Config.CachingDNSRefreshTime)
}

config.Config.ZipperInstance = newZipper(carbonapiHttp.ZipperStats, &config.Config.Upstreams, config.Config.IgnoreClientTimeout, zapwriter.Logger("zipper"))
if err := config.Config.SetZipper(newZipper(carbonapiHttp.ZipperStats, &config.Config.Upstreams, config.Config.IgnoreClientTimeout, zapwriter.Logger("zipper"))); err != nil {
logger.Fatal("failed to setup zipper",
zap.Error(err),
)
}

wg := sync.WaitGroup{}
serve := func(listen config.Listener, handler http.Handler) {
Expand Down
135 changes: 72 additions & 63 deletions expr/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,34 @@ package expr

import (
"context"
"errors"

"github.com/ansel1/merry"
pb "github.com/go-graphite/protocol/carbonapi_v3_pb"

"github.com/go-graphite/carbonapi/cmd/carbonapi/config"
_ "github.com/go-graphite/carbonapi/expr/functions"
"github.com/go-graphite/carbonapi/expr/helper"
"github.com/go-graphite/carbonapi/expr/interfaces"
"github.com/go-graphite/carbonapi/expr/metadata"
"github.com/go-graphite/carbonapi/expr/types"
"github.com/go-graphite/carbonapi/limiter"
"github.com/go-graphite/carbonapi/pkg/parser"
utilctx "github.com/go-graphite/carbonapi/util/ctx"
zipper "github.com/go-graphite/carbonapi/zipper/interfaces"
)

type evaluator struct{}
var ErrZipperNotInit = errors.New("zipper not initialized")

func (eval evaluator) Fetch(ctx context.Context, exprs []parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) (map[parser.MetricRequest][]*types.MetricData, error) {
if err := config.Config.Limiter.Enter(ctx); err != nil {
type Evaluator struct {
limiter limiter.SimpleLimiter
zipper zipper.CarbonZipper
}

func (eval Evaluator) Fetch(ctx context.Context, exprs []parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) (map[parser.MetricRequest][]*types.MetricData, error) {
if err := eval.limiter.Enter(ctx); err != nil {
return nil, err
}
defer config.Config.Limiter.Leave()
defer eval.limiter.Leave()

multiFetchRequest := pb.MultiFetchRequest{}
metricRequestCache := make(map[string]parser.MetricRequest)
Expand Down Expand Up @@ -74,7 +82,7 @@ func (eval evaluator) Fetch(ctx context.Context, exprs []parser.Expr, from, unti
}

if len(multiFetchRequest.Metrics) > 0 {
metrics, _, err := config.Config.ZipperInstance.Render(ctx, multiFetchRequest)
metrics, _, err := eval.zipper.Render(ctx, multiFetchRequest)
// If we had only partial result, we want to do our best to actually do our job
if err != nil && merry.HTTPCode(err) >= 400 && !haveFallbackSeries {
return nil, err
Expand All @@ -97,16 +105,16 @@ func (eval evaluator) Fetch(ctx context.Context, exprs []parser.Expr, from, unti
targetValues[m] = values[m]
}

if config.Config.ZipperInstance.ScaleToCommonStep() {
if eval.zipper.ScaleToCommonStep() {
targetValues = helper.ScaleValuesToCommonStep(targetValues)
}

return targetValues, nil
}

// Eval evaluates expressions.
func (eval evaluator) Eval(ctx context.Context, exp parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) (results []*types.MetricData, err error) {
rewritten, targets, err := RewriteExpr(ctx, exp, from, until, values)
func (eval Evaluator) Eval(ctx context.Context, exp parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) (results []*types.MetricData, err error) {
rewritten, targets, err := RewriteExpr(ctx, eval, exp, from, until, values)
if err != nil {
return nil, err
}
Expand All @@ -128,63 +136,19 @@ func (eval evaluator) Eval(ctx context.Context, exp parser.Expr, from, until int
}
return results, nil
}
return EvalExpr(ctx, exp, from, until, values)
}

var _evaluator = evaluator{}

func init() {
helper.SetEvaluator(_evaluator)
metadata.SetEvaluator(_evaluator)
}

// FetchAndEvalExp fetch data and evaluates expressions
func FetchAndEvalExp(ctx context.Context, e parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) ([]*types.MetricData, merry.Error) {
targetValues, err := _evaluator.Fetch(ctx, []parser.Expr{e}, from, until, values)
if err != nil {
return nil, merry.Wrap(err)
}

res, err := _evaluator.Eval(ctx, e, from, until, targetValues)
if err != nil {
return nil, merry.Wrap(err)
}

for mReq := range values {
SortMetrics(values[mReq], mReq)
}

return res, nil
return EvalExpr(ctx, eval, exp, from, until, values)
}

func FetchAndEvalExprs(ctx context.Context, exprs []parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) ([]*types.MetricData, map[string]merry.Error) {
targetValues, err := _evaluator.Fetch(ctx, exprs, from, until, values)
if err != nil {
return nil, map[string]merry.Error{"*": merry.Wrap(err)}
}

res := make([]*types.MetricData, 0, len(exprs))
var errors map[string]merry.Error
for _, exp := range exprs {
evaluationResult, err := _evaluator.Eval(ctx, exp, from, until, targetValues)
if err != nil {
if errors == nil {
errors = make(map[string]merry.Error)
}
errors[exp.Target()] = merry.Wrap(err)
}
res = append(res, evaluationResult...)
}

for mReq := range values {
SortMetrics(values[mReq], mReq)
// NewEvaluator create evaluator with limiter and zipper
func NewEvaluator(limiter limiter.SimpleLimiter, zipper zipper.CarbonZipper) (*Evaluator, error) {
if zipper == nil {
return nil, ErrZipperNotInit
}

return res, errors
return &Evaluator{limiter: limiter, zipper: zipper}, nil
}

// EvalExpr is the main expression evaluator.
func EvalExpr(ctx context.Context, e parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) ([]*types.MetricData, error) {
func EvalExpr(ctx context.Context, eval interfaces.Evaluator, e parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) ([]*types.MetricData, error) {
if e.IsName() {
return values[parser.MetricRequest{Metric: e.Target(), From: from, Until: until}], nil
} else if e.IsConst() {
Expand Down Expand Up @@ -212,7 +176,7 @@ func EvalExpr(ctx context.Context, e parser.Expr, from, until int64, values map[
f, ok := metadata.FunctionMD.Functions[e.Target()]
metadata.FunctionMD.RUnlock()
if ok {
v, err := f.Do(ctx, e, from, until, values)
v, err := f.Do(ctx, eval, e, from, until, values)
if err != nil {
err = merry.WithMessagef(err, "function=%s: %s", e.Target(), err.Error())
if merry.Is(
Expand Down Expand Up @@ -242,14 +206,59 @@ func EvalExpr(ctx context.Context, e parser.Expr, from, until int64, values map[
// applyByNode(foo*, 1, "%") -> (true, ["foo1", "foo2"], nil)
// sumSeries(foo) -> (false, nil, nil)
// Assumes that applyByNode only appears as the outermost function.
func RewriteExpr(ctx context.Context, e parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) (bool, []string, error) {
func RewriteExpr(ctx context.Context, eval interfaces.Evaluator, e parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) (bool, []string, error) {
if e.IsFunc() {
metadata.FunctionMD.RLock()
f, ok := metadata.FunctionMD.RewriteFunctions[e.Target()]
metadata.FunctionMD.RUnlock()
if ok {
return f.Do(ctx, e, from, until, values)
return f.Do(ctx, eval, e, from, until, values)
}
}
return false, nil, nil
}

// FetchAndEvalExp fetch data and evaluates expressions
func FetchAndEvalExp(ctx context.Context, eval interfaces.Evaluator, e parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) ([]*types.MetricData, merry.Error) {
targetValues, err := eval.Fetch(ctx, []parser.Expr{e}, from, until, values)
if err != nil {
return nil, merry.Wrap(err)
}

res, err := eval.Eval(ctx, e, from, until, targetValues)
if err != nil {
return nil, merry.Wrap(err)
}

for mReq := range values {
SortMetrics(values[mReq], mReq)
}

return res, nil
}

func FetchAndEvalExprs(ctx context.Context, eval interfaces.Evaluator, exprs []parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) ([]*types.MetricData, map[string]merry.Error) {
targetValues, err := eval.Fetch(ctx, exprs, from, until, values)
if err != nil {
return nil, map[string]merry.Error{"*": merry.Wrap(err)}
}

res := make([]*types.MetricData, 0, len(exprs))
var errors map[string]merry.Error
for _, exp := range exprs {
evaluationResult, err := eval.Eval(ctx, exp, from, until, targetValues)
if err != nil {
if errors == nil {
errors = make(map[string]merry.Error)
}
errors[exp.Target()] = merry.Wrap(err)
}
res = append(res, evaluationResult...)
}

for mReq := range values {
SortMetrics(values[mReq], mReq)
}

return res, errors
}
Loading
Loading