From 57d8ce466f0efd7e5c3fba83e39d886f4c83b7cd Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Fri, 18 Aug 2023 21:22:40 -0700 Subject: [PATCH] Add AlertBackfill functionality to promxy This optional config flag will enable promxy to backfill alert status by re-querying the alert statement on startup -- in the event that the downstream store doesn't have the corresponding ALERTS_FOR_STATE series. Fixes #50 --- cmd/promxy/Makefile | 2 +- cmd/promxy/alert_example.rule | 8 +- cmd/promxy/demo_robust.conf | 6 + cmd/promxy/main.go | 16 ++- cmd/promxy/recording_example.rule | 7 - pkg/alertbackfill/alertstate.go | 72 +++++++++++ pkg/alertbackfill/alertstate_test.go | 187 +++++++++++++++++++++++++++ pkg/alertbackfill/backfill.go | 166 ++++++++++++++++++++++++ pkg/alertbackfill/rule.go | 83 ++++++++++++ pkg/alertbackfill/rule_test.go | 126 ++++++++++++++++++ pkg/promclient/engine.go | 120 +++++++++++++++++ pkg/promclient/engine_test.go | 59 +++++++++ 12 files changed, 842 insertions(+), 10 deletions(-) delete mode 100644 cmd/promxy/recording_example.rule create mode 100644 pkg/alertbackfill/alertstate.go create mode 100644 pkg/alertbackfill/alertstate_test.go create mode 100644 pkg/alertbackfill/backfill.go create mode 100644 pkg/alertbackfill/rule.go create mode 100644 pkg/alertbackfill/rule_test.go create mode 100644 pkg/promclient/engine.go create mode 100644 pkg/promclient/engine_test.go diff --git a/cmd/promxy/Makefile b/cmd/promxy/Makefile index d31037804..e98133837 100644 --- a/cmd/promxy/Makefile +++ b/cmd/promxy/Makefile @@ -1,5 +1,5 @@ test-robust: - go build -race -mod=vendor -tags netgo,builtinassets && ./promxy --log-level=debug --http.shutdown-delay=0s --config=demo_robust.conf --web.external-url=http://localhost:8082/promxy + go build -race -mod=vendor -tags netgo,builtinassets && ./promxy --rules.alertbackfill --log-level=debug --http.shutdown-delay=0s --config=demo_robust.conf --web.external-url=http://localhost:8082/promxy test-local: diff --git a/cmd/promxy/alert_example.rule b/cmd/promxy/alert_example.rule index 6937f8b05..b90981684 100644 --- a/cmd/promxy/alert_example.rule +++ b/cmd/promxy/alert_example.rule @@ -9,4 +9,10 @@ groups: severity: page annotations: summary: High request latency - + - alert: testAlert + expr: prometheus_build_info == 1 + for: 10m + labels: + severity: page + annotations: + summary: example always-firing alert diff --git a/cmd/promxy/demo_robust.conf b/cmd/promxy/demo_robust.conf index effb1334c..052976418 100644 --- a/cmd/promxy/demo_robust.conf +++ b/cmd/promxy/demo_robust.conf @@ -6,6 +6,12 @@ global: external_labels: source: promxy +# Rule files specifies a list of globs. Rules and alerts are read from +# all matching files. +rule_files: +- "*rule" + + ## ### Promxy configuration ## diff --git a/cmd/promxy/main.go b/cmd/promxy/main.go index 64f32f7cf..7706ea82b 100644 --- a/cmd/promxy/main.go +++ b/cmd/promxy/main.go @@ -44,6 +44,7 @@ import ( "go.uber.org/atomic" "k8s.io/klog" + "github.com/jacksontj/promxy/pkg/alertbackfill" proxyconfig "github.com/jacksontj/promxy/pkg/config" "github.com/jacksontj/promxy/pkg/logging" "github.com/jacksontj/promxy/pkg/middleware" @@ -104,6 +105,7 @@ type cliOpts struct { ForOutageTolerance time.Duration `long:"rules.alert.for-outage-tolerance" description:"Max time to tolerate prometheus outage for restoring for state of alert." default:"1h"` ForGracePeriod time.Duration `long:"rules.alert.for-grace-period" description:"Minimum duration between alert and restored for state. This is maintained only for alerts with configured for time greater than grace period." default:"10m"` ResendDelay time.Duration `long:"rules.alert.resend-delay" description:"Minimum amount of time to wait before resending an alert to Alertmanager." default:"1m"` + AlertBackfill bool `long:"rules.alertbackfill" description:"Enable promxy to recalculate alert state on startup when the downstream datastore doesn't have an ALERTS_FOR_STATE"` ShutdownDelay time.Duration `long:"http.shutdown-delay" description:"time to wait before shutting down the http server, this allows for a grace period for upstreams (e.g. LoadBalancers) to discover the new stopping status through healthchecks" default:"10s"` ShutdownTimeout time.Duration `long:"http.shutdown-timeout" description:"max time to wait for a graceful shutdown of the HTTP server" default:"60s"` @@ -307,19 +309,31 @@ func main() { logrus.Infof("Notifier manager stopped") }() + var ruleQueryable storage.Queryable + // If alertbackfill is enabled; wire it up! + if opts.AlertBackfill { + ruleQueryable = alertbackfill.NewAlertBackfillQueryable(engine, proxyStorage) + } else { + ruleQueryable = proxyStorage + } ruleManager := rules.NewManager(&rules.ManagerOptions{ Context: ctx, // base context for all background tasks ExternalURL: externalUrl, // URL listed as URL for "who fired this alert" QueryFunc: rules.EngineQueryFunc(engine, proxyStorage), NotifyFunc: sendAlerts(notifierManager, externalUrl.String()), Appendable: proxyStorage, - Queryable: proxyStorage, + Queryable: ruleQueryable, Logger: logger, Registerer: prometheus.DefaultRegisterer, OutageTolerance: opts.ForOutageTolerance, ForGracePeriod: opts.ForGracePeriod, ResendDelay: opts.ResendDelay, }) + + if q, ok := ruleQueryable.(*alertbackfill.AlertBackfillQueryable); ok { + q.SetRuleGroupFetcher(ruleManager.RuleGroups) + } + go ruleManager.Run() reloadables = append(reloadables, proxyconfig.WrapPromReloadable(&proxyconfig.ApplyConfigFunc{func(cfg *config.Config) error { diff --git a/cmd/promxy/recording_example.rule b/cmd/promxy/recording_example.rule deleted file mode 100644 index 6b2c3e30b..000000000 --- a/cmd/promxy/recording_example.rule +++ /dev/null @@ -1,7 +0,0 @@ -# This get evaluated, but since prometheus has no API for appending -# datapoints this is a no-op -groups: - - name: example - rules: - - record: query_qps - expr: irate(http_requests_total{handler="query"}[1m]) diff --git a/pkg/alertbackfill/alertstate.go b/pkg/alertbackfill/alertstate.go new file mode 100644 index 000000000..2dda5b66d --- /dev/null +++ b/pkg/alertbackfill/alertstate.go @@ -0,0 +1,72 @@ +package alertbackfill + +import ( + "time" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" +) + +// GenerateAlertStateMatrix will generate the a model.Matrix which is the equivalent +// of the `ALERTS_FOR_STATE` series that would have been generated for the given data +func GenerateAlertStateMatrix(v model.Matrix, matchers []*labels.Matcher, alertLabels labels.Labels, step time.Duration) model.Matrix { + matrix := make(model.Matrix, 0, v.Len()) +MATRIXSAMPLE_LOOP: + for _, item := range v { + // clone the metric -- and convert the labels + metric := item.Metric.Clone() + + // Add the labels which the alert would add + for _, label := range alertLabels { + metric[model.LabelName(label.Name)] = model.LabelValue(label.Value) + } + + // Filter to results that match our matchers + for _, matcher := range matchers { + switch matcher.Name { + // Overwrite the __name__ and alertname + case model.MetricNameLabel, model.AlertNameLabel: + metric[model.LabelName(matcher.Name)] = model.LabelValue(matcher.Value) + default: + if !matcher.Matches(string(metric[model.LabelName(matcher.Name)])) { + continue MATRIXSAMPLE_LOOP + } + } + } + + var ( + activeAt model.SampleValue + lastPoint time.Time + ) + // Now we have to convert the *actual* result into the series that is stored for the ALERTS + samples := make([]model.SamplePair, len(item.Values)) + for x, sample := range item.Values { + sampleTime := sample.Timestamp.Time() + + // If we are missing a point in the matrix; then we are going to assume + // that the series cleared, so we need to reset activeAt + if sampleTime.Sub(lastPoint) > step { + activeAt = 0 + } + lastPoint = sampleTime + + // if there is no `activeAt` set; lets set this timestamp (earliest timestamp in the steps that has a point) + if activeAt == 0 { + activeAt = model.SampleValue(sample.Timestamp.Unix()) + } + + samples[x] = model.SamplePair{ + Timestamp: sample.Timestamp, + // The timestamp is a unix timestapm of ActiveAt, so we'll set this to the timestamp instead of the value + Value: activeAt, + } + } + + matrix = append(matrix, &model.SampleStream{ + Metric: metric, + Values: samples, + }) + } + + return matrix +} diff --git a/pkg/alertbackfill/alertstate_test.go b/pkg/alertbackfill/alertstate_test.go new file mode 100644 index 000000000..259c64bd8 --- /dev/null +++ b/pkg/alertbackfill/alertstate_test.go @@ -0,0 +1,187 @@ +package alertbackfill + +import ( + "strconv" + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" +) + +func TestGenerateAlertStateMatrix(t *testing.T) { + start := model.Time(0).Add(time.Minute) + tests := []struct { + in model.Matrix + matchers []*labels.Matcher + alertLabels labels.Labels + step time.Duration + + out model.Matrix + }{ + // Simple test case + { + in: model.Matrix{ + &model.SampleStream{ + Metric: model.Metric{ + "__name__": "prometheus_build_info", + "job": "prometheus", + "replica": "a", + }, + Values: []model.SamplePair{ + {start.Add(time.Minute * 0), 1}, + {start.Add(time.Minute * 1), 1}, + {start.Add(time.Minute * 2), 1}, + {start.Add(time.Minute * 3), 1}, + {start.Add(time.Minute * 4), 1}, + // Have a gap! + {start.Add(time.Minute * 6), 1}, + }, + }, + &model.SampleStream{ + Metric: model.Metric{ + "__name__": "prometheus_build_info", + "job": "prometheus", + "replica": "b", + }, + Values: []model.SamplePair{ + {start.Add(time.Minute * 0), 1}, + {start.Add(time.Minute * 1), 1}, + {start.Add(time.Minute * 2), 1}, + {start.Add(time.Minute * 3), 1}, + {start.Add(time.Minute * 4), 1}, + }, + }, + }, + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "ALERTS_FOR_STATE"), + labels.MustNewMatcher(labels.MatchEqual, model.AlertNameLabel, "testalert"), + labels.MustNewMatcher(labels.MatchEqual, "replica", "a"), + }, + alertLabels: labels.Labels{ + labels.Label{"severity", "page"}, + }, + step: time.Minute, + out: model.Matrix{ + &model.SampleStream{ + Metric: model.Metric{ + "__name__": "ALERTS_FOR_STATE", + "alertname": "testalert", + "job": "prometheus", + "replica": "a", + "severity": "page", + }, + Values: []model.SamplePair{ + {start.Add(time.Minute * 0), model.SampleValue(start.Unix())}, + {start.Add(time.Minute * 1), model.SampleValue(start.Unix())}, + {start.Add(time.Minute * 2), model.SampleValue(start.Unix())}, + {start.Add(time.Minute * 3), model.SampleValue(start.Unix())}, + {start.Add(time.Minute * 4), model.SampleValue(start.Unix())}, + {start.Add(time.Minute * 6), model.SampleValue(start.Add(time.Minute * 6).Unix())}, + }, + }, + }, + }, + + // Example from `prometheus_build_info == 1` + { + in: model.Matrix{ + &model.SampleStream{ + Metric: model.Metric{ + "__name__": "prometheus_build_info", + "branch": "HEAD", + "goversion": "go1.16.4", + "instance": "demo.do.prometheus.io:9090", + "job": "prometheus", + "replica": "a", + "revision": "24c9b61221f7006e87cd62b9fe2901d43e19ed53", + "version": "2.27.0", + }, + Values: []model.SamplePair{ + {model.TimeFromUnix(1692419110), 1}, + {model.TimeFromUnix(1692419115), 1}, + {model.TimeFromUnix(1692419120), 1}, + {model.TimeFromUnix(1692419125), 1}, + {model.TimeFromUnix(1692419130), 1}, + }, + }, + &model.SampleStream{ + Metric: model.Metric{ + "__name__": "prometheus_build_info", + "branch": "HEAD", + "goversion": "go1.16.4", + "instance": "demo.do.prometheus.io:9090", + "job": "prometheus", + "replica": "b", + "revision": "24c9b61221f7006e87cd62b9fe2901d43e19ed53", + "version": "2.27.0", + }, + Values: []model.SamplePair{ + {model.TimeFromUnix(1692419110), 1}, + {model.TimeFromUnix(1692419115), 1}, + {model.TimeFromUnix(1692419120), 1}, + {model.TimeFromUnix(1692419125), 1}, + {model.TimeFromUnix(1692419130), 1}, + }, + }, + }, + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "ALERTS_FOR_STATE"), + labels.MustNewMatcher(labels.MatchEqual, model.AlertNameLabel, "testAlert"), + labels.MustNewMatcher(labels.MatchEqual, "branch", "HEAD"), + labels.MustNewMatcher(labels.MatchEqual, "goversion", "go1.16.4"), + labels.MustNewMatcher(labels.MatchEqual, "instance", "demo.do.prometheus.io:9090"), + labels.MustNewMatcher(labels.MatchEqual, "job", "prometheus"), + labels.MustNewMatcher(labels.MatchEqual, "replica", "a"), + labels.MustNewMatcher(labels.MatchEqual, "revision", "24c9b61221f7006e87cd62b9fe2901d43e19ed53"), + labels.MustNewMatcher(labels.MatchEqual, "severity", "pageMORE"), + labels.MustNewMatcher(labels.MatchEqual, "version", "2.27.0"), + }, + alertLabels: labels.Labels{ + labels.Label{"severity", "pageMORE"}, + }, + step: time.Second * 5, + out: model.Matrix{ + &model.SampleStream{ + Metric: model.Metric{ + "__name__": "ALERTS_FOR_STATE", + "alertname": "testAlert", + "branch": "HEAD", + "goversion": "go1.16.4", + "instance": "demo.do.prometheus.io:9090", + "job": "prometheus", + "replica": "a", + "revision": "24c9b61221f7006e87cd62b9fe2901d43e19ed53", + "version": "2.27.0", + "severity": "pageMORE", + }, + Values: []model.SamplePair{ + {model.TimeFromUnix(1692419110), 1692419110}, + {model.TimeFromUnix(1692419115), 1692419110}, + {model.TimeFromUnix(1692419120), 1692419110}, + {model.TimeFromUnix(1692419125), 1692419110}, + {model.TimeFromUnix(1692419130), 1692419110}, + }, + }, + }, + }, + } + + for i, test := range tests { + t.Run(strconv.Itoa(i), func(t *testing.T) { + out := GenerateAlertStateMatrix(test.in, test.matchers, test.alertLabels, test.step) + + if test.out.String() != out.String() { + t.Fatalf("mismatch in series expected=%v actual=%v", test.out, out) + } + + for i, sampleStream := range out { + for _, matcher := range test.matchers { + if !matcher.Matches(string(sampleStream.Metric[model.LabelName(matcher.Name)])) { + t.Fatalf("out series=%d label %s=%s doesn't match matcher %v", i, matcher.Name, sampleStream.Metric[model.LabelName(matcher.Name)], matcher.Value) + } + } + } + }) + } +} diff --git a/pkg/alertbackfill/backfill.go b/pkg/alertbackfill/backfill.go new file mode 100644 index 000000000..87ed77e7f --- /dev/null +++ b/pkg/alertbackfill/backfill.go @@ -0,0 +1,166 @@ +package alertbackfill + +import ( + "context" + "errors" + "fmt" + "time" + + v1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/rules" + "github.com/prometheus/prometheus/storage" + + "github.com/jacksontj/promxy/pkg/promclient" + "github.com/jacksontj/promxy/pkg/proxyquerier" +) + +// RuleGroupFetcher defines a method to fetch []*rules.Group +type RuleGroupFetcher func() []*rules.Group + +// NewAlertBackfillQueryable returns a new AlertBackfillQueryable +func NewAlertBackfillQueryable(e *promql.Engine, q storage.Queryable) *AlertBackfillQueryable { + return &AlertBackfillQueryable{e: e, q: q} +} + +// AlertBackfillQueryable returns a storage.Queryable that will handle returning +// results for the RuleManager alert backfill. This is done by first attempting +// to query the downstream store and if no result is found it will "recreate" the +// the series by re-running the necessary query to get the data back +type AlertBackfillQueryable struct { + e *promql.Engine + q storage.Queryable + f RuleGroupFetcher +} + +// SetRuleGroupFetcher sets the RuleGroupFetcher -- this is required as we need +// the groups to find the correct step interval +func (q *AlertBackfillQueryable) SetRuleGroupFetcher(f RuleGroupFetcher) { + q.f = f +} + +// Querier returns an AlertBackfillQuerier +func (q *AlertBackfillQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + api, err := promclient.NewEngineAPI(q.e, q.q) + if err != nil { + return nil, err + } + + return &AlertBackfillQuerier{ + ctx: ctx, + api: api, + q: q.q, + ruleGroups: q.f(), + ruleValues: make(map[string]*queryResult), + mint: mint, + maxt: maxt, + }, nil +} + +type queryResult struct { + v model.Value + warnings storage.Warnings + err error +} + +// TODO: move to a util package? +func StringsToWarnings(ins []string) storage.Warnings { + warnings := make(storage.Warnings, len(ins)) + for i, in := range ins { + warnings[i] = errors.New(in) + } + + return warnings +} + +// AlertBackfillQuerier will Query a downstream storage.Queryable for the +// ALERTS_FOR_STATE series, if that series is not found -- it will then +// run the appropriate query_range equivalent to re-generate the data. +type AlertBackfillQuerier struct { + ctx context.Context + q storage.Queryable + api promclient.API + ruleGroups []*rules.Group + + // map of (groupidx.ruleidx) -> result + ruleValues map[string]*queryResult + mint int64 + maxt int64 +} + +// Select will fetch and return the ALERTS_FOR_STATE series for the given matchers +func (q *AlertBackfillQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + // first, we call the actual downstream to see if we have the correct data + // this will return something if the remote_write from promxy has been saved + // somewhere where promxy is also configured to read from + querier, err := q.q.Querier(q.ctx, q.mint, q.maxt) + if err != nil { + return proxyquerier.NewSeriesSet(nil, nil, err) + } + ret := querier.Select(sortSeries, hints, matchers...) + downstreamSeries := make([]storage.Series, 0) + for ret.Next() { + downstreamSeries = append(downstreamSeries, ret.At()) + } + // If the raw queryable had something; return that + if len(downstreamSeries) > 0 { + return proxyquerier.NewSeriesSet(downstreamSeries, ret.Warnings(), ret.Err()) + } + + // Find our Rule + interval + key, matchingRule, interval := FindGroupAndAlert(q.ruleGroups, matchers) + + // If we can't find a matching rule; return an empty set + if matchingRule == nil { + return proxyquerier.NewSeriesSet(nil, nil, nil) + } + + result, ok := q.ruleValues[key] + // If we haven't queried this *rule* before; lets load that + if !ok { + now := time.Now() + value, warnings, err := q.api.QueryRange(q.ctx, matchingRule.Query().String(), v1.Range{ + // Start is the HoldDuration + 1 step (to avoid "edge" issues) + Start: now.Add(-1 * matchingRule.HoldDuration()).Add(-1 * interval), + End: now, + Step: interval, + }) + result = &queryResult{ + v: value, + warnings: StringsToWarnings(warnings), + err: err, + } + q.ruleValues[key] = result + } + + // If there are any errors; we can return those + if result.err != nil { + return proxyquerier.NewSeriesSet(nil, result.warnings, result.err) + } + + resultMatrix, ok := result.v.(model.Matrix) + if !ok { + return proxyquerier.NewSeriesSet(nil, nil, fmt.Errorf("backfill query returned unexpected type: %T", result.v)) + } + + iterators := promclient.IteratorsForValue(GenerateAlertStateMatrix(resultMatrix, matchers, matchingRule.Labels(), interval)) + + series := make([]storage.Series, len(iterators)) + for i, iterator := range iterators { + series[i] = &proxyquerier.Series{iterator} + } + + return proxyquerier.NewSeriesSet(series, result.warnings, nil) +} + +func (q *AlertBackfillQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { + return nil, nil, fmt.Errorf("not implemented") +} + +func (q *AlertBackfillQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { + return nil, nil, fmt.Errorf("not implemented") +} + +func (q *AlertBackfillQuerier) Close() error { return nil } diff --git a/pkg/alertbackfill/rule.go b/pkg/alertbackfill/rule.go new file mode 100644 index 000000000..9a96221f4 --- /dev/null +++ b/pkg/alertbackfill/rule.go @@ -0,0 +1,83 @@ +package alertbackfill + +import ( + "fmt" + "time" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/rules" +) + +func FindGroupAndAlert(groups []*rules.Group, matchers []*labels.Matcher) (string, *rules.AlertingRule, time.Duration) { + // right now upstream alert state restore *just* uses the name of the alert and labels + // this causes issues if there are any additional alerts (as only one will be queried + // and it's state will be used for all alerts with the same name + labels). + // TODO: Once upstream fixes this (https://github.com/prometheus/prometheus/issues/12714) + // we'll want to adjust this logic. For now we're effectively mirroring upstream logic + // Now we need to "backfill" the data (regenerate the series that it would have queried) + var ( + matchingGroupIdx int + matchingRuleIdx int + matchingGroup *rules.Group + matchingRule *rules.AlertingRule + found bool + ) + var alertname string + for _, matcher := range matchers { + if matcher.Name == model.AlertNameLabel { + alertname = matcher.Value + break + } + } + if alertname == "" { + panic("what") + } + +FIND_RULE: + for i, group := range groups { + RULE_LOOP: + for ii, rule := range group.Rules() { + alertingRule, ok := rule.(*rules.AlertingRule) + if !ok { + continue + } + // For now we check if the name and all given labels match + // which is both the best we can do, and equivalent to + // direct prometheus behavior + if alertingRule.Name() == alertname { + // Check the rule labels fit the matcher set + for _, lbl := range alertingRule.Labels() { + MATCHERLOOP: + for _, m := range matchers { + // skip matchers that we know we will overwrite + switch m.Name { + case model.MetricNameLabel, model.AlertNameLabel: + continue MATCHERLOOP + } + if lbl.Name == m.Name { + if !m.Matches(lbl.Value) { + continue RULE_LOOP + } + break + } + } + } + + matchingGroupIdx = i + matchingRuleIdx = ii + matchingGroup = group + matchingRule = alertingRule + found = true + break FIND_RULE + } + } + } + + if !found { + return "", nil, time.Duration(0) + } + + key := fmt.Sprintf("%d.%d", matchingGroupIdx, matchingRuleIdx) + return key, matchingRule, matchingGroup.Interval() +} diff --git a/pkg/alertbackfill/rule_test.go b/pkg/alertbackfill/rule_test.go new file mode 100644 index 000000000..503c53868 --- /dev/null +++ b/pkg/alertbackfill/rule_test.go @@ -0,0 +1,126 @@ +package alertbackfill + +import ( + "strconv" + "strings" + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/rules" +) + +func TestFindGroupAndAlert(t *testing.T) { + groups := []*rules.Group{ + rules.NewGroup(rules.GroupOptions{ + Name: "a", + Interval: time.Minute, + Rules: []rules.Rule{ + rules.NewAlertingRule( + "testalert", // name + nil, // expression + time.Hour, // hold + labels.Labels{ + labels.Label{"labelkey", "labelvalue"}, + }, // labels + nil, // annotations + nil, // externalLabels + "", // externalURL + false, // restored + nil, // logger + ), + rules.NewAlertingRule( + "alertWithLabels", // name + nil, // expression + time.Hour, // hold + labels.Labels{ + labels.Label{"labelkey", "labelvalue"}, + }, // labels + nil, // annotations + nil, // externalLabels + "", // externalURL + false, // restored + nil, // logger + ), + rules.NewAlertingRule( + "alertWithLabels", // name + nil, // expression + time.Hour, // hold + labels.Labels{ + labels.Label{"labelkey", "labelvalue2"}, + }, // labels + nil, // annotations + nil, // externalLabels + "", // externalURL + false, // restored + nil, // logger + ), + }, + Opts: &rules.ManagerOptions{}, + }), + } + + tests := []struct { + matchers []*labels.Matcher + key string + }{ + // basic finding of a single + { + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "ALERTS_FOR_STATE"), + labels.MustNewMatcher(labels.MatchEqual, model.AlertNameLabel, "testalert"), + }, + key: "0.0", + }, + // ask for an alert that doesn't exist + { + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "ALERTS_FOR_STATE"), + labels.MustNewMatcher(labels.MatchEqual, model.AlertNameLabel, "notanalert"), + }, + key: "", + }, + // More explicit matchers + { + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "ALERTS_FOR_STATE"), + labels.MustNewMatcher(labels.MatchEqual, model.AlertNameLabel, "alertWithLabels"), + labels.MustNewMatcher(labels.MatchEqual, "labelkey", "labelvalue"), + }, + key: "0.1", + }, + { + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "ALERTS_FOR_STATE"), + labels.MustNewMatcher(labels.MatchEqual, model.AlertNameLabel, "alertWithLabels"), + labels.MustNewMatcher(labels.MatchEqual, "labelkey", "labelvalue2"), + }, + key: "0.2", + }, + } + + for i, test := range tests { + t.Run(strconv.Itoa(i), func(t *testing.T) { + k, alertingRule, interval := FindGroupAndAlert(groups, test.matchers) + + if k != test.key { + t.Fatalf("Mismatch in key expected=%v actual=%v", test.key, k) + } + + if k == "" { + return + } + + groupIdx, _ := strconv.Atoi(strings.Split(k, ".")[0]) + if groups[groupIdx].Interval() != interval { + t.Fatalf("mismatch in interval expected=%v actual=%v", groups[groupIdx].Interval(), interval) + } + + alertIdx, _ := strconv.Atoi(strings.Split(k, ".")[1]) + if groups[groupIdx].Rules()[alertIdx] != alertingRule { + t.Fatalf("mismatch in alertingRule expected=%v actual=%v", groups[groupIdx].Rules()[alertIdx], alertingRule) + } + }) + } +} diff --git a/pkg/promclient/engine.go b/pkg/promclient/engine.go new file mode 100644 index 000000000..8379ef52b --- /dev/null +++ b/pkg/promclient/engine.go @@ -0,0 +1,120 @@ +package promclient + +import ( + "context" + "fmt" + "time" + + v1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/storage" +) + +// StorageWarningsToAPIWarnings simply converts `storage.Warnings` to `v1.Warnings` +// which is simply converting a []error -> []string +// TODO: move to a util package? +func StorageWarningsToAPIWarnings(warnings storage.Warnings) v1.Warnings { + ret := make(v1.Warnings, len(warnings)) + for i, w := range warnings { + ret[i] = w.Error() + } + + return ret +} + +// ParserValueToModelValue can *parser.Value to model.Value +func ParserValueToModelValue(value parser.Value) (model.Value, error) { + switch v := value.(type) { + case promql.Matrix: + matrix := make(model.Matrix, v.Len()) + for i, item := range v { + metric := make(model.Metric) + for _, label := range item.Metric { + metric[model.LabelName(label.Name)] = model.LabelValue(label.Value) + } + + samples := make([]model.SamplePair, len(item.Points)) + for x, sample := range item.Points { + samples[x] = model.SamplePair{ + Timestamp: model.Time(sample.T), + Value: model.SampleValue(sample.V), + } + } + + matrix[i] = &model.SampleStream{ + Metric: metric, + Values: samples, + } + } + return matrix, nil + default: + return nil, fmt.Errorf("unknown type %T", v) + } +} + +// NewEngineAPI returns a new EngineAPI +func NewEngineAPI(e *promql.Engine, q storage.Queryable) (*EngineAPI, error) { + return &EngineAPI{ + e: e, + q: q, + }, nil +} + +// EngineAPI implements the API interface using a Queryable and an engine +type EngineAPI struct { + e *promql.Engine + q storage.Queryable +} + +// LabelNames returns all the unique label names present in the block in sorted order. +func (a *EngineAPI) LabelNames(ctx context.Context, matchers []string, startTime time.Time, endTime time.Time) ([]string, v1.Warnings, error) { + return nil, nil, fmt.Errorf("not implemented") +} + +// LabelValues performs a query for the values of the given label. +func (a *EngineAPI) LabelValues(ctx context.Context, label string, matchers []string, startTime time.Time, endTime time.Time) (model.LabelValues, v1.Warnings, error) { + return nil, nil, fmt.Errorf("not implemented") +} + +// Query performs a query for the given time. +func (a *EngineAPI) Query(ctx context.Context, query string, ts time.Time) (model.Value, v1.Warnings, error) { + return nil, nil, fmt.Errorf("not implemented") +} + +// QueryRange performs a query for the given range. +func (a *EngineAPI) QueryRange(ctx context.Context, query string, r v1.Range) (model.Value, v1.Warnings, error) { + engineQuery, err := a.e.NewRangeQuery(a.q, &promql.QueryOpts{false}, query, r.Start, r.End, r.Step) + if err != nil { + return nil, nil, err + } + + result := engineQuery.Exec(ctx) + if result.Err != nil { + return nil, StorageWarningsToAPIWarnings(result.Warnings), result.Err + } + + val, err := ParserValueToModelValue(result.Value) + if err != nil { + return nil, nil, err + } + + return val, StorageWarningsToAPIWarnings(result.Warnings), nil +} + +// Series finds series by label matchers. +func (a *EngineAPI) Series(ctx context.Context, matches []string, startTime time.Time, endTime time.Time) ([]model.LabelSet, v1.Warnings, error) { + return nil, nil, fmt.Errorf("not implemented") +} + +// GetValue loads the raw data for a given set of matchers in the time range +func (a *EngineAPI) GetValue(ctx context.Context, start, end time.Time, matchers []*labels.Matcher) (model.Value, v1.Warnings, error) { + return nil, nil, fmt.Errorf("not implemented") +} + +// Metadata returns metadata about metrics currently scraped by the metric name. +func (a *EngineAPI) Metadata(ctx context.Context, metric, limit string) (map[string][]v1.Metadata, error) { + return nil, fmt.Errorf("not implemented") +} diff --git a/pkg/promclient/engine_test.go b/pkg/promclient/engine_test.go new file mode 100644 index 000000000..83ea6d7a5 --- /dev/null +++ b/pkg/promclient/engine_test.go @@ -0,0 +1,59 @@ +package promclient + +import ( + "context" + "os" + "testing" + "time" + + v1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/promql" +) + +func TestEngineAPI(t *testing.T) { + // create test + content, err := os.ReadFile("testdata/metric_relabel.test") + if err != nil { + t.Fatal(err) + } + + test, err := promql.NewTest(t, string(content)) + if err != nil { + t.Fatal(err) + } + if err := test.Run(); err != nil { + t.Fatal(err) + } + + api, err := NewEngineAPI(test.QueryEngine(), test.Queryable()) + if err != nil { + t.Fatal(err) + } + ctx := context.TODO() + + t.Run("QueryRange", func(t *testing.T) { + value, warnings, err := api.QueryRange(ctx, "prometheus_build_info", v1.Range{ + Start: model.Time(0).Time(), + End: model.Time(10).Time(), + Step: time.Duration(1e6), + }) + + if len(warnings) > 0 { + t.Fatalf("unexpected warnings: %v", warnings) + } + + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + matrixValue, ok := value.(model.Matrix) + if !ok { + t.Fatalf("unexpected data type: %T", value) + } + if len(matrixValue) != 1 { + t.Fatalf("expecting a single series: %v", matrixValue) + } + }) + +}