Skip to content

Commit

Permalink
[connector/routing] Support metric routing (#36236)
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski authored Nov 9, 2024
1 parent 5618c7c commit 2612314
Show file tree
Hide file tree
Showing 12 changed files with 588 additions and 25 deletions.
27 changes: 27 additions & 0 deletions .chloggen/routing-by-metrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: routingconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add ability to route by metric context

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36236]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
4 changes: 2 additions & 2 deletions connector/routingconnector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ If you are not already familiar with connectors, you may find it helpful to firs
The following settings are available:

- `table (required)`: the routing table for this connector.
- `table.context (optional, default: resource)`: the [OTTL Context] in which the statement will be evaluated. Currently, only `resource`, `log`, and `request` are supported.
- `table.context (optional, default: resource)`: the [OTTL Context] in which the statement will be evaluated. Currently, only `resource`, `metric`, `log`, and `request` are supported.
- `table.statement`: the routing condition provided as the [OTTL] statement. Required if `table.condition` is not provided. May not be used for `request` context.
- `table.condition`: the routing condition provided as the [OTTL] condition. Required if `table.statement` is not provided. Required for `request` context.
- `table.pipelines (required)`: the list of pipelines to use when the routing condition is met.
Expand All @@ -43,7 +43,7 @@ The following settings are available:

### Limitations

- The `match_once` setting is only supported when using the `resource` context. If any routes use `log` or `request` context, `match_once` must be set to `true`.
- The `match_once` setting is only supported when using the `resource` context. If any routes use `metric`, `log` or `request` context, `match_once` must be set to `true`.
- The `request` context requires use of the `condition` setting, and relies on a very limited grammar. Conditions must be in the form of `request["key"] == "value"` or `request["key"] != "value"`. (In the future, this grammar may be expanded to support more complex conditions.)

### Supported [OTTL] functions
Expand Down
2 changes: 1 addition & 1 deletion connector/routingconnector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (c *Config) Validate() error {
return err
}
fallthrough
case "log": // ok
case "metric", "log": // ok
if !c.MatchOnce {
return fmt.Errorf(`%q context is not supported with "match_once: false"`, item.Context)
}
Expand Down
16 changes: 16 additions & 0 deletions connector/routingconnector/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,22 @@ func TestValidateConfig(t *testing.T) {
},
error: "invalid context: invalid",
},
{
name: "metric context with match_once false",
config: &Config{
MatchOnce: false,
Table: []RoutingTableItem{
{
Context: "metric",
Statement: `route() where attributes["attr"] == "acme"`,
Pipelines: []pipeline.ID{
pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"),
},
},
},
},
error: `"metric" context is not supported with "match_once: false"`,
},
{
name: "log context with match_once false",
config: &Config{
Expand Down
43 changes: 43 additions & 0 deletions connector/routingconnector/internal/pmetricutil/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,46 @@ func MoveResourcesIf(from, to pmetric.Metrics, f func(pmetric.ResourceMetrics) b
return true
})
}

// MoveMetricsWithContextIf calls f sequentially for each Metric present in the first pmetric.Metrics.
// If f returns true, the element is removed from the first pmetric.Metrics and added to the second pmetric.Metrics.
// Notably, the Resource and Scope associated with the Metric are created in the second pmetric.Metrics only once.
// Resources or Scopes are removed from the original if they become empty. All ordering is preserved.
func MoveMetricsWithContextIf(from, to pmetric.Metrics, f func(pmetric.ResourceMetrics, pmetric.ScopeMetrics, pmetric.Metric) bool) {
rms := from.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
rm := rms.At(i)
sms := rm.ScopeMetrics()
var rmCopy *pmetric.ResourceMetrics
for j := 0; j < sms.Len(); j++ {
sm := sms.At(j)
ms := sm.Metrics()
var smCopy *pmetric.ScopeMetrics
ms.RemoveIf(func(m pmetric.Metric) bool {
if !f(rm, sm, m) {
return false
}
if rmCopy == nil {
rmc := to.ResourceMetrics().AppendEmpty()
rmCopy = &rmc
rm.Resource().CopyTo(rmCopy.Resource())
rmCopy.SetSchemaUrl(rm.SchemaUrl())
}
if smCopy == nil {
smc := rmCopy.ScopeMetrics().AppendEmpty()
smCopy = &smc
sm.Scope().CopyTo(smCopy.Scope())
smCopy.SetSchemaUrl(sm.SchemaUrl())
}
m.CopyTo(smCopy.Metrics().AppendEmpty())
return true
})
}
sms.RemoveIf(func(sm pmetric.ScopeMetrics) bool {
return sm.Metrics().Len() == 0
})
}
rms.RemoveIf(func(rm pmetric.ResourceMetrics) bool {
return rm.ScopeMetrics().Len() == 0
})
}
144 changes: 144 additions & 0 deletions connector/routingconnector/internal/pmetricutil/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,147 @@ func TestMoveResourcesIf(t *testing.T) {
})
}
}

func TestMoveMetricsWithContextIf(t *testing.T) {
testCases := []struct {
name string
moveIf func(pmetric.ResourceMetrics, pmetric.ScopeMetrics, pmetric.Metric) bool
from pmetric.Metrics
to pmetric.Metrics
expectFrom pmetric.Metrics
expectTo pmetric.Metrics
}{
{
name: "move_none",
moveIf: func(_ pmetric.ResourceMetrics, _ pmetric.ScopeMetrics, _ pmetric.Metric) bool {
return false
},
from: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
to: pmetric.NewMetrics(),
expectFrom: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
expectTo: pmetric.NewMetrics(),
},
{
name: "move_all",
moveIf: func(_ pmetric.ResourceMetrics, _ pmetric.ScopeMetrics, _ pmetric.Metric) bool {
return true
},
from: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
to: pmetric.NewMetrics(),
expectFrom: pmetric.NewMetrics(),
expectTo: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
},
{
name: "move_all_from_one_resource",
moveIf: func(rl pmetric.ResourceMetrics, _ pmetric.ScopeMetrics, _ pmetric.Metric) bool {
rname, ok := rl.Resource().Attributes().Get("resourceName")
return ok && rname.AsString() == "resourceB"
},
from: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
to: pmetric.NewMetrics(),
expectFrom: pmetricutiltest.NewMetrics("A", "CD", "EF", "GH"),
expectTo: pmetricutiltest.NewMetrics("B", "CD", "EF", "GH"),
},
{
name: "move_all_from_one_scope",
moveIf: func(rl pmetric.ResourceMetrics, sl pmetric.ScopeMetrics, _ pmetric.Metric) bool {
rname, ok := rl.Resource().Attributes().Get("resourceName")
return ok && rname.AsString() == "resourceB" && sl.Scope().Name() == "scopeC"
},
from: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
to: pmetric.NewMetrics(),
expectFrom: pmetricutiltest.NewMetricsFromOpts(
pmetricutiltest.WithResource('A',
pmetricutiltest.WithScope('C', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH")),
pmetricutiltest.WithScope('D', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH")),
),
pmetricutiltest.WithResource('B',
pmetricutiltest.WithScope('D', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH")),
),
),
expectTo: pmetricutiltest.NewMetrics("B", "C", "EF", "GH"),
},
{
name: "move_all_from_one_scope_in_each_resource",
moveIf: func(_ pmetric.ResourceMetrics, sl pmetric.ScopeMetrics, _ pmetric.Metric) bool {
return sl.Scope().Name() == "scopeD"
},
from: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
to: pmetric.NewMetrics(),
expectFrom: pmetricutiltest.NewMetrics("AB", "C", "EF", "GH"),
expectTo: pmetricutiltest.NewMetrics("AB", "D", "EF", "GH"),
},
{
name: "move_one",
moveIf: func(rl pmetric.ResourceMetrics, sl pmetric.ScopeMetrics, m pmetric.Metric) bool {
rname, ok := rl.Resource().Attributes().Get("resourceName")
return ok && rname.AsString() == "resourceA" && sl.Scope().Name() == "scopeD" && m.Name() == "metricF"
},
from: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
to: pmetric.NewMetrics(),
expectFrom: pmetricutiltest.NewMetricsFromOpts(
pmetricutiltest.WithResource('A',
pmetricutiltest.WithScope('C', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH")),
pmetricutiltest.WithScope('D', pmetricutiltest.WithMetric('E', "GH")),
),
pmetricutiltest.WithResource('B',
pmetricutiltest.WithScope('C', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH")),
pmetricutiltest.WithScope('D', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH")),
),
),
expectTo: pmetricutiltest.NewMetrics("A", "D", "F", "GH"),
},
{
name: "move_one_from_each_scope",
moveIf: func(_ pmetric.ResourceMetrics, _ pmetric.ScopeMetrics, m pmetric.Metric) bool {
return m.Name() == "metricE"
},
from: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
to: pmetric.NewMetrics(),
expectFrom: pmetricutiltest.NewMetrics("AB", "CD", "F", "GH"),
expectTo: pmetricutiltest.NewMetrics("AB", "CD", "E", "GH"),
},
{
name: "move_one_from_each_scope_in_one_resource",
moveIf: func(rl pmetric.ResourceMetrics, _ pmetric.ScopeMetrics, m pmetric.Metric) bool {
rname, ok := rl.Resource().Attributes().Get("resourceName")
return ok && rname.AsString() == "resourceB" && m.Name() == "metricE"
},
from: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
to: pmetric.NewMetrics(),
expectFrom: pmetricutiltest.NewMetricsFromOpts(
pmetricutiltest.WithResource('A',
pmetricutiltest.WithScope('C', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH")),
pmetricutiltest.WithScope('D', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH")),
),
pmetricutiltest.WithResource('B',
pmetricutiltest.WithScope('C', pmetricutiltest.WithMetric('F', "GH")),
pmetricutiltest.WithScope('D', pmetricutiltest.WithMetric('F', "GH")),
),
),
expectTo: pmetricutiltest.NewMetrics("B", "CD", "E", "GH"),
},
{
name: "move_some_to_preexisting",
moveIf: func(_ pmetric.ResourceMetrics, sl pmetric.ScopeMetrics, _ pmetric.Metric) bool {
return sl.Scope().Name() == "scopeD"
},
from: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"),
to: pmetricutiltest.NewMetrics("1", "2", "3", "4"),
expectFrom: pmetricutiltest.NewMetrics("AB", "C", "EF", "GH"),
expectTo: pmetricutiltest.NewMetricsFromOpts(
pmetricutiltest.WithResource('1', pmetricutiltest.WithScope('2', pmetricutiltest.WithMetric('3', "4"))),
pmetricutiltest.WithResource('A', pmetricutiltest.WithScope('D', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH"))),
pmetricutiltest.WithResource('B', pmetricutiltest.WithScope('D', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH"))),
),
},
}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
pmetricutil.MoveMetricsWithContextIf(tt.from, tt.to, tt.moveIf)
assert.NoError(t, pmetrictest.CompareMetrics(tt.expectFrom, tt.from), "from not modified as expected")
assert.NoError(t, pmetrictest.CompareMetrics(tt.expectTo, tt.to), "to not as expected")
})
}
}
56 changes: 56 additions & 0 deletions connector/routingconnector/internal/pmetricutiltest/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,59 @@ func NewMetrics(resourceIDs, scopeIDs, metricIDs, dataPointIDs string) pmetric.M
}
return md
}

type Resource struct {
id byte
scopes []Scope
}

type Scope struct {
id byte
metrics []Metric
}

type Metric struct {
id byte
dataPoints string
}

func WithResource(id byte, scopes ...Scope) Resource {
r := Resource{id: id}
r.scopes = append(r.scopes, scopes...)
return r
}

func WithScope(id byte, metrics ...Metric) Scope {
s := Scope{id: id}
s.metrics = append(s.metrics, metrics...)
return s
}

func WithMetric(id byte, dataPoints string) Metric {
return Metric{id: id, dataPoints: dataPoints}
}

// NewMetricsFromOpts creates a pmetric.Metrics with the specified resources, scopes, metrics,
// and data points. The general idea is the same as NewMetrics, but this function allows for
// more flexibility in creating non-uniform structures.
func NewMetricsFromOpts(resources ...Resource) pmetric.Metrics {
md := pmetric.NewMetrics()
for _, resource := range resources {
r := md.ResourceMetrics().AppendEmpty()
r.Resource().Attributes().PutStr("resourceName", "resource"+string(resource.id))
for _, scope := range resource.scopes {
s := r.ScopeMetrics().AppendEmpty()
s.Scope().SetName("scope" + string(scope.id))
for _, metric := range scope.metrics {
m := s.Metrics().AppendEmpty()
m.SetName("metric" + string(metric.id))
dps := m.SetEmptyGauge().DataPoints()
for i := 0; i < len(metric.dataPoints); i++ {
dp := dps.AppendEmpty()
dp.Attributes().PutStr("dpName", "dp"+string(metric.dataPoints[i]))
}
}
}
}
return md
}
Loading

0 comments on commit 2612314

Please sign in to comment.