
Commit

temp
Signed-off-by: souravbiswassanto <[email protected]>
souravbiswassanto committed Oct 30, 2024
1 parent 3605624 commit e0a5cae
Showing 3 changed files with 318 additions and 235 deletions.
124 changes: 124 additions & 0 deletions pkg/controllers/apps/pod.go
@@ -0,0 +1,124 @@
package apps

import (
"context"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/klog/v2"
cu "kmodules.xyz/client-go/client"
core_util "kmodules.xyz/client-go/core/v1"
appsv1alpha1 "kubeops.dev/sidekick/apis/apps/v1alpha1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

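// removePodFinalizerIfMarkedForDeletion checks whether the pod referenced by the
// request is marked for deletion. If the pod was terminated externally (it carries
// no deletionInitiatorKey annotation), the failure is recorded in the sidekick
// status; in either case the pod finalizer is removed if it is still present.
// It reports whether the finalizer was removed.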
func (r *SidekickReconciler) removePodFinalizerIfMarkedForDeletion(ctx context.Context, req ctrl.Request) (bool, error) {
var pod corev1.Pod
err := r.Get(ctx, req.NamespacedName, &pod)
if err != nil && !errors.IsNotFound(err) {
return false, err
}

if err == nil && pod.DeletionTimestamp != nil {
// Increase the failureCount if the pod was terminated externally.
// An externally terminated pod will not have the deletionInitiatorKey
// annotation set, because the operator only adds it right before deleting a pod itself.

_, exists := pod.ObjectMeta.Annotations[deletionInitiatorKey]
hash, exists1 := pod.ObjectMeta.Annotations[podHash]
if !exists {
var sk appsv1alpha1.Sidekick
err = r.Get(ctx, req.NamespacedName, &sk)
if err != nil && !errors.IsNotFound(err) {
return false, err
}
// If the sidekick is not found or is being deleted,
// skip updating the failureCount in this case.

if err == nil && sk.DeletionTimestamp == nil && exists1 {
if sk.Status.FailureCount == nil {
sk.Status.FailureCount = make(map[string]bool)
}
sk.Status.FailureCount[hash] = true
err = r.updateSidekickStatus(ctx, &sk)
if err != nil {
return false, err
}
}
}

// Remove the finalizer. The reason for adding this finalizer is explained
// where the pod is created.
if core_util.HasFinalizer(pod.ObjectMeta, getFinalizerName()) {
err = r.removePodFinalizer(ctx, &pod)
if err != nil {
return false, err
}
return true, nil
}
}
return false, nil
}

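// updatePodFailureCount records (status == true) or clears (status == false) the
// failure entry for the pod's hash in the sidekick status and persists the change.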
func (r *SidekickReconciler) updatePodFailureCount(ctx context.Context, sidekick *appsv1alpha1.Sidekick, pod *corev1.Pod, status bool) error {
// Only record a failure (status == true) when the pod has actually failed;
// clearing an entry (status == false) is allowed regardless of the pod phase.
if pod.Status.Phase != corev1.PodFailed && status {
return nil
}
hash, exists := pod.Annotations[podHash]
if !exists {
return nil
}
if sidekick.Status.FailureCount == nil {
sidekick.Status.FailureCount = make(map[string]bool)
}
sidekick.Status.FailureCount[hash] = status
return r.updateSidekickStatus(ctx, sidekick)
}

func (r *SidekickReconciler) deletePod(ctx context.Context, pod *corev1.Pod) error {
err := r.setDeletionInitiatorAnnotation(ctx, pod)
if err != nil {
return err
}
return r.Delete(ctx, pod)
}

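// getContainerRestartCounts sums the restart counts of all regular and init
// containers of the given pod.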
func getContainerRestartCounts(pod *corev1.Pod) int32 {
restartCounter := int32(0)
for _, cs := range pod.Status.ContainerStatuses {
restartCounter += cs.RestartCount
}
for _, ics := range pod.Status.InitContainerStatuses {
restartCounter += ics.RestartCount
}
return restartCounter
}

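// getTotalContainerRestartCounts sums the per-pod container restart counts
// recorded in the sidekick status.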
func getTotalContainerRestartCounts(sidekick *appsv1alpha1.Sidekick) int32 {
totalContainerRestartCount := int32(0)
for _, value := range sidekick.Status.ContainerRestartCountsPerPod {
totalContainerRestartCount += value
}
return totalContainerRestartCount
}

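// setDeletionInitiatorAnnotation marks the pod as deleted by the sidekick operator
// itself, so the deletion is not later counted as an external termination.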
func (r *SidekickReconciler) setDeletionInitiatorAnnotation(ctx context.Context, pod *corev1.Pod) error {
_, err := cu.CreateOrPatch(ctx, r.Client, pod,
func(in client.Object, createOp bool) client.Object {
po := in.(*corev1.Pod)
if po.ObjectMeta.Annotations == nil {
// guard against writing to a nil annotations map
po.ObjectMeta.Annotations = map[string]string{}
}
po.ObjectMeta.Annotations[deletionInitiatorKey] = deletionInitiatesBySidekickOperator
return po
},
)
klog.Infoln("deletion initiator annotation set on pod: ", pod.Annotations)

return err
}

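// removePodFinalizer removes the sidekick finalizer from the pod via a patch.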
func (r *SidekickReconciler) removePodFinalizer(ctx context.Context, pod *corev1.Pod) error {
_, err := cu.CreateOrPatch(ctx, r.Client, pod,
func(in client.Object, createOp bool) client.Object {
po := in.(*corev1.Pod)
po.ObjectMeta = core_util.RemoveFinalizer(po.ObjectMeta, getFinalizerName())
return po
},
)
return err
}
172 changes: 172 additions & 0 deletions pkg/controllers/apps/sidekick.go
@@ -0,0 +1,172 @@
package apps

import (
"context"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
cu "kmodules.xyz/client-go/client"
core_util "kmodules.xyz/client-go/core/v1"
appsv1alpha1 "kubeops.dev/sidekick/apis/apps/v1alpha1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

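// handleSidekickFinalizer runs the termination routine when the sidekick is being
// deleted and still carries the finalizer; if the sidekick is not being deleted,
// it ensures the finalizer is present.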
func (r *SidekickReconciler) handleSidekickFinalizer(ctx context.Context, sidekick *appsv1alpha1.Sidekick) error {
if sidekick.DeletionTimestamp != nil {
if core_util.HasFinalizer(sidekick.ObjectMeta, getFinalizerName()) {
return r.terminate(ctx, sidekick)
}
// The object is being deleted and our finalizer is already gone,
// so there is nothing left to do; do not re-add the finalizer.
return nil
}

_, err := cu.CreateOrPatch(ctx, r.Client, sidekick,
func(in client.Object, createOp bool) client.Object {
sk := in.(*appsv1alpha1.Sidekick)
sk.ObjectMeta = core_util.AddFinalizer(sk.ObjectMeta, getFinalizerName())
return sk
},
)
return err
}

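// updateSidekickPhase recalculates the sidekick phase and, when the phase is Failed,
// marks a still-running pod as failed. It returns true when the sidekick has reached
// a terminal phase (Failed or Succeeded).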
func (r *SidekickReconciler) updateSidekickPhase(ctx context.Context, req ctrl.Request, sidekick *appsv1alpha1.Sidekick) (bool, error) {
// TODO: currently not setting pending phase

phase, err := r.calculateSidekickPhase(ctx, sidekick)
if err != nil {
return false, err
}
klog.Infoln("calculated sidekick phase:", phase)

if phase == appsv1alpha1.SideKickPhaseFailed {
var pod corev1.Pod
if err = r.Get(ctx, req.NamespacedName, &pod); err != nil {
return true, client.IgnoreNotFound(err)
}
if pod.Status.Phase == corev1.PodRunning {
pod.Status.Phase = corev1.PodFailed
// Note: (taken from https://kubernetes.io/docs/concepts/workloads/controllers/job/#pod-backoff-failure-policy )
// If your sidekick has restartPolicy = "OnFailure", keep in mind that your Pod running the Job will be
// terminated once the job backoff limit has been reached. This can make debugging the Job's executable
// more difficult. We suggest setting restartPolicy = "Never" when debugging the Job or using a logging
// system to ensure output from failed Jobs is not lost inadvertently.
err = r.Status().Update(ctx, &pod)
if err != nil {
return false, err
}
}
// increasing backOffLimit or changing restartPolicy will change
// the sidekick phase from failed to current
return true, nil
}
if sidekick.Status.Phase == appsv1alpha1.SidekickPhaseSucceeded {
return true, nil
}
return false, nil
}

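// updateSidekickStatus patches the status subresource of the sidekick with the
// locally modified status.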
func (r *SidekickReconciler) updateSidekickStatus(ctx context.Context, sidekick *appsv1alpha1.Sidekick) error {
_, err := cu.PatchStatus(ctx, r.Client, sidekick, func(obj client.Object) client.Object {
sk := obj.(*appsv1alpha1.Sidekick)
sk.Status = sidekick.Status
return sk
})
return err
}

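// getFailureCountFromSidekickStatus counts the entries in status.failureCount that
// are marked true, i.e. pod failures that count toward the backoff limit.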
func getFailureCountFromSidekickStatus(sidekick *appsv1alpha1.Sidekick) int32 {
failureCount := int32(0)
for _, val := range sidekick.Status.FailureCount {
if val {
failureCount++
}
}
return failureCount
}

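// getSidekickPhase derives the sidekick phase from the restart policy, the current
// pod state, and the accumulated backoff counts.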
func (r *SidekickReconciler) getSidekickPhase(sidekick *appsv1alpha1.Sidekick, pod *corev1.Pod) appsv1alpha1.SideKickPhase {
// if restartPolicy is always, we will always try to keep a pod running
// if pod.status.phase == failed, then we will start a new pod
// TODO: which of these two should come first?
if sidekick.Spec.RestartPolicy == corev1.RestartPolicyAlways {
return appsv1alpha1.SideKickPhaseCurrent
}
if sidekick.Status.Phase == appsv1alpha1.SidekickPhaseSucceeded {
return appsv1alpha1.SidekickPhaseSucceeded
}

// Only restartPolicy OnFailure and Never remain at this point.
// In both cases we report the phase as Succeeded if the
// pod exited with code 0 (pod phase Succeeded).
if pod != nil && pod.Status.Phase == corev1.PodSucceeded && pod.ObjectMeta.DeletionTimestamp == nil {
return appsv1alpha1.SidekickPhaseSucceeded
}
//if sidekick.Status.Phase != appsv1alpha1.SidekickPhaseSucceeded && pod.Status.Phase == corev1.PodSucceeded && pod.ObjectMeta.DeletionTimestamp != nil {
// podSpecificHash := pod.Annotations[podHash]
// sidekick.Status.FailureCount[podSpecificHash] = true
//}
// Now figure out whether the sidekick phase should be
// marked as Failed by checking the backoff limit.

backOffCounts := getTotalBackOffCounts(sidekick)
// TODO: is it > or >= ?
if backOffCounts > *sidekick.Spec.BackoffLimit {
return appsv1alpha1.SideKickPhaseFailed
}
return appsv1alpha1.SideKickPhaseCurrent
}

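// calculateSidekickPhase refreshes the restart and failure bookkeeping from the
// current pod, derives the resulting phase, and persists it in the sidekick status.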
func (r *SidekickReconciler) calculateSidekickPhase(ctx context.Context, sidekick *appsv1alpha1.Sidekick) (appsv1alpha1.SideKickPhase, error) {
if sidekick.Status.ContainerRestartCountsPerPod == nil {
sidekick.Status.ContainerRestartCountsPerPod = make(map[string]int32)
}
if sidekick.Status.FailureCount == nil {
sidekick.Status.FailureCount = make(map[string]bool)
}

var pod corev1.Pod
err := r.Get(ctx, client.ObjectKeyFromObject(sidekick), &pod)
if err != nil && !errors.IsNotFound(err) {
return sidekick.Status.Phase, err
}

if err == nil {
restartCounter := getContainerRestartCounts(&pod)
podSpecificHash := pod.Annotations[podHash]
sidekick.Status.ContainerRestartCountsPerPod[podSpecificHash] = restartCounter
if pod.Status.Phase == corev1.PodFailed && pod.ObjectMeta.DeletionTimestamp == nil {
klog.Infoln("pod failed, recording failure for pod hash:", podSpecificHash)
sidekick.Status.FailureCount[podSpecificHash] = true
// Case: restartPolicy is OnFailure and the backoff limit was already crossed
// by the last container restart. In that situation we fail the pod manually,
// but this manual failure should not count toward the backoff limit itself.
if sidekick.Spec.RestartPolicy != corev1.RestartPolicyNever && getTotalBackOffCounts(sidekick) > *sidekick.Spec.BackoffLimit {
sidekick.Status.FailureCount[podSpecificHash] = false
}
}

}

phase := r.getSidekickPhase(sidekick, &pod)
sidekick.Status.Phase = phase
err = r.updateSidekickStatus(ctx, sidekick)
if err != nil {
return "", err
}
return phase, nil
}

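// getTotalBackOffCounts returns the number of failed pods plus the total container
// restarts across all pods, which is compared against spec.backoffLimit.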
func getTotalBackOffCounts(sidekick *appsv1alpha1.Sidekick) int32 {
// failureCount keeps track of the total number of pods that had pod.status.phase == failed
failureCount := getFailureCountFromSidekickStatus(sidekick)
// totalContainerRestartCount counts the overall
// restart counts of all the containers over all
// pods created by sidekick obj
totalContainerRestartCount := getTotalContainerRestartCounts(sidekick)
if sidekick.Spec.BackoffLimit == nil {
// Default a missing backoff limit to 0 so callers can safely dereference it.
sidekick.Spec.BackoffLimit = ptr.To(int32(0))
}
klog.Infoln("backoffLimit:", *sidekick.Spec.BackoffLimit, "totalContainerRestartCount:", totalContainerRestartCount, "failureCount:", failureCount)
return failureCount + totalContainerRestartCount
}
