Skip to content

Commit

Permalink
feat: 上云周期任务添加最低任务间隔时间配置 --story=120737215 (#7621)
Browse files Browse the repository at this point in the history
* feat: 上云周期任务添加最低任务间隔时间配置 --story=120737215

* fix: 修复逻辑问题 --story=120737215

* fix: 修复继承问题 --story=120737215
  • Loading branch information
guohelu authored Nov 27, 2024
1 parent 692b69d commit b6844f6
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 7 deletions.
5 changes: 5 additions & 0 deletions config/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,11 @@ def check_engine_admin_permission(request, *args, **kwargs):
# 周期任务消息通知类型
PERIODIC_TASK_REMINDER_NOTIFY_TYPE = env.PERIODIC_TASK_REMINDER_NOTIFY_TYPE

# 周期任务最短时间间隔
PERIODIC_TASK_SHORTEST_TIME = env.PERIODIC_TASK_SHORTEST_TIME
# 周期任务迭代次数
PERIODIC_TASK_ITERATION = env.PERIODIC_TASK_ITERATION

# bk_audit
ENABLE_BK_AUDIT = True if env.BK_AUDIT_DATA_TOKEN else False
BK_AUDIT_SETTINGS = {
Expand Down
5 changes: 5 additions & 0 deletions env.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@
# 周期任务消息通知类型
PERIODIC_TASK_REMINDER_NOTIFY_TYPE = json.loads(os.getenv("PERIODIC_TASK_REMINDER_NOTIFY_TYPE", '["email"]'))

# 周期任务最短时间间隔,以分钟为单位,例如:30,0 表示无限制
PERIODIC_TASK_SHORTEST_TIME = int(os.getenv("PERIODIC_TASK_SHORTEST_TIME", 0))
# 周期任务迭代次数
PERIODIC_TASK_ITERATION = int(os.getenv("PERIODIC_TASK_ITERATION", 10))

# bk_audit
BK_AUDIT_ENDPOINT = os.getenv("BK_AUDIT_ENDPOINT", None)
BK_AUDIT_DATA_TOKEN = os.getenv("BK_AUDIT_DATA_TOKEN", None)
Expand Down
30 changes: 27 additions & 3 deletions gcloud/core/apis/drf/serilaziers/periodic_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
from rest_framework.validators import ValidationError

import env
from gcloud.utils.strings import inspect_time
from gcloud.conf import settings
from gcloud.constants import PROJECT
from gcloud.core.apis.drf.serilaziers.project import ProjectSerializer
from gcloud.core.models import Project, ProjectConfig
Expand Down Expand Up @@ -152,9 +154,27 @@ def check_cron_params(cron, project):
raise ValidationError("周期任务时间格式过长")


class CreatePeriodicTaskSerializer(serializers.ModelSerializer):
project = serializers.IntegerField(write_only=True)
class CronFieldSerializer(serializers.Serializer):
cron = serializers.DictField(write_only=True)

def inspect_cron(self, cron):
minute = cron.get("minute", "*")
hour = cron.get("hour", "*")
day_of_month = cron.get("day_of_month", "*")
month = cron.get("month", "*")
day_of_week = cron.get("day_of_week", "*")

cron_expression = f"{minute} {hour} {day_of_month} {month} {day_of_week}"

result = inspect_time(cron_expression, settings.PERIODIC_TASK_SHORTEST_TIME, settings.PERIODIC_TASK_ITERATION)
if not result:
raise serializers.ValidationError(
"The interval between tasks should be at least {} minutes".format(settings.PERIODIC_TASK_SHORTEST_TIME)
)


class CreatePeriodicTaskSerializer(CronFieldSerializer, serializers.ModelSerializer):
project = serializers.IntegerField(write_only=True)
template_source = serializers.CharField(required=False, default=PROJECT)
pipeline_tree = ReadWriteSerializerMethodField()
template_scheme_ids = ReadWriteSerializerMethodField()
Expand Down Expand Up @@ -193,19 +213,23 @@ def validate_project(self, value):

def validate(self, attrs):
check_cron_params(attrs.get("cron"), attrs.get("project"))
if settings.PERIODIC_TASK_SHORTEST_TIME and not self.context["request"].user.is_superuser:
self.inspect_cron(attrs.get("cron"))
return attrs

class Meta:
model = PeriodicTask
fields = ["project", "cron", "name", "template_id", "pipeline_tree", "template_source", "template_scheme_ids"]


class PatchUpdatePeriodicTaskSerializer(serializers.Serializer):
class PatchUpdatePeriodicTaskSerializer(CronFieldSerializer, serializers.Serializer):
cron = serializers.DictField(help_text="周期", required=False)
project = serializers.IntegerField(help_text="项目ID", required=False)
constants = serializers.DictField(help_text="执行参数", required=False)
name = serializers.CharField(help_text="任务名", required=False)

def validate(self, attrs):
check_cron_params(attrs.get("cron"), attrs.get("project"))
if settings.PERIODIC_TASK_SHORTEST_TIME and not self.context["request"].user.is_superuser:
self.inspect_cron(attrs.get("cron"))
return attrs
6 changes: 3 additions & 3 deletions gcloud/core/apis/drf/viewsets/periodic_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def destroy(self, request, *args, **kwargs):
return super(PeriodicTaskViewSet, self).destroy(request, *args, **kwargs)

def create(self, request, *args, **kwargs):
serializer = CreatePeriodicTaskSerializer(data=request.data)
serializer = CreatePeriodicTaskSerializer(data=request.data, context={"request": request})
serializer.is_valid(raise_exception=True)
try:
self._handle_serializer(request, serializer)
Expand All @@ -270,7 +270,7 @@ def create(self, request, *args, **kwargs):

def update(self, request, *args, **kwargs):
instance = self.get_object()
serializer = CreatePeriodicTaskSerializer(instance, data=request.data)
serializer = CreatePeriodicTaskSerializer(instance, data=request.data, context={"request": request})
serializer.is_valid(raise_exception=True)
try:
self._handle_serializer(request, serializer)
Expand All @@ -287,7 +287,7 @@ def update(self, request, *args, **kwargs):

def partial_update(self, request, *args, **kwargs):
instance = self.get_object()
serializer = PatchUpdatePeriodicTaskSerializer(data=request.data)
serializer = PatchUpdatePeriodicTaskSerializer(data=request.data, context={"request": request})
serializer.is_valid(raise_exception=True)

with transaction.atomic():
Expand Down
41 changes: 41 additions & 0 deletions gcloud/tests/utils/strings/test_inspect_time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
"""
Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community
Edition) available.
Copyright (C) 2017-2020 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
from django.test import TestCase

from gcloud.utils.strings import inspect_time


class InspectTimeTestCase(TestCase):
def test_inspect_time(self):
cron = "* * * * *"
shortest_time = 30
iter_count = 10
self.assertFalse(inspect_time(cron=cron, shortest_time=shortest_time, iter_count=iter_count))

def test_fail_inspect_time(self):
cron = "*/15 * * * *"
shortest_time = 30
iter_count = 10
self.assertFalse(inspect_time(cron=cron, shortest_time=shortest_time, iter_count=iter_count))

def test_success_inspect_time(self):
cron = "15 2 * * *"
shortest_time = 30
iter_count = 10
self.assertTrue(inspect_time(cron=cron, shortest_time=shortest_time, iter_count=iter_count))

def test_iter_count_inspect_time(self):
cron = "*/15 * * * *"
shortest_time = 30
iter_count = 100
self.assertFalse(inspect_time(cron=cron, shortest_time=shortest_time, iter_count=iter_count))
24 changes: 23 additions & 1 deletion gcloud/utils/strings.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@

import re
import logging

from croniter import croniter
from datetime import datetime, timedelta
from gcloud.constants import TEMPLATE_NODE_NAME_MAX_LENGTH, AE

logger = logging.getLogger("root")
Expand Down Expand Up @@ -116,3 +117,24 @@ def django_celery_beat_cron_time_format_fit(cron_str):
cron_config = {time_format: cron_time for time_format, cron_time in zip(time_formats, cron_times)}
result_cron_list = [cron_config[unit] for unit in unit_order] + ["({})".format("/".join(unit_order))] + time_zone
return " ".join(result_cron_list).strip()


def inspect_time(cron, shortest_time, iter_count):
"""检查定时任务时间间隔是否符合要求
:param cron: 定时任务配置
:type cron str
:param shortest_time: 最短时间间隔,以分钟为单位,例如 30
:type shortest_time int
:param iter_count: 迭代次数
:type iter_count int
"""

schedule_iter = croniter(cron)
# 计算指定次数内的最短时间间隔
next_times = [schedule_iter.get_next(datetime) for _ in range(iter_count)]
min_interval = min((next_times[i] - next_times[i - 1] for i in range(1, len(next_times))))

if min_interval < timedelta(minutes=shortest_time):
return False

return True
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,4 @@ opentelemetry-instrumentation-logging==0.30b1
opentelemetry-instrumentation-requests==0.30b1

bk-notice-sdk==1.3.0
croniter==1.4.1

0 comments on commit b6844f6

Please sign in to comment.