From cf353121eddcb5a6f9c893bacd412176cb65cca9 Mon Sep 17 00:00:00 2001 From: Andrew Han <75267943+andrewhan-square@users.noreply.github.com> Date: Mon, 21 Jun 2021 15:26:34 -0700 Subject: [PATCH] Allow kubernetes discoverer to use gRPC destinations (#829) * Allow kubernetes discoverer to use gRPC destinations Summary I updated the kubnetes discoverer to look for veneur global containers with a grpc port name. This will allow us to specify that we are using gRPC. Additionally, I removed the http:// prefix in the saved podIp because it hits a "too many colons in address" error with gRPC. This will still work with forwarding with http because in proxy.go, the doPost method will check and append it if its missing. This PR is related to issue https://github.com/stripe/veneur/issues/762 as it removes the http hard coding. Motivation In Kubernetes we want to proxy metrics to veneur global with gRPC. This edit in our fork fixed the issues we were hitting. Test plan Ran this in our veneur-proxy pods that utilize this Kubernetes discoverer code with gRPC to verify everything works. Rollout/monitoring/revert plan This change should be backwards compatible as the doPost function for http communication prepends the necessary prefix. This only affects gRPC destinations used by proxysrv which shouldn't have been available with kubernetes. * Update CHANGELOG for new release * Pulled the logic that generates pod IPs from pod info into it's own function to make it more testable and added some tests on generating these ips Co-authored-by: Chris Solidum Co-authored-by: csolidum --- CHANGELOG.md | 5 +++ kubernetes.go | 105 +++++++++++++++++++++++++-------------------- kubernetes_test.go | 83 +++++++++++++++++++++++++++++++++++ 3 files changed, 147 insertions(+), 46 deletions(-) create mode 100644 kubernetes_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 549c61500..605160feb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +# 14.2.0, UNRELEASED + +## Bugfixes +* A fix for forwarding metrics with gRPC using the kubernetes discoverer. Thanks, [androohan](https://github.com/androohan)! + # 14.1.0, 2021-03-16 ## Added diff --git a/kubernetes.go b/kubernetes.go index f27e4683c..3e08cba1b 100644 --- a/kubernetes.go +++ b/kubernetes.go @@ -29,6 +29,62 @@ func NewKubernetesDiscoverer() (*KubernetesDiscoverer, error) { return &KubernetesDiscoverer{clientset}, nil } +func getDestinationFromPod(podIndex int, pod v1.Pod) string { + var forwardPort string + protocolPrefix := "" + + if pod.Status.Phase != v1.PodRunning { + return "" + } + + // TODO don't assume there is only one container for the veneur global + if len(pod.Spec.Containers) > 0 { + for _, container := range pod.Spec.Containers { + for _, port := range container.Ports { + if port.Name == "grpc" { + forwardPort = strconv.Itoa(int(port.ContainerPort)) + log.WithField("port", forwardPort).Debug("Found grpc port") + break + } + + if port.Name == "http" { + protocolPrefix = "http://" + forwardPort = strconv.Itoa(int(port.ContainerPort)) + log.WithField("port", forwardPort).Debug("Found http port") + break + } + + // TODO don't assume all TCP ports are for importing + if port.Protocol == "TCP" { + protocolPrefix = "http://" + forwardPort = strconv.Itoa(int(port.ContainerPort)) + log.WithField("port", forwardPort).Debug("Found TCP port") + } + } + } + } + + if forwardPort == "" || forwardPort == "0" { + log.WithFields(logrus.Fields{ + "podIndex": podIndex, + "PodIP": pod.Status.PodIP, + "forwardPort": forwardPort, + }).Error("Could not find valid port for forwarding") + return "" + } + + if pod.Status.PodIP == "" { + log.WithFields(logrus.Fields{ + "podIndex": podIndex, + "PodIP": pod.Status.PodIP, + "forwardPort": forwardPort, + }).Error("Could not find valid podIP for forwarding") + return "" + } + + return fmt.Sprintf("%s%s:%s", protocolPrefix, pod.Status.PodIP, forwardPort) +} + func (kd *KubernetesDiscoverer) GetDestinationsForService(serviceName string) ([]string, error) { pods, err := kd.clientset.CoreV1().Pods(metav1.NamespaceAll).List(metav1.ListOptions{ LabelSelector: "app=veneur-global", @@ -39,53 +95,10 @@ func (kd *KubernetesDiscoverer) GetDestinationsForService(serviceName string) ([ ips := make([]string, 0, len(pods.Items)) for podIndex, pod := range pods.Items { - - var forwardPort string - - if pod.Status.Phase != v1.PodRunning { - continue + podIp := getDestinationFromPod(podIndex, pod) + if len(podIp) > 0 { + ips = append(ips, podIp) } - - // TODO don't assume there is only one container for the veneur global - if len(pod.Spec.Containers) > 0 { - for _, container := range pod.Spec.Containers { - for _, port := range container.Ports { - if port.Name == "http" { - forwardPort = strconv.Itoa(int(port.ContainerPort)) - log.WithField("port", forwardPort).Debug("Found http port") - break - } - - // TODO don't assume all TCP ports are for importing - if port.Protocol == "TCP" { - forwardPort = strconv.Itoa(int(port.ContainerPort)) - log.WithField("port", forwardPort).Debug("Found TCP port") - } - } - } - } - - if forwardPort == "" || forwardPort == "0" { - log.WithFields(logrus.Fields{ - "podIndex": podIndex, - "PodIP": pod.Status.PodIP, - "forwardPort": forwardPort, - }).Error("Could not find valid port for forwarding") - continue - } - - if pod.Status.PodIP == "" { - log.WithFields(logrus.Fields{ - "podIndex": podIndex, - "PodIP": pod.Status.PodIP, - "forwardPort": forwardPort, - }).Error("Could not find valid podIP for forwarding") - continue - } - - // prepend with // so that it is a valid URL parseable by url.Parse - podIp := fmt.Sprintf("http://%s:%s", pod.Status.PodIP, forwardPort) - ips = append(ips, podIp) } return ips, nil } diff --git a/kubernetes_test.go b/kubernetes_test.go new file mode 100644 index 000000000..7bfdd3ddf --- /dev/null +++ b/kubernetes_test.go @@ -0,0 +1,83 @@ +package veneur + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/api/core/v1" +) + +func TestGetDestinationFromHttpPod(t *testing.T) { + httpPod := v1.Pod{ + Status: v1.PodStatus{ + Phase: v1.PodRunning, + PodIP: "127.0.0.1", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + v1.Container{ + Ports: []v1.ContainerPort{ + v1.ContainerPort{ + Name: "http", + ContainerPort: 8080, + Protocol: v1.ProtocolTCP, + }, + }, + }, + }, + }, + } + + podID := getDestinationFromPod(0, httpPod) + assert.Equal(t, "http://127.0.0.1:8080", podID) +} + +func TestGetDestinationFromTcpPod(t *testing.T) { + tcpPod := v1.Pod{ + Status: v1.PodStatus{ + Phase: v1.PodRunning, + PodIP: "127.0.0.1", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + v1.Container{ + Ports: []v1.ContainerPort{ + v1.ContainerPort{ + Name: "tcp", + ContainerPort: 8080, + Protocol: v1.ProtocolTCP, + }, + }, + }, + }, + }, + } + + podID := getDestinationFromPod(0, tcpPod) + assert.Equal(t, "http://127.0.0.1:8080", podID) +} + +func TestGetDestinationFromGrpcPod(t *testing.T) { + grpcPod := v1.Pod{ + Status: v1.PodStatus{ + Phase: v1.PodRunning, + PodIP: "127.0.0.1", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + v1.Container{ + Ports: []v1.ContainerPort{ + v1.ContainerPort{ + Name: "grpc", + ContainerPort: 8080, + Protocol: v1.ProtocolTCP, + }, + }, + }, + }, + }, + } + + podID := getDestinationFromPod(0, grpcPod) + assert.Equal(t, "127.0.0.1:8080", podID) +}