Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(server): keep retrying connections till node/pod is deleted #60

Merged
merged 2 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions relay-server/elasticsearch/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,17 +197,17 @@ func (ecl *ElasticsearchClient) PrintBulkStats() {
if biStats.NumFailed > 0 {
fmt.Printf(
"Indexed [%s] documents with [%s] errors in %s (%s docs/sec)",
humanize.Comma(int64(biStats.NumFlushed)),
humanize.Comma(int64(biStats.NumFailed)),
humanize.Commaf(float64(biStats.NumFlushed)),
humanize.Commaf(float64(biStats.NumFailed)),
dur.Truncate(time.Millisecond),
humanize.Comma(int64(1000.0/float64(dur/time.Millisecond)*float64(biStats.NumFlushed))),
humanize.Commaf(float64(1000.0/float64(dur/time.Millisecond)*float64(biStats.NumFlushed))),
)
} else {
log.Printf(
"Sucessfuly indexed [%s] documents in %s (%s docs/sec)",
humanize.Comma(int64(biStats.NumFlushed)),
humanize.Commaf(float64(biStats.NumFlushed)),
dur.Truncate(time.Millisecond),
humanize.Comma(int64(1000.0/float64(dur/time.Millisecond)*float64(biStats.NumFlushed))),
humanize.Commaf(float64(1000.0/float64(dur/time.Millisecond)*float64(biStats.NumFlushed))),
)
}
println(strings.Repeat("▔", 80))
Expand Down
36 changes: 28 additions & 8 deletions relay-server/server/k8sHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,18 +259,38 @@
_, _ = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if ok {
if pod.Status.PodIP != "" {
ipsChan <- pod.Status.PodIP
}
if !ok {
return
}

if pod.Status.PodIP != "" {
ipsChan <- pod.Status.PodIP
}
},
UpdateFunc: func(old, new interface{}) {

Check warning on line 270 in relay-server/server/k8sHandler.go

View workflow job for this annotation

GitHub Actions / go-lint

redefinition of the built-in function new
oldPod, ok := old.(*corev1.Pod)
if !ok {
return
}

newPod, ok := new.(*corev1.Pod)
if ok {
if newPod.Status.PodIP != "" {
ipsChan <- newPod.Status.PodIP
}
if !ok {
return
}

if newPod.Status.PodIP != "" && newPod.Status.PodIP != oldPod.Status.PodIP {
ipsChan <- newPod.Status.PodIP
DeleteClientEntry(oldPod.Status.PodIP)
}
},
DeleteFunc: func(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
return
}

if pod.Status.PodIP != "" {
DeleteClientEntry(pod.Status.PodIP)
}
},
})
Expand Down
136 changes: 67 additions & 69 deletions relay-server/server/relayServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,12 @@ var Running bool
var ClientList map[string]int

// ClientListLock Lock
var ClientListLock *sync.Mutex
var ClientListLock *sync.RWMutex

func init() {
Running = true
ClientList = map[string]int{}
ClientListLock = &sync.Mutex{}

ClientListLock = &sync.RWMutex{}
}

// ========== //
Expand Down Expand Up @@ -706,11 +705,7 @@ func DeleteClientEntry(nodeIP string) {
ClientListLock.Lock()
defer ClientListLock.Unlock()

_, exists := ClientList[nodeIP]

if exists {
delete(ClientList, nodeIP)
}
delete(ClientList, nodeIP)
}

// =============== //
Expand All @@ -722,66 +717,72 @@ func connectToKubeArmor(nodeIP, port string) error {
// create connection info
server := nodeIP + ":" + port

defer DeleteClientEntry(nodeIP)
for Running {
ClientListLock.RLock()
_, found := ClientList[nodeIP]
ClientListLock.RUnlock()
if !found {
// KubeArmor with this IP is deleted or the IP has changed
// parent function will spawn a new goroutine accordingly
break
}

// create a client
client := NewClient(server)
if client == nil {
return nil
}
// create a client
client := NewClient(server)
if client == nil {
time.Sleep(5 * time.Second) // wait for 5 seconds before retrying
continue
}

// do healthcheck
if ok := client.DoHealthCheck(); !ok {
kg.Warnf("Failed to check the liveness of KubeArmor's gRPC service (%s)", server)
return nil
}
kg.Printf("Checked the liveness of KubeArmor's gRPC service (%s)", server)
// do healthcheck
if ok := client.DoHealthCheck(); !ok {
kg.Warnf("Failed to check the liveness of KubeArmor's gRPC service (%s)", server)
time.Sleep(5 * time.Second) // wait for 5 seconds before retrying
continue
}
kg.Printf("Checked the liveness of KubeArmor's gRPC service (%s)", server)

var wg sync.WaitGroup
stop := make(chan struct{})
errCh := make(chan error, 1)
var wg sync.WaitGroup
stop := make(chan struct{})
errCh := make(chan error, 1)

// Start watching messages
wg.Add(1)
go func() {
client.WatchMessages(&wg, stop, errCh)
}()
kg.Print("Started to watch messages from " + server)
// Start watching messages
wg.Add(1)
go client.WatchMessages(&wg, stop, errCh)
kg.Print("Started to watch messages from " + server)

// Start watching alerts
wg.Add(1)
go func() {
client.WatchAlerts(&wg, stop, errCh)
}()
kg.Print("Started to watch alerts from " + server)
// Start watching alerts
wg.Add(1)
go client.WatchAlerts(&wg, stop, errCh)
kg.Print("Started to watch alerts from " + server)

// Start watching logs
wg.Add(1)
go func() {
client.WatchLogs(&wg, stop, errCh)
}()
kg.Print("Started to watch logs from " + server)

// Wait for an error or all goroutines to finish
select {
case err := <-errCh:
close(stop) // Stop other goroutines
kg.Warn(err.Error())
case <-func() chan struct{} {
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
return done
}():
// All goroutines finished without error
}
// Start watching logs
wg.Add(1)
go client.WatchLogs(&wg, stop, errCh)
kg.Print("Started to watch logs from " + server)

if err := client.DestroyClient(); err != nil {
kg.Warnf("Failed to destroy the client (%s) %s", server, err.Error())
// Wait for an error or all goroutines to finish
select {
case err := <-errCh:
close(stop) // Stop other goroutines
kg.Warn(err.Error())
case <-func() chan struct{} {
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
return done
}():
// All goroutines finished without error
}

if err := client.DestroyClient(); err != nil {
kg.Warnf("Failed to destroy the client (%s) %s", server, err.Error())
}

kg.Printf("Destroyed the client (%s)", server)
}
kg.Printf("Destroyed the client (%s)", server)

return nil
}
Expand Down Expand Up @@ -810,16 +811,13 @@ func (rs *RelayServer) GetFeedsFromNodes() {
}

for Running {
select {
case ip := <-ipsChan:
ClientListLock.Lock()
if _, ok := ClientList[ip]; !ok {
ClientList[ip] = 1
go connectToKubeArmor(ip, rs.Port)
}
ClientListLock.Unlock()
ip := <-ipsChan
ClientListLock.Lock()
if _, ok := ClientList[ip]; !ok {
ClientList[ip] = 1
go connectToKubeArmor(ip, rs.Port)
}
time.Sleep(10 * time.Second)
ClientListLock.Unlock()
}
}
}
Loading