diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml new file mode 100644 index 0000000..9e22a34 --- /dev/null +++ b/.github/workflows/docker.yml @@ -0,0 +1,55 @@ +name: Publish Docker image + +on: + push: + tags: + - '*' + +jobs: + push_to_registry: + name: Push Docker image to Docker Hub + runs-on: ubuntu-latest + steps: + - name: Check out the repo + uses: actions/checkout@v2 + + - name: Set up QEMU + uses: docker/setup-qemu-action@v1 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v1 + + - name: Cache Docker layers + uses: actions/cache@v2 + with: + path: /tmp/.buildx-cache + key: ${{ runner.os }}-buildx-${{ github.sha }} + restore-keys: | + ${{ runner.os }}-buildx- + + - name: Prepare Build Arg + id: prepare_build_arg + run: | + CURRENT_TAG=${GITHUB_REF#refs/tags/} + echo "CURRENT_TAG=${CURRENT_TAG}" >> "$GITHUB_OUTPUT" + + - name: Log in to Docker Hub + uses: docker/login-action@v1 + with: + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_PASSWORD }} + + - name: Build and push Docker image to Docker Hub + uses: docker/build-push-action@v3 + with: + context: .
+ platforms: linux/arm64,linux/amd64 + push: true + tags: chatwork/kube-schedule-scaler:${{ steps.prepare_build_arg.outputs.CURRENT_TAG }} + cache-from: type=local,src=/tmp/.buildx-cache + cache-to: type=local,dest=/tmp/.buildx-cache-new + + - name: Move cache + run: | + rm -rf /tmp/.buildx-cache + mv /tmp/.buildx-cache-new /tmp/.buildx-cache diff --git a/Dockerfile b/Dockerfile index 50f5ba3..0107a52 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,23 +1,23 @@ -FROM ubuntu:20.04 +FROM ubuntu:22.04 MAINTAINER "sakamoto@chatwork.com" # Install python tools and dev packages RUN apt-get update \ - && apt-get install -q -y --no-install-recommends python3-pip python3-setuptools python3-wheel gcc cron \ + && apt-get install -q -y --no-install-recommends python3-pip python3-setuptools python3-wheel gcc cron tini \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* # set python 3 as the default python version RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1 \ && update-alternatives --install /usr/bin/pip pip /usr/bin/pip3 1 + +COPY ./requirements.txt /root/ RUN pip3 install --upgrade pip requests setuptools pipenv -RUN pip3 install pykube-ng -RUN pip3 install python-crontab -RUN pip3 install croniter +RUN pip3 install -r /root/requirements.txt ADD schedule_scaling /root/schedule_scaling COPY ./run_missed_jobs.py /root RUN chmod a+x /root/run_missed_jobs.py COPY ./startup.sh /root RUN chmod a+x /root/startup.sh -CMD /root/startup.sh +ENTRYPOINT ["tini", "--", "/root/startup.sh"] diff --git a/README.md b/README.md index 2ebb1d5..8cc9d8f 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,10 @@ At the moment it supports reading the scaling definitions from directly in the a ## Usage +### install + +https://github.com/chatwork/charts/tree/master/kube-schedule-scaler#tldr + ### deployment Add the annotation to either your `Deployment`. @@ -27,7 +31,7 @@ Add the annotation to either your `Deployment`. 
zalando.org/schedule-actions: '[{"schedule": "10 18 * * *", "minReplicas": "3"}]' ``` -## Available Fields +## Available Fields The following fields are available * `schedule` - Typical crontab format @@ -102,4 +106,4 @@ spec: type: Utilization averageUtilization: 50 ``` - + diff --git a/deploy/deployment.yaml b/deploy/deployment.yaml deleted file mode 100644 index 9499a9d..0000000 --- a/deploy/deployment.yaml +++ /dev/null @@ -1,79 +0,0 @@ -apiVersion: v1 -kind: ServiceAccount -metadata: - name: kube-schedule-scaler - namespace: kube-system ---- -apiVersion: rbac.authorization.k8s.io/v1 -kind: ClusterRole -metadata: - name: kube-schedule-scaler -rules: -- apiGroups: - - "" - resources: - - namespaces - verbs: - - get - - list - - watch -- apiGroups: - - apps - resources: - - deployments - verbs: - - get - - list - - patch - - update -- apiGroups: - - autoscaling - resources: - - horizontalpodautoscalers - verbs: - - get - - list - - patch - - update ---- -apiVersion: rbac.authorization.k8s.io/v1 -kind: ClusterRoleBinding -metadata: - name: kube-schedule-scaler -roleRef: - apiGroup: rbac.authorization.k8s.io - kind: ClusterRole - name: cluster-admin -subjects: -- kind: ServiceAccount - name: kube-schedule-scaler - namespace: kube-system ---- -apiVersion: apps/v1 -kind: Deployment -metadata: - name: kube-schedule-scaler - namespace: kube-system -spec: - replicas: 1 - selector: - matchLabels: - application: kube-schedule-scaler - template: - metadata: - labels: - application: kube-schedule-scaler - spec: - serviceAccountName: kube-schedule-scaler - priorityClassName: system-cluster-critical - containers: - - name: kube-schedule-scaler - image: chatwork/kube-schedule-scaler:edge - imagePullPolicy: Always - resources: - limits: - cpu: 500m - memory: 500Mi - requests: - cpu: 250m - memory: 500Mi diff --git a/examples/example-deployment.yaml b/examples/example-deployment.yaml new file mode 100644 index 0000000..b45b015 --- /dev/null +++ b/examples/example-deployment.yaml 
@@ -0,0 +1,53 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + annotations: + zalando.org/schedule-actions: | + [ + {"schedule": "45 22 * * *", "replicas": "1"}, + {"schedule": "45 1 * * *", "replicas": "0"} + ] + labels: + app.kubernetes.io/instance: example + app.kubernetes.io/name: example + name: example-deployment + namespace: default +spec: + progressDeadlineSeconds: 600 + replicas: 1 + revisionHistoryLimit: 10 + selector: + matchLabels: + app.kubernetes.io/instance: example + app.kubernetes.io/name: example + strategy: + rollingUpdate: + maxSurge: 25% + maxUnavailable: 25% + type: RollingUpdate + template: + metadata: + labels: + app.kubernetes.io/instance: example + app.kubernetes.io/name: example + spec: + containers: + - image: k8s.gcr.io/pause:latest + imagePullPolicy: IfNotPresent + name: example-pause + resources: + limits: + cpu: 100m + memory: 128Mi + requests: + cpu: 100m + memory: 128Mi + terminationMessagePath: /dev/termination-log + terminationMessagePolicy: File + dnsPolicy: ClusterFirst + priorityClassName: low + restartPolicy: Always + schedulerName: default-scheduler + serviceAccount: default + serviceAccountName: default + terminationGracePeriodSeconds: 30 diff --git a/examples/example-hpa.yaml b/examples/example-hpa.yaml new file mode 100644 index 0000000..b45b015 --- /dev/null +++ b/examples/example-hpa.yaml @@ -0,0 +1,26 @@ +apiVersion: autoscaling/v2beta2 +kind: HorizontalPodAutoscaler +metadata: + name: example-hpa + namespace: default + annotations: + zalando.org/schedule-actions: | + [ + {"schedule": "00 08 * * *", "minReplicas": "1", "maxReplicas": 3}, + {"schedule": "30 23 * * *", "minReplicas": 2, "maxReplicas": "3"}, + {"schedule": "00 09 * * *", "minReplicas": 1, "maxReplicas": "4"} + ] +spec: + maxReplicas: 2 + minReplicas: 1 + metrics: + - resource: + name: cpu + target: + averageUtilization: 50 + type: Utilization + type: Resource + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: example-deployment diff
--git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..210bfd5 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +kubernetes==23.6.0 +croniter==1.3.5 +python-crontab==2.6.0 diff --git a/run_missed_jobs.py b/run_missed_jobs.py index e0f0657..46ae36c 100644 --- a/run_missed_jobs.py +++ b/run_missed_jobs.py @@ -4,18 +4,23 @@ from datetime import datetime from datetime import timedelta -scaling_cron = CronTab(user='root') -scale_jobs = scaling_cron.find_comment('Scheduling_Jobs') - +scaling_cron = CronTab(user="root") print("[INFO]", datetime.now(), "Running the Jobs of the last 5 minutes") -for job in scale_jobs: - schedule = job.schedule(date_from=datetime.now()) - schedule = str(schedule.get_prev()) - schedule = time.strptime(schedule, "%Y-%m-%d %H:%M:%S") - retry_execution_threshold = str(datetime.now() - timedelta(minutes=5)) - retry_execution_threshold = time.strptime(retry_execution_threshold, "%Y-%m-%d %H:%M:%S.%f") +for target in ["Deployment", "HPA"]: + scale_jobs = scaling_cron.find_comment("Scheduling_Jobs_" + target) + + for job in scale_jobs: + # print(job) + schedule = job.schedule(date_from=datetime.now()) + schedule = str(schedule.get_prev()) + schedule = time.strptime(schedule, "%Y-%m-%d %H:%M:%S") + retry_execution_threshold = str(datetime.now() - timedelta(minutes=5)) + retry_execution_threshold = time.strptime( + retry_execution_threshold, "%Y-%m-%d %H:%M:%S.%f" + ) - if schedule > retry_execution_threshold: - schedule_to_execute = str(job).split(";")[2] - os.system(schedule_to_execute) + if schedule > retry_execution_threshold: + # 5 7 * * * . /root/.profile ; /usr/bin/python /root/schedule_scaling/deployment-script.py --namespace ... --deployment ... --replicas 9 2>&1 | tee -a ... 
# Scheduling_Jobs + schedule_to_execute = str(job).split(";")[1] + os.system(schedule_to_execute) diff --git a/schedule_scaling/deployment-script.py b/schedule_scaling/deployment-script.py new file mode 100644 index 0000000..8860aef --- /dev/null +++ b/schedule_scaling/deployment-script.py @@ -0,0 +1,75 @@ +from kubernetes import client, config +from kubernetes.client.rest import ApiException +import time +import datetime +import random +import os +import sys +import argparse + + +def patch_deployment(client, deployment, namespace, replicas): + body = {"spec": {"replicas": replicas}} + + try: + client.patch_namespaced_deployment(deployment, namespace, body) + except ApiException as err: + print( + "[ERROR]", + datetime.datetime.now(), + "deployment {}/{} has not been patched".format(namespace, deployment), + err, + ) + return + except Exception as err: + print( + "[ERROR]", + datetime.datetime.now(), + "Exception! Something went wrong... deployment {}/{} has not been scaled with body: {}. err: {}".format( + namespace, + deployment, + body, + err, + ), + ) + return + + print( + "[INFO]", + datetime.datetime.now(), + "Deployment {}/{} has been updated to {}".format(namespace, deployment, body), + ) + + +def main(): + parser = argparse.ArgumentParser(description="deployment scaling replicas") + parser.add_argument( + "--namespace", "-n", type=str, required=True, help="deployment namespace" + ) + parser.add_argument( + "--deployment", "--deploy", type=str, required=True, help="deployment name" + ) + parser.add_argument( + "--replicas", type=int, required=True, help="replicas number", default=-1 + ) + args = parser.parse_args() + + time.sleep(random.uniform(1, 10)) + + if os.getenv("KUBERNETES_SERVICE_HOST"): + config.load_incluster_config() + else: + config.load_kube_config() + + v1 = client.AppsV1Api() + + patch_deployment( + client=v1, + deployment=args.deployment, + namespace=args.namespace, + replicas=args.replicas, + ) + + +if __name__ == "__main__": + main() diff --git
a/schedule_scaling/hpa-script.py b/schedule_scaling/hpa-script.py new file mode 100644 index 0000000..90aac8e --- /dev/null +++ b/schedule_scaling/hpa-script.py @@ -0,0 +1,92 @@ +from kubernetes import client, config +from kubernetes.client.rest import ApiException +import time +import datetime +import sys +import random +import os +import argparse + + +def _patch_hpa(client, hpa, namespace, body): + try: + client.patch_namespaced_horizontal_pod_autoscaler(hpa, namespace, body) + + except ApiException as err: + print( + "[ERROR]", + datetime.datetime.now(), + "ApiException! HPA {}/{} has not been updated with body: {}. err: {}".format( + namespace, hpa, body, err + ), + ) + return + except Exception as err: + print( + "[ERROR]", + datetime.datetime.now(), + "Exception! Something went wrong... HPA {}/{} has not been scaled with body: {}".format( + namespace, + hpa, + body, + err, + ), + ) + return + + print( + "[INFO]", + datetime.datetime.now(), + "HPA {}/{} has been updated to {}".format(namespace, hpa, body), + ) + + +def patch_hpa(client, hpa, namespace, max_replicas, min_replicas): + body = "" + if max_replicas > 0 and min_replicas > 0: + body = {"spec": {"minReplicas": min_replicas, "maxReplicas": max_replicas}} + + elif min_replicas > 0: + body = {"spec": {"minReplicas": min_replicas}} + + elif max_replicas > 0: + body = {"spec": {"maxReplicas": max_replicas}} + + _patch_hpa(client, hpa, namespace, body) + + +def main(): + parser = argparse.ArgumentParser(description="deployment scaling replicas") + parser.add_argument( + "--namespace", "-n", type=str, required=True, help="hpa namespace" + ) + parser.add_argument("--hpa", type=str, required=True, help="hpa name") + parser.add_argument( + "--min_replicas", type=int, help="minReplicas number", default=-1 + ) + parser.add_argument( + "--max_replicas", type=int, help="maxReplicas number", default=-1 + ) + args = parser.parse_args() + + # jitter + time.sleep(random.uniform(1, 10)) + + if 
os.getenv("KUBERNETES_SERVICE_HOST"): + config.load_incluster_config() + else: + config.load_kube_config() + + v1 = client.AutoscalingV1Api() + + patch_hpa( + client=v1, + hpa=args.hpa, + namespace=args.namespace, + max_replicas=args.max_replicas, + min_replicas=args.min_replicas, + ) + + +if __name__ == "__main__": + main() diff --git a/schedule_scaling/schedule_scaling.py b/schedule_scaling/schedule_scaling.py index add6ae3..8b7f034 100644 --- a/schedule_scaling/schedule_scaling.py +++ b/schedule_scaling/schedule_scaling.py @@ -1,200 +1,314 @@ """ Collecting Deployments configured for Scaling """ import os -import pathlib import json import logging -import shutil -import pykube +from kubernetes import client, config +from kubernetes.client.rest import ApiException import re import urllib.request from crontab import CronTab import datetime +from socket import gethostname -EXECUTION_TIME = 'datetime.datetime.now().strftime("%d-%m-%Y %H:%M UTC")' +deployment_script = os.getenv("CRON_SCRIPT_PATH_BASE") + "/deployment-script.py" +hpa_script = os.getenv("CRON_SCRIPT_PATH_BASE") + "/hpa-script.py" +schedule_actions_annotation = "zalando.org/schedule-actions" -def create_job_directory(): - """ This directory will hold the temp python scripts to execute the scaling jobs """ - temp__dir = '/tmp/scaling_jobs' - if os.path.isdir(temp__dir): - shutil.rmtree(temp__dir) - pathlib.Path(temp__dir).mkdir(parents=True, exist_ok=True) - - -def clear_cron(): - """ This is needed so that if any one removes his scaling action - it should not be trigger again """ - my_cron = CronTab(user='root') - my_cron.remove_all(comment="Scheduling_Jobs") +def clear_cron(comment): + """This is needed so that if any one removes his scaling action + it should not be trigger again""" + my_cron = CronTab(user="root") + my_cron.remove_all(comment=comment) my_cron.write() -def get_kube_api(): - """ Initiating the API from Service Account or when running locally from ~/.kube/config """ - try: - config = 
pykube.KubeConfig.from_service_account() - except FileNotFoundError: - # local testing - config = pykube.KubeConfig.from_file(os.path.expanduser('~/.kube/config')) - api = pykube.HTTPClient(config) - return api - - def deployments_for_scale(): - ''' + """ Getting the deployments configured for schedule scaling... - ''' - api = get_kube_api() - deployments = [] + """ + v1 = client.AppsV1Api() scaling_dict = {} - for namespace in list(pykube.Namespace.objects(api)): - namespace = str(namespace) - for deployment in pykube.Deployment.objects(api).filter(namespace=namespace): - annotations = deployment.metadata.get('annotations', {}) - f_deployment = str(namespace + '/' + str(deployment)) - - schedule_actions = parse_content(annotations.get('zalando.org/schedule-actions', None), f_deployment) + deployments = v1.list_deployment_for_all_namespaces(watch=False) - if schedule_actions is None or len(schedule_actions) == 0: - continue + for i in deployments.items: + if schedule_actions_annotation in i.metadata.annotations: + deployment = str(i.metadata.namespace + "/" + str(i.metadata.name)) + schedule_actions = parse_content( + i.metadata.annotations[schedule_actions_annotation], deployment + ) + scaling_dict[deployment] = schedule_actions - deployments.append([deployment.metadata['name']]) - scaling_dict[f_deployment] = schedule_actions - if not deployments: - logging.info('No deployment is configured for schedule scaling') + if not scaling_dict: + logging.info("No deployment is configured for schedule scaling") return scaling_dict -def hpa_job_creator(): - """ Create CronJobs for configured hpa """ - - hpa__for_scale = hpa_for_scale() - print("[INFO]", datetime.datetime.now(), "HPA collected for scaling: ") - for namespace_hpa, schedules in hpa__for_scale.items(): - namespace = namespace_hpa.split("/")[0] - hpa = namespace_hpa.split("/")[1] - for n in range(len(schedules)): - schedules_n = schedules[n] - minReplicas = schedules_n.get('minReplicas', None) - maxReplicas = 
schedules_n.get('maxReplicas', None) - schedule = schedules_n.get('schedule', None) - print("[INFO]", datetime.datetime.now(), "HPA: {}, Namespace: {}, MinReplicas: {}, MaxReplicas: {}, Schedule: {}".format(hpa, namespace, minReplicas, maxReplicas, schedule)) - - with open("/root/schedule_scaling/templates/hpa-script.py", 'r') as script: - script = script.read() - hpa_script = script % { - 'namespace': namespace, - 'name': hpa, - 'minReplicas': minReplicas, - 'maxReplicas': maxReplicas, - 'time': EXECUTION_TIME, - } - i = 0 - while os.path.exists("/tmp/scaling_jobs/%s-%s.py" % (hpa, i)): - i += 1 - script_creator = open("/tmp/scaling_jobs/%s-%s.py" % (hpa, i), "w") - script_creator.write(hpa_script) - script_creator.close() - cmd = ['sleep 1 ; . /root/.profile ; /usr/bin/python', script_creator.name, - '2>&1 | tee -a', os.environ['SCALING_LOG_FILE']] - cmd = ' '.join(map(str, cmd)) - scaling_cron = CronTab(user='root') - job = scaling_cron.new(command=cmd) - try: - job.setall(schedule) - job.set_comment("Scheduling_Jobs") - scaling_cron.write() - except Exception: - print("[ERROR]", datetime.datetime.now(),'HPA: {} has syntax error in the schedule'.format(hpa)) - pass def hpa_for_scale(): - ''' + """ Getting the hpa configured for schedule scaling... 
- ''' - api = get_kube_api() - hpas = [] + """ + + v1 = client.AutoscalingV1Api() scaling_dict = {} - for namespace in list(pykube.Namespace.objects(api)): - namespace = str(namespace) - for hpa in pykube.HorizontalPodAutoscaler.objects(api).filter(namespace=namespace): - annotations = hpa.metadata.get('annotations', {}) - f_hpa = str(namespace + '/' + str(hpa)) - schedule_actions = parse_content(annotations.get('zalando.org/schedule-actions', None), f_hpa) + hpas = v1.list_horizontal_pod_autoscaler_for_all_namespaces(watch=False) - if schedule_actions is None or len(schedule_actions) == 0: - continue + for i in hpas.items: + if schedule_actions_annotation in i.metadata.annotations: + hpa = str(i.metadata.namespace + "/" + str(i.metadata.name)) + schedule_actions = parse_content( + i.metadata.annotations[schedule_actions_annotation], hpa + ) + scaling_dict[hpa] = schedule_actions - hpas.append([hpa.metadata['name']]) - scaling_dict[f_hpa] = schedule_actions - if not hpas: - logging.info('No hpa is configured for schedule scaling') + if not scaling_dict: + logging.info("No hpa is configured for schedule scaling") return scaling_dict + def deployment_job_creator(): - """ Create CronJobs for configured Deployments """ + """Create CronJobs for configured Deployments""" deployments__for_scale = deployments_for_scale() - print("[INFO]",datetime.datetime.now(), "Deployments collected for scaling: ") + + deployment_job_creator_file = "/tmp/deployment_job_creator.json" + configmap_name = "kube-schedule-scaler-deployment-status" + if os.path.isfile(deployment_job_creator_file): + with open(deployment_job_creator_file, "r") as f: + old_deployments__for_scale = json.load(f) + if deployments__for_scale == old_deployments__for_scale: + patch_configmap(configmap_name=configmap_name) + + return + + cron_comment = "Scheduling_Jobs_Deployment" + clear_cron(cron_comment) for namespace_deployment, schedules in deployments__for_scale.items(): namespace = 
namespace_deployment.split("/")[0] deployment = namespace_deployment.split("/")[1] - for n in range(len(schedules)): - schedules_n = schedules[n] - replicas = schedules_n.get('replicas', None) - schedule = schedules_n.get('schedule', None) - print("[INFO]", datetime.datetime.now(), "Deployment: {}, Namespace: {}, Replicas: {}, Schedule: {}".format(deployment, namespace, replicas, schedule)) - - with open("/root/schedule_scaling/templates/deployment-script.py", 'r') as script: - script = script.read() - deployment_script = script % { - 'namespace': namespace, - 'name': deployment, - 'replicas': replicas, - 'time': EXECUTION_TIME, - } - i = 0 - while os.path.exists("/tmp/scaling_jobs/%s-%s.py" % (deployment, i)): - i += 1 - script_creator = open("/tmp/scaling_jobs/%s-%s.py" % (deployment, i), "w") - script_creator.write(deployment_script) - script_creator.close() - cmd = ['sleep 1 ; . /root/.profile ; /usr/bin/python', script_creator.name, - '2>&1 | tee -a', os.environ['SCALING_LOG_FILE']] - cmd = ' '.join(map(str, cmd)) - scaling_cron = CronTab(user='root') + for i, contents in enumerate(schedules): + replicas = contents.get("replicas", None) + if replicas is None: + print( + "[ERROR]", + datetime.datetime.now(), + "{}, Deployment: {}, Namespace: {} is not set replicas".format( + i, deployment, namespace + ), + ) + continue + schedule = contents.get("schedule", None) + # print( + # "[INFO]", + # datetime.datetime.now(), + # "{}, Deployment: {}, Namespace: {}, Replicas: {}, Schedule: {}".format( + # i, deployment, namespace, replicas, schedule + # ), + # ) + cmd = [ + ". 
/root/.profile ; python3", + deployment_script, + "--namespace", + namespace, + "--deployment", + deployment, + "--replicas", + replicas, + "2>&1 | tee -a", + os.environ["SCALING_LOG_FILE"], + ] + cmd = " ".join(map(str, cmd)) + # print(cmd) + scaling_cron = CronTab(user="root") + job = scaling_cron.new(command=cmd) + try: + job.set_comment(cron_comment) + job.setall(schedule) + scaling_cron.write() + except Exception as err: + print( + "[ERROR]", + datetime.datetime.now(), + "Deployment: {} has syntax error in the schedule".format( + deployment + ), + err, + ) + pass + + with open(deployment_job_creator_file, "w") as f: + json.dump(deployments__for_scale, f, indent=2) + + create_status_configmap( + json.dumps(deployments__for_scale, indent=2), configmap_name + ) + patch_configmap( + configmap_name=configmap_name, + configmap_data=json.dumps(deployments__for_scale, indent=2), + ) + + print("[INFO]", datetime.datetime.now(), "Deployment cronjob for scaling: ") + my_cron = CronTab(user="root") + for cron in my_cron.find_comment(comment=cron_comment): + print("[INFO]", datetime.datetime.now(), cron) + + +def hpa_job_creator(): + """Create CronJobs for configured hpa""" + + hpa__for_scale = hpa_for_scale() + # print("[INFO]", datetime.datetime.now(), "HPA collected for scaling: ") + + hpa_job_creator_file = "/tmp/hpa_job_creator.json" + configmap_name = "kube-schedule-scaler-hpa-status" + if os.path.isfile(hpa_job_creator_file): + with open(hpa_job_creator_file, "r") as f: + old_hpa__for_scale = json.load(f) + if hpa__for_scale == old_hpa__for_scale: + patch_configmap(configmap_name=configmap_name) + + return + + cron_comment = "Scheduling_Jobs_HPA" + clear_cron(cron_comment) + for namespace_hpa, schedules in hpa__for_scale.items(): + namespace = namespace_hpa.split("/")[0] + hpa = namespace_hpa.split("/")[1] + for i, contents in enumerate(schedules): + minReplicas = contents.get("minReplicas", None) + maxReplicas = contents.get("maxReplicas", None) + + if maxReplicas 
== 0 or maxReplicas == "0": + print( + "[ERROR]", + datetime.datetime.now(), + "HPA: {}, Namespace: {}, MaxReplicas: {} is not set to 0".format( + hpa, namespace, maxReplicas + ), + ) + continue + + if minReplicas == 0 or minReplicas == "0": + print( + "[ERROR]", + datetime.datetime.now(), + "HPA: {}, Namespace: {}, MinReplicas: {} is not set to 0".format( + hpa, namespace, minReplicas + ), + ) + continue + + schedule = contents.get("schedule", None) + # print( + # "[INFO]", + # datetime.datetime.now(), + # "{}, HPA: {}, Namespace: {}, MinReplicas: {}, MaxReplicas: {}, Schedule: {}".format( + # i, hpa, namespace, minReplicas, maxReplicas, schedule + # ), + # ) + + if minReplicas is not None and maxReplicas is not None: + cmd = [ + ". /root/.profile ; python3", + hpa_script, + "--namespace", + namespace, + "--hpa", + hpa, + "--min_replicas", + minReplicas, + "--max_replicas", + maxReplicas, + "2>&1 | tee -a", + os.environ["SCALING_LOG_FILE"], + ] + + elif minReplicas is not None: + cmd = [ + ". /root/.profile ; python3", + hpa_script, + "--namespace", + namespace, + "--hpa", + hpa, + "--min_replicas", + minReplicas, + "2>&1 | tee -a", + os.environ["SCALING_LOG_FILE"], + ] + + elif maxReplicas is not None: + cmd = [ + ". 
/root/.profile ; python3", + hpa_script, + "--namespace", + namespace, + "--hpa", + hpa, + "--max_replicas", + maxReplicas, + "2>&1 | tee -a", + os.environ["SCALING_LOG_FILE"], + ] + + cmd = " ".join(map(str, cmd)) + # print(cmd) + scaling_cron = CronTab(user="root") job = scaling_cron.new(command=cmd) try: + job.set_comment(cron_comment) job.setall(schedule) - job.set_comment("Scheduling_Jobs") scaling_cron.write() - except Exception: - print("[ERROR]", datetime.datetime.now(), 'Deployment: {} has syntax error in the schedule'.format(deployment)) + except Exception as err: + print( + "[ERROR]", + datetime.datetime.now(), + "HPA: {} has syntax error in the schedule".format(hpa), + err, + ) pass + with open(hpa_job_creator_file, "w") as f: + json.dump(hpa__for_scale, f, indent=2) + + create_status_configmap(json.dumps(hpa__for_scale, indent=2), configmap_name) + patch_configmap( + configmap_name=configmap_name, + configmap_data=json.dumps(hpa__for_scale, indent=2), + ) + + print("[INFO]", datetime.datetime.now(), "HPA cronjob for scaling: ") + my_cron = CronTab(user="root") + for cron in my_cron.find_comment(comment=cron_comment): + print("[INFO]", datetime.datetime.now(), cron) + + def parse_content(content, identifier): - if content == None: + if content is None: return [] if is_valid_url(content): schedules = fetch_schedule_actions_from_url(content) - if schedules == None: + if schedules is None: return [] return parse_schedules(schedules, identifier) return parse_schedules(content, identifier) + def is_valid_url(url): - return re.search('^(https?)://(\\S+)\.(\\S{2,}?)(/\\S+)?$', url, re.I) != None + return re.search("^(https?)://(\\S+)\.(\\S{2,}?)(/\\S+)?$", url, re.I) != None + def fetch_schedule_actions_from_url(url): request = urllib.request.urlopen(url) try: - content = request.read().decode('utf-8') + content = request.read().decode("utf-8") except: content = None finally: @@ -202,15 +316,116 @@ def fetch_schedule_actions_from_url(url): return content + def 
parse_schedules(schedules, identifier): try: return json.loads(schedules) except Exception as err: - print("[ERROR]", datetime.datetime.now(), '{} - Error in parsing JSON {} with error'.format(identifier, schedules), err) + print( + "[ERROR]", + datetime.datetime.now(), + "{} - Error in parsing JSON {} with error".format(identifier, schedules), + err, + ) return [] -if __name__ == '__main__': - create_job_directory() - clear_cron() + +def _create_configmap_object(name, namespace, configmap_data): + # Configureate ConfigMap metadata + metadata = client.V1ObjectMeta( + name=name, + namespace=namespace, + ) + configmap_object = client.V1ConfigMap( + api_version="v1", + kind="ConfigMap", + data=dict(schedules=configmap_data), + metadata=metadata, + ) + return configmap_object + + +def create_status_configmap(configmap_data, configmap_name): + v1 = client.CoreV1Api() + + if os.getenv("NAMESPACE"): + namespace = os.getenv("NAMESPACE") + else: + namespace = "kube-system" + + configmap_names = [] + for configmap in v1.list_namespaced_config_map(namespace, watch=False).items: + configmap_names.append(configmap.metadata.name) + + if configmap_name not in configmap_names: + configmap_object = _create_configmap_object( + configmap_name, namespace, configmap_data + ) + try: + v1.create_namespaced_config_map( + namespace=namespace, + body=configmap_object, + ) + + except ApiException as e: + print( + "[ERROR]", + datetime.datetime.now(), + "{} has not created.".format(configmap_name), + ) + + +def patch_configmap(configmap_name, configmap_data=None): + v1 = client.CoreV1Api() + + if configmap_data is None: + body = { + "metadata": { + "annotations": { + "kube-schedule-scaler/last-updated": datetime.datetime.now(), + "kube-schedule-scaler/checked-by": gethostname(), + } + } + } + + else: + body = { + "metadata": { + "annotations": { + "kube-schedule-scaler/last-updated": datetime.datetime.now(), + "kube-schedule-scaler/checked-by": gethostname(), + } + }, + "data": { + 
"schedules": configmap_data, + }, + } + + if os.getenv("NAMESPACE"): + namespace = os.getenv("NAMESPACE") + else: + namespace = "kube-system" + + try: + v1.patch_namespaced_config_map(configmap_name, namespace, body) + except ApiException as err: + print( + "[ERROR]", + datetime.datetime.now(), + "{} has not patched.".format(configmap_name), + err, + ) + + +def main(): + if os.getenv("KUBERNETES_SERVICE_HOST"): + config.load_incluster_config() + else: + config.load_kube_config() + deployment_job_creator() hpa_job_creator() + + +if __name__ == "__main__": + main() diff --git a/schedule_scaling/templates/deployment-script.py b/schedule_scaling/templates/deployment-script.py deleted file mode 100644 index bd1dd37..0000000 --- a/schedule_scaling/templates/deployment-script.py +++ /dev/null @@ -1,33 +0,0 @@ -import pykube -import operator -import time -import datetime -import random - -def get_kube_api(): - try: - config = pykube.KubeConfig.from_service_account() - except FileNotFoundError: - # local testing - config = pykube.KubeConfig.from_file(os.path.expanduser('~/.kube/config')) - api = pykube.HTTPClient(config) - return api - -time.sleep(random.uniform(1, 10)) -api = get_kube_api() -deployment = pykube.Deployment.objects(api).filter(namespace="%(namespace)s").get(name="%(name)s") - -replicas = %(replicas)s - -if replicas != None: - deployment.replicas = replicas - try: - deployment.update() - except Exception as err: - print("[ERROR]", datetime.datetime.now(),'deployment %(namespace)s/%(name)s has not been updated',err) - - deployment = pykube.Deployment.objects(api).filter(namespace="%(namespace)s").get(name="%(name)s") - if deployment.replicas == replicas: - print("[INFO]", datetime.datetime.now(), 'Deployment %(namespace)s/%(name)s has been scaled successfully to %(replicas)s replica at', %(time)s) - else: - print("[ERROR]", datetime.datetime.now(), 'Something went wrong... 
deployment %(namespace)s/%(name)s has not been scaled to %(replicas)s') diff --git a/schedule_scaling/templates/hpa-script.py b/schedule_scaling/templates/hpa-script.py deleted file mode 100644 index fa3d91e..0000000 --- a/schedule_scaling/templates/hpa-script.py +++ /dev/null @@ -1,81 +0,0 @@ -import pykube -import operator -import time -import datetime -import sys -import random - -def get_kube_api(): - try: - config = pykube.KubeConfig.from_service_account() - except FileNotFoundError: - # local testing - config = pykube.KubeConfig.from_file(os.path.expanduser('~/.kube/config')) - api = pykube.HTTPClient(config) - return api - -time.sleep(random.uniform(1, 10)) -api = get_kube_api() -hpa = pykube.HorizontalPodAutoscaler.objects(api).filter(namespace="%(namespace)s").get(name="%(name)s") - -maxReplicas = %(maxReplicas)s -minReplicas = %(minReplicas)s - -if hpa: - if maxReplicas != None and minReplicas != None: - hpa.obj["spec"]["maxReplicas"] = maxReplicas - hpa.obj["spec"]["minReplicas"] = minReplicas - - try: - hpa.update() - except Exception as err: - print("[ERROR]", datetime.datetime.now(),'HPA %(namespace)s/%(name)s has not been updated',err) - - hpa = pykube.HorizontalPodAutoscaler.objects(api).filter(namespace="%(namespace)s").get(name="%(name)s") - if hpa.obj["spec"]["maxReplicas"] == maxReplicas and hpa.obj["spec"]["minReplicas"] == minReplicas: - print("[INFO]", datetime.datetime.now(), 'HPA %(namespace)s/%(name)s has been adjusted to maxReplicas to %(maxReplicas)s at', %(time)s) - print("[INFO]", datetime.datetime.now(), 'HPA %(namespace)s/%(name)s has been adjusted to minReplicas to %(minReplicas)s at', %(time)s) - else: - print("[ERROR]", datetime.datetime.now(), ' Something went wrong... 
HPA %(namespace)s/%(name)s has not been scaled(maxReplicas to %(maxReplicas)s, minReplicas to %(minReplicas)s)') - - elif minReplicas != None: - currentMaxReplicas = hpa.obj["spec"].get('maxReplicas', {}) - - if currentMaxReplicas: - if currentMaxReplicas < minReplicas: - print("[ERROR]", datetime.datetime.now(), 'HPA %(namespace)s/%(name)s cannot be set minReplicas(desired:{}) larger than maxReplicas(current:{}).'.format(minReplicas, currentMaxReplicas)) - sys.exit(1) - - hpa.obj["spec"]["minReplicas"] = minReplicas - - try: - hpa.update() - except Exception as err: - print("[ERROR]", datetime.datetime.now(),'HPA %(namespace)s/%(name)s has not been updated',err) - - hpa = pykube.HorizontalPodAutoscaler.objects(api).filter(namespace="%(namespace)s").get(name="%(name)s") - if hpa.obj["spec"]["minReplicas"] == minReplicas: - print("[INFO]", datetime.datetime.now(), 'HPA %(namespace)s/%(name)s has been adjusted to minReplicas to %(minReplicas)s at', %(time)s) - else: - print("[ERROR]", datetime.datetime.now(), 'Something went wrong... 
HPA %(namespace)s/%(name)s has not been scaled(minReplicas to %(minReplicas)s)') - - elif maxReplicas != None: - currentMinReplicas = hpa.obj["spec"].get('minReplicas', {}) - - if currentMinReplicas: - if currentMinReplicas > maxReplicas: - print("[ERROR]", datetime.datetime.now(), 'HPA %(namespace)s/%(name)s cannot be set maxReplicas(desired:{}) larger than minReplicas(current:{}).'.format(maxReplicas, currentMinReplicas)) - sys.exit(1) - - hpa.obj["spec"]["maxReplicas"] = maxReplicas - - try: - hpa.update() - except Exception as err: - print("[ERROR]", datetime.datetime.now(),'HPA %(namespace)s/%(name)s has not been updated',err) - - hpa = pykube.HorizontalPodAutoscaler.objects(api).filter(namespace="%(namespace)s").get(name="%(name)s") - if hpa.obj["spec"]["maxReplicas"] == maxReplicas: - print("[INFO]", datetime.datetime.now(), 'HPA %(namespace)s/%(name)s has been adjusted to maxReplicas to %(maxReplicas)s at', %(time)s) - else: - print("[ERROR]", datetime.datetime.now(), 'Something went wrong... HPA %(namespace)s/%(name)s has not been scaled(maxReplicas to %(maxReplicas)s)') diff --git a/startup.sh b/startup.sh index bae3dfc..3c49255 100644 --- a/startup.sh +++ b/startup.sh @@ -13,6 +13,8 @@ if [ ! -e ${SCALING_LOG_FILE} ]; then fi # Starting Cron +export CRON_SCRIPT_PATH_BASE="/root/schedule_scaling" + /usr/sbin/cron -f & cron_pid=$! @@ -39,18 +41,21 @@ datetime=`date "+%Y-%m-%d %H:%M:%S.%6N"` echo "[INFO] $datetime Creating the main cron" echo " ## The main script to collect the deployments to be scaled ## -*/3 * * * * sleep 7; . /root/.profile; /usr/bin/python /root/schedule_scaling/schedule_scaling.py >> ${SCALING_LOG_FILE} 2>&1 +*/3 * * * * sleep 7; . 
/root/.profile; python3 /root/schedule_scaling/schedule_scaling.py >> ${SCALING_LOG_FILE} 2>&1 " | /usr/bin/crontab - # Running the main Script at the beginning datetime=`date "+%Y-%m-%d %H:%M:%S.%6N"` echo "[INFO] $datetime Running the main script at the beginning" -/usr/bin/python /root/schedule_scaling/schedule_scaling.py >> ${SCALING_LOG_FILE} 2>&1 +python3 /root/schedule_scaling/schedule_scaling.py + +#echo "[INFO] $datetime confirm cronjob for the begging" +#crontab -l # Run once at the startup of container datetime=`date "+%Y-%m-%d %H:%M:%S.%6N"` echo "[INFO] $datetime Run once at the startup of container" -/usr/bin/python /root/run_missed_jobs.py >> ${SCALING_LOG_FILE} +sleep 2; python3 /root/run_missed_jobs.py >> ${SCALING_LOG_FILE} 2>&1 trap 'jobs -p | xargs kill; sleep 10; echo === Finish this script ===; exit 0' SIGTERM