diff --git a/extras/dogswatch/.dockerignore b/extras/dogswatch/.dockerignore index 48116e52425..1da2efa276f 100644 --- a/extras/dogswatch/.dockerignore +++ b/extras/dogswatch/.dockerignore @@ -1,5 +1,7 @@ .direnv/ +dev/ *.nix .envrc *.el *.tar* +Makefile diff --git a/extras/dogswatch/Dockerfile b/extras/dogswatch/Dockerfile index ce2973b7953..bc2b41312b7 100644 --- a/extras/dogswatch/Dockerfile +++ b/extras/dogswatch/Dockerfile @@ -1,9 +1,13 @@ + # syntax=docker/dockerfile:experimental FROM golang:1.13 as builder +ARG BUILD_LDFLAGS +ENV BUILD_LDFLAGS=$BUILD_LDFLAGS ENV GOPROXY=direct COPY ./ /go/src/github.com/amazonlinux/thar/dogswatch/ RUN cd /go/src/github.com/amazonlinux/thar/dogswatch && \ - CGO_ENABLED=0 GOOS=linux go build -o dogswatch . && mv dogswatch /dogswatch + CGO_ENABLED=0 GOOS=linux go build -mod=readonly ${BUILD_LDFLAGS:+-ldflags "$BUILD_LDFLAGS"} \ + -o dogswatch . && mv dogswatch /dogswatch FROM scratch COPY --from=builder /dogswatch /etc/ssl / diff --git a/extras/dogswatch/Makefile b/extras/dogswatch/Makefile index cee50a5b030..c0319207172 100644 --- a/extras/dogswatch/Makefile +++ b/extras/dogswatch/Makefile @@ -7,6 +7,7 @@ GOBIN = ./bin/ DOCKER_IMAGE := dogswatch DOCKER_IMAGE_REF_RELEASE := $(DOCKER_IMAGE):$(DOGSWATCH_VERSION) DOCKER_IMAGE_REF := $(DOCKER_IMAGE):$(shell git rev-parse --short=8 HEAD) +DEBUG_LDFLAGS := -X $(GOPKG)/pkg/logging.DebugEnable=true build: $(GOBIN) cd $(GOBIN) && \ @@ -17,10 +18,20 @@ $(GOBIN): mkdir -p $(GOBIN) test: - go test -ldflags '-X $(GOPKG)/pkg/logging.DebugEnable=true' $(GOPKGS) + go test -ldflags '$(DEBUG_LDFLAGS)' $(GOPKGS) + +container: + docker build --network=host \ + --tag $(DOCKER_IMAGE_REF)\ + --build-arg BUILD_LDFLAGS='' \ + . + +debug-container: + docker build --network=host \ + --tag $(DOCKER_IMAGE_REF)\ + --build-arg BUILD_LDFLAGS='$(DEBUG_LDFLAGS)' \ + . -container: vendor - docker build --network=host -t $(DOCKER_IMAGE_REF) . release-container: container docker tag $(DOCKER_IMAGE_REF) $(DOCKER_IMAGE_REF_RELEASE) @@ -28,10 +39,6 @@ release-container: container load: container kind load docker-image $(DOCKER_IMAGE) -vendor: go.sum go.mod - CGO_ENABLED=0 GOOS=linux go mod vendor - touch vendor/ - deploy: sed 's,@containerRef@,$(DOCKER_IMAGE_REF),g' ./dev/deployment.yaml \ | kubectl apply -f - @@ -51,4 +58,4 @@ dashboard: kubectl proxy get-nodes-status: - kubectl get nodes -o json | jq -C -S '.items| map({(.metadata.name): (.metadata.labels * .metadata.annotations)})' + kubectl get nodes -o json | jq -C -S '.items | map(.metadata|{(.name): (.annotations*.labels|to_entries|map(select(.key|startswith("thar")))|from_entries)}) | add' diff --git a/extras/dogswatch/README.md b/extras/dogswatch/README.md index 88a9ed43195..7ce632b3e29 100644 --- a/extras/dogswatch/README.md +++ b/extras/dogswatch/README.md @@ -1,14 +1,14 @@ # Dogswatch: Update Operator -Dogswatch is a [Kubernetes operator](https://Kubernetes.io/docs/concepts/extend-Kubernetes/operator/) that coordinates update activities on Thar hosts in a Kubernetes cluster. +Dogswatch is a [Kubernetes operator](https://Kubernetes.io/docs/concepts/extend-Kubernetes/operator/) that coordinates update activities on Thar hosts in a Kubernetes cluster. 
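The Dockerfile and Makefile changes above thread an optional linker-flag value (`BUILD_LDFLAGS`, populated from `DEBUG_LDFLAGS` for debug builds) through `docker build`, so `pkg/logging.DebugEnable` can be switched on at link time without editing source. A usage sketch based on those targets (image naming and tagging follow the Makefile above):

``` sh
# Release-flavored image: BUILD_LDFLAGS is left empty.
: make container

# Debug-flavored image: DEBUG_LDFLAGS is passed through BUILD_LDFLAGS, so the binary
# is linked with -ldflags "-X $(GOPKG)/pkg/logging.DebugEnable=true".
: make debug-container
```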
## How to Run on Kubernetes

-To run the Dogswatch Operator in your Kubernetes cluster, the following are required resources and configuration (examples given in the [./dev/deployment.yaml](./dev/deployment.yaml) template):
+To run the Dogswatch Operator in a Kubernetes cluster, the following are required resources and configuration (examples given in the [./dev/deployment.yaml](./dev/deployment.yaml) template):

- **`dogswatch` Container Image**
-
+
  Holding the Dogswatch binaries and its supporting environment.

- **Controller Deployment**
@@ -16,7 +16,7 @@ To run the Dogswatch Operator in your Kubernetes cluster, the following are requ
  Scheduling a stop-restart-tolerant Controller process on available Nodes.

- **Agent DaemonSet**
-
+
  Scheduling Agent on Thar hosts

- **Thar Namespace**
@@ -28,67 +28,103 @@ To run the Dogswatch Operator in your Kubernetes cluster, the following are requ
  Configured for authenticating the Agent process on Kubernetes APIs.

- **Cluster privileged credentials with read-write access to Nodes for Agent**
-
+
  Applied to Agent Service Account to update annotations on the Node resource that the Agent is running under.
-
+
- **Service Account for the Controller**

  Configured for authenticating the Controller process on Kubernetes APIs.
-
+
- **Cluster privileged credentials with access to Pods and Nodes for Controller**

  Applied to the Controller Service Account for manipulating annotations on Node resources as well as cordon & uncordoning for updates.
  The Controller also must be able to un-schedule (`delete`) Pods running on Nodes that will be updated.

-In the [./dev/deployment.yaml example](./dev/deployment.yaml), the resource specifies the conditions that the Kubernetes Schedulers will place them in the Cluster.
-These conditions include the Node being labeled as having the required level of support for the Operator to function on it: the `thar.amazonaws.com/platform-version` label.
+Cluster administrators can deploy dogswatch with the [suggested configuration defined here](./dogswatch.yaml) - this includes the above resources and Thar's published container images.
+The dogswatch deployment can be applied to a cluster by calling `kubectl apply -f ./dogswatch.yaml` with an appropriately configured `kubectl` client for the target cluster.
+
+Once these resources are in place, one last step is required to let the Kubernetes scheduler place the required Pods.
+The deployments control scheduling of the dogswatch Pods by limiting them to appropriate Thar hosts using labels.
+For now, these labels are not applied automatically at boot and will need to be set on each Node resource using a tool like `kubectl`.
+
+Each Node that is running Thar must be labeled with the Node's `platform-version` (a host compatibility indicator) in order to have `dogswatch` Pods scheduled on it; the label `thar.amazonaws.com/platform-version` is used for this:
+
+``` text
+thar.amazonaws.com/platform-version=1.0.0
+```
+
+`kubectl` may be used to set this label on a Node:
+
+``` sh
+: kubectl label node $NODE_NAME thar.amazonaws.com/platform-version=1.0.0
+```
+
+If all Nodes in the cluster are running Thar, they can all be labeled at the same time with a single command:
+
+``` sh
+: kubectl label node $(kubectl get nodes -o jsonpath='{.items[*].metadata.name}') thar.amazonaws.com/platform-version=1.0.0
+```
+
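Whether the label landed can be checked quickly by listing it as a column; a small verification sketch using `kubectl`'s `--label-columns` (`-L`) flag:

``` sh
: kubectl get nodes -L thar.amazonaws.com/platform-version
```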
+In the [development example deployment](./dev/deployment.yaml), the resources specify conditions that the Kubernetes Scheduler uses to place Pods in the Cluster.
+These conditions, among others, include a constraint on each Node being labeled as having support for the Operator to function on it: the `thar.amazonaws.com/platform-version` label.

With this label present and the workloads scheduled, the Agent and Controller process will coordinate an update as soon as the Agent annotates its Node (by default only one update will happen at a time).

-To use the example [./dev/deployment.yaml](./dev/deployment.yaml) as a base, you must modify the resources to use the appropriate container image that is available to your kubelets (a common image is forthcoming, see #505).
-Then with a appropriately configured deployment yaml, you may call `kubelet apply -f ./my-deployment.yaml` to prepare the above resources and schedule the Dogswatch Pods in your Cluster.
+To use the [suggested deployment](./dogswatch.yaml) or [development deployment](./dev/deployment.yaml) as a base, any customized resources must be updated to use the intended container image.
+Then with the configured deployment, use `kubectl apply -f $UPDATED_DEPLOYMENT.yaml` to prepare the above resources and schedule the Dogswatch Pods in a Cluster.

## What Makes Up Dogswatch

Dogswatch is made up of two distinct processes, one of which runs on each host.

- `dogswatch -controller`
-
+
  The coordinating process responsible for the handling update of Thar nodes cooperatively with the cluster's workloads.
-
+
- `dogswatch -agent`
-
+
  The on-host process responsible for publishing update metadata and executing update activities.
-
+
## How It Coordinates

-The Dogswatch processes communicate by applying updates to the Kubernetes Node resources' Annotations.
+The Dogswatch processes communicate by applying updates to the Kubernetes Node resources' Annotations.
The Annotations are used to communicate the Agent activity (called an `intent`) as determined by the Controller process, the current Agent activity in response to the intent, and the Host's update status as known by the Agent process.

The Agent and Controller processes listen to an event stream from the Kubernetes cluster in order to quickly and reliably handle communicated `intent` in addition to updated metadata pertinent to updates and the Operator itself.

+### Observing Progress and State
+
+Dogswatch's operation can be observed by inspecting the labels and annotations on the Node resource.
+The state and pending activity are posted as progress is made.
+
+``` sh
+# With a configured kubectl and jq available on $PATH
+kubectl get nodes -o json \
+  | jq -C -S '.items | map(.metadata|{(.name): (.annotations*.labels|to_entries|map(select(.key|startswith("thar")))|from_entries)}) | add'
+```
+
### Current Limitations

- Pod replication & healthy count is not taken into consideration (#502)
-- Nodes update without pause between each (#503)
+- Nodes update without pause between each Node (#503)
- Single Node cluster degrades into unscheduleable on update (#501)
- Node labels are not automatically applied to allow scheduling (#504)

## How to Contribute and Develop Changes for Dogswatch

-Working on Dogswatch requires a fully functioning Kubernetes cluster.
-For the sake of development workflow, you may easily run this within a container or VM as with [`kind`](https://github.com/Kubernetes-sigs/kind) or [`minikube`](https://github.com/Kubernetes/minikube).
+Working on Dogswatch requires a fully configured, working Kubernetes cluster.
+For the sake of development workflow, we suggest using a cluster that is containerized or virtualized - tools to manage these are available: [`kind`](https://github.com/Kubernetes-sigs/kind) (containerized) and [`minikube`](https://github.com/Kubernetes/minikube) (virtualized).
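For a local cluster, one possible workflow - assuming `kind` is installed and using the `kind` cluster definition shipped in `dev/` (listed below) together with the Makefile targets from this repository - looks roughly like:

``` sh
# Stand up a local development cluster from the provided definition.
: kind create cluster --config ./dev/kind-cluster.yml

# Build the dogswatch container image and load it into the kind cluster
# (the `load` target depends on `container`).
: make load
```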

The `dev/` directory contains several resources that may be used for development and debugging purposes:

- `dashboard.yaml` - A **development environment** set of Kubernetes resources (these use insecure settings and *are not suitable for use in Production*!)
- `deployment.yaml` - A _template_ for Kubernetes resources for Dogswatch that schedule a controller and setup a DaemonSet
- `kind-cluster.yml` - A `kind` Cluster definition that may be used to stand up a local development cluster

-Much of the development workflow can be accommodated by the `Makefile` providedalongside the code.
-Each of these targets utilize your existing environment and tools - for example: your `kubectl` as configured will be used.
-If you have locally configured access to production, please ensure you've taken steps to reconfigure or otherwise cause `kubectl` to affect only your development cluster.
+Much of the development workflow can be accommodated by the `Makefile` provided alongside the code.
+Each of these targets utilizes the tools and environments it is configured to access - for example: `kubectl`, as configured on the host, will be used.
+If `kubectl` is configured with access to a production cluster, please take steps to reconfigure `kubectl` to affect only a development cluster.

**General use targets**

diff --git a/extras/dogswatch/dev/deployment.yaml b/extras/dogswatch/dev/deployment.yaml
index 053be4fd912..d9340483aaf 100644
--- a/extras/dogswatch/dev/deployment.yaml
+++ b/extras/dogswatch/dev/deployment.yaml
@@ -153,6 +153,8 @@ spec:
      - name: dogswatch
        image: "@containerRef@"
        imagePullPolicy: Always
+        # XXX: tty required to exec binaries that use `simplelog` until #576 is resolved.
+        tty: true
        args:
        - -agent
        - -debug
diff --git a/extras/dogswatch/dogswatch.yaml b/extras/dogswatch/dogswatch.yaml
new file mode 100644
index 00000000000..7d2822b8776
--- /dev/null
+++ b/extras/dogswatch/dogswatch.yaml
@@ -0,0 +1,185 @@
+apiVersion: v1
+kind: Namespace
+metadata:
+  name: thar
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+  name: thar-dogswatch-controller
+rules:
+  - apiGroups: [""]
+    resources: ["nodes"]
+    verbs: ["get", "list", "watch", "update", "patch"]
+  # Allow the controller to remove Pods running on the Nodes that are updating.
+ - apiGroups: [""] + resources: ["pods"] + verbs: ["get", "list", "delete"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: thar-dogswatch-controller +subjects: + - kind: ServiceAccount + name: dogswatch-controller + namespace: thar +roleRef: + kind: ClusterRole + name: thar-dogswatch-controller + apiGroup: rbac.authorization.k8s.io +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: thar-dogswatch-agent +rules: + - apiGroups: [""] + resources: ["nodes"] + verbs: ["get", "list", "watch", "update", "patch"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: thar-dogswatch-agent +subjects: + - kind: ServiceAccount + name: dogswatch-agent + namespace: thar +roleRef: + kind: ClusterRole + name: thar-dogswatch-agent + apiGroup: rbac.authorization.k8s.io +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: dogswatch-controller + namespace: thar + annotations: + kubernetes.io/service-account.name: dogswatch-controller +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: dogswatch-agent + namespace: thar + annotations: + kubernetes.io/service-account.name: dogswatch-controller +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: dogswatch-controller + namespace: thar + labels: + name: dogswatch-controller + app: dogswatch-controller +spec: + replicas: 1 + strategy: + rollingUpdate: + maxUnavailable: 100% + selector: + matchLabels: + app: dogswatch-controller + template: + metadata: + namespace: thar + labels: + name: dogswatch-controller + app: dogswatch-controller + spec: + serviceAccountName: dogswatch-controller + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: thar.amazonaws.com/platform-version + operator: Exists + podAntiAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 10 + podAffinityTerm: + topologyKey: thar.amazonaws.com/platform-version + labelSelector: + matchExpressions: + - key: app + operator: In + values: ["dogswatch-agent"] + containers: + - name: dogswatch + image: "328549459982.dkr.ecr.us-west-2.amazonaws.com/dogswatch:v0.1.0" + imagePullPolicy: Always + args: + - -controller + - -debug + - -nodeName + - $(NODE_NAME) + env: + - name: NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName +--- +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: dogswatch-agent + namespace: thar + labels: + dogswatch: agent +spec: + selector: + matchLabels: + name: dogswatch-agent + template: + metadata: + labels: + name: dogswatch-agent + app: dogswatch-agent + spec: + serviceAccountName: dogswatch-agent + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: thar.amazonaws.com/platform-version + operator: Exists + hostPID: true + containers: + - name: dogswatch + image: "328549459982.dkr.ecr.us-west-2.amazonaws.com/dogswatch:v0.1.0" + imagePullPolicy: Always + # XXX: tty required to exec binaries that use `simplelog` until #576 is resolved. + tty: true + args: + - -agent + - -debug + - -nodeName + - $(NODE_NAME) + env: + - name: NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + securityContext: + # Required for executing OS update operations. 
+ privileged: true + resources: + limits: + memory: 600Mi + requests: + cpu: 100m + memory: 600Mi + volumeMounts: + - name: rootfs + mountPath: /.thar/rootfs + volumes: + - name: rootfs + hostPath: + path: / + type: Directory + diff --git a/extras/dogswatch/go.mod b/extras/dogswatch/go.mod index 4536458f191..222d3178e6c 100644 --- a/extras/dogswatch/go.mod +++ b/extras/dogswatch/go.mod @@ -3,15 +3,21 @@ module github.com/amazonlinux/thar/dogswatch go 1.12 require ( + github.com/coreos/go-systemd/v22 v22.0.0 + github.com/godbus/dbus/v5 v5.0.3 github.com/google/go-cmp v0.3.1 // indirect github.com/googleapis/gnostic v0.3.1 // indirect github.com/imdario/mergo v0.3.7 // indirect + github.com/karlseguin/ccache v2.0.3+incompatible + github.com/karlseguin/expect v1.0.1 // indirect github.com/pkg/errors v0.8.1 github.com/sirupsen/logrus v1.4.2 + github.com/wsxiaoys/terminal v0.0.0-20160513160801-0940f3fc43a0 // indirect golang.org/x/crypto v0.0.0-20190829043050-9756ffdc2472 // indirect golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297 // indirect golang.org/x/sync v0.0.0-20190423024810-112230192c58 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect + gopkg.in/karlseguin/expect.v1 v1.0.1 // indirect gotest.tools v2.2.0+incompatible k8s.io/api v0.0.0-20190905160310-fb749d2f1064 k8s.io/apimachinery v0.0.0-20190831074630-461753078381 diff --git a/extras/dogswatch/go.sum b/extras/dogswatch/go.sum index f8eb758417f..73509ad39f2 100644 --- a/extras/dogswatch/go.sum +++ b/extras/dogswatch/go.sum @@ -25,6 +25,8 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/coreos/go-systemd/v22 v22.0.0 h1:XJIw/+VlJ+87J+doOxznsAWIdmWuViOVhkQamW5YV28= +github.com/coreos/go-systemd/v22 v22.0.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/davecgh/go-spew v0.0.0-20151105211317-5215b55f46b2/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -56,6 +58,8 @@ github.com/go-openapi/spec v0.0.0-20160808142527-6aced65f8501/go.mod h1:J8+jY1nA github.com/go-openapi/spec v0.19.2/go.mod h1:sCxk3jxKgioEJikev4fgkNmwS+3kuYdJtcsZsD5zxMY= github.com/go-openapi/swag v0.0.0-20160704191624-1d0bd113de87/go.mod h1:DXUve3Dpr1UfpPtxFw+EFuQ41HhCWZfha5jSVRG7C7I= github.com/go-openapi/swag v0.19.2/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= +github.com/godbus/dbus/v5 v5.0.3 h1:ZqHaoEF7TBzh4jzPmqVhE/5A1z9of6orkAe5uHoAeME= +github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d h1:3PaI8p3seN09VjbTYC/QWlUZdZ1qS1zGjy7LH2Wt07I= github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -103,6 +107,10 @@ github.com/json-iterator/go v0.0.0-20180612202835-f2b4162afba3/go.mod h1:+SdeFBv github.com/json-iterator/go v1.1.7 h1:KfgG9LzI+pYjr4xvmz/5H4FXjokeP+rlHLhv3iH62Fo= github.com/json-iterator/go v1.1.7/go.mod 
h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= +github.com/karlseguin/ccache v2.0.3+incompatible h1:j68C9tWOROiOLWTS/kCGg9IcJG+ACqn5+0+t8Oh83UU= +github.com/karlseguin/ccache v2.0.3+incompatible/go.mod h1:CM9tNPzT6EdRh14+jiW8mEF9mkNZuuE51qmgGYUB93w= +github.com/karlseguin/expect v1.0.1 h1:z4wy4npwwHSWKjGWH85WNJO42VQhovxTCZDSzhjo8hY= +github.com/karlseguin/expect v1.0.1/go.mod h1:zNBxMY8P21owkeogJELCLeHIt+voOSduHYTFUbwRAV8= github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= @@ -172,6 +180,8 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= +github.com/wsxiaoys/terminal v0.0.0-20160513160801-0940f3fc43a0 h1:3UeQBvD0TFrlVjOeLOBz+CPAI8dnbqNSVwUwRrkp7vQ= +github.com/wsxiaoys/terminal v0.0.0-20160513160801-0940f3fc43a0/go.mod h1:IXCdmsXIht47RaVFLEdVnh1t+pgYtTAhQGj73kz+2DM= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= @@ -260,6 +270,8 @@ gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/inf.v0 v0.9.0 h1:3zYtXIO92bvsdS3ggAdA8Gb4Azj0YU+TVY1uGYNFA8o= gopkg.in/inf.v0 v0.9.0/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/karlseguin/expect.v1 v1.0.1 h1:9u0iUltnhFbJTHaSIH0EP+cuTU5rafIgmcsEsg2JQFw= +gopkg.in/karlseguin/expect.v1 v1.0.1/go.mod h1:uB7QIJBcclvYbwlUDkSCsGjAOMis3fP280LyhuDEf2I= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/extras/dogswatch/main.go b/extras/dogswatch/main.go index 50318635617..b40390b7460 100644 --- a/extras/dogswatch/main.go +++ b/extras/dogswatch/main.go @@ -5,6 +5,7 @@ import ( "flag" "os" "syscall" + "time" "github.com/amazonlinux/thar/dogswatch/pkg/agent" "github.com/amazonlinux/thar/dogswatch/pkg/controller" @@ -12,15 +13,17 @@ import ( "github.com/amazonlinux/thar/dogswatch/pkg/logging" "github.com/amazonlinux/thar/dogswatch/pkg/platform/updog" "github.com/amazonlinux/thar/dogswatch/pkg/sigcontext" + "github.com/amazonlinux/thar/dogswatch/pkg/thar" "github.com/pkg/errors" "k8s.io/client-go/kubernetes" ) var ( - flagAgent = flag.Bool("agent", false, "Run agent component") - flagController = flag.Bool("controller", false, "Run controller component") - flagLogDebug = flag.Bool("debug", false, "") - flagNodeName = flag.String("nodeName", "", "nodeName of the Node that this process is running on") + flagAgent = flag.Bool("agent", false, "Run agent 
component") + flagController = flag.Bool("controller", false, "Run controller component") + flagSkipMitigation = flag.Bool("skip-mitigation", false, "Skip applying mitigations") + flagLogDebug = flag.Bool("debug", false, "") + flagNodeName = flag.String("nodeName", "", "nodeName of the Node that this process is running on") ) func main() { @@ -32,6 +35,18 @@ func main() { log := logging.New("main") + // "debuggable" builds at runtime produce extensive logging output compared + // to release builds with the debug flag enabled. This requires building and + // using a distinct build in the deployment in order to use. + if logging.Debuggable { + log.Info("low-level logging.Debuggable is enabled in this build") + log.Warn("logging.Debuggable produces large volumes of logs") + delay := 3 * time.Second + log.WithField("delay", delay).Warn("delaying start due to logging.Debuggable build") + time.Sleep(delay) + log.Info("starting logging.Debuggable enabled build") + } + kube, err := k8sutil.DefaultKubernetesClient() if err != nil { log.WithError(err).Fatalf("kubernetes client") @@ -48,7 +63,7 @@ func main() { log.Error("cannot run both agent and controller") os.Exit(1) case (!*flagController && !*flagAgent): - log.Error("no component specified to run, provide one") + log.Error("no component specified to run, provide either -agent or -controller") flag.Usage() os.Exit(1) case *flagController: @@ -57,6 +72,13 @@ func main() { log.WithError(err).Fatalf("controller stopped") } case *flagAgent: + if !*flagSkipMitigation { + log.Info("checking for necessary mitigations") + err := thar.ApplyMitigations() + if err != nil { + log.WithError(err).Fatalf("unable to perform mitigations") + } + } err = runAgent(ctx, kube, *flagNodeName) if err != nil { log.WithError(err).Fatalf("agent stopped") diff --git a/extras/dogswatch/pkg/agent/agent.go b/extras/dogswatch/pkg/agent/agent.go index 400c0fe665b..141f423a5d7 100644 --- a/extras/dogswatch/pkg/agent/agent.go +++ b/extras/dogswatch/pkg/agent/agent.go @@ -2,18 +2,21 @@ package agent import ( "context" - "fmt" "os" "time" "github.com/amazonlinux/thar/dogswatch/pkg/intent" + "github.com/amazonlinux/thar/dogswatch/pkg/intent/cache" + "github.com/amazonlinux/thar/dogswatch/pkg/internal/logfields" "github.com/amazonlinux/thar/dogswatch/pkg/k8sutil" "github.com/amazonlinux/thar/dogswatch/pkg/logging" "github.com/amazonlinux/thar/dogswatch/pkg/marker" "github.com/amazonlinux/thar/dogswatch/pkg/nodestream" "github.com/amazonlinux/thar/dogswatch/pkg/platform" "github.com/amazonlinux/thar/dogswatch/pkg/workgroup" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" v1meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -21,7 +24,7 @@ import ( ) const ( - initialPollDelay = time.Minute * 1 + initialPollDelay = updatePollInterval / 2 updatePollInterval = time.Minute * 30 ) @@ -29,6 +32,13 @@ var ( errInvalidProgress = errors.New("intended to make invalid progress") ) +// Agent is a privileged on-host process that acts on communicated Intents from +// the controller. Its event loop hinges off of a Kubernetes Informer which +// feeds it metadata and Intent data. +// +// The Agent only acts as directed, its logic covers safety checks and related +// on-host responsibilities. Larger coordination and gating is handled by the +// controller. 
type Agent struct { log logging.Logger kube kubernetes.Interface @@ -38,13 +48,21 @@ type Agent struct { poster poster proc proc + lastCache cache.LastCache + tracker *postTracker + progress progression } +// poster implements the logic for updating, or posting, a provided Intent for +// the appropriate resource. type poster interface { Post(*intent.Intent) error } +// proc interposes the self-terminate kill signaling allowing for an Agent to +// terminate itself from the outside. Signals are trapped and handled elsewhere +// within the application. type proc interface { KillProcess() error } @@ -58,12 +76,14 @@ func New(log logging.Logger, kube kubernetes.Interface, plat platform.Platform, nodeclient = kube.CoreV1().Nodes() } return &Agent{ - log: log, - kube: kube, - platform: plat, - poster: &k8sPoster{log, nodeclient}, - proc: &osProc{}, - nodeName: nodeName, + log: log, + kube: kube, + platform: plat, + poster: &k8sPoster{log, nodeclient}, + proc: &osProc{}, + nodeName: nodeName, + lastCache: cache.NewLastCache(), + tracker: newPostTracker(), }, nil } @@ -91,60 +111,99 @@ func (a *Agent) Run(ctx context.Context) error { NodeName: a.nodeName, }, a.handler()) - group.Work(ns.Run) - group.Work(a.periodicUpdateChecker) - err := a.checkNodePreflight() if err != nil { return err } - select { - case <-ctx.Done(): - a.log.Info("waiting on workers to finish") - return group.Wait() - } + group.Work(ns.Run) + group.Work(a.periodicUpdateChecker) + + <-ctx.Done() + a.log.Info("waiting on workers to finish") + return group.Wait() } +// periodicUpdateChecker regularly checks for available updates and posts this +// status on the Node resource. func (a *Agent) periodicUpdateChecker(ctx context.Context) error { timer := time.NewTimer(initialPollDelay) defer timer.Stop() + log := a.log.WithField("worker", "update-checker") + for { select { case <-ctx.Done(): + log.Debug("finished") return nil case <-timer.C: - // TODO: update this when we have richer data plumbed - _, err := a.platform.ListAvailable() - avail := err == nil - a.setUpdateAvailable(avail) + log.Info("checking for update") + err := a.checkPostUpdate(a.log) + if err != nil { + log.WithError(err).Error("periodic check error") + } } timer.Reset(updatePollInterval) } } -func (a *Agent) setUpdateAvailable(available bool) error { +// checkUpdate queries for an available update from the host. +func (a *Agent) checkUpdate(log logging.Logger) (bool, error) { + available, err := a.platform.ListAvailable() + if err != nil { + log.WithError(err).Error("unable to query available updates") + return false, err + } + hasUpdate := len(available.Updates()) > 0 + log = log.WithField("update-available", hasUpdate) + if hasUpdate { + log.Info("an update is available") + } + return hasUpdate, nil +} + +// checkPostUpdate checks for and posts the status of an available update. +func (a *Agent) checkPostUpdate(log logging.Logger) error { + hasUpdate, err := a.checkUpdate(log) + if err != nil { + return err + } + log = log.WithField("update-available", hasUpdate) + log.Debug("posting update status") + err = a.postUpdateAvailable(hasUpdate) + if err != nil { + log.WithError(err).Error("could not post update status") + return err + } + log.Debug("posted update status") + return nil +} + +// postUpdateAvailable posts the available update status to the Kubernetes Node +// resource. 
+func (a *Agent) postUpdateAvailable(available bool) error { // TODO: handle brief race condition internally - this needs to be improved, // though the kubernetes control plane will reject out of order updates by // way of resource versioning C-A-S operations. + if a.kube == nil { + return errors.New("kubernetes client is required to fetch node resource") + } node, err := a.kube.CoreV1().Nodes().Get(a.nodeName, v1meta.GetOptions{}) if err != nil { return errors.WithMessage(err, "unable to get node") } - in := intent.Given(node) - switch available { - case true: - in.UpdateAvailable = marker.NodeUpdateAvailable - case false: - in.UpdateAvailable = marker.NodeUpdateUnavailable - } - return a.poster.Post(in) + in := intent.Given(node).SetUpdateAvailable(available) + return a.postIntent(in) } +// handler is the entrypoint for the Kubernetes Informer to schedule handling of +// events for the Node to act on. func (a *Agent) handler() nodestream.Handler { return &nodestream.HandlerFuncs{ - OnAddFunc: a.handleEvent, + OnAddFunc: func(n *v1.Node) { + a.handleEvent(n) + }, // we don't mind the diff between old and new, so handle the new // resource. OnUpdateFunc: func(_, n *v1.Node) { @@ -156,18 +215,48 @@ func (a *Agent) handler() nodestream.Handler { } } -func (a *Agent) handleEvent(node *v1.Node) { +// handleEvent handles a coalesced Node resource received from a nodestream +// callback. +func (a *Agent) handleEvent(node intent.Input) { in := intent.Given(node) + + log := a.log.WithFields(logfields.Intent(in)) + + if a.skipIntentEvent(in) { + return + } + if activeIntent(in) { - a.log.Debug("active intent received") + a.lastCache.Record(in) + log.Debug("active intent received") if err := a.realize(in); err != nil { - a.log.WithError(err).Error("could not handle intent") + log.WithError(err).Error("unable to realize intent") } return } - a.log.Debug("inactive intent received") + log.Debug("inactive intent received") } +func (a *Agent) skipIntentEvent(in *intent.Intent) bool { + log := a.log.WithFields(logfields.Intent(in)) + if a.tracker.matchesPost(in) { + log.Debug("skipping emitted intent as event") + return true + } + if intent.Equivalent(a.lastCache.Last(in), in) { + log.Debug("skipping duplicate received event") + return true + } + if logging.Debuggable { + log.Debug("clearing tracked posted intents") + } + a.tracker.clear() + + return false +} + +// activeIntent filters an intent as an active intent which must be handled by +// the Agent. func activeIntent(i *intent.Intent) bool { wanted := i.InProgress() && !i.DegradedPath() empty := i.Wanted == "" || i.Active == "" || i.State == "" @@ -175,8 +264,14 @@ func activeIntent(i *intent.Intent) bool { return wanted && !empty && !unknown } +// realize acts on an Intent to achieve, or realize, the Intent's intent. func (a *Agent) realize(in *intent.Intent) error { - a.log.WithField("intent", fmt.Sprintf("%#v", in)).Debug("realizing intent") + log := a.log.WithFields(logrus.Fields{ + "worker": "handler", + "intent": in.DisplayString(), + }) + + log.Debug("handling intent") var err error @@ -185,7 +280,8 @@ func (a *Agent) realize(in *intent.Intent) error { // ACK the wanted action. 
in.Active = in.Wanted in.State = marker.NodeStateBusy - if err = a.poster.Post(in); err != nil { + err = a.postIntent(in) + if err != nil { return err } @@ -205,7 +301,7 @@ func (a *Agent) realize(in *intent.Intent) error { break } a.progress.SetTarget(ups.Updates()[0]) - a.log.Debug("preparing update") + log.Debug("preparing update") err = a.platform.Prepare(a.progress.GetTarget()) case marker.NodeActionPerformUpdate: @@ -213,27 +309,34 @@ func (a *Agent) realize(in *intent.Intent) error { err = errInvalidProgress break } - a.log.Debug("updating") + log.Debug("updating") err = a.platform.Update(a.progress.GetTarget()) case marker.NodeActionUnknown, marker.NodeActionStabilize: - a.log.Debug("sitrep") + log.Debug("sitrep") _, err = a.platform.Status() + if err != nil { + break + } + hasUpdate, err := a.checkUpdate(log) + if err != nil { + log.WithError(err).Error("sitrep update check errored") + } + in.SetUpdateAvailable(hasUpdate) case marker.NodeActionRebootUpdate: if !a.progress.Valid() { err = errInvalidProgress break } - a.log.Debug("rebooting") - a.log.Info("Rebooting Node to complete update") + log.Debug("rebooting") + log.Info("Rebooting Node to complete update") // TODO: ensure Node is setup to be validated on boot (ie: kubelet will // run agent again before we let other Pods get scheduled) err = a.platform.BootUpdate(a.progress.GetTarget(), true) // Shortcircuit to terminate. // TODO: actually handle shutdown. - // die("goodbye"); if err == nil { if a.proc != nil { defer a.proc.KillProcess() @@ -243,18 +346,31 @@ func (a *Agent) realize(in *intent.Intent) error { } if err != nil { - a.log.WithError(err).Error("could not realize intent") + log.WithError(err).Error("could not realize intent") in.State = marker.NodeStateError } else { - a.log.Debug("realized intent") + log.Debug("realized intent") in.State = marker.NodeStateReady } - a.poster.Post(in) + postErr := a.postIntent(in) + if postErr != nil { + log.WithError(postErr).Error("could not update intent") + } return err } +func (a *Agent) postIntent(in *intent.Intent) error { + err := a.poster.Post(in) + if err != nil { + a.tracker.recordPost(in) + } + return err +} + +// checkNodePreflight runs checks against the current Node resource and prepares +// it for use by the Agent and Controller. func (a *Agent) checkNodePreflight() error { // TODO: Run a check of the Node Resource and reset appropriately @@ -282,26 +398,44 @@ func (a *Agent) checkNodePreflight() error { // there's not a good way to re-prime ourselves in the prior state. in = in.Reset() } - a.poster.Post(in) + + postErr := a.postIntent(in) + if postErr != nil { + a.log.WithError(postErr).Error("could not update intent status") + return postErr + } return nil } +// osProc encapsulates host interactions in order to kill the current process. type osProc struct{} +// KillProcess kills the current process. func (*osProc) KillProcess() error { p, _ := os.FindProcess(os.Getpid()) go p.Kill() return nil } +// k8sPoster captures the functionality of the posting of a Node resource +// modification - in the form of an Intent. type k8sPoster struct { log logging.Logger nodeclient corev1.NodeInterface } +// Post writes out the Intent to the Kubernetes Node resource. 
func (k *k8sPoster) Post(i *intent.Intent) error { nodeName := i.GetName() - defer k.log.WithField("node", nodeName).Debugf("posted intent %s", i.DisplayString()) - return k8sutil.PostMetadata(k.nodeclient, nodeName, i) + log := k.log.WithFields(logrus.Fields{ + "node": nodeName, + "intent": i.DisplayString(), + }) + err := k8sutil.PostMetadata(k.nodeclient, nodeName, i) + if err != nil { + return err + } + log.Debugf("posted intent") + return nil } diff --git a/extras/dogswatch/pkg/agent/agent_test.go b/extras/dogswatch/pkg/agent/agent_test.go index 2c2009c1b89..60a747143cf 100644 --- a/extras/dogswatch/pkg/agent/agent_test.go +++ b/extras/dogswatch/pkg/agent/agent_test.go @@ -53,6 +53,10 @@ func TestActiveIntent(t *testing.T) { Active: marker.NodeActionPerformUpdate, State: marker.NodeStateUnknown, }, + + *intents.Stabilized(intents.WithUpdateAvailable("")), + *intents.Stabilized(intents.WithUpdateAvailable(marker.NodeUpdateUnavailable)), + *intents.Stabilized(intents.WithUpdateAvailable(marker.NodeUpdateUnknown)), } for _, in := range active { diff --git a/extras/dogswatch/pkg/agent/post_tracker.go b/extras/dogswatch/pkg/agent/post_tracker.go new file mode 100644 index 00000000000..d0c73650d65 --- /dev/null +++ b/extras/dogswatch/pkg/agent/post_tracker.go @@ -0,0 +1,50 @@ +package agent + +import ( + "container/list" + "sync" + + "github.com/amazonlinux/thar/dogswatch/pkg/intent" +) + +// postTracker records posted Intents to compare to inbound Intents. +type postTracker struct { + mu *sync.RWMutex + list *list.List +} + +func newPostTracker() *postTracker { + return &postTracker{ + mu: &sync.RWMutex{}, + list: list.New()} +} + +// recordPost retains a record of the posted Intent. +func (p *postTracker) recordPost(in *intent.Intent) { + p.mu.Lock() + p.list.PushBack(in.Clone()) + p.mu.Unlock() +} + +// matchesPost checks for the presence of a matching tracked posted Intent. +func (p *postTracker) matchesPost(in *intent.Intent) bool { + p.mu.RLock() + defer p.mu.RUnlock() + if p.list.Len() == 0 { + return false + } + for elm := p.list.Front(); elm != nil; elm = elm.Next() { + if intent.Equivalent(elm.Value.(*intent.Intent), in) { + return true + } + } + + return false +} + +// clear removes all tracked posted Intents. +func (p *postTracker) clear() { + p.mu.Lock() + p.list.Init() + p.mu.Unlock() +} diff --git a/extras/dogswatch/pkg/controller/controller.go b/extras/dogswatch/pkg/controller/controller.go index 5242b4d3666..3a28a5091eb 100644 --- a/extras/dogswatch/pkg/controller/controller.go +++ b/extras/dogswatch/pkg/controller/controller.go @@ -9,12 +9,16 @@ import ( "k8s.io/client-go/kubernetes" ) +// Controller is the Dogswatch component that runs coordination for the Thar +// upgrade processes across many hosts, running the Dogswatch Agent, in a +// cluster. type Controller struct { log logging.Logger kube kubernetes.Interface - manager *ActionManager + manager *actionManager } +// New creates a Dogswatch Controller instance. func New(log logging.Logger, kube kubernetes.Interface, nodeName string) (*Controller, error) { return &Controller{ log: log, @@ -23,6 +27,7 @@ func New(log logging.Logger, kube kubernetes.Interface, nodeName string) (*Contr }, nil } +// Run executes the event loop for the Controller until signaled to exit. 
func (c *Controller) Run(ctx context.Context) error { worker, cancel := context.WithCancel(ctx) defer cancel() diff --git a/extras/dogswatch/pkg/controller/manager.go b/extras/dogswatch/pkg/controller/manager.go index 8d1366f4027..aefdcb63655 100644 --- a/extras/dogswatch/pkg/controller/manager.go +++ b/extras/dogswatch/pkg/controller/manager.go @@ -2,32 +2,48 @@ package controller import ( "context" + "fmt" + "math/rand" "github.com/amazonlinux/thar/dogswatch/pkg/intent" + intentcache "github.com/amazonlinux/thar/dogswatch/pkg/intent/cache" + "github.com/amazonlinux/thar/dogswatch/pkg/internal/logfields" "github.com/amazonlinux/thar/dogswatch/pkg/logging" + "github.com/amazonlinux/thar/dogswatch/pkg/marker" "github.com/amazonlinux/thar/dogswatch/pkg/nodestream" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/cache" ) -const maxQueuedIntents = 10 +const ( + // maxQueuedIntents controls the number of queued Intents that are waiting + // to be handled. + maxQueuedIntents = 100 + maxQueuedInputs = maxQueuedIntents * (1 / 4) + queueSkipThreshold = maxQueuedIntents / 2 +) + +var _ nodestream.Handler = (*actionManager)(nil) -var _ nodestream.Handler = (*ActionManager)(nil) +var randDropIntFunc func(int) int = rand.Intn -// ActionManager handles node changes according to policy and runs a node update +// actionManager handles node changes according to policy and runs a node update // flow to completion as allowed by policy. -type ActionManager struct { - log logging.Logger - kube kubernetes.Interface - policy Policy - input chan *intent.Intent - storer storer - poster poster - nodeName string - nodem nodeManager +type actionManager struct { + log logging.Logger + kube kubernetes.Interface + policy Policy + inputs chan *intent.Intent + storer storer + poster poster + nodeName string + nodem nodeManager + lastCache intentcache.LastCache } // poster is the implementation of the intent poster that publishes the provided @@ -48,27 +64,28 @@ type storer interface { GetStore() cache.Store } -func newManager(log logging.Logger, kube kubernetes.Interface, nodeName string) *ActionManager { +func newManager(log logging.Logger, kube kubernetes.Interface, nodeName string) *actionManager { var nodeclient corev1.NodeInterface if kube != nil { nodeclient = kube.CoreV1().Nodes() } - return &ActionManager{ - log: log, - kube: kube, - policy: &defaultPolicy{}, - input: make(chan *intent.Intent, 1), - poster: &k8sPoster{log, nodeclient}, - nodem: &k8sNodeManager{kube}, + return &actionManager{ + log: log, + kube: kube, + policy: &defaultPolicy{log: log.WithField(logging.SubComponentField, "policy-check")}, + inputs: make(chan *intent.Intent, maxQueuedInputs), + poster: &k8sPoster{log, nodeclient}, + nodem: &k8sNodeManager{kube}, + lastCache: intentcache.NewLastCache(), } } -func (am *ActionManager) Run(ctx context.Context) error { +func (am *actionManager) Run(ctx context.Context) error { am.log.Debug("starting") defer am.log.Debug("finished") - permit := make(chan *intent.Intent, maxQueuedIntents) + queuedIntents := make(chan *intent.Intent, maxQueuedIntents) // TODO: split out accepted intent handler - it should handle its // prioritization as needed to ensure that active nodes' events reach it. 
@@ -79,47 +96,92 @@ func (am *ActionManager) Run(ctx context.Context) error { case <-ctx.Done(): return nil - case pin, ok := <-permit: + case qin, ok := <-queuedIntents: + log := am.log.WithFields(logfields.Intent(qin)) + log.Debug("checking with policy") + // TODO: make policy checking and consideration richer + pview, err := am.makePolicyCheck(qin) + if err != nil { + log.WithError(err).Error("policy unenforceable") + continue + } + proceed, err := am.policy.Check(pview) + if err != nil { + log.WithError(err).Error("policy check errored") + continue + } + if !proceed { + log.Debug("policy denied intent") + continue + } if !ok { break } - am.log.Debug("handling permitted event") - am.takeAction(pin) + log.Debug("handling permitted intent") + am.takeAction(qin) - case in, ok := <-am.input: + case input, ok := <-am.inputs: if !ok { + am.log.Error("input channel closed") break } - am.log.Debug("checking with policy") - // TODO: make policy checking and consideration richer - pview, err := am.makePolicyCheck(in) - if err != nil { - am.log.WithError(err).Error("policy unenforceable") + queued := len(queuedIntents) + log := am.log.WithFields(logfields.Intent(input)). + WithFields(logrus.Fields{ + "queue": "process", + "queue-length": fmt.Sprintf("%d", queued), + }) + if queued < queueSkipThreshold { + queuedIntents <- input continue } - proceed, err := am.policy.Check(pview) - if err != nil { - am.log.WithError(err).Error("policy check errored") + // Start dropping if its not possible to queue at all. + if queued+1 > maxQueuedIntents { + log.Warn("queue full, dropping intent this try") continue } - if !proceed { - am.log.Debug("policy denied intent") - return nil + + // TODO: handle backpressure better with rescheduling instead of drops + + // Queue is getting full, let's be more selective about events that + // are propagated. + + if isClusterActive(input) { + log.Info("queue active intent") + queuedIntents <- input + continue } - am.log.Debug("policy permitted intent") - if len(permit) < maxQueuedIntents { - permit <- in - } else { - // TODO: handle backpressure with scheduling - am.log.Warn("backpressure blocking permitted intents") + + if isLowPriority(input) { + n := randDropIntFunc(10) + willDrop := n%2 == 0 + if willDrop { + // Intent is picked up again when cached Intent expires & + // Informer syncs OR if the Intent is changed (from update + // or otherwise by Node). This provides indirect + // backpressure by delaying the next time the Intent will be + // handled. 
+ log.Warn("queue backlog high, randomly dropping intent") + continue + } } + log.Debug("queue intent") + queuedIntents <- input + } } } -func (am *ActionManager) takeAction(pin *intent.Intent) error { - log := am.log.WithField("node", pin.GetName()) +func isLowPriority(in *intent.Intent) bool { + stabilizing := in.Wanted == marker.NodeActionStabilize + unknown := in.Wanted == marker.NodeActionUnknown || in.Wanted == "" + hasUpdate := in.UpdateAvailable == marker.NodeUpdateAvailable + return (stabilizing && !hasUpdate) || unknown +} + +func (am *actionManager) takeAction(pin *intent.Intent) error { + log := am.log.WithFields(logfields.Intent(pin)) successCheckRun := successfulUpdate(pin) if successCheckRun { log.Debug("handling successful update") @@ -162,23 +224,25 @@ func (am *ActionManager) takeAction(pin *intent.Intent) error { err := am.poster.Post(pin) if err != nil { - log.WithError(err).Error("could not post intent") + log.WithError(err).Error("unable to post intent") } return err } -func (am *ActionManager) makePolicyCheck(in *intent.Intent) (*PolicyCheck, error) { +// makePolicyCheck collects cluster information as a PolicyCheck for which to be +// provided to a policy checker. +func (am *actionManager) makePolicyCheck(in *intent.Intent) (*PolicyCheck, error) { if am.storer == nil { return nil, errors.Errorf("manager has no store to access, needed for policy check") } return newPolicyCheck(in, am.storer.GetStore()) } -func (am *ActionManager) SetStoreProvider(storer storer) { +func (am *actionManager) SetStoreProvider(storer storer) { am.storer = storer } -func (am *ActionManager) handle(node *v1.Node) { +func (am *actionManager) handle(node intent.Input) { log := am.log.WithField("node", node.GetName()) log.Debug("handling event") @@ -186,25 +250,41 @@ func (am *ActionManager) handle(node *v1.Node) { if in == nil { return // no actionable intent signaled } + log = log.WithFields(logfields.Intent(in)) + + lastQueued := am.lastCache.Last(in) + if logging.Debuggable && lastQueued != nil { + log.WithField("last-intent", lastQueued.DisplayString()). + Debug("retrieved cached queued intent to dedupe") + } + if intent.Equivalent(lastQueued, in) { + log.Debug("not queuing duplicate intent") + return + } + record := in.Clone() select { - case am.input <- in: - log.Debug("submitted intent") + case am.inputs <- in: + log.Debug("queue intent") + am.lastCache.Record(record) default: - log.Warn("unable to submit intent") + log.WithFields(logrus.Fields{ + "queue": "input", + "queue-length": len(am.inputs), + }).Warn("unable to queue intent (back pressure)") } } // intentFor interprets the intention given the Node's annotations. -func (am *ActionManager) intentFor(node intent.Input) *intent.Intent { - log := am.log.WithField("node", node.GetName()) +func (am *actionManager) intentFor(node intent.Input) *intent.Intent { in := intent.Given(node) + log := am.log.WithFields(logfields.Intent(in)) if in.Stuck() { - log.Debug("intent is stuck") - log.Warn("resetting to stabilize stuck intent state") - in = in.Reset() - return in + reset := in.Reset() + log.WithField("intent-reset", reset.DisplayString()).Debug("node intent indicates stuck") + log.Warn("stabilizing stuck node") + return reset } // TODO: add per-node bucketed backoff for error handling and retries. 
if in.Errored() { @@ -243,16 +323,16 @@ func successfulUpdate(in *intent.Intent) bool { } // OnAdd is a Handler implementation for nodestream -func (am *ActionManager) OnAdd(node *v1.Node) { +func (am *actionManager) OnAdd(node *v1.Node) { am.handle(node) } // OnDelete is a Handler implementation for nodestream -func (am *ActionManager) OnDelete(node *v1.Node) { +func (am *actionManager) OnDelete(node *v1.Node) { am.handle(node) } // OnUpdate is a Handler implementation for nodestream -func (am *ActionManager) OnUpdate(_ *v1.Node, node *v1.Node) { +func (am *actionManager) OnUpdate(_ *v1.Node, node *v1.Node) { am.handle(node) } diff --git a/extras/dogswatch/pkg/controller/kubernetes.go b/extras/dogswatch/pkg/controller/manager_kubernetes.go similarity index 87% rename from extras/dogswatch/pkg/controller/kubernetes.go rename to extras/dogswatch/pkg/controller/manager_kubernetes.go index eb68d9e8cad..8eeb6183a3e 100644 --- a/extras/dogswatch/pkg/controller/kubernetes.go +++ b/extras/dogswatch/pkg/controller/manager_kubernetes.go @@ -51,7 +51,7 @@ func (k *k8sNodeManager) Drain(nodeName string) error { return drain.RunNodeDrain(drainer, nodeName) } -func (am *ActionManager) cordonNode(nodeName string) error { +func (am *actionManager) cordonNode(nodeName string) error { log := am.log.WithField("node", nodeName) log.Debug("preparing to cordon") node, err := am.kube.CoreV1().Nodes().Get(nodeName, v1meta.GetOptions{}) @@ -75,16 +75,16 @@ func (am *ActionManager) cordonNode(nodeName string) error { return nil } -func (am *ActionManager) uncordonNode(nodeName string) error { +func (am *actionManager) uncordonNode(nodeName string) error { return nil } -func (am *ActionManager) checkNode(nodeName string) error { +func (am *actionManager) checkNode(nodeName string) error { return nil } -func (am *ActionManager) drainWorkload(nodeName string) error { +func (am *actionManager) drainWorkload(nodeName string) error { log := am.log.WithField("node", nodeName) log.Debug("draining workload") helper := drain.Helper{ @@ -121,6 +121,13 @@ type k8sPoster struct { func (k *k8sPoster) Post(i *intent.Intent) error { nodeName := i.GetName() - defer k.log.WithField("node", nodeName).Debugf("posted intent on node: %s", i) - return k8sutil.PostMetadata(k.nodeclient, nodeName, i) + err := k8sutil.PostMetadata(k.nodeclient, nodeName, i) + if err != nil { + return err + } + k.log.WithFields(logrus.Fields{ + "node": nodeName, + "intent": i.DisplayString(), + }).Debugf("posted intent") + return nil } diff --git a/extras/dogswatch/pkg/controller/manager_test.go b/extras/dogswatch/pkg/controller/manager_test.go index 200468e1a6d..6520e260b3c 100644 --- a/extras/dogswatch/pkg/controller/manager_test.go +++ b/extras/dogswatch/pkg/controller/manager_test.go @@ -64,7 +64,7 @@ type testManagerHooks struct { NodeManager *testingNodeManager } -func testManager(t *testing.T) (*ActionManager, *testManagerHooks) { +func testManager(t *testing.T) (*actionManager, *testManagerHooks) { m := newManager(testoutput.Logger(t, logging.New("manager")), nil, "test-node") hooks := &testManagerHooks{ @@ -79,13 +79,18 @@ func testManager(t *testing.T) (*ActionManager, *testManagerHooks) { func TestManagerIntentForSimple(t *testing.T) { nils := []*intent.Intent{ intents.BusyRebootUpdate(), + intents.Stabilized(intents.WithUpdateAvailable(marker.NodeUpdateUnavailable)), + intents.PendingUpdate(), } nonnils := []*intent.Intent{ intents.UpdateError(), + intents.Stabilized(intents.WithUpdateAvailable(marker.NodeUpdateAvailable)), } + 
intents.NormalizeNodeName("inactive", nils...) + intents.NormalizeNodeName("active", nonnils...) + for _, in := range nils { - in.NodeName = "test-node" t.Run(fmt.Sprintf("nil(%s)", in.DisplayString()), func(t *testing.T) { m, _ := testManager(t) actual := m.intentFor(in) @@ -93,7 +98,6 @@ func TestManagerIntentForSimple(t *testing.T) { }) } for _, in := range nonnils { - in.NodeName = "test-node" t.Run(fmt.Sprintf("non(%s)", in.DisplayString()), func(t *testing.T) { m, _ := testManager(t) actual := m.intentFor(in) @@ -102,6 +106,30 @@ func TestManagerIntentForSimple(t *testing.T) { } } +func TestManagerHandleCacheFilter(t *testing.T) { + m, _ := testManager(t) + nodes := []string{"node-a", "node-b"} + eventInputs := []*intent.Intent{ + // 2 events should make it through (deduped) + intents.UpdatePrepared(intents.WithNodeName(nodes[0])), + intents.UpdatePrepared(intents.WithNodeName(nodes[1])), + intents.UpdatePrepared(intents.WithNodeName(nodes[0])), + intents.UpdatePrepared(intents.WithNodeName(nodes[1])), + + // another 2, they're different from the first set + intents.UpdateSuccess(intents.WithNodeName(nodes[0])), + intents.UpdateSuccess(intents.WithNodeName(nodes[1])), + } + m.inputs = make(chan *intent.Intent, len(eventInputs)) + defer close(m.inputs) + + for _, eventInput := range eventInputs { + m.handle(eventInput) + } + + assert.Equal(t, len(m.inputs), 4) +} + func TestManagerIntentForTargeted(t *testing.T) { cases := []struct { input *intent.Intent @@ -111,8 +139,8 @@ func TestManagerIntentForTargeted(t *testing.T) { input: intents.UpdateError(), expected: intents.Reset(), }, - // Update handling is a pass through to handle the "exact" intent. { + // Update handling is a pass through to handle the "exact" intent. input: intents.UpdateSuccess(), expected: intents.UpdateSuccess(), }, @@ -121,10 +149,35 @@ func TestManagerIntentForTargeted(t *testing.T) { expected: intents.UpdatePrepared( intents.Pending(marker.NodeActionPerformUpdate)), }, + { + // Busy doing work, shouldn't modify the intent. + input: intents.UpdatePrepared(intents.WithBusy()), + expected: nil, + }, + { + // Busy doing work, shouldn't modify the intent. + input: intents.UpdatePrepared( + intents.Pending(marker.NodeActionPerformUpdate)), + expected: nil, + }, { input: intents.PendingStabilizing(), expected: nil, }, + { + input: intents.Stabilized(intents.WithUpdateAvailable(marker.NodeUpdateUnavailable)), + expected: nil, + }, + { + input: intents.PerformingUpdate(), + expected: nil, + }, + { + // Available updates are primed and intended to be tried. + input: intents.Stabilized(intents.WithUpdateAvailable(marker.NodeUpdateAvailable)), + expected: intents.Stabilized(intents.WithUpdateAvailable(marker.NodeUpdateAvailable)). 
+ SetBeginUpdate(), + }, } for _, tc := range cases { @@ -220,6 +273,8 @@ func TestSuccessfulUpdate(t *testing.T) { }, falsy: []*intent.Intent{ intents.Unknown(), + intents.Stabilized(), + intents.Stabilized(intents.WithUpdateAvailable(marker.NodeUpdateUnavailable)), }, } diff --git a/extras/dogswatch/pkg/controller/policy.go b/extras/dogswatch/pkg/controller/policy.go index e45e4fc0868..7c225e2f71b 100644 --- a/extras/dogswatch/pkg/controller/policy.go +++ b/extras/dogswatch/pkg/controller/policy.go @@ -1,14 +1,20 @@ package controller import ( + "fmt" + "github.com/amazonlinux/thar/dogswatch/pkg/intent" + "github.com/amazonlinux/thar/dogswatch/pkg/internal/logfields" + "github.com/amazonlinux/thar/dogswatch/pkg/logging" + "github.com/amazonlinux/thar/dogswatch/pkg/marker" "github.com/pkg/errors" + "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/cache" ) const ( - allowedClusterActive = 1 + maxClusterActive = 1 ) type Policy interface { @@ -17,21 +23,6 @@ type Policy interface { Check(*PolicyCheck) (bool, error) } -type defaultPolicy struct{} - -func (p *defaultPolicy) Check(ck *PolicyCheck) (bool, error) { - // If already active, continue to handle it. - if ck.Intent.InProgress() { - return true, nil - } - // If there are no other active nodes in the cluster, then go ahead with the - // intended action. - if ck.ClusterActive == 0 { - return true, nil - } - return false, nil -} - type PolicyCheck struct { Intent *intent.Intent ClusterActive int @@ -51,11 +42,24 @@ func newPolicyCheck(in *intent.Intent, resources cache.Store) (*PolicyCheck, err continue } cin := intent.Given(node) - if !cin.Terminal() { + if isClusterActive(cin) { clusterActive++ + if logging.Debuggable { + logging.New("policy-check").WithFields(logfields.Intent(cin)). + WithField("cluster-active", fmt.Sprintf("%d", clusterActive)). + Debug("cluster node's intent considered active") + } } } + if logging.Debuggable { + logging.New("policy-check").WithFields(logfields.Intent(in)).WithFields(logrus.Fields{ + "cluster-count": fmt.Sprintf("%d", clusterCount), + "cluster-active": fmt.Sprintf("%d", clusterActive), + "resource-count": fmt.Sprintf("%d", len(ress)), + }).Debug("collected policy check") + } + if clusterCount <= 0 { return nil, errors.Errorf("%d resources listed of inappropriate type", len(ress)) } @@ -66,3 +70,53 @@ func newPolicyCheck(in *intent.Intent, resources cache.Store) (*PolicyCheck, err ClusterCount: clusterCount, }, nil } + +// isClusterActive matches intents that the cluster shouldn't run concurrently. +func isClusterActive(i *intent.Intent) bool { + stabilizing := i.Wanted == marker.NodeActionStabilize + return !stabilizing && !i.Stuck() +} + +type defaultPolicy struct { + log logging.Logger +} + +func (p *defaultPolicy) Check(ck *PolicyCheck) (bool, error) { + log := p.log.WithFields(logfields.Intent(ck.Intent)). + WithFields(logrus.Fields{ + "cluster-active": fmt.Sprintf("%d", ck.ClusterActive), + "cluster-count": fmt.Sprintf("%d", ck.ClusterCount), + }) + + // policy checks are applied to intended actions, Intents that are next in + // line to be executed. Projections are made without considering the policy + // at time of the projection to the next state. So, we have to check when + // the update process is starting up. 
+ startingUpdate := ck.Intent.Active == marker.NodeActionStabilize + if !startingUpdate { + if ck.Intent.InProgress() { + if logging.Debuggable { + log.Debug("permit already in progress") + } + return true, nil + } + + if ck.Intent.Terminal() { + if logging.Debuggable { + log.Debug("permit terminal intent") + } + return true, nil + } + } + + // If there are no other active nodes in the cluster, then go ahead with the + // intended action. + if ck.ClusterActive < maxClusterActive { + log.WithField("allowed-active", fmt.Sprintf("%d", maxClusterActive)).Debugf("permit according to active threshold") + + return true, nil + } + + log.Debug("deny intent") + return false, nil +} diff --git a/extras/dogswatch/pkg/controller/policy_test.go b/extras/dogswatch/pkg/controller/policy_test.go new file mode 100644 index 00000000000..9a14a7bc04a --- /dev/null +++ b/extras/dogswatch/pkg/controller/policy_test.go @@ -0,0 +1,140 @@ +package controller + +import ( + "fmt" + "testing" + + "github.com/amazonlinux/thar/dogswatch/pkg/intent" + "github.com/amazonlinux/thar/dogswatch/pkg/internal/intents" + "github.com/amazonlinux/thar/dogswatch/pkg/internal/testoutput" + "github.com/amazonlinux/thar/dogswatch/pkg/logging" + "github.com/amazonlinux/thar/dogswatch/pkg/marker" + + "gotest.tools/assert" +) + +func TestPolicyCheck(t *testing.T) { + cases := []struct { + Name string + PolicyCheck *PolicyCheck + ShouldPermit bool + ShouldError bool + }{ + // should not update when threshold would be exceeded + { + Name: "update-available-maxactive", + ShouldPermit: false, + PolicyCheck: &PolicyCheck{ + Intent: intents.Stabilized(intents.WithUpdateAvailable(marker.NodeUpdateUnavailable)), + ClusterActive: maxClusterActive, + ClusterCount: maxClusterActive + 1, + }, + }, + // stabilize should always be permitted + { + Name: "stabilize-new", + ShouldPermit: true, + PolicyCheck: &PolicyCheck{ + Intent: intents.PendingStabilizing(), + ClusterActive: maxClusterActive, + ClusterCount: maxClusterActive + 1, + }, + }, + { + Name: "stabilize-new", + ShouldPermit: true, + PolicyCheck: &PolicyCheck{ + Intent: intents.PendingStabilizing(), + ClusterActive: 0, + ClusterCount: maxClusterActive + 1, + }, + }, + { + Name: "stabilize-new", + ShouldPermit: true, + PolicyCheck: &PolicyCheck{ + Intent: intents.PendingStabilizing(), + ClusterActive: maxClusterActive, + ClusterCount: maxClusterActive + 1, + }, + }, + { + Name: "perform-max-active", + ShouldPermit: false, + PolicyCheck: &PolicyCheck{ + Intent: intents.PendingPrepareUpdate(), + ClusterActive: maxClusterActive, + ClusterCount: maxClusterActive + 1, + }, + }, + { + Name: "perform-over-threshold", + ShouldPermit: false, + PolicyCheck: &PolicyCheck{ + Intent: intents.PendingPrepareUpdate(), + ClusterActive: maxClusterActive + 1, + ClusterCount: maxClusterActive + 1, + }, + }, + { + // After an update, we'll need to handle the Node's intent to + // uncordon it. 
+ Name: "updated", + ShouldPermit: true, + PolicyCheck: &PolicyCheck{ + Intent: intents.UpdateSuccess(), + ClusterActive: maxClusterActive, + ClusterCount: maxClusterActive + 1, + }, + }, + } + + for _, tc := range cases { + + check := tc.PolicyCheck + t.Run(fmt.Sprintf("%s(%s) %d/%d", tc.Name, check.Intent.DisplayString(), check.ClusterActive, check.ClusterCount), + func(t *testing.T) { + policy := defaultPolicy{ + log: testoutput.Logger(t, logging.New("policy-check")), + } + + permit, err := policy.Check(check) + assert.Equal(t, tc.ShouldPermit, permit) + if tc.ShouldError { + assert.Error(t, err, "") + } else { + assert.NilError(t, err) + } + }) + } +} + +func TestIsClusterActiveIntents(t *testing.T) { + cases := []struct { + Intent *intent.Intent + Expected bool + }{ + // Nodes beginning updates are actively working towards a goal, they're + // active and should be counted. + {Intent: intents.PendingPrepareUpdate(), Expected: true}, + // Nodes waiting to start an update are actively working. + {Intent: intents.PendingUpdate(), Expected: true}, + // Updates success is yet to be handled, so should "occupy" a slot in + // the active count. + {Intent: intents.UpdateSuccess(), Expected: true}, + // Errors should prevent others from making progress (eg: error prevents + // updates in cluster) and "occupy" a slot in the active count. + {Intent: intents.UpdateError(), Expected: true}, + // Resets and Stabilization are normative, non-intrusive operations and + // shouldn't add to active count. + {Intent: intents.PendingStabilizing(), Expected: false}, + {Intent: intents.Reset(), Expected: false}, + } + + for _, tc := range cases { + t.Run(tc.Intent.DisplayString(), func(t *testing.T) { + actual := isClusterActive(tc.Intent) + assert.Equal(t, tc.Expected, actual) + }) + } +} diff --git a/extras/dogswatch/pkg/intent/cache/cache.go b/extras/dogswatch/pkg/intent/cache/cache.go new file mode 100644 index 00000000000..4717c8d116b --- /dev/null +++ b/extras/dogswatch/pkg/intent/cache/cache.go @@ -0,0 +1,65 @@ +package cache + +import ( + "time" + + "github.com/amazonlinux/thar/dogswatch/pkg/intent" + + "github.com/karlseguin/ccache" +) + +const ( + cacheTimeout = time.Second * 15 +) + +// LastCache provides access to the last cached Intent that came from the same +// source of the provided Intent. +type LastCache interface { + Last(*intent.Intent) *intent.Intent + Record(*intent.Intent) +} + +type lastCache struct { + cache *ccache.Cache +} + +// NewLastCache creates a general cache suitable for storing and retrieving the +// last observed Intent given its source. +func NewLastCache() LastCache { + return &lastCache{ + cache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)), + } +} + +// Last returns the last intent to be sent through. +func (i *lastCache) Last(in *intent.Intent) *intent.Intent { + if in == nil { + return nil + } + val := i.cache.Get(in.GetName()) + if val == nil { + return nil + } + if val.Expired() { + return nil + } + lastCachedIntent, ok := val.Value().(*intent.Intent) + if !ok { + return nil + } + + // TODO: possibly extend a cached item + // val.Extend(cacheExtension) + + // Copy to protect against misuse of cached in-memory Intent. + return lastCachedIntent.Clone() +} + +// Record caches the provided Intent as the most recent Intent handled for a +// given intent. 
+func (i *lastCache) Record(in *intent.Intent) { + if in == nil { + return + } + i.cache.Set(in.GetName(), in.Clone(), cacheTimeout) +} diff --git a/extras/dogswatch/pkg/intent/intent.go b/extras/dogswatch/pkg/intent/intent.go index 315c114cc99..b797e4359bd 100644 --- a/extras/dogswatch/pkg/intent/intent.go +++ b/extras/dogswatch/pkg/intent/intent.go @@ -112,10 +112,12 @@ func (i *Intent) Stuck() bool { degradedUnknown := stuckUnknown || wantingUnknown // The action's step was out of line and resulted in an taking an unknown // action. - degradedPath := i.DegradedPath() + degradedPath := i.DegradedPath() && i.Waiting() // The action was not one of progress and yet was acted upon. degradedBusy := !i.isInProgress(i.Wanted) && i.Wanted == i.Active && i.State == marker.NodeStateBusy + result := degradedStatic || degradedUnknown || degradedPath || degradedBusy + if logging.Debuggable { logging.New("intent").WithFields(logrus.Fields{ "intent": i.DisplayString(), @@ -123,10 +125,11 @@ func (i *Intent) Stuck() bool { "degradedUnknown": degradedUnknown, "degradedPath": degradedPath, "degradedBusy": degradedBusy, - }).Debug("Stuck") + "result": result, + }).Debug("intent:Stuck") } - return degradedStatic || degradedUnknown || degradedPath || degradedBusy + return result } // DegradedPaths indicates that the intent will derail and step into an unknown @@ -138,6 +141,8 @@ func (i *Intent) DegradedPath() bool { untargeted := anticipated.Wanted == marker.NodeActionUnknown inconsistent := !i.Realized() && anticipated.Wanted != i.Wanted + result := (!starting || i.Terminal()) && (untargeted || inconsistent) + if logging.Debuggable { logging.New("intent").WithFields(logrus.Fields{ "intent": i.DisplayString(), @@ -145,9 +150,11 @@ func (i *Intent) DegradedPath() bool { "starting": starting, "untargeted": untargeted, "inconsistent": inconsistent, - }).Debug("DegradedPath") + "result": result, + }).Debug("intent:DegradedPath") } - return (!starting || i.Terminal()) && (untargeted || inconsistent) + + return result } // Realized indicates that the Intent reached the intended state. @@ -163,15 +170,18 @@ func (i *Intent) InProgress() bool { // waiting on handler to complete its intent handling pendingFinish := i.Wanted == i.Active && !i.Waiting() + result := pendingNode || pendingFinish + if logging.Debuggable { logging.New("intent").WithFields(logrus.Fields{ "intent": i.DisplayString(), "pendingNode": pendingNode, "pendingFinish": pendingFinish, - }).Debug("InProgress") + "result": result, + }).Debug("intent:InProgress") } - return pendingNode || pendingFinish + return result } // isInProgress indicates that the field provided is an action that may be able @@ -242,8 +252,8 @@ func (i *Intent) Terminal() bool { atTerminal := next == i.Wanted && i.Wanted == i.Active if logging.Debuggable { logging.New("intent").WithFields(logrus.Fields{ - "atTerminal": atTerminal, - }).Debug("Targeted") + "result": atTerminal, + }).Debug("intent:Targeted") } return atTerminal } @@ -265,11 +275,24 @@ func (i *Intent) reset() { i.UpdateAvailable = marker.NodeUpdateUnknown } +// SetUpdateAvailable modifies the intent to reflect the provided available +// state. 
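The predicate changes in the intent.go hunk above share one small pattern: compute the boolean once, attach it to the debug fields, then return that same value, so the logged `result` can never drift from what the caller actually received. A reduced example of the pattern with plain logrus (the predicate itself is illustrative):

``` go
package main

import "github.com/sirupsen/logrus"

var log = logrus.New()

// isReady computes its result once, logs it, and returns the same value,
// keeping the debug output and the caller's view in sync.
func isReady(state string) bool {
	result := state == "ready"
	log.WithFields(logrus.Fields{
		"state":  state,
		"result": result,
	}).Debug("isReady")
	return result
}

func main() {
	isReady("busy") // logs result=false at debug level, then returns false
}
```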
+func (i *Intent) SetUpdateAvailable(available bool) *Intent { + switch available { + case true: + i.UpdateAvailable = marker.NodeUpdateAvailable + case false: + i.UpdateAvailable = marker.NodeUpdateUnavailable + } + + return i +} + func (i *Intent) DisplayString() string { if i == nil { - return fmt.Sprintf(",,") + return fmt.Sprintf(",, update:") } - return fmt.Sprintf("%s,%s,%s", i.Wanted, i.Active, i.State) + return fmt.Sprintf("%s,%s,%s update:%s", i.Wanted, i.Active, i.State, i.UpdateAvailable) } // Clone returns a copy of the Intent to mutate independently of the source @@ -280,9 +303,14 @@ func (i Intent) Clone() *Intent { // Equivalent compares intentional state to determine equivalency. func Equivalent(i, j *Intent) bool { + if i == nil || j == nil { + return false + } + return i.Wanted == j.Wanted && - i.Active == i.Active && - i.State == i.State + i.Active == j.Active && + i.State == j.State && + i.UpdateAvailable == j.UpdateAvailable } // Given determines the commuincated intent from a Node without projecting into diff --git a/extras/dogswatch/pkg/intent/intent_predicate_test.go b/extras/dogswatch/pkg/intent/intent_predicate_test.go new file mode 100644 index 00000000000..609af8dfa1b --- /dev/null +++ b/extras/dogswatch/pkg/intent/intent_predicate_test.go @@ -0,0 +1,264 @@ +package intent_test + +import ( + "fmt" + "testing" + + "github.com/amazonlinux/thar/dogswatch/pkg/intent" + "github.com/amazonlinux/thar/dogswatch/pkg/intent/internal/callcheck" + "github.com/amazonlinux/thar/dogswatch/pkg/internal/intents" + "github.com/amazonlinux/thar/dogswatch/pkg/internal/testoutput" + "github.com/amazonlinux/thar/dogswatch/pkg/logging" + "github.com/amazonlinux/thar/dogswatch/pkg/marker" + + "gotest.tools/assert" +) + +func TestIntentTruths(t *testing.T) { + type pred = string + + testcases := []struct { + name string + intents []intent.Intent + truthy []pred + falsy []pred + }{ + { + name: "empty", + intents: []intent.Intent{ + {}, // empty + }, + truthy: []pred{"Stuck"}, + falsy: []pred{"Errored"}, + }, + { + name: "success", + intents: []intent.Intent{ + { + Wanted: marker.NodeActionRebootUpdate, + Active: marker.NodeActionRebootUpdate, + State: marker.NodeStateReady, + }, + }, + truthy: []pred{"Waiting", "Terminal", "Realized"}, + falsy: []pred{"Intrusive", "Stuck", "InProgress"}, + }, + { + name: "working", + intents: []intent.Intent{ + { + Wanted: marker.NodeActionStabilize, + Active: marker.NodeActionStabilize, + State: marker.NodeStateBusy, + }, + }, + truthy: []pred{"InProgress"}, + falsy: []pred{"Waiting", "Actionable", "Realized", "Stuck"}, + }, + { + name: "not-stuck", + intents: []intent.Intent{ + { + Wanted: marker.NodeActionStabilize, + Active: marker.NodeActionStabilize, + State: marker.NodeStateReady, + }, + { + // The first step we take in an update, should be coming + // from a stable place. 
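Looking back at the `Equivalent` change earlier in this hunk: the previous version compared `i.Active` and `i.State` against themselves, which is always true, so any two intents with the same `Wanted` were reported equivalent; the fix compares both operands and adds a nil guard. A reduced reproduction of that bug class:

``` go
package main

import "fmt"

type pair struct{ wanted, active string }

// buggyEqual repeats the slip: the second comparison references x on both sides,
// so it is always true.
func buggyEqual(x, y pair) bool { return x.wanted == y.wanted && x.active == x.active }

// fixedEqual compares both operands field by field.
func fixedEqual(x, y pair) bool { return x.wanted == y.wanted && x.active == y.active }

func main() {
	p := pair{wanted: "stabilize", active: "stabilize"}
	q := pair{wanted: "stabilize", active: "reboot-update"}
	fmt.Println(buggyEqual(p, q)) // true, even though the active fields differ
	fmt.Println(fixedEqual(p, q)) // false
}
```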
+ Wanted: ((&intent.Intent{}).SetBeginUpdate().Wanted), + Active: marker.NodeActionStabilize, + State: marker.NodeStateReady, + }, + }, + truthy: []pred{"Waiting"}, + falsy: []pred{"Stuck", "Errored", "DegradedPath"}, + }, + { + name: "not-stuck-busy", + intents: []intent.Intent{ + *intents.PreparingUpdate(), + *intents.PerformingUpdate(), + *intents.BusyRebootUpdate(), + }, + truthy: []pred{"InProgress"}, + falsy: []pred{"Waiting", "Errored", "Stuck"}, + }, + { + name: "stuck", + intents: []intent.Intent{ + { + Wanted: marker.NodeActionUnknown, + Active: marker.NodeActionUnknown, + State: marker.NodeStateBusy, + }, + { + Wanted: marker.NodeActionUnknown, + Active: marker.NodeActionUnknown, + State: marker.NodeStateError, + }, + { + Wanted: marker.NodeActionUnknown, + Active: marker.NodeActionPerformUpdate, + State: marker.NodeStateReady, + }, + }, + truthy: []pred{"Stuck"}, + falsy: []pred{"Realized", "Terminal"}, + }, + { + name: "stuck", + intents: []intent.Intent{ + { + Wanted: marker.NodeActionRebootUpdate, + Active: marker.NodeActionUnknown, + State: marker.NodeStateError, + }, + }, + truthy: []pred{"DegradedPath"}, + }, + { + name: "waiting", + intents: []intent.Intent{ + { + Wanted: marker.NodeActionStabilize, + Active: marker.NodeActionStabilize, + State: marker.NodeStateReady, + }, + }, + truthy: []pred{"Waiting", "Realized", "Terminal"}, + falsy: []pred{"Actionable"}, + }, + { + name: "waiting", + intents: []intent.Intent{ + { + Wanted: marker.NodeActionStabilize, + Active: marker.NodeActionUnknown, + State: marker.NodeStateUnknown, + UpdateAvailable: marker.NodeUpdateAvailable, + }, + }, + truthy: []pred{"InProgress"}, + falsy: []pred{"Realized", "Actionable", "Stuck"}, + }, + { + name: "errored-nominal", + intents: []intent.Intent{ + { + Wanted: marker.NodeActionStabilize, + Active: marker.NodeActionStabilize, + State: marker.NodeStateError, + }, + }, + truthy: []pred{"Errored", "Waiting"}, + falsy: []pred{"Realized"}, + }, + { + name: "errored-unusual", + intents: []intent.Intent{ + { + Wanted: "arst", + Active: "neio", + State: marker.NodeStateError, + }, + }, + truthy: []pred{"Errored", "Waiting", "Stuck"}, + falsy: []pred{"Realized"}, + }, + { + name: "inprogress", + intents: []intent.Intent{ + { + Wanted: marker.NodeActionRebootUpdate, + Active: marker.NodeActionRebootUpdate, + State: marker.NodeStateBusy, + }, + }, + truthy: []pred{"InProgress", "Intrusive"}, + falsy: []pred{"Errored", "Realized", "Stuck", "Waiting"}, + }, + { + name: "actionable", + intents: []intent.Intent{ + { + Wanted: marker.NodeActionPrepareUpdate, + Active: marker.NodeActionPrepareUpdate, + State: marker.NodeStateReady, + }, + { + Wanted: marker.NodeActionPerformUpdate, + Active: marker.NodeActionPerformUpdate, + State: marker.NodeStateReady, + }, + }, + truthy: []pred{"Actionable", "Realized", "Waiting"}, + falsy: []pred{"Errored", "Stuck", "DegradedPath"}, + }, + { + name: "terminal", + intents: []intent.Intent{ + { + Wanted: marker.NodeActionRebootUpdate, + Active: marker.NodeActionRebootUpdate, + State: marker.NodeStateBusy, + }, + }, + truthy: []pred{"Terminal", "InProgress"}, + falsy: []pred{"Errored", "Realized", "Stuck", "Actionable", "Waiting"}, + }, + { + name: "terminal", + intents: []intent.Intent{ + + { + Wanted: marker.NodeActionRebootUpdate, + Active: marker.NodeActionRebootUpdate, + State: marker.NodeStateReady, + }, + }, + truthy: []pred{"Terminal", "Realized", "Waiting"}, + falsy: []pred{"Errored", "Stuck", "Actionable"}, + }, + } + + for _, tc := range testcases { + for _, 
intent := range tc.intents { + name := fmt.Sprintf("%s(%s)", tc.name, intent.DisplayString()) + t.Run(name, func(t *testing.T) { + intent.NodeName = "state-machine" + + preds := map[pred]struct{}{} + noOverlap := func(p pred) { + _, overlappingPredicate := preds[p] + assert.Assert(t, !overlappingPredicate, "the predicate %q was asserted twice", p) + preds[p] = struct{}{} + } + + for _, predT := range tc.truthy { + t.Run(predT, func(t *testing.T) { + logging.Set(testoutput.Setter(t)) + defer logging.Set(testoutput.Revert()) + + noOverlap(predT) + match, err := callcheck.Predicate(&intent, predT) + assert.NilError(t, err) + assert.Check(t, match, "%q expected to be true", predT) + }) + } + + for _, predF := range tc.falsy { + t.Run(predF, func(t *testing.T) { + logging.Set(testoutput.Setter(t)) + defer logging.Set(testoutput.Revert()) + + noOverlap(predF) + match, err := callcheck.Predicate(&intent, predF) + assert.NilError(t, err) + assert.Check(t, !match, "%q expected to be false", predF) + }) + } + }) + } + } +} diff --git a/extras/dogswatch/pkg/intent/intent_test.go b/extras/dogswatch/pkg/intent/intent_test.go index da007f0b90b..104b333dbaf 100644 --- a/extras/dogswatch/pkg/intent/intent_test.go +++ b/extras/dogswatch/pkg/intent/intent_test.go @@ -2,13 +2,13 @@ package intent import ( "fmt" - "reflect" "testing" + "github.com/amazonlinux/thar/dogswatch/pkg/intent/internal/callcheck" "github.com/amazonlinux/thar/dogswatch/pkg/internal/testoutput" "github.com/amazonlinux/thar/dogswatch/pkg/logging" "github.com/amazonlinux/thar/dogswatch/pkg/marker" - "github.com/pkg/errors" + "gotest.tools/assert" ) @@ -22,234 +22,22 @@ func testIntent() *Intent { return i } -func TestReset(t *testing.T) { - i := testIntent() - s := testIntent() - - s.reset() - - // first action after reset - assert.Equal(t, s.Projected().Wanted, marker.NodeActionStabilize) - assert.Check(t, i.Active != s.Active) -} - -func TestGivenDuplicate(t *testing.T) { - i := testIntent() - s := Given(i) - assert.DeepEqual(t, i, s) -} - -func TestClone(t *testing.T) { - i := testIntent() - i.State = marker.NodeStateUnknown - s := i.Clone() - assert.DeepEqual(t, i, s) -} - -func TestIntentTruths(t *testing.T) { +func TestIntentTruthsInternal(t *testing.T) { type pred = string - testcases := []struct { name string intents []Intent truthy []pred falsy []pred }{ - { - name: "empty", - intents: []Intent{ - {}, // empty - }, - truthy: []pred{"Stuck"}, - falsy: []pred{"Errored"}, - }, { name: "reset", intents: []Intent{ - func() Intent { i := testIntent(); i.reset(); return *i }(), + func(i *Intent) Intent { i.reset(); return *i }(testIntent()), }, truthy: []pred{"Realized", "Waiting", "Stuck"}, falsy: []pred{"Intrusive"}, }, - { - name: "success", - intents: []Intent{ - { - Wanted: marker.NodeActionRebootUpdate, - Active: marker.NodeActionRebootUpdate, - State: marker.NodeStateReady, - }, - }, - truthy: []pred{"Waiting", "Terminal", "Realized"}, - falsy: []pred{"Intrusive", "Stuck", "InProgress"}, - }, - { - name: "working", - intents: []Intent{ - { - Wanted: marker.NodeActionStabilize, - Active: marker.NodeActionStabilize, - State: marker.NodeStateBusy, - }, - }, - truthy: []pred{"InProgress"}, - falsy: []pred{"Waiting", "Actionable", "Realized", "Stuck"}, - }, - { - name: "not-stuck-pending", - intents: []Intent{ - { - Wanted: marker.NodeActionStabilize, - Active: marker.NodeActionStabilize, - State: marker.NodeStateReady, - }, - { - // The first step we take in an update, should be coming - // from a stable place. 
- Wanted: ((&Intent{}).SetBeginUpdate().Wanted), - Active: marker.NodeActionStabilize, - State: marker.NodeStateReady, - }, - }, - truthy: []pred{"Waiting"}, - falsy: []pred{"Stuck", "Errored", "DegradedPath"}, - }, - { - name: "stuck", - intents: []Intent{ - { - Wanted: marker.NodeActionUnknown, - Active: marker.NodeActionUnknown, - State: marker.NodeStateBusy, - }, - { - Wanted: marker.NodeActionUnknown, - Active: marker.NodeActionUnknown, - State: marker.NodeStateError, - }, - { - Wanted: marker.NodeActionUnknown, - Active: marker.NodeActionPerformUpdate, - State: marker.NodeStateReady, - }, - }, - truthy: []pred{"Stuck"}, - falsy: []pred{"Realized", "Terminal"}, - }, - { - name: "stuck", - intents: []Intent{ - { - Wanted: marker.NodeActionRebootUpdate, - Active: marker.NodeActionUnknown, - State: marker.NodeStateError, - }, - }, - truthy: []pred{"DegradedPath"}, - }, - { - name: "waiting", - intents: []Intent{ - { - Wanted: marker.NodeActionStabilize, - Active: marker.NodeActionStabilize, - State: marker.NodeStateReady, - }, - }, - truthy: []pred{"Waiting", "Realized", "Terminal"}, - falsy: []pred{"Actionable"}, - }, - { - name: "waiting", - intents: []Intent{ - { - Wanted: marker.NodeActionStabilize, - Active: marker.NodeActionUnknown, - State: marker.NodeStateUnknown, - UpdateAvailable: marker.NodeUpdateAvailable, - }, - }, - truthy: []pred{"InProgress"}, - falsy: []pred{"Realized", "Actionable", "Stuck"}, - }, - { - name: "errored-nominal", - intents: []Intent{ - { - Wanted: marker.NodeActionStabilize, - Active: marker.NodeActionStabilize, - State: marker.NodeStateError, - }, - }, - truthy: []pred{"Errored", "Waiting"}, - falsy: []pred{"Realized"}, - }, - { - name: "errored-unusual", - intents: []Intent{ - { - Wanted: "arst", - Active: "neio", - State: marker.NodeStateError, - }, - }, - truthy: []pred{"Errored", "Waiting", "Stuck"}, - falsy: []pred{"Realized"}, - }, - { - name: "inprogress", - intents: []Intent{ - { - Wanted: marker.NodeActionRebootUpdate, - Active: marker.NodeActionRebootUpdate, - State: marker.NodeStateBusy, - }, - }, - truthy: []pred{"InProgress", "Intrusive"}, - falsy: []pred{"Errored", "Realized", "Stuck", "Waiting"}, - }, - { - name: "actionable", - intents: []Intent{ - { - Wanted: marker.NodeActionPrepareUpdate, - Active: marker.NodeActionPrepareUpdate, - State: marker.NodeStateReady, - }, - { - Wanted: marker.NodeActionPerformUpdate, - Active: marker.NodeActionPerformUpdate, - State: marker.NodeStateReady, - }, - }, - truthy: []pred{"Actionable", "Realized", "Waiting"}, - falsy: []pred{"Errored", "Stuck", "DegradedPath"}, - }, - { - name: "terminal", - intents: []Intent{ - { - Wanted: marker.NodeActionRebootUpdate, - Active: marker.NodeActionRebootUpdate, - State: marker.NodeStateBusy, - }, - }, - truthy: []pred{"Terminal", "InProgress"}, - falsy: []pred{"Errored", "Realized", "Stuck", "Actionable", "Waiting"}, - }, - { - name: "terminal", - intents: []Intent{ - - { - Wanted: marker.NodeActionRebootUpdate, - Active: marker.NodeActionRebootUpdate, - State: marker.NodeStateReady, - }, - }, - truthy: []pred{"Terminal", "Realized", "Waiting"}, - falsy: []pred{"Errored", "Stuck", "Actionable"}, - }, } for _, tc := range testcases { @@ -268,18 +56,16 @@ func TestIntentTruths(t *testing.T) { preds[p] = struct{}{} } - //t.Logf("projectActive %#v", intent.projectActive()) - for _, predT := range tc.truthy { noOverlap(predT) - match, err := callCheck(&intent, predT) + match, err := callcheck.Predicate(&intent, predT) assert.NilError(t, err) assert.Check(t, match, "%q 
expected to be true", predT) } for _, predF := range tc.falsy { noOverlap(predF) - match, err := callCheck(&intent, predF) + match, err := callcheck.Predicate(&intent, predF) assert.NilError(t, err) assert.Check(t, !match, "%q expected to be false", predF) } @@ -288,21 +74,28 @@ func TestIntentTruths(t *testing.T) { } } -func callCheck(recv *Intent, methodName string) (bool, error) { - val := reflect.ValueOf(recv) - typ := reflect.TypeOf(recv) - method, ok := typ.MethodByName(methodName) - if !ok { - return false, errors.Errorf("no predicate method named %q", methodName) - } - res := method.Func.Call([]reflect.Value{val}) - if len(res) != 1 { - return false, errors.Errorf("expected single return value from predicate method") - } - if res[0].Type().Name() != "bool" { - return false, errors.Errorf("return value from predicate was not a bool") - } - return res[0].Bool(), nil +func TestReset(t *testing.T) { + i := testIntent() + s := testIntent() + + s.reset() + + // first action after reset + assert.Equal(t, s.Projected().Wanted, marker.NodeActionStabilize) + assert.Check(t, i.Active != s.Active) +} + +func TestGivenDuplicate(t *testing.T) { + i := testIntent() + s := Given(i) + assert.DeepEqual(t, i, s) +} + +func TestClone(t *testing.T) { + i := testIntent() + i.State = marker.NodeStateUnknown + s := i.Clone() + assert.DeepEqual(t, i, s) } func TestProjectionMatches(t *testing.T) { diff --git a/extras/dogswatch/pkg/intent/internal/callcheck/check.go b/extras/dogswatch/pkg/intent/internal/callcheck/check.go new file mode 100644 index 00000000000..9ac82ea7258 --- /dev/null +++ b/extras/dogswatch/pkg/intent/internal/callcheck/check.go @@ -0,0 +1,25 @@ +package callcheck + +import ( + "reflect" + + "github.com/pkg/errors" +) + +// Predicate calls a method `methodName` on the Reciever `recv`. +func Predicate(recv interface{}, methodName string) (bool, error) { + val := reflect.ValueOf(recv) + typ := reflect.TypeOf(recv) + method, ok := typ.MethodByName(methodName) + if !ok { + return false, errors.Errorf("no predicate method named %q", methodName) + } + res := method.Func.Call([]reflect.Value{val}) + if len(res) != 1 { + return false, errors.Errorf("expected single return value from predicate method") + } + if res[0].Type().Name() != "bool" { + return false, errors.Errorf("return value from predicate was not a bool") + } + return res[0].Bool(), nil +} diff --git a/extras/dogswatch/pkg/internal/intents/intents.go b/extras/dogswatch/pkg/internal/intents/intents.go index 33f448873b0..ab3403f18ab 100644 --- a/extras/dogswatch/pkg/internal/intents/intents.go +++ b/extras/dogswatch/pkg/internal/intents/intents.go @@ -41,9 +41,12 @@ func WithNodeName(name string) func(i *intent.Intent) { } } -// WithBusy marks the intent as busy with the intent. +// WithBusy marks the intent as busy with the Active intent. 
func WithBusy() func(i *intent.Intent) { return func(i *intent.Intent) { + if i.Wanted != i.Active { + panic("provided Wanted and Active do not match - incoherent") + } i.State = marker.NodeStateBusy } } @@ -134,25 +137,37 @@ var ( Wanted: marker.NodeActionPrepareUpdate, Active: marker.NodeActionPrepareUpdate, State: marker.NodeStateReady, - }, WithUpdateAvailable(marker.NodeUpdateAvailable)) + }, WithUpdateAvailable()) PendingPrepareUpdate = ret("PendingPrepareUpdate", intent.Intent{ Wanted: marker.NodeActionPrepareUpdate, Active: marker.NodeActionStabilize, State: marker.NodeStateReady, - }) + }, WithUpdateAvailable()) + + PreparingUpdate = ret("PendingPrepareUpdate", intent.Intent{ + Wanted: marker.NodeActionPrepareUpdate, + Active: marker.NodeActionPrepareUpdate, + State: marker.NodeStateBusy, + }, WithUpdateAvailable()) UpdatePerformed = ret("UpdatePerformed", intent.Intent{ Wanted: marker.NodeActionPerformUpdate, Active: marker.NodeActionPerformUpdate, State: marker.NodeStateReady, - }, WithUpdateAvailable(marker.NodeUpdateAvailable)) + }, WithUpdateAvailable()) + + PerformingUpdate = ret("PerformingUpdate", intent.Intent{ + Wanted: marker.NodeActionPerformUpdate, + Active: marker.NodeActionPerformUpdate, + State: marker.NodeStateBusy, + }, WithUpdateAvailable()) PendingUpdate = ret("PendingUpdate", intent.Intent{ Wanted: marker.NodeActionPerformUpdate, Active: marker.NodeActionPrepareUpdate, State: marker.NodeStateReady, - }) + }, WithUpdateAvailable()) Unknown = ret("Unknown", intent.Intent{ Wanted: marker.NodeActionUnknown, diff --git a/extras/dogswatch/pkg/internal/logfields/intent.go b/extras/dogswatch/pkg/internal/logfields/intent.go new file mode 100644 index 00000000000..e1073030c92 --- /dev/null +++ b/extras/dogswatch/pkg/internal/logfields/intent.go @@ -0,0 +1,14 @@ +package logfields + +import ( + "github.com/amazonlinux/thar/dogswatch/pkg/intent" + + "github.com/sirupsen/logrus" +) + +func Intent(i *intent.Intent) logrus.Fields { + return logrus.Fields{ + "node": i.GetName(), + "intent": i.DisplayString(), + } +} diff --git a/extras/dogswatch/pkg/k8sutil/marker.go b/extras/dogswatch/pkg/k8sutil/marker.go index 4569a4546bd..94860ead7df 100644 --- a/extras/dogswatch/pkg/k8sutil/marker.go +++ b/extras/dogswatch/pkg/k8sutil/marker.go @@ -5,6 +5,7 @@ import ( "github.com/amazonlinux/thar/dogswatch/pkg/marker" "github.com/pkg/errors" + "github.com/sirupsen/logrus" v1meta "k8s.io/apimachinery/pkg/apis/meta/v1" v1 "k8s.io/client-go/kubernetes/typed/core/v1" ) @@ -17,8 +18,11 @@ func PostMetadata(nc v1.NodeInterface, nodeName string, cont marker.Container) e marker.OverwriteFrom(cont, node) if logging.Debuggable { l := logging.New("k8sutil") - l.Debugf("annotations: %#v", node.GetAnnotations()) - l.Debugf("labels: %#v", node.GetLabels()) + l.WithFields(logrus.Fields{ + "node": nodeName, + "annotations": node.GetAnnotations(), + "labels": node.GetLabels(), + }).Debug("merged in new metadata") } _, err = nc.Update(node) if err != nil { diff --git a/extras/dogswatch/pkg/logging/debug.go b/extras/dogswatch/pkg/logging/debug.go index aaccb3f6aa4..cb41edbe77c 100644 --- a/extras/dogswatch/pkg/logging/debug.go +++ b/extras/dogswatch/pkg/logging/debug.go @@ -6,4 +6,4 @@ var DebugEnable string // Debuggable means that the build should include any debugging logic in it. The // compiler *should* erase anything that's otherwise in a conditional. 
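`DebugEnable` is a link-time switch: it stays empty unless `-ldflags -X` injects a value at build time, and the change below tightens the check to the literal string `"true"`. A minimal, self-contained demonstration of the mechanism (the package and variable names here are hypothetical, not the repository's own):

``` go
// Build normally and debuggable stays false; build with
//
//	go build -ldflags '-X main.debugEnable=true' .
//
// and the linker rewrites the string before main ever runs.
package main

import "fmt"

var debugEnable string // empty unless injected by the linker

var debuggable = debugEnable == "true"

func main() {
	fmt.Println("debuggable:", debuggable)
}
```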
-var Debuggable = DebugEnable != "" +var Debuggable = DebugEnable == "true" diff --git a/extras/dogswatch/pkg/logging/logrus.go b/extras/dogswatch/pkg/logging/logrus.go index 6b36f909b5c..75ec96890aa 100644 --- a/extras/dogswatch/pkg/logging/logrus.go +++ b/extras/dogswatch/pkg/logging/logrus.go @@ -7,6 +7,11 @@ import ( "github.com/sirupsen/logrus" ) +const ( + SubComponentField = "subcomponent" + ComponentField = "component" +) + type Setter func(*logrus.Logger) error var root = struct { @@ -25,6 +30,7 @@ var root = struct { mutex: &sync.Mutex{}, } +// Logger is a top level component logging facade. type Logger interface { logrus.FieldLogger @@ -32,14 +38,22 @@ type Logger interface { WriterLevel(logrus.Level) *io.PipeWriter } +// SubLogger is a sub component logging facade for subsystems that do not have +// their own top level logging owner. +type SubLogger interface { + logrus.FieldLogger +} + +// New creates a top level Logger for a given component. func New(component string, setters ...Setter) Logger { for _, setter := range setters { // no errors handling for now _ = Set(setter) } - return root.logger.WithField("component", component) + return root.logger.WithField(ComponentField, component) } +// Set mutates the underlying root logging implementation. func Set(setter Setter) error { root.mutex.Lock() err := setter(root.logger) @@ -47,6 +61,7 @@ func Set(setter Setter) error { return err } +// Level sets the level of the root logger. func Level(lvl string) Setter { l, err := logrus.ParseLevel(lvl) if err != nil { diff --git a/extras/dogswatch/pkg/nodestream/informer.go b/extras/dogswatch/pkg/nodestream/informer.go index b92a41c2a5e..a92f92e3220 100644 --- a/extras/dogswatch/pkg/nodestream/informer.go +++ b/extras/dogswatch/pkg/nodestream/informer.go @@ -91,16 +91,16 @@ func (is *informerStream) shutdown() { } func (is *informerStream) OnAdd(obj interface{}) { - is.log.Debug("add event") + is.log.Debug("resource add event") is.handler.OnAdd(obj.(*v1.Node)) } func (is *informerStream) OnDelete(obj interface{}) { - is.log.Debug("delete event") + is.log.Debug("resource delete event") is.handler.OnDelete(obj.(*v1.Node)) } func (is *informerStream) OnUpdate(oldObj, newObj interface{}) { - is.log.Debug("update event") + is.log.Debug("resource update event") is.handler.OnUpdate(oldObj.(*v1.Node), newObj.(*v1.Node)) } diff --git a/extras/dogswatch/pkg/platform/updog/updog.go b/extras/dogswatch/pkg/platform/updog/updog.go index ed4155aeb8f..c813ea116de 100644 --- a/extras/dogswatch/pkg/platform/updog/updog.go +++ b/extras/dogswatch/pkg/platform/updog/updog.go @@ -10,17 +10,23 @@ import ( "github.com/amazonlinux/thar/dogswatch/pkg/logging" "github.com/amazonlinux/thar/dogswatch/pkg/thar" "github.com/pkg/errors" - "github.com/sirupsen/logrus" ) var ( updogBin = filepath.Join(thar.PlatformBin, "updog") ) +const ( + // updateIdentifier is a stand-in update identifier for Updog sourced updates. + updateIdentifier = "latest" +) + // updog implements the binding for the platform to the host's implementation // for manipulating updates on its behalf. 
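The `component`/`subcomponent` field convention added to `pkg/logging` above is what the updog binding (below) uses to tag its command runner. The same shape with plain logrus; the field names mirror the new constants, everything else is illustrative:

``` go
package main

import "github.com/sirupsen/logrus"

const (
	componentField    = "component"
	subComponentField = "subcomponent"
)

func main() {
	root := logrus.New()
	root.SetLevel(logrus.DebugLevel)

	// A top-level component logger, and a sub-logger for one of its subsystems.
	updog := root.WithField(componentField, "updog")
	hostBin := updog.WithField(subComponentField, "host-bin")

	hostBin.WithField("cmd", "updog check-update").Debug("running command")
	// level=debug msg="running command" cmd="updog check-update" component=updog subcomponent=host-bin
}
```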
type updog struct { Bin command + + log logging.Logger } type command interface { @@ -31,7 +37,9 @@ type command interface { Status() (bool, error) } -type executable struct{} +type executable struct { + log logging.SubLogger +} func (e *executable) runOk(cmd *exec.Cmd) (bool, error) { cmd.SysProcAttr = thar.ProcessAttrs() @@ -41,42 +49,33 @@ func (e *executable) runOk(cmd *exec.Cmd) (bool, error) { cmd.Stdout = writer cmd.Stderr = writer - if logging.Debuggable { - logging.New("updog").WithFields(logrus.Fields{ - "cmd": cmd.String(), - }).Debug("Executing") - } - + log := e.log.WithField("cmd", cmd.String()) + log.Debug("running command") if err := cmd.Start(); err != nil { + log.WithError(err).Error("failed to start command") if logging.Debuggable { - logging.New("updog").WithFields(logrus.Fields{ - "cmd": cmd.String(), - "output": buf.String(), - }).WithError(err).Error("Failed to start command") + log.WithField("output", buf.String()).Debugf("command output") } return false, err } err := cmd.Wait() if err != nil { + log.WithError(err).Error("error during command run") if logging.Debuggable { - logging.New("updog").WithFields(logrus.Fields{ - "cmd": cmd.String(), - "output": buf.String(), - }).WithError(err).Error("Command errored durring run") + log.WithField("output", buf.String()).Debug("command output") } return false, err } + log.Debug("command completed successfully") if logging.Debuggable { - logging.New("updog").WithFields(logrus.Fields{ - "cmd": cmd.String(), - "output": buf.String(), - }).Debug("Command completed successfully") + log.WithField("output", buf.String()).Debug("command output") } // Boolean currently only used by ListUpdate. Returns true if the // command yielded output, which indicates an update is available. // TODO: Update this when an interface is defined between updog // and dogswatch. - return len(buf.String()) > 0, err + updateEmitted := len(buf.String()) > 0 + return updateEmitted, err } func (e *executable) CheckUpdate() (bool, error) { @@ -109,7 +108,12 @@ func (e *executable) Status() (bool, error) { } func newUpdogHost() Host { - return &updog{Bin: &executable{}} + log := logging.New("updog") + + return &updog{ + Bin: &executable{log: log.WithField(logging.SubComponentField, "host-bin")}, + log: log, + } } func (u *updog) Status() (*statusResponse, error) { @@ -120,24 +124,23 @@ func (u *updog) Status() (*statusResponse, error) { } func (u *updog) ListAvailable() (*listAvailableResponse, error) { - if avail, err := u.Bin.CheckUpdate(); err != nil { - return nil, err - } else { - if avail { - return &listAvailableResponse{ - // TODO: deserialize output from updog and plumb version IDs - ReportedUpdates: []*availableUpdate{&availableUpdate{ID: "POSITIVE_STUB_INDICATOR"}}, - }, nil - } else { - return &listAvailableResponse{}, nil - } + avail, err := u.Bin.CheckUpdate() + if err != nil { + return nil, errors.Wrap(err, "unable to check for updates") + } + if avail { + return &listAvailableResponse{ + // TODO: deserialize output from updog and plumb version IDs + ReportedUpdates: []*availableUpdate{&availableUpdate{ID: updateIdentifier}}, + }, nil } + return &listAvailableResponse{}, nil } func (u *updog) PrepareUpdate(id UpdateID) (*prepareUpdateResponse, error) { // TODO: extend updog for prepare steps. 
return &prepareUpdateResponse{ - ID: "POSITIVE_STUB_INDICATOR", + ID: updateIdentifier, }, nil } diff --git a/extras/dogswatch/pkg/thar/containerd.go b/extras/dogswatch/pkg/thar/containerd.go new file mode 100644 index 00000000000..3a35c01b58f --- /dev/null +++ b/extras/dogswatch/pkg/thar/containerd.go @@ -0,0 +1,156 @@ +package thar + +import ( + "io" + "os" + "path/filepath" + "strconv" + + "github.com/amazonlinux/thar/dogswatch/pkg/logging" + systemd "github.com/coreos/go-systemd/v22/dbus" + "github.com/coreos/go-systemd/v22/unit" + dbus "github.com/godbus/dbus/v5" + "github.com/pkg/errors" +) + +var ( + systemdUnitTransient = filepath.Join(RootFS, "/run/systemd/system") + systemdSocket = filepath.Join(RootFS, "/run/systemd/private") + + containerdUnit = "containerd.service" + containerdDropInDir = filepath.Join(systemdUnitTransient, containerdUnit+".d") + + containerdKillMode = "mixed" +) + +type containerdDropIn struct{} + +func (*containerdDropIn) Name() string { + return "containerd-killmode" +} + +func (c *containerdDropIn) Apply(log logging.SubLogger) (bool, error) { + err := c.writeUnit() + if err != nil { + return false, err + } + err = c.reloadUnit() + if err != nil { + return false, err + } + return true, nil +} + +func (c *containerdDropIn) Check(log logging.SubLogger) (bool, error) { + if !c.runEnvironment(log) { + log.Debug("environment prevents run") + return false, nil + } + + conn, err := c.connect() + if err != nil { + log.Warn("unable to connect to systemd daemon socket") + return false, err + } + defer conn.Close() + + prop, err := conn.GetUnitTypeProperty(containerdUnit, "Service", "KillMode") + if err != nil { + return false, errors.Wrap(err, "unable to query service unit") + } + variant := prop.Value + if mode, ok := variant.Value().(string); ok { + log.WithField("KillMode", mode).Debugf("identified %s KillMode", containerdUnit) + if mode == containerdKillMode { + log.Debug("mitigation not required") + return false, nil + } + } else { + // KillMode property wasn't a string, but it should be. + log.Debugf("failed to reflect string for property %q", "KillMode") + log.Debugf("property object %#v", prop) + return false, errors.Errorf("unable to handle queried property: %q", prop) + } + return true, nil +} + +func (*containerdDropIn) runEnvironment(log logging.SubLogger) bool { + // This doesn't apply without having root. 
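`Check` above reads containerd's effective `KillMode` over D-Bus before deciding whether the drop-in is needed. The same property read as a stand-alone program, using the library's stock system-bus connection rather than the Thar-specific socket dialer:

``` go
package main

import (
	"fmt"

	systemd "github.com/coreos/go-systemd/v22/dbus"
)

func main() {
	// Connect to the local systemd instance (requires access to the system bus).
	conn, err := systemd.New()
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	prop, err := conn.GetUnitTypeProperty("containerd.service", "Service", "KillMode")
	if err != nil {
		panic(err)
	}
	fmt.Printf("containerd KillMode=%v\n", prop.Value.Value())
}
```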
+ if uid := os.Getuid(); uid != 0 { + log.WithField("uid", uid).Debug("requires root") + return false + } + + // And needs systemd access + stat, err := os.Stat(systemdSocket) + if err != nil { + log.WithField("socket", systemdSocket).Debug("requires systemd socket at path") + return false + } + isSocket := stat.Mode()&os.ModeSocket == os.ModeSocket + if !isSocket { + log.WithField("socket", systemdSocket).Debug("requires systemd unix socket access") + return false + } + log.Debug("environment permits run") + return true +} + +func (*containerdDropIn) writeUnit() error { + // Drop-In Unit + options := []*unit.UnitOption{ + unit.NewUnitOption("Service", "KillMode", containerdKillMode), + } + + err := os.MkdirAll(containerdDropInDir, 0750) + if err != nil { + return errors.Wrap(err, "unable to create transient unit dir") + } + + f, err := os.Create(filepath.Join(containerdDropInDir, "99-killmode-workaround.conf")) + if err != nil { + return errors.Wrap(err, "unable to create drop in unit") + } + _, err = io.Copy(f, unit.Serialize(options)) + if err != nil { + f.Close() + os.Remove(f.Name()) + return errors.Wrap(err, "unable to write drop in unit") + } + f.Close() + return nil +} + +func (c *containerdDropIn) reloadUnit() error { + sd, err := c.connect() + if err != nil { + return errors.Wrap(err, "unable to connect to systemd") + } + defer sd.Close() + + err = sd.Reload() + if err != nil { + return errors.Wrap(err, "unable to execute daemon-reload") + } + // For now, this is all that's needed. + return nil +} + +func (c *containerdDropIn) connect() (*systemd.Conn, error) { + dialer := func() (*dbus.Conn, error) { + // Connect to the thar systemd socket + conn, err := dbus.Dial("unix:path=" + systemdSocket) + if err != nil { + return nil, errors.Wrap(err, "unable to connect to thar systemd socket") + } + // Authenticate with the user's authority. + methods := []dbus.Auth{dbus.AuthExternal(strconv.Itoa(os.Getuid()))} + err = conn.Auth(methods) + if err != nil { + conn.Close() + return nil, errors.Wrap(err, "unable to authenticate with thar systemd") + } + return conn, nil + } + return systemd.NewConnection(dialer) +} diff --git a/extras/dogswatch/pkg/thar/containerd_test.go b/extras/dogswatch/pkg/thar/containerd_test.go new file mode 100644 index 00000000000..da20e052ba2 --- /dev/null +++ b/extras/dogswatch/pkg/thar/containerd_test.go @@ -0,0 +1,12 @@ +package thar + +import ( + "testing" +) + +func TestContainerdSystemdPath(t *testing.T) { + t.Log(containerdDropInDir) + if containerdDropInDir != "/.thar/rootfs/run/systemd/system/containerd.service.d" { + t.Fatal("should have matched") + } +} diff --git a/extras/dogswatch/pkg/thar/mitigate.go b/extras/dogswatch/pkg/thar/mitigate.go new file mode 100644 index 00000000000..1ccd3e3d939 --- /dev/null +++ b/extras/dogswatch/pkg/thar/mitigate.go @@ -0,0 +1,65 @@ +package thar + +import ( + "github.com/amazonlinux/thar/dogswatch/pkg/logging" + "github.com/pkg/errors" +) + +var mitigations = []mitigation{ + &containerdDropIn{}, +} + +// mitigation applies a change, if needed, to permit dogswatch operation. 
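For reference, the override that `writeUnit` above assembles is a conventional two-line drop-in, written as `99-killmode-workaround.conf` under the containerd drop-in directory. A stand-alone sketch that serializes the same option with the go-systemd `unit` helper and prints it:

``` go
package main

import (
	"io"
	"os"

	"github.com/coreos/go-systemd/v22/unit"
)

func main() {
	// Serializes to:
	//
	//	[Service]
	//	KillMode=mixed
	opts := []*unit.UnitOption{
		unit.NewUnitOption("Service", "KillMode", "mixed"),
	}
	if _, err := io.Copy(os.Stdout, unit.Serialize(opts)); err != nil {
		panic(err)
	}
}
```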
+type mitigation interface { + Name() string + Check(logging.SubLogger) (bool, error) + Apply(logging.SubLogger) (bool, error) +} + +func ApplyMitigations() error { + log := logging.New("mitigation") + errored := false + applied := false + + for _, m := range mitigations { + mlog := log.WithField("mitigation", m.Name()) + needed, err := m.Check(log) + if err != nil { + errored = true + mlog.WithError(err).Error("unable to determine need") + continue + } + if !needed { + mlog.Debug("not needed") + continue + } + + applied = true + mlog.Warn("applying mitigation") + applied, err := m.Apply(log) + if err != nil { + errored = true + mlog.WithError(err).Error("unable to apply") + + continue + } + if !applied { + errored = true + mlog.Error("unsuccessful") + continue + } + mlog.Warn("applied mitigation") + } + + if errored { + err := errors.New("errors occurred during mitigation fixes") + log.WithError(err).Error("see log for mitigation attempts") + return err + } + + if applied { + log.Info("applied mitigations") + } + + return nil +}
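`ApplyMitigations` walks the `mitigations` slice, so adding another host fix means satisfying the three-method interface and appending an instance. A reduced sketch of such a plug-in; the logger and interface here are stand-ins for the package's `logging.SubLogger` and `mitigation`, and `noopFix` is hypothetical:

``` go
package main

import "fmt"

// Stand-ins for logging.SubLogger and the mitigation interface above.
type subLogger interface{ Debug(args ...interface{}) }

type mitigation interface {
	Name() string
	Check(subLogger) (bool, error)
	Apply(subLogger) (bool, error)
}

// noopFix is a placeholder mitigation that reports it is never needed.
type noopFix struct{}

func (*noopFix) Name() string                  { return "noop-fix" }
func (*noopFix) Check(subLogger) (bool, error) { return false, nil }
func (*noopFix) Apply(subLogger) (bool, error) { return true, nil }

var mitigations = []mitigation{&noopFix{}}

func main() {
	fmt.Println(len(mitigations), "mitigation(s) registered:", mitigations[0].Name())
}
```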