Skip to content

Commit

Permalink
Merge pull request #2866 from lowang-bh/bindEnhancement
Browse files Browse the repository at this point in the history
fix: events miss when there is a pod bind failed
  • Loading branch information
volcano-sh-bot authored Dec 14, 2023
2 parents 29e2bbf + b48c7e6 commit c3d501b
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 14 deletions.
24 changes: 15 additions & 9 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,18 @@ type imageState struct {
nodes sets.String
}

// DefaultBinder with kube client and event recorder
type DefaultBinder struct {
// kubeclient *kubernetes.Clientset
kubeclient kubernetes.Interface
recorder record.EventRecorder
}

// Bind will send bind request to api server
func (db *DefaultBinder) Bind(kubeClient kubernetes.Interface, tasks []*schedulingapi.TaskInfo) ([]*schedulingapi.TaskInfo, error) {
var errTasks []*schedulingapi.TaskInfo
for _, task := range tasks {
p := task.Pod
if err := kubeClient.CoreV1().Pods(p.Namespace).Bind(context.TODO(),
if err := db.kubeclient.CoreV1().Pods(p.Namespace).Bind(context.TODO(),
&v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID, Annotations: p.Annotations},
Target: v1.ObjectReference{
Expand All @@ -178,6 +180,7 @@ func (db *DefaultBinder) Bind(kubeClient kubernetes.Interface, tasks []*scheduli
klog.Errorf("Failed to bind pod <%v/%v> to node %s : %#v", p.Namespace, p.Name, task.NodeName, err)
errTasks = append(errTasks, task)
} else {
db.recorder.Eventf(task.Pod, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", task.Namespace, task.Name, task.NodeName)
metrics.UpdateTaskScheduleDuration(metrics.Duration(p.CreationTimestamp.Time)) // update metrics as soon as pod is bind
}
}
Expand All @@ -189,8 +192,12 @@ func (db *DefaultBinder) Bind(kubeClient kubernetes.Interface, tasks []*scheduli
return nil, nil
}

func NewBinder() *DefaultBinder {
return &DefaultBinder{}
// NewDefaultBinder create binder with kube client and event recorder, support fake binder if passed fake client and fake event recorder
func NewDefaultBinder(kbclient kubernetes.Interface, record record.EventRecorder) *DefaultBinder {
return &DefaultBinder{
kubeclient: kbclient,
recorder: record,
}
}

type defaultEvictor struct {
Expand Down Expand Up @@ -476,6 +483,10 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
sc.Recorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: commonutil.GenerateComponentName(sc.schedulerNames)})

sc.BindFlowChannel = make(chan *schedulingapi.TaskInfo, 5000)
if bindMethodMap == nil {
klog.V(3).Info("no registered bind method, new a default one")
bindMethodMap = NewDefaultBinder(sc.kubeClient, sc.Recorder)
}
sc.Binder = GetBindMethod()

var batchNum int
Expand Down Expand Up @@ -812,11 +823,6 @@ func (sc *SchedulerCache) Bind(tasks []*schedulingapi.TaskInfo) {
errTasks, err := sc.Binder.Bind(sc.kubeClient, tasks)
if err == nil {
klog.V(3).Infof("bind ok, latency %v", time.Since(tmp))
// TODO: need to move this event recording into Bind so that record it as soon as pod is bind
for _, task := range tasks {
sc.Recorder.Eventf(task.Pod, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v",
task.Namespace, task.Name, task.NodeName)
}
} else {
for _, task := range errTasks {
klog.V(2).Infof("resyncTask task %s", task.Name)
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func TestBindTasks(t *testing.T) {
nodeQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
}

sc.Binder = &DefaultBinder{}
sc.Binder = &DefaultBinder{sc.kubeClient, sc.Recorder}
sc.VolumeBinder = &defaultVolumeBinder{
volumeBinder: volumescheduling.NewVolumeBinder(
sc.kubeClient,
Expand Down
4 changes: 0 additions & 4 deletions pkg/scheduler/cache/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,3 @@ func RegisterBindMethod(binder Binder) {
func GetBindMethod() Binder {
return bindMethodMap
}

func init() {
RegisterBindMethod(NewBinder())
}

0 comments on commit c3d501b

Please sign in to comment.