Skip to content

Commit

Permalink
Metric filter
Browse files Browse the repository at this point in the history
  • Loading branch information
alpinskiy committed Nov 13, 2024
1 parent ad4ba42 commit 75ac31e
Show file tree
Hide file tree
Showing 12 changed files with 540 additions and 474 deletions.
72 changes: 18 additions & 54 deletions internal/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1039,46 +1039,22 @@ func formValueParamMetric(r *http.Request) string {
return mergeMetricNamespace(ns, str)
}

func (h *requestHandler) resolveFilter(metricMeta *format.MetricMetaValue, version string, f map[string][]string) (map[string][]interface{}, error) {
m := make(map[string][]interface{}, len(f))
func (h *requestHandler) resolveFilter(metricMeta *format.MetricMetaValue, version string, f map[string][]string) (data_model.TagFilters, error) {
var m data_model.TagFilters
for k, values := range f {
if version == Version1 && k == format.EnvTagID {
continue // we only support production tables for v1
}
if k == format.StringTopTagID {
for _, val := range values {
m[k] = append(m[k], unspecifiedToEmpty(val))
m.StringTop = append(m.StringTop, unspecifiedToEmpty(val))
}
} else {
ids, err := h.getRichTagValueIDs(metricMeta, version, k, values)
if err != nil {
return nil, err
}
m[k] = []interface{}{}
for _, id := range ids {
m[k] = append(m[k], id)
}
}
}
return m, nil
}

func (h *requestHandler) resolveFilterV3(metricMeta *format.MetricMetaValue, f map[string][]string) (map[string][]maybeMappedTag, error) {
m := make(map[string][]maybeMappedTag, len(f))
for k, values := range f {
if k == format.StringTopTagID {
for _, val := range values {
m[k] = append(m[k], maybeMappedTag{Value: unspecifiedToEmpty(val)})
}
} else {
ids, err := h.getRichTagValueIDs(metricMeta, Version3, k, values)
if err != nil {
return nil, err
}
m[k] = []maybeMappedTag{}
for i := range ids {
m[k] = append(m[k], maybeMappedTag{values[i], ids[i]})
return data_model.TagFilters{}, err
}
m.AppendMapped(metricMeta.Name2Tag[k].Index, ids...)
}
}
return m, nil
Expand Down Expand Up @@ -1824,22 +1800,11 @@ func (h *requestHandler) handleGetMetricTagValues(ctx context.Context, req getMe
if err != nil {
return nil, false, err
}
mappedFilterIn.Metrics.Head = metricMeta
mappedFilterNotIn, err := h.resolveFilter(metricMeta, version, filterNotIn)
if err != nil {
return nil, false, err
}
var filterInV3 map[string][]maybeMappedTag
var filterNotInV3 map[string][]maybeMappedTag
if version == Version3 {
filterInV3, err = h.resolveFilterV3(metricMeta, filterIn)
if err != nil {
return nil, false, err
}
filterNotInV3, err = h.resolveFilterV3(metricMeta, filterNotIn)
if err != nil {
return nil, false, err
}
}

lods, err := data_model.GetLODs(data_model.GetTimescaleArgs{
Version: version,
Expand All @@ -1857,14 +1822,12 @@ func (h *requestHandler) handleGetMetricTagValues(ctx context.Context, req getMe
}

pq := &preparedTagValuesQuery{
metricID: metricMeta.MetricID,
preKeyTagID: metricMeta.PreKeyTagID,
tagID: tagID,
numResults: numResults,
filterIn: mappedFilterIn,
filterNotIn: mappedFilterNotIn,
filterInV3: filterInV3,
filterNotInV3: filterNotInV3,
preKeyTagX: format.TagIndex(metricMeta.PreKeyTagID),
preKeyTagID: metricMeta.PreKeyTagID,
tagID: tagID,
numResults: numResults,
filterIn: mappedFilterIn,
filterNotIn: mappedFilterNotIn,
}

tagInfo := map[selectRow]float64{}
Expand Down Expand Up @@ -2297,6 +2260,7 @@ func (h *requestHandler) handleGetTable(ctx context.Context, req seriesRequest)
if err != nil {
return nil, false, err
}
mappedFilterIn.Metrics.Head = metricMeta
mappedFilterNotIn, err := h.resolveFilter(metricMeta, req.version, req.filterNotIn)
if err != nil {
return nil, false, err
Expand Down Expand Up @@ -2931,13 +2895,13 @@ func loadPoints(ctx context.Context, h *requestHandler, pq *preparedPointsQuery,
isFast := lod.IsFast()
isLight := pq.isLight()
IsHardware := pq.IsHardware()
metric := pq.metricID
metric := pq.metricID()
table := lod.Table
kind := pq.kind
var metricName string
if m, ok := format.BuiltinMetrics[pq.metricID]; ok {
if m, ok := format.BuiltinMetrics[metric]; ok {
metricName = m.Name
} else if m := h.metricsStorage.GetMetaMetric(pq.metricID); m != nil {
} else if m := h.metricsStorage.GetMetaMetric(metric); m != nil {
metricName = m.Name
}
start := time.Now()
Expand Down Expand Up @@ -3011,7 +2975,7 @@ func loadPoints(ctx context.Context, h *requestHandler, pq *preparedPointsQuery,
}

func loadPoint(ctx context.Context, h *requestHandler, pq *preparedPointsQuery, lod data_model.LOD) ([]pSelectRow, error) {
query, args, err := loadPointQuery(pq, lod, h.utcOffset)
query, args, err := loadPointQuery(pq, lod)
if err != nil {
return nil, err
}
Expand All @@ -3021,7 +2985,7 @@ func loadPoint(ctx context.Context, h *requestHandler, pq *preparedPointsQuery,
isFast := lod.IsFast()
isLight := pq.isLight()
isHardware := pq.IsHardware()
metric := pq.metricID
metric := pq.metricID()
table := lod.Table
kind := pq.kind
err = h.doSelect(ctx, util.QueryMetaInto{
Expand Down
129 changes: 39 additions & 90 deletions internal/api/promql.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,12 +348,10 @@ func promRespondError(w http.ResponseWriter, typ promErrorType, err error) {

// endregion

func (h *requestHandler) MatchMetrics(ctx context.Context, matcher *labels.Matcher, namespace string) ([]*format.MetricMetaValue, error) {
res := h.metricsStorage.MatchMetrics(matcher, namespace, h.showInvisible, nil)
for _, metric := range res {
if !h.accessInfo.CanViewMetric(*metric) {
return nil, httpErr(http.StatusForbidden, fmt.Errorf("metric %q forbidden", metric.Name))
}
func (h *requestHandler) MatchMetrics(matcher *labels.Matcher, namespace string) (data_model.QueryFilter, error) {
res, ok := h.metricsStorage.MatchMetrics(h.accessInfo.CanViewMetric, matcher, namespace, h.showInvisible)
if !ok {
return data_model.QueryFilter{}, httpErr(http.StatusForbidden, nil)
}
return res, nil
}
Expand Down Expand Up @@ -388,9 +386,6 @@ func (h *requestHandler) GetTagValueID(qry promql.TagValueIDQuery) (int32, error
}

func (h *requestHandler) QuerySeries(ctx context.Context, qry *promql.SeriesQuery) (promql.Series, func(), error) {
if !h.accessInfo.CanViewMetricName(qry.Metric.Name) {
return promql.Series{}, func() {}, httpErr(http.StatusForbidden, fmt.Errorf("metric %q forbidden", qry.Metric.Name))
}
if qry.Options.Mode == data_model.PointQuery {
for _, what := range qry.Whats {
switch what.Digest {
Expand All @@ -410,22 +405,22 @@ func (h *requestHandler) QuerySeries(ctx context.Context, qry *promql.SeriesQuer
} else {
step = qry.Timescale.Step
}
res := promql.Series{Meta: promql.SeriesMeta{Metric: qry.Metric}}
res := promql.Series{Meta: promql.SeriesMeta{Metric: qry.Metric()}}
if len(qry.Whats) == 1 {
switch qry.Whats[0].Digest {
case data_model.DigestCount, data_model.DigestCountSec, data_model.DigestCountRaw,
data_model.DigestStdVar, data_model.DigestCardinality, data_model.DigestCardinalitySec,
data_model.DigestCardinalityRaw, data_model.DigestUnique, data_model.DigestUniqueSec:
// measure units does not apply to counters
default:
res.Meta.Units = qry.Metric.MetricType
res.Meta.Units = qry.Metric().MetricType
}
}
var lods []data_model.LOD
if qry.Options.Mode == data_model.PointQuery {
lod0 := qry.Timescale.LODs[0]
start := qry.Timescale.Time[0]
metric := qry.Metric
metric := qry.Metric()
lods = []data_model.LOD{{
FromSec: qry.Timescale.Time[0] - qry.Offset,
ToSec: qry.Timescale.Time[1] - qry.Offset,
Expand All @@ -436,7 +431,7 @@ func (h *requestHandler) QuerySeries(ctx context.Context, qry *promql.SeriesQuer
Location: h.location,
}}
} else {
lods = qry.Timescale.GetLODs(qry.Metric, qry.Offset)
lods = qry.Timescale.GetLODs(qry.Metric(), qry.Offset)
}
tagX := make(map[tsTags]int, len(qry.GroupBy))
var buffers []*[]float64
Expand Down Expand Up @@ -581,24 +576,24 @@ func (h *requestHandler) QuerySeries(ctx context.Context, qry *promql.SeriesQuer
for v, j := range tagX {
for _, groupBy := range qry.GroupBy {
switch groupBy {
case format.StringTopTagID, qry.Metric.StringTopName:
case format.StringTopTagID, qry.Metric().StringTopName:
res.AddTagAt(i+j, &promql.SeriesTag{
Metric: qry.Metric,
Metric: qry.Metric(),
Index: format.StringTopTagIndex + promql.SeriesTagIndexOffset,
ID: format.StringTopTagID,
Name: qry.Metric.StringTopName,
Name: qry.Metric().StringTopName,
SValue: emptyToUnspecified(v.tagStr.String()),
})
case format.ShardTagID:
res.AddTagAt(i+j, &promql.SeriesTag{
Metric: qry.Metric,
Metric: qry.Metric(),
ID: promql.LabelShard,
Value: int32(v.shardNum),
})
default:
if m, ok := qry.Metric.Name2Tag[groupBy]; ok && m.Index < len(v.tag) {
if m, ok := qry.Metric().Name2Tag[groupBy]; ok && m.Index < len(v.tag) {
st := &promql.SeriesTag{
Metric: qry.Metric,
Metric: qry.Metric(),
Index: m.Index + promql.SeriesTagIndexOffset,
ID: format.TagID(m.Index),
Name: m.Name,
Expand Down Expand Up @@ -632,7 +627,8 @@ func (h *requestHandler) QuerySeries(ctx context.Context, qry *promql.SeriesQuer
func (h *requestHandler) QueryTagValueIDs(ctx context.Context, qry promql.TagValuesQuery) ([]int32, error) {
var (
pq = &preparedTagValuesQuery{
metricID: qry.Metric.MetricID,
filterIn: data_model.MetricFilter(qry.Metric),
preKeyTagX: format.TagIndex(qry.Metric.PreKeyTagID),
preKeyTagID: qry.Metric.PreKeyTagID,
tagID: format.TagID(qry.TagIndex),
numResults: math.MaxInt - 1,
Expand Down Expand Up @@ -676,7 +672,8 @@ func (h *requestHandler) QueryTagValueIDs(ctx context.Context, qry promql.TagVal
func (h *requestHandler) QueryStringTop(ctx context.Context, qry promql.TagValuesQuery) ([]string, error) {
var (
pq = &preparedTagValuesQuery{
metricID: qry.Metric.MetricID,
filterIn: data_model.MetricFilter(qry.Metric),
preKeyTagX: format.TagIndex(qry.Metric.PreKeyTagID),
preKeyTagID: qry.Metric.PreKeyTagID,
tagID: format.StringTopTagID,
numResults: math.MaxInt - 1,
Expand Down Expand Up @@ -735,49 +732,6 @@ type handlerWhat struct {
}

func (h *requestHandler) getHandlerArgs(qry *promql.SeriesQuery, step int64) map[data_model.DigestKind]handlerArgs {
// filtering
var (
filterIn = make(map[string][]string)
filterInM = make(map[string][]any) // mapped
filterInV3 = make(map[string][]maybeMappedTag)
)
for i, m := range qry.FilterIn {
if i == 0 && qry.Options.Version == Version1 {
continue
}
tagName := format.TagID(i)
for tagValue, tagValueID := range m {
filterIn[tagName] = append(filterIn[tagName], tagValue)
filterInM[tagName] = append(filterInM[tagName], tagValueID)
filterInV3[tagName] = append(filterInV3[tagName], maybeMappedTag{tagValue, tagValueID})
}
}
for _, tagValue := range qry.SFilterIn {
filterIn[format.StringTopTagID] = append(filterIn[format.StringTopTagID], promqlEncodeSTagValue(tagValue))
filterInM[format.StringTopTagID] = append(filterInM[format.StringTopTagID], tagValue)
filterInV3[format.StringTopTagID] = append(filterInV3[format.StringTopTagID], maybeMappedTag{Value: tagValue})
}
var (
filterOut = make(map[string][]string)
filterOutM = make(map[string][]any) // mapped
filterOutV3 = make(map[string][]maybeMappedTag)
)
for i, m := range qry.FilterOut {
if i == 0 && qry.Options.Version == Version1 {
continue
}
tagID := format.TagID(i)
for tagValue, tagValueID := range m {
filterOut[tagID] = append(filterOut[tagID], tagValue)
filterOutM[tagID] = append(filterOutM[tagID], tagValueID)
filterOutV3[tagID] = append(filterOutV3[tagID], maybeMappedTag{tagValue, tagValueID})
}
}
for _, tagValue := range qry.SFilterOut {
filterOut[format.StringTopTagID] = append(filterOut[format.StringTopTagID], promqlEncodeSTagValue(tagValue))
filterOutM[format.StringTopTagID] = append(filterOutM[format.StringTopTagID], tagValue)
filterOutV3[format.StringTopTagID] = append(filterOutV3[format.StringTopTagID], maybeMappedTag{Value: tagValue})
}
// grouping
var groupBy []string
switch qry.Options.Version {
Expand Down Expand Up @@ -826,19 +780,18 @@ func (h *requestHandler) getHandlerArgs(qry *promql.SeriesQuery, step int64) map
}
}
// cache key & query
preKeyTagID := qry.Metric().PreKeyTagID
for kind, args := range res {
// TODO switch to v3 filters, for now we always use v2
args.qs = normalizedQueryString(qry.Metric.Name, kind, groupBy, filterIn, filterOut, false)
metricName := fmt.Sprintf("%s%s", qry.Matcher.Type, qry.Matcher.Value)
args.qs = normalizedQueryString(metricName, kind, groupBy, qry.FilterIn, qry.FilterNotIn, false)
args.pq = preparedPointsQuery{
user: h.accessInfo.user,
metricID: qry.Metric.MetricID,
preKeyTagID: qry.Metric.PreKeyTagID,
kind: kind,
by: qry.GroupBy,
filterIn: filterInM,
filterNotIn: filterOutM,
filterInV3: filterInV3,
filterNotInV3: filterOutV3,
user: h.accessInfo.user,
preKeyTagX: format.TagIndex(preKeyTagID),
preKeyTagID: preKeyTagID,
kind: kind,
by: qry.GroupBy,
filterIn: qry.FilterIn,
filterNotIn: qry.FilterNotIn,
}
res[kind] = args
}
Expand All @@ -856,7 +809,7 @@ func (h *Handler) Free(s *[]float64) {
h.putFloatsSlice(s)
}

func (h *Handler) getPromQuery(req seriesRequest) (string, error) {
func (h *requestHandler) getPromQuery(req seriesRequest) (string, error) {
if len(req.promQL) != 0 {
return req.promQL, nil
}
Expand All @@ -878,11 +831,14 @@ func (h *Handler) getPromQuery(req seriesRequest) (string, error) {
}
// filtering and grouping
var filterGroupBy []string
var m [1]*format.MetricMetaValue
matcher := labels.Matcher{Type: labels.MatchEqual, Value: req.metricName}
copy(m[:], h.metricsStorage.MatchMetrics(&matcher, "", h.showInvisible, m[:0]))
filter, err := h.MatchMetrics(&matcher, "")
if err != nil {
return "", err
}
m := filter.FilterIn.Metrics.Head
if len(req.by) != 0 {
by, err := promqlGetBy(req.by, m[0])
by, err := promqlGetBy(req.by, m)
if err != nil {
return "", err
}
Expand All @@ -894,8 +850,8 @@ func (h *Handler) getPromQuery(req seriesRequest) (string, error) {
if err != nil {
return "", err
}
tid = promqlTagName(tid, m[0])
filterGroupBy = append(filterGroupBy, fmt.Sprintf("%s=%q", tid, promqlGetFilterValue(tid, v, m[0])))
tid = promqlTagName(tid, m)
filterGroupBy = append(filterGroupBy, fmt.Sprintf("%s=%q", tid, promqlGetFilterValue(tid, v, m)))
}
}
for t, out := range req.filterNotIn {
Expand All @@ -904,8 +860,8 @@ func (h *Handler) getPromQuery(req seriesRequest) (string, error) {
if err != nil {
return "", err
}
tid = promqlTagName(tid, m[0])
filterGroupBy = append(filterGroupBy, fmt.Sprintf("%s!=%q", tid, promqlGetFilterValue(tid, v, m[0])))
tid = promqlTagName(tid, m)
filterGroupBy = append(filterGroupBy, fmt.Sprintf("%s!=%q", tid, promqlGetFilterValue(tid, v, m)))
}
}
// generate resulting string
Expand Down Expand Up @@ -1010,13 +966,6 @@ func promqlGetFilterValue(tagID string, s string, m *format.MetricMetaValue) str
return s
}

func promqlEncodeSTagValue(s string) string {
if s == "" {
return format.TagValueCodeZero
}
return s
}

func promqlTagName(tagID string, m *format.MetricMetaValue) string {
if m != nil {
if t := m.Name2Tag[tagID]; t.Name != "" {
Expand Down
Loading

0 comments on commit 75ac31e

Please sign in to comment.