From 6ce7931e5c53bf70b211ce7a8535baea0bdf7d42 Mon Sep 17 00:00:00 2001 From: Artur Troian Date: Sat, 3 Feb 2024 17:22:32 -0500 Subject: [PATCH] chore(scripts): add retry option for setup-kube commands (#186) Signed-off-by: Artur Troian --- .github/workflows/integration-tests.yaml | 13 ++- .../akash-operator-inventory/deployment.yaml | 6 +- _run/.envrc | 5 ++ _run/common-kube.mk | 45 ++++++---- _run/kube/Makefile | 4 +- _run/ssh/.envrc | 6 +- _run/ssh/Makefile | 3 +- go.mod | 2 +- go.sum | 4 +- make/test-integration.mk | 2 +- operator/inventory/cmd.go | 29 +++++-- operator/inventory/node-discovery.go | 30 ++++++- operator/inventory/nodes.go | 24 +++--- operator/inventory/types.go | 1 + operator/psutil.go | 34 +++++++- operator/waiter/waiter_test.go | 5 +- script/setup-kube.sh | 82 ++++++++++++++----- 17 files changed, 215 insertions(+), 80 deletions(-) diff --git a/.github/workflows/integration-tests.yaml b/.github/workflows/integration-tests.yaml index 97258d4c..46e70e87 100644 --- a/.github/workflows/integration-tests.yaml +++ b/.github/workflows/integration-tests.yaml @@ -4,7 +4,7 @@ on: workflow_call: env: - KUBE_SSH_NODE_NAME: kind + KUBE_SSH_NODES: kind defaults: run: @@ -112,11 +112,20 @@ jobs: working-directory: ${{ env.GOPATH }}/src/github.com/akash-network/provider run: | make -s -C _run/kube kube-deployment-rollout-operator-inventory - make -s -C _run/kube kube-wait-inventory-available - name: Run E2E Tests working-directory: ${{ env.GOPATH }}/src/github.com/akash-network/provider run: | make test-e2e-integration + - name: Print operator inventory logs + if: always() + working-directory: ${{ env.GOPATH }}/src/github.com/akash-network/provider + run: | + kubectl -n akash-services logs -l app.kubernetes.io/part-of=provider,app.kubernetes.io/component=operator,app.kubernetes.io/instance=inventory-service,app.kubernetes.io/name=inventory + - name: Print operator inventory discovery logs + if: always() + working-directory: ${{ env.GOPATH }}/src/github.com/akash-network/provider + run: | + kubectl -n akash-services logs -l app.kubernetes.io/part-of=provider,app.kubernetes.io/component=operator,app.kubernetes.io/instance=inventory-hardware-discovery,app.kubernetes.io/name=inventory - name: Run K8s Tests working-directory: ${{ env.GOPATH }}/src/github.com/akash-network/provider run: | diff --git a/_docs/kustomize/akash-operator-inventory/deployment.yaml b/_docs/kustomize/akash-operator-inventory/deployment.yaml index dfa56295..14dda1ca 100644 --- a/_docs/kustomize/akash-operator-inventory/deployment.yaml +++ b/_docs/kustomize/akash-operator-inventory/deployment.yaml @@ -59,14 +59,14 @@ spec: path: /metrics/health port: api scheme: HTTP - initialDelaySeconds: 5 - periodSeconds: 5 + initialDelaySeconds: 15 + periodSeconds: 15 readinessProbe: httpGet: path: /metrics/ready port: api scheme: HTTP - initialDelaySeconds: 5 + initialDelaySeconds: 15 periodSeconds: 5 ports: - containerPort: 8080 diff --git a/_run/.envrc b/_run/.envrc index d16a20e2..2cd978c8 100644 --- a/_run/.envrc +++ b/_run/.envrc @@ -4,4 +4,9 @@ if ! has grpcurl ; then echo -e "\033[31mgrpcurl is not installed"; exit 1 fi + +if ! has tqdm ; then + echo -e "\033[31mtqdm is not installed. https://github.com/tqdm/tqdm"; exit 1 +fi + export AKASH_KEYRING_BACKEND=test diff --git a/_run/common-kube.mk b/_run/common-kube.mk index 74dcdf23..1f9f55d5 100644 --- a/_run/common-kube.mk +++ b/_run/common-kube.mk @@ -6,15 +6,17 @@ include ../common-kind.mk include ../common-helm.mk KUBE_UPLOAD_AKASH_IMAGE ?= false -KUBE_CLUSTER_CREATE_TARGET ?= default KUBE_ROLLOUT_TIMEOUT ?= 180 - INGRESS_CONFIG_PATH ?= ../ingress-nginx.yaml CALICO_MANIFEST ?= https://github.com/projectcalico/calico/blob/v3.25.0/manifests/calico.yaml CRD_FILE ?= $(AP_ROOT)/pkg/apis/akash.network/crd.yaml -ifeq ($(KUBE_SSH_NODE_NAME),) -$(error "KUBE_SSH_NODE_NAME is not set") +ifeq ($(KUBE_SSH_NODES),) +$(error "KUBE_SSH_NODES is not set") +endif + +ifeq ($(KUBE_CLUSTER_CREATE_TYPE),) +$(error "KUBE_CLUSTER_CREATE_TYPE is not set") endif # when image is built locally, for example on M1 (arm64) and kubernetes cluster is running on amd64 @@ -74,26 +76,26 @@ endif endif .PHONY: kube-upload-images -kube-upload-images: kube-upload-images-$(KUBE_CLUSTER_CREATE_TARGET) +kube-upload-images: kube-upload-images-$(KUBE_CLUSTER_CREATE_TYPE) .PHONY: kube-upload-images-kind kube-upload-images-kind: $(KIND) - $(AP_ROOT)/script/setup-kube.sh load-images docker2kind "$(KIND_NAME)" "$(DOCKER_LOAD_IMAGES)" + $(AP_ROOT)/script/setup-kube.sh upload images docker2kind "$(KIND_NAME)" "$(DOCKER_LOAD_IMAGES)" -.PHONY: kube-upload-images-default -kube-upload-images-default: - $(AP_ROOT)/script/setup-kube.sh load-images docker2ctr "$(KUBE_SSH_NODE_NAME)" "$(DOCKER_LOAD_IMAGES)" +.PHONY: kube-upload-images-ssh +kube-upload-images-ssh: + $(AP_ROOT)/script/setup-kube.sh upload images docker2ctr "$(KUBE_SSH_NODES)" "$(DOCKER_LOAD_IMAGES)" .PHONY: kube-upload-crd kube-upload-crd: - $(SETUP_KUBE) --crd=$(CRD_FILE) $(KUBE_SSH_NODE_NAME) init + $(SETUP_KUBE) --crd=$(CRD_FILE) upload crd -$(KUBE_CREATE): $(AP_RUN_DIR) kube-cluster-create-$(KUBE_CLUSTER_CREATE_TARGET) - $(SETUP_KUBE) --crd=$(CRD_FILE) $(KUBE_SSH_NODE_NAME) init +$(KUBE_CREATE): $(AP_RUN_DIR) kube-cluster-create-$(KUBE_CLUSTER_CREATE_TYPE) + $(SETUP_KUBE) --crd=$(CRD_FILE) $(KUBE_CLUSTER_CREATE_TYPE) init "$(KUBE_SSH_NODES)" touch $@ -.INTERMEDIATE: kube-cluster-create-default -kube-cluster-create-default: $(KUBE_CREATE) +.INTERMEDIATE: kube-cluster-create-ssh +kube-cluster-create-ssh: .PHONY: kube-cluster-check-alive kube-cluster-check-info: @@ -129,7 +131,7 @@ kube-cluster-setup-e2e-ci: \ kube-install-helm-charts .PHONY: kube-cluster-delete -kube-cluster-delete: kube-cluster-delete-$(KUBE_SSH_NODE_NAME) +kube-cluster-delete: kube-cluster-delete-$(KUBE_SSH_NODES) .PHONY: kube-setup-ingress kube-setup-ingress: kube-setup-ingress-$(KIND_CONFIG) @@ -161,12 +163,19 @@ kube-status-ingress-%: .PHONY: kube-deployment-rollout-operator-inventory kube-deployment-rollout-operator-inventory: kubectl -n akash-services rollout status deployment operator-inventory --timeout=$(KUBE_ROLLOUT_TIMEOUT)s - kubectl -n akash-services wait pods -l app.kubernetes.io/part-of=provider -l app.kubernetes.io/component=operator -l app.kubernetes.io/instance=inventory-service --for condition=Ready --timeout=$(KUBE_ROLLOUT_TIMEOUT)s + kubectl -n akash-services wait pods \ + -l app.kubernetes.io/part-of=provider,app.kubernetes.io/component=operator,app.kubernetes.io/instance=inventory-service \ + --for condition=Ready \ + --timeout=$(KUBE_ROLLOUT_TIMEOUT)s + kubectl -n akash-services wait pods \ + -l app.kubernetes.io/part-of=provider,app.kubernetes.io/component=operator,app.kubernetes.io/instance=inventory-hardware-discovery,app.kubernetes.io/name=inventory \ + --for=condition=ready \ + --timeout=$(KUBE_ROLLOUT_TIMEOUT)s .PHONY: kube-deployment-rollout-% kube-deployment-rollout-%: kubectl -n akash-services rollout status deployment $* --timeout=$(KUBE_ROLLOUT_TIMEOUT)s - kubectl -n akash-services wait pods -l akash.network/component=operator -l akash.network/name=$(patsubst %, operator-%,$*) --for condition=Ready --timeout=$(KUBE_ROLLOUT_TIMEOUT)s + kubectl -n akash-services wait pods -l akash.network/component=operator,akash.network/name=$(patsubst %, operator-%,$*) --for condition=Ready --timeout=$(KUBE_ROLLOUT_TIMEOUT)s .PHONY: akash-node-ready akash-node-ready: SHELL=$(BASH_PATH) @@ -195,4 +204,4 @@ kube-logs-operator-inventory: .PHONY: kube-wait-inventory-available kube-wait-inventory-available: - $(SETUP_KUBE) wait inventory-available + $(SETUP_KUBE) --retries=60 wait inventory-available diff --git a/_run/kube/Makefile b/_run/kube/Makefile index b10bbe20..ec50387a 100644 --- a/_run/kube/Makefile +++ b/_run/kube/Makefile @@ -1,7 +1,7 @@ KUBE_SETUP_PREREQUISITES ?= \ -KUBE_CLUSTER_CREATE_TARGET := kind -KUBE_SSH_NODE_NAME ?= kind +KUBE_CLUSTER_CREATE_TYPE := kind +KUBE_SSH_NODES := kind KUSTOMIZE_INSTALLS ?= \ akash-operator-hostname \ diff --git a/_run/ssh/.envrc b/_run/ssh/.envrc index cd4d9fdd..99ebb719 100644 --- a/_run/ssh/.envrc +++ b/_run/ssh/.envrc @@ -2,12 +2,8 @@ source_up .envrc dotenv_if_exists dev.env -#source_env ~/projects/akash/gpu +source_env ~/projects/akash/gpu export AKASH_HOME=$DEVCACHE_RUN/ssh/.akash export AKASH_KUBECONFIG=$KUBECONFIG export AP_KUBECONFIG=$KUBECONFIG - -if ! has tqdm ; then - echo -e "\033[31mtqdm is not installed. https://github.com/tqdm/tqdm"; exit 1 -fi diff --git a/_run/ssh/Makefile b/_run/ssh/Makefile index 72eb97a6..2e363a84 100644 --- a/_run/ssh/Makefile +++ b/_run/ssh/Makefile @@ -1,6 +1,7 @@ KUBE_SETUP_PREREQUISITES ?= \ -KUBE_UPLOAD_AKASH_IMAGE ?= true +KUBE_UPLOAD_AKASH_IMAGE ?= true +KUBE_CLUSTER_CREATE_TYPE := ssh KUBE_DOCKER_IMAGE_ARCH := amd64 diff --git a/go.mod b/go.mod index 92098121..9f0dfc8f 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/spf13/viper v1.18.2 github.com/stretchr/testify v1.8.4 github.com/tendermint/tendermint v0.34.27 - github.com/troian/pubsub v0.1.0 + github.com/troian/pubsub v0.1.1 github.com/vektra/mockery/v2 v2.40.1 go.uber.org/zap v1.24.0 golang.org/x/net v0.19.0 diff --git a/go.sum b/go.sum index b99b844e..70b3be55 100644 --- a/go.sum +++ b/go.sum @@ -1881,8 +1881,8 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1 github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/troian/hid v0.13.2 h1:O7PWZQm5YGyg0nVvknFVLVrNTPillz4ZXvxJOtoyteE= github.com/troian/hid v0.13.2/go.mod h1:n6adloQ1876oEXZr6fFsthy4FDHxwJhh7QYQspm30Ds= -github.com/troian/pubsub v0.1.0 h1:ePToDcB/zZjDMk5uuUSCV93Xl7i+1SNvc18tcWso1Q8= -github.com/troian/pubsub v0.1.0/go.mod h1:ALzDZB06e+BF8JeLnO1hbVIY9dCTu8x6mhcdvitlNRs= +github.com/troian/pubsub v0.1.1 h1:huc5qneo0rtSKKsrkroyyMu+b8bw0talql2tt7GXl98= +github.com/troian/pubsub v0.1.1/go.mod h1:fOUAEWXes/SkyWPTdBpW3L/ovyg74N+eBxRpWKik+2Q= github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/tv42/httpunix v0.0.0-20191220191345-2ba4b9c3382c h1:u6SKchux2yDvFQnDHS3lPnIRmfVJ5Sxy3ao2SIdysLQ= diff --git a/make/test-integration.mk b/make/test-integration.mk index 362f46ab..9feba2ce 100644 --- a/make/test-integration.mk +++ b/make/test-integration.mk @@ -65,7 +65,7 @@ test-nocache: .PHONY: test-full test-full: - $(GO_TEST) -tags=$(BUILD_TAGS) -race $(TEST_MODULES) + $(GO_TEST) -tags=$(BUILD_TAGS) -race -count=1 $(TEST_MODULES) .PHONY: test-coverage test-coverage: $(AP_DEVCACHE) diff --git a/operator/inventory/cmd.go b/operator/inventory/cmd.go index c4956a1d..26e277ee 100644 --- a/operator/inventory/cmd.go +++ b/operator/inventory/cmd.go @@ -165,7 +165,7 @@ func Cmd() *cobra.Command { } } - fd := newClusterNodes(ctx, discoveryImage, namespace) + clNodes := newClusterNodes(ctx, discoveryImage, namespace) storage = append(storage, st) @@ -175,7 +175,7 @@ func Cmd() *cobra.Command { } fromctx.CmdSetContextValue(cmd, CtxKeyStorage, storage) - fromctx.CmdSetContextValue(cmd, CtxKeyFeatureDiscovery, fd) + fromctx.CmdSetContextValue(cmd, CtxKeyFeatureDiscovery, clNodes) fromctx.CmdSetContextValue(cmd, CtxKeyClusterState, QuerierCluster(clState)) ctx = cmd.Context() @@ -211,7 +211,7 @@ func Cmd() *cobra.Command { gogoreflection.Register(grpcSrv) group.Go(func() error { - return registryLoader(ctx) + return configWatcher(ctx, viper.GetString(FlagConfig)) }) group.Go(func() error { @@ -219,11 +219,12 @@ func Cmd() *cobra.Command { }) group.Go(func() error { - return configWatcher(ctx, viper.GetString(FlagConfig)) + return registryLoader(ctx) }) group.Go(clState.run) - group.Go(fd.Wait) + group.Go(clNodes.Wait) + group.Go(func() error { log.Info(fmt.Sprintf("rest listening on \"%s\"", restEndpoint)) @@ -355,6 +356,12 @@ func loadKubeConfig(c *cobra.Command) error { } func configWatcher(ctx context.Context, file string) error { + log := fromctx.LogrFromCtx(ctx).WithName("watcher.config") + + defer func() { + log.Info("stopped") + }() + config, err := loadConfig(file, false) if err != nil { return err @@ -388,6 +395,8 @@ func configWatcher(ctx context.Context, file string) error { bus.Pub(config, []string{topicInventoryConfig}, pubsub.WithRetain()) + log.Info("started") + for { select { case <-ctx.Done(): @@ -527,12 +536,22 @@ func registryLoader(ctx context.Context) error { } func scWatcher(ctx context.Context) error { + log := fromctx.LogrFromCtx(ctx).WithName("watcher.storageclasses") + + defer func() { + log.Info("stopped") + }() + bus := fromctx.PubSubFromCtx(ctx) scch := bus.Sub(topicKubeSC) sc := make(storageClasses) + bus.Pub(sc.copy(), []string{topicStorageClasses}, pubsub.WithRetain()) + + log.Info("started") + for { select { case <-ctx.Done(): diff --git a/operator/inventory/node-discovery.go b/operator/inventory/node-discovery.go index 80989049..e2b92610 100644 --- a/operator/inventory/node-discovery.go +++ b/operator/inventory/node-discovery.go @@ -81,12 +81,18 @@ func (dp *nodeDiscovery) shutdown() error { func (dp *nodeDiscovery) queryCPU(ctx context.Context) (*cpu.Info, error) { respch := make(chan dpReadResp, 1) + rctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + select { case <-ctx.Done(): return nil, ctx.Err() case <-dp.ctx.Done(): return nil, dp.ctx.Err() + case <-rctx.Done(): + return nil, rctx.Err() case dp.readch <- dpReadReq{ + ctx: rctx, op: dpReqCPU, resp: respch, }: @@ -108,12 +114,18 @@ func (dp *nodeDiscovery) queryCPU(ctx context.Context) (*cpu.Info, error) { func (dp *nodeDiscovery) queryGPU(ctx context.Context) (*gpu.Info, error) { respch := make(chan dpReadResp, 1) + rctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + select { case <-ctx.Done(): return nil, ctx.Err() case <-dp.ctx.Done(): return nil, dp.ctx.Err() + case <-rctx.Done(): + return nil, rctx.Err() case dp.readch <- dpReadReq{ + ctx: rctx, op: dpReqGPU, resp: respch, }: @@ -256,7 +268,7 @@ initloop: return dp.ctx.Err() case evt := <-watcher.ResultChan(): resp := evt.Object.(*corev1.Pod) - if resp.Status.Phase != corev1.PodPending { + if resp.Status.Phase == corev1.PodRunning { watcher.Stop() break initloop } @@ -290,7 +302,7 @@ initloop: Name(fmt.Sprintf("%s:%d", pod.Name, apiPort)). SubResource("proxy"). Suffix(res). - Do(dp.ctx) + Do(readreq.ctx) resp.err = result.Error() @@ -322,10 +334,10 @@ initloop: func (dp *nodeDiscovery) monitor() error { ctx := dp.ctx + log := fromctx.LogrFromCtx(ctx).WithName("node.monitor") bus := fromctx.PubSubFromCtx(ctx) kc := fromctx.KubeClientFromCtx(ctx) - log := fromctx.LogrFromCtx(ctx).WithName("node.monitor") log.Info("starting", "node", dp.name) @@ -734,11 +746,17 @@ func generateLabels(cfg Config, knode *corev1.Node, node v1.Node, sc storageClas } func (dp *nodeDiscovery) parseCPUInfo(ctx context.Context) v1.CPUInfoS { + log := fromctx.LogrFromCtx(ctx).WithName("node.monitor") + + log.Info("query cpu started") cpus, err := dp.queryCPU(ctx) if err != nil { + log.Error(err, "unable to query cpu") return v1.CPUInfoS{} } + log.Info("query cpu done") + res := make(v1.CPUInfoS, 0, len(cpus.Processors)) for _, c := range cpus.Processors { @@ -756,11 +774,17 @@ func (dp *nodeDiscovery) parseCPUInfo(ctx context.Context) v1.CPUInfoS { func (dp *nodeDiscovery) parseGPUInfo(ctx context.Context, info RegistryGPUVendors) v1.GPUInfoS { res := make(v1.GPUInfoS, 0) + log := fromctx.LogrFromCtx(ctx).WithName("node.monitor") + + log.Info("query gpu started") gpus, err := dp.queryGPU(ctx) if err != nil { + log.Error(err, "unable to query gpu") return res } + log.Info("query gpu done") + if gpus == nil { return res } diff --git a/operator/inventory/nodes.go b/operator/inventory/nodes.go index 7c83c2b2..e44e54da 100644 --- a/operator/inventory/nodes.go +++ b/operator/inventory/nodes.go @@ -76,12 +76,12 @@ func newClusterNodes(ctx context.Context, image, namespace string) *clusterNodes return fd } -func (fd *clusterNodes) Wait() error { - return fd.group.Wait() +func (cl *clusterNodes) Wait() error { + return cl.group.Wait() } -func (fd *clusterNodes) connector() error { - ctx := fd.ctx +func (cl *clusterNodes) connector() error { + ctx := cl.ctx bus := fromctx.PubSubFromCtx(ctx) log := fromctx.LogrFromCtx(ctx).WithName("nodes") @@ -102,7 +102,7 @@ func (fd *clusterNodes) connector() error { select { case <-ctx.Done(): return ctx.Err() - case name := <-fd.signaldone: + case name := <-cl.signaldone: if node, exists := nodes[name]; exists { delete(nodes, node.name) @@ -111,7 +111,7 @@ func (fd *clusterNodes) connector() error { log.Error(err, fmt.Sprintf("\"%s\" exited with error. attempting restart", name)) } } - nodes[name] = newNodeDiscovery(nctx, name, fd.namespace, fd.image, fd.signaldone) + nodes[name] = newNodeDiscovery(nctx, name, cl.namespace, cl.image, cl.signaldone) case rEvt := <-events: switch evt := rEvt.(type) { case watch.Event: @@ -119,7 +119,7 @@ func (fd *clusterNodes) connector() error { case *corev1.Node: switch evt.Type { case watch.Added: - nodes[obj.Name] = newNodeDiscovery(nctx, obj.Name, fd.namespace, fd.image, fd.signaldone) + nodes[obj.Name] = newNodeDiscovery(nctx, obj.Name, cl.namespace, cl.image, cl.signaldone) case watch.Deleted: if node, exists := nodes[obj.Name]; exists { _ = node.shutdown() @@ -132,7 +132,7 @@ func (fd *clusterNodes) connector() error { } } -func (fd *clusterNodes) run() error { +func (cl *clusterNodes) run() error { nodes := make(map[string]inventoryV1.Node) snapshot := func() inventoryV1.Nodes { @@ -147,14 +147,14 @@ func (fd *clusterNodes) run() error { return res } - bus := fromctx.PubSubFromCtx(fd.ctx) + bus := fromctx.PubSubFromCtx(cl.ctx) events := bus.Sub(topicInventoryNode) defer bus.Unsub(events) for { select { - case <-fd.ctx.Done(): - return fd.ctx.Err() + case <-cl.ctx.Done(): + return cl.ctx.Err() case revt := <-events: switch evt := revt.(type) { case nodeState: @@ -168,7 +168,7 @@ func (fd *clusterNodes) run() error { bus.Pub(snapshot(), []string{topicInventoryNodes}, pubsub.WithRetain()) default: } - case req := <-fd.reqch: + case req := <-cl.reqch: resp := respNodes{ res: snapshot(), } diff --git a/operator/inventory/types.go b/operator/inventory/types.go index d096ce0f..559db355 100644 --- a/operator/inventory/types.go +++ b/operator/inventory/types.go @@ -63,6 +63,7 @@ type dpReadResp struct { err error } type dpReadReq struct { + ctx context.Context op dpReqType resp chan<- dpReadResp } diff --git a/operator/psutil.go b/operator/psutil.go index 22a70273..faaa13d7 100644 --- a/operator/psutil.go +++ b/operator/psutil.go @@ -3,6 +3,7 @@ package operator import ( "context" "encoding/json" + "errors" "fmt" "net" "net/http" @@ -15,6 +16,7 @@ import ( "github.com/jaypipes/ghw/pkg/pci" "github.com/spf13/cobra" "github.com/spf13/viper" + "golang.org/x/sync/errgroup" ) const ( @@ -55,8 +57,8 @@ func cmdPsutilServe() *cobra.Command { SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { router := mux.NewRouter() - router.Methods(http.MethodGet).HandlerFunc(infoHandler) + router.HandleFunc("/", infoHandler).Methods(http.MethodGet) router.HandleFunc("/cpu", cpuInfoHandler).Methods(http.MethodGet) router.HandleFunc("/gpu", gpuHandler).Methods(http.MethodGet) router.HandleFunc("/memory", memoryHandler).Methods(http.MethodGet) @@ -64,16 +66,40 @@ func cmdPsutilServe() *cobra.Command { port := viper.GetUint16(flagAPIPort) + group, ctx := errgroup.WithContext(cmd.Context()) + + endpoint := fmt.Sprintf(":%d", port) + srv := &http.Server{ - Addr: fmt.Sprintf(":%d", port), + Addr: endpoint, Handler: router, BaseContext: func(_ net.Listener) context.Context { - return cmd.Context() + return ctx }, ReadHeaderTimeout: 5 * time.Second, } - return srv.ListenAndServe() + group.Go(func() error { + fmt.Printf("listening on %s\n", endpoint) + + return srv.ListenAndServe() + }) + + group.Go(func() error { + <-ctx.Done() + + fmt.Printf("received shutdown signal\n") + + _ = srv.Shutdown(context.Background()) + return ctx.Err() + }) + + err := group.Wait() + if !errors.Is(err, context.Canceled) && !errors.Is(err, http.ErrServerClosed) { + return err + } + + return nil }, } diff --git a/operator/waiter/waiter_test.go b/operator/waiter/waiter_test.go index fd3291a6..d7cf6384 100644 --- a/operator/waiter/waiter_test.go +++ b/operator/waiter/waiter_test.go @@ -2,11 +2,12 @@ package waiter import ( "context" - "github.com/akash-network/node/testutil" - "github.com/stretchr/testify/require" "io" "testing" "time" + + "github.com/akash-network/node/testutil" + "github.com/stretchr/testify/require" ) func TestWaiterNoInput(t *testing.T) { diff --git a/script/setup-kube.sh b/script/setup-kube.sh index 296f4b68..4116b969 100755 --- a/script/setup-kube.sh +++ b/script/setup-kube.sh @@ -14,6 +14,8 @@ rootdir="$(dirname "$0")/.." CRD_FILE=$rootdir/pkg/apis/akash.network/crd.yaml timeout=10 +retries=10 +retrywait=1 usage() { cat <&2 "timeout option must be positive integer" + exit 1 + fi + ;; + retries) + retries=$OPTARG + + if ! isuint "$retries" ; then + echo >&2 "retries option must be positive integer" + exit 1 + fi + ;; + retry-wait) + retrywait=$OPTARG + if ! isuint "$retrywait" ; then + echo >&2 "retry-wait option must be positive integer" + exit 1 + fi + ;; esac done shift "$((OPTIND - 1))" @@ -172,7 +196,10 @@ command_ssh() { init) shift - while read -r node; do + local nodes=("$1") + + # shellcheck disable=SC2048 + for node in ${nodes[*]}; do if ! ssh -n "$node" "test -e /etc/systemd/system/user@.service.d/delegate.conf"; then ssh -n "$node" 'sudo mkdir -p /etc/systemd/system/user@.service.d' ssh -n "$node" 'cat < /dev/null 2>&1 ; do sleep 0.1; done' - local retries=0 + local r=0 while ! grpcurl -plaintext localhost:8455 akash.inventory.v1.ClusterRPC.QueryCluster | jq '(.nodes | length > 0) and (.storage | length > 0)' --exit-status > /dev/null 2>&1; do - retries=$((retries+1)) - if [ ${retries} -eq "${timeout}" ]; then - exit 1 + r=$((r+1)) + if [ ${r} -eq "${retries}" ]; then + exit 0 fi - sleep 1 + + # shellcheck disable=SC2086 + sleep $retrywait done } @@ -346,9 +390,9 @@ kustomize) shift command_kustomize "$@" ;; -load-images) +upload) shift - command_load_images "$@" + command_upload "$@" ;; "wait") shift