diff --git a/_docs/kustomize/akash-operator-inventory/cluster-roles.yaml b/_docs/kustomize/akash-operator-inventory/cluster-roles.yaml index 05c24d8d..b163fd05 100644 --- a/_docs/kustomize/akash-operator-inventory/cluster-roles.yaml +++ b/_docs/kustomize/akash-operator-inventory/cluster-roles.yaml @@ -15,7 +15,6 @@ rules: resources: - namespaces - nodes - - pods - events - services - persistentvolumes @@ -30,6 +29,16 @@ rules: - nodes verbs: - patch + - apiGroups: + - '' + resources: + - pods + verbs: + - create + - delete + - get + - list + - watch - apiGroups: - '' resources: @@ -40,6 +49,12 @@ rules: - get - list - watch + - apiGroups: + - '' + resources: + - pods/proxy + verbs: + - get - apiGroups: - storage.k8s.io resources: @@ -75,27 +90,15 @@ rules: - get - list - watch - - apiGroups: - - '' - resources: - - configmaps - verbs: - - create - - update - - patch - - delete - - get - - list - - watch --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: - name: operator-inventory-node + name: operator-inventory-hardware-discovery labels: akash.network: "true" app.kubernetes.io/name: inventory - app.kubernetes.io/instance: inventory-node + app.kubernetes.io/instance: inventory-hardware-discovery app.kubernetes.io/component: operator app.kubernetes.io/part-of: provider rules: diff --git a/_docs/kustomize/akash-operator-inventory/daemonset.yaml b/_docs/kustomize/akash-operator-inventory/daemonset.yaml deleted file mode 100644 index b5039e38..00000000 --- a/_docs/kustomize/akash-operator-inventory/daemonset.yaml +++ /dev/null @@ -1,77 +0,0 @@ ---- -apiVersion: apps/v1 -kind: DaemonSet -metadata: - name: operator-inventory-node - namespace: akash-services - labels: - akash.network: "true" - app.kubernetes.io/name: inventory - app.kubernetes.io/instance: inventory-node - app.kubernetes.io/component: operator - app.kubernetes.io/part-of: provider -spec: - selector: - matchLabels: - app.kubernetes.io/name: inventory - app.kubernetes.io/instance: inventory-node - app.kubernetes.io/component: operator - app.kubernetes.io/part-of: provider - template: - metadata: - labels: - akash.network: "true" - app.kubernetes.io/name: inventory - app.kubernetes.io/instance: inventory-node - app.kubernetes.io/component: operator - app.kubernetes.io/part-of: provider - spec: - serviceAccountName: operator-inventory-node - containers: - - name: inventory-node - image: ghcr.io/akash-network/provider - args: - - "provider-services" - - "operator" - - "inventory" - - "node" - imagePullPolicy: IfNotPresent - ports: - - containerPort: 8080 - name: api - protocol: TCP - - containerPort: 8081 - name: grpc - protocol: TCP - resources: - requests: - memory: "64Mi" - cpu: "250m" - limits: - memory: "128Mi" - cpu: "500m" - livenessProbe: - httpGet: - path: /metrics/health - port: api - scheme: HTTP - initialDelaySeconds: 5 - periodSeconds: 5 - readinessProbe: - httpGet: - path: /metrics/ready - port: api - scheme: HTTP - initialDelaySeconds: 5 - periodSeconds: 5 - env: - - name: PCIDB_ENABLE_NETWORK_FETCH - value: "1" - - name: AP_POD_NAME - valueFrom: - fieldRef: - fieldPath: metadata.name - - name: AP_NODE_NAME - valueFrom: - fieldRef: - fieldPath: spec.nodeName diff --git a/_docs/kustomize/akash-operator-inventory/deployment.yaml b/_docs/kustomize/akash-operator-inventory/deployment.yaml index dc93d96b..dfa56295 100644 --- a/_docs/kustomize/akash-operator-inventory/deployment.yaml +++ b/_docs/kustomize/akash-operator-inventory/deployment.yaml @@ -39,6 +39,14 @@ spec: env: - name: AP_CONFIG value: 
/akash/config.yaml + - name: AP_POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: AP_POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace resources: limits: cpu: 500m diff --git a/_docs/kustomize/akash-operator-inventory/kustomization.yaml b/_docs/kustomize/akash-operator-inventory/kustomization.yaml index 05e939c0..ff80d5ae 100644 --- a/_docs/kustomize/akash-operator-inventory/kustomization.yaml +++ b/_docs/kustomize/akash-operator-inventory/kustomization.yaml @@ -7,7 +7,6 @@ resources: - cluster-roles.yaml - role-bindings.yaml - service.yaml - - daemonset.yaml - deployment.yaml configMapGenerator: - name: operator-inventory diff --git a/_docs/kustomize/akash-operator-inventory/role-bindings.yaml b/_docs/kustomize/akash-operator-inventory/role-bindings.yaml index f8e82b53..6dbc1889 100644 --- a/_docs/kustomize/akash-operator-inventory/role-bindings.yaml +++ b/_docs/kustomize/akash-operator-inventory/role-bindings.yaml @@ -21,20 +21,20 @@ subjects: apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding metadata: - name: operator-inventory-node + name: operator-inventory-hardware-discovery labels: akash.network: "true" app.kubernetes.io/name: inventory - app.kubernetes.io/instance: inventory-node + app.kubernetes.io/instance: inventory-hardware-discovery app.kubernetes.io/component: operator app.kubernetes.io/part-of: provider roleRef: apiGroup: rbac.authorization.k8s.io kind: ClusterRole - name: operator-inventory-node + name: operator-inventory-hardware-discovery subjects: - kind: ServiceAccount - name: operator-inventory-node + name: operator-inventory-hardware-discovery namespace: akash-services --- apiVersion: rbac.authorization.k8s.io/v1 diff --git a/_docs/kustomize/akash-operator-inventory/service-accounts.yaml b/_docs/kustomize/akash-operator-inventory/service-accounts.yaml index 22bd52fe..24c43915 100644 --- a/_docs/kustomize/akash-operator-inventory/service-accounts.yaml +++ b/_docs/kustomize/akash-operator-inventory/service-accounts.yaml @@ -15,11 +15,11 @@ automountServiceAccountToken: true apiVersion: v1 kind: ServiceAccount metadata: - name: operator-inventory-node + name: operator-inventory-hardware-discovery namespace: akash-services labels: akash.network: "true" app.kubernetes.io/name: inventory - app.kubernetes.io/instance: inventory-node + app.kubernetes.io/instance: inventory-hardware-discovery app.kubernetes.io/component: operator app.kubernetes.io/part-of: provider diff --git a/_docs/kustomize/templates/akash-operator-inventory/kustomization.yaml b/_docs/kustomize/templates/akash-operator-inventory/kustomization.yaml index 18cfe34e..3c5a3ae1 100644 --- a/_docs/kustomize/templates/akash-operator-inventory/kustomization.yaml +++ b/_docs/kustomize/templates/akash-operator-inventory/kustomization.yaml @@ -12,9 +12,3 @@ patches: group: apps name: operator-inventory version: v1 - - path: docker-image.yaml - target: - kind: DaemonSet - group: apps - name: operator-inventory-node - version: v1 diff --git a/_run/common-kube.mk b/_run/common-kube.mk index cf3fa43e..553df543 100644 --- a/_run/common-kube.mk +++ b/_run/common-kube.mk @@ -147,7 +147,6 @@ kube-setup-ingress-calico: .PHONY: kube-setup-ingress-default kube-setup-ingress-default: - kubectl label nodes $(KIND_NAME)-control-plane akash.network/role=ingress kubectl apply -f "$(INGRESS_CONFIG_PATH)" kubectl rollout status deployment -n ingress-nginx ingress-nginx-controller --timeout=$(KUBE_ROLLOUT_TIMEOUT)s kubectl apply -f "$(METALLB_CONFIG_PATH)" @@ -163,8 +162,6 @@ 
kube-status-ingress-%: kube-deployment-rollout-operator-inventory: kubectl -n akash-services rollout status daemonset operator-inventory-node --timeout=$(KUBE_ROLLOUT_TIMEOUT)s kubectl -n akash-services wait pods -l app.kubernetes.io/part-of=provider -l app.kubernetes.io/component=operator -l app.kubernetes.io/instance=inventory-node --for condition=Ready --timeout=$(KUBE_ROLLOUT_TIMEOUT)s - kubectl -n akash-services rollout status deployment operator-inventory --timeout=$(KUBE_ROLLOUT_TIMEOUT)s - kubectl -n akash-services wait pods -l app.kubernetes.io/part-of=provider -l app.kubernetes.io/component=operator -l app.kubernetes.io/instance=inventory-service --for condition=Ready --timeout=$(KUBE_ROLLOUT_TIMEOUT)s .PHONY: kube-deployment-rollout-% kube-deployment-rollout-%: @@ -192,13 +189,12 @@ akash-node-ready: ) -.PHONY: kube-operator-inventory-logs -kube-operator-inventory-logs: +.PHONY: kube-logs-operator-inventory +kube-logs-operator-inventory: kubectl -n akash-services logs -f \ -l app.kubernetes.io/part-of=provider,app.kubernetes.io/component=operator,app.kubernetes.io/instance=inventory-service,app.kubernetes.io/name=inventory -.PHONY: kube-operator-inventory-node-logs -kube-operator-inventory-node-logs: - kubectl -n akash-services logs -f \ - -l app.kubernetes.io/part-of=provider,app.kubernetes.io/component=operator,app.kubernetes.io/instance=inventory-node,app.kubernetes.io/name=inventory - +#.PHONY: kube-operator-inventory-node-logs +#kube-operator-inventory-node-logs: +# kubectl -n akash-services logs -f \ +# -l app.kubernetes.io/part-of=provider,app.kubernetes.io/component=operator,app.kubernetes.io/instance=inventory-node,app.kubernetes.io/name=inventory diff --git a/_run/kind-config-calico.yaml b/_run/kind-config-calico.yaml index 5c3acf4e..8d35864c 100644 --- a/_run/kind-config-calico.yaml +++ b/_run/kind-config-calico.yaml @@ -1,17 +1,18 @@ +--- kind: Cluster apiVersion: kind.x-k8s.io/v1alpha4 networking: disableDefaultCNI: true #podSubnet: 192.168.0.0/16 # Default Calico subnet nodes: -- role: control-plane - kubeadmConfigPatches: - - kind: InitConfiguration - nodeRegistration: - kubeletExtraArgs: - node-labels: "ingress-ready=true" - extraPortMappings: - - containerPort: 80 - protocol: TCP - - containerPort: 443 - protocol: TCP + - role: control-plane + kubeadmConfigPatches: + - kind: InitConfiguration + nodeRegistration: + kubeletExtraArgs: + node-labels: "ingress-ready=true" + extraPortMappings: + - containerPort: 80 + protocol: TCP + - containerPort: 443 + protocol: TCP diff --git a/_run/kube/kind-config-80.yaml b/_run/kube/kind-config-80.yaml index a85fe0d6..22e0b06e 100644 --- a/_run/kube/kind-config-80.yaml +++ b/_run/kube/kind-config-80.yaml @@ -1,14 +1,15 @@ +--- kind: Cluster apiVersion: kind.x-k8s.io/v1alpha4 nodes: -- role: control-plane - kubeadmConfigPatches: - - | - kind: InitConfiguration - nodeRegistration: - kubeletExtraArgs: - node-labels: "ingress-ready=true" - extraPortMappings: - - containerPort: 80 - hostPort: 80 - protocol: TCP + - role: control-plane + kubeadmConfigPatches: + - | + kind: InitConfiguration + nodeRegistration: + kubeletExtraArgs: + node-labels: "ingress-ready=true" + extraPortMappings: + - containerPort: 80 + hostPort: 80 + protocol: TCP diff --git a/_run/kube/kind-config.yaml b/_run/kube/kind-config.yaml index 43ad9149..a36333b7 100644 --- a/_run/kube/kind-config.yaml +++ b/_run/kube/kind-config.yaml @@ -1,13 +1,14 @@ +--- kind: Cluster apiVersion: kind.x-k8s.io/v1alpha4 nodes: -- role: control-plane - kubeadmConfigPatches: - - | - 
kind: InitConfiguration - nodeRegistration: - kubeletExtraArgs: - node-labels: "ingress-ready=true" - extraPortMappings: - - containerPort: 80 - protocol: TCP + - role: control-plane + kubeadmConfigPatches: + - | + kind: InitConfiguration + nodeRegistration: + kubeletExtraArgs: + node-labels: "ingress-ready=true" + extraPortMappings: + - containerPort: 80 + protocol: TCP diff --git a/cluster/kube/builder/builder.go b/cluster/kube/builder/builder.go index 07329ac5..73aa163e 100644 --- a/cluster/kube/builder/builder.go +++ b/cluster/kube/builder/builder.go @@ -25,6 +25,7 @@ const ( AkashNetworkStorageClasses = "akash.network/storageclasses" AkashServiceTarget = "akash.network/service-target" AkashServiceCapabilityGPU = "akash.network/capabilities.gpu" + AkashServiceCapabilityStorage = "akash.network/capabilities.storage" AkashMetalLB = "metal-lb" akashDeploymentPolicyName = "akash-deployment-restrictions" akashNetworkNamespace = "akash.network/namespace" diff --git a/cluster/kube/builder/workload.go b/cluster/kube/builder/workload.go index 1e178084..48683f28 100644 --- a/cluster/kube/builder/workload.go +++ b/cluster/kube/builder/workload.go @@ -218,17 +218,41 @@ func (b *Workload) replicas() *int32 { } func (b *Workload) affinity() *corev1.Affinity { + service := &b.deployment.ManifestGroup().Services[b.serviceIdx] svc := b.deployment.ClusterParams().SchedulerParams[b.serviceIdx] - if svc == nil || svc.Resources == nil { - return nil + selectors := []corev1.NodeSelectorRequirement{ + { + Key: AkashManagedLabelName, + Operator: corev1.NodeSelectorOpIn, + Values: []string{ + "true", + }, + }, } - selectors := nodeSelectorsFromResources(svc.Resources) - if len(selectors) == 0 { - return nil + if svc != nil && svc.Resources != nil { + selectors = append(selectors, nodeSelectorsFromResources(svc.Resources)...) 
}
+	for _, storage := range service.Resources.Storage {
+		attr := storage.Attributes.Find(sdl.StorageAttributePersistent)
+		if persistent, valid := attr.AsBool(); !valid || !persistent {
+			continue
+		}
+
+		attr = storage.Attributes.Find(sdl.StorageAttributeClass)
+		if class, valid := attr.AsString(); valid {
+			selectors = append(selectors, corev1.NodeSelectorRequirement{
+				Key:      fmt.Sprintf("%s.class.%s", AkashServiceCapabilityStorage, class),
+				Operator: corev1.NodeSelectorOpGt,
+				Values: []string{
+					"0",
+				},
+			})
+		}
+
+	}
 	affinity := &corev1.Affinity{
 		NodeAffinity: &corev1.NodeAffinity{
 			RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
@@ -252,13 +276,35 @@ func nodeSelectorsFromResources(res *crd.SchedulerResources) []corev1.NodeSelect
 	var selectors []corev1.NodeSelectorRequirement
 
 	if gpu := res.GPU; gpu != nil {
+		key := fmt.Sprintf("%s.vendor.%s.model.%s", AkashServiceCapabilityGPU, gpu.Vendor, gpu.Model)
+
 		selectors = append(selectors, corev1.NodeSelectorRequirement{
-			Key:      fmt.Sprintf("%s.vendor.%s.model.%s", AkashServiceCapabilityGPU, gpu.Vendor, gpu.Model),
-			Operator: "In",
+			Key:      key,
+			Operator: corev1.NodeSelectorOpGt,
 			Values: []string{
-				"true",
+				"0",
 			},
 		})
+
+		if gpu.MemorySize != "" {
+			selectors = append(selectors, corev1.NodeSelectorRequirement{
+				Key:      fmt.Sprintf("%s.ram.%s", key, gpu.MemorySize),
+				Operator: corev1.NodeSelectorOpGt,
+				Values: []string{
+					"0",
+				},
+			})
+		}
+
+		if gpu.Interface != "" {
+			selectors = append(selectors, corev1.NodeSelectorRequirement{
+				Key:      fmt.Sprintf("%s.interface.%s", key, gpu.Interface),
+				Operator: corev1.NodeSelectorOpGt,
+				Values: []string{
+					"0",
+				},
+			})
+		}
 	}
 
 	return selectors
diff --git a/cluster/kube/clientcommon/open_kube_config.go b/cluster/kube/clientcommon/open_kube_config.go
index 093c94a0..bff1d4ba 100644
--- a/cluster/kube/clientcommon/open_kube_config.go
+++ b/cluster/kube/clientcommon/open_kube_config.go
@@ -12,7 +12,8 @@ import (
 
 func OpenKubeConfig(cfgPath string, log log.Logger) (*rest.Config, error) {
 	// Always bypass the default rate limiting
-	rateLimiter := flowcontrol.NewTokenBucketRateLimiter(1000, 3000)
+	rateLimiter := flowcontrol.NewFakeAlwaysRateLimiter()
+	// rateLimiter := flowcontrol.NewTokenBucketRateLimiter(1000, 3000)
 
 	// if cfgPath contains value it is either set to default value $HOME/.kube/config
 	// or explicitly by env/flag AP_KUBECONFIG/--kubeconfig
diff --git a/operator/inventory/annotation.go b/operator/inventory/annotation.go
index 16974117..cfa1c502 100644
--- a/operator/inventory/annotation.go
+++ b/operator/inventory/annotation.go
@@ -1,20 +1,25 @@
 package inventory
 
 import (
+	"crypto/sha256"
+	"encoding/hex"
 	"encoding/json"
 	"errors"
 	"fmt"
 	"sort"
 	"strings"
 
+	inventoryV1 "github.com/akash-network/akash-api/go/inventory/v1"
 	"github.com/blang/semver/v4"
 	"gopkg.in/yaml.v3"
 )
 
 const (
-	sdlVersionField = "version"
+	annotationVersionField = "version"
 
-	AnnotationKeyCapabilities = "akash.network/capabilities"
+	AnnotationKeyCapabilitiesSHA256 = "akash.network/capabilities.sha256"
+	AnnotationKeyCapabilities       = "akash.network/capabilities"
+	AnnotationNodeSelfCapabilities  = "akash.network/node.self.capabilities"
 )
 
 var (
@@ -25,15 +30,28 @@ var (
 	errCapabilitiesUnsupportedVersion = fmt.Errorf("%w: unsupported version", errCapabilitiesInvalid)
 )
 
+type CapabilitiesV1GPU struct {
+	Vendor     string `json:"vendor"`
+	VendorID   string `json:"vendor_id"`
+	Model      string `json:"model"`
+	ModelID    string `json:"model_id"`
+	MemorySize string `json:"memory_size"`
+	Interface  string 
`json:"interface"` +} + +// type CapabilitiesV1GPUs []CapabilitiesV1GPU + type CapabilitiesV1 struct { - StorageClasses []string `json:"storage_classes"` + StorageClasses []string `json:"storage_classes"` + GPUs inventoryV1.GPUInfoS `json:"gpus"` } type Capabilities interface{} type AnnotationCapabilities struct { - Version semver.Version `json:"version" yaml:"version"` - Capabilities `yaml:",inline"` + Version semver.Version `json:"version" yaml:"version"` + LastAppliedSHA256 string `json:"last_applied_sha256" yaml:"last_applied_sha256"` + Capabilities `yaml:",inline"` } var ( @@ -59,6 +77,22 @@ func NewAnnotationCapabilities(sc []string) *AnnotationCapabilities { return res } + +func NewAnnotationNodeSelfCapabilities(sc []string) *AnnotationCapabilities { + caps := &CapabilitiesV1{ + StorageClasses: make([]string, len(sc)), + } + + copy(caps.StorageClasses, sc) + + res := &AnnotationCapabilities{ + Version: semver.Version{Major: 1}, + Capabilities: caps, + } + + return res +} + func (s *CapabilitiesV1) RemoveClass(name string) bool { for i, c := range s.StorageClasses { if c == name { @@ -71,12 +105,12 @@ func (s *CapabilitiesV1) RemoveClass(name string) bool { return false } -func parseNodeCapabilities(annotations map[string]string) (*AnnotationCapabilities, error) { +func parseNodeCapabilities(annotations map[string]string) (*AnnotationCapabilities, []byte, error) { res := &AnnotationCapabilities{} val, exists := annotations[AnnotationKeyCapabilities] if !exists { - return res, nil + return res, []byte{}, nil } var err error @@ -87,10 +121,20 @@ func parseNodeCapabilities(annotations map[string]string) (*AnnotationCapabiliti } if err != nil { - return nil, err + return nil, []byte{}, err + } + + val, exists = annotations[AnnotationKeyCapabilitiesSHA256] + if !exists { + return res, []byte{}, nil + } + + checksum, err := hex.DecodeString(val) + if err != nil && len(checksum) != sha256.Size { + return res, []byte{}, nil } - return res, nil + return res, checksum, nil } func (s *AnnotationCapabilities) UnmarshalYAML(node *yaml.Node) error { @@ -98,7 +142,7 @@ func (s *AnnotationCapabilities) UnmarshalYAML(node *yaml.Node) error { foundVersion := false for idx := range node.Content { - if node.Content[idx].Value == sdlVersionField { + if node.Content[idx].Value == annotationVersionField { var err error if result.Version, err = semver.ParseTolerant(node.Content[idx+1].Value); err != nil { return fmt.Errorf("%w: %w", errCapabilitiesInvalidVersion, err) @@ -140,13 +184,9 @@ func (s *AnnotationCapabilities) UnmarshalJSON(data []byte) error { return fmt.Errorf("%w: %w", errCapabilitiesInvalidContent, err) } - if _, exists := core[sdlVersionField]; !exists { - return errCapabilitiesInvalidNoVersion - } - result := AnnotationCapabilities{} - if val, valid := core[sdlVersionField].(string); valid { + if val, valid := core[annotationVersionField].(string); valid { if result.Version, err = semver.ParseTolerant(val); err != nil { return fmt.Errorf("%w: %w", errCapabilitiesInvalidVersion, err) } @@ -183,12 +223,24 @@ func (s *AnnotationCapabilities) MarshalJSON() ([]byte, error) { // nolint: gocritic switch caps := s.Capabilities.(type) { case *CapabilitiesV1: + data, err := json.Marshal(caps) + if err != nil { + return nil, err + } + + enc := sha256.New() + if _, err = enc.Write(data); err != nil { + return nil, err + } + obj = struct { - Version semver.Version `json:"version"` + Version semver.Version `json:"version"` + LastAppliedSHA256 string `json:"last_applied_sha256"` CapabilitiesV1 }{ - 
Version: s.Version, - CapabilitiesV1: *caps, + Version: s.Version, + LastAppliedSHA256: hex.EncodeToString(enc.Sum(nil)), + CapabilitiesV1: *caps, } } diff --git a/operator/inventory/ceph.go b/operator/inventory/ceph.go index d903b4e4..ca8c1eea 100644 --- a/operator/inventory/ceph.go +++ b/operator/inventory/ceph.go @@ -164,9 +164,7 @@ func (c *ceph) crdInstalled(log logr.Logger, rc *rookclientset.Clientset) bool { func (c *ceph) run(startch chan<- struct{}) error { bus := fromctx.PubSubFromCtx(c.ctx) - cephClustersTopic := "cephclusters" - - events := bus.Sub("ns", "sc", "pv", cephClustersTopic) + events := bus.Sub(topicKubeNS, topicKubeSC, topicKubePV, topicKubeCephClusters) defer bus.Unsub(events) @@ -209,7 +207,7 @@ func (c *ceph) run(startch chan<- struct{}) error { InformKubeObjects(c.ctx, bus, informer, - cephClustersTopic) + topicKubeCephClusters) } else { crdDiscoverTick.Reset(crdDiscoverPeriod) } @@ -293,7 +291,7 @@ func (c *ceph) run(startch chan<- struct{}) error { bus.Pub(storageSignal{ driver: "ceph", storage: res.storage, - }, []string{topicStorage}) + }, []string{topicInventoryStorage}) } scrapech = c.scrapech diff --git a/operator/inventory/cmd.go b/operator/inventory/cmd.go index a3ba7eb5..c4956a1d 100644 --- a/operator/inventory/cmd.go +++ b/operator/inventory/cmd.go @@ -1,15 +1,20 @@ package inventory import ( + "bytes" "context" + "crypto/sha256" + "crypto/tls" "encoding/json" "errors" "fmt" + "io" "net" "net/http" "strings" "time" + "github.com/akash-network/akash-api/go/grpc/gogoreflection" "github.com/fsnotify/fsnotify" "github.com/go-logr/logr" "github.com/go-logr/zapr" @@ -23,8 +28,10 @@ import ( "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" - "google.golang.org/grpc/reflection" "google.golang.org/protobuf/types/known/emptypb" + storagev1 "k8s.io/api/storage/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" @@ -119,6 +126,7 @@ func Cmd() *cobra.Command { RunE: func(cmd *cobra.Command, args []string) error { ctx := cmd.Context() + log := fromctx.LogrFromCtx(cmd.Context()) bus := fromctx.PubSubFromCtx(ctx) group := fromctx.ErrGroupFromCtx(ctx) @@ -133,7 +141,31 @@ func Cmd() *cobra.Command { return err } - fd := newFeatureDiscovery(ctx) + kubecfg := fromctx.KubeConfigFromCtx(ctx) + + discoveryImage := viper.GetString(FlagDiscoveryImage) + namespace := viper.GetString(FlagPodNamespace) + + if kubecfg.BearerTokenFile != "/var/run/secrets/kubernetes.io/serviceaccount/token" { + log.Info("service is not running as kubernetes pod. 
detecting discovery image name from flags") + } else { + name := viper.GetString(FlagPodName) + kc := fromctx.KubeClientFromCtx(ctx) + + pod, err := kc.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return err + } + + for _, container := range pod.Spec.Containers { + if container.Name == "operator-inventory" { + discoveryImage = container.Image + break + } + } + } + + fd := newClusterNodes(ctx, discoveryImage, namespace) storage = append(storage, st) @@ -156,8 +188,6 @@ func Cmd() *cobra.Command { restEndpoint := fmt.Sprintf(":%d", restPort) grpcEndpoint := fmt.Sprintf(":%d", grpcPort) - log := fromctx.LogrFromCtx(ctx) - restSrv := &http.Server{ Addr: restEndpoint, Handler: newServiceRouter(apiTimeout, queryTimeout), @@ -173,11 +203,20 @@ func Cmd() *cobra.Command { PermitWithoutStream: false, })) - inventory.RegisterClusterRPCServer(grpcSrv, &grpcServiceServer{ + gSrc := &grpcServiceServer{ ctx: ctx, + } + + inventory.RegisterClusterRPCServer(grpcSrv, gSrc) + gogoreflection.Register(grpcSrv) + + group.Go(func() error { + return registryLoader(ctx) }) - reflection.Register(grpcSrv) + group.Go(func() error { + return scWatcher(ctx) + }) group.Go(func() error { return configWatcher(ctx, viper.GetString(FlagConfig)) @@ -221,22 +260,22 @@ func Cmd() *cobra.Command { InformKubeObjects(ctx, bus, factory.Core().V1().Namespaces().Informer(), - "ns") + topicKubeNS) InformKubeObjects(ctx, bus, factory.Storage().V1().StorageClasses().Informer(), - "sc") + topicKubeSC) InformKubeObjects(ctx, bus, factory.Core().V1().PersistentVolumes().Informer(), - "pv") + topicKubePV) InformKubeObjects(ctx, bus, factory.Core().V1().Nodes().Informer(), - "nodes") + topicKubeNodes) fromctx.StartupChFromCtx(ctx) <- struct{}{} err = group.Wait() @@ -254,23 +293,23 @@ func Cmd() *cobra.Command { panic(err) } - cmd.PersistentFlags().Duration(FlagAPITimeout, 3*time.Second, "api timeout") - if err = viper.BindPFlag(FlagAPITimeout, cmd.PersistentFlags().Lookup(FlagAPITimeout)); err != nil { + cmd.Flags().Duration(FlagAPITimeout, 3*time.Second, "api timeout") + if err = viper.BindPFlag(FlagAPITimeout, cmd.Flags().Lookup(FlagAPITimeout)); err != nil { panic(err) } - cmd.PersistentFlags().Duration(FlagQueryTimeout, 2*time.Second, "query timeout") - if err = viper.BindPFlag(FlagQueryTimeout, cmd.PersistentFlags().Lookup(FlagQueryTimeout)); err != nil { + cmd.Flags().Duration(FlagQueryTimeout, 2*time.Second, "query timeout") + if err = viper.BindPFlag(FlagQueryTimeout, cmd.Flags().Lookup(FlagQueryTimeout)); err != nil { panic(err) } - cmd.PersistentFlags().Uint16(FlagRESTPort, 8080, "port to REST api") - if err = viper.BindPFlag(FlagRESTPort, cmd.PersistentFlags().Lookup(FlagRESTPort)); err != nil { + cmd.Flags().Uint16(FlagRESTPort, 8080, "port to REST api") + if err = viper.BindPFlag(FlagRESTPort, cmd.Flags().Lookup(FlagRESTPort)); err != nil { panic(err) } - cmd.PersistentFlags().Uint16(FlagGRPCPort, 8081, "port to GRPC api") - if err = viper.BindPFlag(FlagGRPCPort, cmd.PersistentFlags().Lookup(FlagGRPCPort)); err != nil { + cmd.Flags().Uint16(FlagGRPCPort, 8081, "port to GRPC api") + if err = viper.BindPFlag(FlagGRPCPort, cmd.Flags().Lookup(FlagGRPCPort)); err != nil { panic(err) } @@ -284,7 +323,20 @@ func Cmd() *cobra.Command { panic(err) } - cmd.AddCommand(cmdFeatureDiscoveryNode()) + cmd.Flags().String(FlagDiscoveryImage, "ghcr.io/akash-network/provider", "hardware discovery docker image") + if err = viper.BindPFlag(FlagDiscoveryImage, cmd.Flags().Lookup(FlagDiscoveryImage)); err != nil { 
+		panic(err)
+	}
+
+	cmd.Flags().String(FlagPodNamespace, "akash-services", "namespace for discovery pods")
+	if err = viper.BindPFlag(FlagPodNamespace, cmd.Flags().Lookup(FlagPodNamespace)); err != nil {
+		panic(err)
+	}
+
+	cmd.Flags().String(FlagProviderConfigsURL, defaultProviderConfigsURL, "provider configs server")
+	if err := viper.BindPFlag(FlagProviderConfigsURL, cmd.Flags().Lookup(FlagProviderConfigsURL)); err != nil {
+		panic(err)
+	}
 
 	return cmd
 }
@@ -334,7 +386,7 @@ func configWatcher(ctx context.Context, file string) error {
 	bus := fromctx.PubSubFromCtx(ctx)
 
-	bus.Pub(config, []string{"config"}, pubsub.WithRetain())
+	bus.Pub(config, []string{topicInventoryConfig}, pubsub.WithRetain())
 
 	for {
 		select {
@@ -346,7 +398,162 @@
 			} else if evt.Has(fsnotify.Remove) {
 				config, _ = loadConfig("", true)
 			}
-			bus.Pub(config, []string{"config"}, pubsub.WithRetain())
+			bus.Pub(config, []string{topicInventoryConfig}, pubsub.WithRetain())
+		}
+	}
+}
+
+// this function is piece of sh*t. refactor it!
+func registryLoader(ctx context.Context) error {
+	log := fromctx.LogrFromCtx(ctx).WithName("watcher.registry")
+	bus := fromctx.PubSubFromCtx(ctx)
+
+	tlsConfig := http.DefaultTransport.(*http.Transport).TLSClientConfig
+
+	cl := &http.Client{
+		Transport: &http.Transport{
+			DialTLSContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
+				return tls.Dial(network, addr, tlsConfig)
+			},
+		},
+	}
+
+	urlGPU := fmt.Sprintf("%s/devices/gpus", strings.TrimSuffix(viper.GetString(FlagProviderConfigsURL), "/"))
+	urlPcieDB := viper.GetString(FlagPciDbURL)
+
+	var gpuCurrHash []byte
+	var pcidbHash []byte
+
+	gpuIDs := make(RegistryGPUVendors)
+
+	queryGPUs := func() bool {
+		res, err := cl.Get(urlGPU)
+		if err != nil {
+			log.Error(err, "couldn't query inventory registry")
+			return false
+		}
+
+		defer func() {
+			_ = res.Body.Close()
+		}()
+
+		if res.StatusCode != http.StatusOK {
+			return false
+		}
+
+		gpus, err := io.ReadAll(res.Body)
+		if err != nil {
+			return false
+		}
+
+		upstreamHash := sha256.New()
+		_, _ = upstreamHash.Write(gpus)
+		newHash := upstreamHash.Sum(nil)
+
+		if bytes.Equal(gpuCurrHash, newHash) {
+			return false
+		}
+
+		_ = json.Unmarshal(gpus, &gpuIDs)
+
+		gpuCurrHash = newHash
+
+		return true
+	}
+
+	queryPCI := func() bool {
+		res, err := cl.Get(urlPcieDB)
+		if err != nil {
+			log.Error(err, "couldn't query pci.ids")
+			return false
+		}
+
+		defer func() {
+			_ = res.Body.Close()
+		}()
+
+		if res.StatusCode != http.StatusOK {
+			return false
+		}
+
+		pcie, err := io.ReadAll(res.Body)
+		if err != nil {
+			return false
+		}
+
+		upstreamHash := sha256.New()
+		_, _ = upstreamHash.Write(pcie)
+		newHash := upstreamHash.Sum(nil)
+
+		if bytes.Equal(pcidbHash, newHash) {
+			return false
+		}
+
+		pcidbHash = newHash
+
+		return true
+	}
+
+	queryGPUs()
+	bus.Pub(gpuIDs, []string{topicGPUIDs})
+
+	queryPeriod := viper.GetDuration(FlagRegistryQueryPeriod)
+	tmGPU := time.NewTimer(queryPeriod)
+	tmPCIe := time.NewTimer(24 * time.Hour)
+
+	for {
+		select {
+		case <-ctx.Done():
+			if !tmGPU.Stop() {
+				<-tmGPU.C
+			}
+
+			if !tmPCIe.Stop() {
+				<-tmPCIe.C
+			}
+
+			return ctx.Err()
+		case <-tmGPU.C:
+			if queryGPUs() {
+				bus.Pub(gpuIDs, []string{topicGPUIDs})
+			}
+			tmGPU.Reset(queryPeriod)
+		case <-tmPCIe.C:
+			queryPCI()
+
+			tmPCIe.Reset(24 * time.Hour)
+		}
+	}
+}
+
+func scWatcher(ctx context.Context) error {
+	bus := fromctx.PubSubFromCtx(ctx)
+
+	scch := bus.Sub(topicKubeSC)
+
+	sc := make(storageClasses)
+
+	for {
+		select {
+		case 
<-ctx.Done(): + return ctx.Err() + case rEvt := <-scch: + evt, valid := rEvt.(watch.Event) + if !valid { + continue + } + + switch obj := evt.Object.(type) { + case *storagev1.StorageClass: + switch evt.Type { + case watch.Added: + sc[obj.Name] = obj.DeepCopy() + case watch.Deleted: + delete(sc, obj.Name) + } + } + + bus.Pub(sc.copy(), []string{topicStorageClasses}, pubsub.WithRetain()) } } } @@ -476,10 +683,10 @@ func (gm *grpcServiceServer) QueryCluster(ctx context.Context, _ *emptypb.Empty) func (gm *grpcServiceServer) StreamCluster(_ *emptypb.Empty, stream inventory.ClusterRPC_StreamClusterServer) error { bus := fromctx.PubSubFromCtx(gm.ctx) - subch := bus.Sub(topicClusterState) + subch := bus.Sub(topicInventoryCluster) defer func() { - bus.Unsub(subch, topicClusterState) + bus.Unsub(subch, topicInventoryCluster) }() loop: diff --git a/operator/inventory/feature-discovery-client.go b/operator/inventory/feature-discovery-client.go deleted file mode 100644 index 495b5ea3..00000000 --- a/operator/inventory/feature-discovery-client.go +++ /dev/null @@ -1,538 +0,0 @@ -package inventory - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "net/http" - "net/url" - "os" - "slices" - "sort" - "strings" - "time" - - inventoryV1 "github.com/akash-network/akash-api/go/inventory/v1" - "github.com/go-logr/logr" - "github.com/troian/pubsub" - "golang.org/x/sync/errgroup" - "google.golang.org/grpc" - "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/credentials/insecure" - - "google.golang.org/protobuf/types/known/emptypb" - corev1 "k8s.io/api/core/v1" - storagev1 "k8s.io/api/storage/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - k8stypes "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/portforward" - "k8s.io/client-go/transport/spdy" - - "github.com/akash-network/provider/cluster/kube/builder" - "github.com/akash-network/provider/tools/fromctx" -) - -type nodeStateEnum int - -const ( - daemonSetNamespace = "akash-services" - reconnectTimeout = 5 * time.Second -) - -const ( - nodeStateUpdated nodeStateEnum = iota - nodeStateRemoved -) - -type k8sPatch struct { - Op string `json:"op"` - Path string `json:"path"` - Value interface{} `json:"value"` -} - -type podStream struct { - ctx context.Context - cancel context.CancelFunc - group *errgroup.Group - log logr.Logger - nodeName string - address string - port uint16 -} - -type nodeState struct { - state nodeStateEnum - name string - node inventoryV1.Node -} - -type featureDiscovery struct { - querierNodes - ctx context.Context - group *errgroup.Group - log logr.Logger - kc *kubernetes.Clientset -} - -func newFeatureDiscovery(ctx context.Context) *featureDiscovery { - log := fromctx.LogrFromCtx(ctx).WithName("feature-discovery") - - group, ctx := errgroup.WithContext(ctx) - - fd := &featureDiscovery{ - log: log, - ctx: logr.NewContext(ctx, log), - group: group, - kc: fromctx.KubeClientFromCtx(ctx), - querierNodes: newQuerierNodes(), - } - - group.Go(fd.connectorRun) - group.Go(fd.run) - group.Go(fd.nodeLabeler) - - return fd -} - -func (fd *featureDiscovery) Wait() error { - return fd.group.Wait() -} - -func (fd *featureDiscovery) connectorRun() error { - watcher, err := fd.kc.CoreV1().Pods(daemonSetNamespace).Watch(fd.ctx, metav1.ListOptions{ - LabelSelector: "app.kubernetes.io/name=inventory" + - ",app.kubernetes.io/instance=inventory-node" + - ",app.kubernetes.io/component=operator" + - ",app.kubernetes.io/part-of=provider", - }) - - if err != nil 
{ - return fmt.Errorf("error setting up Kubernetes watcher: %w", err) - } - - nodes := make(map[string]*podStream) - - for { - select { - case <-fd.ctx.Done(): - for _, nd := range nodes { - nd.cancel() - delete(nodes, nd.nodeName) - } - - return fd.ctx.Err() - case event := <-watcher.ResultChan(): - if obj, valid := event.Object.(*corev1.Pod); valid { - nodeName := obj.Spec.NodeName - - switch event.Type { - case watch.Added: - fallthrough - case watch.Modified: - if obj.Status.Phase == corev1.PodRunning && obj.Status.PodIP != "" { - if _, exists := nodes[nodeName]; exists { - continue - } - - var containerPort uint16 - - for _, container := range obj.Spec.Containers { - if container.Name == fdContainerName { - for _, port := range container.Ports { - if port.Name == fdContainerGRPCPortName { - containerPort = uint16(port.ContainerPort) - break - } - } - break - } - } - - nodes[nodeName], err = newNodeWatcher(fd.ctx, nodeName, obj.Name, obj.Status.PodIP, containerPort) - if err != nil { - return err - } - } - case watch.Deleted: - nd, exists := nodes[nodeName] - if !exists { - continue - } - - nd.cancel() - delete(nodes, nodeName) - } - } - } - } -} - -func (fd *featureDiscovery) run() error { - nodes := make(map[string]inventoryV1.Node) - - snapshot := func() inventoryV1.Nodes { - res := make(inventoryV1.Nodes, 0, len(nodes)) - - for _, nd := range nodes { - res = append(res, nd.Dup()) - } - - sort.Sort(res) - - return res - } - - bus := fromctx.PubSubFromCtx(fd.ctx) - - events := bus.Sub(topicNodeState) - defer bus.Unsub(events) - for { - select { - case <-fd.ctx.Done(): - return fd.ctx.Err() - case revt := <-events: - switch evt := revt.(type) { - case nodeState: - switch evt.state { - case nodeStateUpdated: - nodes[evt.name] = evt.node - case nodeStateRemoved: - delete(nodes, evt.name) - } - - bus.Pub(snapshot(), []string{topicNodes}, pubsub.WithRetain()) - default: - } - case req := <-fd.reqch: - resp := respNodes{ - res: snapshot(), - } - - req.respCh <- resp - } - } -} - -func (fd *featureDiscovery) nodeLabeler() error { - bus := fromctx.PubSubFromCtx(fd.ctx) - log := fromctx.LogrFromCtx(fd.ctx) - - var unsubChs []<-chan interface{} - var eventsConfigBackup <-chan interface{} - var eventsBackup <-chan interface{} - var events <-chan interface{} - - eventsConfig := bus.Sub("config") - unsubChs = append(unsubChs, eventsConfig) - - configReloadCh := make(chan struct{}, 1) - - defer func() { - for _, ch := range unsubChs { - bus.Unsub(ch) - } - }() - - var cfg Config - - signalConfigReload := func() { - select { - case configReloadCh <- struct{}{}: - eventsConfigBackup = eventsConfig - eventsBackup = events - - events = nil - eventsConfig = nil - default: - } - } - - for { - select { - case <-fd.ctx.Done(): - return fd.ctx.Err() - case <-configReloadCh: - log.Info("received signal to rebuild config. 
invalidating all inventory and restarting query process") - fd.reloadConfig(cfg) - - events = eventsBackup - eventsConfig = eventsConfigBackup - - if events == nil { - events = bus.Sub("nodes", "sc") - unsubChs = append(unsubChs, events) - } - - case rawEvt := <-events: - if evt, valid := rawEvt.(watch.Event); valid { - signal := false - - switch evt.Object.(type) { - case *corev1.Node: - signal = (evt.Type == watch.Added) || (evt.Type == watch.Modified) - case *storagev1.StorageClass: - signal = evt.Type == watch.Deleted - } - - if signal { - signalConfigReload() - } - } - case evt := <-eventsConfig: - log.Info("received config update") - - cfg = evt.(Config) - signalConfigReload() - } - } -} - -func isNodeAkashLabeled(labels map[string]string) bool { - _, exists := labels[builder.AkashManagedLabelName] - - return exists -} - -func isNodeReady(conditions []corev1.NodeCondition) bool { - for _, c := range conditions { - if c.Type == corev1.NodeReady { - return c.Status == "True" - } - } - - return false -} - -func (fd *featureDiscovery) reloadConfig(cfg Config) { - log := fromctx.LogrFromCtx(fd.ctx) - - adjConfig := cfg.Copy() - - nodesList, _ := fd.kc.CoreV1().Nodes().List(fd.ctx, metav1.ListOptions{}) - - scList, _ := fd.kc.StorageV1().StorageClasses().List(fd.ctx, metav1.ListOptions{}) - - presentSc := make([]string, 0, len(scList.Items)) - for _, sc := range scList.Items { - presentSc = append(presentSc, sc.Name) - } - - sort.Strings(presentSc) - - adjConfig.FilterOutStorageClasses(presentSc) - patches := make(map[string][]k8sPatch) - - for _, node := range nodesList.Items { - var p []k8sPatch - - isExcluded := !isNodeReady(node.Status.Conditions) || node.Spec.Unschedulable || adjConfig.Exclude.IsNodeExcluded(node.Name) - - // node is currently labeled for akash inventory but is excluded from config - if isNodeAkashLabeled(node.Labels) && isExcluded { - delete(node.Labels, builder.AkashManagedLabelName) - delete(node.Annotations, AnnotationKeyCapabilities) - - p = append(p, k8sPatch{ - Op: "add", - Path: "/metadata/labels", - Value: node.Labels, - }) - p = append(p, k8sPatch{ - Op: "add", - Path: "/metadata/annotations", - Value: node.Annotations, - }) - log.Info(fmt.Sprintf("node \"%s\" has matching exclude rule. 
removing from intentory", node.Name)) - } else if !isNodeAkashLabeled(node.Labels) && !isExcluded { - node.Labels[builder.AkashManagedLabelName] = "true" - p = append(p, k8sPatch{ - Op: "add", - Path: "/metadata/labels", - Value: node.Labels, - }) - log.Info(fmt.Sprintf("node \"%s\" is being added to intentory", node.Name)) - } - - if !isExcluded { - var op string - caps, _ := parseNodeCapabilities(node.Annotations) - if caps.Capabilities == nil { - op = "add" - caps = NewAnnotationCapabilities(adjConfig.ClusterStorage) - } else { - sc := adjConfig.StorageClassesForNode(node.Name) - switch obj := caps.Capabilities.(type) { - case *CapabilitiesV1: - if !slices.Equal(sc, obj.StorageClasses) { - op = "add" - obj.StorageClasses = sc - } - default: - } - } - - if op != "" { - data, _ := json.Marshal(caps) - node.Annotations[AnnotationKeyCapabilities] = string(data) - - p = append(p, k8sPatch{ - Op: "add", - Path: "/metadata/annotations", - Value: node.Annotations, - }) - } - } - - if len(p) > 0 { - patches[node.Name] = p - } - } - - for node, p := range patches { - data, _ := json.Marshal(p) - - _, err := fd.kc.CoreV1().Nodes().Patch(fd.ctx, node, k8stypes.JSONPatchType, data, metav1.PatchOptions{}) - if err != nil { - log.Error(err, fmt.Sprintf("couldn't apply patches for node \"%s\"", node)) - } else { - log.Info(fmt.Sprintf("successfully applied labels and/or annotations patches for node \"%s\"", node)) - log.Info(fmt.Sprintf("node %s: %s", node, string(data))) - } - } -} - -func newNodeWatcher(ctx context.Context, nodeName string, podName string, address string, port uint16) (*podStream, error) { - ctx, cancel := context.WithCancel(ctx) - group, ctx := errgroup.WithContext(ctx) - - ps := &podStream{ - ctx: ctx, - cancel: cancel, - group: group, - log: fromctx.LogrFromCtx(ctx).WithName("node-watcher"), - nodeName: nodeName, - address: address, - port: port, - } - - kubecfg := fromctx.KubeConfigFromCtx(ctx) - - if kubecfg.BearerTokenFile != "/var/run/secrets/kubernetes.io/serviceaccount/token" { - roundTripper, upgrader, err := spdy.RoundTripperFor(kubecfg) - if err != nil { - return nil, err - } - - path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", daemonSetNamespace, podName) - hostIP := strings.TrimPrefix(kubecfg.Host, "https://") - serverURL := url.URL{Scheme: "https", Path: path, Host: hostIP} - - dialer := spdy.NewDialer(upgrader, &http.Client{Transport: roundTripper}, http.MethodPost, &serverURL) - - errch := make(chan error, 1) - pf, err := portforward.New(dialer, []string{fmt.Sprintf(":%d", port)}, ctx.Done(), make(chan struct{}), os.Stdout, os.Stderr) - if err != nil { - return nil, err - } - - group.Go(func() error { - err := pf.ForwardPorts() - errch <- err - return err - }) - - select { - case <-pf.Ready: - case err := <-errch: - return nil, err - } - - ports, err := pf.GetPorts() - if err != nil { - return nil, err - } - - ps.address = "localhost" - ps.port = ports[0].Local - } - - go ps.run() - - return ps, nil -} - -func (nd *podStream) run() { - // Establish the gRPC connection - conn, err := grpc.Dial(fmt.Sprintf("%s:%d", nd.address, nd.port), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) - if err != nil { - nd.log.Error(err, "couldn't dial endpoint") - return - } - - defer func() { - _ = conn.Close() - }() - - nd.log.Info(fmt.Sprintf("(connected to node's \"%s\" inventory streamer at %s:%d", nd.nodeName, nd.address, nd.port)) - - client := inventoryV1.NewNodeRPCClient(conn) - - var stream inventoryV1.NodeRPC_StreamNodeClient - - 
pub := fromctx.PubSubFromCtx(nd.ctx) - - for { - for stream == nil { - conn.Connect() - - if state := conn.GetState(); state != connectivity.Ready { - if !conn.WaitForStateChange(nd.ctx, connectivity.Ready) { - return - } - } - - // do not replace empty argument with nil. stream will panic - stream, err = client.StreamNode(nd.ctx, &emptypb.Empty{}) - if err != nil { - if errors.Is(err, context.Canceled) { - return - } - - nd.log.Error(err, "couldn't establish stream") - - tctx, tcancel := context.WithTimeout(nd.ctx, 2*time.Second) - <-tctx.Done() - tcancel() - - if !errors.Is(tctx.Err(), context.DeadlineExceeded) { - return - } - } - } - - node, err := stream.Recv() - if err != nil { - pub.Pub(nodeState{ - state: nodeStateRemoved, - name: nd.nodeName, - }, []string{topicNodeState}) - - stream = nil - - if errors.Is(err, context.Canceled) { - return - } - - conn.ResetConnectBackoff() - } else { - pub.Pub(nodeState{ - state: nodeStateUpdated, - name: nd.nodeName, - node: node.Dup(), - }, []string{topicNodeState}) - } - } -} diff --git a/operator/inventory/feature-discovery-node.go b/operator/inventory/feature-discovery-node.go deleted file mode 100644 index 901113dd..00000000 --- a/operator/inventory/feature-discovery-node.go +++ /dev/null @@ -1,1190 +0,0 @@ -package inventory - -import ( - "bytes" - "crypto/sha256" - "crypto/tls" - "encoding/json" - "errors" - "fmt" - "io" - "net" - "net/http" - "strconv" - "strings" - "time" - - "github.com/go-logr/logr" - "github.com/gorilla/mux" - "github.com/jaypipes/ghw/pkg/cpu" - "github.com/jaypipes/ghw/pkg/gpu" - "github.com/jaypipes/ghw/pkg/memory" - "github.com/spf13/cobra" - "github.com/spf13/viper" - "github.com/troian/pubsub" - "golang.org/x/net/context" - "google.golang.org/grpc" - "google.golang.org/grpc/keepalive" - "google.golang.org/grpc/reflection" - "google.golang.org/protobuf/types/known/emptypb" - corev1 "k8s.io/api/core/v1" - kerrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/resource" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/watch" - - v1 "github.com/akash-network/akash-api/go/inventory/v1" - - "github.com/akash-network/provider/cluster/kube/builder" - "github.com/akash-network/provider/tools/fromctx" -) - -const ( - fdContainerName = "inventory-node" - fdContainerRESTPortName = "api" - fdContainerGRPCPortName = "grpc" - topicNode = "node" - topicNodeState = "node-state" - topicNodes = "nodes" - topicStorage = "storage" - topicConfig = "config" - topicClusterState = "cluster-state" - topicGPUIDs = "gpu-ids" -) - -type dpReqType int - -const ( - dpReqCPU dpReqType = iota - dpReqGPU - dpReqMem -) - -type dpReadResp struct { - data interface{} - err error -} -type dpReadReq struct { - op dpReqType - resp chan<- dpReadResp -} - -type debuggerPod struct { - ctx context.Context - readch chan dpReadReq -} - -type nodeRouter struct { - *mux.Router - queryTimeout time.Duration -} - -type grpcNodeServer struct { - v1.NodeRPCServer - ctx context.Context - log logr.Logger - sub pubsub.Subscriber - reqch chan<- chan<- v1.Node -} - -type fdNodeServer struct { - ctx context.Context - log logr.Logger - reqch <-chan chan<- v1.Node - pub pubsub.Publisher - nodeName string -} - -func cmdFeatureDiscoveryNode() *cobra.Command { - cmd := &cobra.Command{ - Use: "node", - Short: "feature discovery daemon-set node", - Args: cobra.ExactArgs(0), - SilenceUsage: true, - PreRunE: func(cmd *cobra.Command, args []string) error { - 
kubecfg := fromctx.KubeConfigFromCtx(cmd.Context()) - - var hw hwInfo - - log := fromctx.LogrFromCtx(cmd.Context()) - - if kubecfg.BearerTokenFile != "/var/run/secrets/kubernetes.io/serviceaccount/token" { - log.Info("service is not running as kubernetes pod. starting debugger pod") - - dp := &debuggerPod{ - ctx: cmd.Context(), - readch: make(chan dpReadReq, 1), - } - - group := fromctx.ErrGroupFromCtx(cmd.Context()) - - startch := make(chan struct{}) - - group.Go(func() error { - return dp.run(startch) - }) - - ctx, cancel := context.WithTimeout(cmd.Context(), 5*time.Second) - - select { - case <-ctx.Done(): - if !errors.Is(ctx.Err(), context.DeadlineExceeded) { - return ctx.Err() - } - case <-startch: - cancel() - } - - hw = dp - } else { - hw = &localHwReader{} - } - - fromctx.CmdSetContextValue(cmd, CtxKeyHwInfo, hw) - - return nil - }, - RunE: func(cmd *cobra.Command, args []string) error { - ctx := cmd.Context() - log := fromctx.LogrFromCtx(ctx) - - log.Info("starting k8s node features discovery") - - var err error - - podName := viper.GetString(FlagPodName) - nodeName := viper.GetString(FlagNodeName) - - restPort := viper.GetUint16(FlagRESTPort) - grpcPort := viper.GetUint16(FlagGRPCPort) - - apiTimeout := viper.GetDuration(FlagAPITimeout) - queryTimeout := viper.GetDuration(FlagQueryTimeout) - - kc := fromctx.KubeClientFromCtx(ctx) - - if grpcPort == 0 { - // this is dirty hack to discover exposed api port if this service runs within kubernetes - podInfo, err := kc.CoreV1().Pods(daemonSetNamespace).Get(ctx, podName, metav1.GetOptions{}) - if err != nil { - return err - } - - for _, container := range podInfo.Spec.Containers { - if container.Name == fdContainerName { - for _, port := range container.Ports { - if port.Name == fdContainerGRPCPortName { - grpcPort = uint16(port.ContainerPort) - } - } - } - } - - if grpcPort == 0 { - return fmt.Errorf("unable to detect pod's grpc port") // nolint: goerr113 - } - } - - if restPort == 0 { - // this is dirty hack to discover exposed api port if this service runs within kubernetes - podInfo, err := kc.CoreV1().Pods(daemonSetNamespace).Get(ctx, podName, metav1.GetOptions{}) - if err != nil { - return err - } - - for _, container := range podInfo.Spec.Containers { - if container.Name == fdContainerName { - for _, port := range container.Ports { - if port.Name == fdContainerRESTPortName { - restPort = uint16(port.ContainerPort) - } - } - } - } - - if grpcPort == 0 { - return fmt.Errorf("unable to detect pod's grpc port") // nolint: goerr113 - } - } - - restEndpoint := fmt.Sprintf(":%d", restPort) - grpcEndpoint := fmt.Sprintf(":%d", grpcPort) - - restSrv := &http.Server{ - Addr: restEndpoint, - Handler: newNodeRouter(apiTimeout, queryTimeout), - BaseContext: func(_ net.Listener) context.Context { - return ctx - }, - ReadHeaderTimeout: 5 * time.Second, - ReadTimeout: 60 * time.Second, - } - - bus := fromctx.PubSubFromCtx(cmd.Context()) - - grpcSrv := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ - MinTime: 30 * time.Second, - PermitWithoutStream: false, - })) - - reqch := make(chan chan<- v1.Node, 1) - - v1.RegisterNodeRPCServer(grpcSrv, &grpcNodeServer{ - ctx: ctx, - log: log.WithName("msg-srv"), - sub: bus, - reqch: reqch, - }) - - reflection.Register(grpcSrv) - - group := fromctx.ErrGroupFromCtx(ctx) - - fdns := &fdNodeServer{ - ctx: ctx, - log: log.WithName("watcher"), - reqch: reqch, - pub: bus, - nodeName: nodeName, - } - - group.Go(func() error { - return registryLoader(ctx) - }) - - startch := make(chan 
struct{}, 1) - group.Go(func() error { - defer func() { - log.Info("node discovery stopped") - }() - return fdns.run(startch) - }) - - select { - case <-startch: - group.Go(func() error { - defer func() { - log.Info("grpc server stopped") - }() - - log.Info(fmt.Sprintf("grpc listening on \"%s\"", grpcEndpoint)) - - lis, err := net.Listen("tcp", grpcEndpoint) - if err != nil { - return err - } - - return grpcSrv.Serve(lis) - }) - case <-ctx.Done(): - return ctx.Err() - } - - group.Go(func() error { - log.Info(fmt.Sprintf("rest listening on \"%s\"", restEndpoint)) - - return restSrv.ListenAndServe() - }) - - group.Go(func() error { - <-ctx.Done() - log.Info("received shutdown signal") - - err := restSrv.Shutdown(context.Background()) - - grpcSrv.GracefulStop() - - if err == nil { - err = ctx.Err() - } - - return err - }) - - fromctx.StartupChFromCtx(ctx) <- struct{}{} - err = group.Wait() - - if !errors.Is(err, context.Canceled) { - return err - } - - return nil - }, - } - - cmd.Flags().String(FlagPodName, "", "instance name") - if err := viper.BindPFlag(FlagPodName, cmd.Flags().Lookup(FlagPodName)); err != nil { - panic(err) - } - - cmd.Flags().String(FlagNodeName, "", "node name") - if err := viper.BindPFlag(FlagNodeName, cmd.Flags().Lookup(FlagNodeName)); err != nil { - panic(err) - } - - cmd.Flags().String(FlagProviderConfigsURL, defaultProviderConfigsURL, "provider configs server") - if err := viper.BindPFlag(FlagProviderConfigsURL, cmd.Flags().Lookup(FlagProviderConfigsURL)); err != nil { - panic(err) - } - - cmd.Flags().String(FlagPciDbURL, "https://pci-ids.ucw.cz/v2.2/pci.ids", "query period for registry changes") - if err := viper.BindPFlag(FlagPciDbURL, cmd.Flags().Lookup(FlagPciDbURL)); err != nil { - panic(err) - } - - return cmd -} - -func newNodeRouter(apiTimeout, queryTimeout time.Duration) *nodeRouter { - mRouter := mux.NewRouter() - rt := &nodeRouter{ - Router: mRouter, - queryTimeout: queryTimeout, - } - - mRouter.Use(func(h http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - rCtx, cancel := context.WithTimeout(r.Context(), apiTimeout) - defer cancel() - - h.ServeHTTP(w, r.WithContext(rCtx)) - }) - }) - - metricsRouter := mRouter.PathPrefix("/metrics").Subrouter() - metricsRouter.HandleFunc("/health", rt.healthHandler).GetHandler() - metricsRouter.HandleFunc("/ready", rt.readyHandler) - - return rt -} - -func (rt *nodeRouter) healthHandler(w http.ResponseWriter, req *http.Request) { - var err error - - defer func() { - code := http.StatusOK - - if err != nil { - if errors.Is(err, ErrMetricsUnsupportedRequest) { - code = http.StatusBadRequest - } else { - code = http.StatusInternalServerError - } - } - - w.WriteHeader(code) - }() - - if req.Method != "" && req.Method != http.MethodGet { - err = ErrMetricsUnsupportedRequest - return - } - - return -} - -func (rt *nodeRouter) readyHandler(w http.ResponseWriter, req *http.Request) { - var err error - - defer func() { - code := http.StatusOK - - if err != nil { - if errors.Is(err, ErrMetricsUnsupportedRequest) { - code = http.StatusBadRequest - } else { - code = http.StatusInternalServerError - } - } - - w.WriteHeader(code) - }() - - if req.Method != "" && req.Method != http.MethodGet { - err = ErrMetricsUnsupportedRequest - return - } - - return -} - -func (nd *fdNodeServer) run(startch chan<- struct{}) error { - kc := fromctx.KubeClientFromCtx(nd.ctx) - bus := fromctx.PubSubFromCtx(nd.ctx) - - subEvents := bus.Sub(topicGPUIDs) - defer bus.Unsub(subEvents) - - nodeWatch, err 
:= kc.CoreV1().Nodes().Watch(nd.ctx, metav1.ListOptions{ - LabelSelector: builder.AkashManagedLabelName + "=true", - FieldSelector: fields.OneTermEqualSelector(metav1.ObjectNameField, nd.nodeName).String(), - }) - if err != nil { - nd.log.Error(err, fmt.Sprintf("unable to watch node \"%s\"", nd.nodeName)) - return err - } - - defer nodeWatch.Stop() - - podsWatch, err := kc.CoreV1().Pods(corev1.NamespaceAll).Watch(nd.ctx, metav1.ListOptions{ - FieldSelector: fields.OneTermEqualSelector("spec.nodeName", nd.nodeName).String(), - }) - if err != nil { - nd.log.Error(err, "unable to watch start pods") - return err - } - - defer podsWatch.Stop() - - gpusIDs := make(RegistryGPUVendors) - - select { - case evt := <-subEvents: - gpusIDs = evt.(RegistryGPUVendors) - default: - } - - node, initPods, err := initNodeInfo(nd.ctx, nd.nodeName, gpusIDs) - if err != nil { - nd.log.Error(err, "unable to init node info") - return err - } - - select { - case <-nd.ctx.Done(): - return nd.ctx.Err() - case startch <- struct{}{}: - } - - signalch := make(chan struct{}, 1) - signalch <- struct{}{} - - trySignal := func() { - select { - case signalch <- struct{}{}: - default: - } - } - - trySignal() - - for { - select { - case <-nd.ctx.Done(): - return nd.ctx.Err() - case <-signalch: - nd.pub.Pub(node.Dup(), []string{topicNode}, pubsub.WithRetain()) - case req := <-nd.reqch: - req <- node.Dup() - case evt := <-subEvents: - gpusIDs = evt.(RegistryGPUVendors) - node.Resources.GPU.Info, _ = parseGPUInfo(nd.ctx, gpusIDs) - case res := <-nodeWatch.ResultChan(): - obj := res.Object.(*corev1.Node) - switch res.Type { - case watch.Added: - fallthrough - case watch.Modified: - caps, _ := parseNodeCapabilities(obj.Annotations) - switch obj := caps.Capabilities.(type) { - case *CapabilitiesV1: - node.Capabilities.StorageClasses = obj.StorageClasses - default: - } - } - trySignal() - case res := <-podsWatch.ResultChan(): - obj := res.Object.(*corev1.Pod) - switch res.Type { - case watch.Added: - if _, exists := initPods[obj.Name]; exists { - delete(initPods, obj.Name) - } else { - for _, container := range obj.Spec.Containers { - addAllocatedResources(&node, container.Resources.Requests) - } - } - case watch.Deleted: - delete(initPods, obj.Name) - - for _, container := range obj.Spec.Containers { - subAllocatedResources(&node, container.Resources.Requests) - } - } - - trySignal() - } - } -} - -func addAllocatedResources(node *v1.Node, rl corev1.ResourceList) { - for name, quantity := range rl { - switch name { - case corev1.ResourceCPU: - node.Resources.CPU.Quantity.Allocated.Add(quantity) - case corev1.ResourceMemory: - node.Resources.Memory.Quantity.Allocated.Add(quantity) - case corev1.ResourceEphemeralStorage: - node.Resources.EphemeralStorage.Allocated.Add(quantity) - case builder.ResourceGPUNvidia: - fallthrough - case builder.ResourceGPUAMD: - node.Resources.GPU.Quantity.Allocated.Add(quantity) - } - } -} - -func subAllocatedResources(node *v1.Node, rl corev1.ResourceList) { - for name, quantity := range rl { - switch name { - case corev1.ResourceCPU: - node.Resources.CPU.Quantity.Allocated.Sub(quantity) - case corev1.ResourceMemory: - node.Resources.Memory.Quantity.Allocated.Sub(quantity) - case corev1.ResourceEphemeralStorage: - node.Resources.EphemeralStorage.Allocated.Sub(quantity) - case builder.ResourceGPUNvidia: - fallthrough - case builder.ResourceGPUAMD: - node.Resources.GPU.Quantity.Allocated.Sub(quantity) - } - } -} - -func initNodeInfo(ctx context.Context, name string, gpusIds RegistryGPUVendors) (v1.Node, 
map[string]corev1.Pod, error) { - defer func() { - if r := recover(); r != nil { - fromctx.LogrFromCtx(ctx).Info(fmt.Sprintf("recovered from panic: %s", r)) - } - }() - kc := fromctx.KubeClientFromCtx(ctx) - - cpuInfo, err := parseCPUInfo(ctx) - if err != nil { - return v1.Node{}, nil, err - } - - gpuInfo, err := parseGPUInfo(ctx, gpusIds) - if err != nil { - return v1.Node{}, nil, err - } - - knode, err := kc.CoreV1().Nodes().Get(ctx, name, metav1.GetOptions{}) - if err != nil { - return v1.Node{}, nil, fmt.Errorf("%w: error fetching node %s", err, name) - } - - caps, err := parseNodeCapabilities(knode.Annotations) - if err != nil { - return v1.Node{}, nil, fmt.Errorf("%w: parsing capabilities for node%s", err, name) - } - - res := v1.Node{ - Name: knode.Name, - Resources: v1.NodeResources{ - CPU: v1.CPU{ - Quantity: v1.NewResourcePairMilli(0, 0, resource.DecimalSI), - Info: cpuInfo, - }, - GPU: v1.GPU{ - Quantity: v1.NewResourcePair(0, 0, resource.DecimalSI), - Info: gpuInfo, - }, - Memory: v1.Memory{ - Quantity: v1.NewResourcePair(0, 0, resource.DecimalSI), - Info: nil, - }, - EphemeralStorage: v1.NewResourcePair(0, 0, resource.DecimalSI), - VolumesAttached: v1.NewResourcePair(0, 0, resource.DecimalSI), - VolumesMounted: v1.NewResourcePair(0, 0, resource.DecimalSI), - }, - } - - switch obj := caps.Capabilities.(type) { - case *CapabilitiesV1: - res.Capabilities.StorageClasses = obj.StorageClasses - default: - } - - for name, r := range knode.Status.Allocatable { - switch name { - case corev1.ResourceCPU: - res.Resources.CPU.Quantity.Allocatable.SetMilli(r.MilliValue()) - case corev1.ResourceMemory: - res.Resources.Memory.Quantity.Allocatable.Set(r.Value()) - case corev1.ResourceEphemeralStorage: - res.Resources.EphemeralStorage.Allocatable.Set(r.Value()) - case builder.ResourceGPUNvidia: - fallthrough - case builder.ResourceGPUAMD: - res.Resources.GPU.Quantity.Allocatable.Set(r.Value()) - } - } - - initPods := make(map[string]corev1.Pod) - - podsList, err := kc.CoreV1().Pods(corev1.NamespaceAll).List(ctx, metav1.ListOptions{ - FieldSelector: fields.OneTermEqualSelector("spec.nodeName", name).String(), - }) - if err != nil { - return res, nil, err - } - - if podsList == nil { - return res, initPods, nil - } - - for _, pod := range podsList.Items { - for _, container := range pod.Spec.Containers { - if container.Resources.Requests != nil { - addAllocatedResources(&res, container.Resources.Requests) - } else if container.Resources.Limits != nil { - addAllocatedResources(&res, container.Resources.Limits) - } - } - initPods[pod.Name] = pod - } - - return res, initPods, nil -} - -func (s *grpcNodeServer) QueryNode(ctx context.Context, _ *emptypb.Empty) (*v1.Node, error) { - reqch := make(chan v1.Node, 1) - - select { - case <-s.ctx.Done(): - return nil, s.ctx.Err() - case <-ctx.Done(): - return nil, ctx.Err() - case s.reqch <- reqch: - } - - select { - case <-s.ctx.Done(): - return nil, s.ctx.Err() - case <-ctx.Done(): - return nil, ctx.Err() - case req := <-reqch: - return &req, nil - } -} - -func (s *grpcNodeServer) StreamNode(_ *emptypb.Empty, stream v1.NodeRPC_StreamNodeServer) error { - subch := s.sub.Sub(topicNode) - - defer func() { - s.sub.Unsub(subch, topicNode) - }() - - for { - select { - case <-s.ctx.Done(): - return s.ctx.Err() - case <-stream.Context().Done(): - return stream.Context().Err() - case nd := <-subch: - switch msg := nd.(type) { - case v1.Node: - if err := stream.Send(&msg); err != nil { - return err - } - default: - } - } - } -} - -type hwInfo interface { - 
CPU(context.Context) (*cpu.Info, error) - GPU(context.Context) (*gpu.Info, error) - Memory(context.Context) (*memory.Info, error) -} - -type localHwReader struct{} - -func (lfs *localHwReader) CPU(_ context.Context) (*cpu.Info, error) { - return cpu.New() -} - -func (lfs *localHwReader) GPU(_ context.Context) (*gpu.Info, error) { - return gpu.New() -} - -func (lfs *localHwReader) Memory(_ context.Context) (*memory.Info, error) { - return memory.New() -} - -func parseCPUInfo(ctx context.Context) (v1.CPUInfoS, error) { - if err := ctx.Err(); err != nil { - return nil, err - } - - hw := HWInfoFromCtx(ctx) - - cpus, err := hw.CPU(ctx) - if err != nil { - return nil, err - } - - res := make(v1.CPUInfoS, 0, len(cpus.Processors)) - - for _, c := range cpus.Processors { - res = append(res, v1.CPUInfo{ - ID: strconv.Itoa(c.ID), - Vendor: c.Vendor, - Model: c.Model, - Vcores: c.NumThreads, - }) - } - - return res, nil -} - -func parseGPUInfo(ctx context.Context, info RegistryGPUVendors) (v1.GPUInfoS, error) { - defer func() { - if r := recover(); r != nil { - fromctx.LogrFromCtx(ctx).Info(fmt.Sprintf("recovered from panic: %s", r)) - } - }() - - res := make(v1.GPUInfoS, 0) - - if err := ctx.Err(); err != nil { - return res, err - } - - hw := HWInfoFromCtx(ctx) - gpus, err := hw.GPU(ctx) - if err != nil { - return res, err - } - - if gpus == nil { - return res, nil - } - - for _, dev := range gpus.GraphicsCards { - dinfo := dev.DeviceInfo - if dinfo == nil { - continue - } - - vinfo := dinfo.Vendor - pinfo := dinfo.Product - if vinfo == nil || pinfo == nil { - continue - } - - vendor, exists := info[vinfo.ID] - if !exists { - continue - } - - model, exists := vendor.Devices[pinfo.ID] - if !exists { - continue - } - - res = append(res, v1.GPUInfo{ - Vendor: vendor.Name, - VendorID: dev.DeviceInfo.Vendor.ID, - Name: model.Name, - ModelID: dev.DeviceInfo.Product.ID, - Interface: model.Interface, - MemorySize: model.MemorySize, - }) - } - - return res, nil -} - -func (dp *debuggerPod) CPU(ctx context.Context) (*cpu.Info, error) { - respch := make(chan dpReadResp, 1) - - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-dp.ctx.Done(): - return nil, dp.ctx.Err() - case dp.readch <- dpReadReq{ - op: dpReqCPU, - resp: respch, - }: - } - - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-dp.ctx.Done(): - return nil, dp.ctx.Err() - case resp := <-respch: - return resp.data.(*cpu.Info), resp.err - } -} - -func (dp *debuggerPod) GPU(ctx context.Context) (*gpu.Info, error) { - respch := make(chan dpReadResp, 1) - - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-dp.ctx.Done(): - return nil, dp.ctx.Err() - case dp.readch <- dpReadReq{ - op: dpReqGPU, - resp: respch, - }: - } - - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-dp.ctx.Done(): - return nil, dp.ctx.Err() - case resp := <-respch: - return resp.data.(*gpu.Info), resp.err - } -} - -func (dp *debuggerPod) Memory(ctx context.Context) (*memory.Info, error) { - respch := make(chan dpReadResp, 1) - - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-dp.ctx.Done(): - return nil, dp.ctx.Err() - case dp.readch <- dpReadReq{ - op: dpReqMem, - resp: respch, - }: - } - - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-dp.ctx.Done(): - return nil, dp.ctx.Err() - case resp := <-respch: - return resp.data.(*memory.Info), resp.err - } -} - -func (dp *debuggerPod) run(startch chan<- struct{}) error { - log := fromctx.LogrFromCtx(dp.ctx) - - log.Info("staring debugger pod") - - req := 
&corev1.Pod{ - TypeMeta: metav1.TypeMeta{ - Kind: "Pod", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "fd-debugger-pod", - }, - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "psutil", - Image: "ghcr.io/akash-network/provider-test:latest-arm64", - Command: []string{ - "provider-services", - "operator", - "psutil", - "serve", - "--api-port=8081", - }, - Ports: []corev1.ContainerPort{ - { - Name: "api", - ContainerPort: 8081, - }, - }, - }, - }, - }, - } - - kc := fromctx.KubeClientFromCtx(dp.ctx) - - pod, err := kc.CoreV1().Pods(daemonSetNamespace).Create(dp.ctx, req, metav1.CreateOptions{}) - if err != nil && !kerrors.IsAlreadyExists(err) { - return err - } - - defer func() { - // using default context here to delete pod as main might have been canceled - _ = kc.CoreV1().Pods(daemonSetNamespace).Delete(context.Background(), pod.Name, metav1.DeleteOptions{}) - }() - - watcher, err := kc.CoreV1().Pods(daemonSetNamespace).Watch(dp.ctx, metav1.ListOptions{ - Watch: true, - ResourceVersion: pod.ResourceVersion, - FieldSelector: fields.Set{"metadata.name": pod.Name}.AsSelector().String(), - LabelSelector: labels.Everything().String(), - }) - - if err != nil { - return err - } - - defer func() { - watcher.Stop() - }() - - var apiPort int32 - - for _, container := range pod.Spec.Containers { - if container.Name == "psutil" { - for _, port := range container.Ports { - if port.Name == "api" { - apiPort = port.ContainerPort - } - } - } - } - - if apiPort == 0 { - return fmt.Errorf("debugger pod does not have port named \"api\"") // nolint: goerr113 - } - -initloop: - for { - select { - case <-dp.ctx.Done(): - return dp.ctx.Err() - case evt := <-watcher.ResultChan(): - resp := evt.Object.(*corev1.Pod) - if resp.Status.Phase != corev1.PodPending { - watcher.Stop() - startch <- struct{}{} - break initloop - } - } - } - - for { - select { - case <-dp.ctx.Done(): - return dp.ctx.Err() - case readreq := <-dp.readch: - var res string - resp := dpReadResp{} - - switch readreq.op { - case dpReqCPU: - res = "cpu" - case dpReqGPU: - res = "gpu" - case dpReqMem: - res = "memory" - } - - result := kc.CoreV1().RESTClient().Get(). - Namespace(daemonSetNamespace). - Resource("pods"). - Name(fmt.Sprintf("%s:%d", pod.Name, apiPort)). - SubResource("proxy"). - Suffix(res). - Do(dp.ctx) - - resp.err = result.Error() - - if resp.err == nil { - var data []byte - data, resp.err = result.Raw() - if resp.err == nil { - switch readreq.op { - case dpReqCPU: - var res cpu.Info - resp.err = json.Unmarshal(data, &res) - resp.data = &res - case dpReqGPU: - var res gpu.Info - resp.err = json.Unmarshal(data, &res) - resp.data = &res - case dpReqMem: - var res memory.Info - resp.err = json.Unmarshal(data, &res) - resp.data = &res - } - } - } - - readreq.resp <- resp - } - } -} - -// this function is piece of sh*t. refactor it! 
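Reviewer note: both the debugger pod removed above and the per-node discovery pod added below in node-discovery.go talk to the psutil server through the API server's pods/proxy subresource rather than by dialing the pod IP directly. A minimal client-go sketch of that pattern (namespace, pod name, port and suffix are illustrative):

	result := kc.CoreV1().RESTClient().Get().
		Namespace("akash-services").
		Resource("pods").
		Name(fmt.Sprintf("%s:%d", podName, 8081)). // "<pod>:<port>" targets the pod's HTTP port
		SubResource("proxy").
		Suffix("cpu"). // /cpu, /gpu or /memory on the psutil server
		Do(ctx)
	data, err := result.Raw() // raw JSON body; callers unmarshal into cpu.Info, gpu.Info or memory.Info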
-func registryLoader(ctx context.Context) error { - log := fromctx.LogrFromCtx(ctx).WithName("registry-loader") - bus := fromctx.PubSubFromCtx(ctx) - - tlsConfig := http.DefaultTransport.(*http.Transport).TLSClientConfig - - cl := &http.Client{ - Transport: &http.Transport{ - DialTLSContext: func(ctx context.Context, network, addr string) (net.Conn, error) { - return tls.Dial(network, addr, tlsConfig) - }, - }, - } - - urlGPU := fmt.Sprintf("%s/devices/gpus", strings.TrimSuffix(viper.GetString(FlagProviderConfigsURL), "/")) - urlPcieDB := viper.GetString(FlagPciDbURL) - - var gpuCurrHash []byte - var pcidbHash []byte - - gpuIDs := make(RegistryGPUVendors) - - queryGPUs := func() bool { - res, err := cl.Get(urlGPU) - if err != nil { - log.Error(err, "couldn't query inventory registry") - return false - } - - defer func() { - _ = res.Body.Close() - }() - - if res.StatusCode != http.StatusOK { - return false - } - - gpus, err := io.ReadAll(res.Body) - if err != nil { - return false - } - - upstreamHash := sha256.New() - _, _ = upstreamHash.Write(gpus) - newHash := upstreamHash.Sum(nil) - - if bytes.Equal(gpuCurrHash, newHash) { - return false - } - - _ = json.Unmarshal(gpus, &gpuIDs) - - gpuCurrHash = newHash - - return true - } - - queryPCI := func() bool { - res, err := cl.Get(urlPcieDB) - if err != nil { - log.Error(err, "couldn't query pci.ids") - return false - } - - defer func() { - _ = res.Body.Close() - }() - - if res.StatusCode != http.StatusOK { - return false - } - - pcie, err := io.ReadAll(res.Body) - if err != nil { - return false - } - - upstreamHash := sha256.New() - _, _ = upstreamHash.Write(pcie) - newHash := upstreamHash.Sum(nil) - - if bytes.Equal(pcidbHash, newHash) { - return false - } - - pcidbHash = newHash - - return true - } - - queryGPUs() - bus.Pub(gpuIDs, []string{topicGPUIDs}) - - queryPeriod := viper.GetDuration(FlagRegistryQueryPeriod) - tmGPU := time.NewTimer(queryPeriod) - tmPCIe := time.NewTimer(24 * time.Hour) - - for { - select { - case <-ctx.Done(): - if !tmGPU.Stop() { - <-tmGPU.C - } - - if !tmPCIe.Stop() { - <-tmPCIe.C - } - - return ctx.Err() - case <-tmGPU.C: - if queryGPUs() { - bus.Pub(gpuIDs, []string{topicGPUIDs}) - } - tmGPU.Reset(queryPeriod) - case <-tmPCIe.C: - queryPCI() - - tmGPU.Reset(24 * time.Hour) - } - } -} - -// // ExecCmd exec command on specific pod and wait the command's output. -// func ExecCmd(ctx context.Context, podName string, command string, stdin io.Reader, stdout io.Writer, stderr io.Writer) error { -// kc := KubeClientFromCtx(ctx) -// cfg := KubeConfigFromCtx(ctx) -// -// cmd := []string{ -// "sh", -// "-c", -// command, -// } -// -// option := &corev1.PodExecOptions{ -// Command: cmd, -// Stdin: true, -// Stdout: true, -// Stderr: true, -// TTY: true, -// } -// if stdin == nil { -// option.Stdin = false -// } -// -// req := kc.CoreV1(). -// RESTClient(). -// Post(). -// Resource("pods"). -// Name(podName). -// Namespace(daemonSetNamespace). -// SubResource("exec"). 
-// VersionedParams(option, scheme.ParameterCodec) -// -// exec, err := remotecommand.NewSPDYExecutor(cfg, "POST", req.URL()) -// if err != nil { -// return err -// } -// err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{ -// Stdin: stdin, -// Stdout: stdout, -// Stderr: stderr, -// }) -// if err != nil { -// return err -// } -// -// return nil -// } diff --git a/operator/inventory/node-discovery.go b/operator/inventory/node-discovery.go new file mode 100644 index 00000000..f322af6c --- /dev/null +++ b/operator/inventory/node-discovery.go @@ -0,0 +1,784 @@ +package inventory + +import ( + "context" + "encoding/json" + "fmt" + "reflect" + "sort" + "strconv" + "strings" + + "github.com/jaypipes/ghw/pkg/cpu" + "github.com/jaypipes/ghw/pkg/gpu" + "github.com/jaypipes/ghw/pkg/memory" + "golang.org/x/sync/errgroup" + corev1 "k8s.io/api/core/v1" + kerrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + k8stypes "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + + v1 "github.com/akash-network/akash-api/go/inventory/v1" + + "github.com/akash-network/provider/cluster/kube/builder" + "github.com/akash-network/provider/tools/fromctx" +) + +type k8sPatch struct { + Op string `json:"op"` + Path string `json:"path"` + Value interface{} `json:"value"` +} + +type nodeDiscovery struct { + ctx context.Context + cancel context.CancelFunc + group *errgroup.Group + kc *kubernetes.Clientset + readch chan dpReadReq + readych chan struct{} + sig chan<- string + name string + namespace string + image string +} + +func newNodeDiscovery(ctx context.Context, name, namespace string, image string, sig chan<- string) *nodeDiscovery { + ctx, cancel := context.WithCancel(ctx) + group, ctx := errgroup.WithContext(ctx) + + nd := &nodeDiscovery{ + ctx: ctx, + cancel: cancel, + group: group, + kc: fromctx.KubeClientFromCtx(ctx), + readch: make(chan dpReadReq, 1), + readych: make(chan struct{}), + sig: sig, + name: name, + namespace: namespace, + image: image, + } + + group.Go(nd.apiConnector) + group.Go(nd.monitor) + + return nd +} + +func (dp *nodeDiscovery) shutdown() error { + dp.cancel() + + return dp.group.Wait() +} + +func (dp *nodeDiscovery) queryCPU(ctx context.Context) (*cpu.Info, error) { + respch := make(chan dpReadResp, 1) + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-dp.ctx.Done(): + return nil, dp.ctx.Err() + case dp.readch <- dpReadReq{ + op: dpReqCPU, + resp: respch, + }: + } + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-dp.ctx.Done(): + return nil, dp.ctx.Err() + case resp := <-respch: + if resp.data == nil { + return nil, resp.err + } + return resp.data.(*cpu.Info), resp.err + } +} + +func (dp *nodeDiscovery) queryGPU(ctx context.Context) (*gpu.Info, error) { + respch := make(chan dpReadResp, 1) + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-dp.ctx.Done(): + return nil, dp.ctx.Err() + case dp.readch <- dpReadReq{ + op: dpReqGPU, + resp: respch, + }: + } + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-dp.ctx.Done(): + return nil, dp.ctx.Err() + case resp := <-respch: + if resp.data == nil { + return nil, resp.err + } + return resp.data.(*gpu.Info), resp.err + } +} + +func (dp *nodeDiscovery) apiConnector() error { + defer func() { + dp.sig <- dp.name + }() + + log := fromctx.LogrFromCtx(dp.ctx).WithName("node.discovery") + + log.Info("starting hardware 
discovery pod", "node", dp.name) + + apiPort := 8081 + + name := fmt.Sprintf("operator-inventory-hardware-discovery-%s", dp.name) + req := &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: dp.namespace, + Labels: map[string]string{ + "app.kubernetes.io/name": "inventory", + "app.kubernetes.io/instance": "inventory-hardware-discovery", + "app.kubernetes.io/component": "operator", + "app.kubernetes.io/part-of": "provider", + }, + }, + Spec: corev1.PodSpec{ + NodeName: dp.name, + ServiceAccountName: "operator-inventory-hardware-discovery", + Containers: []corev1.Container{ + { + Name: "psutil", + Image: dp.image, + Command: []string{ + "provider-services", + "operator", + "psutil", + "serve", + fmt.Sprintf("--api-port=%d", apiPort), + }, + Ports: []corev1.ContainerPort{ + { + Name: "api", + ContainerPort: 8081, + }, + }, + Env: []corev1.EnvVar{ + { + Name: "PCIDB_ENABLE_NETWORK_FETCH", + Value: "1", + }, + }, + }, + }, + }, + } + + kc := fromctx.KubeClientFromCtx(dp.ctx) + + pod, err := kc.CoreV1().Pods(dp.namespace).Create(dp.ctx, req, metav1.CreateOptions{}) + if err != nil && !kerrors.IsAlreadyExists(err) { + log.Error(err, fmt.Sprintf("unable to start discovery pod on node \"%s\"", dp.name)) + return err + } + + defer func() { + // using default context here to delete pod as main might have been canceled + log.Info("shutting down hardware discovery pod", "node", dp.name) + _ = kc.CoreV1().Pods(dp.namespace).Delete(context.Background(), pod.Name, metav1.DeleteOptions{}) + }() + + watcher, err := kc.CoreV1().Pods(dp.namespace).Watch(dp.ctx, metav1.ListOptions{ + Watch: true, + ResourceVersion: pod.ResourceVersion, + FieldSelector: fields.Set{ + "metadata.name": pod.Name, + "spec.nodeName": pod.Spec.NodeName}.AsSelector().String(), + LabelSelector: "app.kubernetes.io/name=inventory" + + ",app.kubernetes.io/instance=inventory-hardware-discovery" + + ",app.kubernetes.io/component=operator" + + ",app.kubernetes.io/part-of=provider", + }) + + if err != nil { + log.Error(err, fmt.Sprintf("unable to start pod watcher on node \"%s\"", dp.name)) + return err + } + + defer func() { + watcher.Stop() + }() + +initloop: + for { + select { + case <-dp.ctx.Done(): + return dp.ctx.Err() + case evt := <-watcher.ResultChan(): + resp := evt.Object.(*corev1.Pod) + if resp.Status.Phase != corev1.PodPending { + watcher.Stop() + break initloop + } + } + } + + log.Info("started hardware discovery pod", "node", dp.name) + + dp.readych <- struct{}{} + + for { + select { + case <-dp.ctx.Done(): + return dp.ctx.Err() + case readreq := <-dp.readch: + var res string + resp := dpReadResp{} + + switch readreq.op { + case dpReqCPU: + res = "cpu" + case dpReqGPU: + res = "gpu" + case dpReqMem: + res = "memory" + } + + result := kc.CoreV1().RESTClient().Get(). + Namespace(dp.namespace). + Resource("pods"). + Name(fmt.Sprintf("%s:%d", pod.Name, apiPort)). + SubResource("proxy"). + Suffix(res). 
+ Do(dp.ctx) + + resp.err = result.Error() + + if resp.err == nil { + var data []byte + data, resp.err = result.Raw() + if resp.err == nil { + switch readreq.op { + case dpReqCPU: + var res cpu.Info + resp.err = json.Unmarshal(data, &res) + resp.data = &res + case dpReqGPU: + var res gpu.Info + resp.err = json.Unmarshal(data, &res) + resp.data = &res + case dpReqMem: + var res memory.Info + resp.err = json.Unmarshal(data, &res) + resp.data = &res + } + } + } + + readreq.resp <- resp + } + } +} + +func (dp *nodeDiscovery) monitor() error { + ctx := dp.ctx + + bus := fromctx.PubSubFromCtx(ctx) + kc := fromctx.KubeClientFromCtx(ctx) + log := fromctx.LogrFromCtx(ctx).WithName("node.monitor") + + log.Info("starting", "node", dp.name) + + nodesch := bus.Sub(topicKubeNodes) + cfgch := bus.Sub(topicInventoryConfig) + idsch := bus.Sub(topicGPUIDs) + scch := bus.Sub(topicStorageClasses) + + defer func() { + bus.Unsub(nodesch) + bus.Unsub(idsch) + bus.Unsub(cfgch) + bus.Unsub(scch) + }() + + podsWatch, err := kc.CoreV1().Pods(corev1.NamespaceAll).Watch(dp.ctx, metav1.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("spec.nodeName", dp.name).String(), + }) + if err != nil { + log.Error(err, "unable to watch start pods") + return err + } + + defer podsWatch.Stop() + + var cfg Config + var sc storageClasses + var lastPubState nodeStateEnum + + gpusIDs := make(RegistryGPUVendors) + currLabels := make(map[string]string) + + select { + case <-dp.ctx.Done(): + return dp.ctx.Err() + case <-dp.readych: + } + + select { + case <-dp.ctx.Done(): + return dp.ctx.Err() + case evt := <-cfgch: + cfg = evt.(Config) + } + + select { + case <-dp.ctx.Done(): + return dp.ctx.Err() + case evt := <-scch: + sc = evt.(storageClasses) + } + + select { + case evt := <-idsch: + gpusIDs = evt.(RegistryGPUVendors) + default: + } + + knode, err := dp.kc.CoreV1().Nodes().Get(ctx, dp.name, metav1.GetOptions{}) + if err == nil { + currLabels = copyAkashLabels(knode.Labels) + } + + node, initPods, err := dp.initNodeInfo(gpusIDs) + if err != nil { + log.Error(err, "unable to init node info") + return err + } + + statech := make(chan struct{}, 1) + labelch := make(chan struct{}, 1) + + signalState := func() { + select { + case statech <- struct{}{}: + default: + } + } + + signalLabels := func() { + select { + case labelch <- struct{}{}: + default: + } + } + + defer func() { + if lastPubState != nodeStateRemoved { + bus.Pub(nodeState{ + state: nodeStateRemoved, + name: dp.name, + }, []string{topicInventoryNode}) + } + }() + + log.Info("started", "node", dp.name) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case evt := <-cfgch: + cfg = evt.(Config) + signalLabels() + case evt := <-scch: + sc = evt.(storageClasses) + signalLabels() + case evt := <-idsch: + gpusIDs = evt.(RegistryGPUVendors) + node.Resources.GPU.Info, _ = dp.parseGPUInfo(ctx, gpusIDs) + signalLabels() + case rEvt := <-nodesch: + evt := rEvt.(watch.Event) + switch obj := evt.Object.(type) { + case *corev1.Node: + if obj.Name == dp.name { + currLabels = copyAkashLabels(obj.Labels) + signalLabels() + } + } + case res := <-podsWatch.ResultChan(): + obj := res.Object.(*corev1.Pod) + switch res.Type { + case watch.Added: + if _, exists := initPods[obj.Name]; exists { + delete(initPods, obj.Name) + } else { + for _, container := range obj.Spec.Containers { + addAllocatedResources(&node, container.Resources.Requests) + } + } + case watch.Deleted: + delete(initPods, obj.Name) + + for _, container := range obj.Spec.Containers { + 
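// Reviewer note: the pod watch drives live allocation accounting. watch.Added adds
// container requests unless the pod was already counted by initNodeInfo (tracked in
// initPods); watch.Deleted subtracts the same quantities below, so node.Resources
// keeps tracking the resources currently requested by pods on this node.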
subAllocatedResources(&node, container.Resources.Requests) + } + } + + if len(initPods) == 0 { + signalState() + } + case <-statech: + if len(currLabels) > 0 { + bus.Pub(nodeState{ + state: nodeStateUpdated, + name: dp.name, + node: node.Dup(), + }, []string{topicInventoryNode}) + lastPubState = nodeStateUpdated + } else if len(currLabels) == 0 && lastPubState != nodeStateRemoved { + bus.Pub(nodeState{ + state: nodeStateRemoved, + name: dp.name, + }, []string{topicInventoryNode}) + + lastPubState = nodeStateRemoved + } + case <-labelch: + labels, nNode := generateLabels(cfg, knode, node.Dup(), sc) + if !reflect.DeepEqual(&nNode, &node) { + node = nNode + signalState() + } + + if !reflect.DeepEqual(labels, currLabels) { + currLabels = copyAkashLabels(labels) + + for key, val := range removeAkashLabels(knode.Labels) { + labels[key] = val + } + + patches := []k8sPatch{ + { + Op: "add", + Path: "/metadata/labels", + Value: labels, + }, + } + + data, _ := json.Marshal(patches) + + _, err := dp.kc.CoreV1().Nodes().Patch(dp.ctx, node.Name, k8stypes.JSONPatchType, data, metav1.PatchOptions{}) + if err != nil { + log.Error(err, fmt.Sprintf("couldn't apply patches for node \"%s\"", node.Name)) + } else { + log.Info(fmt.Sprintf("successfully applied labels and/or annotations patches for node \"%s\"", node.Name), "labels", currLabels) + } + + signalState() + } + } + } +} + +func (dp *nodeDiscovery) initNodeInfo(gpusIds RegistryGPUVendors) (v1.Node, map[string]corev1.Pod, error) { + kc := fromctx.KubeClientFromCtx(dp.ctx) + + cpuInfo, err := dp.parseCPUInfo(dp.ctx) + if err != nil { + return v1.Node{}, nil, err + } + + gpuInfo, err := dp.parseGPUInfo(dp.ctx, gpusIds) + if err != nil { + return v1.Node{}, nil, err + } + + knode, err := kc.CoreV1().Nodes().Get(dp.ctx, dp.name, metav1.GetOptions{}) + if err != nil { + return v1.Node{}, nil, fmt.Errorf("%w: error fetching node %s", err, dp.name) + } + + res := v1.Node{ + Name: knode.Name, + Resources: v1.NodeResources{ + CPU: v1.CPU{ + Quantity: v1.NewResourcePairMilli(0, 0, resource.DecimalSI), + Info: cpuInfo, + }, + GPU: v1.GPU{ + Quantity: v1.NewResourcePair(0, 0, resource.DecimalSI), + Info: gpuInfo, + }, + Memory: v1.Memory{ + Quantity: v1.NewResourcePair(0, 0, resource.DecimalSI), + Info: nil, + }, + EphemeralStorage: v1.NewResourcePair(0, 0, resource.DecimalSI), + VolumesAttached: v1.NewResourcePair(0, 0, resource.DecimalSI), + VolumesMounted: v1.NewResourcePair(0, 0, resource.DecimalSI), + }, + } + + for name, r := range knode.Status.Allocatable { + switch name { + case corev1.ResourceCPU: + res.Resources.CPU.Quantity.Allocatable.SetMilli(r.MilliValue()) + case corev1.ResourceMemory: + res.Resources.Memory.Quantity.Allocatable.Set(r.Value()) + case corev1.ResourceEphemeralStorage: + res.Resources.EphemeralStorage.Allocatable.Set(r.Value()) + case builder.ResourceGPUNvidia: + fallthrough + case builder.ResourceGPUAMD: + res.Resources.GPU.Quantity.Allocatable.Set(r.Value()) + } + } + + initPods := make(map[string]corev1.Pod) + + podsList, err := kc.CoreV1().Pods(corev1.NamespaceAll).List(dp.ctx, metav1.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("spec.nodeName", dp.name).String(), + }) + if err != nil { + return res, nil, err + } + + if podsList == nil { + return res, initPods, nil + } + + for _, pod := range podsList.Items { + for _, container := range pod.Spec.Containers { + if container.Resources.Requests != nil { + addAllocatedResources(&res, container.Resources.Requests) + } else if container.Resources.Limits != nil { + 
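// Reviewer note: requests take precedence for the initial tally; limits are used only
// when requests are unset, which matches Kubernetes defaulting, where a container that
// declares limits without requests effectively gets requests equal to its limits.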
addAllocatedResources(&res, container.Resources.Limits) + } + } + initPods[pod.Name] = pod + } + + return res, initPods, nil +} + +func addAllocatedResources(node *v1.Node, rl corev1.ResourceList) { + for name, quantity := range rl { + switch name { + case corev1.ResourceCPU: + node.Resources.CPU.Quantity.Allocated.Add(quantity) + case corev1.ResourceMemory: + node.Resources.Memory.Quantity.Allocated.Add(quantity) + case corev1.ResourceEphemeralStorage: + node.Resources.EphemeralStorage.Allocated.Add(quantity) + case builder.ResourceGPUNvidia: + fallthrough + case builder.ResourceGPUAMD: + node.Resources.GPU.Quantity.Allocated.Add(quantity) + } + } +} + +func subAllocatedResources(node *v1.Node, rl corev1.ResourceList) { + for name, quantity := range rl { + switch name { + case corev1.ResourceCPU: + node.Resources.CPU.Quantity.Allocated.Sub(quantity) + case corev1.ResourceMemory: + node.Resources.Memory.Quantity.Allocated.Sub(quantity) + case corev1.ResourceEphemeralStorage: + node.Resources.EphemeralStorage.Allocated.Sub(quantity) + case builder.ResourceGPUNvidia: + fallthrough + case builder.ResourceGPUAMD: + node.Resources.GPU.Quantity.Allocated.Sub(quantity) + } + } +} + +func copyAkashLabels(in map[string]string) map[string]string { + out := make(map[string]string, len(in)) + + for key, val := range in { + if !strings.HasPrefix(key, builder.AkashManagedLabelName) { + continue + } + + out[key] = val + } + + return out +} + +func removeAkashLabels(in map[string]string) map[string]string { + out := make(map[string]string) + + for key, val := range in { + if strings.HasPrefix(key, builder.AkashManagedLabelName) { + continue + } + + out[key] = val + } + + return out +} + +func isNodeReady(conditions []corev1.NodeCondition) bool { + for _, c := range conditions { + if c.Type == corev1.NodeReady { + return c.Status == "True" + } + } + + return false +} + +func generateLabels(cfg Config, knode *corev1.Node, node v1.Node, sc storageClasses) (map[string]string, v1.Node) { + res := make(map[string]string) + + presentSc := make([]string, 0, len(sc)) + for name := range sc { + presentSc = append(presentSc, name) + } + + adjConfig := cfg.Copy() + + sort.Strings(presentSc) + adjConfig.FilterOutStorageClasses(presentSc) + + isExcluded := !isNodeReady(knode.Status.Conditions) || knode.Spec.Unschedulable || adjConfig.Exclude.IsNodeExcluded(knode.Name) + + if isExcluded { + node.Capabilities.StorageClasses = []string{} + return res, node + } + + res[builder.AkashManagedLabelName] = "true" + + allowedSc := adjConfig.StorageClassesForNode(knode.Name) + for _, class := range allowedSc { + key := fmt.Sprintf("%s.class.%s", builder.AkashServiceCapabilityStorage, class) + res[key] = "1" + } + + node.Capabilities.StorageClasses = allowedSc + + for _, gpu := range node.Resources.GPU.Info { + key := fmt.Sprintf("%s.vendor.%s.model.%s", builder.AkashServiceCapabilityGPU, gpu.Vendor, gpu.Name) + if val, exists := res[key]; exists { + nval, _ := strconv.ParseUint(val, 10, 32) + nval++ + res[key] = strconv.FormatUint(nval, 10) + } else { + res[key] = "1" + } + + if gpu.MemorySize != "" { + key := fmt.Sprintf("%s.ram.%s", key, gpu.MemorySize) + if val, exists := res[key]; exists { + nval, _ := strconv.ParseUint(val, 10, 32) + nval++ + res[key] = strconv.FormatUint(nval, 10) + } else { + res[key] = "1" + } + } + + if gpu.Interface != "" { + key := fmt.Sprintf("%s.interface.%s", key, gpu.Interface) + if val, exists := res[key]; exists { + nval, _ := strconv.ParseUint(val, 10, 32) + nval++ + res[key] = 
strconv.FormatUint(nval, 10) + } else { + res[key] = "1" + } + } + } + + return res, node +} + +func (dp *nodeDiscovery) parseCPUInfo(ctx context.Context) (v1.CPUInfoS, error) { + cpus, err := dp.queryCPU(ctx) + if err != nil { + return v1.CPUInfoS{}, nil + } + + res := make(v1.CPUInfoS, 0, len(cpus.Processors)) + + for _, c := range cpus.Processors { + res = append(res, v1.CPUInfo{ + ID: strconv.Itoa(c.ID), + Vendor: c.Vendor, + Model: c.Model, + Vcores: c.NumThreads, + }) + } + + return res, nil +} + +func (dp *nodeDiscovery) parseGPUInfo(ctx context.Context, info RegistryGPUVendors) (v1.GPUInfoS, error) { + res := make(v1.GPUInfoS, 0) + + gpus, err := dp.queryGPU(ctx) + if err != nil { + return res, nil + } + + if gpus == nil { + return res, nil + } + + for _, dev := range gpus.GraphicsCards { + dinfo := dev.DeviceInfo + if dinfo == nil { + continue + } + + vinfo := dinfo.Vendor + pinfo := dinfo.Product + if vinfo == nil || pinfo == nil { + continue + } + + vendor, exists := info[vinfo.ID] + if !exists { + continue + } + + model, exists := vendor.Devices[pinfo.ID] + if !exists { + continue + } + + res = append(res, v1.GPUInfo{ + Vendor: vendor.Name, + VendorID: dev.DeviceInfo.Vendor.ID, + Name: model.Name, + ModelID: dev.DeviceInfo.Product.ID, + Interface: model.Interface, + MemorySize: model.MemorySize, + }) + } + + sort.Sort(res) + + return res, nil +} diff --git a/operator/inventory/nodes.go b/operator/inventory/nodes.go new file mode 100644 index 00000000..865283c0 --- /dev/null +++ b/operator/inventory/nodes.go @@ -0,0 +1,185 @@ +package inventory + +import ( + "context" + "errors" + "fmt" + "sort" + "time" + + "github.com/go-logr/logr" + "github.com/troian/pubsub" + "golang.org/x/sync/errgroup" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + + inventoryV1 "github.com/akash-network/akash-api/go/inventory/v1" + + "github.com/akash-network/provider/tools/fromctx" +) + +type nodeStateEnum int + +const ( + daemonSetNamespace = "akash-services" + reconnectTimeout = 5 * time.Second +) + +const ( + nodeStateUpdated nodeStateEnum = iota + nodeStateRemoved +) + +type nodeState struct { + state nodeStateEnum + name string + node inventoryV1.Node +} + +type clusterNodes struct { + querierNodes + ctx context.Context + group *errgroup.Group + log logr.Logger + kc *kubernetes.Clientset + signaldone chan string + image string + namespace string +} + +func newClusterNodes(ctx context.Context, image, namespace string) *clusterNodes { + log := fromctx.LogrFromCtx(ctx).WithName("nodes") + + group, ctx := errgroup.WithContext(ctx) + + fd := &clusterNodes{ + querierNodes: newQuerierNodes(), + log: log, + ctx: logr.NewContext(ctx, log), + group: group, + kc: fromctx.KubeClientFromCtx(ctx), + signaldone: make(chan string, 1), + image: image, + namespace: namespace, + } + + leftovers, _ := fd.kc.CoreV1().Pods(namespace).List(fd.ctx, metav1.ListOptions{ + LabelSelector: "app.kubernetes.io/name=inventory" + + ",app.kubernetes.io/instance=inventory-hardware-discovery" + + ",app.kubernetes.io/component=operator" + + ",app.kubernetes.io/part-of=provider", + }) + + for _, pod := range leftovers.Items { + _ = fd.kc.CoreV1().Pods(namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}) + } + + group.Go(fd.connector) + group.Go(fd.run) + + return fd +} + +func (fd *clusterNodes) Wait() error { + return fd.group.Wait() +} + +func (fd *clusterNodes) connector() error { + ctx := fd.ctx + bus := fromctx.PubSubFromCtx(ctx) 
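// Reviewer note: connector owns the per-node discovery lifecycle. It subscribes to
// topicKubeNodes, starts one nodeDiscovery (and with it one hardware-discovery pod)
// per node on watch.Added, shuts it down on watch.Deleted, and restarts it when the
// node's goroutine exits and reports its name via fd.signaldone.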
+ log := fromctx.LogrFromCtx(ctx).WithName("nodes") + + events := bus.Sub(topicKubeNodes) + nodes := make(map[string]*nodeDiscovery) + + nctx, ncancel := context.WithCancel(ctx) + defer func() { + ncancel() + + for name, node := range nodes { + _ = node.shutdown() + delete(nodes, name) + } + }() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case name := <-fd.signaldone: + if node, exists := nodes[name]; exists { + delete(nodes, node.name) + + err := node.shutdown() + if err != nil && !errors.Is(err, context.Canceled) { + log.Error(err, fmt.Sprintf("\"%s\" exited with error. attempting restart", name)) + } + } + nodes[name] = newNodeDiscovery(nctx, name, fd.namespace, fd.image, fd.signaldone) + case rEvt := <-events: + switch evt := rEvt.(type) { + case watch.Event: + switch obj := evt.Object.(type) { + case *corev1.Node: + switch evt.Type { + case watch.Added: + nodes[obj.Name] = newNodeDiscovery(nctx, obj.Name, fd.namespace, fd.image, fd.signaldone) + case watch.Deleted: + if node, exists := nodes[obj.Name]; exists { + _ = node.shutdown() + delete(nodes, node.name) + } + } + } + } + } + } +} + +func (fd *clusterNodes) run() error { + nodes := make(map[string]inventoryV1.Node) + + snapshot := func() inventoryV1.Nodes { + res := make(inventoryV1.Nodes, 0, len(nodes)) + + for _, nd := range nodes { + res = append(res, nd.Dup()) + } + + sort.Sort(res) + + return res + } + + bus := fromctx.PubSubFromCtx(fd.ctx) + + events := bus.Sub(topicInventoryNode) + defer bus.Unsub(events) + for { + select { + case <-fd.ctx.Done(): + return fd.ctx.Err() + case revt := <-events: + switch evt := revt.(type) { + case nodeState: + switch evt.state { + case nodeStateUpdated: + nodes[evt.name] = evt.node + case nodeStateRemoved: + delete(nodes, evt.name) + } + + bus.Pub(snapshot(), []string{topicInventoryNodes}, pubsub.WithRetain()) + default: + } + case req := <-fd.reqch: + resp := respNodes{ + res: snapshot(), + } + + req.respCh <- resp + } + } +} diff --git a/operator/inventory/rancher.go b/operator/inventory/rancher.go index 11da58af..9aad6040 100644 --- a/operator/inventory/rancher.go +++ b/operator/inventory/rancher.go @@ -82,7 +82,7 @@ func (c *rancher) run(startch chan<- struct{}) error { } }() - events := bus.Sub("ns", "sc", "nodes") + events := bus.Sub(topicKubeNS, topicKubeSC, topicKubeNodes) log := fromctx.LogrFromCtx(c.ctx).WithName("rancher") @@ -247,7 +247,7 @@ func (c *rancher) run(startch chan<- struct{}) error { bus.Pub(storageSignal{ driver: "rancher", storage: res, - }, []string{topicStorage}, pubsub.WithRetain()) + }, []string{topicInventoryStorage}, pubsub.WithRetain()) } scrapech = scrapeCh diff --git a/operator/inventory/state.go b/operator/inventory/state.go index 7930141a..3b28543d 100644 --- a/operator/inventory/state.go +++ b/operator/inventory/state.go @@ -23,7 +23,7 @@ func (s *clusterState) run() error { state := inventory.Cluster{} signalch := make(chan struct{}, 1) - datach := bus.Sub(topicNodes, topicStorage, topicConfig) + datach := bus.Sub(topicInventoryNodes, topicInventoryStorage, topicInventoryConfig) defer bus.Unsub(datach) @@ -76,7 +76,7 @@ func (s *clusterState) run() error { err: nil, } case <-signalch: - bus.Pub(*state.Dup(), []string{topicClusterState}, pubsub.WithRetain()) + bus.Pub(*state.Dup(), []string{topicInventoryCluster}, pubsub.WithRetain()) } } } diff --git a/operator/inventory/types.go b/operator/inventory/types.go index 28365bce..d096ce0f 100644 --- a/operator/inventory/types.go +++ b/operator/inventory/types.go @@ -4,9 +4,9 @@ import ( 
"context" "errors" - "github.com/troian/pubsub" - rookexec "github.com/rook/rook/pkg/util/exec" + "github.com/troian/pubsub" + storagev1 "k8s.io/api/storage/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" @@ -20,23 +20,59 @@ import ( ) const ( - FlagAPITimeout = "api-timeout" - FlagQueryTimeout = "query-timeout" - FlagRESTPort = "rest-port" - FlagGRPCPort = "grpc-port" - FlagPodName = "pod-name" - FlagNodeName = "node-name" - FlagConfig = "config" - FlagProviderConfigsURL = "provider-configs-url" - FlagPciDbURL = "provider-pcidb-url" - FlagRegistryQueryPeriod = "registry-query-period" + FlagAPITimeout = "api-timeout" + FlagQueryTimeout = "query-timeout" + FlagRESTPort = "rest-port" + FlagGRPCPort = "grpc-port" + FlagPodName = "pod-name" + FlagPodNamespace = "pod-namespace" + FlagConfig = "config" + FlagProviderConfigsURL = "provider-configs-url" + FlagPciDbURL = "provider-pcidb-url" + FlagRegistryQueryPeriod = "registry-query-period" + FlagDiscoveryImage = "discovery-image" + defaultProviderConfigsURL = "https://provider-configs.akash.network" ) +const ( + topicInventoryNode = "inventory-node" + topicInventoryNodes = "inventory-nodes" + topicInventoryStorage = "inventory-storage" + topicInventoryConfig = "inventory-config" + topicInventoryCluster = "inventory-cluster" + topicGPUIDs = "gpu-ids" + topicStorageClasses = "storage-classes" + topicKubeSC = "kube-sc" + topicKubeNS = "kube-ns" + topicKubeNodes = "kube-nodes" + topicKubeCephClusters = "kube-ceph-clusters" + topicKubePV = "kube-pv" +) + +type dpReqType int + +const ( + dpReqCPU dpReqType = iota + dpReqGPU + dpReqMem +) + +type dpReadResp struct { + data interface{} + err error +} +type dpReadReq struct { + op dpReqType + resp chan<- dpReadResp +} + var ( ErrMetricsUnsupportedRequest = errors.New("unsupported request method") ) +type storageClasses map[string]*storagev1.StorageClass + type storageSignal struct { driver string storage inventory.ClusterStorage @@ -179,3 +215,13 @@ func InformKubeObjects(ctx context.Context, pub pubsub.Publisher, informer cache return nil }) } + +func (s storageClasses) copy() storageClasses { + res := make(storageClasses) + + for name, sc := range s { + res[name] = sc.DeepCopy() + } + + return res +} diff --git a/operator/inventory/util.go b/operator/inventory/util.go index c8c17d50..eac73730 100644 --- a/operator/inventory/util.go +++ b/operator/inventory/util.go @@ -55,15 +55,6 @@ func FeatureDiscoveryFromCtx(ctx context.Context) QuerierNodes { return val.(QuerierNodes) } -func HWInfoFromCtx(ctx context.Context) hwInfo { - val := ctx.Value(CtxKeyHwInfo) - if val == nil { - panic("context does not have file reader set") - } - - return val.(hwInfo) -} - func ClusterStateFromCtx(ctx context.Context) QuerierCluster { val := ctx.Value(CtxKeyClusterState) if val == nil { diff --git a/operator/psutil.go b/operator/psutil.go index e763c584..22a70273 100644 --- a/operator/psutil.go +++ b/operator/psutil.go @@ -21,6 +21,14 @@ const ( flagAPIPort = "api-port" ) +type hwInfo struct { + Errors []string `json:"errors"` + CPU *cpu.Info `json:"cpu,omitempty"` + Memory *memory.Info `json:"memory,omitempty"` + GPU *gpu.Info `json:"gpu,omitempty"` + PCI *pci.Info `json:"pci,omitempty"` +} + func cmdPsutil() *cobra.Command { cmd := &cobra.Command{ Use: "psutil", @@ -47,6 +55,8 @@ func cmdPsutilServe() *cobra.Command { SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { router := mux.NewRouter() + 
router.Methods(http.MethodGet).HandlerFunc(infoHandler) + router.HandleFunc("/cpu", cpuInfoHandler).Methods(http.MethodGet) router.HandleFunc("/gpu", gpuHandler).Methods(http.MethodGet) router.HandleFunc("/memory", memoryHandler).Methods(http.MethodGet) @@ -115,6 +125,33 @@ func cmdPsutilList() *cobra.Command { return cmd } +func infoHandler(w http.ResponseWriter, _ *http.Request) { + res := &hwInfo{} + var err error + + res.CPU, err = cpu.New() + if err != nil { + res.Errors = append(res.Errors, err.Error()) + } + + res.GPU, err = gpu.New() + if err != nil { + res.Errors = append(res.Errors, err.Error()) + } + + res.Memory, err = memory.New() + if err != nil { + res.Errors = append(res.Errors, err.Error()) + } + + res.PCI, err = pci.New() + if err != nil { + res.Errors = append(res.Errors, err.Error()) + } + + writeJSON(w, res) +} + func cpuInfoHandler(w http.ResponseWriter, _ *http.Request) { res, err := cpu.New() if err != nil { diff --git a/pkg/apis/akash.network/v2beta2/manifest.go b/pkg/apis/akash.network/v2beta2/manifest.go index 0f8d1cb3..bc7b50ad 100644 --- a/pkg/apis/akash.network/v2beta2/manifest.go +++ b/pkg/apis/akash.network/v2beta2/manifest.go @@ -80,8 +80,10 @@ type ManifestServiceParams struct { } type SchedulerResourceGPU struct { - Vendor string `json:"vendor"` - Model string `json:"model"` + Vendor string `json:"vendor"` + Model string `json:"model"` + MemorySize string `json:"memory_size"` + Interface string `json:"interface"` } type SchedulerResources struct {
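Reviewer note: the memory_size and interface fields added to SchedulerResourceGPU appear intended to line up with the GPU capability labels computed in generateLabels above, which take the form <AkashServiceCapabilityGPU>.vendor.<vendor>.model.<model>, optionally suffixed with .ram.<size> or .interface.<iface>, each value being a count of matching devices on the node. For reference, a condensed sketch of how monitor() pushes the generated labels onto the Node object via a JSON patch (mirrors the code above; error handling trimmed):

	patches := []k8sPatch{{
		Op:    "add",
		Path:  "/metadata/labels",
		Value: labels, // akash-managed labels merged with the node's pre-existing non-akash labels
	}}
	data, _ := json.Marshal(patches)
	_, err := kc.CoreV1().Nodes().Patch(ctx, nodeName, k8stypes.JSONPatchType, data, metav1.PatchOptions{})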