Skip to content

Commit

Permalink
Merge pull request #15 from gkarthiks/feat/re-factore
Browse files Browse the repository at this point in the history
chore: re-factored code
  • Loading branch information
gkarthiks authored Jul 28, 2020
2 parents 10780ab + 13342fa commit c5fbeb7
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 106 deletions.
6 changes: 3 additions & 3 deletions globals/globalVars.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package globals

import discovery "github.com/gkarthiks/k8s-discovery"
import (
discovery "github.com/gkarthiks/k8s-discovery"
)

var (
VaultIPList map[string]string
K8s *discovery.K8s
Namespace string
HttpTimeout string
LabelSelector string
)

const (
Expand Down
62 changes: 14 additions & 48 deletions helper/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"reflect"
"strings"
"vault-balancer/globals"
"vault-balancer/types"
)

const (
Expand All @@ -18,23 +17,22 @@ const (
)

// GetVaultIPsFromLabelSelectors will extract the IP Addresses for the pods that matches the labelSelectors
func GetVaultIPsFromLabelSelectors(vaultPool *types.VaultPool) {
if len(globals.LabelSelector) > 0 {
log.Infof("Discovering the Vault pods based on the label selector '%v'.", globals.LabelSelector)
strings.Split(globals.LabelSelector, ",")
log.Infof("Discovering pods with label selector...")
func GetVaultIPsFromLabelSelectors(labelSelector string, versionLogger *log.Entry) map[string]struct{} {
if len(labelSelector) > 0 {
labelSelector = strings.Join(strings.Split(labelSelector, ","), ",")
versionLogger.Infof("Discovering the Vault pods based on the label selector '%v'.", labelSelector)
pods, err := globals.K8s.Clientset.CoreV1().Pods(globals.Namespace).List(context.Background(), metaV1.ListOptions{
LabelSelector: strings.TrimSpace(globals.LabelSelector),
LabelSelector: strings.TrimSpace(labelSelector),
})
if err != nil {
log.Fatalf("err while retrieving the pods: %v", err)
versionLogger.Fatalf("err while retrieving the pods: %v", err)
} else {
populateIpAddresses(pods, vaultPool)
ipAddresses := populateIpAddresses(pods)
versionLogger.Infof("Finalized pods discovery process with label selector. Obtained the IP Address %v", reflect.ValueOf(ipAddresses).MapKeys())
return ipAddresses
}
log.Infof("Finalized pods discovery process with label selector. Obtained the IP Address %v", reflect.ValueOf(globals.VaultIPList).MapKeys())
}

log.Printf("Vault Pool data at the end of GetVault IPs %v", reflect.ValueOf(&vaultPool.VaultBackends).MapKeys())
return nil
}

// GetAttemptsFromContext returns the attempts for a request
Expand All @@ -53,46 +51,14 @@ func GetRetryFromContext(r *http.Request) int {
return 0
}

// HealthCheck runs a routine for check status of the pods every 2 mins
func HealthCheck(vaultPool *types.VaultPool) {
log.Info("Starting health check...")
vaultPool.HealthCheck()
log.Info("Health check completed")
}

// extracts the pods IP from the selected pods
func populateIpAddresses(podsList *v1.PodList, vaultPool *types.VaultPool) {
currentPodNames := make(map[string]struct{})
func populateIpAddresses(podsList *v1.PodList) map[string]struct{} {
podNameAddressMap := make(map[string]struct{})
for _, pod := range podsList.Items {
currentPodNames[pod.Name] = struct{}{}
if pod.Status.Phase == v1.PodRunning {
log.Printf("TODO: Pod %v is running\n", pod.Name)
log.Printf("TODO: adding the current pod(%v) and ip(%v)\n", pod.Name, pod.Status.PodIP)
// adding the currently discovered pod ips
if _, ok := globals.VaultIPList[pod.Name]; ok {
log.Printf("TODO: inside if _, ok := globals.VaultIPList[pod.Name]; ok { with the value ` %v ` \n", globals.VaultIPList[pod.Name])
log.Infof("%v already added and configured", pod.Name)
} else {
log.Printf("TODO: inside if _, ok := globals.VaultIPList[pod.Name]; ok { with the value ` %v ` \n", globals.VaultIPList[pod.Name])
log.Infof("%v adding and configuring", pod.Name)
globals.VaultIPList[pod.Name] = pod.Status.PodIP
log.Printf("TODO: now the value of the vaultiplist is %v \n", reflect.ValueOf(globals.VaultIPList).MapKeys())
}
}
}
log.Printf("TODO: now Vault IP List data at the end of populate %v \n", reflect.ValueOf(globals.VaultIPList).MapKeys())

for historyPodName, ipAddress := range globals.VaultIPList {
log.Printf("TODO: inside the reconciliation loop with historyPodName: %v, ipAddress: %v \n",historyPodName, ipAddress)
log.Printf("TODO: currentPodNames value:= %v",currentPodNames)
if _, ok := currentPodNames[historyPodName]; !ok {
log.Printf("TODO: inside the if _, ok := currentPodNames[historyPodName]; with ok value= %v and currentPodNames[historyPodName]= %v \n",ok, currentPodNames[historyPodName] )
// removing the obsolete pod and its details
log.Printf("TODO: removing the obsolete pod and its details")
delete(globals.VaultIPList, historyPodName)
log.Printf("TODO: RetireBackend %v", ipAddress)
vaultPool.RetireBackend(ipAddress)
log.Printf("TODO: after RetireBackend vaultpool", reflect.ValueOf(vaultPool).MapKeys())
podNameAddressMap[pod.Status.PodIP] = struct{}{}
}
}
return podNameAddressMap
}
119 changes: 70 additions & 49 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"net/http/httputil"
"net/url"
"os"
"reflect"
"strconv"
"strings"
"time"
Expand All @@ -27,12 +26,9 @@ func init() {
version, _ := globals.K8s.GetVersion()
log.Infof("Running in %v version of Kubernetes cluster in %s namespace", version, globals.Namespace)

label, avail := os.LookupEnv("VAULT_LABEL_SELECTOR")
labelSelector, avail = os.LookupEnv("VAULT_LABEL_SELECTOR")
if !avail {
log.Fatalf("No label selector has been provided. Please provide the label selector in `VAULT_LABEL_SELECTOR` key.")
} else {
globals.VaultIPList = make(map[string]string)
globals.LabelSelector = label
}

balancerPortStr, avail := os.LookupEnv("BALANCER_PORT")
Expand All @@ -51,10 +47,12 @@ func init() {
}

var (
VersionLogger = log.WithFields(log.Fields{"vlb_version": BuildVersion})
versionLogger = log.WithFields(log.Fields{"vlb_version": BuildVersion})
BuildVersion = "dev"
balancerPort int
vaultPool types.VaultPool
labelSelector string
avail bool
)

const (
Expand All @@ -71,23 +69,23 @@ func main() {
Handler: http.HandlerFunc(loadBalance),
}
//
VersionLogger.Infof("Vault Balancer started and running at :%d", balancerPort)
versionLogger.Infof("Vault Balancer started and running at :%d", balancerPort)
if err := server.ListenAndServe(); err != nil {
VersionLogger.Fatalf("error while starting the load balance, %v", err)
versionLogger.Fatalf("error while starting the load balance, %v", err)
}
}

// startRoutine starts the routine work of collecting IPs, setting up reverse
// proxies and doing health check.
func startRoutine(context context.Context) {
VersionLogger.Info("Starting the routines for discovery, proxy setup and health check")
t := time.NewTicker(time.Second * 30)
versionLogger.Info("Starting the routines for discovery, proxy setup and health check")
t := time.NewTicker(time.Second * 10)
for {
select {
case <-t.C:
helper.GetVaultIPsFromLabelSelectors(&vaultPool)
setUpProxies(&vaultPool)
helper.HealthCheck(&vaultPool)
ipAddressMap := helper.GetVaultIPsFromLabelSelectors(labelSelector, versionLogger)
setUpProxies(ipAddressMap)
healthCheck(vaultPool)
}
}
}
Expand All @@ -96,7 +94,7 @@ func startRoutine(context context.Context) {
func loadBalance(w http.ResponseWriter, r *http.Request) {
attempts := helper.GetAttemptsFromContext(r)
if attempts > 3 {
log.Infof("%s(%s) Max attempts reached, terminating\n", r.RemoteAddr, r.URL.Path)
versionLogger.Infof("%s(%s) Max attempts reached, terminating\n", r.RemoteAddr, r.URL.Path)
http.Error(w, "Service not available", http.StatusServiceUnavailable)
return
}
Expand All @@ -110,44 +108,67 @@ func loadBalance(w http.ResponseWriter, r *http.Request) {
}

// setUpProxies will create the reverse proxies for the identified IPs
func setUpProxies(vaultPool *types.VaultPool) {
log.Infof("Setting up the reverse proxy for %v", reflect.ValueOf(globals.VaultIPList).MapKeys())
for _, individualIP := range globals.VaultIPList {
sanitizedIP := strings.TrimSpace(individualIP)
vaultUrl, err := url.Parse("http://" + sanitizedIP + ProxyPath)
if err != nil {
log.Errorf("error occurred while converting string to URL for proxy path. error: %v", err)
}
healthUrl, _ := url.Parse("http://" + sanitizedIP + HealthCheckPath)

proxy := httputil.NewSingleHostReverseProxy(vaultUrl)
proxy.ErrorHandler = func(writer http.ResponseWriter, request *http.Request, e error) {
log.Infof("[%s] %s\n", vaultUrl.Host, e.Error())
retries := helper.GetRetryFromContext(request)
if retries < 3 {
select {
case <-time.After(5 * time.Millisecond):
ctx := context.WithValue(request.Context(), helper.Retry, retries+1)
proxy.ServeHTTP(writer, request.WithContext(ctx))
}
return
func setUpProxies(serviceNameAndIP map[string]struct{}) {
for podIP, _ := range serviceNameAndIP {
if !vaultPool.IsInThePool(podIP) {
sanitizedIP := strings.TrimSpace(podIP)
vaultUrl, err := url.Parse("http://" + sanitizedIP + ProxyPath)
if err != nil {
versionLogger.Errorf("error occurred while converting string to URL for proxy path. error: %v", err)
}
healthUrl, _ := url.Parse("http://" + sanitizedIP + HealthCheckPath)

proxy := httputil.NewSingleHostReverseProxy(vaultUrl)
proxy.ErrorHandler = func(writer http.ResponseWriter, request *http.Request, e error) {
versionLogger.Infof("[%s] %s\n", vaultUrl.Host, e.Error())
retries := helper.GetRetryFromContext(request)
if retries < 3 {
select {
case <-time.After(5 * time.Millisecond):
ctx := context.WithValue(request.Context(), helper.Retry, retries+1)
proxy.ServeHTTP(writer, request.WithContext(ctx))
}
return
}

// mark the ip address as not alice after 3 attempts
vaultPool.MarkVaultPodStatus(vaultUrl, false)
// mark the ip address as not alice after 3 attempts
vaultPool.MarkVaultPodStatus(vaultUrl, false)

attempts := helper.GetAttemptsFromContext(request)
log.Infof("Retry attempt for the %s(%s): %d\n", request.RemoteAddr, request.URL.Path, attempts)
ctx := context.WithValue(request.Context(), helper.Attempts, attempts+1)
loadBalance(writer, request.WithContext(ctx))
attempts := helper.GetAttemptsFromContext(request)
versionLogger.Infof("Retry attempt for the %s(%s): %d\n", request.RemoteAddr, request.URL.Path, attempts)
ctx := context.WithValue(request.Context(), helper.Attempts, attempts+1)
loadBalance(writer, request.WithContext(ctx))
}
vaultPool.AddBackend(&types.VaultBackend{
IP: sanitizedIP,
ProxyURL: vaultUrl,
HealthURL: healthUrl,
Alive: true,
ReverseProxy: proxy,
})
versionLogger.Infof("The service IP %s has been configured", vaultUrl)
} else {
versionLogger.Infof("Pod IP %v is already configured.", podIP)
}
vaultPool.AddBackend(&types.VaultBackend{
IP: sanitizedIP,
ProxyURL: vaultUrl,
HealthURL: healthUrl,
Alive: true,
ReverseProxy: proxy,
})
log.Infof("The service IP %s has been configured", vaultUrl)
}

var toBeRemoved []*types.VaultBackend
for _, b := range vaultPool.VaultBackends {
if _, ok := serviceNameAndIP[b.IP]; !ok {
toBeRemoved = append(toBeRemoved, b)
}
}
for _, b := range toBeRemoved {
versionLogger.Infof("Retiring the backed with IP %v from load balancing", b.IP)
vaultPool.RetireBackend(b)
}
}


// healthCheck runs a routine for check status of the pods every 2 mins
func healthCheck(vaultPool types.VaultPool) {

versionLogger.Info("Starting health check...")
vaultPool.HealthCheck()
versionLogger.Info("Health check completed")
}
25 changes: 19 additions & 6 deletions types/vault_pool_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,27 @@ func (vp *VaultPool) AddBackend(vaultBackend *VaultBackend) {
vp.VaultBackends = append(vp.VaultBackends, vaultBackend)
}

// RetireBackend removes the provided backend from load balancing
func (vp *VaultPool) RetireBackend(b *VaultBackend) {
for i := range vp.VaultBackends {
if vp.VaultBackends[i].IP == b.IP {
copy(vp.VaultBackends[i:], vp.VaultBackends[i+1:])
vp.VaultBackends[len(vp.VaultBackends)-1] = nil
vp.VaultBackends = vp.VaultBackends[:len(vp.VaultBackends)-1]
}
}
}

// AddBackend to the existing vault pool
func (vp *VaultPool) RetireBackend(obsoleteIP string) {
for index, currBackend := range vp.VaultBackends {
if currBackend.IP == obsoleteIP {
log.Infof("Retiring the backend %v from list of active IPs", obsoleteIP)
vp.VaultBackends = append(vp.VaultBackends[:index], vp.VaultBackends[index+1:]...)
func (vp *VaultPool) IsInThePool(podIP string) bool {
if vp.VaultBackends != nil {
for _, b := range vp.VaultBackends {
if b.IP == podIP {
return true
}
}
}
return false
}

// NextIndex atomically increase the counter and return an index
Expand Down Expand Up @@ -67,7 +80,7 @@ func (vp *VaultPool) GetNextPod() *VaultBackend {
func (vp *VaultPool) HealthCheck() {
for _, vaults := range vp.VaultBackends {
status := "up"
alive := isBackendAlive(vaults.HealthURL)
alive := true//isBackendAlive(vaults.HealthURL)
vaults.SetAlive(alive)
if !alive {
status = "down"
Expand Down

0 comments on commit c5fbeb7

Please sign in to comment.