From 2570b0a0fcb8117da4a81aed827a5223ac0ed38b Mon Sep 17 00:00:00 2001
From: zwzhang
Date: Fri, 17 Nov 2023 10:42:17 +0800
Subject: [PATCH] yarn-operator: only start yarn node syncer on leader (#53)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: 佑祎
---
 .../templates/hadoop-configmap.yaml           |  8 +++--
 charts/hadoop-yarn/templates/rbac.yaml        | 36 +++++++++++++++++++
 .../templates/yarn-nm-statefulset.yaml        | 13 +++++--
 charts/hadoop-yarn/values.yaml                |  6 +++-
 docker/hadoop-yarn.dockerfile                 | 14 ++++++--
 .../noderesource/resource_sync_controller.go  | 13 ++++---
 pkg/yarn/cache/nodes_syncer.go                | 25 ++++++++-----
 7 files changed, 93 insertions(+), 22 deletions(-)
 create mode 100644 charts/hadoop-yarn/templates/rbac.yaml

diff --git a/charts/hadoop-yarn/templates/hadoop-configmap.yaml b/charts/hadoop-yarn/templates/hadoop-configmap.yaml
index 8ad98c0e..b6d8d1b1 100644
--- a/charts/hadoop-yarn/templates/hadoop-configmap.yaml
+++ b/charts/hadoop-yarn/templates/hadoop-configmap.yaml
@@ -67,11 +67,11 @@ data:
       cat >> $HADOOP_HOME/etc/hadoop/yarn-site.xml <<- EOM
         <property>
           <name>yarn.nodemanager.resource.memory-mb</name>
-          <value>${NM_INIT_MEMORY_MB:-2048}</value>
+          <value>${NM_INIT_MEMORY_MB:-1024}</value>
         </property>
         <property>
           <name>yarn.nodemanager.resource.cpu-vcores</name>
-          <value>${NM_INIT_CPU_CORES:-2}</value>
+          <value>${NM_INIT_CPU_CORES:-1}</value>
         </property>
         <property>
           <name>yarn.nodemanager.address</name>
@@ -79,6 +79,9 @@ data:
       EOM
 
+      # annotate nm id on pod
+      kubectl annotate pod -n ${POD_NAMESPACE} ${POD_NAME} yarn.hadoop.apache.org/node-id=${HOSTNAME}:8041
+
       echo '' >> $HADOOP_HOME/etc/hadoop/yarn-site.xml
 
       # Wait with timeout for resourcemanager
@@ -89,7 +92,6 @@ data:
         echo "$0: Timeout waiting for $TMP_URL, exiting."
         exit 1
       fi
-      fi
 
       # ------------------------------------------------------
 
diff --git a/charts/hadoop-yarn/templates/rbac.yaml b/charts/hadoop-yarn/templates/rbac.yaml
new file mode 100644
index 00000000..91c460df
--- /dev/null
+++ b/charts/hadoop-yarn/templates/rbac.yaml
@@ -0,0 +1,36 @@
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: yarn-nodemanager
+  namespace: {{ .Values.installation.namespace }}
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+  name: yarn-nodemanager-role
+  namespace: {{ .Values.installation.namespace }}
+rules:
+  - apiGroups:
+      - ""
+    resources:
+      - pods
+    verbs:
+      - patch
+      - update
+      - get
+      - list
+      - watch
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: RoleBinding
+metadata:
+  name: yarn-nodemanager-rolebinding
+  namespace: {{ .Values.installation.namespace }}
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: Role
+  name: yarn-nodemanager-role
+subjects:
+  - kind: ServiceAccount
+    name: yarn-nodemanager
+    namespace: {{ .Values.installation.namespace }}
\ No newline at end of file
diff --git a/charts/hadoop-yarn/templates/yarn-nm-statefulset.yaml b/charts/hadoop-yarn/templates/yarn-nm-statefulset.yaml
index 7c229f6c..6f7090d7 100644
--- a/charts/hadoop-yarn/templates/yarn-nm-statefulset.yaml
+++ b/charts/hadoop-yarn/templates/yarn-nm-statefulset.yaml
@@ -42,6 +42,7 @@ spec:
         app.kubernetes.io/name: {{ include "hadoop-yarn.name" . }}
         app.kubernetes.io/instance: {{ .Release.Name }}
         app.kubernetes.io/component: node-manager
+      serviceAccountName: yarn-nodemanager
       setHostnameAsFQDN: true
       terminationGracePeriodSeconds: 0
       dnsPolicy: ClusterFirst
@@ -93,9 +94,17 @@ spec:
             - name: YARN_ROLE
               value: yarn-nm
             - name: NM_INIT_CPU_CORES
-              value: "4"
+              value: "{{ .Values.yarn.nodeManager.initCPUVCores }}"
             - name: NM_INIT_MEMORY_MB
-              value: "4096"
+              value: "{{ .Values.yarn.nodeManager.initMemoryMB }}"
+            - name: POD_NAMESPACE
+              valueFrom:
+                fieldRef:
+                  fieldPath: metadata.namespace
+            - name: POD_NAME
+              valueFrom:
+                fieldRef:
+                  fieldPath: metadata.name
           volumeMounts:
             - name: hadoop-config
               mountPath: /tmp/hadoop-config
diff --git a/charts/hadoop-yarn/values.yaml b/charts/hadoop-yarn/values.yaml
index bad1573a..a0aa6e98 100644
--- a/charts/hadoop-yarn/values.yaml
+++ b/charts/hadoop-yarn/values.yaml
@@ -4,7 +4,7 @@ installation:
 image:
   repository: registry.cn-hangzhou.aliyuncs.com/koordinator-sh/apache-hadoop
   tag: 3.3.2-v1.0
-  pullPolicy: IfNotPresent
+  pullPolicy: Always
 
 # The version of the hadoop libraries being used in the image.
 hadoopVersion: 3.3.2
@@ -50,6 +50,10 @@ yarn:
 
     serviceName: node-manager
 
+    # initial cpu and memory reported by the node manager
+    initCPUVCores: 1
+    initMemoryMB: 1024
+
   config:
     yarnSite:
       cgroupsHierarchy: /kubepods.slice/kubepods-besteffort.slice/hadoop-yarn
diff --git a/docker/hadoop-yarn.dockerfile b/docker/hadoop-yarn.dockerfile
index 29b6112a..0bfb924f 100644
--- a/docker/hadoop-yarn.dockerfile
+++ b/docker/hadoop-yarn.dockerfile
@@ -1,6 +1,6 @@
 FROM alpine:3.14 as BUILDER
 
-ENV HADOOP_VERSION 3.3.2
+ENV HADOOP_VERSION 3.3.3
 ENV SPARK_VERSION 3.3.3
 
 RUN apk update \
@@ -14,7 +14,7 @@ RUN curl -s -o /tmp/spark.tgz https://mirrors.aliyun.com/apache/spark/spark-${SP
 
 FROM openjdk:8
 
-ENV HADOOP_VERSION 3.3.2
+ENV HADOOP_VERSION 3.3.3
 ENV SPARK_VERSION 3.3.3
 ENV SPARK_HOME=/opt/spark
 ENV HADOOP_HOME=/opt/hadoop
@@ -29,6 +29,14 @@ ENV HADOOP_COMMON_HOME=${HADOOP_HOME} \
 COPY --from=BUILDER /opt/hadoop-${HADOOP_VERSION} ${HADOOP_HOME}
 COPY --from=BUILDER /opt/spark-${SPARK_VERSION}-bin-hadoop3 ${SPARK_HOME}
 
-RUN apt-get update && apt-get install -y dnsutils
+RUN apt-get update && apt-get install -y apt-transport-https
+RUN curl https://mirrors.aliyun.com/kubernetes/apt/doc/apt-key.gpg | apt-key add -
+RUN echo 'deb https://mirrors.aliyun.com/kubernetes/apt/ kubernetes-xenial main' >> /etc/apt/sources.list.d/kubernetes.list
+#RUN cat <<EOF >/etc/apt/sources.list.d/kubernetes.list \
+#  deb https://mirrors.aliyun.com/kubernetes/apt/ kubernetes-xenial main \
+#  EOF
+
+RUN apt-get update
+RUN apt-get install -y kubectl dnsutils
 
 WORKDIR $HADOOP_HOME
\ No newline at end of file
diff --git a/pkg/controller/noderesource/resource_sync_controller.go b/pkg/controller/noderesource/resource_sync_controller.go
index 00029448..07fd7548 100644
--- a/pkg/controller/noderesource/resource_sync_controller.go
+++ b/pkg/controller/noderesource/resource_sync_controller.go
@@ -82,13 +82,13 @@ func (r *YARNResourceSyncReconciler) Reconcile(ctx context.Context, req reconcil
 	// TODO exclude batch pod requested
 	batchCPU, batchMemory, err := getNodeBatchResource(node)
 	if err != nil {
-		return ctrl.Result{}, err
+		return ctrl.Result{Requeue: true}, err
 	}
 	klog.V(4).Infof("get node batch resource cpu: %d, memory: %d, name: %s", batchCPU.Value(), batchMemory.Value(), node.Name)
 
 	vcores, memoryMB := calculate(batchCPU, batchMemory)
 
-	// TODO control update frequency
+	// TODO control update frequency by ignoring unnecessary node update events
 	if err := r.updateYARNNodeResource(yarnNode, vcores, memoryMB); err != nil {
 		klog.Warningf("update batch resource to yarn node %+v failed, k8s node name: %s, error %v", yarnNode, node.Name, err)
 		return ctrl.Result{Requeue: true}, err
@@ -98,7 +98,7 @@ func (r *YARNResourceSyncReconciler) Reconcile(ctx context.Context, req reconcil
 
 	core, mb, err := r.getYARNNodeAllocatedResource(yarnNode)
 	if err != nil {
-		return reconcile.Result{}, err
+		return reconcile.Result{Requeue: true}, err
 	}
 	if err := r.updateYARNAllocatedResource(node, core, mb); err != nil {
 		klog.Warningf("failed to update yarn allocated resource for node %v, error %v", node.Name, err)
@@ -171,7 +171,7 @@ func Add(mgr ctrl.Manager) error {
 		return err
 	}
 	yarnNodesSyncer := cache.NewNodesSyncer(clients)
-	go yarnNodesSyncer.Sync()
+
 	coll := yarnmetrics.NewYarnMetricCollector(yarnNodesSyncer)
 	if err = metrics.Registry.Register(coll); err != nil {
 		return err
@@ -181,10 +181,15 @@ func Add(mgr ctrl.Manager) error {
 		yarnClients:   clients,
 		yarnNodeCache: yarnNodesSyncer,
 	}
+
+	if err := mgr.Add(yarnNodesSyncer); err != nil {
+		return err
+	}
 	return r.SetupWithManager(mgr)
 }
 
 func (r *YARNResourceSyncReconciler) SetupWithManager(mgr ctrl.Manager) error {
+	// TODO use source.Channel to handle yarn node requested-resource update events
 	return ctrl.NewControllerManagedBy(mgr).
 		For(&corev1.Node{}).
 		Named(Name).
diff --git a/pkg/yarn/cache/nodes_syncer.go b/pkg/yarn/cache/nodes_syncer.go
index 0546b5a5..043d767d 100644
--- a/pkg/yarn/cache/nodes_syncer.go
+++ b/pkg/yarn/cache/nodes_syncer.go
@@ -17,6 +17,7 @@ limitations under the License.
 package cache
 
 import (
+	"context"
 	"fmt"
 	"sync"
 	"time"
@@ -67,19 +68,25 @@ func (r *NodesSyncer) getKey(yarnNodeName string, yarnNodePort int32) string {
 	return fmt.Sprintf("%s-%d", yarnNodeName, yarnNodePort)
 }
 
-func (r *NodesSyncer) Sync() {
+func (r *NodesSyncer) Start(ctx context.Context) error {
 	t := time.NewTicker(syncInterval)
 	debug := time.NewTicker(syncInterval * 10)
-	for {
-		select {
-		case <-t.C:
-			if err := r.syncYARNNodeAllocatedResource(); err != nil {
-				klog.Errorf("sync yarn node allocated resource failed, error: %v", err)
+	go func() {
+		for {
+			select {
+			case <-t.C:
+				if err := r.syncYARNNodeAllocatedResource(); err != nil {
+					klog.Errorf("sync yarn node allocated resource failed, error: %v", err)
+				}
+			case <-debug.C:
+				r.debug()
+			case <-ctx.Done():
+				klog.V(1).Infof("stop node syncer")
+				return
 			}
-		case <-debug.C:
-			r.debug()
 		}
-	}
+	}()
+	return nil
 }
 
 func (r *NodesSyncer) debug() {
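
Note on the leader gating in this patch: replacing the bare "go yarnNodesSyncer.Sync()"
with "mgr.Add(yarnNodesSyncer)" is what confines the syncer to the leader. NodesSyncer
now satisfies controller-runtime's manager.Runnable interface (Start(ctx context.Context)
error), and the manager treats any Runnable that does not also implement
manager.LeaderElectionRunnable as requiring leader election, so Start is invoked only on
the replica that wins the election. Below is a minimal sketch of that contract; it is
illustrative only, and leaderOnlySyncer is a hypothetical name, not part of this patch:

    package main

    import (
    	"context"

    	"sigs.k8s.io/controller-runtime/pkg/manager"
    )

    // leaderOnlySyncer is a hypothetical stand-in for a periodic syncer
    // registered with mgr.Add.
    type leaderOnlySyncer struct{}

    // Start runs the work loop; the manager calls it only after this
    // replica has been elected leader (see NeedLeaderElection below).
    func (s *leaderOnlySyncer) Start(ctx context.Context) error {
    	<-ctx.Done() // block until the manager shuts down
    	return nil
    }

    // NeedLeaderElection reports whether the runnable must wait for
    // leadership. Returning true mirrors controller-runtime's default for
    // runnables that, like NodesSyncer in this patch, never implement
    // this interface at all.
    func (s *leaderOnlySyncer) NeedLeaderElection() bool { return true }

    // Compile-time checks that the sketch satisfies both interfaces.
    var (
    	_ manager.Runnable               = &leaderOnlySyncer{}
    	_ manager.LeaderElectionRunnable = &leaderOnlySyncer{}
    )

The RBAC objects and the serviceAccountName in the NodeManager statefulset serve a
separate purpose: the bootstrap script now runs "kubectl annotate pod" against its own
pod, which needs the pods patch/update permissions granted to the yarn-nodemanager
ServiceAccount.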