Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add dag processor deployment #577

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
25 changes: 25 additions & 0 deletions charts/airflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,31 @@ Parameter | Description | Default

</details>

<details>
<summary><code>dagProcessor.*</code></summary>

Parameter | Description | Default
--- | --- | ---
`dagProcessor.enabled` | if the dag processor should be deployed | `true`
`dagProcessor.replicas` | the number of dag processor Pods to run | `1`
`dagProcessor.resources` | resource requests/limits for the airflow dag processor Pods | `{}`
`dagProcessor.nodeSelector` | the nodeSelector configs for the dag processor Pods | `{}`
`dagProcessor.affinity` | the affinity configs for the dag processor Pods | `{}`
`dagProcessor.tolerations` | the toleration configs for the dag processor Pods | `[]`
`dagProcessor.securityContext` | the security context for the dag processor Pods | `{}`
`dagProcessor.labels` | labels for the dag processor Deployment | `{}`
`dagProcessor.podLabels` | Pod labels for the dag processor Deployment | `{}`
`dagProcessor.annotations` | annotations for the dag processor Deployment | `{}`
`dagProcessor.podAnnotations` | Pod annotations for the dag processor Deployment | `{}`
`dagProcessor.safeToEvict` | if we add the annotation: "cluster-autoscaler.kubernetes.io/safe-to-evict" = "true" | `true`
`dagProcessor.podDisruptionBudget.*` | configs for the PodDisruptionBudget of the dag processor Deployment | `<see values.yaml>`
`dagProcessor.livenessProbe.*` | liveness probe for the dag processor Pods | `<see values.yaml>`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is no livenessProbe we should remove this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added a liveness probe, please give it a look

`dagProcessor.extraPipPackages` | extra pip packages to install in the dag processor Pods | `[]`
`dagProcessor.extraVolumeMounts` | extra VolumeMounts for the dag processor Pods | `[]`
`dagProcessor.extraVolumes` | extra Volumes for the dag processor Pods | `[]`

</details>

<details>
<summary><code>flower.*</code></summary>

Expand Down
9 changes: 9 additions & 0 deletions charts/airflow/templates/config/secret-config-envs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,15 @@ data:
{{- end }}
{{- end }}

## ================
## Airflow Configs (Dag Processor)
## ================
{{- if .Values.dagProcessor.enabled }}
{{- if not .Values.airflow.config.AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR }}
AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR: {{ "true" | b64enc | quote }}
{{- end }}
{{- end }}

## ================
## Airflow Configs (Logging)
## ================
Expand Down
137 changes: 137 additions & 0 deletions charts/airflow/templates/dag-processor/dag-processor-deployment.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
{{- if .Values.dagProcessor.enabled }}
{{- $podNodeSelector := include "airflow.podNodeSelector" (dict "Release" .Release "Values" .Values "nodeSelector" .Values.dagProcessor.nodeSelector) }}
{{- $podAffinity := include "airflow.podAffinity" (dict "Release" .Release "Values" .Values "affinity" .Values.dagProcessor.affinity) }}
{{- $podTolerations := include "airflow.podTolerations" (dict "Release" .Release "Values" .Values "tolerations" .Values.dagProcessor.tolerations) }}
{{- $podSecurityContext := include "airflow.podSecurityContext" (dict "Release" .Release "Values" .Values "securityContext" .Values.dagProcessor.securityContext) }}
{{- $extraPipPackages := concat .Values.airflow.extraPipPackages .Values.dagProcessor.extraPipPackages }}
{{- $extraVolumeMounts := .Values.dagProcessor.extraVolumeMounts }}
{{- $volumeMounts := include "airflow.volumeMounts" (dict "Release" .Release "Values" .Values "extraPipPackages" $extraPipPackages "extraVolumeMounts" $extraVolumeMounts) }}
{{- $extraVolumes := .Values.dagProcessor.extraVolumes }}
{{- $volumes := include "airflow.volumes" (dict "Release" .Release "Values" .Values "extraPipPackages" $extraPipPackages "extraVolumes" $extraVolumes) }}
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ include "airflow.fullname" . }}-dag-processor
{{- if .Values.dagProcessor.annotations }}
annotations:
{{- toYaml .Values.dagProcessor.annotations | nindent 4 }}
{{- end }}
labels:
app: {{ include "airflow.labels.app" . }}
component: dag-processor
chart: {{ include "airflow.labels.chart" . }}
release: {{ .Release.Name }}
heritage: {{ .Release.Service }}
{{- if .Values.dagProcessor.labels }}
{{- toYaml .Values.dagProcessor.labels | nindent 4 }}
{{- end }}
spec:
replicas: {{ .Values.dagProcessor.replicas }}
strategy:
type: RollingUpdate
rollingUpdate:
## multiple dag-processor pods can run concurrently
maxSurge: 25%
maxUnavailable: 0
selector:
matchLabels:
app: {{ include "airflow.labels.app" . }}
component: dag-processor
release: {{ .Release.Name }}
template:
metadata:
annotations:
checksum/secret-config-envs: {{ include (print $.Template.BasePath "/config/secret-config-envs.yaml") . | sha256sum }}
checksum/secret-local-settings: {{ include (print $.Template.BasePath "/config/secret-local-settings.yaml") . | sha256sum }}
{{- if .Values.airflow.podAnnotations }}
{{- toYaml .Values.airflow.podAnnotations | nindent 8 }}
{{- end }}
{{- if .Values.dagProcessor.podAnnotations }}
{{- toYaml .Values.dagProcessor.podAnnotations | nindent 8 }}
{{- end }}
{{- if .Values.dagProcessor.safeToEvict }}
cluster-autoscaler.kubernetes.io/safe-to-evict: "true"
{{- end }}
labels:
app: {{ include "airflow.labels.app" . }}
component: dag-processor
release: {{ .Release.Name }}
{{- if .Values.dagProcessor.podLabels }}
{{- toYaml .Values.dagProcessor.podLabels | nindent 8 }}
{{- end }}
spec:
restartPolicy: Always
{{- if .Values.airflow.image.pullSecret }}
imagePullSecrets:
- name: {{ .Values.airflow.image.pullSecret }}
{{- end }}
{{- if $podNodeSelector }}
nodeSelector:
{{- $podNodeSelector | nindent 8 }}
{{- end }}
{{- if $podAffinity }}
affinity:
{{- $podAffinity | nindent 8 }}
{{- end }}
{{- if $podTolerations }}
tolerations:
{{- $podTolerations | nindent 8 }}
{{- end }}
serviceAccountName: {{ include "airflow.serviceAccountName" . }}
{{- if $podSecurityContext }}
securityContext:
{{- $podSecurityContext | nindent 8 }}
{{- end }}
initContainers:
{{- if $extraPipPackages }}
{{- include "airflow.init_container.install_pip_packages" (dict "Release" .Release "Values" .Values "extraPipPackages" $extraPipPackages) | indent 8 }}
{{- end }}
{{- if and (.Values.dags.gitSync.enabled) (not .Values.dags.persistence.enabled) }}
{{- include "airflow.container.git_sync" (dict "Release" .Release "Values" .Values "sync_one_time" "true") | indent 8 }}
{{- end }}
{{- include "airflow.init_container.check_db" (dict "Release" .Release "Values" .Values "volumeMounts" $volumeMounts) | indent 8 }}
{{- include "airflow.init_container.wait_for_db_migrations" (dict "Release" .Release "Values" .Values "volumeMounts" $volumeMounts) | indent 8 }}
containers:
- name: airflow-dag-processor
{{- include "airflow.image" . | indent 10 }}
resources:
{{- toYaml .Values.dagProcessor.resources | nindent 12 }}
envFrom:
{{- include "airflow.envFrom" . | indent 12 }}
env:
{{- include "airflow.env" . | indent 12 }}
command:
{{- include "airflow.command" . | indent 12 }}
args:
- "bash"
- "-c"
- "exec airflow dag-processor"
{{- if .Values.dagProcessor.livenessProbe.enabled }}
livenessProbe:
initialDelaySeconds: {{ .Values.dagProcessor.livenessProbe.initialDelaySeconds }}
periodSeconds: {{ .Values.dagProcessor.livenessProbe.periodSeconds }}
failureThreshold: {{ .Values.dagProcessor.livenessProbe.failureThreshold }}
timeoutSeconds: {{ .Values.dagProcessor.livenessProbe.timeoutSeconds }}
exec:
command:
{{- include "airflow.command" . | indent 4 }}
args:
- "bash"
- "-c"
- CONNECTION_CHECK_MAX_COUNT=0 AIRFLOW__LOGGING__LOGGING_LEVEL=ERROR exec airflow jobs check
{{- end }}
{{- if $volumeMounts }}
volumeMounts:
{{- $volumeMounts | indent 12 }}
{{- end }}
{{- if and (.Values.dags.gitSync.enabled) (not .Values.dags.persistence.enabled) }}
{{- include "airflow.container.git_sync" . | indent 8 }}
{{- end }}
{{- if .Values.airflow.extraContainers }}
{{- toYaml .Values.airflow.extraContainers | nindent 8 }}
{{- end }}
{{- if $volumes }}
volumes:
{{- $volumes | indent 8 }}
{{- end }}
{{- end }}
24 changes: 24 additions & 0 deletions charts/airflow/templates/dag-processor/dag-processor-pdb.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{{- if and (.Values.dagProcessor.enabled) (.Values.dagProcessor.podDisruptionBudget.enabled) }}
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
name: {{ include "airflow.fullname" . }}-dag-processor
labels:
app: {{ include "airflow.labels.app" . }}
component: dag-processor
chart: {{ include "airflow.labels.chart" . }}
release: {{ .Release.Name }}
heritage: {{ .Release.Service }}
spec:
{{- if .Values.dagProcessor.podDisruptionBudget.maxUnavailable }}
maxUnavailable: {{ .Values.dagProcessor.podDisruptionBudget.maxUnavailable }}
{{- end }}
{{- if .Values.dagProcessor.podDisruptionBudget.minAvailable }}
minAvailable: {{ .Values.dagProcessor.podDisruptionBudget.minAvailable }}
{{- end }}
selector:
matchLabels:
app: {{ include "airflow.labels.app" . }}
component: dag-processor
release: {{ .Release.Name }}
{{- end }}
109 changes: 109 additions & 0 deletions charts/airflow/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1077,6 +1077,115 @@ triggerer:
##
extraVolumes: []

###################################
## COMPONENT | Dag Processor
###################################
dagProcessor:
## if the airflow dag processor should be deployed
## - [WARNING] the dag processor component was added in airflow 2.3.0
##
enabled: false

## the number of dag processor Pods to run
## - if you set this >1 we recommend defining a `dagProcessor.podDisruptionBudget`
##
replicas: 1

## resource requests/limits for the dag processor Pods
## - spec for ResourceRequirements:
## https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#resourcerequirements-v1-core
##
resources: {}

## the nodeSelector configs for the dag processor Pods
## - docs for nodeSelector:
## https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#nodeselector
##
nodeSelector: {}

## the affinity configs for the dag processor Pods
## - spec for Affinity:
## https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#affinity-v1-core
##
affinity: {}

## the toleration configs for the dag processor Pods
## - spec for Toleration:
## https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#toleration-v1-core
##
tolerations: []

## the security context for the dag processor Pods
## - spec for PodSecurityContext:
## https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#podsecuritycontext-v1-core
##
securityContext: {}

## labels for the dag processor Deployment
##
labels: {}

## Pod labels for the dag processor Deployment
##
podLabels: {}

## annotations for the dag processor Deployment
##
annotations: {}

## Pod annotations for the dag processor Deployment
##
podAnnotations: {}

## if we add the annotation: "cluster-autoscaler.kubernetes.io/safe-to-evict" = "true"
##
safeToEvict: true

## configs for the PodDisruptionBudget of the dag processor Deployment
##
podDisruptionBudget:
## if a PodDisruptionBudget resource is created for the dag processor Deployment
##
enabled: false

## the maximum unavailable pods/percentage for the dag processor Deployment
##
maxUnavailable: ""

## the minimum available pods/percentage for the dag processor Deployment
##
minAvailable: ""

## configs for the dag processor Pods' liveness probe
##
livenessProbe:
enabled: true
initialDelaySeconds: 10
periodSeconds: 30
timeoutSeconds: 60
failureThreshold: 5
Comment on lines +1161 to +1166
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not have these values if we don't actually use them.


## extra pip packages to install in the dag processor Pod
##
## ____ EXAMPLE _______________
## extraPipPackages:
## - "SomeProject==1.0.0"
##
extraPipPackages: []

## extra VolumeMounts for the dag processor Pods
## - spec for VolumeMount:
## https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#volumemount-v1-core
##
extraVolumeMounts: []

## extra Volumes for the dag processor Pods
## - spec for Volume:
## https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#volume-v1-core
##
extraVolumes: []


###################################
## COMPONENT | Flower
###################################
Expand Down