From 80ff2ff7b219f792165f31b937819f83d87aacdb Mon Sep 17 00:00:00 2001 From: Artur Troian Date: Thu, 1 Feb 2024 21:01:09 -0500 Subject: [PATCH] refactor(operator/inventory): retry discovery pod create Signed-off-by: Artur Troian --- operator/inventory/annotation.go | 248 -------------------------- operator/inventory/annotation_test.go | 106 ----------- operator/inventory/config.go | 7 +- operator/inventory/node-discovery.go | 67 ++++--- operator/inventory/nodes.go | 6 - operator/inventory/util.go | 4 + 6 files changed, 53 insertions(+), 385 deletions(-) delete mode 100644 operator/inventory/annotation.go delete mode 100644 operator/inventory/annotation_test.go diff --git a/operator/inventory/annotation.go b/operator/inventory/annotation.go deleted file mode 100644 index cfa1c502..00000000 --- a/operator/inventory/annotation.go +++ /dev/null @@ -1,248 +0,0 @@ -package inventory - -import ( - "crypto/sha256" - "encoding/hex" - "encoding/json" - "errors" - "fmt" - "sort" - "strings" - - inventoryV1 "github.com/akash-network/akash-api/go/inventory/v1" - "github.com/blang/semver/v4" - "gopkg.in/yaml.v3" -) - -const ( - annotationVersionField = "version" - - AnnotationKeyCapabilitiesSHA256 = "akash.network/capabilities.sha256" - AnnotationKeyCapabilities = "akash.network/capabilities" - AnnotationNodeSelfCapabilities = "akash.network/node.self.capabilities" -) - -var ( - errCapabilitiesInvalid = errors.New("capabilities: invalid") - errCapabilitiesInvalidContent = fmt.Errorf("%w: content", errCapabilitiesInvalid) - errCapabilitiesInvalidNoVersion = fmt.Errorf("%w: no version found", errCapabilitiesInvalid) - errCapabilitiesInvalidVersion = fmt.Errorf("%w: version", errCapabilitiesInvalid) - errCapabilitiesUnsupportedVersion = fmt.Errorf("%w: unsupported version", errCapabilitiesInvalid) -) - -type CapabilitiesV1GPU struct { - Vendor string `json:"vendor"` - VendorID string `json:"vendor_id"` - Model string `json:"model"` - ModelID string `json:"model_id"` - MemorySize string `json:"memory_size"` - Interface string `json:"interface"` -} - -// type CapabilitiesV1GPUs []CapabilitiesV1GPU - -type CapabilitiesV1 struct { - StorageClasses []string `json:"storage_classes"` - GPUs inventoryV1.GPUInfoS `json:"gpus"` -} - -type Capabilities interface{} - -type AnnotationCapabilities struct { - Version semver.Version `json:"version" yaml:"version"` - LastAppliedSHA256 string `json:"last_applied_sha256" yaml:"last_applied_sha256"` - Capabilities `yaml:",inline"` -} - -var ( - _ json.Marshaler = (*AnnotationCapabilities)(nil) - _ json.Unmarshaler = (*AnnotationCapabilities)(nil) -) - -func remove[T any](slice []T, s int) []T { - return append(slice[:s], slice[s+1:]...) -} - -func NewAnnotationCapabilities(sc []string) *AnnotationCapabilities { - caps := &CapabilitiesV1{ - StorageClasses: make([]string, len(sc)), - } - - copy(caps.StorageClasses, sc) - - res := &AnnotationCapabilities{ - Version: semver.Version{Major: 1}, - Capabilities: caps, - } - - return res -} - -func NewAnnotationNodeSelfCapabilities(sc []string) *AnnotationCapabilities { - caps := &CapabilitiesV1{ - StorageClasses: make([]string, len(sc)), - } - - copy(caps.StorageClasses, sc) - - res := &AnnotationCapabilities{ - Version: semver.Version{Major: 1}, - Capabilities: caps, - } - - return res -} - -func (s *CapabilitiesV1) RemoveClass(name string) bool { - for i, c := range s.StorageClasses { - if c == name { - s.StorageClasses = remove(s.StorageClasses, i) - sort.Strings(s.StorageClasses) - return true - } - } - - return false -} - -func parseNodeCapabilities(annotations map[string]string) (*AnnotationCapabilities, []byte, error) { - res := &AnnotationCapabilities{} - - val, exists := annotations[AnnotationKeyCapabilities] - if !exists { - return res, []byte{}, nil - } - - var err error - if strings.HasPrefix(val, "{") { - err = json.Unmarshal([]byte(val), res) - } else { - err = yaml.Unmarshal([]byte(val), res) - } - - if err != nil { - return nil, []byte{}, err - } - - val, exists = annotations[AnnotationKeyCapabilitiesSHA256] - if !exists { - return res, []byte{}, nil - } - - checksum, err := hex.DecodeString(val) - if err != nil && len(checksum) != sha256.Size { - return res, []byte{}, nil - } - - return res, checksum, nil -} - -func (s *AnnotationCapabilities) UnmarshalYAML(node *yaml.Node) error { - var result AnnotationCapabilities - - foundVersion := false - for idx := range node.Content { - if node.Content[idx].Value == annotationVersionField { - var err error - if result.Version, err = semver.ParseTolerant(node.Content[idx+1].Value); err != nil { - return fmt.Errorf("%w: %w", errCapabilitiesInvalidVersion, err) - } - foundVersion = true - break - } - } - - if !foundVersion { - return errCapabilitiesInvalidNoVersion - } - - // nolint: gocritic - switch result.Version.String() { - case "1.0.0": - var decoded CapabilitiesV1 - if err := node.Decode(&decoded); err != nil { - return fmt.Errorf("%w: %w", errCapabilitiesInvalidContent, err) - } - - sort.Strings(decoded.StorageClasses) - - result.Capabilities = &decoded - default: - return fmt.Errorf("%w: %q", errCapabilitiesUnsupportedVersion, result.Version) - } - - *s = result - - return nil -} - -func (s *AnnotationCapabilities) UnmarshalJSON(data []byte) error { - core := make(map[string]interface{}) - - err := json.Unmarshal(data, &core) - if err != nil { - return fmt.Errorf("%w: %w", errCapabilitiesInvalidContent, err) - } - - result := AnnotationCapabilities{} - - if val, valid := core[annotationVersionField].(string); valid { - if result.Version, err = semver.ParseTolerant(val); err != nil { - return fmt.Errorf("%w: %w", errCapabilitiesInvalidVersion, err) - } - } else { - return errCapabilitiesInvalidNoVersion - } - - // nolint: gocritic - switch result.Version.String() { - case "1.0.0": - var decoded CapabilitiesV1 - if err := json.Unmarshal(data, &decoded); err != nil { - return fmt.Errorf("%w: %w", errCapabilitiesInvalidContent, err) - } - - sort.Strings(decoded.StorageClasses) - - result.Capabilities = &decoded - default: - return fmt.Errorf("%w: %q", errCapabilitiesUnsupportedVersion, result.Version) - } - - *s = result - - return nil -} - -// MarshalJSON bc at the time of writing Go 1.21 json/encoding does not support inline tag -// this function circumvents the issue by using temporary anonymous struct -func (s *AnnotationCapabilities) MarshalJSON() ([]byte, error) { - var obj interface{} - - // remove no lint when next version added - // nolint: gocritic - switch caps := s.Capabilities.(type) { - case *CapabilitiesV1: - data, err := json.Marshal(caps) - if err != nil { - return nil, err - } - - enc := sha256.New() - if _, err = enc.Write(data); err != nil { - return nil, err - } - - obj = struct { - Version semver.Version `json:"version"` - LastAppliedSHA256 string `json:"last_applied_sha256"` - CapabilitiesV1 - }{ - Version: s.Version, - LastAppliedSHA256: hex.EncodeToString(enc.Sum(nil)), - CapabilitiesV1: *caps, - } - } - - return json.Marshal(obj) -} diff --git a/operator/inventory/annotation_test.go b/operator/inventory/annotation_test.go deleted file mode 100644 index 00e69064..00000000 --- a/operator/inventory/annotation_test.go +++ /dev/null @@ -1,106 +0,0 @@ -package inventory - -import ( - "encoding/json" - "testing" - - "github.com/stretchr/testify/assert" - "gopkg.in/yaml.v3" -) - -type testCase struct { - name string - data string - expErr error -} - -func TestAnnotationsJson(t *testing.T) { - testCases := []testCase{ - { - name: "v1/valid", - data: `{"version":"v1.0.0","storage_classes":["beta2","default"]}`, - expErr: nil, - }, - { - name: "v1/invalid/missing version", - data: `{"storage_classes":["beta2","default"]}`, - expErr: errCapabilitiesInvalidNoVersion, - }, - { - name: "v1/invalid/bad version", - data: `{"version":"bla","storage_classes":["beta2","default"]}`, - expErr: errCapabilitiesInvalidVersion, - }, - { - name: "v1/invalid/unsupported version", - data: `{"version":"v10000.0.0","storage_classes":["beta2","default"]}`, - expErr: errCapabilitiesUnsupportedVersion, - }, - } - - for _, test := range testCases { - t.Run(test.name, func(t *testing.T) { - caps := &AnnotationCapabilities{} - - err := json.Unmarshal([]byte(test.data), caps) - if test.expErr == nil { - assert.NoError(t, err) - } else { - assert.ErrorAs(t, err, &test.expErr) - } - }) - } -} - -func TestAnnotationsYaml(t *testing.T) { - testCases := []testCase{ - { - name: "v1/valid", - data: `--- -version: v1.0.0 -storage_classes: - - beta2 - - default`, - expErr: nil, - }, - { - name: "v1/invalid/missing version", - data: `--- -storage_classes: - - beta2 - - default`, - expErr: errCapabilitiesInvalidNoVersion, - }, - { - name: "v1/invalid/bad version", - data: `--- -version: bla -storage_classes: - - beta2 - - default`, - expErr: errCapabilitiesInvalidVersion, - }, - { - name: "v1/invalid/unsupported version", - data: `--- -version: v10000.0.0 -storage_classes: - - beta2 - - default`, - expErr: errCapabilitiesUnsupportedVersion, - }, - } - - for _, test := range testCases { - t.Run(test.name, func(t *testing.T) { - caps := &AnnotationCapabilities{} - - err := yaml.Unmarshal([]byte(test.data), caps) - if test.expErr == nil { - assert.NoError(t, err) - } else { - assert.ErrorAs(t, err, &test.expErr) - } - }) - } -} diff --git a/operator/inventory/config.go b/operator/inventory/config.go index 51e69f13..269449d4 100644 --- a/operator/inventory/config.go +++ b/operator/inventory/config.go @@ -1,6 +1,7 @@ package inventory import ( + "errors" "fmt" "os" "regexp" @@ -13,6 +14,10 @@ import ( "gopkg.in/yaml.v3" ) +var ( + errConfigInvalidVersion = errors.New("invalid version") +) + type ExcludeRules []*regexp.Regexp type ConfigStorage struct { @@ -182,7 +187,7 @@ loop: switch node.Content[i].Value { case "version": if res.Version, err = semver.ParseTolerant(node.Content[i+1].Value); err != nil { - return fmt.Errorf("%w: %w", errCapabilitiesInvalidVersion, err) + return fmt.Errorf("%w: %w", errConfigInvalidVersion, err) } continue loop case "cluster_storage": diff --git a/operator/inventory/node-discovery.go b/operator/inventory/node-discovery.go index f322af6c..80989049 100644 --- a/operator/inventory/node-discovery.go +++ b/operator/inventory/node-discovery.go @@ -3,11 +3,13 @@ package inventory import ( "context" "encoding/json" + "errors" "fmt" "reflect" "sort" "strconv" "strings" + "time" "github.com/jaypipes/ghw/pkg/cpu" "github.com/jaypipes/ghw/pkg/gpu" @@ -135,7 +137,9 @@ func (dp *nodeDiscovery) apiConnector() error { dp.sig <- dp.name }() - log := fromctx.LogrFromCtx(dp.ctx).WithName("node.discovery") + ctx := dp.ctx + + log := fromctx.LogrFromCtx(ctx).WithName("node.discovery") log.Info("starting hardware discovery pod", "node", dp.name) @@ -188,12 +192,35 @@ func (dp *nodeDiscovery) apiConnector() error { }, } - kc := fromctx.KubeClientFromCtx(dp.ctx) + kc := fromctx.KubeClientFromCtx(ctx) - pod, err := kc.CoreV1().Pods(dp.namespace).Create(dp.ctx, req, metav1.CreateOptions{}) - if err != nil && !kerrors.IsAlreadyExists(err) { - log.Error(err, fmt.Sprintf("unable to start discovery pod on node \"%s\"", dp.name)) - return err + var pod *corev1.Pod + var err error + + for { + pod, err = kc.CoreV1().Pods(dp.namespace).Create(ctx, req, metav1.CreateOptions{}) + if err == nil { + break + } + + if errors.Is(err, context.Canceled) { + return err + } + + if !kerrors.IsAlreadyExists(err) { + log.Error(err, fmt.Sprintf("unable to start discovery pod on node \"%s\"", dp.name)) + } + + tctx, tcancel := context.WithTimeout(ctx, time.Second) + + select { + case <-tctx.Done(): + } + + tcancel() + if !errors.Is(tctx.Err(), context.DeadlineExceeded) { + return tctx.Err() + } } defer func() { @@ -213,7 +240,6 @@ func (dp *nodeDiscovery) apiConnector() error { ",app.kubernetes.io/component=operator" + ",app.kubernetes.io/part-of=provider", }) - if err != nil { log.Error(err, fmt.Sprintf("unable to start pod watcher on node \"%s\"", dp.name)) return err @@ -409,7 +435,7 @@ func (dp *nodeDiscovery) monitor() error { signalLabels() case evt := <-idsch: gpusIDs = evt.(RegistryGPUVendors) - node.Resources.GPU.Info, _ = dp.parseGPUInfo(ctx, gpusIDs) + node.Resources.GPU.Info = dp.parseGPUInfo(ctx, gpusIDs) signalLabels() case rEvt := <-nodesch: evt := rEvt.(watch.Event) @@ -498,15 +524,8 @@ func (dp *nodeDiscovery) monitor() error { func (dp *nodeDiscovery) initNodeInfo(gpusIds RegistryGPUVendors) (v1.Node, map[string]corev1.Pod, error) { kc := fromctx.KubeClientFromCtx(dp.ctx) - cpuInfo, err := dp.parseCPUInfo(dp.ctx) - if err != nil { - return v1.Node{}, nil, err - } - - gpuInfo, err := dp.parseGPUInfo(dp.ctx, gpusIds) - if err != nil { - return v1.Node{}, nil, err - } + cpuInfo := dp.parseCPUInfo(dp.ctx) + gpuInfo := dp.parseGPUInfo(dp.ctx, gpusIds) knode, err := kc.CoreV1().Nodes().Get(dp.ctx, dp.name, metav1.GetOptions{}) if err != nil { @@ -714,10 +733,10 @@ func generateLabels(cfg Config, knode *corev1.Node, node v1.Node, sc storageClas return res, node } -func (dp *nodeDiscovery) parseCPUInfo(ctx context.Context) (v1.CPUInfoS, error) { +func (dp *nodeDiscovery) parseCPUInfo(ctx context.Context) v1.CPUInfoS { cpus, err := dp.queryCPU(ctx) if err != nil { - return v1.CPUInfoS{}, nil + return v1.CPUInfoS{} } res := make(v1.CPUInfoS, 0, len(cpus.Processors)) @@ -731,19 +750,19 @@ func (dp *nodeDiscovery) parseCPUInfo(ctx context.Context) (v1.CPUInfoS, error) }) } - return res, nil + return res } -func (dp *nodeDiscovery) parseGPUInfo(ctx context.Context, info RegistryGPUVendors) (v1.GPUInfoS, error) { +func (dp *nodeDiscovery) parseGPUInfo(ctx context.Context, info RegistryGPUVendors) v1.GPUInfoS { res := make(v1.GPUInfoS, 0) gpus, err := dp.queryGPU(ctx) if err != nil { - return res, nil + return res } if gpus == nil { - return res, nil + return res } for _, dev := range gpus.GraphicsCards { @@ -780,5 +799,5 @@ func (dp *nodeDiscovery) parseGPUInfo(ctx context.Context, info RegistryGPUVendo sort.Sort(res) - return res, nil + return res } diff --git a/operator/inventory/nodes.go b/operator/inventory/nodes.go index 865283c0..7c83c2b2 100644 --- a/operator/inventory/nodes.go +++ b/operator/inventory/nodes.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "sort" - "time" "github.com/go-logr/logr" "github.com/troian/pubsub" @@ -22,11 +21,6 @@ import ( type nodeStateEnum int -const ( - daemonSetNamespace = "akash-services" - reconnectTimeout = 5 * time.Second -) - const ( nodeStateUpdated nodeStateEnum = iota nodeStateRemoved diff --git a/operator/inventory/util.go b/operator/inventory/util.go index eac73730..4b9977da 100644 --- a/operator/inventory/util.go +++ b/operator/inventory/util.go @@ -72,3 +72,7 @@ func ConfigFromCtx(ctx context.Context) Config { return val.(Config) } + +func remove[T any](slice []T, s int) []T { + return append(slice[:s], slice[s+1:]...) +}