diff --git a/CHANGELOG.md b/CHANGELOG.md index 549c61500..605160feb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +# 14.2.0, UNRELEASED + +## Bugfixes +* A fix for forwarding metrics with gRPC using the kubernetes discoverer. Thanks, [androohan](https://github.com/androohan)! + # 14.1.0, 2021-03-16 ## Added diff --git a/kubernetes.go b/kubernetes.go index f27e4683c..3e08cba1b 100644 --- a/kubernetes.go +++ b/kubernetes.go @@ -29,6 +29,62 @@ func NewKubernetesDiscoverer() (*KubernetesDiscoverer, error) { return &KubernetesDiscoverer{clientset}, nil } +func getDestinationFromPod(podIndex int, pod v1.Pod) string { + var forwardPort string + protocolPrefix := "" + + if pod.Status.Phase != v1.PodRunning { + return "" + } + + // TODO don't assume there is only one container for the veneur global + if len(pod.Spec.Containers) > 0 { + for _, container := range pod.Spec.Containers { + for _, port := range container.Ports { + if port.Name == "grpc" { + forwardPort = strconv.Itoa(int(port.ContainerPort)) + log.WithField("port", forwardPort).Debug("Found grpc port") + break + } + + if port.Name == "http" { + protocolPrefix = "http://" + forwardPort = strconv.Itoa(int(port.ContainerPort)) + log.WithField("port", forwardPort).Debug("Found http port") + break + } + + // TODO don't assume all TCP ports are for importing + if port.Protocol == "TCP" { + protocolPrefix = "http://" + forwardPort = strconv.Itoa(int(port.ContainerPort)) + log.WithField("port", forwardPort).Debug("Found TCP port") + } + } + } + } + + if forwardPort == "" || forwardPort == "0" { + log.WithFields(logrus.Fields{ + "podIndex": podIndex, + "PodIP": pod.Status.PodIP, + "forwardPort": forwardPort, + }).Error("Could not find valid port for forwarding") + return "" + } + + if pod.Status.PodIP == "" { + log.WithFields(logrus.Fields{ + "podIndex": podIndex, + "PodIP": pod.Status.PodIP, + "forwardPort": forwardPort, + }).Error("Could not find valid podIP for forwarding") + return "" + } + + return fmt.Sprintf("%s%s:%s", protocolPrefix, pod.Status.PodIP, forwardPort) +} + func (kd *KubernetesDiscoverer) GetDestinationsForService(serviceName string) ([]string, error) { pods, err := kd.clientset.CoreV1().Pods(metav1.NamespaceAll).List(metav1.ListOptions{ LabelSelector: "app=veneur-global", @@ -39,53 +95,10 @@ func (kd *KubernetesDiscoverer) GetDestinationsForService(serviceName string) ([ ips := make([]string, 0, len(pods.Items)) for podIndex, pod := range pods.Items { - - var forwardPort string - - if pod.Status.Phase != v1.PodRunning { - continue + podIp := getDestinationFromPod(podIndex, pod) + if len(podIp) > 0 { + ips = append(ips, podIp) } - - // TODO don't assume there is only one container for the veneur global - if len(pod.Spec.Containers) > 0 { - for _, container := range pod.Spec.Containers { - for _, port := range container.Ports { - if port.Name == "http" { - forwardPort = strconv.Itoa(int(port.ContainerPort)) - log.WithField("port", forwardPort).Debug("Found http port") - break - } - - // TODO don't assume all TCP ports are for importing - if port.Protocol == "TCP" { - forwardPort = strconv.Itoa(int(port.ContainerPort)) - log.WithField("port", forwardPort).Debug("Found TCP port") - } - } - } - } - - if forwardPort == "" || forwardPort == "0" { - log.WithFields(logrus.Fields{ - "podIndex": podIndex, - "PodIP": pod.Status.PodIP, - "forwardPort": forwardPort, - }).Error("Could not find valid port for forwarding") - continue - } - - if pod.Status.PodIP == "" { - log.WithFields(logrus.Fields{ - "podIndex": podIndex, - "PodIP": pod.Status.PodIP, - "forwardPort": forwardPort, - }).Error("Could not find valid podIP for forwarding") - continue - } - - // prepend with // so that it is a valid URL parseable by url.Parse - podIp := fmt.Sprintf("http://%s:%s", pod.Status.PodIP, forwardPort) - ips = append(ips, podIp) } return ips, nil } diff --git a/kubernetes_test.go b/kubernetes_test.go new file mode 100644 index 000000000..7bfdd3ddf --- /dev/null +++ b/kubernetes_test.go @@ -0,0 +1,83 @@ +package veneur + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/api/core/v1" +) + +func TestGetDestinationFromHttpPod(t *testing.T) { + httpPod := v1.Pod{ + Status: v1.PodStatus{ + Phase: v1.PodRunning, + PodIP: "127.0.0.1", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + v1.Container{ + Ports: []v1.ContainerPort{ + v1.ContainerPort{ + Name: "http", + ContainerPort: 8080, + Protocol: v1.ProtocolTCP, + }, + }, + }, + }, + }, + } + + podID := getDestinationFromPod(0, httpPod) + assert.Equal(t, "http://127.0.0.1:8080", podID) +} + +func TestGetDestinationFromTcpPod(t *testing.T) { + tcpPod := v1.Pod{ + Status: v1.PodStatus{ + Phase: v1.PodRunning, + PodIP: "127.0.0.1", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + v1.Container{ + Ports: []v1.ContainerPort{ + v1.ContainerPort{ + Name: "tcp", + ContainerPort: 8080, + Protocol: v1.ProtocolTCP, + }, + }, + }, + }, + }, + } + + podID := getDestinationFromPod(0, tcpPod) + assert.Equal(t, "http://127.0.0.1:8080", podID) +} + +func TestGetDestinationFromGrpcPod(t *testing.T) { + grpcPod := v1.Pod{ + Status: v1.PodStatus{ + Phase: v1.PodRunning, + PodIP: "127.0.0.1", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + v1.Container{ + Ports: []v1.ContainerPort{ + v1.ContainerPort{ + Name: "grpc", + ContainerPort: 8080, + Protocol: v1.ProtocolTCP, + }, + }, + }, + }, + }, + } + + podID := getDestinationFromPod(0, grpcPod) + assert.Equal(t, "127.0.0.1:8080", podID) +}