Skip to content

Commit

Permalink
traces with debug, extended sink testing code
Browse files Browse the repository at this point in the history
  • Loading branch information
ryantanjunming committed Nov 19, 2024
1 parent e770105 commit c4993eb
Show file tree
Hide file tree
Showing 4 changed files with 231 additions and 18 deletions.
11 changes: 10 additions & 1 deletion .github/workflows/otel-integration-e2e-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,21 @@ jobs:
-f ./e2e-test/testdata/values-e2e-test.yaml
kubectl set env daemonset/coralogix-opentelemetry-agent HOSTENDPOINT=${HOSTENDPOINT}
kubectl wait --all --for=condition=ready --timeout=300s pod -l component=agent-collector
- name: Verbose Debugging
env:
HOSTENDPOINT: ${{ env.HOSTENDPOINT }}
KUBECONFIG: ${{ env.KUBECONFIG }}
run: |
kubectl get nodes
kubectl get pods -A
kubectl describe daemonsets coralogix-opentelemetry-agent
kubectl logs -l app.kubernetes.io/name=opentelemetry-agent
- name: Run E2E test
env:
HOSTENDPOINT: ${{ env.HOSTENDPOINT }}
KUBECONFIG: ${{ env.KUBECONFIG }}
run: |
kubectl get pods --all-namespaces
cd ./otel-integration/k8s-helm/e2e-test/
go test -v -run=TestE2E_Agent ./...
13 changes: 8 additions & 5 deletions otel-integration/k8s-helm/e2e-test/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,20 @@ func TestE2E_Agent(t *testing.T) {

metricsConsumer := new(consumertest.MetricsSink)
tracesConsumer := new(consumertest.TracesSink)

shutdownSink := StartUpSinks(t, metricsConsumer, tracesConsumer)
defer shutdownSink()
shutdownSinks := StartUpSinks(t, ReceiverSinks{
Traces: &TraceSinkConfig{
Consumer: tracesConsumer,
},
})
defer shutdownSinks()

nodeIP := os.Getenv("NODE")
testID := uuid.NewString()[:8]
createTeleOpts := &k8stest.TelemetrygenCreateOpts{
ManifestsDir: filepath.Join(k8sDir, "telemetrygen"),
TestID: testID,
OtlpEndpoint: fmt.Sprintf("%s:4317", nodeIP),
DataTypes: []string{"metrics", "traces"},
DataTypes: []string{"traces"},
}
telemetryGenObjs, telemetryGenObjInfos := k8stest.CreateTelemetryGenObjects(t, k8sClient, createTeleOpts)
defer func() {
Expand All @@ -71,7 +74,7 @@ func TestE2E_Agent(t *testing.T) {
k8stest.WaitForTelemetryGenToStart(t, k8sClient, info.Namespace, info.PodLabelSelectors, info.Workload, info.DataType)
}

WaitForMetrics(t, 5, metricsConsumer)
// WaitForMetrics(t, 5, metricsConsumer)
WaitForTraces(t, 5, tracesConsumer)

checkResourceMetrics(t, metricsConsumer.AllMetrics())
Expand Down
223 changes: 211 additions & 12 deletions otel-integration/k8s-helm/e2e-test/sinks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,114 @@ package e2e

import (
"context"
"encoding/json"
"errors"
"fmt"
"regexp"
"slices"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver/otlpreceiver"
"go.opentelemetry.io/collector/receiver/receivertest"
)

func StartUpSinks(t *testing.T, mc *consumertest.MetricsSink, tc *consumertest.TracesSink) func() {
f := otlpreceiver.NewFactory()
cfg := f.CreateDefaultConfig().(*otlpreceiver.Config)
cfg.HTTP = nil
cfg.GRPC.NetAddr.Endpoint = "0.0.0.0:4317"

_, err := f.CreateMetrics(context.Background(), receivertest.NewNopSettings(), cfg, mc)
require.NoError(t, err, "failed creating metrics receiver")
rcvr, err := f.CreateTraces(context.Background(), receivertest.NewNopSettings(), cfg, tc)
require.NoError(t, err, "failed creating traces receiver")
require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()))
type ExpectedValueMode int

const (
AttributeMatchTypeEqual ExpectedValueMode = iota
AttributeMatchTypeRegex
AttributeMatchTypeExist
UidRe = "[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}"

ServiceNameAttribute = "service.name"
)

type ExpectedValue struct {
Mode ExpectedValueMode
Value string
}

func NewExpectedValue(mode ExpectedValueMode, value string) ExpectedValue {
return ExpectedValue{
Mode: mode,
Value: value,
}
}

type ReceiverSinks struct {
Metrics *MetricSinkConfig
Traces *TraceSinkConfig
Logs *LogSinkConfig
}

type MetricSinkConfig struct {
Ports *ReceiverPorts
Consumer *consumertest.MetricsSink
}

type TraceSinkConfig struct {
Ports *ReceiverPorts
Consumer *consumertest.TracesSink
}

type LogSinkConfig struct {
Ports *ReceiverPorts
Consumer *consumertest.LogsSink
}

type ReceiverPorts struct {
Grpc int
Http int
}

func StartUpSinks(t *testing.T, sinks ReceiverSinks) func() {
shutDownFuncs := []func(){}

if sinks.Metrics != nil {
fMetric := otlpreceiver.NewFactory()
cfg := fMetric.CreateDefaultConfig().(*otlpreceiver.Config)
setupReceiverPorts(cfg, sinks.Metrics.Ports)
metricsRcvr, err := fMetric.CreateMetrics(context.Background(), receivertest.NewNopSettings(), cfg, sinks.Metrics.Consumer)
require.NoError(t, err, "failed creating metrics receiver")
require.NoError(t, metricsRcvr.Start(context.Background(), componenttest.NewNopHost()))
shutDownFuncs = append(shutDownFuncs, func() {
assert.NoError(t, metricsRcvr.Shutdown(context.Background()))
})
}
if sinks.Traces != nil {
fTrace := otlpreceiver.NewFactory()
cfg := fTrace.CreateDefaultConfig().(*otlpreceiver.Config)
setupReceiverPorts(cfg, sinks.Traces.Ports)
tracesRcvr, err := fTrace.CreateTraces(context.Background(), receivertest.NewNopSettings(), cfg, sinks.Traces.Consumer)
require.NoError(t, err, "failed creating traces receiver")
require.NoError(t, tracesRcvr.Start(context.Background(), componenttest.NewNopHost()))
shutDownFuncs = append(shutDownFuncs, func() {
assert.NoError(t, tracesRcvr.Shutdown(context.Background()))
})
}

return func() {
assert.NoError(t, rcvr.Shutdown(context.Background()))
for _, f := range shutDownFuncs {
f()
}
}
}

func setupReceiverPorts(cfg *otlpreceiver.Config, ports *ReceiverPorts) {
if ports != nil {
cfg.GRPC.NetAddr.Endpoint = "0.0.0.0:" + strconv.Itoa(ports.Grpc)
cfg.HTTP.Endpoint = "0.0.0.0:" + strconv.Itoa(ports.Http)
} else {
cfg.GRPC.NetAddr.Endpoint = "0.0.0.0:4317"
cfg.HTTP.Endpoint = "0.0.0.0:4318"
}
}

Expand All @@ -50,3 +134,118 @@ func WaitForTraces(t *testing.T, entriesNum int, tc *consumertest.TracesSink) {
"failed to receive %d entries, received %d traces in %d seconds", entriesNum,
len(tc.AllTraces()), timeoutSeconds)
}

func ScanTracesForAttributes(t *testing.T, ts *consumertest.TracesSink, expectedService string, kvs map[string]ExpectedValue, scopeSpanAttrs []map[string]ExpectedValue) {
for i := 0; i < len(ts.AllTraces()); i++ {
traces := ts.AllTraces()[i]
for i := 0; i < traces.ResourceSpans().Len(); i++ {
resource := traces.ResourceSpans().At(i).Resource()
service, exist := resource.Attributes().Get(ServiceNameAttribute)
assert.True(t, exist, "Resource does not have the 'service.name' attribute")
if service.AsString() != expectedService {
continue
}
assert.NoError(t, assertExpectedAttributes(resource.Attributes(), kvs))

if len(scopeSpanAttrs) == 0 {
return
}

assert.NotZero(t, traces.ResourceSpans().At(i).ScopeSpans().Len())
assert.NotZero(t, traces.ResourceSpans().At(i).ScopeSpans().At(0).Spans().Len())

scopeSpan := traces.ResourceSpans().At(i).ScopeSpans().At(0)

// look for matching spans containing the desired attributes
for _, spanAttrs := range scopeSpanAttrs {
var err error
for j := 0; j < scopeSpan.Spans().Len(); j++ {
err = assertExpectedAttributes(scopeSpan.Spans().At(j).Attributes(), spanAttrs)
if err == nil {
break
}
}
assert.NoError(t, err)
}

return
}
}
t.Fatalf("no spans found for service %s", expectedService)
}

func assertExpectedAttributes(attrs pcommon.Map, kvs map[string]ExpectedValue) error {
foundAttrs := make(map[string]bool)
for k := range kvs {
foundAttrs[k] = false
}

attrs.Range(
func(k string, v pcommon.Value) bool {
if val, ok := kvs[k]; ok {
switch val.Mode {
case AttributeMatchTypeEqual:
if val.Value == v.AsString() {
foundAttrs[k] = true
}
case AttributeMatchTypeRegex:
matched, _ := regexp.MatchString(val.Value, v.AsString())
if matched {
foundAttrs[k] = true
}
case AttributeMatchTypeExist:
foundAttrs[k] = true
}
}
return true
},
)

var err error
for k, v := range foundAttrs {
if !v {
err = errors.Join(err, fmt.Errorf("attribute '%v' not found", k))
}
}
if err != nil {
// if something is missing, add a summary with an overview of the expected and actual attributes for easier troubleshooting
expectedJson, _ := json.MarshalIndent(kvs, "", " ")
actualJson, _ := json.MarshalIndent(attrs.AsRaw(), "", " ")
err = errors.Join(err, fmt.Errorf("one or more attributes were not found.\nExpected attributes:\n %s \nActual attributes: \n%s", expectedJson, actualJson))
}
return err
}

// ScanForServiceMetrics asserts that the metrics sink provided in the arguments
// contains the given metrics for a service
func ScanForServiceMetrics(t *testing.T, ms *consumertest.MetricsSink, expectedService string, expectedMetrics []string) {
for _, r := range ms.AllMetrics() {
for i := 0; i < r.ResourceMetrics().Len(); i++ {
resource := r.ResourceMetrics().At(i).Resource()
service, exist := resource.Attributes().Get(ServiceNameAttribute)
assert.Equal(t, true, exist, "resource does not have the 'service.name' attribute")
if service.AsString() != expectedService {
continue
}

sm := r.ResourceMetrics().At(i).ScopeMetrics().At(0).Metrics()
assert.NoError(t, assertExpectedMetrics(expectedMetrics, sm))
return
}
}
t.Fatalf("no metric found for service %s", expectedService)
}

func assertExpectedMetrics(expectedMetrics []string, sm pmetric.MetricSlice) error {
var actualMetrics []string
for i := 0; i < sm.Len(); i++ {
actualMetrics = append(actualMetrics, sm.At(i).Name())
}

for _, m := range expectedMetrics {
if !slices.Contains(actualMetrics, m) {
return fmt.Errorf("metric: %s not found", m)
}
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package k8stest // import "github.com/open-telemetry/opentelemetry-collector-con
import (
"bytes"
"context"
"fmt"
"os"
"path/filepath"
"testing"
Expand Down Expand Up @@ -41,6 +42,7 @@ func CreateTelemetryGenObjects(t *testing.T, client *K8sClient, createOpts *Tele
tmpl := template.Must(template.New(manifestFile.Name()).ParseFiles(filepath.Join(createOpts.ManifestsDir, manifestFile.Name())))
for _, dataType := range createOpts.DataTypes {
manifest := &bytes.Buffer{}
fmt.Println("Creating telemetrygen object for datatype: ", dataType, createOpts.OtlpEndpoint)
require.NoError(t, tmpl.Execute(manifest, map[string]string{
"Name": "telemetrygen-" + createOpts.TestID,
"DataType": dataType,
Expand Down

0 comments on commit c4993eb

Please sign in to comment.