Skip to content

Commit

Permalink
import otel test code
Browse files Browse the repository at this point in the history
  • Loading branch information
ryantanjunming committed Nov 13, 2024
1 parent 45c84a9 commit 0ff26a6
Show file tree
Hide file tree
Showing 7 changed files with 371 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ metadata:
labels:
app: telemetrygen
name: telemetrygen-deployment
namespace: e2e-test
spec:
replicas: 1
selector:
Expand Down
4 changes: 2 additions & 2 deletions otel-integration/k8s-helm/e2e-test/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import (
"strings"
"testing"

"coralogix.com/otel-integration/e2e/testcommon/k8stest"

"github.com/davecgh/go-spew/spew"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8stest"
)

func TestE2E_Agent(t *testing.T) {
Expand Down
45 changes: 45 additions & 0 deletions otel-integration/k8s-helm/e2e-test/testcommon/k8stest/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package k8stest // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8stest"

import (
"errors"
"fmt"

"k8s.io/client-go/discovery"
memory "k8s.io/client-go/discovery/cached"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/clientcmd"
)

type K8sClient struct {
DynamicClient *dynamic.DynamicClient
DiscoveryClient *discovery.DiscoveryClient
Mapper *restmapper.DeferredDiscoveryRESTMapper
}

func NewK8sClient(kubeconfigPath string) (*K8sClient, error) {
if kubeconfigPath == "" {
return nil, errors.New("Please provide file path to load kubeconfig")
}
restConfig, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
if err != nil {
return nil, fmt.Errorf("unable to load kubeconfig from %s: %w", kubeconfigPath, err)
}

dynamicClient, err := dynamic.NewForConfig(restConfig)
if err != nil {
return nil, fmt.Errorf("error creating dynamic client: %w", err)
}

discoveryClient, err := discovery.NewDiscoveryClientForConfig(restConfig)
if err != nil {
return nil, fmt.Errorf("error creating discovery client: %w", err)
}

mapper := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(discoveryClient))
return &K8sClient{
DynamicClient: dynamicClient, DiscoveryClient: discoveryClient, Mapper: mapper}, nil
}
108 changes: 108 additions & 0 deletions otel-integration/k8s-helm/e2e-test/testcommon/k8stest/k8s_collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package k8stest // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8stest"

import (
"bytes"
"context"
"os"
"path/filepath"
"testing"
"text/template"
"time"

"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)

func CreateCollectorObjects(t *testing.T, client *K8sClient, testID string, manifestsDir string) []*unstructured.Unstructured {
if manifestsDir == "" {
manifestsDir = filepath.Join(".", "testdata", "e2e", "collector")
}
manifestFiles, err := os.ReadDir(manifestsDir)
require.NoErrorf(t, err, "failed to read collector manifests directory %s", manifestsDir)
host := HostEndpoint(t)
if host == "" {
require.Fail(t, "host endpoint cannot be empty")
}
var podNamespace string
var podLabels map[string]any
createdObjs := make([]*unstructured.Unstructured, 0, len(manifestFiles))
for _, manifestFile := range manifestFiles {
tmpl := template.Must(template.New(manifestFile.Name()).ParseFiles(filepath.Join(manifestsDir, manifestFile.Name())))
manifest := &bytes.Buffer{}
require.NoError(t, tmpl.Execute(manifest, map[string]string{
"Name": "otelcol-" + testID,
"HostEndpoint": host,
"TestID": testID,
}))
obj, err := CreateObject(client, manifest.Bytes())
require.NoErrorf(t, err, "failed to create collector object from manifest %s", manifestFile.Name())
objKind := obj.GetKind()
if objKind == "Deployment" || objKind == "DaemonSet" {
podNamespace = obj.GetNamespace()
selector := obj.Object["spec"].(map[string]any)["selector"]
podLabels = selector.(map[string]any)["matchLabels"].(map[string]any)
}
createdObjs = append(createdObjs, obj)
}

WaitForCollectorToStart(t, client, podNamespace, podLabels)

return createdObjs
}

func WaitForCollectorToStart(t *testing.T, client *K8sClient, podNamespace string, podLabels map[string]any) {
podGVR := schema.GroupVersionResource{Version: "v1", Resource: "pods"}
listOptions := metav1.ListOptions{LabelSelector: SelectorFromMap(podLabels).String()}
podTimeoutMinutes := 3
t.Logf("waiting for collector pods to be ready")
require.Eventuallyf(t, func() bool {
list, err := client.DynamicClient.Resource(podGVR).Namespace(podNamespace).List(context.Background(), listOptions)
require.NoError(t, err, "failed to list collector pods")
podsNotReady := len(list.Items)
if podsNotReady == 0 {
t.Log("did not find collector pods")
return false
}

var pods v1.PodList
err = runtime.DefaultUnstructuredConverter.FromUnstructured(list.UnstructuredContent(), &pods)
require.NoError(t, err, "failed to convert unstructured to podList")

for _, pod := range pods.Items {
podReady := false
if pod.Status.Phase != v1.PodRunning {
t.Logf("pod %v is not running, current phase: %v", pod.Name, pod.Status.Phase)
continue
}
for _, cond := range pod.Status.Conditions {
if cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue {
podsNotReady--
podReady = true
}
}
// Add some debug logs for crashing pods
if !podReady {
for _, cs := range pod.Status.ContainerStatuses {
restartCount := cs.RestartCount
if restartCount > 0 && cs.LastTerminationState.Terminated != nil {
t.Logf("restart count = %d for container %s in pod %s, last terminated reason: %s", restartCount, cs.Name, pod.Name, cs.LastTerminationState.Terminated.Reason)
t.Logf("termination message: %s", cs.LastTerminationState.Terminated.Message)
}
}
}
}
if podsNotReady == 0 {
t.Logf("collector pods are ready")
return true
}
return false
}, time.Duration(podTimeoutMinutes)*time.Minute, 2*time.Second,
"collector pods were not ready within %d minutes", podTimeoutMinutes)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package k8stest // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8stest"

import (
"context"
"runtime"
"testing"
"time"

network2 "github.com/docker/docker/api/types/network"
docker "github.com/docker/docker/client"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/labels"
)

func HostEndpoint(t *testing.T) string {
if runtime.GOOS == "darwin" {
return "host.docker.internal"
}

client, err := docker.NewClientWithOpts(docker.FromEnv)
require.NoError(t, err)
client.NegotiateAPIVersion(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
network, err := client.NetworkInspect(ctx, "kind", network2.InspectOptions{})
require.NoError(t, err)
for _, ipam := range network.IPAM.Config {
if ipam.Gateway != "" {
return ipam.Gateway
}
}
require.Fail(t, "failed to find host endpoint")
return ""
}

func SelectorFromMap(labelMap map[string]any) labels.Selector {
labelStringMap := make(map[string]string)
for key, value := range labelMap {
labelStringMap[key] = value.(string)
}
labelSet := labels.Set(labelStringMap)
return labelSet.AsSelector()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package k8stest // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8stest"

import (
"context"
"os"
"path/filepath"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/serializer/yaml"
"k8s.io/client-go/dynamic"
)

func CreateObject(client *K8sClient, manifest []byte) (*unstructured.Unstructured, error) {
decoder := yaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme)
obj := &unstructured.Unstructured{}
_, gvk, err := decoder.Decode(manifest, nil, obj)
if err != nil {
return nil, err
}
gvr, err := client.Mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, err
}
var resource dynamic.ResourceInterface
if gvr.Scope.Name() == meta.RESTScopeNameNamespace {
resource = client.DynamicClient.Resource(gvr.Resource).Namespace(obj.GetNamespace())
} else {
// cluster-scoped resources
resource = client.DynamicClient.Resource(gvr.Resource)
}

return resource.Create(context.Background(), obj, metav1.CreateOptions{})
}

func DeleteObject(client *K8sClient, obj *unstructured.Unstructured) error {
gvk := obj.GroupVersionKind()
gvr, err := client.Mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return err
}
var resource dynamic.ResourceInterface
if gvr.Scope.Name() == meta.RESTScopeNameNamespace {
resource = client.DynamicClient.Resource(gvr.Resource).Namespace(obj.GetNamespace())
} else {
// cluster-scoped resources
resource = client.DynamicClient.Resource(gvr.Resource)
}
deletePolicy := metav1.DeletePropagationForeground
return resource.Delete(context.Background(), obj.GetName(), metav1.DeleteOptions{
PropagationPolicy: &deletePolicy,
})
}

func CreateObjects(client *K8sClient, dir string) ([]*unstructured.Unstructured, error) {
var objs []*unstructured.Unstructured
files, err := os.ReadDir(dir)
if err != nil {
return nil, err
}

for _, file := range files {
if file.IsDir() {
continue // Skip directories
}
manifest, err := os.ReadFile(filepath.Join(dir, file.Name()))
if err != nil {
return nil, err
}
obj, err := CreateObject(client, manifest)
if err != nil {
return nil, err
}
objs = append(objs, obj)
}
return objs, nil
}

func DeleteObjects(client *K8sClient, objs []*unstructured.Unstructured) error {
for _, obj := range objs {
if err := DeleteObject(client, obj); err != nil {
return err
}
}
return nil
}
Loading

0 comments on commit 0ff26a6

Please sign in to comment.