From fd2e1251d8b993193db04c9bd9e6b31ea472413c Mon Sep 17 00:00:00 2001 From: Yi Chen Date: Thu, 24 Oct 2024 09:52:30 +0800 Subject: [PATCH 1/3] Update default container security context (#2265) * Update default container security context Signed-off-by: Yi Chen * Push user and group directives into Dockerfile Signed-off-by: Yi Chen * Add allowPrivilegeEscalation to container security context Signed-off-by: Yi Chen * fix: fsGroup should be moved to pod security context Signed-off-by: Yi Chen --------- Signed-off-by: Yi Chen --- Dockerfile | 8 +++++- charts/spark-operator-chart/README.md | 8 +++--- charts/spark-operator-chart/values.yaml | 34 +++++++++++++------------ 3 files changed, 29 insertions(+), 21 deletions(-) diff --git a/Dockerfile b/Dockerfile index 77803169b..aeda3b2cc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -26,7 +26,9 @@ RUN --mount=type=cache,target=/go/pkg/mod/ \ go mod download COPY . . + ENV GOCACHE=/root/.cache/go-build + ARG TARGETARCH RUN --mount=type=cache,target=/go/pkg/mod/ \ @@ -35,6 +37,10 @@ RUN --mount=type=cache,target=/go/pkg/mod/ \ FROM ${SPARK_IMAGE} +ARG SPARK_UID=185 + +ARG SPARK_GID=185 + USER root RUN apt-get update \ @@ -45,7 +51,7 @@ RUN mkdir -p /etc/k8s-webhook-server/serving-certs /home/spark && \ chmod -R g+rw /etc/k8s-webhook-server/serving-certs && \ chown -R spark /etc/k8s-webhook-server/serving-certs /home/spark -USER spark +USER ${SPARK_UID}:${SPARK_GID} COPY --from=builder /workspace/bin/spark-operator /usr/bin/spark-operator diff --git a/charts/spark-operator-chart/README.md b/charts/spark-operator-chart/README.md index 1df28cdc7..c0e683b75 100644 --- a/charts/spark-operator-chart/README.md +++ b/charts/spark-operator-chart/README.md @@ -106,13 +106,13 @@ See [helm uninstall](https://helm.sh/docs/helm/helm_uninstall) for command docum | controller.affinity | object | `{}` | Affinity for controller pods. | | controller.tolerations | list | `[]` | List of node taints to tolerate for controller pods. | | controller.priorityClassName | string | `""` | Priority class for controller pods. | -| controller.podSecurityContext | object | `{}` | Security context for controller pods. | +| controller.podSecurityContext | object | `{"fsGroup":185}` | Security context for controller pods. | | controller.topologySpreadConstraints | list | `[]` | Topology spread constraints rely on node labels to identify the topology domain(s) that each Node is in. Ref: [Pod Topology Spread Constraints](https://kubernetes.io/docs/concepts/workloads/pods/pod-topology-spread-constraints/). The labelSelector field in topology spread constraint will be set to the selector labels for controller pods if not specified. | | controller.env | list | `[]` | Environment variables for controller containers. | | controller.envFrom | list | `[]` | Environment variable sources for controller containers. | | controller.volumeMounts | list | `[]` | Volume mounts for controller containers. | | controller.resources | object | `{}` | Pod resource requests and limits for controller containers. Note, that each job submission will spawn a JVM within the controller pods using "/usr/local/openjdk-11/bin/java -Xmx128m". Kubernetes may kill these Java processes at will to enforce resource limits. When that happens, you will see the following error: 'failed to run spark-submit for SparkApplication [...]: signal: killed' - when this happens, you may want to increase memory limits. | -| controller.securityContext | object | `{}` | Security context for controller containers. 
| +| controller.securityContext | object | `{"allowPrivilegeEscalation":false,"capabilities":{"drop":["ALL"]},"privileged":false,"runAsNonRoot":true}` | Security context for controller containers. | | controller.sidecars | list | `[]` | Sidecar containers for controller pods. | | controller.podDisruptionBudget.enable | bool | `false` | Specifies whether to create pod disruption budget for controller. Ref: [Specifying a Disruption Budget for your Application](https://kubernetes.io/docs/tasks/run-application/configure-pdb/) | | controller.podDisruptionBudget.minAvailable | int | `1` | The number of pods that must be available. Require `controller.replicas` to be greater than 1 | @@ -144,13 +144,13 @@ See [helm uninstall](https://helm.sh/docs/helm/helm_uninstall) for command docum | webhook.affinity | object | `{}` | Affinity for webhook pods. | | webhook.tolerations | list | `[]` | List of node taints to tolerate for webhook pods. | | webhook.priorityClassName | string | `""` | Priority class for webhook pods. | -| webhook.podSecurityContext | object | `{}` | Security context for webhook pods. | +| webhook.podSecurityContext | object | `{"fsGroup":185}` | Security context for webhook pods. | | webhook.topologySpreadConstraints | list | `[]` | Topology spread constraints rely on node labels to identify the topology domain(s) that each Node is in. Ref: [Pod Topology Spread Constraints](https://kubernetes.io/docs/concepts/workloads/pods/pod-topology-spread-constraints/). The labelSelector field in topology spread constraint will be set to the selector labels for webhook pods if not specified. | | webhook.env | list | `[]` | Environment variables for webhook containers. | | webhook.envFrom | list | `[]` | Environment variable sources for webhook containers. | | webhook.volumeMounts | list | `[]` | Volume mounts for webhook containers. | | webhook.resources | object | `{}` | Pod resource requests and limits for webhook pods. | -| webhook.securityContext | object | `{}` | Security context for webhook containers. | +| webhook.securityContext | object | `{"allowPrivilegeEscalation":false,"capabilities":{"drop":["ALL"]},"privileged":false,"runAsNonRoot":true}` | Security context for webhook containers. | | webhook.podDisruptionBudget.enable | bool | `false` | Specifies whether to create pod disruption budget for webhook. Ref: [Specifying a Disruption Budget for your Application](https://kubernetes.io/docs/tasks/run-application/configure-pdb/) | | webhook.podDisruptionBudget.minAvailable | int | `1` | The number of pods that must be available. Require `webhook.replicas` to be greater than 1 | | spark.jobNamespaces | list | `["default"]` | List of namespaces where to run spark jobs. If empty string is included, all namespaces will be allowed. Make sure the namespaces have already existed. | diff --git a/charts/spark-operator-chart/values.yaml b/charts/spark-operator-chart/values.yaml index 672d62252..9032087c6 100644 --- a/charts/spark-operator-chart/values.yaml +++ b/charts/spark-operator-chart/values.yaml @@ -120,10 +120,8 @@ controller: priorityClassName: "" # -- Security context for controller pods. - podSecurityContext: {} - # runAsUser: 1000 - # runAsGroup: 2000 - # fsGroup: 3000 + podSecurityContext: + fsGroup: 185 # -- Topology spread constraints rely on node labels to identify the topology domain(s) that each Node is in. # Ref: [Pod Topology Spread Constraints](https://kubernetes.io/docs/concepts/workloads/pods/pod-topology-spread-constraints/). 
@@ -158,10 +156,13 @@ controller: # memory: 300Mi # -- Security context for controller containers. - securityContext: {} - # runAsUser: 1000 - # runAsGroup: 2000 - # fsGroup: 3000 + securityContext: + privileged: false + allowPrivilegeEscalation: false + runAsNonRoot: true + capabilities: + drop: + - ALL # -- Sidecar containers for controller pods. sidecars: [] @@ -266,10 +267,8 @@ webhook: priorityClassName: "" # -- Security context for webhook pods. - podSecurityContext: {} - # runAsUser: 1000 - # runAsGroup: 2000 - # fsGroup: 3000 + podSecurityContext: + fsGroup: 185 # -- Topology spread constraints rely on node labels to identify the topology domain(s) that each Node is in. # Ref: [Pod Topology Spread Constraints](https://kubernetes.io/docs/concepts/workloads/pods/pod-topology-spread-constraints/). @@ -301,10 +300,13 @@ webhook: # memory: 300Mi # -- Security context for webhook containers. - securityContext: {} - # runAsUser: 1000 - # runAsGroup: 2000 - # fsGroup: 3000 + securityContext: + privileged: false + allowPrivilegeEscalation: false + runAsNonRoot: true + capabilities: + drop: + - ALL # Pod disruption budget for webhook to avoid service degradation. podDisruptionBudget: From d0daf2fd1740a6c448baf7091b1976d46ef4a023 Mon Sep 17 00:00:00 2001 From: Yi Chen Date: Thu, 24 Oct 2024 10:23:30 +0800 Subject: [PATCH 2/3] Support pod template for Spark 3.x applications (#2141) * Update API definition to support pod template Signed-off-by: Yi Chen * Mark pod template field as schemaless Signed-off-by: Yi Chen * Add kubebuilder marker to preserve unknown fields Signed-off-by: Yi Chen * Add example for using pod template Signed-off-by: Yi Chen * Support pod template Signed-off-by: Yi Chen --------- Signed-off-by: Yi Chen --- Makefile | 2 +- api/v1beta2/sparkapplication_types.go | 8 + api/v1beta2/zz_generated.deepcopy.go | 5 + ...tor.k8s.io_scheduledsparkapplications.yaml | 31 +++ ...parkoperator.k8s.io_sparkapplications.yaml | 31 +++ ...tor.k8s.io_scheduledsparkapplications.yaml | 31 +++ ...parkoperator.k8s.io_sparkapplications.yaml | 31 +++ docs/api-docs.md | 16 ++ examples/spark-pi-pod-template.yaml | 197 ++++++++++++++++++ go.mod | 3 +- .../controller/sparkapplication/controller.go | 27 ++- .../controller/sparkapplication/submission.go | 60 +++++- .../webhook/sparkapplication_defaulter.go | 23 -- .../webhook/sparkapplication_validator.go | 14 ++ pkg/common/spark.go | 3 + pkg/util/util.go | 41 ++++ pkg/util/util_test.go | 72 +++++++ 17 files changed, 566 insertions(+), 29 deletions(-) create mode 100644 examples/spark-pi-pod-template.yaml diff --git a/Makefile b/Makefile index 3283f2a4a..49a2713c1 100644 --- a/Makefile +++ b/Makefile @@ -109,7 +109,7 @@ print-%: ; @echo $*=$($*) .PHONY: manifests manifests: controller-gen ## Generate CustomResourceDefinition, RBAC and WebhookConfiguration manifests. - $(CONTROLLER_GEN) crd rbac:roleName=spark-operator-controller webhook paths="./..." output:crd:artifacts:config=config/crd/bases + $(CONTROLLER_GEN) crd:generateEmbeddedObjectMeta=true rbac:roleName=spark-operator-controller webhook paths="./..." output:crd:artifacts:config=config/crd/bases .PHONY: generate generate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations. 
diff --git a/api/v1beta2/sparkapplication_types.go b/api/v1beta2/sparkapplication_types.go index 71a810cbf..c56891187 100644 --- a/api/v1beta2/sparkapplication_types.go +++ b/api/v1beta2/sparkapplication_types.go @@ -409,6 +409,14 @@ type Dependencies struct { // SparkPodSpec defines common things that can be customized for a Spark driver or executor pod. // TODO: investigate if we should use v1.PodSpec and limit what can be set instead. type SparkPodSpec struct { + // Template is a pod template that can be used to define the driver or executor pod configurations that Spark configurations do not support. + // Spark version >= 3.0.0 is required. + // Ref: https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template. + // +optional + // +kubebuilder:validation:Schemaless + // +kubebuilder:validation:Type:=object + // +kubebuilder:pruning:PreserveUnknownFields + Template *corev1.PodTemplateSpec `json:"template,omitempty"` // Cores maps to `spark.driver.cores` or `spark.executor.cores` for the driver and executors, respectively. // +optional // +kubebuilder:validation:Minimum=1 diff --git a/api/v1beta2/zz_generated.deepcopy.go b/api/v1beta2/zz_generated.deepcopy.go index eb7c6239d..635e19af5 100644 --- a/api/v1beta2/zz_generated.deepcopy.go +++ b/api/v1beta2/zz_generated.deepcopy.go @@ -876,6 +876,11 @@ func (in *SparkApplicationStatus) DeepCopy() *SparkApplicationStatus { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SparkPodSpec) DeepCopyInto(out *SparkPodSpec) { *out = *in + if in.Template != nil { + in, out := &in.Template, &out.Template + *out = new(v1.PodTemplateSpec) + (*in).DeepCopyInto(*out) + } if in.Cores != nil { in, out := &in.Cores, &out.Cores *out = new(int32) diff --git a/charts/spark-operator-chart/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml b/charts/spark-operator-chart/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml index 0ce3ee05e..7aa9c4af2 100644 --- a/charts/spark-operator-chart/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml +++ b/charts/spark-operator-chart/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml @@ -4755,6 +4755,13 @@ spec: - name type: object type: array + template: + description: |- + Template is a pod template that can be used to define the driver or executor pod configurations that Spark configurations do not support. + Spark version >= 3.0.0 is required. + Ref: https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template. + type: object + x-kubernetes-preserve-unknown-fields: true terminationGracePeriodSeconds: description: Termination grace period seconds for the pod format: int64 @@ -9512,6 +9519,13 @@ spec: - name type: object type: array + template: + description: |- + Template is a pod template that can be used to define the driver or executor pod configurations that Spark configurations do not support. + Spark version >= 3.0.0 is required. + Ref: https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template. + type: object + x-kubernetes-preserve-unknown-fields: true terminationGracePeriodSeconds: description: Termination grace period seconds for the pod format: int64 @@ -10351,6 +10365,23 @@ spec: May contain labels and annotations that will be copied into the PVC when creating it. No other fields are allowed and will be rejected during validation. 
+ properties: + annotations: + additionalProperties: + type: string + type: object + finalizers: + items: + type: string + type: array + labels: + additionalProperties: + type: string + type: object + name: + type: string + namespace: + type: string type: object spec: description: |- diff --git a/charts/spark-operator-chart/crds/sparkoperator.k8s.io_sparkapplications.yaml b/charts/spark-operator-chart/crds/sparkoperator.k8s.io_sparkapplications.yaml index 70034a0f9..4c839e36e 100644 --- a/charts/spark-operator-chart/crds/sparkoperator.k8s.io_sparkapplications.yaml +++ b/charts/spark-operator-chart/crds/sparkoperator.k8s.io_sparkapplications.yaml @@ -4694,6 +4694,13 @@ spec: - name type: object type: array + template: + description: |- + Template is a pod template that can be used to define the driver or executor pod configurations that Spark configurations do not support. + Spark version >= 3.0.0 is required. + Ref: https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template. + type: object + x-kubernetes-preserve-unknown-fields: true terminationGracePeriodSeconds: description: Termination grace period seconds for the pod format: int64 @@ -9421,6 +9428,13 @@ spec: - name type: object type: array + template: + description: |- + Template is a pod template that can be used to define the driver or executor pod configurations that Spark configurations do not support. + Spark version >= 3.0.0 is required. + Ref: https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template. + type: object + x-kubernetes-preserve-unknown-fields: true terminationGracePeriodSeconds: description: Termination grace period seconds for the pod format: int64 @@ -10257,6 +10271,23 @@ spec: May contain labels and annotations that will be copied into the PVC when creating it. No other fields are allowed and will be rejected during validation. + properties: + annotations: + additionalProperties: + type: string + type: object + finalizers: + items: + type: string + type: array + labels: + additionalProperties: + type: string + type: object + name: + type: string + namespace: + type: string type: object spec: description: |- diff --git a/config/crd/bases/sparkoperator.k8s.io_scheduledsparkapplications.yaml b/config/crd/bases/sparkoperator.k8s.io_scheduledsparkapplications.yaml index 0ce3ee05e..7aa9c4af2 100644 --- a/config/crd/bases/sparkoperator.k8s.io_scheduledsparkapplications.yaml +++ b/config/crd/bases/sparkoperator.k8s.io_scheduledsparkapplications.yaml @@ -4755,6 +4755,13 @@ spec: - name type: object type: array + template: + description: |- + Template is a pod template that can be used to define the driver or executor pod configurations that Spark configurations do not support. + Spark version >= 3.0.0 is required. + Ref: https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template. + type: object + x-kubernetes-preserve-unknown-fields: true terminationGracePeriodSeconds: description: Termination grace period seconds for the pod format: int64 @@ -9512,6 +9519,13 @@ spec: - name type: object type: array + template: + description: |- + Template is a pod template that can be used to define the driver or executor pod configurations that Spark configurations do not support. + Spark version >= 3.0.0 is required. + Ref: https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template. 
+ type: object + x-kubernetes-preserve-unknown-fields: true terminationGracePeriodSeconds: description: Termination grace period seconds for the pod format: int64 @@ -10351,6 +10365,23 @@ spec: May contain labels and annotations that will be copied into the PVC when creating it. No other fields are allowed and will be rejected during validation. + properties: + annotations: + additionalProperties: + type: string + type: object + finalizers: + items: + type: string + type: array + labels: + additionalProperties: + type: string + type: object + name: + type: string + namespace: + type: string type: object spec: description: |- diff --git a/config/crd/bases/sparkoperator.k8s.io_sparkapplications.yaml b/config/crd/bases/sparkoperator.k8s.io_sparkapplications.yaml index 70034a0f9..4c839e36e 100644 --- a/config/crd/bases/sparkoperator.k8s.io_sparkapplications.yaml +++ b/config/crd/bases/sparkoperator.k8s.io_sparkapplications.yaml @@ -4694,6 +4694,13 @@ spec: - name type: object type: array + template: + description: |- + Template is a pod template that can be used to define the driver or executor pod configurations that Spark configurations do not support. + Spark version >= 3.0.0 is required. + Ref: https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template. + type: object + x-kubernetes-preserve-unknown-fields: true terminationGracePeriodSeconds: description: Termination grace period seconds for the pod format: int64 @@ -9421,6 +9428,13 @@ spec: - name type: object type: array + template: + description: |- + Template is a pod template that can be used to define the driver or executor pod configurations that Spark configurations do not support. + Spark version >= 3.0.0 is required. + Ref: https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template. + type: object + x-kubernetes-preserve-unknown-fields: true terminationGracePeriodSeconds: description: Termination grace period seconds for the pod format: int64 @@ -10257,6 +10271,23 @@ spec: May contain labels and annotations that will be copied into the PVC when creating it. No other fields are allowed and will be rejected during validation. + properties: + annotations: + additionalProperties: + type: string + type: object + finalizers: + items: + type: string + type: array + labels: + additionalProperties: + type: string + type: object + name: + type: string + namespace: + type: string type: object spec: description: |- diff --git a/docs/api-docs.md b/docs/api-docs.md index 531c53f9a..08f8d42d8 100644 --- a/docs/api-docs.md +++ b/docs/api-docs.md @@ -2828,6 +2828,22 @@ TODO: investigate if we should use v1.PodSpec and limit what can be set instead. +template
+Kubernetes core/v1.PodTemplateSpec
+(Optional)
+Template is a pod template that can be used to define the driver or executor pod configurations that Spark configurations do not support.
+Spark version >= 3.0.0 is required.
+Ref: https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template.

cores
int32 diff --git a/examples/spark-pi-pod-template.yaml b/examples/spark-pi-pod-template.yaml new file mode 100644 index 000000000..552dc0030 --- /dev/null +++ b/examples/spark-pi-pod-template.yaml @@ -0,0 +1,197 @@ +# +# Copyright 2024 The Kubeflow authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: v1 +kind: ConfigMap +metadata: + name: test-configmap + namespace: default +data: + KEY1: VALUE1 + +--- +apiVersion: v1 +kind: Secret +metadata: + name: test-secret + namespace: default +stringData: + KEY2: VALUE2 + +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: test-configmap-2 + namespace: default +data: + KEY3: VALUE3 + +--- +apiVersion: v1 +kind: Secret +metadata: + name: test-secret-2 + namespace: default +stringData: + KEY4: VALUE4 + +--- +apiVersion: sparkoperator.k8s.io/v1beta2 +kind: SparkApplication +metadata: + name: spark-pi-pod-template + namespace: default +spec: + type: Scala + mode: cluster + sparkVersion: 3.5.3 + image: spark:3.5.3 + imagePullPolicy: IfNotPresent + mainApplicationFile: local:///opt/spark/examples/jars/spark-examples.jar + mainClass: org.apache.spark.examples.SparkPi + arguments: + - "10000" + driver: + template: + metadata: + labels: + spark.apache.org/version: 3.5.3 + annotations: + spark.apache.org/version: 3.5.3 + spec: + containers: + - name: spark-kubernetes-driver + env: + - name: KEY0 + value: VALUE0 + - name: KEY1 + valueFrom: + configMapKeyRef: + name: test-configmap + key: KEY1 + - name: KEY2 + valueFrom: + secretKeyRef: + name: test-secret + key: KEY2 + envFrom: + - configMapRef: + name: test-configmap-2 + - secretRef: + name: test-secret-2 + ports: + - name: custom-port + containerPort: 12345 + protocol: TCP + # The resources section will not work for cpu/memory requests and limits. + # Ref: https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template. + resources: + requests: + # Please use `spec.driver.cores` instead. + cpu: 500m + # Please use `spec.driver.memory` and `spec.driver.memoryOverhead` instead. + memory: 512Mi + limits: + # Please use `spec.driver.coreLimit` instead. + cpu: 1 + # Please use `spec.driver.memory` and `spec.driver.memoryOverhead` instead. 
+ memory: 1Gi + nodeSelector: + kubernetes.io/os: linux + affinity: + podAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 1 + podAffinityTerm: + labelSelector: + matchLabels: + spark-app-name: spark-pi-pod-template + topologyKey: kubernetes.io/hostname + tolerations: + - operator: Exists + effect: NoSchedule + serviceAccountName: spark-operator-spark + cores: 1 + coreLimit: "1" + memory: 512m + memoryOverhead: 512m + executor: + instances: 1 + template: + metadata: + labels: + spark.apache.org/version: 3.5.3 + annotations: + spark.apache.org/version: 3.5.3 + spec: + containers: + - name: spark-kubernetes-executor + env: + - name: KEY0 + value: VALUE0 + - name: KEY1 + valueFrom: + configMapKeyRef: + name: test-configmap + key: KEY1 + - name: KEY2 + valueFrom: + secretKeyRef: + name: test-secret + key: KEY2 + envFrom: + - configMapRef: + name: test-configmap-2 + - secretRef: + name: test-secret-2 + volumeMounts: + - name: spark-local-dir-1 + mountPath: /mnt/disk1 + # The resources section will not work for cpu/memory requests and limits. + # Ref: https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template. + resources: + requests: + # Please use `spec.executor.cores` instead. + cpu: 1 + # Please use `spec.executor.memory` and `spec.executor.memoryOverhead` instead. + memory: 1Gi + limits: + # Please use `spec.executor.coreLimit` instead. + cpu: 1500m + # Please use `spec.executor.memory` and `spec.executor.memoryOverhead` instead. + memory: 1512Mi + volumes: + - name: spark-local-dir-1 + emptyDir: + sizeLimit: 100Mi + nodeSelector: + kubernetes.io/os: linux + affinity: + podAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 1 + podAffinityTerm: + labelSelector: + matchLabels: + spark-app-name: spark-pi-pod-template + topologyKey: kubernetes.io/hostname + tolerations: + - operator: Exists + effect: NoSchedule + cores: 1 + coreLimit: 1500m + memory: 1g + memoryOverhead: 512m diff --git a/go.mod b/go.mod index d5e600b9c..11af87ff3 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/stretchr/testify v1.9.0 go.uber.org/zap v1.27.0 gocloud.dev v0.40.0 + golang.org/x/mod v0.20.0 golang.org/x/net v0.30.0 golang.org/x/time v0.7.0 helm.sh/helm/v3 v3.16.2 @@ -30,6 +31,7 @@ require ( k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 sigs.k8s.io/controller-runtime v0.17.5 sigs.k8s.io/scheduler-plugins v0.29.8 + sigs.k8s.io/yaml v1.4.0 volcano.sh/apis v1.9.0 ) @@ -229,7 +231,6 @@ require ( sigs.k8s.io/kustomize/api v0.17.2 // indirect sigs.k8s.io/kustomize/kyaml v0.17.1 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect - sigs.k8s.io/yaml v1.4.0 // indirect ) replace ( diff --git a/internal/controller/sparkapplication/controller.go b/internal/controller/sparkapplication/controller.go index 7a707df00..e4cb78d24 100644 --- a/internal/controller/sparkapplication/controller.go +++ b/internal/controller/sparkapplication/controller.go @@ -19,6 +19,7 @@ package sparkapplication import ( "context" "fmt" + "os" "strconv" "time" @@ -643,8 +644,10 @@ func (r *Reconciler) getSparkApplication(key types.NamespacedName) (*v1beta2.Spa // submitSparkApplication creates a new submission for the given SparkApplication and submits it using spark-submit. 
func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (submitErr error) { logger.Info("Submitting SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) + // SubmissionID must be set before creating any resources to ensure all the resources are labeled. app.Status.SubmissionID = uuid.New().String() + app.Status.DriverInfo.PodName = util.GetDriverPodName(app) app.Status.LastSubmissionAttemptTime = metav1.Now() app.Status.SubmissionAttempts = app.Status.SubmissionAttempts + 1 @@ -736,8 +739,12 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (subm } } - driverPodName := util.GetDriverPodName(app) - app.Status.DriverInfo.PodName = driverPodName + defer func() { + if err := r.cleanUpPodTemplateFiles(app); err != nil { + logger.Error(fmt.Errorf("failed to clean up pod template files: %v", err), "name", app.Name, "namespace", app.Namespace) + } + }() + sparkSubmitArgs, err := buildSparkSubmitArgs(app) if err != nil { return fmt.Errorf("failed to build spark-submit arguments: %v", err) @@ -746,6 +753,7 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (subm // Try submitting the application by running spark-submit. logger.Info("Running spark-submit for SparkApplication", "name", app.Name, "namespace", app.Namespace, "arguments", sparkSubmitArgs) if err := runSparkSubmit(newSubmission(sparkSubmitArgs, app)); err != nil { + r.recordSparkApplicationEvent(app) return fmt.Errorf("failed to run spark-submit: %v", err) } return nil @@ -1224,3 +1232,18 @@ func (r *Reconciler) cleanUpOnTermination(_, newApp *v1beta2.SparkApplication) e } return nil } + +// cleanUpPodTemplateFiles cleans up the driver and executor pod template files. +func (r *Reconciler) cleanUpPodTemplateFiles(app *v1beta2.SparkApplication) error { + if app.Spec.Driver.Template == nil && app.Spec.Executor.Template == nil { + return nil + } + path := fmt.Sprintf("/tmp/spark/%s", app.Status.SubmissionID) + if err := os.RemoveAll(path); err != nil { + if !os.IsNotExist(err) { + return err + } + } + logger.V(1).Info("Deleted pod template files", "path", path) + return nil +} diff --git a/internal/controller/sparkapplication/submission.go b/internal/controller/sparkapplication/submission.go index 66e4a0be8..d0bb6f578 100644 --- a/internal/controller/sparkapplication/submission.go +++ b/internal/controller/sparkapplication/submission.go @@ -83,15 +83,17 @@ func buildSparkSubmitArgs(app *v1beta2.SparkApplication) ([]string, error) { submissionWaitAppCompletionOption, sparkConfOption, hadoopConfOption, + driverPodTemplateOption, driverPodNameOption, driverConfOption, - driverSecretOption, driverEnvOption, + driverSecretOption, driverVolumeMountsOption, + executorPodTemplateOption, executorConfOption, + executorEnvOption, executorSecretOption, executorVolumeMountsOption, - executorEnvOption, nodeSelectorOption, dynamicAllocationOption, proxyUserOption, @@ -303,6 +305,12 @@ func driverConfOption(app *v1beta2.SparkApplication) ([]string, error) { property = fmt.Sprintf(common.SparkKubernetesDriverLabelTemplate, common.LabelLaunchedBySparkOperator) args = append(args, "--conf", fmt.Sprintf("%s=%s", property, "true")) + // If Spark version is less than 3.0.0 or driver pod template is not defined, then the driver pod needs to be mutated by the webhook. 
+ if util.CompareSemanticVersion(app.Spec.SparkVersion, "3.0.0") < 0 || app.Spec.Driver.Template == nil { + property = fmt.Sprintf(common.SparkKubernetesDriverLabelTemplate, common.LabelMutatedBySparkOperator) + args = append(args, "--conf", fmt.Sprintf("%s=%s", property, "true")) + } + property = fmt.Sprintf(common.SparkKubernetesDriverLabelTemplate, common.LabelSubmissionID) args = append(args, "--conf", fmt.Sprintf("%s=%s", property, app.Status.SubmissionID)) @@ -646,6 +654,12 @@ func executorConfOption(app *v1beta2.SparkApplication) ([]string, error) { property = fmt.Sprintf(common.SparkKubernetesExecutorLabelTemplate, common.LabelLaunchedBySparkOperator) args = append(args, "--conf", fmt.Sprintf("%s=%s", property, "true")) + // If Spark version is less than 3.0.0 or executor pod template is not defined, then the executor pods need to be mutated by the webhook. + if util.CompareSemanticVersion(app.Spec.SparkVersion, "3.0.0") < 0 || app.Spec.Executor.Template == nil { + property = fmt.Sprintf(common.SparkKubernetesExecutorLabelTemplate, common.LabelMutatedBySparkOperator) + args = append(args, "--conf", fmt.Sprintf("%s=%s", property, "true")) + } + property = fmt.Sprintf(common.SparkKubernetesExecutorLabelTemplate, common.LabelSubmissionID) args = append(args, "--conf", fmt.Sprintf("%s=%s", property, app.Status.SubmissionID)) @@ -1022,3 +1036,45 @@ func mainApplicationFileOption(app *v1beta2.SparkApplication) ([]string, error) func applicationOption(app *v1beta2.SparkApplication) ([]string, error) { return app.Spec.Arguments, nil } + +// driverPodTemplateOption returns the driver pod template arguments. +func driverPodTemplateOption(app *v1beta2.SparkApplication) ([]string, error) { + if app.Spec.Driver.Template == nil { + return []string{}, nil + } + + podTemplateFile := fmt.Sprintf("/tmp/spark/%s/driver-pod-template.yaml", app.Status.SubmissionID) + if err := util.WriteObjectToFile(app.Spec.Driver.Template, podTemplateFile); err != nil { + return []string{}, err + } + logger.V(1).Info("Created driver pod template file for SparkApplication", "name", app.Name, "namespace", app.Namespace, "file", podTemplateFile) + + args := []string{ + "--conf", + fmt.Sprintf("%s=%s", common.SparkKubernetesDriverPodTemplateFile, podTemplateFile), + "--conf", + fmt.Sprintf("%s=%s", common.SparkKubernetesDriverPodTemplateContainerName, common.SparkDriverContainerName), + } + return args, nil +} + +// executorPodTemplateOption returns the executor pod template arguments. 
+func executorPodTemplateOption(app *v1beta2.SparkApplication) ([]string, error) { + if app.Spec.Executor.Template == nil { + return []string{}, nil + } + + podTemplateFile := fmt.Sprintf("/tmp/spark/%s/executor-pod-template.yaml", app.Status.SubmissionID) + if err := util.WriteObjectToFile(app.Spec.Executor.Template, podTemplateFile); err != nil { + return []string{}, err + } + logger.V(1).Info("Created executor pod template file for SparkApplication", "name", app.Name, "namespace", app.Namespace, "file", podTemplateFile) + + args := []string{ + "--conf", + fmt.Sprintf("%s=%s", common.SparkKubernetesExecutorPodTemplateFile, podTemplateFile), + "--conf", + fmt.Sprintf("%s=%s", common.SparkKubernetesExecutorPodTemplateContainerName, common.Spark3DefaultExecutorContainerName), + } + return args, nil +} diff --git a/internal/webhook/sparkapplication_defaulter.go b/internal/webhook/sparkapplication_defaulter.go index 661ecf708..9c10ea10c 100644 --- a/internal/webhook/sparkapplication_defaulter.go +++ b/internal/webhook/sparkapplication_defaulter.go @@ -83,32 +83,9 @@ func defaultSparkApplication(app *v1beta2.SparkApplication) { } func defaultDriverSpec(app *v1beta2.SparkApplication) { - if app.Spec.Driver.Cores == nil { - if app.Spec.SparkConf == nil || app.Spec.SparkConf[common.SparkDriverCores] == "" { - app.Spec.Driver.Cores = util.Int32Ptr(1) - } - } - - if app.Spec.Driver.Memory == nil { - if app.Spec.SparkConf == nil || app.Spec.SparkConf[common.SparkDriverMemory] == "" { - app.Spec.Driver.Memory = util.StringPtr("1g") - } - } } func defaultExecutorSpec(app *v1beta2.SparkApplication) { - if app.Spec.Executor.Cores == nil { - if app.Spec.SparkConf == nil || app.Spec.SparkConf[common.SparkExecutorCores] == "" { - app.Spec.Executor.Cores = util.Int32Ptr(1) - } - } - - if app.Spec.Executor.Memory == nil { - if app.Spec.SparkConf == nil || app.Spec.SparkConf[common.SparkExecutorMemory] == "" { - app.Spec.Executor.Memory = util.StringPtr("1g") - } - } - if app.Spec.Executor.Instances == nil { // Check whether dynamic allocation is enabled in application spec. enableDynamicAllocation := app.Spec.DynamicAllocation != nil && app.Spec.DynamicAllocation.Enabled diff --git a/internal/webhook/sparkapplication_validator.go b/internal/webhook/sparkapplication_validator.go index 7b1fd4108..e1dd4f6f6 100644 --- a/internal/webhook/sparkapplication_validator.go +++ b/internal/webhook/sparkapplication_validator.go @@ -117,6 +117,10 @@ func (v *SparkApplicationValidator) ValidateDelete(ctx context.Context, obj runt func (v *SparkApplicationValidator) validateSpec(_ context.Context, app *v1beta2.SparkApplication) error { logger.V(1).Info("Validating SparkApplication spec", "name", app.Name, "namespace", app.Namespace, "state", util.GetApplicationState(app)) + if err := v.validateSparkVersion(app); err != nil { + return err + } + if app.Spec.NodeSelector != nil && (app.Spec.Driver.NodeSelector != nil || app.Spec.Executor.NodeSelector != nil) { return fmt.Errorf("node selector cannot be defined at both SparkApplication and Driver/Executor") } @@ -144,6 +148,16 @@ func (v *SparkApplicationValidator) validateSpec(_ context.Context, app *v1beta2 return nil } +func (v *SparkApplicationValidator) validateSparkVersion(app *v1beta2.SparkApplication) error { + // The pod template feature requires Spark version 3.0.0 or higher. 
+ if app.Spec.Driver.Template != nil || app.Spec.Executor.Template != nil { + if util.CompareSemanticVersion(app.Spec.SparkVersion, "3.0.0") < 0 { + return fmt.Errorf("pod template feature requires Spark version 3.0.0 or higher") + } + } + return nil +} + func (v *SparkApplicationValidator) validateResourceUsage(ctx context.Context, app *v1beta2.SparkApplication) error { logger.V(1).Info("Validating SparkApplication resource usage", "name", app.Name, "namespace", app.Namespace, "state", util.GetApplicationState(app)) diff --git a/pkg/common/spark.go b/pkg/common/spark.go index 24ea5ff31..94ae2c51d 100644 --- a/pkg/common/spark.go +++ b/pkg/common/spark.go @@ -307,6 +307,9 @@ const ( // LabelLaunchedBySparkOperator is a label on Spark pods launched through the Spark Operator. LabelLaunchedBySparkOperator = LabelAnnotationPrefix + "launched-by-spark-operator" + // LabelMutatedBySparkOperator is a label on Spark pods that need to be mutated by webhook. + LabelMutatedBySparkOperator = LabelAnnotationPrefix + "mutated-by-spark-operator" + // LabelSubmissionID is the label that records the submission ID of the current run of an application. LabelSubmissionID = LabelAnnotationPrefix + "submission-id" diff --git a/pkg/util/util.go b/pkg/util/util.go index 850bc209d..25f664dbc 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -19,8 +19,12 @@ package util import ( "fmt" "os" + "path/filepath" "strings" + "golang.org/x/mod/semver" + "sigs.k8s.io/yaml" + "github.com/kubeflow/spark-operator/pkg/common" ) @@ -77,3 +81,40 @@ func Int64Ptr(n int64) *int64 { func StringPtr(s string) *string { return &s } + +// CompareSemanticVersion compares two semantic versions. +func CompareSemanticVersion(v1, v2 string) int { + // Add 'v' prefix if needed + addPrefix := func(s string) string { + if !strings.HasPrefix(s, "v") { + return "v" + s + } + return s + } + return semver.Compare(addPrefix(v1), addPrefix(v2)) +} + +// WriteObjectToFile marshals the given object into a YAML document and writes it to the given file. +func WriteObjectToFile(obj interface{}, filePath string) error { + if err := os.MkdirAll(filepath.Dir(filePath), 0755); err != nil { + return err + } + + file, err := os.Create(filePath) + if err != nil { + return err + } + defer file.Close() + + data, err := yaml.Marshal(obj) + if err != nil { + return err + } + + _, err = file.Write(data) + if err != nil { + return err + } + + return nil +} diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index 324ed3580..5f24d4a37 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -21,6 +21,8 @@ import ( . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/kubeflow/spark-operator/pkg/common" "github.com/kubeflow/spark-operator/pkg/util" @@ -129,3 +131,73 @@ var _ = Describe("StringPtr", func() { Expect(util.StringPtr(s)).To(Equal(&s)) }) }) + +var _ = Describe("CompareSemanticVersions", func() { + It("Should return 0 if the two versions are equal", func() { + Expect(util.CompareSemanticVersion("1.2.3", "1.2.3")) + Expect(util.CompareSemanticVersion("1.2.3", "v1.2.3")).To(Equal(0)) + }) + + It("Should return -1 if the first version is less than the second version", func() { + Expect(util.CompareSemanticVersion("2.3.4", "2.4.5")).To(Equal(-1)) + Expect(util.CompareSemanticVersion("2.4.5", "2.4.8")).To(Equal(-1)) + Expect(util.CompareSemanticVersion("2.4.8", "3.5.2")).To(Equal(-1)) + }) + + It("Should return +1 if the first version is greater than the second version", func() { + Expect(util.CompareSemanticVersion("2.4.5", "2.3.4")).To(Equal(1)) + Expect(util.CompareSemanticVersion("2.4.8", "2.4.5")).To(Equal(1)) + Expect(util.CompareSemanticVersion("3.5.2", "2.4.8")).To(Equal(1)) + }) +}) + +var _ = Describe("WriteObjectToFile", func() { + It("Should write the object to the file", func() { + podTemplate := &corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Labels: map[string]string{ + "key1": "value1", + "key2": "value2", + }, + Annotations: map[string]string{ + "key3": "value3", + "key4": "value4", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Image: "test-image", + }, + }, + }, + } + + expected := `metadata: + annotations: + key3: value3 + key4: value4 + creationTimestamp: null + labels: + key1: value1 + key2: value2 + name: test-pod +spec: + containers: + - image: test-image + name: test-container + resources: {} +` + file := "pod-template.yaml" + Expect(util.WriteObjectToFile(podTemplate, file)).To(Succeed()) + + data, err := os.ReadFile(file) + Expect(err).NotTo(HaveOccurred()) + actual := string(data) + + Expect(actual).To(Equal(expected)) + Expect(os.Remove(file)).NotTo(HaveOccurred()) + }) +}) From 1e864c8b91562d065e1f86453347b52beb8703ee Mon Sep 17 00:00:00 2001 From: Yi Chen Date: Thu, 24 Oct 2024 16:27:30 +0800 Subject: [PATCH 3/3] Add workflow for releasing sparkctl binary (#2264) Signed-off-by: Yi Chen --- .github/workflows/release.yaml | 54 ++++++++++++++++++++++++++++++---- 1 file changed, 49 insertions(+), 5 deletions(-) diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index cd9f09a5b..1ea2a4acd 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -65,12 +65,54 @@ jobs: echo "Tag '${VERSION}' does not exist." 
fi - build_images: + release_sparkctl: needs: - check-release runs-on: ubuntu-latest + strategy: + fail-fast: true + matrix: + os: + - linux + - darwin + arch: + - amd64 + - arm64 + + env: + GOOS: ${{ matrix.os }} + GOARCH: ${{ matrix.arch }} + + steps: + - name: Checkout source code + uses: actions/checkout@v4 + + - name: Read version from VERSION file + run: | + VERSION=$(cat VERSION | sed "s/^v//") + echo "VERSION=${VERSION}" >> $GITHUB_ENV + + - name: Build sparkctl binary + run: | + make build-sparkctl + tar -czvf sparkctl-${VERSION}-${GOOS}-${GOARCH}.tgz -C bin sparkctl + + - name: Upload sparkctl binary + uses: actions/upload-artifact@v4 + with: + name: sparkctl-${{ env.VERSION }}-${{ env.GOOS }}-${{ env.GOARCH }} + path: sparkctl-${{ env.VERSION }}-${{ env.GOOS }}-${{ env.GOARCH }}.tgz + if-no-files-found: error + retention-days: 1 + + build_images: + needs: + - release_sparkctl + + runs-on: ubuntu-latest + strategy: fail-fast: false matrix: @@ -90,10 +132,6 @@ jobs: - name: Read version from VERSION file run: | VERSION=$(cat VERSION) - if [[ ! ${VERSION} =~ ${{ env.SEMVER_PATTERN }} ]]; then - echo "Version '${VERSION}' does not match semver pattern." - exit 1 - fi echo "VERSION=${VERSION}" >> $GITHUB_ENV - name: Docker meta @@ -250,6 +288,11 @@ jobs: helm package charts/${chart} done + - name: Download artifacts + uses: actions/download-artifact@v4 + with: + pattern: sparkctl-* + - name: Release id: release uses: softprops/action-gh-release@v2 @@ -262,3 +305,4 @@ jobs: draft: true files: | *.tgz + sparkctl-*/sparkctl-*.tgz
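
For reference, the sparkctl tarballs attached to a published release follow the `sparkctl-${VERSION}-${GOOS}-${GOARCH}.tgz` naming produced by the build step above. A minimal sketch of fetching and installing one of them — the release URL, tag format, and version value below are assumptions for illustration, not part of this patch:

```bash
# Hypothetical example: download the linux/amd64 sparkctl asset from a published release.
# Asset names mirror the workflow's "sparkctl-${VERSION}-${GOOS}-${GOARCH}.tgz" pattern.
VERSION=2.0.0   # assumed contents of the VERSION file, without the leading "v"
OS=linux        # one of the matrix values: linux or darwin
ARCH=amd64      # one of the matrix values: amd64 or arm64

curl -fsSL -o "sparkctl-${VERSION}-${OS}-${ARCH}.tgz" \
  "https://github.com/kubeflow/spark-operator/releases/download/v${VERSION}/sparkctl-${VERSION}-${OS}-${ARCH}.tgz"
tar -xzvf "sparkctl-${VERSION}-${OS}-${ARCH}.tgz"   # the tarball contains the single "sparkctl" binary
install -m 0755 sparkctl /usr/local/bin/sparkctl
```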