Skip to content

Commit

Permalink
Feature: Throttling job creation #161
Browse files Browse the repository at this point in the history
  • Loading branch information
senthilrch committed Nov 11, 2022
1 parent b46f464 commit 84d7488
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 12 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@ For more detailed description, go through _kube-fledged's_ [design proposal](doc

`--job-retention-policy:` Determines if the jobs created by kubefledged-controller would be deleted or retained (for debugging) after it finishes. Possible values are 'delete' and 'retain'. default value is 'delete'.

`--jobs-max-surge:` Maximum no. of active jobs allowed. default: max surge checks disabled.

`--service-account-name:` serviceAccountName used in Jobs created for pulling or deleting images. Optional flag. If not specified the default service account of the namespace is used

`--stderrthreshold:` Log level. set the value of this flag to INFO
Expand Down
5 changes: 3 additions & 2 deletions cmd/controller/app/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ func NewController(
imageDeleteJobHostNetwork bool,
jobPriorityClassName string,
canDeleteJob bool,
criSocketPath string) *Controller {
criSocketPath string,
jobsMaxSurge int) *Controller {

runtime.Must(fledgedscheme.AddToScheme(scheme.Scheme))
glog.V(4).Info("Creating event broadcaster")
Expand All @@ -126,7 +127,7 @@ func NewController(
imageManager, _ := images.NewImageManager(controller.workqueue, controller.imageworkqueue,
controller.kubeclientset, controller.fledgedNameSpace, imagePullDeadlineDuration,
criClientImage, busyboxImage, imagePullPolicy, serviceAccountName, imageDeleteJobHostNetwork,
jobPriorityClassName, canDeleteJob, criSocketPath)
jobPriorityClassName, canDeleteJob, criSocketPath, jobsMaxSurge)
controller.imageManager = imageManager

glog.Info("Setting up event handlers")
Expand Down
3 changes: 2 additions & 1 deletion cmd/controller/app/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func newTestController(kubeclientset kubernetes.Interface, fledgedclientset clie
jobPriorityClassName := "priority-class-kube-fledged"
canDelete := false
socketPath := ""
jobsMaxSurge := 0

/* startInformers := true
if startInformers {
Expand All @@ -81,7 +82,7 @@ func newTestController(kubeclientset kubernetes.Interface, fledgedclientset clie
fledgedclientset, fledgedNameSpace, nodeInformer, imagecacheInformer,
imageCacheRefreshFrequency, imagePullDeadlineDuration, criClientImage,
busyboxImage, imagePullPolicy, serviceAccountName, imageDeleteJobHostNetwork,
jobPriorityClassName, canDelete, socketPath)
jobPriorityClassName, canDelete, socketPath, jobsMaxSurge)
controller.nodesSynced = func() bool { return true }
controller.imageCachesSynced = func() bool { return true }
return controller, nodeInformer, imagecacheInformer
Expand Down
8 changes: 7 additions & 1 deletion cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var (
//Default value for when `--job-retention-policy` flag is not set
canDeleteJob bool = true
criSocketPath string
jobsMaxSurge int
)

func main() {
Expand Down Expand Up @@ -79,7 +80,7 @@ func main() {
fledgedInformerFactory.Kubefledged().V1alpha2().ImageCaches(),
imageCacheRefreshFrequency, imagePullDeadlineDuration, criClientImage,
busyboxImage, imagePullPolicy, serviceAccountName, imageDeleteJobHostNetwork,
jobPriorityClassName, canDeleteJob, criSocketPath)
jobPriorityClassName, canDeleteJob, criSocketPath, jobsMaxSurge)

glog.Info("Starting pre-flight checks")
if err = controller.PreFlightChecks(); err != nil {
Expand Down Expand Up @@ -135,4 +136,9 @@ func init() {
},
)
flag.StringVar(&criSocketPath, "cri-socket-path", "", "path to the cri socket on the node e.g. /run/containerd/containerd.sock (default: /var/run/docker.sock, /run/containerd/containerd.sock, /var/run/crio/crio.sock)")
flag.IntVar(&jobsMaxSurge, "jobs-max-surge", 0, "maximum no. of active jobs allowed. default: max surge checks disabled")
if jobsMaxSurge < 0 {
jobsMaxSurge = 0
glog.Warningf("--jobs-max-surge set to incorrect value. max surge checks will be disabled")
}
}
19 changes: 17 additions & 2 deletions pkg/images/image_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type ImageManager struct {
jobPriorityClassName string
canDeleteJob bool
criSocketPath string
jobsMaxSurge int
lock sync.RWMutex
}

Expand Down Expand Up @@ -129,7 +130,8 @@ func NewImageManager(
imageDeleteJobHostNetwork bool,
jobPriorityClassName string,
canDeleteJob bool,
criSocketPath string) (*ImageManager, coreinformers.PodInformer) {
criSocketPath string,
jobsMaxSurge int) (*ImageManager, coreinformers.PodInformer) {

appEqKubefledged, _ := labels.NewRequirement("app", selection.Equals, []string{"kubefledged"})
kubefledgedEqImagemanager, _ := labels.NewRequirement("kubefledged", selection.Equals, []string{"kubefledged-image-manager"})
Expand Down Expand Up @@ -162,6 +164,7 @@ func NewImageManager(
jobPriorityClassName: jobPriorityClassName,
canDeleteJob: canDeleteJob,
criSocketPath: criSocketPath,
jobsMaxSurge: jobsMaxSurge,
}
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
//AddFunc: ,
Expand Down Expand Up @@ -398,7 +401,7 @@ func (m *ImageManager) Run(stopCh <-chan struct{}) error {
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (m *ImageManager) runWorker() {
for m.processNextWorkItem() {
for m.jobsMaxSurge > 0 && m.checkJobsMaxSurge() && m.processNextWorkItem() {
}
}

Expand Down Expand Up @@ -531,3 +534,15 @@ func (m *ImageManager) deleteImage(iwr ImageWorkRequest) (*batchv1.Job, error) {
}
return job, nil
}

func (m *ImageManager) checkJobsMaxSurge() bool {
m.lock.Lock()
defer m.lock.Unlock()
var activeJobs int = 0
for _, iwres := range m.imageworkstatus {
if iwres.Status == ImageWorkResultStatusJobCreated {
activeJobs++
}
}
return activeJobs < m.jobsMaxSurge
}
13 changes: 7 additions & 6 deletions pkg/images/image_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var node = corev1.Node{

func newTestImageManager(kubeclientset kubernetes.Interface, imagepullpolicy string,
serviceaccountname string, imagedeletejobhostnetwork bool,
jobpriorityclassname string, candeletejob bool, criSocketPath string) (*ImageManager, coreinformers.PodInformer) {
jobpriorityclassname string, candeletejob bool, criSocketPath string, jobsMaxSurge int) (*ImageManager, coreinformers.PodInformer) {
imagePullDeadlineDuration := time.Millisecond * 10
criClientImage := "senthilrch/fledged-docker-client:latest"
busyboxImage := "senthilrch/busybox:1.35.0"
Expand All @@ -56,12 +56,13 @@ func newTestImageManager(kubeclientset kubernetes.Interface, imagepullpolicy str
jobPriorityClassName := jobpriorityclassname
canDeleteJob := candeletejob
socketPath := criSocketPath
maxSurge := jobsMaxSurge
imagecacheworkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ImageCaches")
imageworkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ImagePullerStatus")

imagemanager, podInformer := NewImageManager(imagecacheworkqueue, imageworkqueue, kubeclientset,
fledgedNameSpace, imagePullDeadlineDuration, criClientImage, busyboxImage, imagePullPolicy,
serviceAccountName, imageDeleteJobHostNetwork, jobPriorityClassName, canDeleteJob, socketPath)
serviceAccountName, imageDeleteJobHostNetwork, jobPriorityClassName, canDeleteJob, socketPath, maxSurge)
imagemanager.podsSynced = func() bool { return true }

return imagemanager, podInformer
Expand Down Expand Up @@ -213,7 +214,7 @@ func TestPullDeleteImage(t *testing.T) {
})
}

imagemanager, _ := newTestImageManager(fakekubeclientset, "IfNotPresent", "sa-kube-fledged", false, "priority-class-kube-fledged", false, "")
imagemanager, _ := newTestImageManager(fakekubeclientset, "IfNotPresent", "sa-kube-fledged", false, "priority-class-kube-fledged", false, "", 0)
var err error
if test.action == "pullimage" {
_, err = imagemanager.pullImage(test.iwr)
Expand Down Expand Up @@ -311,7 +312,7 @@ func TestHandlePodStatusChange(t *testing.T) {
}
for _, test := range tests {
fakekubeclientset := &fakeclientset.Clientset{}
imagemanager, _ := newTestImageManager(fakekubeclientset, "IfNotPresent", "sa-kube-fledged", false, "priority-class-kube-fledged", false, "")
imagemanager, _ := newTestImageManager(fakekubeclientset, "IfNotPresent", "sa-kube-fledged", false, "priority-class-kube-fledged", false, "", 0)
imagemanager.imageworkstatus[test.pod.Labels["job-name"]] = ImageWorkResult{
Status: ImageWorkResultStatusJobCreated,
ImageWorkRequest: ImageWorkRequest{
Expand Down Expand Up @@ -624,7 +625,7 @@ func TestUpdateImageCacheStatus(t *testing.T) {
return true, nil, apierrors.NewInternalError(fmt.Errorf("fake error"))
})
}
imagemanager, podInformer := newTestImageManager(fakekubeclientset, "IfNotPresent", "sa-kube-fledged", false, "priority-class-kube-fledged", false, "")
imagemanager, podInformer := newTestImageManager(fakekubeclientset, "IfNotPresent", "sa-kube-fledged", false, "priority-class-kube-fledged", false, "", 0)
for _, pod := range test.pods {
if !reflect.DeepEqual(pod, corev1.Pod{}) {
podInformer.Informer().GetIndexer().Add(&pod)
Expand Down Expand Up @@ -903,7 +904,7 @@ func TestProcessNextWorkItem(t *testing.T) {
}
for _, test := range tests {
fakekubeclientset := &fakeclientset.Clientset{}
imagemanager, podInformer := newTestImageManager(fakekubeclientset, test.imagepullpolicy, "sa-kube-fledged", false, "priority-class-kube-fledged", false, "")
imagemanager, podInformer := newTestImageManager(fakekubeclientset, test.imagepullpolicy, "sa-kube-fledged", false, "priority-class-kube-fledged", false, "", 0)
for _, pod := range test.pods {
if !reflect.DeepEqual(pod, corev1.Pod{}) {
podInformer.Informer().GetIndexer().Add(&pod)
Expand Down

0 comments on commit 84d7488

Please sign in to comment.