diff --git a/.github/workflows/integration-tests.yaml b/.github/workflows/integration-tests.yaml index 97258d4c..55a1f28c 100644 --- a/.github/workflows/integration-tests.yaml +++ b/.github/workflows/integration-tests.yaml @@ -4,7 +4,7 @@ on: workflow_call: env: - KUBE_SSH_NODE_NAME: kind + KUBE_SSH_NODES: kind defaults: run: @@ -113,6 +113,15 @@ jobs: run: | make -s -C _run/kube kube-deployment-rollout-operator-inventory make -s -C _run/kube kube-wait-inventory-available + kubectl get pods,ingress,svc -A + - name: Operator inventory logs + working-directory: ${{ env.GOPATH }}/src/github.com/akash-network/provider + run: | + kubectl -n akash-services logs -l app.kubernetes.io/part-of=provider,app.kubernetes.io/component=operator,app.kubernetes.io/instance=inventory-service,app.kubernetes.io/name=inventory + - name: Operator inventory discovery logs + working-directory: ${{ env.GOPATH }}/src/github.com/akash-network/provider + run: | + kubectl -n akash-services logs -l app.kubernetes.io/part-of=provider,app.kubernetes.io/component=operator,app.kubernetes.io/instance=inventory-hardware-discovery,app.kubernetes.io/name=inventory - name: Run E2E Tests working-directory: ${{ env.GOPATH }}/src/github.com/akash-network/provider run: | diff --git a/_docs/kustomize/akash-operator-inventory/deployment.yaml b/_docs/kustomize/akash-operator-inventory/deployment.yaml index dfa56295..14dda1ca 100644 --- a/_docs/kustomize/akash-operator-inventory/deployment.yaml +++ b/_docs/kustomize/akash-operator-inventory/deployment.yaml @@ -59,14 +59,14 @@ spec: path: /metrics/health port: api scheme: HTTP - initialDelaySeconds: 5 - periodSeconds: 5 + initialDelaySeconds: 15 + periodSeconds: 15 readinessProbe: httpGet: path: /metrics/ready port: api scheme: HTTP - initialDelaySeconds: 5 + initialDelaySeconds: 15 periodSeconds: 5 ports: - containerPort: 8080 diff --git a/_run/.envrc b/_run/.envrc index d16a20e2..2cd978c8 100644 --- a/_run/.envrc +++ b/_run/.envrc @@ -4,4 +4,9 @@ if ! has grpcurl ; then echo -e "\033[31mgrpcurl is not installed"; exit 1 fi + +if ! has tqdm ; then + echo -e "\033[31mtqdm is not installed. https://github.com/tqdm/tqdm"; exit 1 +fi + export AKASH_KEYRING_BACKEND=test diff --git a/_run/common-kube.mk b/_run/common-kube.mk index 74dcdf23..1f9f55d5 100644 --- a/_run/common-kube.mk +++ b/_run/common-kube.mk @@ -6,15 +6,17 @@ include ../common-kind.mk include ../common-helm.mk KUBE_UPLOAD_AKASH_IMAGE ?= false -KUBE_CLUSTER_CREATE_TARGET ?= default KUBE_ROLLOUT_TIMEOUT ?= 180 - INGRESS_CONFIG_PATH ?= ../ingress-nginx.yaml CALICO_MANIFEST ?= https://github.com/projectcalico/calico/blob/v3.25.0/manifests/calico.yaml CRD_FILE ?= $(AP_ROOT)/pkg/apis/akash.network/crd.yaml -ifeq ($(KUBE_SSH_NODE_NAME),) -$(error "KUBE_SSH_NODE_NAME is not set") +ifeq ($(KUBE_SSH_NODES),) +$(error "KUBE_SSH_NODES is not set") +endif + +ifeq ($(KUBE_CLUSTER_CREATE_TYPE),) +$(error "KUBE_CLUSTER_CREATE_TYPE is not set") endif # when image is built locally, for example on M1 (arm64) and kubernetes cluster is running on amd64 @@ -74,26 +76,26 @@ endif endif .PHONY: kube-upload-images -kube-upload-images: kube-upload-images-$(KUBE_CLUSTER_CREATE_TARGET) +kube-upload-images: kube-upload-images-$(KUBE_CLUSTER_CREATE_TYPE) .PHONY: kube-upload-images-kind kube-upload-images-kind: $(KIND) - $(AP_ROOT)/script/setup-kube.sh load-images docker2kind "$(KIND_NAME)" "$(DOCKER_LOAD_IMAGES)" + $(AP_ROOT)/script/setup-kube.sh upload images docker2kind "$(KIND_NAME)" "$(DOCKER_LOAD_IMAGES)" -.PHONY: kube-upload-images-default -kube-upload-images-default: - $(AP_ROOT)/script/setup-kube.sh load-images docker2ctr "$(KUBE_SSH_NODE_NAME)" "$(DOCKER_LOAD_IMAGES)" +.PHONY: kube-upload-images-ssh +kube-upload-images-ssh: + $(AP_ROOT)/script/setup-kube.sh upload images docker2ctr "$(KUBE_SSH_NODES)" "$(DOCKER_LOAD_IMAGES)" .PHONY: kube-upload-crd kube-upload-crd: - $(SETUP_KUBE) --crd=$(CRD_FILE) $(KUBE_SSH_NODE_NAME) init + $(SETUP_KUBE) --crd=$(CRD_FILE) upload crd -$(KUBE_CREATE): $(AP_RUN_DIR) kube-cluster-create-$(KUBE_CLUSTER_CREATE_TARGET) - $(SETUP_KUBE) --crd=$(CRD_FILE) $(KUBE_SSH_NODE_NAME) init +$(KUBE_CREATE): $(AP_RUN_DIR) kube-cluster-create-$(KUBE_CLUSTER_CREATE_TYPE) + $(SETUP_KUBE) --crd=$(CRD_FILE) $(KUBE_CLUSTER_CREATE_TYPE) init "$(KUBE_SSH_NODES)" touch $@ -.INTERMEDIATE: kube-cluster-create-default -kube-cluster-create-default: $(KUBE_CREATE) +.INTERMEDIATE: kube-cluster-create-ssh +kube-cluster-create-ssh: .PHONY: kube-cluster-check-alive kube-cluster-check-info: @@ -129,7 +131,7 @@ kube-cluster-setup-e2e-ci: \ kube-install-helm-charts .PHONY: kube-cluster-delete -kube-cluster-delete: kube-cluster-delete-$(KUBE_SSH_NODE_NAME) +kube-cluster-delete: kube-cluster-delete-$(KUBE_SSH_NODES) .PHONY: kube-setup-ingress kube-setup-ingress: kube-setup-ingress-$(KIND_CONFIG) @@ -161,12 +163,19 @@ kube-status-ingress-%: .PHONY: kube-deployment-rollout-operator-inventory kube-deployment-rollout-operator-inventory: kubectl -n akash-services rollout status deployment operator-inventory --timeout=$(KUBE_ROLLOUT_TIMEOUT)s - kubectl -n akash-services wait pods -l app.kubernetes.io/part-of=provider -l app.kubernetes.io/component=operator -l app.kubernetes.io/instance=inventory-service --for condition=Ready --timeout=$(KUBE_ROLLOUT_TIMEOUT)s + kubectl -n akash-services wait pods \ + -l app.kubernetes.io/part-of=provider,app.kubernetes.io/component=operator,app.kubernetes.io/instance=inventory-service \ + --for condition=Ready \ + --timeout=$(KUBE_ROLLOUT_TIMEOUT)s + kubectl -n akash-services wait pods \ + -l app.kubernetes.io/part-of=provider,app.kubernetes.io/component=operator,app.kubernetes.io/instance=inventory-hardware-discovery,app.kubernetes.io/name=inventory \ + --for=condition=ready \ + --timeout=$(KUBE_ROLLOUT_TIMEOUT)s .PHONY: kube-deployment-rollout-% kube-deployment-rollout-%: kubectl -n akash-services rollout status deployment $* --timeout=$(KUBE_ROLLOUT_TIMEOUT)s - kubectl -n akash-services wait pods -l akash.network/component=operator -l akash.network/name=$(patsubst %, operator-%,$*) --for condition=Ready --timeout=$(KUBE_ROLLOUT_TIMEOUT)s + kubectl -n akash-services wait pods -l akash.network/component=operator,akash.network/name=$(patsubst %, operator-%,$*) --for condition=Ready --timeout=$(KUBE_ROLLOUT_TIMEOUT)s .PHONY: akash-node-ready akash-node-ready: SHELL=$(BASH_PATH) @@ -195,4 +204,4 @@ kube-logs-operator-inventory: .PHONY: kube-wait-inventory-available kube-wait-inventory-available: - $(SETUP_KUBE) wait inventory-available + $(SETUP_KUBE) --retries=60 wait inventory-available diff --git a/_run/kube/Makefile b/_run/kube/Makefile index b10bbe20..ec50387a 100644 --- a/_run/kube/Makefile +++ b/_run/kube/Makefile @@ -1,7 +1,7 @@ KUBE_SETUP_PREREQUISITES ?= \ -KUBE_CLUSTER_CREATE_TARGET := kind -KUBE_SSH_NODE_NAME ?= kind +KUBE_CLUSTER_CREATE_TYPE := kind +KUBE_SSH_NODES := kind KUSTOMIZE_INSTALLS ?= \ akash-operator-hostname \ diff --git a/_run/ssh/.envrc b/_run/ssh/.envrc index cd4d9fdd..99ebb719 100644 --- a/_run/ssh/.envrc +++ b/_run/ssh/.envrc @@ -2,12 +2,8 @@ source_up .envrc dotenv_if_exists dev.env -#source_env ~/projects/akash/gpu +source_env ~/projects/akash/gpu export AKASH_HOME=$DEVCACHE_RUN/ssh/.akash export AKASH_KUBECONFIG=$KUBECONFIG export AP_KUBECONFIG=$KUBECONFIG - -if ! has tqdm ; then - echo -e "\033[31mtqdm is not installed. https://github.com/tqdm/tqdm"; exit 1 -fi diff --git a/_run/ssh/Makefile b/_run/ssh/Makefile index 72eb97a6..2e363a84 100644 --- a/_run/ssh/Makefile +++ b/_run/ssh/Makefile @@ -1,6 +1,7 @@ KUBE_SETUP_PREREQUISITES ?= \ -KUBE_UPLOAD_AKASH_IMAGE ?= true +KUBE_UPLOAD_AKASH_IMAGE ?= true +KUBE_CLUSTER_CREATE_TYPE := ssh KUBE_DOCKER_IMAGE_ARCH := amd64 diff --git a/make/test-integration.mk b/make/test-integration.mk index 362f46ab..9feba2ce 100644 --- a/make/test-integration.mk +++ b/make/test-integration.mk @@ -65,7 +65,7 @@ test-nocache: .PHONY: test-full test-full: - $(GO_TEST) -tags=$(BUILD_TAGS) -race $(TEST_MODULES) + $(GO_TEST) -tags=$(BUILD_TAGS) -race -count=1 $(TEST_MODULES) .PHONY: test-coverage test-coverage: $(AP_DEVCACHE) diff --git a/operator/inventory/cmd.go b/operator/inventory/cmd.go index c4956a1d..7ded13f2 100644 --- a/operator/inventory/cmd.go +++ b/operator/inventory/cmd.go @@ -165,7 +165,7 @@ func Cmd() *cobra.Command { } } - fd := newClusterNodes(ctx, discoveryImage, namespace) + clNodes := newClusterNodes(ctx, discoveryImage, namespace) storage = append(storage, st) @@ -175,7 +175,7 @@ func Cmd() *cobra.Command { } fromctx.CmdSetContextValue(cmd, CtxKeyStorage, storage) - fromctx.CmdSetContextValue(cmd, CtxKeyFeatureDiscovery, fd) + fromctx.CmdSetContextValue(cmd, CtxKeyFeatureDiscovery, clNodes) fromctx.CmdSetContextValue(cmd, CtxKeyClusterState, QuerierCluster(clState)) ctx = cmd.Context() @@ -211,19 +211,20 @@ func Cmd() *cobra.Command { gogoreflection.Register(grpcSrv) group.Go(func() error { - return registryLoader(ctx) + return configWatcher(ctx, viper.GetString(FlagConfig)) }) group.Go(func() error { - return scWatcher(ctx) + return registryLoader(ctx) }) group.Go(func() error { - return configWatcher(ctx, viper.GetString(FlagConfig)) + return scWatcher(ctx) }) group.Go(clState.run) - group.Go(fd.Wait) + group.Go(clNodes.Wait) + group.Go(func() error { log.Info(fmt.Sprintf("rest listening on \"%s\"", restEndpoint)) @@ -533,6 +534,8 @@ func scWatcher(ctx context.Context) error { sc := make(storageClasses) + bus.Pub(sc.copy(), []string{topicStorageClasses}, pubsub.WithRetain()) + for { select { case <-ctx.Done(): diff --git a/operator/inventory/node-discovery.go b/operator/inventory/node-discovery.go index 80989049..5e39a518 100644 --- a/operator/inventory/node-discovery.go +++ b/operator/inventory/node-discovery.go @@ -81,12 +81,18 @@ func (dp *nodeDiscovery) shutdown() error { func (dp *nodeDiscovery) queryCPU(ctx context.Context) (*cpu.Info, error) { respch := make(chan dpReadResp, 1) + rctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + select { case <-ctx.Done(): return nil, ctx.Err() case <-dp.ctx.Done(): return nil, dp.ctx.Err() + case <-rctx.Done(): + return nil, rctx.Err() case dp.readch <- dpReadReq{ + ctx: rctx, op: dpReqCPU, resp: respch, }: @@ -108,12 +114,18 @@ func (dp *nodeDiscovery) queryCPU(ctx context.Context) (*cpu.Info, error) { func (dp *nodeDiscovery) queryGPU(ctx context.Context) (*gpu.Info, error) { respch := make(chan dpReadResp, 1) + rctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + select { case <-ctx.Done(): return nil, ctx.Err() case <-dp.ctx.Done(): return nil, dp.ctx.Err() + case <-rctx.Done(): + return nil, rctx.Err() case dp.readch <- dpReadReq{ + ctx: rctx, op: dpReqGPU, resp: respch, }: @@ -256,7 +268,7 @@ initloop: return dp.ctx.Err() case evt := <-watcher.ResultChan(): resp := evt.Object.(*corev1.Pod) - if resp.Status.Phase != corev1.PodPending { + if resp.Status.Phase == corev1.PodRunning { watcher.Stop() break initloop } @@ -290,7 +302,7 @@ initloop: Name(fmt.Sprintf("%s:%d", pod.Name, apiPort)). SubResource("proxy"). Suffix(res). - Do(dp.ctx) + Do(readreq.ctx) resp.err = result.Error() @@ -364,6 +376,7 @@ func (dp *nodeDiscovery) monitor() error { case <-dp.readych: } + log.Info("waiting for config") select { case <-dp.ctx.Done(): return dp.ctx.Err() @@ -371,6 +384,7 @@ func (dp *nodeDiscovery) monitor() error { cfg = evt.(Config) } + log.Info("waiting for storage classes") select { case <-dp.ctx.Done(): return dp.ctx.Err() @@ -734,11 +748,17 @@ func generateLabels(cfg Config, knode *corev1.Node, node v1.Node, sc storageClas } func (dp *nodeDiscovery) parseCPUInfo(ctx context.Context) v1.CPUInfoS { + log := fromctx.LogrFromCtx(ctx).WithName("node.monitor") + + log.Info("query cpu started") cpus, err := dp.queryCPU(ctx) if err != nil { + log.Error(err, "unable to query cpu") return v1.CPUInfoS{} } + log.Info("query cpu done") + res := make(v1.CPUInfoS, 0, len(cpus.Processors)) for _, c := range cpus.Processors { @@ -756,11 +776,17 @@ func (dp *nodeDiscovery) parseCPUInfo(ctx context.Context) v1.CPUInfoS { func (dp *nodeDiscovery) parseGPUInfo(ctx context.Context, info RegistryGPUVendors) v1.GPUInfoS { res := make(v1.GPUInfoS, 0) + log := fromctx.LogrFromCtx(ctx).WithName("node.monitor") + + log.Info("query gpu started") gpus, err := dp.queryGPU(ctx) if err != nil { + log.Error(err, "unable to query gpu") return res } + log.Info("query gpu done") + if gpus == nil { return res } diff --git a/operator/inventory/nodes.go b/operator/inventory/nodes.go index 7c83c2b2..e44e54da 100644 --- a/operator/inventory/nodes.go +++ b/operator/inventory/nodes.go @@ -76,12 +76,12 @@ func newClusterNodes(ctx context.Context, image, namespace string) *clusterNodes return fd } -func (fd *clusterNodes) Wait() error { - return fd.group.Wait() +func (cl *clusterNodes) Wait() error { + return cl.group.Wait() } -func (fd *clusterNodes) connector() error { - ctx := fd.ctx +func (cl *clusterNodes) connector() error { + ctx := cl.ctx bus := fromctx.PubSubFromCtx(ctx) log := fromctx.LogrFromCtx(ctx).WithName("nodes") @@ -102,7 +102,7 @@ func (fd *clusterNodes) connector() error { select { case <-ctx.Done(): return ctx.Err() - case name := <-fd.signaldone: + case name := <-cl.signaldone: if node, exists := nodes[name]; exists { delete(nodes, node.name) @@ -111,7 +111,7 @@ func (fd *clusterNodes) connector() error { log.Error(err, fmt.Sprintf("\"%s\" exited with error. attempting restart", name)) } } - nodes[name] = newNodeDiscovery(nctx, name, fd.namespace, fd.image, fd.signaldone) + nodes[name] = newNodeDiscovery(nctx, name, cl.namespace, cl.image, cl.signaldone) case rEvt := <-events: switch evt := rEvt.(type) { case watch.Event: @@ -119,7 +119,7 @@ func (fd *clusterNodes) connector() error { case *corev1.Node: switch evt.Type { case watch.Added: - nodes[obj.Name] = newNodeDiscovery(nctx, obj.Name, fd.namespace, fd.image, fd.signaldone) + nodes[obj.Name] = newNodeDiscovery(nctx, obj.Name, cl.namespace, cl.image, cl.signaldone) case watch.Deleted: if node, exists := nodes[obj.Name]; exists { _ = node.shutdown() @@ -132,7 +132,7 @@ func (fd *clusterNodes) connector() error { } } -func (fd *clusterNodes) run() error { +func (cl *clusterNodes) run() error { nodes := make(map[string]inventoryV1.Node) snapshot := func() inventoryV1.Nodes { @@ -147,14 +147,14 @@ func (fd *clusterNodes) run() error { return res } - bus := fromctx.PubSubFromCtx(fd.ctx) + bus := fromctx.PubSubFromCtx(cl.ctx) events := bus.Sub(topicInventoryNode) defer bus.Unsub(events) for { select { - case <-fd.ctx.Done(): - return fd.ctx.Err() + case <-cl.ctx.Done(): + return cl.ctx.Err() case revt := <-events: switch evt := revt.(type) { case nodeState: @@ -168,7 +168,7 @@ func (fd *clusterNodes) run() error { bus.Pub(snapshot(), []string{topicInventoryNodes}, pubsub.WithRetain()) default: } - case req := <-fd.reqch: + case req := <-cl.reqch: resp := respNodes{ res: snapshot(), } diff --git a/operator/inventory/types.go b/operator/inventory/types.go index d096ce0f..559db355 100644 --- a/operator/inventory/types.go +++ b/operator/inventory/types.go @@ -63,6 +63,7 @@ type dpReadResp struct { err error } type dpReadReq struct { + ctx context.Context op dpReqType resp chan<- dpReadResp } diff --git a/operator/psutil.go b/operator/psutil.go index 22a70273..faaa13d7 100644 --- a/operator/psutil.go +++ b/operator/psutil.go @@ -3,6 +3,7 @@ package operator import ( "context" "encoding/json" + "errors" "fmt" "net" "net/http" @@ -15,6 +16,7 @@ import ( "github.com/jaypipes/ghw/pkg/pci" "github.com/spf13/cobra" "github.com/spf13/viper" + "golang.org/x/sync/errgroup" ) const ( @@ -55,8 +57,8 @@ func cmdPsutilServe() *cobra.Command { SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { router := mux.NewRouter() - router.Methods(http.MethodGet).HandlerFunc(infoHandler) + router.HandleFunc("/", infoHandler).Methods(http.MethodGet) router.HandleFunc("/cpu", cpuInfoHandler).Methods(http.MethodGet) router.HandleFunc("/gpu", gpuHandler).Methods(http.MethodGet) router.HandleFunc("/memory", memoryHandler).Methods(http.MethodGet) @@ -64,16 +66,40 @@ func cmdPsutilServe() *cobra.Command { port := viper.GetUint16(flagAPIPort) + group, ctx := errgroup.WithContext(cmd.Context()) + + endpoint := fmt.Sprintf(":%d", port) + srv := &http.Server{ - Addr: fmt.Sprintf(":%d", port), + Addr: endpoint, Handler: router, BaseContext: func(_ net.Listener) context.Context { - return cmd.Context() + return ctx }, ReadHeaderTimeout: 5 * time.Second, } - return srv.ListenAndServe() + group.Go(func() error { + fmt.Printf("listening on %s\n", endpoint) + + return srv.ListenAndServe() + }) + + group.Go(func() error { + <-ctx.Done() + + fmt.Printf("received shutdown signal\n") + + _ = srv.Shutdown(context.Background()) + return ctx.Err() + }) + + err := group.Wait() + if !errors.Is(err, context.Canceled) && !errors.Is(err, http.ErrServerClosed) { + return err + } + + return nil }, } diff --git a/operator/waiter/waiter_test.go b/operator/waiter/waiter_test.go index fd3291a6..d7cf6384 100644 --- a/operator/waiter/waiter_test.go +++ b/operator/waiter/waiter_test.go @@ -2,11 +2,12 @@ package waiter import ( "context" - "github.com/akash-network/node/testutil" - "github.com/stretchr/testify/require" "io" "testing" "time" + + "github.com/akash-network/node/testutil" + "github.com/stretchr/testify/require" ) func TestWaiterNoInput(t *testing.T) { diff --git a/script/setup-kube.sh b/script/setup-kube.sh index 296f4b68..4116b969 100755 --- a/script/setup-kube.sh +++ b/script/setup-kube.sh @@ -14,6 +14,8 @@ rootdir="$(dirname "$0")/.." CRD_FILE=$rootdir/pkg/apis/akash.network/crd.yaml timeout=10 +retries=10 +retrywait=1 usage() { cat <&2 "timeout option must be positive integer" + exit 1 + fi + ;; + retries) + retries=$OPTARG + + if ! isuint "$retries" ; then + echo >&2 "retries option must be positive integer" + exit 1 + fi + ;; + retry-wait) + retrywait=$OPTARG + if ! isuint "$retrywait" ; then + echo >&2 "retry-wait option must be positive integer" + exit 1 + fi + ;; esac done shift "$((OPTIND - 1))" @@ -172,7 +196,10 @@ command_ssh() { init) shift - while read -r node; do + local nodes=("$1") + + # shellcheck disable=SC2048 + for node in ${nodes[*]}; do if ! ssh -n "$node" "test -e /etc/systemd/system/user@.service.d/delegate.conf"; then ssh -n "$node" 'sudo mkdir -p /etc/systemd/system/user@.service.d' ssh -n "$node" 'cat < /dev/null 2>&1 ; do sleep 0.1; done' - local retries=0 + local r=0 while ! grpcurl -plaintext localhost:8455 akash.inventory.v1.ClusterRPC.QueryCluster | jq '(.nodes | length > 0) and (.storage | length > 0)' --exit-status > /dev/null 2>&1; do - retries=$((retries+1)) - if [ ${retries} -eq "${timeout}" ]; then - exit 1 + r=$((r+1)) + if [ ${r} -eq "${retries}" ]; then + exit 0 fi - sleep 1 + + # shellcheck disable=SC2086 + sleep $retrywait done } @@ -346,9 +390,9 @@ kustomize) shift command_kustomize "$@" ;; -load-images) +upload) shift - command_load_images "$@" + command_upload "$@" ;; "wait") shift