From b6844f65ae97bc2f7a1a0934106043bc04da5d8a Mon Sep 17 00:00:00 2001 From: guohelu <141622458+guohelu@users.noreply.github.com> Date: Wed, 27 Nov 2024 19:50:52 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=B8=8A=E4=BA=91=E5=91=A8=E6=9C=9F?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E6=B7=BB=E5=8A=A0=E6=9C=80=E4=BD=8E=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E9=97=B4=E9=9A=94=E6=97=B6=E9=97=B4=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=20--story=3D120737215=20(#7621)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: 上云周期任务添加最低任务间隔时间配置 --story=120737215 * fix: 修复逻辑问题 --story=120737215 * fix: 修复继承问题 --story=120737215 --- config/default.py | 5 +++ env.py | 5 +++ .../apis/drf/serilaziers/periodic_task.py | 30 ++++++++++++-- .../core/apis/drf/viewsets/periodic_task.py | 6 +-- .../tests/utils/strings/test_inspect_time.py | 41 +++++++++++++++++++ gcloud/utils/strings.py | 24 ++++++++++- requirements.txt | 1 + 7 files changed, 105 insertions(+), 7 deletions(-) create mode 100644 gcloud/tests/utils/strings/test_inspect_time.py diff --git a/config/default.py b/config/default.py index cfde7dba3e..1f9bef8ccc 100644 --- a/config/default.py +++ b/config/default.py @@ -868,6 +868,11 @@ def check_engine_admin_permission(request, *args, **kwargs): # 周期任务消息通知类型 PERIODIC_TASK_REMINDER_NOTIFY_TYPE = env.PERIODIC_TASK_REMINDER_NOTIFY_TYPE +# 周期任务最短时间间隔 +PERIODIC_TASK_SHORTEST_TIME = env.PERIODIC_TASK_SHORTEST_TIME +# 周期任务迭代次数 +PERIODIC_TASK_ITERATION = env.PERIODIC_TASK_ITERATION + # bk_audit ENABLE_BK_AUDIT = True if env.BK_AUDIT_DATA_TOKEN else False BK_AUDIT_SETTINGS = { diff --git a/env.py b/env.py index 30a10047df..b97c772517 100644 --- a/env.py +++ b/env.py @@ -150,6 +150,11 @@ # 周期任务消息通知类型 PERIODIC_TASK_REMINDER_NOTIFY_TYPE = json.loads(os.getenv("PERIODIC_TASK_REMINDER_NOTIFY_TYPE", '["email"]')) +# 周期任务最短时间间隔,以分钟为单位,例如:30,0 表示无限制 +PERIODIC_TASK_SHORTEST_TIME = int(os.getenv("PERIODIC_TASK_SHORTEST_TIME", 0)) +# 周期任务迭代次数 +PERIODIC_TASK_ITERATION = int(os.getenv("PERIODIC_TASK_ITERATION", 10)) + # bk_audit BK_AUDIT_ENDPOINT = os.getenv("BK_AUDIT_ENDPOINT", None) BK_AUDIT_DATA_TOKEN = os.getenv("BK_AUDIT_DATA_TOKEN", None) diff --git a/gcloud/core/apis/drf/serilaziers/periodic_task.py b/gcloud/core/apis/drf/serilaziers/periodic_task.py index c29b27b36c..670a8ba592 100644 --- a/gcloud/core/apis/drf/serilaziers/periodic_task.py +++ b/gcloud/core/apis/drf/serilaziers/periodic_task.py @@ -22,6 +22,8 @@ from rest_framework.validators import ValidationError import env +from gcloud.utils.strings import inspect_time +from gcloud.conf import settings from gcloud.constants import PROJECT from gcloud.core.apis.drf.serilaziers.project import ProjectSerializer from gcloud.core.models import Project, ProjectConfig @@ -152,9 +154,27 @@ def check_cron_params(cron, project): raise ValidationError("周期任务时间格式过长") -class CreatePeriodicTaskSerializer(serializers.ModelSerializer): - project = serializers.IntegerField(write_only=True) +class CronFieldSerializer(serializers.Serializer): cron = serializers.DictField(write_only=True) + + def inspect_cron(self, cron): + minute = cron.get("minute", "*") + hour = cron.get("hour", "*") + day_of_month = cron.get("day_of_month", "*") + month = cron.get("month", "*") + day_of_week = cron.get("day_of_week", "*") + + cron_expression = f"{minute} {hour} {day_of_month} {month} {day_of_week}" + + result = inspect_time(cron_expression, settings.PERIODIC_TASK_SHORTEST_TIME, settings.PERIODIC_TASK_ITERATION) + if not result: + raise serializers.ValidationError( + "The interval between tasks should be at least {} minutes".format(settings.PERIODIC_TASK_SHORTEST_TIME) + ) + + +class CreatePeriodicTaskSerializer(CronFieldSerializer, serializers.ModelSerializer): + project = serializers.IntegerField(write_only=True) template_source = serializers.CharField(required=False, default=PROJECT) pipeline_tree = ReadWriteSerializerMethodField() template_scheme_ids = ReadWriteSerializerMethodField() @@ -193,6 +213,8 @@ def validate_project(self, value): def validate(self, attrs): check_cron_params(attrs.get("cron"), attrs.get("project")) + if settings.PERIODIC_TASK_SHORTEST_TIME and not self.context["request"].user.is_superuser: + self.inspect_cron(attrs.get("cron")) return attrs class Meta: @@ -200,7 +222,7 @@ class Meta: fields = ["project", "cron", "name", "template_id", "pipeline_tree", "template_source", "template_scheme_ids"] -class PatchUpdatePeriodicTaskSerializer(serializers.Serializer): +class PatchUpdatePeriodicTaskSerializer(CronFieldSerializer, serializers.Serializer): cron = serializers.DictField(help_text="周期", required=False) project = serializers.IntegerField(help_text="项目ID", required=False) constants = serializers.DictField(help_text="执行参数", required=False) @@ -208,4 +230,6 @@ class PatchUpdatePeriodicTaskSerializer(serializers.Serializer): def validate(self, attrs): check_cron_params(attrs.get("cron"), attrs.get("project")) + if settings.PERIODIC_TASK_SHORTEST_TIME and not self.context["request"].user.is_superuser: + self.inspect_cron(attrs.get("cron")) return attrs diff --git a/gcloud/core/apis/drf/viewsets/periodic_task.py b/gcloud/core/apis/drf/viewsets/periodic_task.py index 658331065b..650ef834c1 100644 --- a/gcloud/core/apis/drf/viewsets/periodic_task.py +++ b/gcloud/core/apis/drf/viewsets/periodic_task.py @@ -251,7 +251,7 @@ def destroy(self, request, *args, **kwargs): return super(PeriodicTaskViewSet, self).destroy(request, *args, **kwargs) def create(self, request, *args, **kwargs): - serializer = CreatePeriodicTaskSerializer(data=request.data) + serializer = CreatePeriodicTaskSerializer(data=request.data, context={"request": request}) serializer.is_valid(raise_exception=True) try: self._handle_serializer(request, serializer) @@ -270,7 +270,7 @@ def create(self, request, *args, **kwargs): def update(self, request, *args, **kwargs): instance = self.get_object() - serializer = CreatePeriodicTaskSerializer(instance, data=request.data) + serializer = CreatePeriodicTaskSerializer(instance, data=request.data, context={"request": request}) serializer.is_valid(raise_exception=True) try: self._handle_serializer(request, serializer) @@ -287,7 +287,7 @@ def update(self, request, *args, **kwargs): def partial_update(self, request, *args, **kwargs): instance = self.get_object() - serializer = PatchUpdatePeriodicTaskSerializer(data=request.data) + serializer = PatchUpdatePeriodicTaskSerializer(data=request.data, context={"request": request}) serializer.is_valid(raise_exception=True) with transaction.atomic(): diff --git a/gcloud/tests/utils/strings/test_inspect_time.py b/gcloud/tests/utils/strings/test_inspect_time.py new file mode 100644 index 0000000000..155ed06f06 --- /dev/null +++ b/gcloud/tests/utils/strings/test_inspect_time.py @@ -0,0 +1,41 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017-2020 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +from django.test import TestCase + +from gcloud.utils.strings import inspect_time + + +class InspectTimeTestCase(TestCase): + def test_inspect_time(self): + cron = "* * * * *" + shortest_time = 30 + iter_count = 10 + self.assertFalse(inspect_time(cron=cron, shortest_time=shortest_time, iter_count=iter_count)) + + def test_fail_inspect_time(self): + cron = "*/15 * * * *" + shortest_time = 30 + iter_count = 10 + self.assertFalse(inspect_time(cron=cron, shortest_time=shortest_time, iter_count=iter_count)) + + def test_success_inspect_time(self): + cron = "15 2 * * *" + shortest_time = 30 + iter_count = 10 + self.assertTrue(inspect_time(cron=cron, shortest_time=shortest_time, iter_count=iter_count)) + + def test_iter_count_inspect_time(self): + cron = "*/15 * * * *" + shortest_time = 30 + iter_count = 100 + self.assertFalse(inspect_time(cron=cron, shortest_time=shortest_time, iter_count=iter_count)) diff --git a/gcloud/utils/strings.py b/gcloud/utils/strings.py index 9657df3498..706bf68937 100644 --- a/gcloud/utils/strings.py +++ b/gcloud/utils/strings.py @@ -13,7 +13,8 @@ import re import logging - +from croniter import croniter +from datetime import datetime, timedelta from gcloud.constants import TEMPLATE_NODE_NAME_MAX_LENGTH, AE logger = logging.getLogger("root") @@ -116,3 +117,24 @@ def django_celery_beat_cron_time_format_fit(cron_str): cron_config = {time_format: cron_time for time_format, cron_time in zip(time_formats, cron_times)} result_cron_list = [cron_config[unit] for unit in unit_order] + ["({})".format("/".join(unit_order))] + time_zone return " ".join(result_cron_list).strip() + + +def inspect_time(cron, shortest_time, iter_count): + """检查定时任务时间间隔是否符合要求 + :param cron: 定时任务配置 + :type cron str + :param shortest_time: 最短时间间隔,以分钟为单位,例如 30 + :type shortest_time int + :param iter_count: 迭代次数 + :type iter_count int + """ + + schedule_iter = croniter(cron) + # 计算指定次数内的最短时间间隔 + next_times = [schedule_iter.get_next(datetime) for _ in range(iter_count)] + min_interval = min((next_times[i] - next_times[i - 1] for i in range(1, len(next_times)))) + + if min_interval < timedelta(minutes=shortest_time): + return False + + return True diff --git a/requirements.txt b/requirements.txt index acd5115175..4727dfc8a8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -102,3 +102,4 @@ opentelemetry-instrumentation-logging==0.30b1 opentelemetry-instrumentation-requests==0.30b1 bk-notice-sdk==1.3.0 +croniter==1.4.1