Skip to content

Commit

Permalink
feat(abort): shifting chaosengine updation & event creation to experi…
Browse files Browse the repository at this point in the history
…ment pod (#294)

Signed-off-by: shubhamchaudhary <[email protected]>
  • Loading branch information
ispeakc0de authored Feb 24, 2021
1 parent fafec1a commit ed38c30
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 96 deletions.
208 changes: 112 additions & 96 deletions chaoslib/litmus/network-chaos/helper/netem.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,26 @@ import (
"syscall"
"time"

"github.com/litmuschaos/litmus-go/chaoslib/litmus/network_latency/tc"
clients "github.com/litmuschaos/litmus-go/pkg/clients"
"github.com/litmuschaos/litmus-go/pkg/events"
experimentEnv "github.com/litmuschaos/litmus-go/pkg/generic/network-chaos/environment"
experimentTypes "github.com/litmuschaos/litmus-go/pkg/generic/network-chaos/types"
"github.com/litmuschaos/litmus-go/pkg/log"
"github.com/litmuschaos/litmus-go/pkg/result"
"github.com/litmuschaos/litmus-go/pkg/types"
"github.com/litmuschaos/litmus-go/pkg/utils/common"
"github.com/pkg/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientTypes "k8s.io/apimachinery/pkg/types"
)

const (
qdiscNotFound = "Cannot delete qdisc with handle of zero"
qdiscNoFileFound = "RTNETLINK answers: No such file or directory"
)

var err error
var inject, abort chan os.Signal

func main() {

Expand All @@ -34,6 +40,16 @@ func main() {
chaosDetails := types.ChaosDetails{}
resultDetails := types.ResultDetails{}

// inject channel is used to transmit signal notifications.
inject = make(chan os.Signal, 1)
// Catch and relay certain signal(s) to inject channel.
signal.Notify(inject, os.Interrupt, syscall.SIGTERM, syscall.SIGKILL)

// abort channel is used to transmit signal notifications.
abort = make(chan os.Signal, 1)
// Catch and relay certain signal(s) to abort channel.
signal.Notify(abort, os.Interrupt, syscall.SIGTERM, syscall.SIGKILL)

//Getting kubeConfig and Generate ClientSets
if err := clients.GenerateClientSetFromKubeConfig(); err != nil {
log.Fatalf("Unable to Get the kubeconfig, err: %v", err)
Expand Down Expand Up @@ -79,8 +95,8 @@ func PreparePodNetworkChaos(experimentsDetails *experimentTypes.ExperimentDetail
events.GenerateEvents(eventsDetails, clients, chaosDetails, "ChaosEngine")
}

var endTime <-chan time.Time
timeDelay := time.Duration(experimentsDetails.ChaosDuration) * time.Second
// watching for the abort signal and revert the chaos
go abortWatcher(targetPID)

// injecting network chaos inside target container
if err = InjectChaos(experimentsDetails, targetPID); err != nil {
Expand All @@ -89,46 +105,12 @@ func PreparePodNetworkChaos(experimentsDetails *experimentTypes.ExperimentDetail

log.Infof("[Chaos]: Waiting for %vs", experimentsDetails.ChaosDuration)

// signChan channel is used to transmit signal notifications.
signChan := make(chan os.Signal, 1)
// Catch and relay certain signal(s) to signChan channel.
signal.Notify(signChan, os.Interrupt, syscall.SIGTERM, syscall.SIGKILL)

loop:
for {
endTime = time.After(timeDelay)
select {
case <-signChan:
log.Info("[Chaos]: Killing process started because of terminated signal received")
if err = tc.Killnetem(targetPID); err != nil {
log.Errorf("unable to kill netem process, err :%v", err)

}
// updating the chaosresult after stopped
failStep := "Network Chaos injection stopped!"
types.SetResultAfterCompletion(resultDetails, "Stopped", "Stopped", failStep)
result.ChaosResult(chaosDetails, clients, resultDetails, "EOT")

// generating summary event in chaosengine
msg := experimentsDetails.ExperimentName + " experiment has been aborted"
types.SetEngineEventAttributes(eventsDetails, types.Summary, msg, "Warning", chaosDetails)
events.GenerateEvents(eventsDetails, clients, chaosDetails, "ChaosEngine")

// generating summary event in chaosresult
types.SetResultEventAttributes(eventsDetails, types.StoppedVerdict, msg, "Warning", resultDetails)
events.GenerateEvents(eventsDetails, clients, chaosDetails, "ChaosResult")
os.Exit(1)
case <-endTime:
log.Infof("[Chaos]: Time is up for experiment: %v", experimentsDetails.ExperimentName)
endTime = nil
break loop
}
}
common.WaitForDuration(experimentsDetails.ChaosDuration)

log.Info("[Chaos]: Stopping the experiment")

// cleaning the netem process after chaos injection
if err = tc.Killnetem(targetPID); err != nil {
if err = Killnetem(targetPID); err != nil {
return err
}

Expand Down Expand Up @@ -307,68 +289,75 @@ func InjectChaos(experimentDetails *experimentTypes.ExperimentDetails, pid int)
netemCommands := os.Getenv("NETEM_COMMAND")
destinationIPs := os.Getenv("DESTINATION_IPS")

if destinationIPs == "" {
tc := fmt.Sprintf("sudo nsenter -t %d -n tc qdisc add dev %s root netem %v", pid, experimentDetails.NetworkInterface, netemCommands)
cmd := exec.Command("/bin/bash", "-c", tc)
out, err := cmd.CombinedOutput()
log.Info(cmd.String())
if err != nil {
log.Error(string(out))
return err
}
} else {

ips := strings.Split(destinationIPs, ",")
var uniqueIps []string
select {
case <-inject:
// stopping the chaos execution, if abort signal recieved
os.Exit(1)
default:

// removing duplicates ips from the list, if any
for i := range ips {
isPresent := false
for j := range uniqueIps {
if ips[i] == uniqueIps[j] {
isPresent = true
}
}
if !isPresent {
uniqueIps = append(uniqueIps, ips[i])
if destinationIPs == "" {
tc := fmt.Sprintf("sudo nsenter -t %d -n tc qdisc add dev %s root netem %v", pid, experimentDetails.NetworkInterface, netemCommands)
cmd := exec.Command("/bin/bash", "-c", tc)
out, err := cmd.CombinedOutput()
log.Info(cmd.String())
if err != nil {
log.Error(string(out))
return err
}
} else {

ips := strings.Split(destinationIPs, ",")
var uniqueIps []string

// removing duplicates ips from the list, if any
for i := range ips {
isPresent := false
for j := range uniqueIps {
if ips[i] == uniqueIps[j] {
isPresent = true
}
}
if !isPresent {
uniqueIps = append(uniqueIps, ips[i])
}

}
}

// Create a priority-based queue
// This instantly creates classes 1:1, 1:2, 1:3
priority := fmt.Sprintf("sudo nsenter -t %v -n tc qdisc add dev %v root handle 1: prio", pid, experimentDetails.NetworkInterface)
cmd := exec.Command("/bin/bash", "-c", priority)
out, err := cmd.CombinedOutput()
log.Info(cmd.String())
if err != nil {
log.Error(string(out))
return err
}
// Create a priority-based queue
// This instantly creates classes 1:1, 1:2, 1:3
priority := fmt.Sprintf("sudo nsenter -t %v -n tc qdisc add dev %v root handle 1: prio", pid, experimentDetails.NetworkInterface)
cmd := exec.Command("/bin/bash", "-c", priority)
out, err := cmd.CombinedOutput()
log.Info(cmd.String())
if err != nil {
log.Error(string(out))
return err
}

// Add queueing discipline for 1:3 class.
// No traffic is going through 1:3 yet
traffic := fmt.Sprintf("sudo nsenter -t %v -n tc qdisc add dev %v parent 1:3 netem %v", pid, experimentDetails.NetworkInterface, netemCommands)
cmd = exec.Command("/bin/bash", "-c", traffic)
out, err = cmd.CombinedOutput()
log.Info(cmd.String())
if err != nil {
log.Error(string(out))
return err
}
// Add queueing discipline for 1:3 class.
// No traffic is going through 1:3 yet
traffic := fmt.Sprintf("sudo nsenter -t %v -n tc qdisc add dev %v parent 1:3 netem %v", pid, experimentDetails.NetworkInterface, netemCommands)
cmd = exec.Command("/bin/bash", "-c", traffic)
out, err = cmd.CombinedOutput()
log.Info(cmd.String())
if err != nil {
log.Error(string(out))
return err
}

for _, ip := range uniqueIps {

// redirect traffic to specific IP through band 3
// It allows ipv4 addresses only
if !strings.Contains(ip, ":") {
tc := fmt.Sprintf("sudo nsenter -t %v -n tc filter add dev %v protocol ip parent 1:0 prio 3 u32 match ip dst %v flowid 1:3", pid, experimentDetails.NetworkInterface, ip)
cmd = exec.Command("/bin/bash", "-c", tc)
out, err = cmd.CombinedOutput()
log.Info(cmd.String())
if err != nil {
log.Error(string(out))
return err
for _, ip := range uniqueIps {

// redirect traffic to specific IP through band 3
// It allows ipv4 addresses only
if !strings.Contains(ip, ":") {
tc := fmt.Sprintf("sudo nsenter -t %v -n tc filter add dev %v protocol ip parent 1:0 prio 3 u32 match ip dst %v flowid 1:3", pid, experimentDetails.NetworkInterface, ip)
cmd = exec.Command("/bin/bash", "-c", tc)
out, err = cmd.CombinedOutput()
log.Info(cmd.String())
if err != nil {
log.Error(string(out))
return err
}
}
}
}
Expand All @@ -386,9 +375,13 @@ func Killnetem(PID int) error {

if err != nil {
log.Error(string(out))
// ignoring err if qdisc process doesn't exist inside the target container
if strings.Contains(string(out), qdiscNotFound) || strings.Contains(string(out), qdiscNoFileFound) {
log.Warn("The network chaos process has already been removed")
return nil
}
return err
}

return nil
}

Expand Down Expand Up @@ -418,3 +411,26 @@ func Getenv(key string, defaultValue string) string {
}
return value
}

// abortWatcher continuosly watch for the abort signals
func abortWatcher(targetPID int) {

for {
select {
case <-abort:
log.Info("[Chaos]: Killing process started because of terminated signal received")
log.Info("Chaos Revert Started")
// retry thrice for the chaos revert
retry := 3
for retry > 0 {
if err = Killnetem(targetPID); err != nil {
log.Errorf("unable to kill netem process, err :%v", err)
}
retry--
time.Sleep(1 * time.Second)
}
log.Info("Chaos Revert Completed")
os.Exit(1)
}
}
}
38 changes: 38 additions & 0 deletions chaoslib/litmus/network-chaos/lib/network-chaos.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@ package lib

import (
"net"
"os"
"os/signal"
"strconv"
"strings"
"syscall"

clients "github.com/litmuschaos/litmus-go/pkg/clients"
"github.com/litmuschaos/litmus-go/pkg/events"
experimentTypes "github.com/litmuschaos/litmus-go/pkg/generic/network-chaos/types"
"github.com/litmuschaos/litmus-go/pkg/log"
"github.com/litmuschaos/litmus-go/pkg/result"
"github.com/litmuschaos/litmus-go/pkg/status"
"github.com/litmuschaos/litmus-go/pkg/types"
"github.com/litmuschaos/litmus-go/pkg/utils/common"
Expand Down Expand Up @@ -70,6 +75,8 @@ func PrepareAndInjectChaos(experimentsDetails *experimentTypes.ExperimentDetails
}
}

go abortWatcher(resultDetails, chaosDetails, clients, eventsDetails, experimentsDetails)

if experimentsDetails.Sequence == "serial" {
if err = InjectChaosInSerialMode(experimentsDetails, targetPodList, clients, chaosDetails, args); err != nil {
return err
Expand Down Expand Up @@ -371,3 +378,34 @@ func GetIpsForTargetHosts(targetHosts string) string {
}
return strings.Join(commaSeparatedIPs, ",")
}

// abortWatcher continuosly watch for the abort signals
// it will update the chaosresult
func abortWatcher(resultDetails *types.ResultDetails, chaosDetails *types.ChaosDetails, clients clients.ClientSets, eventsDetails *types.EventDetails, experimentsDetails *experimentTypes.ExperimentDetails) {

// signChan channel is used to transmit signal notifications.
signChan := make(chan os.Signal, 1)
// Catch and relay certain signal(s) to signChan channel.
signal.Notify(signChan, os.Interrupt, syscall.SIGTERM, syscall.SIGKILL)

for {
select {
case <-signChan:
log.Info("termination signal recieved, updating chaos status")
// updating the chaosresult after stopped
failStep := "Network Chaos injection stopped!"
types.SetResultAfterCompletion(resultDetails, "Stopped", "Stopped", failStep)
result.ChaosResult(chaosDetails, clients, resultDetails, "EOT")

// generating summary event in chaosengine
msg := experimentsDetails.ExperimentName + " experiment has been aborted"
types.SetEngineEventAttributes(eventsDetails, types.Summary, msg, "Warning", chaosDetails)
events.GenerateEvents(eventsDetails, clients, chaosDetails, "ChaosEngine")

// generating summary event in chaosresult
types.SetResultEventAttributes(eventsDetails, types.StoppedVerdict, msg, "Warning", resultDetails)
events.GenerateEvents(eventsDetails, clients, chaosDetails, "ChaosResult")
os.Exit(1)
}
}
}

0 comments on commit ed38c30

Please sign in to comment.