yarn-operator: only start yarn node syncer on leader (#53)

Signed-off-by: 佑祎 <[email protected]>
zwzhang0107 authored Nov 17, 2023
1 parent 93389f4 commit 2570b0a

Showing 7 changed files with 93 additions and 22 deletions.
8 changes: 5 additions & 3 deletions charts/hadoop-yarn/templates/hadoop-configmap.yaml
@@ -67,18 +67,21 @@ data:
 cat >> $HADOOP_HOME/etc/hadoop/yarn-site.xml <<- EOM
 <property>
   <name>yarn.nodemanager.resource.memory-mb</name>
-  <value>${NM_INIT_MEMORY_MB:-2048}</value>
+  <value>${NM_INIT_MEMORY_MB:-1024}</value>
 </property>
 <property>
   <name>yarn.nodemanager.resource.cpu-vcores</name>
-  <value>${NM_INIT_CPU_CORES:-2}</value>
+  <value>${NM_INIT_CPU_CORES:-1}</value>
 </property>
 <property>
   <name>yarn.nodemanager.address</name>
   <value>${HOSTNAME}:8041</value>
 </property>
 EOM
+# annotate nm id on pod
+kubectl annotate pod -n ${POD_NAMESPACE} ${POD_NAME} yarn.hadoop.apache.org/node-id=${HOSTNAME}:8041
 echo '</configuration>' >> $HADOOP_HOME/etc/hadoop/yarn-site.xml
 # Wait with timeout for resourcemanager
@@ -89,7 +92,6 @@ data:
echo "$0: Timeout waiting for $TMP_URL, exiting."
exit 1
fi
fi
# ------------------------------------------------------
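The new kubectl annotate step above records each node manager's YARN node ID (host:port) on its own pod. On the operator side, that annotation lets a controller map a Kubernetes pod back to the YARN node it hosts. Below is a hedged sketch of such a lookup using the controller-runtime client; the helper name and the parsing logic are illustrative, not from this repository:

package example

import (
	"context"
	"fmt"
	"strconv"
	"strings"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// Annotation written by the node-manager entrypoint ("<hostname>:8041").
const yarnNodeIDAnnotation = "yarn.hadoop.apache.org/node-id"

// yarnNodeIDFromPod looks up a node-manager pod and parses its YARN
// node ID annotation into host and port. The helper is illustrative.
func yarnNodeIDFromPod(ctx context.Context, c client.Client, namespace, name string) (string, int32, error) {
	pod := &corev1.Pod{}
	if err := c.Get(ctx, client.ObjectKey{Namespace: namespace, Name: name}, pod); err != nil {
		return "", 0, err
	}
	id, ok := pod.Annotations[yarnNodeIDAnnotation]
	if !ok {
		return "", 0, fmt.Errorf("pod %s/%s missing annotation %s", namespace, name, yarnNodeIDAnnotation)
	}
	host, portStr, found := strings.Cut(id, ":")
	if !found {
		return "", 0, fmt.Errorf("malformed node id %q", id)
	}
	port, err := strconv.ParseInt(portStr, 10, 32)
	if err != nil {
		return "", 0, fmt.Errorf("malformed port in node id %q: %w", id, err)
	}
	return host, int32(port), nil
}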
36 changes: 36 additions & 0 deletions charts/hadoop-yarn/templates/rbac.yaml
@@ -0,0 +1,36 @@
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: yarn-nodemanager
+  namespace: {{ .Values.installation.namespace }}
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+  name: yarn-nodemanager-role
+  namespace: {{ .Values.installation.namespace }}
+rules:
+- apiGroups:
+  - ""
+  resources:
+  - pods
+  verbs:
+  - patch
+  - update
+  - get
+  - list
+  - watch
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: RoleBinding
+metadata:
+  name: yarn-nodemanager-rolebinding
+  namespace: {{ .Values.installation.namespace }}
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: Role
+  name: yarn-nodemanager-role
+subjects:
+- kind: ServiceAccount
+  name: yarn-nodemanager
+  namespace: {{ .Values.installation.namespace }}
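This Role exists so that the annotate step in the entrypoint, which runs as the yarn-nodemanager ServiceAccount, is allowed to patch its own pod. A quick way to confirm the binding works from inside the pod is a SelfSubjectAccessReview; the sketch below uses client-go and is illustrative, not part of the chart:

package example

import (
	"context"

	authv1 "k8s.io/api/authorization/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// canPatchPods asks the API server whether the ServiceAccount this pod
// runs as may patch pods in its namespace, i.e. whether the Role and
// RoleBinding above took effect.
func canPatchPods(ctx context.Context, namespace string) (bool, error) {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		return false, err
	}
	cs, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return false, err
	}
	review := &authv1.SelfSubjectAccessReview{
		Spec: authv1.SelfSubjectAccessReviewSpec{
			ResourceAttributes: &authv1.ResourceAttributes{
				Namespace: namespace,
				Verb:      "patch",
				Resource:  "pods",
			},
		},
	}
	resp, err := cs.AuthorizationV1().SelfSubjectAccessReviews().Create(ctx, review, metav1.CreateOptions{})
	if err != nil {
		return false, err
	}
	return resp.Status.Allowed, nil
}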
13 changes: 11 additions & 2 deletions charts/hadoop-yarn/templates/yarn-nm-statefulset.yaml
@@ -42,6 +42,7 @@ spec:
         app.kubernetes.io/name: {{ include "hadoop-yarn.name" . }}
         app.kubernetes.io/instance: {{ .Release.Name }}
         app.kubernetes.io/component: node-manager
+      serviceAccountName: yarn-nodemanager
       setHostnameAsFQDN: true
       terminationGracePeriodSeconds: 0
       dnsPolicy: ClusterFirst
@@ -93,9 +94,17 @@ spec:
         - name: YARN_ROLE
           value: yarn-nm
         - name: NM_INIT_CPU_CORES
-          value: "4"
+          value: "{{ .Values.yarn.nodeManager.initCPUVCores }}"
         - name: NM_INIT_MEMORY_MB
-          value: "4096"
+          value: "{{ .Values.yarn.nodeManager.initMemoryMB }}"
+        - name: POD_NAMESPACE
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.namespace
+        - name: POD_NAME
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.name
         volumeMounts:
         - name: hadoop-config
           mountPath: /tmp/hadoop-config
6 changes: 5 additions & 1 deletion charts/hadoop-yarn/values.yaml
@@ -4,7 +4,7 @@ installation:
 image:
   repository: registry.cn-hangzhou.aliyuncs.com/koordinator-sh/apache-hadoop
   tag: 3.3.2-v1.0
-  pullPolicy: IfNotPresent
+  pullPolicy: Always
 
 # The version of the hadoop libraries being used in the image.
 hadoopVersion: 3.3.2
@@ -50,6 +50,10 @@ yarn:

   serviceName: node-manager
 
+  # initial cpu and memory reported by the node manager
+  initCPUVCores: 1
+  initMemoryMB: 1024
+
   config:
     yarnSite:
       cgroupsHierarchy: /kubepods.slice/kubepods-besteffort.slice/hadoop-yarn
14 changes: 11 additions & 3 deletions docker/hadoop-yarn.dockerfile
@@ -1,6 +1,6 @@
 FROM alpine:3.14 as BUILDER
 
-ENV HADOOP_VERSION 3.3.2
+ENV HADOOP_VERSION 3.3.3
 ENV SPARK_VERSION 3.3.3
 
 RUN apk update \
@@ -14,7 +14,7 @@ RUN curl -s -o /tmp/spark.tgz https://mirrors.aliyun.com/apache/spark/spark-${SP

 FROM openjdk:8
 
-ENV HADOOP_VERSION 3.3.2
+ENV HADOOP_VERSION 3.3.3
 ENV SPARK_VERSION 3.3.3
 ENV SPARK_HOME=/opt/spark
 ENV HADOOP_HOME=/opt/hadoop
@@ -29,6 +29,14 @@ ENV HADOOP_COMMON_HOME=${HADOOP_HOME} \
 COPY --from=BUILDER /opt/hadoop-${HADOOP_VERSION} ${HADOOP_HOME}
 COPY --from=BUILDER /opt/spark-${SPARK_VERSION}-bin-hadoop3 ${SPARK_HOME}
 
-RUN apt-get update && apt-get install -y dnsutils
+RUN apt-get update && apt-get install -y apt-transport-https
+RUN curl https://mirrors.aliyun.com/kubernetes/apt/doc/apt-key.gpg | apt-key add -
+RUN echo 'deb https://mirrors.aliyun.com/kubernetes/apt/ kubernetes-xenial main' >> /etc/apt/sources.list.d/kubernetes.list
+#RUN cat <<EOF >/etc/apt/sources.list.d/kubernetes.list \
+#  deb https://mirrors.aliyun.com/kubernetes/apt/ kubernetes-xenial main \
+#  EOF
+
+RUN apt-get update
+RUN apt-get install -y kubectl dnsutils
 
 WORKDIR $HADOOP_HOME
13 changes: 9 additions & 4 deletions pkg/controller/noderesource/resource_sync_controller.go
@@ -82,13 +82,13 @@ func (r *YARNResourceSyncReconciler) Reconcile(ctx context.Context, req reconcil
 	// TODO exclude batch pod requested
 	batchCPU, batchMemory, err := getNodeBatchResource(node)
 	if err != nil {
-		return ctrl.Result{}, err
+		return ctrl.Result{Requeue: true}, err
 	}
 	klog.V(4).Infof("get node batch resource cpu: %d, memory: %d, name: %s", batchCPU.Value(), batchMemory.Value(), node.Name)
 
 	vcores, memoryMB := calculate(batchCPU, batchMemory)
 
-	// TODO control update frequency
+	// TODO control update frequency by ignoring unnecessary node update events
 	if err := r.updateYARNNodeResource(yarnNode, vcores, memoryMB); err != nil {
 		klog.Warningf("update batch resource to yarn node %+v failed, k8s node name: %s, error %v", yarnNode, node.Name, err)
 		return ctrl.Result{Requeue: true}, err
@@ -98,7 +98,7 @@ func (r *YARNResourceSyncReconciler) Reconcile(ctx context.Context, req reconcil

 	core, mb, err := r.getYARNNodeAllocatedResource(yarnNode)
 	if err != nil {
-		return reconcile.Result{}, err
+		return reconcile.Result{Requeue: true}, err
 	}
 	if err := r.updateYARNAllocatedResource(node, core, mb); err != nil {
 		klog.Warningf("failed to update yarn allocated resource for node %v, error %v", node.Name, err)
@@ -171,7 +171,7 @@ func Add(mgr ctrl.Manager) error {
 		return err
 	}
 	yarnNodesSyncer := cache.NewNodesSyncer(clients)
-	go yarnNodesSyncer.Sync()
+
 	coll := yarnmetrics.NewYarnMetricCollector(yarnNodesSyncer)
 	if err = metrics.Registry.Register(coll); err != nil {
 		return err
@@ -181,10 +181,15 @@
 		yarnClients:   clients,
 		yarnNodeCache: yarnNodesSyncer,
 	}
+
+	if err := mgr.Add(yarnNodesSyncer); err != nil {
+		return err
+	}
 	return r.SetupWithManager(mgr)
 }
 
 func (r *YARNResourceSyncReconciler) SetupWithManager(mgr ctrl.Manager) error {
+	// TODO use source.Channel to handle yarn node requested update event
 	return ctrl.NewControllerManagedBy(mgr).
 		For(&corev1.Node{}).
 		Named(Name).
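The key change in Add() above: the syncer is no longer started with a raw goroutine but registered with the controller-runtime manager via mgr.Add. Runnables added to the manager start only after leader election is won (unless they opt out via LeaderElectionRunnable), which is what makes the syncer leader-only, as the commit title says. Below is a minimal standalone sketch of that contract; the task type and lock name are illustrative, not from this repository:

package main

import (
	"context"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

// leaderOnlyTask is an illustrative Runnable; it stands in for NodesSyncer.
type leaderOnlyTask struct{}

// Start is called by the manager once leadership is acquired; ctx is
// cancelled on shutdown, which is the signal to stop background work.
func (t *leaderOnlyTask) Start(ctx context.Context) error {
	<-ctx.Done()
	return nil
}

// NeedLeaderElection marks the task leader-only. Runnables that do not
// implement this interface are treated as leader-only by default, which
// is why NodesSyncer needs no extra code beyond Start.
func (t *leaderOnlyTask) NeedLeaderElection() bool { return true }

var (
	_ manager.Runnable               = &leaderOnlyTask{}
	_ manager.LeaderElectionRunnable = &leaderOnlyTask{}
)

func main() {
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		LeaderElection:   true,
		LeaderElectionID: "yarn-operator-lock", // illustrative lock name
	})
	if err != nil {
		panic(err)
	}
	// Like mgr.Add(yarnNodesSyncer) in the diff: the task now starts
	// only on the elected leader instead of on every replica.
	if err := mgr.Add(&leaderOnlyTask{}); err != nil {
		panic(err)
	}
	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		panic(err)
	}
}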
25 changes: 16 additions & 9 deletions pkg/yarn/cache/nodes_syncer.go
@@ -17,6 +17,7 @@ limitations under the License.
 package cache
 
 import (
+	"context"
 	"fmt"
 	"sync"
 	"time"
@@ -67,19 +68,25 @@ func (r *NodesSyncer) getKey(yarnNodeName string, yarnNodePort int32) string {
 	return fmt.Sprintf("%s-%d", yarnNodeName, yarnNodePort)
 }
 
-func (r *NodesSyncer) Sync() {
+func (r *NodesSyncer) Start(ctx context.Context) error {
 	t := time.NewTicker(syncInterval)
 	debug := time.NewTicker(syncInterval * 10)
-	for {
-		select {
-		case <-t.C:
-			if err := r.syncYARNNodeAllocatedResource(); err != nil {
-				klog.Errorf("sync yarn node allocated resource failed, error: %v", err)
+	go func() {
+		for {
+			select {
+			case <-t.C:
+				if err := r.syncYARNNodeAllocatedResource(); err != nil {
+					klog.Errorf("sync yarn node allocated resource failed, error: %v", err)
+				}
+			case <-debug.C:
+				r.debug()
+			case <-ctx.Done():
+				klog.V(1).Infof("stop node syncer")
+				return
 			}
-		case <-debug.C:
-			r.debug()
 		}
-	}
+	}()
+	return nil
 }
 
 func (r *NodesSyncer) debug() {
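The Sync to Start rewrite adapts NodesSyncer to the manager.Runnable contract: Start receives a context that the manager cancels on shutdown (or lost leadership), spawns the ticker loop in a goroutine, and returns immediately. Here is a self-contained sketch of the same pattern; the helper is illustrative and also stops the ticker, which the committed code does not:

package main

import (
	"context"
	"fmt"
	"time"
)

// tickerLoop mirrors the NodesSyncer.Start pattern above: run the
// periodic work in a goroutine and exit when ctx is cancelled.
func tickerLoop(ctx context.Context, interval time.Duration, work func()) {
	t := time.NewTicker(interval)
	go func() {
		defer t.Stop() // not in the committed code; avoids leaking the ticker
		for {
			select {
			case <-t.C:
				work()
			case <-ctx.Done():
				fmt.Println("stop node syncer")
				return
			}
		}
	}()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	tickerLoop(ctx, 100*time.Millisecond, func() { fmt.Println("sync") })
	time.Sleep(350 * time.Millisecond) // let a few ticks fire
	cancel()                           // what the manager does on shutdown
	time.Sleep(50 * time.Millisecond)  // give the goroutine time to exit
}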
