diff --git a/globals/globalVars.go b/globals/globalVars.go index 1a62f74..24ea18a 100644 --- a/globals/globalVars.go +++ b/globals/globalVars.go @@ -1,13 +1,13 @@ package globals -import discovery "github.com/gkarthiks/k8s-discovery" +import ( + discovery "github.com/gkarthiks/k8s-discovery" +) var ( - VaultIPList map[string]string K8s *discovery.K8s Namespace string HttpTimeout string - LabelSelector string ) const ( diff --git a/helper/util.go b/helper/util.go index 494b3f7..8fb13f7 100644 --- a/helper/util.go +++ b/helper/util.go @@ -9,7 +9,6 @@ import ( "reflect" "strings" "vault-balancer/globals" - "vault-balancer/types" ) const ( @@ -18,23 +17,22 @@ const ( ) // GetVaultIPsFromLabelSelectors will extract the IP Addresses for the pods that matches the labelSelectors -func GetVaultIPsFromLabelSelectors(vaultPool *types.VaultPool) { - if len(globals.LabelSelector) > 0 { - log.Infof("Discovering the Vault pods based on the label selector '%v'.", globals.LabelSelector) - strings.Split(globals.LabelSelector, ",") - log.Infof("Discovering pods with label selector...") +func GetVaultIPsFromLabelSelectors(labelSelector string, versionLogger *log.Entry) map[string]struct{} { + if len(labelSelector) > 0 { + labelSelector = strings.Join(strings.Split(labelSelector, ","), ",") + versionLogger.Infof("Discovering the Vault pods based on the label selector '%v'.", labelSelector) pods, err := globals.K8s.Clientset.CoreV1().Pods(globals.Namespace).List(context.Background(), metaV1.ListOptions{ - LabelSelector: strings.TrimSpace(globals.LabelSelector), + LabelSelector: strings.TrimSpace(labelSelector), }) if err != nil { - log.Fatalf("err while retrieving the pods: %v", err) + versionLogger.Fatalf("err while retrieving the pods: %v", err) } else { - populateIpAddresses(pods, vaultPool) + ipAddresses := populateIpAddresses(pods) + versionLogger.Infof("Finalized pods discovery process with label selector. Obtained the IP Address %v", reflect.ValueOf(ipAddresses).MapKeys()) + return ipAddresses } - log.Infof("Finalized pods discovery process with label selector. Obtained the IP Address %v", reflect.ValueOf(globals.VaultIPList).MapKeys()) } - - log.Printf("Vault Pool data at the end of GetVault IPs %v", reflect.ValueOf(&vaultPool.VaultBackends).MapKeys()) + return nil } // GetAttemptsFromContext returns the attempts for a request @@ -53,46 +51,14 @@ func GetRetryFromContext(r *http.Request) int { return 0 } -// HealthCheck runs a routine for check status of the pods every 2 mins -func HealthCheck(vaultPool *types.VaultPool) { - log.Info("Starting health check...") - vaultPool.HealthCheck() - log.Info("Health check completed") -} // extracts the pods IP from the selected pods -func populateIpAddresses(podsList *v1.PodList, vaultPool *types.VaultPool) { - currentPodNames := make(map[string]struct{}) +func populateIpAddresses(podsList *v1.PodList) map[string]struct{} { + podNameAddressMap := make(map[string]struct{}) for _, pod := range podsList.Items { - currentPodNames[pod.Name] = struct{}{} if pod.Status.Phase == v1.PodRunning { - log.Printf("TODO: Pod %v is running\n", pod.Name) - log.Printf("TODO: adding the current pod(%v) and ip(%v)\n", pod.Name, pod.Status.PodIP) - // adding the currently discovered pod ips - if _, ok := globals.VaultIPList[pod.Name]; ok { - log.Printf("TODO: inside if _, ok := globals.VaultIPList[pod.Name]; ok { with the value ` %v ` \n", globals.VaultIPList[pod.Name]) - log.Infof("%v already added and configured", pod.Name) - } else { - log.Printf("TODO: inside if _, ok := globals.VaultIPList[pod.Name]; ok { with the value ` %v ` \n", globals.VaultIPList[pod.Name]) - log.Infof("%v adding and configuring", pod.Name) - globals.VaultIPList[pod.Name] = pod.Status.PodIP - log.Printf("TODO: now the value of the vaultiplist is %v \n", reflect.ValueOf(globals.VaultIPList).MapKeys()) - } - } - } - log.Printf("TODO: now Vault IP List data at the end of populate %v \n", reflect.ValueOf(globals.VaultIPList).MapKeys()) - - for historyPodName, ipAddress := range globals.VaultIPList { - log.Printf("TODO: inside the reconciliation loop with historyPodName: %v, ipAddress: %v \n",historyPodName, ipAddress) - log.Printf("TODO: currentPodNames value:= %v",currentPodNames) - if _, ok := currentPodNames[historyPodName]; !ok { - log.Printf("TODO: inside the if _, ok := currentPodNames[historyPodName]; with ok value= %v and currentPodNames[historyPodName]= %v \n",ok, currentPodNames[historyPodName] ) - // removing the obsolete pod and its details - log.Printf("TODO: removing the obsolete pod and its details") - delete(globals.VaultIPList, historyPodName) - log.Printf("TODO: RetireBackend %v", ipAddress) - vaultPool.RetireBackend(ipAddress) - log.Printf("TODO: after RetireBackend vaultpool", reflect.ValueOf(vaultPool).MapKeys()) + podNameAddressMap[pod.Status.PodIP] = struct{}{} } } + return podNameAddressMap } diff --git a/main.go b/main.go index 90788fc..b2ee370 100644 --- a/main.go +++ b/main.go @@ -9,7 +9,6 @@ import ( "net/http/httputil" "net/url" "os" - "reflect" "strconv" "strings" "time" @@ -27,12 +26,9 @@ func init() { version, _ := globals.K8s.GetVersion() log.Infof("Running in %v version of Kubernetes cluster in %s namespace", version, globals.Namespace) - label, avail := os.LookupEnv("VAULT_LABEL_SELECTOR") + labelSelector, avail = os.LookupEnv("VAULT_LABEL_SELECTOR") if !avail { log.Fatalf("No label selector has been provided. Please provide the label selector in `VAULT_LABEL_SELECTOR` key.") - } else { - globals.VaultIPList = make(map[string]string) - globals.LabelSelector = label } balancerPortStr, avail := os.LookupEnv("BALANCER_PORT") @@ -51,10 +47,12 @@ func init() { } var ( - VersionLogger = log.WithFields(log.Fields{"vlb_version": BuildVersion}) + versionLogger = log.WithFields(log.Fields{"vlb_version": BuildVersion}) BuildVersion = "dev" balancerPort int vaultPool types.VaultPool + labelSelector string + avail bool ) const ( @@ -71,23 +69,23 @@ func main() { Handler: http.HandlerFunc(loadBalance), } // - VersionLogger.Infof("Vault Balancer started and running at :%d", balancerPort) + versionLogger.Infof("Vault Balancer started and running at :%d", balancerPort) if err := server.ListenAndServe(); err != nil { - VersionLogger.Fatalf("error while starting the load balance, %v", err) + versionLogger.Fatalf("error while starting the load balance, %v", err) } } // startRoutine starts the routine work of collecting IPs, setting up reverse // proxies and doing health check. func startRoutine(context context.Context) { - VersionLogger.Info("Starting the routines for discovery, proxy setup and health check") - t := time.NewTicker(time.Second * 30) + versionLogger.Info("Starting the routines for discovery, proxy setup and health check") + t := time.NewTicker(time.Second * 10) for { select { case <-t.C: - helper.GetVaultIPsFromLabelSelectors(&vaultPool) - setUpProxies(&vaultPool) - helper.HealthCheck(&vaultPool) + ipAddressMap := helper.GetVaultIPsFromLabelSelectors(labelSelector, versionLogger) + setUpProxies(ipAddressMap) + healthCheck(vaultPool) } } } @@ -96,7 +94,7 @@ func startRoutine(context context.Context) { func loadBalance(w http.ResponseWriter, r *http.Request) { attempts := helper.GetAttemptsFromContext(r) if attempts > 3 { - log.Infof("%s(%s) Max attempts reached, terminating\n", r.RemoteAddr, r.URL.Path) + versionLogger.Infof("%s(%s) Max attempts reached, terminating\n", r.RemoteAddr, r.URL.Path) http.Error(w, "Service not available", http.StatusServiceUnavailable) return } @@ -110,44 +108,67 @@ func loadBalance(w http.ResponseWriter, r *http.Request) { } // setUpProxies will create the reverse proxies for the identified IPs -func setUpProxies(vaultPool *types.VaultPool) { - log.Infof("Setting up the reverse proxy for %v", reflect.ValueOf(globals.VaultIPList).MapKeys()) - for _, individualIP := range globals.VaultIPList { - sanitizedIP := strings.TrimSpace(individualIP) - vaultUrl, err := url.Parse("http://" + sanitizedIP + ProxyPath) - if err != nil { - log.Errorf("error occurred while converting string to URL for proxy path. error: %v", err) - } - healthUrl, _ := url.Parse("http://" + sanitizedIP + HealthCheckPath) - - proxy := httputil.NewSingleHostReverseProxy(vaultUrl) - proxy.ErrorHandler = func(writer http.ResponseWriter, request *http.Request, e error) { - log.Infof("[%s] %s\n", vaultUrl.Host, e.Error()) - retries := helper.GetRetryFromContext(request) - if retries < 3 { - select { - case <-time.After(5 * time.Millisecond): - ctx := context.WithValue(request.Context(), helper.Retry, retries+1) - proxy.ServeHTTP(writer, request.WithContext(ctx)) - } - return +func setUpProxies(serviceNameAndIP map[string]struct{}) { + for podIP, _ := range serviceNameAndIP { + if !vaultPool.IsInThePool(podIP) { + sanitizedIP := strings.TrimSpace(podIP) + vaultUrl, err := url.Parse("http://" + sanitizedIP + ProxyPath) + if err != nil { + versionLogger.Errorf("error occurred while converting string to URL for proxy path. error: %v", err) } + healthUrl, _ := url.Parse("http://" + sanitizedIP + HealthCheckPath) + + proxy := httputil.NewSingleHostReverseProxy(vaultUrl) + proxy.ErrorHandler = func(writer http.ResponseWriter, request *http.Request, e error) { + versionLogger.Infof("[%s] %s\n", vaultUrl.Host, e.Error()) + retries := helper.GetRetryFromContext(request) + if retries < 3 { + select { + case <-time.After(5 * time.Millisecond): + ctx := context.WithValue(request.Context(), helper.Retry, retries+1) + proxy.ServeHTTP(writer, request.WithContext(ctx)) + } + return + } - // mark the ip address as not alice after 3 attempts - vaultPool.MarkVaultPodStatus(vaultUrl, false) + // mark the ip address as not alice after 3 attempts + vaultPool.MarkVaultPodStatus(vaultUrl, false) - attempts := helper.GetAttemptsFromContext(request) - log.Infof("Retry attempt for the %s(%s): %d\n", request.RemoteAddr, request.URL.Path, attempts) - ctx := context.WithValue(request.Context(), helper.Attempts, attempts+1) - loadBalance(writer, request.WithContext(ctx)) + attempts := helper.GetAttemptsFromContext(request) + versionLogger.Infof("Retry attempt for the %s(%s): %d\n", request.RemoteAddr, request.URL.Path, attempts) + ctx := context.WithValue(request.Context(), helper.Attempts, attempts+1) + loadBalance(writer, request.WithContext(ctx)) + } + vaultPool.AddBackend(&types.VaultBackend{ + IP: sanitizedIP, + ProxyURL: vaultUrl, + HealthURL: healthUrl, + Alive: true, + ReverseProxy: proxy, + }) + versionLogger.Infof("The service IP %s has been configured", vaultUrl) + } else { + versionLogger.Infof("Pod IP %v is already configured.", podIP) } - vaultPool.AddBackend(&types.VaultBackend{ - IP: sanitizedIP, - ProxyURL: vaultUrl, - HealthURL: healthUrl, - Alive: true, - ReverseProxy: proxy, - }) - log.Infof("The service IP %s has been configured", vaultUrl) } + + var toBeRemoved []*types.VaultBackend + for _, b := range vaultPool.VaultBackends { + if _, ok := serviceNameAndIP[b.IP]; !ok { + toBeRemoved = append(toBeRemoved, b) + } + } + for _, b := range toBeRemoved { + versionLogger.Infof("Retiring the backed with IP %v from load balancing", b.IP) + vaultPool.RetireBackend(b) + } +} + + +// healthCheck runs a routine for check status of the pods every 2 mins +func healthCheck(vaultPool types.VaultPool) { + + versionLogger.Info("Starting health check...") + vaultPool.HealthCheck() + versionLogger.Info("Health check completed") } diff --git a/types/vault_pool_type.go b/types/vault_pool_type.go index 221a40a..5758902 100644 --- a/types/vault_pool_type.go +++ b/types/vault_pool_type.go @@ -22,14 +22,27 @@ func (vp *VaultPool) AddBackend(vaultBackend *VaultBackend) { vp.VaultBackends = append(vp.VaultBackends, vaultBackend) } +// RetireBackend removes the provided backend from load balancing +func (vp *VaultPool) RetireBackend(b *VaultBackend) { + for i := range vp.VaultBackends { + if vp.VaultBackends[i].IP == b.IP { + copy(vp.VaultBackends[i:], vp.VaultBackends[i+1:]) + vp.VaultBackends[len(vp.VaultBackends)-1] = nil + vp.VaultBackends = vp.VaultBackends[:len(vp.VaultBackends)-1] + } + } +} + // AddBackend to the existing vault pool -func (vp *VaultPool) RetireBackend(obsoleteIP string) { - for index, currBackend := range vp.VaultBackends { - if currBackend.IP == obsoleteIP { - log.Infof("Retiring the backend %v from list of active IPs", obsoleteIP) - vp.VaultBackends = append(vp.VaultBackends[:index], vp.VaultBackends[index+1:]...) +func (vp *VaultPool) IsInThePool(podIP string) bool { + if vp.VaultBackends != nil { + for _, b := range vp.VaultBackends { + if b.IP == podIP { + return true + } } } + return false } // NextIndex atomically increase the counter and return an index @@ -67,7 +80,7 @@ func (vp *VaultPool) GetNextPod() *VaultBackend { func (vp *VaultPool) HealthCheck() { for _, vaults := range vp.VaultBackends { status := "up" - alive := isBackendAlive(vaults.HealthURL) + alive := true//isBackendAlive(vaults.HealthURL) vaults.SetAlive(alive) if !alive { status = "down"