diff --git a/README.md b/README.md index 2aa7d80b..aa307e22 100644 --- a/README.md +++ b/README.md @@ -307,6 +307,8 @@ For more detailed description, go through _kube-fledged's_ [design proposal](doc `--job-retention-policy:` Determines if the jobs created by kubefledged-controller would be deleted or retained (for debugging) after it finishes. Possible values are 'delete' and 'retain'. default value is 'delete'. +`--jobs-max-surge:` Maximum no. of active jobs allowed. default: max surge checks disabled. + `--service-account-name:` serviceAccountName used in Jobs created for pulling or deleting images. Optional flag. If not specified the default service account of the namespace is used `--stderrthreshold:` Log level. set the value of this flag to INFO diff --git a/cmd/controller/app/controller.go b/cmd/controller/app/controller.go index 68bf14f2..fc3a633d 100644 --- a/cmd/controller/app/controller.go +++ b/cmd/controller/app/controller.go @@ -100,7 +100,8 @@ func NewController( imageDeleteJobHostNetwork bool, jobPriorityClassName string, canDeleteJob bool, - criSocketPath string) *Controller { + criSocketPath string, + jobsMaxSurge int) *Controller { runtime.Must(fledgedscheme.AddToScheme(scheme.Scheme)) glog.V(4).Info("Creating event broadcaster") @@ -126,7 +127,7 @@ func NewController( imageManager, _ := images.NewImageManager(controller.workqueue, controller.imageworkqueue, controller.kubeclientset, controller.fledgedNameSpace, imagePullDeadlineDuration, criClientImage, busyboxImage, imagePullPolicy, serviceAccountName, imageDeleteJobHostNetwork, - jobPriorityClassName, canDeleteJob, criSocketPath) + jobPriorityClassName, canDeleteJob, criSocketPath, jobsMaxSurge) controller.imageManager = imageManager glog.Info("Setting up event handlers") diff --git a/cmd/controller/app/controller_test.go b/cmd/controller/app/controller_test.go index 0d95fc4c..091a70ea 100644 --- a/cmd/controller/app/controller_test.go +++ b/cmd/controller/app/controller_test.go @@ -68,6 +68,7 
@@ func newTestController(kubeclientset kubernetes.Interface, fledgedclientset clie jobPriorityClassName := "priority-class-kube-fledged" canDelete := false socketPath := "" + jobsMaxSurge := 0 /* startInformers := true if startInformers { @@ -81,7 +82,7 @@ func newTestController(kubeclientset kubernetes.Interface, fledgedclientset clie fledgedclientset, fledgedNameSpace, nodeInformer, imagecacheInformer, imageCacheRefreshFrequency, imagePullDeadlineDuration, criClientImage, busyboxImage, imagePullPolicy, serviceAccountName, imageDeleteJobHostNetwork, - jobPriorityClassName, canDelete, socketPath) + jobPriorityClassName, canDelete, socketPath, jobsMaxSurge) controller.nodesSynced = func() bool { return true } controller.imageCachesSynced = func() bool { return true } return controller, nodeInformer, imagecacheInformer diff --git a/cmd/controller/main.go b/cmd/controller/main.go index 65423c91..5926678d 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -48,6 +48,7 @@ var ( //Default value for when `--job-retention-policy` flag is not set canDeleteJob bool = true criSocketPath string + jobsMaxSurge int ) func main() { @@ -79,7 +80,7 @@ func main() { fledgedInformerFactory.Kubefledged().V1alpha2().ImageCaches(), imageCacheRefreshFrequency, imagePullDeadlineDuration, criClientImage, busyboxImage, imagePullPolicy, serviceAccountName, imageDeleteJobHostNetwork, - jobPriorityClassName, canDeleteJob, criSocketPath) + jobPriorityClassName, canDeleteJob, criSocketPath, jobsMaxSurge) glog.Info("Starting pre-flight checks") if err = controller.PreFlightChecks(); err != nil { @@ -135,4 +136,9 @@ func init() { }, ) flag.StringVar(&criSocketPath, "cri-socket-path", "", "path to the cri socket on the node e.g. /run/containerd/containerd.sock (default: /var/run/docker.sock, /run/containerd/containerd.sock, /var/run/crio/crio.sock)") + flag.IntVar(&jobsMaxSurge, "jobs-max-surge", 0, "maximum no. of active jobs allowed. 
default: max surge checks disabled") + if jobsMaxSurge < 0 { + jobsMaxSurge = 0 + glog.Warningf("--jobs-max-surge set to incorrect value. max surge checks will be disabled") + } } diff --git a/deploy/kubefledged-operator/helm-charts/kubefledged/README.md b/deploy/kubefledged-operator/helm-charts/kubefledged/README.md index 71636a3f..cc7f239c 100644 --- a/deploy/kubefledged-operator/helm-charts/kubefledged/README.md +++ b/deploy/kubefledged-operator/helm-charts/kubefledged/README.md @@ -48,6 +48,7 @@ Kube-fledged is a kubernetes operator for creating and managing a cache of conta | args.controllerImagePullPolicy | IfNotPresent | Image pull policy for pulling images into and refreshing the cache. Possible values are 'IfNotPresent' and 'Always'. Default value is 'IfNotPresent'. Image with no or ":latest" tag are always pulled | | args.controllerJobPriorityClassName | "" | priorityClassName of jobs created by kubefledged-controller. If not specified, priorityClassName won't be set | | args.controllerJobRetentionPolicy | "delete" | Determines if the jobs created by kubefledged-controller would be deleted or retained (for debugging) after it finishes. Possible values are 'delete' and 'retain'. default value is 'delete'. | +| args.controllerJobsMaxSurge | 0 | Maximum number of active jobs allowed at a time. The default value 0 disables max surge checks | | args.controllerServiceAccountName | "" | serviceAccountName used in Jobs created for pulling or deleting images. Optional flag.
If not specified the default service account of the namespace is used | | args.controllerLogLevel | INFO | Log level of kubefledged-controller | | args.webhookServerCertFile | /var/run/secrets/webhook-server/tls.crt | Path of server certificate of kubefledged-webhook-server | diff --git a/deploy/kubefledged-operator/helm-charts/kubefledged/templates/deployment-controller.yaml b/deploy/kubefledged-operator/helm-charts/kubefledged/templates/deployment-controller.yaml index 8fc7b3da..79db4405 100644 --- a/deploy/kubefledged-operator/helm-charts/kubefledged/templates/deployment-controller.yaml +++ b/deploy/kubefledged-operator/helm-charts/kubefledged/templates/deployment-controller.yaml @@ -50,7 +50,10 @@ spec: {{- end }} {{- if .Values.args.controllerCRISocketPath }} - "--cri-socket-path={{ .Values.args.controllerCRISocketPath }}" - {{- end }} + {{- end }} + {{- if .Values.args.controllerJobsMaxSurge }} + - "--jobs-max-surge={{ .Values.args.controllerJobsMaxSurge }}" + {{- end }} imagePullPolicy: {{ .Values.image.pullPolicy }} env: - name: KUBEFLEDGED_NAMESPACE diff --git a/deploy/kubefledged-operator/helm-charts/kubefledged/values.yaml b/deploy/kubefledged-operator/helm-charts/kubefledged/values.yaml index ee02b531..9d7e7f7c 100644 --- a/deploy/kubefledged-operator/helm-charts/kubefledged/values.yaml +++ b/deploy/kubefledged-operator/helm-charts/kubefledged/values.yaml @@ -31,6 +31,7 @@ args: controllerJobPriorityClassName: "" controllerJobRetentionPolicy: "delete" controllerCRISocketPath: "" + controllerJobsMaxSurge: 0 webhookServerLogLevel: INFO webhookServerCertFile: /var/run/secrets/webhook-server/tls.crt webhookServerKeyFile: /var/run/secrets/webhook-server/tls.key diff --git a/docs/helm-parameters.md b/docs/helm-parameters.md index 2c01af92..a5f6d507 100644 --- a/docs/helm-parameters.md +++ b/docs/helm-parameters.md @@ -20,6 +20,7 @@ | args.controllerImagePullPolicy | IfNotPresent | Image pull policy for pulling images into and refreshing the cache. 
Possible values are 'IfNotPresent' and 'Always'. Default value is 'IfNotPresent'. Image with no or ":latest" tag are always pulled | | args.controllerJobPriorityClassName | "" | priorityClassName of jobs created by kubefledged-controller. If not specified, priorityClassName won't be set | | args.controllerJobRetentionPolicy | "delete" | Determines if the jobs created by kubefledged-controller would be deleted or retained (for debugging) after it finishes. Possible values are 'delete' and 'retain'. default value is 'delete'. | +| args.controllerJobsMaxSurge | 0 | Maximum number of active jobs allowed at a time. The default value 0 disables max surge checks | | args.controllerServiceAccountName | "" | serviceAccountName used in Jobs created for pulling or deleting images. Optional flag. If not specified the default service account of the namespace is used | | args.controllerLogLevel | INFO | Log level of kubefledged-controller | | args.webhookServerCertFile | /var/run/secrets/webhook-server/tls.crt | Path of server certificate of kubefledged-webhook-server | diff --git a/pkg/images/image_manager.go b/pkg/images/image_manager.go index 278d415b..5777344f 100644 --- a/pkg/images/image_manager.go +++ b/pkg/images/image_manager.go @@ -77,6 +77,7 @@ type ImageManager struct { jobPriorityClassName string canDeleteJob bool criSocketPath string + jobsMaxSurge int lock sync.RWMutex } @@ -129,7 +130,8 @@ func NewImageManager( imageDeleteJobHostNetwork bool, jobPriorityClassName string, canDeleteJob bool, - criSocketPath string) (*ImageManager, coreinformers.PodInformer) { + criSocketPath string, + jobsMaxSurge int) (*ImageManager, coreinformers.PodInformer) { appEqKubefledged, _ := labels.NewRequirement("app", selection.Equals, []string{"kubefledged"}) kubefledgedEqImagemanager, _ := labels.NewRequirement("kubefledged", selection.Equals, []string{"kubefledged-image-manager"}) @@ -162,6 +164,7 @@ func NewImageManager( jobPriorityClassName: jobPriorityClassName, canDeleteJob: canDeleteJob, criSocketPath:
criSocketPath, + jobsMaxSurge: jobsMaxSurge, } podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ //AddFunc: , @@ -398,7 +401,7 @@ func (m *ImageManager) Run(stopCh <-chan struct{}) error { // processNextWorkItem function in order to read and process a message on the // workqueue. func (m *ImageManager) runWorker() { - for m.processNextWorkItem() { + for m.checkJobsMaxSurge() && m.processNextWorkItem() { } }
@@ -531,3 +534,25 @@ func (m *ImageManager) deleteImage(iwr ImageWorkRequest) (*batchv1.Job, error) {
 	}
 	return job, nil
 }
+
+// checkJobsMaxSurge reports whether the worker may process another work item
+// without exceeding the configured ceiling on active jobs. A jobsMaxSurge of
+// zero or less disables the check; negative values are treated as disabled
+// because no job count can ever be below them, which would otherwise stall
+// the worker loop permanently.
+func (m *ImageManager) checkJobsMaxSurge() bool {
+	if m.jobsMaxSurge <= 0 {
+		return true
+	}
+	// Counting only reads imageworkstatus, so a shared lock on the RWMutex is enough.
+	m.lock.RLock()
+	defer m.lock.RUnlock()
+	activeJobs := 0
+	for _, iwres := range m.imageworkstatus {
+		// An entry still in the JobCreated state is counted as an active job.
+		if iwres.Status == ImageWorkResultStatusJobCreated {
+			activeJobs++
+		}
+	}
+	return activeJobs < m.jobsMaxSurge
+}
diff --git a/pkg/images/image_manager_test.go b/pkg/images/image_manager_test.go index cd2e7e5f..62335e6b 100644 --- a/pkg/images/image_manager_test.go +++ b/pkg/images/image_manager_test.go @@ -46,7 +46,7 @@ var node = corev1.Node{ func newTestImageManager(kubeclientset kubernetes.Interface, imagepullpolicy string, serviceaccountname string, imagedeletejobhostnetwork bool, - jobpriorityclassname string, candeletejob bool, criSocketPath string) (*ImageManager, coreinformers.PodInformer) { + jobpriorityclassname string, candeletejob bool, criSocketPath string, jobsMaxSurge int) (*ImageManager, coreinformers.PodInformer) { imagePullDeadlineDuration := time.Millisecond * 10 criClientImage := "senthilrch/fledged-docker-client:latest" busyboxImage := "senthilrch/busybox:1.35.0" @@ -56,12 +56,13 @@ func newTestImageManager(kubeclientset kubernetes.Interface, imagepullpolicy str jobPriorityClassName := jobpriorityclassname canDeleteJob := candeletejob socketPath := criSocketPath + maxSurge := jobsMaxSurge imagecacheworkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(),
"ImageCaches") imageworkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ImagePullerStatus") imagemanager, podInformer := NewImageManager(imagecacheworkqueue, imageworkqueue, kubeclientset, fledgedNameSpace, imagePullDeadlineDuration, criClientImage, busyboxImage, imagePullPolicy, - serviceAccountName, imageDeleteJobHostNetwork, jobPriorityClassName, canDeleteJob, socketPath) + serviceAccountName, imageDeleteJobHostNetwork, jobPriorityClassName, canDeleteJob, socketPath, maxSurge) imagemanager.podsSynced = func() bool { return true } return imagemanager, podInformer @@ -213,7 +214,7 @@ func TestPullDeleteImage(t *testing.T) { }) } - imagemanager, _ := newTestImageManager(fakekubeclientset, "IfNotPresent", "sa-kube-fledged", false, "priority-class-kube-fledged", false, "") + imagemanager, _ := newTestImageManager(fakekubeclientset, "IfNotPresent", "sa-kube-fledged", false, "priority-class-kube-fledged", false, "", 0) var err error if test.action == "pullimage" { _, err = imagemanager.pullImage(test.iwr) @@ -311,7 +312,7 @@ func TestHandlePodStatusChange(t *testing.T) { } for _, test := range tests { fakekubeclientset := &fakeclientset.Clientset{} - imagemanager, _ := newTestImageManager(fakekubeclientset, "IfNotPresent", "sa-kube-fledged", false, "priority-class-kube-fledged", false, "") + imagemanager, _ := newTestImageManager(fakekubeclientset, "IfNotPresent", "sa-kube-fledged", false, "priority-class-kube-fledged", false, "", 0) imagemanager.imageworkstatus[test.pod.Labels["job-name"]] = ImageWorkResult{ Status: ImageWorkResultStatusJobCreated, ImageWorkRequest: ImageWorkRequest{ @@ -624,7 +625,7 @@ func TestUpdateImageCacheStatus(t *testing.T) { return true, nil, apierrors.NewInternalError(fmt.Errorf("fake error")) }) } - imagemanager, podInformer := newTestImageManager(fakekubeclientset, "IfNotPresent", "sa-kube-fledged", false, "priority-class-kube-fledged", false, "") + imagemanager, podInformer := 
newTestImageManager(fakekubeclientset, "IfNotPresent", "sa-kube-fledged", false, "priority-class-kube-fledged", false, "", 0) for _, pod := range test.pods { if !reflect.DeepEqual(pod, corev1.Pod{}) { podInformer.Informer().GetIndexer().Add(&pod) @@ -903,7 +904,7 @@ func TestProcessNextWorkItem(t *testing.T) { } for _, test := range tests { fakekubeclientset := &fakeclientset.Clientset{} - imagemanager, podInformer := newTestImageManager(fakekubeclientset, test.imagepullpolicy, "sa-kube-fledged", false, "priority-class-kube-fledged", false, "") + imagemanager, podInformer := newTestImageManager(fakekubeclientset, test.imagepullpolicy, "sa-kube-fledged", false, "priority-class-kube-fledged", false, "", 0) for _, pod := range test.pods { if !reflect.DeepEqual(pod, corev1.Pod{}) { podInformer.Informer().GetIndexer().Add(&pod)