diff --git a/operator/inventory/node-discovery.go b/operator/inventory/node-discovery.go index 433fb4a2..6626e780 100644 --- a/operator/inventory/node-discovery.go +++ b/operator/inventory/node-discovery.go @@ -362,22 +362,15 @@ func (dp *nodeDiscovery) monitor() error { bus.Unsub(scch) }() - podsWatch, err := kc.CoreV1().Pods(corev1.NamespaceAll).Watch(dp.ctx, metav1.ListOptions{ - FieldSelector: fields.OneTermEqualSelector("spec.nodeName", dp.name).String(), - }) - if err != nil { - log.Error(err, "unable to watch start pods") - return err - } - - defer podsWatch.Stop() - + var podsWatch watch.Interface var cfg Config var sc storageClasses var lastPubState nodeStateEnum gpusIDs := make(RegistryGPUVendors) currLabels := make(map[string]string) + currPods := make(map[string]corev1.Pod) + currPodsInitCount := 0 select { case <-dp.ctx.Done(): @@ -410,12 +403,63 @@ func (dp *nodeDiscovery) monitor() error { currLabels = copyAkashLabels(knode.Labels) } - node, initPods, err := dp.initNodeInfo(gpusIDs) + node, err := dp.initNodeInfo(gpusIDs) if err != nil { log.Error(err, "unable to init node info") return err } + restartWatcher := func() error { + var terr error + podsWatch, terr = kc.CoreV1().Pods(corev1.NamespaceAll).Watch(dp.ctx, metav1.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("spec.nodeName", dp.name).String(), + }) + + if terr != nil { + log.Error(terr, "unable to watch start pods") + return terr + } + + pods, terr := kc.CoreV1().Pods(corev1.NamespaceAll).List(dp.ctx, metav1.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("spec.nodeName", dp.name).String(), + }) + + if terr != nil { + return terr + } + + for name, pod := range currPods { + for _, container := range pod.Spec.Containers { + subAllocatedResources(&node, container.Resources.Requests) + } + + delete(currPods, name) + } + + for _, pod := range pods.Items { + for _, container := range pod.Spec.Containers { + addAllocatedResources(&node, container.Resources.Requests) + } + + currPods[pod.Name] = *pod.DeepCopy() + } + + currPodsInitCount = len(currPods) + + return nil + } + + err = restartWatcher() + if err != nil { + return err + } + + defer func() { + if podsWatch != nil { + podsWatch.Stop() + } + }() + statech := make(chan struct{}, 1) labelch := make(chan struct{}, 1) @@ -469,30 +513,47 @@ func (dp *nodeDiscovery) monitor() error { } case res, isopen := <-podsWatch.ResultChan(): if !isopen { - return errWorkerExit + podsWatch.Stop() + if err = restartWatcher(); err != nil { + return err + } + + continue } obj := res.Object.(*corev1.Pod) switch res.Type { case watch.Added: - if _, exists := initPods[obj.Name]; exists { - delete(initPods, obj.Name) - } else { + if _, exists := currPods[obj.Name]; !exists { + currPods[obj.Name] = *obj.DeepCopy() for _, container := range obj.Spec.Containers { addAllocatedResources(&node, container.Resources.Requests) } + } else { + currPodsInitCount-- } case watch.Deleted: - delete(initPods, obj.Name) + pod, exists := currPods[obj.Name] + if !exists { + log.Info("received pod delete event for item that does not exist. check node inventory logic, it's might have bug in it!") + break + } - for _, container := range obj.Spec.Containers { + for _, container := range pod.Spec.Containers { subAllocatedResources(&node, container.Resources.Requests) } + + if currPodsInitCount > 0 { + currPodsInitCount-- + } + + delete(currPods, obj.Name) } - if len(initPods) == 0 { + if currPodsInitCount == 0 { signalState() } + case <-statech: if len(currLabels) > 0 { bus.Pub(nodeState{ @@ -546,7 +607,7 @@ func (dp *nodeDiscovery) monitor() error { } } -func (dp *nodeDiscovery) initNodeInfo(gpusIds RegistryGPUVendors) (v1.Node, map[string]corev1.Pod, error) { +func (dp *nodeDiscovery) initNodeInfo(gpusIds RegistryGPUVendors) (v1.Node, error) { kc := fromctx.MustKubeClientFromCtx(dp.ctx) cpuInfo := dp.parseCPUInfo(dp.ctx) @@ -554,7 +615,7 @@ func (dp *nodeDiscovery) initNodeInfo(gpusIds RegistryGPUVendors) (v1.Node, map[ knode, err := kc.CoreV1().Nodes().Get(dp.ctx, dp.name, metav1.GetOptions{}) if err != nil { - return v1.Node{}, nil, fmt.Errorf("%w: error fetching node %s", err, dp.name) + return v1.Node{}, fmt.Errorf("%w: error fetching node %s", err, dp.name) } res := v1.Node{ @@ -593,31 +654,31 @@ func (dp *nodeDiscovery) initNodeInfo(gpusIds RegistryGPUVendors) (v1.Node, map[ } } - initPods := make(map[string]corev1.Pod) + // initPods := make(map[string]bool) - podsList, err := kc.CoreV1().Pods(corev1.NamespaceAll).List(dp.ctx, metav1.ListOptions{ - FieldSelector: fields.OneTermEqualSelector("spec.nodeName", dp.name).String(), - }) - if err != nil { - return res, nil, err - } + // podsList, err := kc.CoreV1().Pods(corev1.NamespaceAll).List(dp.ctx, metav1.ListOptions{ + // FieldSelector: fields.OneTermEqualSelector("spec.nodeName", dp.name).String(), + // }) + // if err != nil { + // return res, nil, err + // } - if podsList == nil { - return res, initPods, nil - } + // if podsList == nil { + // return res, initPods, nil + // } - for _, pod := range podsList.Items { - for _, container := range pod.Spec.Containers { - if container.Resources.Requests != nil { - addAllocatedResources(&res, container.Resources.Requests) - } else if container.Resources.Limits != nil { - addAllocatedResources(&res, container.Resources.Limits) - } - } - initPods[pod.Name] = pod - } + // for _, pod := range podsList.Items { + // for _, container := range pod.Spec.Containers { + // if container.Resources.Requests != nil { + // addAllocatedResources(&res, container.Resources.Requests) + // } else if container.Resources.Limits != nil { + // addAllocatedResources(&res, container.Resources.Limits) + // } + // } + // initPods[pod.Name] = true + // } - return res, initPods, nil + return res, nil } func addAllocatedResources(node *v1.Node, rl corev1.ResourceList) { @@ -722,8 +783,8 @@ func generateLabels(cfg Config, knode *corev1.Node, node v1.Node, sc storageClas node.Capabilities.StorageClasses = allowedSc - for _, gpu := range node.Resources.GPU.Info { - key := fmt.Sprintf("%s.vendor.%s.model.%s", builder.AkashServiceCapabilityGPU, gpu.Vendor, gpu.Name) + for _, info := range node.Resources.GPU.Info { + key := fmt.Sprintf("%s.vendor.%s.model.%s", builder.AkashServiceCapabilityGPU, info.Vendor, info.Name) if val, exists := res[key]; exists { nval, _ := strconv.ParseUint(val, 10, 32) nval++ @@ -732,8 +793,8 @@ func generateLabels(cfg Config, knode *corev1.Node, node v1.Node, sc storageClas res[key] = "1" } - if gpu.MemorySize != "" { - key := fmt.Sprintf("%s.ram.%s", key, gpu.MemorySize) + if info.MemorySize != "" { + key := fmt.Sprintf("%s.ram.%s", key, info.MemorySize) if val, exists := res[key]; exists { nval, _ := strconv.ParseUint(val, 10, 32) nval++ @@ -743,8 +804,8 @@ func generateLabels(cfg Config, knode *corev1.Node, node v1.Node, sc storageClas } } - if gpu.Interface != "" { - key := fmt.Sprintf("%s.interface.%s", key, gpu.Interface) + if info.Interface != "" { + key := fmt.Sprintf("%s.interface.%s", key, info.Interface) if val, exists := res[key]; exists { nval, _ := strconv.ParseUint(val, 10, 32) nval++ @@ -761,15 +822,12 @@ func generateLabels(cfg Config, knode *corev1.Node, node v1.Node, sc storageClas func (dp *nodeDiscovery) parseCPUInfo(ctx context.Context) v1.CPUInfoS { log := fromctx.LogrFromCtx(ctx).WithName("node.monitor") - log.Info("query cpu started") cpus, err := dp.queryCPU(ctx) if err != nil { log.Error(err, "unable to query cpu") return v1.CPUInfoS{} } - log.Info("query cpu done") - res := make(v1.CPUInfoS, 0, len(cpus.Processors)) for _, c := range cpus.Processors { @@ -789,15 +847,12 @@ func (dp *nodeDiscovery) parseGPUInfo(ctx context.Context, info RegistryGPUVendo log := fromctx.LogrFromCtx(ctx).WithName("node.monitor") - log.Info("query gpu started") gpus, err := dp.queryGPU(ctx) if err != nil { log.Error(err, "unable to query gpu") return res } - log.Info("query gpu done") - if gpus == nil { return res }