fix(operator/inventory): restart pods watcher when it times out (#198)
Signed-off-by: Artur Troian <[email protected]>
troian authored Feb 21, 2024
1 parent c6e8c71 commit 1938a8b
Showing 1 changed file with 108 additions and 53 deletions.
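
The change in brief: the pod watch handle now lives outside the setup code, and a restartWatcher closure re-creates the watch (re-listing pods to rebuild the node's allocation state) whenever the API server closes the stream, instead of the monitor loop exiting with errWorkerExit. A minimal, self-contained sketch of that shape (watchNodePods and its names are illustrative assumptions, not the operator's actual code):

```go
package main

import (
	"context"
	"log"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
)

// watchNodePods sketches the shape this commit gives the monitor loop: the
// watch handle lives outside the setup call, and restart re-creates it when
// the API server closes the stream (e.g. on watch timeout) instead of exiting.
func watchNodePods(ctx context.Context, kc kubernetes.Interface, nodeName string) error {
	var podsWatch watch.Interface

	restart := func() error {
		var err error
		podsWatch, err = kc.CoreV1().Pods(corev1.NamespaceAll).Watch(ctx, metav1.ListOptions{
			FieldSelector: fields.OneTermEqualSelector("spec.nodeName", nodeName).String(),
		})
		return err
	}

	if err := restart(); err != nil {
		return err
	}

	defer func() {
		if podsWatch != nil {
			podsWatch.Stop()
		}
	}()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case ev, open := <-podsWatch.ResultChan():
			if !open {
				// server ended the watch; re-establish instead of bailing out
				podsWatch.Stop()
				if err := restart(); err != nil {
					return err
				}
				continue
			}
			log.Printf("pod event: %s", ev.Type)
		}
	}
}
```
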
161 changes: 108 additions & 53 deletions operator/inventory/node-discovery.go
@@ -362,22 +362,15 @@ func (dp *nodeDiscovery) monitor() error {
 		bus.Unsub(scch)
 	}()
 
-	podsWatch, err := kc.CoreV1().Pods(corev1.NamespaceAll).Watch(dp.ctx, metav1.ListOptions{
-		FieldSelector: fields.OneTermEqualSelector("spec.nodeName", dp.name).String(),
-	})
-	if err != nil {
-		log.Error(err, "unable to start pods watch")
-		return err
-	}
-
-	defer podsWatch.Stop()
-
+	var podsWatch watch.Interface
 	var cfg Config
 	var sc storageClasses
 	var lastPubState nodeStateEnum
 
 	gpusIDs := make(RegistryGPUVendors)
 	currLabels := make(map[string]string)
+	currPods := make(map[string]corev1.Pod)
+	currPodsInitCount := 0
 
 	select {
 	case <-dp.ctx.Done():
@@ -410,12 +403,63 @@ func (dp *nodeDiscovery) monitor() error {
 		currLabels = copyAkashLabels(knode.Labels)
 	}
 
-	node, initPods, err := dp.initNodeInfo(gpusIDs)
+	node, err := dp.initNodeInfo(gpusIDs)
 	if err != nil {
 		log.Error(err, "unable to init node info")
 		return err
 	}
 
+	restartWatcher := func() error {
+		var terr error
+		podsWatch, terr = kc.CoreV1().Pods(corev1.NamespaceAll).Watch(dp.ctx, metav1.ListOptions{
+			FieldSelector: fields.OneTermEqualSelector("spec.nodeName", dp.name).String(),
+		})
+
+		if terr != nil {
+			log.Error(terr, "unable to start pods watch")
+			return terr
+		}
+
+		pods, terr := kc.CoreV1().Pods(corev1.NamespaceAll).List(dp.ctx, metav1.ListOptions{
+			FieldSelector: fields.OneTermEqualSelector("spec.nodeName", dp.name).String(),
+		})
+
+		if terr != nil {
+			return terr
+		}
+
+		for name, pod := range currPods {
+			for _, container := range pod.Spec.Containers {
+				subAllocatedResources(&node, container.Resources.Requests)
+			}
+
+			delete(currPods, name)
+		}
+
+		for _, pod := range pods.Items {
+			for _, container := range pod.Spec.Containers {
+				addAllocatedResources(&node, container.Resources.Requests)
+			}
+
+			currPods[pod.Name] = *pod.DeepCopy()
+		}
+
+		currPodsInitCount = len(currPods)
+
+		return nil
+	}
+
+	err = restartWatcher()
+	if err != nil {
+		return err
+	}
+
+	defer func() {
+		if podsWatch != nil {
+			podsWatch.Stop()
+		}
+	}()
+
 	statech := make(chan struct{}, 1)
 	labelch := make(chan struct{}, 1)
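
restartWatcher rebuilds the allocation bookkeeping on every (re)start: it subtracts the resource requests of every pod it was tracking, re-lists pods on the node, and adds the requests of everything actually present. A self-contained sketch of that add/subtract accounting on a plain corev1.ResourceList (helper names are assumed; the real addAllocatedResources/subAllocatedResources operate on the operator's v1.Node):

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// addAllocated and subAllocated sketch the accounting restartWatcher relies
// on: per-container requests are added to or subtracted from running totals.
func addAllocated(totals, req corev1.ResourceList) {
	for name, qty := range req {
		cur := totals[name]
		cur.Add(qty)
		totals[name] = cur
	}
}

func subAllocated(totals, req corev1.ResourceList) {
	for name, qty := range req {
		cur := totals[name]
		cur.Sub(qty)
		totals[name] = cur
	}
}

func main() {
	totals := corev1.ResourceList{}
	req := corev1.ResourceList{
		corev1.ResourceCPU:    resource.MustParse("500m"),
		corev1.ResourceMemory: resource.MustParse("128Mi"),
	}

	addAllocated(totals, req)
	fmt.Println(totals.Cpu(), totals.Memory()) // 500m 128Mi

	subAllocated(totals, req)
	fmt.Println(totals.Cpu(), totals.Memory()) // 0 0
}
```
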

@@ -469,30 +513,47 @@ func (dp *nodeDiscovery) monitor() error {
 			}
 		case res, isopen := <-podsWatch.ResultChan():
 			if !isopen {
-				return errWorkerExit
+				podsWatch.Stop()
+				if err = restartWatcher(); err != nil {
+					return err
+				}
+
+				continue
 			}
 
 			obj := res.Object.(*corev1.Pod)
 			switch res.Type {
 			case watch.Added:
-				if _, exists := initPods[obj.Name]; exists {
-					delete(initPods, obj.Name)
-				} else {
+				if _, exists := currPods[obj.Name]; !exists {
+					currPods[obj.Name] = *obj.DeepCopy()
 					for _, container := range obj.Spec.Containers {
 						addAllocatedResources(&node, container.Resources.Requests)
 					}
+				} else {
+					currPodsInitCount--
 				}
 			case watch.Deleted:
-				delete(initPods, obj.Name)
+				pod, exists := currPods[obj.Name]
+				if !exists {
+					log.Info("received pod delete event for an item that does not exist; the node inventory logic might have a bug")
+					break
+				}
 
-				for _, container := range obj.Spec.Containers {
+				for _, container := range pod.Spec.Containers {
 					subAllocatedResources(&node, container.Resources.Requests)
 				}
+
+				if currPodsInitCount > 0 {
+					currPodsInitCount--
+				}
+
+				delete(currPods, obj.Name)
 			}
 
-			if len(initPods) == 0 {
+			if currPodsInitCount == 0 {
 				signalState()
 			}
+
 		case <-statech:
 			if len(currLabels) > 0 {
 				bus.Pub(nodeState{
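
A closed ResultChan generally means the API server ended the watch (for example on its watch timeout), which is what the commit title refers to; the loop now stops the stale watch and re-runs restartWatcher instead of returning errWorkerExit. For comparison, client-go also offers a RetryWatcher that re-establishes the stream on its own and resumes from the last seen resourceVersion; a hedged sketch with assumed names:

```go
package main

import (
	"context"
	"log"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	watchtools "k8s.io/client-go/tools/watch"
)

// retryWatchPods is an alternative to hand-rolled restarts: RetryWatcher
// re-dials the watch itself, resuming from rv (which must be a concrete
// resourceVersion, e.g. taken from a prior List).
func retryWatchPods(ctx context.Context, kc kubernetes.Interface, nodeName, rv string) error {
	rw, err := watchtools.NewRetryWatcher(rv, &cache.ListWatch{
		WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
			opts.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", nodeName).String()
			return kc.CoreV1().Pods(metav1.NamespaceAll).Watch(ctx, opts)
		},
	})
	if err != nil {
		return err
	}
	defer rw.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case ev, open := <-rw.ResultChan():
			if !open {
				return nil // RetryWatcher gave up (e.g. the resourceVersion expired)
			}
			log.Printf("pod event: %s", ev.Type)
		}
	}
}
```
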
@@ -546,15 +607,15 @@ func (dp *nodeDiscovery) monitor() error {
 	}
 }
 
-func (dp *nodeDiscovery) initNodeInfo(gpusIds RegistryGPUVendors) (v1.Node, map[string]corev1.Pod, error) {
+func (dp *nodeDiscovery) initNodeInfo(gpusIds RegistryGPUVendors) (v1.Node, error) {
 	kc := fromctx.MustKubeClientFromCtx(dp.ctx)
 
 	cpuInfo := dp.parseCPUInfo(dp.ctx)
 	gpuInfo := dp.parseGPUInfo(dp.ctx, gpusIds)
 
 	knode, err := kc.CoreV1().Nodes().Get(dp.ctx, dp.name, metav1.GetOptions{})
 	if err != nil {
-		return v1.Node{}, nil, fmt.Errorf("%w: error fetching node %s", err, dp.name)
+		return v1.Node{}, fmt.Errorf("%w: error fetching node %s", err, dp.name)
 	}
 
 	res := v1.Node{
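
With the pod map gone from the return values, initNodeInfo keeps wrapping the Get error with %w, so callers can still test the underlying cause. A small self-contained illustration of why that verb matters:

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	base := errors.New("connection refused")

	// The %w verb keeps base in the unwrap chain, mirroring the
	// fmt.Errorf("%w: error fetching node %s", ...) pattern above.
	wrapped := fmt.Errorf("%w: error fetching node %s", base, "worker-1")

	fmt.Println(errors.Is(wrapped, base)) // true
}
```
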
@@ -593,31 +654,31 @@ func (dp *nodeDiscovery) initNodeInfo(gpusIds RegistryGPUVendors) (v1.Node, map[
 		}
 	}
 
-	initPods := make(map[string]corev1.Pod)
+	// initPods := make(map[string]bool)
 
-	podsList, err := kc.CoreV1().Pods(corev1.NamespaceAll).List(dp.ctx, metav1.ListOptions{
-		FieldSelector: fields.OneTermEqualSelector("spec.nodeName", dp.name).String(),
-	})
-	if err != nil {
-		return res, nil, err
-	}
+	// podsList, err := kc.CoreV1().Pods(corev1.NamespaceAll).List(dp.ctx, metav1.ListOptions{
+	// 	FieldSelector: fields.OneTermEqualSelector("spec.nodeName", dp.name).String(),
+	// })
+	// if err != nil {
+	// 	return res, nil, err
+	// }
 
-	if podsList == nil {
-		return res, initPods, nil
-	}
+	// if podsList == nil {
+	// 	return res, initPods, nil
+	// }
 
-	for _, pod := range podsList.Items {
-		for _, container := range pod.Spec.Containers {
-			if container.Resources.Requests != nil {
-				addAllocatedResources(&res, container.Resources.Requests)
-			} else if container.Resources.Limits != nil {
-				addAllocatedResources(&res, container.Resources.Limits)
-			}
-		}
-		initPods[pod.Name] = pod
-	}
+	// for _, pod := range podsList.Items {
+	// 	for _, container := range pod.Spec.Containers {
+	// 		if container.Resources.Requests != nil {
+	// 			addAllocatedResources(&res, container.Resources.Requests)
+	// 		} else if container.Resources.Limits != nil {
+	// 			addAllocatedResources(&res, container.Resources.Limits)
+	// 		}
+	// 	}
+	// 	initPods[pod.Name] = true
+	// }
 
-	return res, initPods, nil
+	return res, nil
 }
 
 func addAllocatedResources(node *v1.Node, rl corev1.ResourceList) {
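
The commented-out block above is the initial pod accounting that moved into restartWatcher. Ordering between List and Watch matters here: the new code opens the watch first and lists second, so no event is lost, and a duplicate Added event for an already-listed pod is absorbed by the currPods existence check plus currPodsInitCount. The other common pattern, sketched below with assumed names, lists first and pins the watch to the list's ResourceVersion:

```go
package main

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
)

// listThenWatch is a hedged sketch: List first, then start the Watch from the
// list's ResourceVersion so no event between the two calls is missed.
func listThenWatch(ctx context.Context, kc kubernetes.Interface, nodeName string) (*corev1.PodList, watch.Interface, error) {
	sel := fields.OneTermEqualSelector("spec.nodeName", nodeName).String()

	pods, err := kc.CoreV1().Pods(corev1.NamespaceAll).List(ctx, metav1.ListOptions{FieldSelector: sel})
	if err != nil {
		return nil, nil, err
	}

	w, err := kc.CoreV1().Pods(corev1.NamespaceAll).Watch(ctx, metav1.ListOptions{
		FieldSelector:   sel,
		ResourceVersion: pods.ResourceVersion, // resume from the snapshot the list returned
	})
	if err != nil {
		return nil, nil, err
	}

	return pods, w, nil
}
```
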
@@ -722,8 +783,8 @@ func generateLabels(cfg Config, knode *corev1.Node, node v1.Node, sc storageClas
 
 	node.Capabilities.StorageClasses = allowedSc
 
-	for _, gpu := range node.Resources.GPU.Info {
-		key := fmt.Sprintf("%s.vendor.%s.model.%s", builder.AkashServiceCapabilityGPU, gpu.Vendor, gpu.Name)
+	for _, info := range node.Resources.GPU.Info {
+		key := fmt.Sprintf("%s.vendor.%s.model.%s", builder.AkashServiceCapabilityGPU, info.Vendor, info.Name)
 		if val, exists := res[key]; exists {
 			nval, _ := strconv.ParseUint(val, 10, 32)
 			nval++
@@ -732,8 +793,8 @@ func generateLabels(cfg Config, knode *corev1.Node, node v1.Node, sc storageClas
 			res[key] = "1"
 		}
 
-		if gpu.MemorySize != "" {
-			key := fmt.Sprintf("%s.ram.%s", key, gpu.MemorySize)
+		if info.MemorySize != "" {
+			key := fmt.Sprintf("%s.ram.%s", key, info.MemorySize)
 			if val, exists := res[key]; exists {
 				nval, _ := strconv.ParseUint(val, 10, 32)
 				nval++
@@ -743,8 +804,8 @@ func generateLabels(cfg Config, knode *corev1.Node, node v1.Node, sc storageClas
 			}
 		}
 
-		if gpu.Interface != "" {
-			key := fmt.Sprintf("%s.interface.%s", key, gpu.Interface)
+		if info.Interface != "" {
+			key := fmt.Sprintf("%s.interface.%s", key, info.Interface)
 			if val, exists := res[key]; exists {
 				nval, _ := strconv.ParseUint(val, 10, 32)
 				nval++
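
The rename from gpu to info is cosmetic; the counting pattern is unchanged: one label key per GPU vendor/model (with .ram.* and .interface.* variants shadowing key), whose value is a decimal string counter. A self-contained sketch of that counter (the real prefix comes from builder.AkashServiceCapabilityGPU; the one below is assumed):

```go
package main

import (
	"fmt"
	"strconv"
)

// bumpCount mirrors the counter pattern in generateLabels: label values are
// decimal strings, incremented by parse-add-format (sketch, names assumed).
func bumpCount(res map[string]string, key string) {
	if val, exists := res[key]; exists {
		nval, _ := strconv.ParseUint(val, 10, 32)
		res[key] = strconv.FormatUint(nval+1, 10)
	} else {
		res[key] = "1"
	}
}

func main() {
	res := map[string]string{}
	key := fmt.Sprintf("%s.vendor.%s.model.%s", "capabilities.gpu", "nvidia", "a100") // assumed prefix
	bumpCount(res, key)
	bumpCount(res, key)
	fmt.Println(key, "=", res[key]) // ... = 2
}
```
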
@@ -761,15 +822,12 @@ func generateLabels(cfg Config, knode *corev1.Node, node v1.Node, sc storageClas
 func (dp *nodeDiscovery) parseCPUInfo(ctx context.Context) v1.CPUInfoS {
 	log := fromctx.LogrFromCtx(ctx).WithName("node.monitor")
 
-	log.Info("query cpu started")
 	cpus, err := dp.queryCPU(ctx)
 	if err != nil {
 		log.Error(err, "unable to query cpu")
 		return v1.CPUInfoS{}
 	}
 
-	log.Info("query cpu done")
-
 	res := make(v1.CPUInfoS, 0, len(cpus.Processors))
 
 	for _, c := range cpus.Processors {
@@ -789,15 +847,12 @@ func (dp *nodeDiscovery) parseGPUInfo(ctx context.Context, info RegistryGPUVendo
 
 	log := fromctx.LogrFromCtx(ctx).WithName("node.monitor")
 
-	log.Info("query gpu started")
 	gpus, err := dp.queryGPU(ctx)
 	if err != nil {
 		log.Error(err, "unable to query gpu")
 		return res
 	}
 
-	log.Info("query gpu done")
-
 	if gpus == nil {
 		return res
 	}
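
The "query cpu/gpu started" and "done" progress logs are dropped entirely. If that signal is ever needed again, logr verbosity levels would keep it behind a flag rather than deleting it; a hedged sketch using the go-logr/stdr adapter:

```go
package main

import (
	"log"
	"os"

	"github.com/go-logr/stdr"
)

func main() {
	stdr.SetVerbosity(0) // raise to 1 to see the V(1) lines

	logger := stdr.New(log.New(os.Stderr, "", log.LstdFlags)).WithName("node.monitor")
	logger.V(1).Info("query cpu started") // suppressed at verbosity 0
	logger.Info("query cpu done")         // always emitted
}
```
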