From 6ca9690138c06848d1fc3fb3ebbf30041b1f4f12 Mon Sep 17 00:00:00 2001 From: wangyu096 Date: Wed, 11 Dec 2024 12:36:17 +0800 Subject: [PATCH 1/9] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96=E5=88=86?= =?UTF-8?q?=E5=BA=93=E5=88=86=E8=A1=A8=E8=BF=81=E7=A7=BB=E8=BF=87=E7=A8=8B?= =?UTF-8?q?=E4=B8=AD=EF=BC=8Ctask=5Finstance=5Fid=20=E5=8A=A8=E6=80=81?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=E6=9D=A1=E4=BB=B6=E6=9E=84=E9=80=A0=E9=80=BB?= =?UTF-8?q?=E8=BE=91=20#3324?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../job/common/web/config/FilterConfig.java | 2 +- .../AppResourceScopeInterceptor.java | 279 ++++++++++++------ .../inner/ServiceStepInstanceResource.java | 2 +- 3 files changed, 197 insertions(+), 86 deletions(-) diff --git a/src/backend/commons/common-web/src/main/java/com/tencent/bk/job/common/web/config/FilterConfig.java b/src/backend/commons/common-web/src/main/java/com/tencent/bk/job/common/web/config/FilterConfig.java index 584d61a2ca..4587ac73eb 100644 --- a/src/backend/commons/common-web/src/main/java/com/tencent/bk/job/common/web/config/FilterConfig.java +++ b/src/backend/commons/common-web/src/main/java/com/tencent/bk/job/common/web/config/FilterConfig.java @@ -36,7 +36,7 @@ public FilterRegistrationBean repeatableRSRRFilterRegister() { FilterRegistrationBean registration = new FilterRegistrationBean<>(); registration.setFilter(repeatableRRRFilter()); - registration.addUrlPatterns("/esb/api/*"); + registration.addUrlPatterns("/esb/api/*", "/service/*", "/web/*"); registration.setName("repeatableReadRequestResponseFilter"); registration.setOrder(0); return registration; diff --git a/src/backend/commons/common-web/src/main/java/com/tencent/bk/job/common/web/interceptor/AppResourceScopeInterceptor.java b/src/backend/commons/common-web/src/main/java/com/tencent/bk/job/common/web/interceptor/AppResourceScopeInterceptor.java index 6b6d40465a..e1cc2b624a 100644 --- a/src/backend/commons/common-web/src/main/java/com/tencent/bk/job/common/web/interceptor/AppResourceScopeInterceptor.java +++ b/src/backend/commons/common-web/src/main/java/com/tencent/bk/job/common/web/interceptor/AppResourceScopeInterceptor.java @@ -66,10 +66,21 @@ public class AppResourceScopeInterceptor implements AsyncHandlerInterceptor { private static final Pattern SCOPE_PATTERN = Pattern.compile("/scope/(\\w+)/(\\d+)"); + private static final Pattern APP_PATTERN = Pattern.compile("/app/(\\d+)"); + private final AppScopeMappingService appScopeMappingService; + private final AppResourceScopeParser webAppResourceScopeParser; + + private final AppResourceScopeParser esbAppResourceScopeParser; + + private final AppResourceScopeParser internalAppResourceScopeParser; + public AppResourceScopeInterceptor(AppScopeMappingService appScopeMappingService) { this.appScopeMappingService = appScopeMappingService; + this.webAppResourceScopeParser = new WebAppResourceScopeParser(); + this.esbAppResourceScopeParser = new EsbAppResourceScopeParser(); + this.internalAppResourceScopeParser = new InternalAppResourceScopeParser(); } @Override @@ -99,12 +110,16 @@ private void addAppResourceScope(HttpServletRequest request) { AppResourceScope appResourceScope = null; switch (requestSource) { case WEB: - appResourceScope = parseAppResourceScopeFromPath(request.getRequestURI()); - log.debug("Scope from path:{}", appResourceScope); + appResourceScope = webAppResourceScopeParser.parseAppResourceScope(request); + log.debug("Scope from web:{}", appResourceScope); break; case ESB: - appResourceScope = parseAppResourceScopeFromQueryStringOrBody(request); - log.debug("Scope from query/body:{}", appResourceScope); + appResourceScope = esbAppResourceScopeParser.parseAppResourceScope(request); + log.debug("Scope from esb:{}", appResourceScope); + break; + case INTERNAL: + appResourceScope = internalAppResourceScopeParser.parseAppResourceScope(request); + log.debug("Scope from internal:{}", appResourceScope); break; default: log.debug("Ignore invalid scope: {}", requestSource); @@ -113,113 +128,209 @@ private void addAppResourceScope(HttpServletRequest request) { if (appResourceScope != null) { request.setAttribute("appResourceScope", appResourceScope); JobContextUtil.setAppResourceScope(appResourceScope); + } else { + log.debug("AppResourceScope is empty"); } } - private AppResourceScope parseAppResourceScopeFromPath(String requestURI) { - ResourceScope resourceScope = parseResourceScopeFromURI(requestURI); - if (resourceScope != null) { - return buildAppResourceScope(resourceScope); - } - - return null; + /** + * 从 http 请求中解析 AppResourceScope + */ + public interface AppResourceScopeParser { + AppResourceScope parseAppResourceScope(HttpServletRequest request); } - private ResourceScope parseResourceScopeFromURI(String requestURI) { - ResourceScope resourceScope = null; - Matcher scopeMatcher = SCOPE_PATTERN.matcher(requestURI); - if (scopeMatcher.find()) { - resourceScope = new ResourceScope(scopeMatcher.group(1), scopeMatcher.group(2)); + /** + * 从 http 请求中解析 AppResourceScope - Web 请求 + */ + public class WebAppResourceScopeParser implements AppResourceScopeParser { + @Override + public AppResourceScope parseAppResourceScope(HttpServletRequest request) { + return parseAppResourceScopeFromPath(request.getRequestURI()); } - return resourceScope; - } - - private AppResourceScope buildAppResourceScope(ResourceScope resourceScope) { - Long appId = appScopeMappingService.getAppIdByScope(resourceScope); - return new AppResourceScope(appId, resourceScope); - } - private AppResourceScope parseAppResourceScopeFromQueryStringOrBody(HttpServletRequest request) { - Map params = parseMultiValueFromQueryStringOrBody(request, "bk_scope_type", "bk_scope_id", - "bk_biz_id"); - String scopeType = params.get("bk_scope_type"); - String scopeId = params.get("bk_scope_id"); - String bizIdStr = params.get("bk_biz_id"); + private AppResourceScope parseAppResourceScopeFromPath(String requestURI) { + ResourceScope resourceScope = parseResourceScopeFromURI(requestURI); + if (resourceScope != null) { + return buildAppResourceScope(resourceScope); + } - if (StringUtils.isNotBlank(scopeType) && StringUtils.isNotBlank(scopeId)) { - return new AppResourceScope(scopeType, scopeId, null); + return null; } - // 如果兼容bk_biz_id参数 - if (FeatureToggle.checkFeature(FeatureIdConstants.FEATURE_BK_BIZ_ID_COMPATIBLE, - ToggleEvaluateContext.EMPTY)) { - // 兼容当前业务ID参数 - if (StringUtils.isNotBlank(bizIdStr)) { - long bizId = Long.parseLong(bizIdStr); - // [8000000,9999999]是迁移业务集之前约定的业务集ID范围。为了兼容老的API调用方,在这个范围内的bizId解析为业务集 - scopeId = bizIdStr; - if (bizId >= JOB_BUILD_IN_BIZ_SET_ID_MIN && bizId <= JOB_BUILD_IN_BIZ_SET_ID_MAX) { - Long appId = appScopeMappingService.getAppIdByScope(ResourceScopeTypeEnum.BIZ_SET.getValue(), - scopeId); - return new AppResourceScope(ResourceScopeTypeEnum.BIZ_SET, scopeId, appId); - } else { - Long appId = appScopeMappingService.getAppIdByScope(ResourceScopeTypeEnum.BIZ.getValue(), scopeId); - return new AppResourceScope(ResourceScopeTypeEnum.BIZ, scopeId, appId); - } + private ResourceScope parseResourceScopeFromURI(String requestURI) { + ResourceScope resourceScope = null; + Matcher scopeMatcher = SCOPE_PATTERN.matcher(requestURI); + if (scopeMatcher.find()) { + resourceScope = new ResourceScope(scopeMatcher.group(1), scopeMatcher.group(2)); } + return resourceScope; + } + + private AppResourceScope buildAppResourceScope(ResourceScope resourceScope) { + Long appId = appScopeMappingService.getAppIdByScope(resourceScope); + return new AppResourceScope(appId, resourceScope); } - // 其他情况返回null,后续拦截器会处理null - return null; } /** - * 从请求的解析多个参数 - * - * @param request http请求 - * @param keys 参数名称 - * @return Map + * 从 http 请求中解析 AppResourceScope - ESB/蓝鲸网关请求 */ - private Map parseMultiValueFromQueryStringOrBody(HttpServletRequest request, String... keys) { - Map params = new HashMap<>(); - try { - if (request.getMethod().equals(HttpMethod.POST.name()) - || request.getMethod().equals(HttpMethod.PUT.name())) { - if (!(request instanceof RepeatableReadWriteHttpServletRequest)) { - return params; + public class EsbAppResourceScopeParser implements AppResourceScopeParser { + @Override + public AppResourceScope parseAppResourceScope(HttpServletRequest request) { + return parseAppResourceScopeFromQueryStringOrBody(request); + } + + private AppResourceScope parseAppResourceScopeFromQueryStringOrBody(HttpServletRequest request) { + Map params = parseMultiValueFromQueryStringOrBody(request, + "bk_scope_type", "bk_scope_id", "bk_biz_id"); + String scopeType = params.get("bk_scope_type"); + String scopeId = params.get("bk_scope_id"); + String bizIdStr = params.get("bk_biz_id"); + + if (StringUtils.isNotBlank(scopeType) && StringUtils.isNotBlank(scopeId)) { + return new AppResourceScope(scopeType, scopeId, null); + } + + // 如果兼容bk_biz_id参数 + if (FeatureToggle.checkFeature(FeatureIdConstants.FEATURE_BK_BIZ_ID_COMPATIBLE, + ToggleEvaluateContext.EMPTY)) { + // 兼容当前业务ID参数 + if (StringUtils.isNotBlank(bizIdStr)) { + long bizId = Long.parseLong(bizIdStr); + // [8000000,9999999]是迁移业务集之前约定的业务集ID范围。为了兼容老的API调用方,在这个范围内的bizId解析为业务集 + scopeId = bizIdStr; + if (bizId >= JOB_BUILD_IN_BIZ_SET_ID_MIN && bizId <= JOB_BUILD_IN_BIZ_SET_ID_MAX) { + Long appId = appScopeMappingService.getAppIdByScope(ResourceScopeTypeEnum.BIZ_SET.getValue(), + scopeId); + return new AppResourceScope(ResourceScopeTypeEnum.BIZ_SET, scopeId, appId); + } else { + Long appId = appScopeMappingService.getAppIdByScope(ResourceScopeTypeEnum.BIZ.getValue(), + scopeId); + return new AppResourceScope(ResourceScopeTypeEnum.BIZ, scopeId, appId); + } } - RepeatableReadWriteHttpServletRequest wrapperRequest = - (RepeatableReadWriteHttpServletRequest) request; - if (StringUtils.isNotBlank(wrapperRequest.getBody())) { - ObjectNode jsonBody = (ObjectNode) JsonUtils.toJsonNode(wrapperRequest.getBody()); - if (jsonBody == null) { + } + // 其他情况返回null,后续拦截器会处理null + return null; + } + + /** + * 从请求的解析多个参数 + * + * @param request http请求 + * @param keys 参数名称 + * @return Map + */ + private Map parseMultiValueFromQueryStringOrBody(HttpServletRequest request, String... keys) { + Map params = new HashMap<>(); + try { + if (request.getMethod().equals(HttpMethod.POST.name()) + || request.getMethod().equals(HttpMethod.PUT.name())) { + if (!(request instanceof RepeatableReadWriteHttpServletRequest)) { return params; } + RepeatableReadWriteHttpServletRequest wrapperRequest = + (RepeatableReadWriteHttpServletRequest) request; + if (StringUtils.isNotBlank(wrapperRequest.getBody())) { + ObjectNode jsonBody = (ObjectNode) JsonUtils.toJsonNode(wrapperRequest.getBody()); + if (jsonBody == null) { + return params; + } + for (String key : keys) { + JsonNode valueNode = jsonBody.get(key); + String value = (valueNode == null || valueNode.isNull()) ? null : + jsonBody.get(key).asText(); + log.debug("Parsed from POST/PUT: {}={}", key, value); + if (value != null) { + params.put(key, value); + } + } + } + } else if (request.getMethod().equals(HttpMethod.GET.name())) { for (String key : keys) { - JsonNode valueNode = jsonBody.get(key); - String value = (valueNode == null || valueNode.isNull()) ? null : jsonBody.get(key).asText(); - log.debug("Parsed from POST/PUT: {}={}", key, value); + String value = request.getParameter(key); + log.debug("Parsed from GET: {}={}", key, value); if (value != null) { params.put(key, value); } } } - } else if (request.getMethod().equals(HttpMethod.GET.name())) { - for (String key : keys) { - String value = request.getParameter(key); - log.debug("Parsed from GET: {}={}", key, value); - if (value != null) { - params.put(key, value); - } - } + return params; + } catch (Exception e) { + String msg = MessageFormatter.format( + "Fail to parse keys: {} from request", + keys + ).getMessage(); + log.warn(msg, e); } return params; - } catch (Exception e) { - String msg = MessageFormatter.format( - "Fail to parse keys: {} from request", - keys - ).getMessage(); - log.warn(msg, e); } - return params; + } + + /** + * 从 http 请求中解析 AppResourceScope - Job 内部请求 + */ + public class InternalAppResourceScopeParser implements AppResourceScopeParser { + @Override + public AppResourceScope parseAppResourceScope(HttpServletRequest request) { + // 优先从 path 解析 + Long appId = parseAppIdFromPath(request.getRequestURI()); + if (appId == null) { + // 从 QueryParam解析 + appId = parseAppIdFromQueryParam(request); + } + if (appId == null) { + // 从 Request Body 解析 + appId = parseAppIdFromRequestBody(request); + } + if (appId == null) { + return null; + } + return appScopeMappingService.getAppResourceScope(appId); + } + + private Long parseAppIdFromPath(String requestURI) { + Long appId = null; + Matcher appIdMatcher = APP_PATTERN.matcher(requestURI); + if (appIdMatcher.find()) { + appId = Long.parseLong(appIdMatcher.group()); + } + return appId; + } + + private Long parseAppIdFromQueryParam(HttpServletRequest request) { + String value = request.getParameter("appId"); + log.debug("Parsed from GET: {}={}", "appId", value); + return value != null ? Long.parseLong(value) : null; + } + + private Long parseAppIdFromRequestBody(HttpServletRequest request) { + if (!request.getMethod().equals(HttpMethod.POST.name()) + && !request.getMethod().equals(HttpMethod.PUT.name())) { + return null; + } + if (!(request instanceof RepeatableReadWriteHttpServletRequest)) { + return null; + } + + RepeatableReadWriteHttpServletRequest wrapperRequest = + (RepeatableReadWriteHttpServletRequest) request; + if (StringUtils.isNotBlank(wrapperRequest.getBody())) { + ObjectNode jsonBody = (ObjectNode) JsonUtils.toJsonNode(wrapperRequest.getBody()); + if (jsonBody == null) { + return null; + } + String fieldName = "appId"; + JsonNode valueNode = jsonBody.get(fieldName); + String value = (valueNode == null || valueNode.isNull()) ? null : + jsonBody.get(fieldName).asText(); + log.debug("Parsed from POST/PUT: {}={}", fieldName, value); + return value != null ? Long.parseLong(value) : null; + } else { + return null; + } + } } } diff --git a/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/api/inner/ServiceStepInstanceResource.java b/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/api/inner/ServiceStepInstanceResource.java index 378463ef1d..cd75c5aaf2 100644 --- a/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/api/inner/ServiceStepInstanceResource.java +++ b/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/api/inner/ServiceStepInstanceResource.java @@ -41,7 +41,7 @@ @SmartFeignClient(value = "job-execute", contextId = "stepInstanceResource") @InternalAPI public interface ServiceStepInstanceResource { - @GetMapping("/service/stepInstance/appIds/{appId}/stepInstanceIds/{stepInstanceId}") + @GetMapping("/service/stepInstance/app/{appId}/stepInstanceId/{stepInstanceId}") InternalResponse getStepInstance( @RequestHeader("username") String username, From 7e0bb795eff36dcb4427d87f8a184b67ce2f0ca8 Mon Sep 17 00:00:00 2001 From: wangyu096 Date: Wed, 11 Dec 2024 14:44:48 +0800 Subject: [PATCH 2/9] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96=E5=88=86?= =?UTF-8?q?=E5=BA=93=E5=88=86=E8=A1=A8=E8=BF=81=E7=A7=BB=E8=BF=87=E7=A8=8B?= =?UTF-8?q?=E4=B8=AD=EF=BC=8Ctask=5Finstance=5Fid=20=E5=8A=A8=E6=80=81?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=E6=9D=A1=E4=BB=B6=E6=9E=84=E9=80=A0=E9=80=BB?= =?UTF-8?q?=E8=BE=91=20#3324?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../strategy/AbstractToggleStrategy.java | 4 +-- .../AppResourceScopeInterceptor.java | 7 +++-- .../impl/TaskInstanceIdDynamicCondition.java | 29 ++++++++++--------- .../engine/listener/BaseJobMqListener.java | 3 -- 4 files changed, 20 insertions(+), 23 deletions(-) diff --git a/src/backend/commons/common-service/src/main/java/com/tencent/bk/job/common/service/toggle/strategy/AbstractToggleStrategy.java b/src/backend/commons/common-service/src/main/java/com/tencent/bk/job/common/service/toggle/strategy/AbstractToggleStrategy.java index 4827437d3d..2705e43056 100644 --- a/src/backend/commons/common-service/src/main/java/com/tencent/bk/job/common/service/toggle/strategy/AbstractToggleStrategy.java +++ b/src/backend/commons/common-service/src/main/java/com/tencent/bk/job/common/service/toggle/strategy/AbstractToggleStrategy.java @@ -87,9 +87,7 @@ public void assertRequiredInitParam(String paramName) { public boolean checkRequiredContextParam(ToggleEvaluateContext context, String paramName) { boolean checkResult = true; if (context.getParam(paramName) == null) { - String msg = MessageFormatter.format( - "Context param {} is required for evaluate", paramName).getMessage(); - log.warn(msg); + log.info("Context param {} is required for evaluate", paramName); checkResult = false; } return checkResult; diff --git a/src/backend/commons/common-web/src/main/java/com/tencent/bk/job/common/web/interceptor/AppResourceScopeInterceptor.java b/src/backend/commons/common-web/src/main/java/com/tencent/bk/job/common/web/interceptor/AppResourceScopeInterceptor.java index e1cc2b624a..cd972ee575 100644 --- a/src/backend/commons/common-web/src/main/java/com/tencent/bk/job/common/web/interceptor/AppResourceScopeInterceptor.java +++ b/src/backend/commons/common-web/src/main/java/com/tencent/bk/job/common/web/interceptor/AppResourceScopeInterceptor.java @@ -295,14 +295,15 @@ private Long parseAppIdFromPath(String requestURI) { Long appId = null; Matcher appIdMatcher = APP_PATTERN.matcher(requestURI); if (appIdMatcher.find()) { - appId = Long.parseLong(appIdMatcher.group()); + appId = Long.parseLong(appIdMatcher.group(1)); } return appId; } private Long parseAppIdFromQueryParam(HttpServletRequest request) { - String value = request.getParameter("appId"); - log.debug("Parsed from GET: {}={}", "appId", value); + String queryParam = "appId"; + String value = request.getParameter(queryParam); + log.debug("Parsed from GET: {}={}", queryParam, value); return value != null ? Long.parseLong(value) : null; } diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/dao/impl/TaskInstanceIdDynamicCondition.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/dao/impl/TaskInstanceIdDynamicCondition.java index 387f055ab4..1fa3e3e664 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/dao/impl/TaskInstanceIdDynamicCondition.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/dao/impl/TaskInstanceIdDynamicCondition.java @@ -25,7 +25,6 @@ package com.tencent.bk.job.execute.dao.impl; import com.tencent.bk.job.common.model.dto.ResourceScope; -import com.tencent.bk.job.common.util.JobContextUtil; import com.tencent.bk.job.common.util.toggle.ToggleEvaluateContext; import com.tencent.bk.job.common.util.toggle.ToggleStrategyContextParams; import com.tencent.bk.job.common.util.toggle.feature.FeatureIdConstants; @@ -51,33 +50,35 @@ public class TaskInstanceIdDynamicCondition { public static Condition build(Long taskInstanceId, Function taskInstanceIdConditionBuilder) { + ToggleEvaluateContext toggleEvaluateContext; JobExecuteContext jobExecuteContext = JobExecuteContextThreadLocalRepo.get(); if (jobExecuteContext == null) { log.info("TaskInstanceIdDynamicCondition : Empty JobExecuteContext!"); // JobExecuteContext 正常应该不会为 null 。为了不影响请求正常处理,忽略错误,直接返回 TRUE Condition // (不会影响 DAO 查询,task_instance_id 仅作为分片功能实用,实际业务数据关系并不强依赖 task_instance_id) - return DSL.trueCondition(); - } - ResourceScope resourceScope = jobExecuteContext.getResourceScope(); - if (resourceScope == null) { - log.info("TaskInstanceIdDynamicCondition : Empty resource scope!"); - // 无法根据业务决定是否使用 task_instance_id 作为查询条件。为了不影响请求正常处理,直接返回 TRUE Condition - // (不会影响 DAO 查询,task_instance_id 仅作为分片功能,实际业务数据关系并不强依赖 task_instance_id) - return DSL.trueCondition(); + toggleEvaluateContext = ToggleEvaluateContext.EMPTY; + } else { + ResourceScope resourceScope = jobExecuteContext.getResourceScope(); + if (resourceScope != null) { + toggleEvaluateContext = ToggleEvaluateContext.builder() + .addContextParam(ToggleStrategyContextParams.CTX_PARAM_RESOURCE_SCOPE, resourceScope); + } else { + log.info("TaskInstanceIdDynamicCondition : Empty resource scope!"); + toggleEvaluateContext = ToggleEvaluateContext.EMPTY; + } } - if (FeatureToggle.checkFeature( - FeatureIdConstants.DAO_ADD_TASK_INSTANCE_ID, - ToggleEvaluateContext.builder() - .addContextParam(ToggleStrategyContextParams.CTX_PARAM_RESOURCE_SCOPE, - JobContextUtil.getAppResourceScope()))) { + + if (FeatureToggle.checkFeature(FeatureIdConstants.DAO_ADD_TASK_INSTANCE_ID, toggleEvaluateContext)) { if (taskInstanceId == null || taskInstanceId <= 0L) { log.info("TaskInstanceIdDynamicCondition : Invalid taskInstanceId {}", taskInstanceId); // 为了不影响兼容性,忽略错误 return DSL.trueCondition(); } else { + log.debug("TaskInstanceIdDynamicCondition: Use task_instance_id condition"); return taskInstanceIdConditionBuilder.apply(taskInstanceId); } } else { + log.debug("TaskInstanceIdDynamicCondition: Ignore task_instance_id condition"); return DSL.trueCondition(); } } diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/BaseJobMqListener.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/BaseJobMqListener.java index aedf83078c..30a99eb961 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/BaseJobMqListener.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/BaseJobMqListener.java @@ -47,18 +47,15 @@ public final void onEvent(Message message) { } private void beforeHandleMessage(Message message) { - log.info("beforeHandleMessage"); MessageHeaders headers = message.getHeaders(); String jobExecuteContextJson = (String) headers.get(JobExecuteContext.KEY); if (StringUtils.isNotEmpty(jobExecuteContextJson)) { - log.info("setJobExecuteContextThreadLocalRepo"); JobExecuteContextThreadLocalRepo.set(JsonUtils.fromJson(jobExecuteContextJson, JobExecuteContext.class)); } } private void afterHandle(Message message) { - log.info("afterHandleMessage"); JobExecuteContextThreadLocalRepo.unset(); } From 3afd17f295b3a608f4979063e28ed4e9fc86d577 Mon Sep 17 00:00:00 2001 From: wangyu096 Date: Wed, 11 Dec 2024 15:26:39 +0800 Subject: [PATCH 3/9] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96=E5=88=86?= =?UTF-8?q?=E5=BA=93=E5=88=86=E8=A1=A8=E8=BF=81=E7=A7=BB=E8=BF=87=E7=A8=8B?= =?UTF-8?q?=E4=B8=AD=EF=BC=8Ctask=5Finstance=5Fid=20=E5=8A=A8=E6=80=81?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=E6=9D=A1=E4=BB=B6=E6=9E=84=E9=80=A0=E9=80=BB?= =?UTF-8?q?=E8=BE=91=20#3324?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dao/impl/TaskInstanceIdDynamicCondition.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/dao/impl/TaskInstanceIdDynamicCondition.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/dao/impl/TaskInstanceIdDynamicCondition.java index 1fa3e3e664..f6e72b29a8 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/dao/impl/TaskInstanceIdDynamicCondition.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/dao/impl/TaskInstanceIdDynamicCondition.java @@ -53,7 +53,7 @@ public static Condition build(Long taskInstanceId, ToggleEvaluateContext toggleEvaluateContext; JobExecuteContext jobExecuteContext = JobExecuteContextThreadLocalRepo.get(); if (jobExecuteContext == null) { - log.info("TaskInstanceIdDynamicCondition : Empty JobExecuteContext!"); + log.info("TaskInstanceIdDynamicCondition : EmptyJobExecuteContext!"); // JobExecuteContext 正常应该不会为 null 。为了不影响请求正常处理,忽略错误,直接返回 TRUE Condition // (不会影响 DAO 查询,task_instance_id 仅作为分片功能实用,实际业务数据关系并不强依赖 task_instance_id) toggleEvaluateContext = ToggleEvaluateContext.EMPTY; @@ -63,22 +63,23 @@ public static Condition build(Long taskInstanceId, toggleEvaluateContext = ToggleEvaluateContext.builder() .addContextParam(ToggleStrategyContextParams.CTX_PARAM_RESOURCE_SCOPE, resourceScope); } else { - log.info("TaskInstanceIdDynamicCondition : Empty resource scope!"); + log.info("TaskInstanceIdDynamicCondition : EmptyResourceScope!"); toggleEvaluateContext = ToggleEvaluateContext.EMPTY; } } if (FeatureToggle.checkFeature(FeatureIdConstants.DAO_ADD_TASK_INSTANCE_ID, toggleEvaluateContext)) { if (taskInstanceId == null || taskInstanceId <= 0L) { - log.info("TaskInstanceIdDynamicCondition : Invalid taskInstanceId {}", taskInstanceId); + log.info("TaskInstanceIdDynamicCondition : InvalidTaskInstanceId : {}", taskInstanceId); // 为了不影响兼容性,忽略错误 - return DSL.trueCondition(); +// return DSL.trueCondition(); + throw new IllegalStateException("TaskInstanceId required"); } else { - log.debug("TaskInstanceIdDynamicCondition: Use task_instance_id condition"); + log.debug("TaskInstanceIdDynamicCondition: UseTaskInstanceIdCondition"); return taskInstanceIdConditionBuilder.apply(taskInstanceId); } } else { - log.debug("TaskInstanceIdDynamicCondition: Ignore task_instance_id condition"); + log.debug("TaskInstanceIdDynamicCondition: IgnoreTaskInstanceIdCondition"); return DSL.trueCondition(); } } From 736e959f4af6cffffede342175108fdcaf43bdae Mon Sep 17 00:00:00 2001 From: wangyu096 Date: Wed, 11 Dec 2024 15:48:08 +0800 Subject: [PATCH 4/9] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96=E5=88=86?= =?UTF-8?q?=E5=BA=93=E5=88=86=E8=A1=A8=E8=BF=81=E7=A7=BB=E8=BF=87=E7=A8=8B?= =?UTF-8?q?=E4=B8=AD=EF=BC=8Ctask=5Finstance=5Fid=20=E5=8A=A8=E6=80=81?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=E6=9D=A1=E4=BB=B6=E6=9E=84=E9=80=A0=E9=80=BB?= =?UTF-8?q?=E8=BE=91=20#3324?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../execute/api/web/impl/WebTaskExecutionResultResourceImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/api/web/impl/WebTaskExecutionResultResourceImpl.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/api/web/impl/WebTaskExecutionResultResourceImpl.java index f1ddd71a46..5597d120c7 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/api/web/impl/WebTaskExecutionResultResourceImpl.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/api/web/impl/WebTaskExecutionResultResourceImpl.java @@ -1037,6 +1037,7 @@ public Response getStepExecutionResult(String username, String orderField, Integer order) { StepExecutionResultQuery query = StepExecutionResultQuery.builder() + .taskInstanceId(taskInstanceId) .stepInstanceId(stepInstanceId) .executeCount(executeCount) .batch(batch == null ? null : (batch == 0 ? null : batch)) From 0f9c5f7f0a4f96459f017b3d804b734f97c76332 Mon Sep 17 00:00:00 2001 From: wangyu096 Date: Wed, 11 Dec 2024 17:20:49 +0800 Subject: [PATCH 5/9] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96=E5=88=86?= =?UTF-8?q?=E5=BA=93=E5=88=86=E8=A1=A8=E8=BF=81=E7=A7=BB=E8=BF=87=E7=A8=8B?= =?UTF-8?q?=E4=B8=AD=EF=BC=8Ctask=5Finstance=5Fid=20=E5=8A=A8=E6=80=81?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=E6=9D=A1=E4=BB=B6=E6=9E=84=E9=80=A0=E9=80=BB?= =?UTF-8?q?=E8=BE=91=20#3324?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../execute/config/ExecutorConfiguration.java | 41 ++--- .../impl/TaskInstanceIdDynamicCondition.java | 3 +- .../ArtifactoryLocalFilePrepareTask.java | 9 +- .../local/LocalFilePrepareService.java | 9 +- .../execute/util/ContextExecutorService.java | 150 ++++++++++++++++++ 5 files changed, 183 insertions(+), 29 deletions(-) create mode 100644 src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/util/ContextExecutorService.java diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/config/ExecutorConfiguration.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/config/ExecutorConfiguration.java index c63e6c57da..cb90845304 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/config/ExecutorConfiguration.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/config/ExecutorConfiguration.java @@ -26,15 +26,16 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.tencent.bk.job.common.WatchableThreadPoolExecutor; +import com.tencent.bk.job.execute.util.ContextExecutorService; import io.micrometer.core.instrument.MeterRegistry; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @Slf4j @@ -42,9 +43,9 @@ public class ExecutorConfiguration { @Bean("logExportExecutor") - public ThreadPoolExecutor logExportExecutor(MeterRegistry meterRegistry) { + public ExecutorService logExportExecutor(MeterRegistry meterRegistry) { ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("log-export-thread-%d").build(); - return new WatchableThreadPoolExecutor( + return ContextExecutorService.wrap(new WatchableThreadPoolExecutor( meterRegistry, "logExportExecutor", 10, @@ -53,12 +54,12 @@ public ThreadPoolExecutor logExportExecutor(MeterRegistry meterRegistry) { TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory - ); + )); } @Bean("getHostsByTopoExecutor") - public ThreadPoolExecutor getHostsByTopoExecutor(MeterRegistry meterRegistry) { - return new WatchableThreadPoolExecutor( + public ExecutorService getHostsByTopoExecutor(MeterRegistry meterRegistry) { + return ContextExecutorService.wrap(new WatchableThreadPoolExecutor( meterRegistry, "getHostsByTopoExecutor", 50, @@ -66,12 +67,12 @@ public ThreadPoolExecutor getHostsByTopoExecutor(MeterRegistry meterRegistry) { 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>() - ); + )); } @Bean("getHostTopoPathExecutor") - public ThreadPoolExecutor getHostTopoPathExecutor(MeterRegistry meterRegistry) { - return new WatchableThreadPoolExecutor( + public ExecutorService getHostTopoPathExecutor(MeterRegistry meterRegistry) { + return ContextExecutorService.wrap(new WatchableThreadPoolExecutor( meterRegistry, "getHostTopoPathExecutor", 5, @@ -87,14 +88,14 @@ public ThreadPoolExecutor getHostTopoPathExecutor(MeterRegistry meterRegistry) { Thread.currentThread().getName()); r.run(); } - ); + )); } @Bean("localFileDownloadExecutor") - public ThreadPoolExecutor localFileDownloadExecutor(LocalFileConfigForExecute localFileConfigForExecute, - MeterRegistry meterRegistry) { + public ExecutorService localFileDownloadExecutor(LocalFileConfigForExecute localFileConfigForExecute, + MeterRegistry meterRegistry) { int concurrency = localFileConfigForExecute.getDownloadConcurrency(); - return new WatchableThreadPoolExecutor( + return ContextExecutorService.wrap(new WatchableThreadPoolExecutor( meterRegistry, "localFileDownloadExecutor", concurrency, @@ -110,12 +111,12 @@ public ThreadPoolExecutor localFileDownloadExecutor(LocalFileConfigForExecute lo Thread.currentThread().getName()); r.run(); } - ); + )); } @Bean("localFileWatchExecutor") - public ThreadPoolExecutor localFileWatchExecutor(MeterRegistry meterRegistry) { - return new WatchableThreadPoolExecutor( + public ExecutorService localFileWatchExecutor(MeterRegistry meterRegistry) { + return ContextExecutorService.wrap(new WatchableThreadPoolExecutor( meterRegistry, "localFileWatchExecutor", 0, @@ -131,12 +132,12 @@ public ThreadPoolExecutor localFileWatchExecutor(MeterRegistry meterRegistry) { Thread.currentThread().getName()); r.run(); } - ); + )); } @Bean("shutdownExecutor") - public ThreadPoolExecutor shutdownExecutor(MeterRegistry meterRegistry) { - return new WatchableThreadPoolExecutor( + public ExecutorService shutdownExecutor(MeterRegistry meterRegistry) { + return ContextExecutorService.wrap(new WatchableThreadPoolExecutor( meterRegistry, "shutdownExecutor", 10, @@ -144,6 +145,6 @@ public ThreadPoolExecutor shutdownExecutor(MeterRegistry meterRegistry) { 120, TimeUnit.SECONDS, new LinkedBlockingQueue<>() - ); + )); } } diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/dao/impl/TaskInstanceIdDynamicCondition.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/dao/impl/TaskInstanceIdDynamicCondition.java index f6e72b29a8..e2679ffdf1 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/dao/impl/TaskInstanceIdDynamicCondition.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/dao/impl/TaskInstanceIdDynamicCondition.java @@ -56,7 +56,8 @@ public static Condition build(Long taskInstanceId, log.info("TaskInstanceIdDynamicCondition : EmptyJobExecuteContext!"); // JobExecuteContext 正常应该不会为 null 。为了不影响请求正常处理,忽略错误,直接返回 TRUE Condition // (不会影响 DAO 查询,task_instance_id 仅作为分片功能实用,实际业务数据关系并不强依赖 task_instance_id) - toggleEvaluateContext = ToggleEvaluateContext.EMPTY; +// toggleEvaluateContext = ToggleEvaluateContext.EMPTY; + throw new IllegalStateException("EmptyJobExecuteContext"); } else { ResourceScope resourceScope = jobExecuteContext.getResourceScope(); if (resourceScope != null) { diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/local/ArtifactoryLocalFilePrepareTask.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/local/ArtifactoryLocalFilePrepareTask.java index e23ac02d97..74064cc652 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/local/ArtifactoryLocalFilePrepareTask.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/local/ArtifactoryLocalFilePrepareTask.java @@ -34,6 +34,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; @@ -52,8 +53,8 @@ public class ArtifactoryLocalFilePrepareTask implements JobTaskContext { private final String artifactoryRepo; private final String jobStorageRootPath; private final List> futureList = new ArrayList<>(); - private final ThreadPoolExecutor localFileDownloadExecutor; - private final ThreadPoolExecutor localFileWatchExecutor; + private final ExecutorService localFileDownloadExecutor; + private final ExecutorService localFileWatchExecutor; public static Future localFileWatchFuture = null; public ArtifactoryLocalFilePrepareTask( @@ -65,8 +66,8 @@ public ArtifactoryLocalFilePrepareTask( String artifactoryProject, String artifactoryRepo, String jobStorageRootPath, - ThreadPoolExecutor localFileDownloadExecutor, - ThreadPoolExecutor localFileWatchExecutor + ExecutorService localFileDownloadExecutor, + ExecutorService localFileWatchExecutor ) { this.stepInstance = stepInstance; this.isForRetry = isForRetry; diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/local/LocalFilePrepareService.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/local/LocalFilePrepareService.java index c738e3c17b..261be91f7e 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/local/LocalFilePrepareService.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/local/LocalFilePrepareService.java @@ -47,6 +47,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; @Slf4j @@ -60,8 +61,8 @@ public class LocalFilePrepareService { private final StepInstanceService stepInstanceService; private final ArtifactoryClient artifactoryClient; private final Map taskMap = new ConcurrentHashMap<>(); - private final ThreadPoolExecutor localFileDownloadExecutor; - private final ThreadPoolExecutor localFileWatchExecutor; + private final ExecutorService localFileDownloadExecutor; + private final ExecutorService localFileWatchExecutor; @Autowired public LocalFilePrepareService(FileDistributeConfig fileDistributeConfig, @@ -70,8 +71,8 @@ public LocalFilePrepareService(FileDistributeConfig fileDistributeConfig, AgentService agentService, StepInstanceService stepInstanceService, @Qualifier("jobArtifactoryClient") ArtifactoryClient artifactoryClient, - @Qualifier("localFileDownloadExecutor") ThreadPoolExecutor localFileDownloadExecutor, - @Qualifier("localFileWatchExecutor") ThreadPoolExecutor localFileWatchExecutor) { + @Qualifier("localFileDownloadExecutor") ExecutorService localFileDownloadExecutor, + @Qualifier("localFileWatchExecutor") ExecutorService localFileWatchExecutor) { this.fileDistributeConfig = fileDistributeConfig; this.artifactoryConfig = artifactoryConfig; this.localFileConfigForExecute = localFileConfigForExecute; diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/util/ContextExecutorService.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/util/ContextExecutorService.java new file mode 100644 index 0000000000..87fffd0d22 --- /dev/null +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/util/ContextExecutorService.java @@ -0,0 +1,150 @@ +package com.tencent.bk.job.execute.util; + +import com.tencent.bk.job.execute.common.context.JobExecuteContext; +import com.tencent.bk.job.execute.common.context.JobExecuteContextThreadLocalRepo; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * 支持上下文传播的 ExecutorService + */ +public class ContextExecutorService implements ExecutorService { + + private final ExecutorService delegate; + + private ContextExecutorService(ExecutorService delegate) { + this.delegate = delegate; + } + + public static ContextExecutorService wrap(ExecutorService delegate) { + return new ContextExecutorService(delegate); + } + + + public Future submit(Callable task) { + return this.delegate.submit(ContextCallable.wrap(task)); + } + + public Future submit(Runnable task, T result) { + return this.delegate.submit(ContextRunnable.wrap(task), result); + } + + public Future submit(Runnable task) { + return this.delegate.submit(ContextRunnable.wrap(task)); + } + + public List> invokeAll(Collection> tasks) throws InterruptedException { + return this.delegate.invokeAll(ContextCallable.wrap(tasks)); + } + + public List> invokeAll(Collection> tasks, long timeout, + TimeUnit unit) throws InterruptedException { + return this.delegate.invokeAll(ContextCallable.wrap(tasks), timeout, unit); + } + + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + return this.delegate.invokeAny(ContextCallable.wrap(tasks)); + } + + public T invokeAny(Collection> tasks, long timeout, + TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return this.delegate.invokeAny(ContextCallable.wrap(tasks), timeout, unit); + } + + public void execute(Runnable command) { + this.delegate.execute(ContextRunnable.wrap(command)); + } + + public final void shutdown() { + this.delegate.shutdown(); + } + + public final List shutdownNow() { + return this.delegate.shutdownNow(); + } + + public final boolean isShutdown() { + return this.delegate.isShutdown(); + } + + public final boolean isTerminated() { + return this.delegate.isTerminated(); + } + + public final boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return this.delegate.awaitTermination(timeout, unit); + } + + private static class ContextCallable implements Callable { + + private final JobExecuteContext context; + private final Callable delegate; + + + private ContextCallable(Callable delegate) { + this.context = JobExecuteContextThreadLocalRepo.get(); + this.delegate = delegate; + } + + public static ContextCallable wrap(Callable delegate) { + return new ContextCallable<>(delegate); + } + + public static List> wrap(Collection> delegates) { + List> contextCallables = new ArrayList<>(delegates.size()); + delegates.forEach(delegate -> contextCallables.add(new ContextCallable<>(delegate))); + return contextCallables; + } + + @Override + public T call() throws Exception { + try { + JobExecuteContextThreadLocalRepo.set(context); + return delegate.call(); + } finally { + JobExecuteContextThreadLocalRepo.unset(); + } + } + } + + private static class ContextRunnable implements Runnable { + + private final JobExecuteContext context; + private final Runnable delegate; + + + private ContextRunnable(Runnable delegate) { + this.context = JobExecuteContextThreadLocalRepo.get(); + this.delegate = delegate; + } + + public static ContextRunnable wrap(Runnable delegate) { + return new ContextRunnable(delegate); + } + + public static List wrap(Collection delegates) { + List contextRunnableList = new ArrayList<>(delegates.size()); + delegates.forEach(delegate -> contextRunnableList.add(new ContextRunnable(delegate))); + return contextRunnableList; + } + + @Override + public void run() { + try { + JobExecuteContextThreadLocalRepo.set(context); + delegate.run(); + } finally { + JobExecuteContextThreadLocalRepo.unset(); + } + } + } +} + From 6aba6c014d011538f9d744789af52281d3ce34bd Mon Sep 17 00:00:00 2001 From: wangyu096 Date: Wed, 11 Dec 2024 17:39:40 +0800 Subject: [PATCH 6/9] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96=E5=88=86?= =?UTF-8?q?=E5=BA=93=E5=88=86=E8=A1=A8=E8=BF=81=E7=A7=BB=E8=BF=87=E7=A8=8B?= =?UTF-8?q?=E4=B8=AD=EF=BC=8Ctask=5Finstance=5Fid=20=E5=8A=A8=E6=80=81?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=E6=9D=A1=E4=BB=B6=E6=9E=84=E9=80=A0=E9=80=BB?= =?UTF-8?q?=E8=BE=91=20#3324?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../job/execute/dao/impl/TaskInstanceIdDynamicCondition.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/dao/impl/TaskInstanceIdDynamicCondition.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/dao/impl/TaskInstanceIdDynamicCondition.java index e2679ffdf1..f6e72b29a8 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/dao/impl/TaskInstanceIdDynamicCondition.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/dao/impl/TaskInstanceIdDynamicCondition.java @@ -56,8 +56,7 @@ public static Condition build(Long taskInstanceId, log.info("TaskInstanceIdDynamicCondition : EmptyJobExecuteContext!"); // JobExecuteContext 正常应该不会为 null 。为了不影响请求正常处理,忽略错误,直接返回 TRUE Condition // (不会影响 DAO 查询,task_instance_id 仅作为分片功能实用,实际业务数据关系并不强依赖 task_instance_id) -// toggleEvaluateContext = ToggleEvaluateContext.EMPTY; - throw new IllegalStateException("EmptyJobExecuteContext"); + toggleEvaluateContext = ToggleEvaluateContext.EMPTY; } else { ResourceScope resourceScope = jobExecuteContext.getResourceScope(); if (resourceScope != null) { From 2ed051732363a5c55caa94e13f499ed920bb873b Mon Sep 17 00:00:00 2001 From: wangyu096 Date: Wed, 11 Dec 2024 19:08:56 +0800 Subject: [PATCH 7/9] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96=E5=88=86?= =?UTF-8?q?=E5=BA=93=E5=88=86=E8=A1=A8=E8=BF=81=E7=A7=BB=E8=BF=87=E7=A8=8B?= =?UTF-8?q?=E4=B8=AD=EF=BC=8Ctask=5Finstance=5Fid=20=E5=8A=A8=E6=80=81?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=E6=9D=A1=E4=BB=B6=E6=9E=84=E9=80=A0=E9=80=BB?= =?UTF-8?q?=E8=BE=91=20#3324?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bk/job/execute/engine/result/ResultHandleManager.java | 3 +++ .../engine/result/ScheduledContinuousResultHandleTask.java | 2 ++ 2 files changed, 5 insertions(+) diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/ResultHandleManager.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/ResultHandleManager.java index 006981bd15..58084726ac 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/ResultHandleManager.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/ResultHandleManager.java @@ -394,7 +394,9 @@ private static final class StopTask implements Runnable { @Override public void run() { Span span = tracer1.nextSpan(task.getTraceContext()).name("stop-task"); + try (Tracer.SpanInScope ignored = tracer1.withSpan(span.start())) { + JobExecuteContextThreadLocalRepo.set(task.getJobExecuteContext()); log.info("Begin to stop task, task: {}", task.getResultHandleTask()); task.getResultHandleTask().stop(); log.info("Stop task successfully, task: {}", task.getResultHandleTask()); @@ -406,6 +408,7 @@ public void run() { if (span != null) { span.end(); } + JobExecuteContextThreadLocalRepo.unset(); } } } diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/ScheduledContinuousResultHandleTask.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/ScheduledContinuousResultHandleTask.java index 69a086419c..f4135348c5 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/ScheduledContinuousResultHandleTask.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/ScheduledContinuousResultHandleTask.java @@ -30,6 +30,7 @@ import com.tencent.bk.job.execute.engine.result.ha.ResultHandleLimiter; import com.tencent.bk.job.execute.engine.result.ha.ResultHandleTaskKeepaliveManager; import com.tencent.bk.job.execute.monitor.ExecuteMetricNames; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.sleuth.Span; import org.springframework.cloud.sleuth.Tracer; @@ -77,6 +78,7 @@ public class ScheduledContinuousResultHandleTask extends DelayedTask { /** * 作业执行上下文信息 */ + @Getter private final JobExecuteContext jobExecuteContext; /** From 851ea32708b9d4494413da480f2fff2ef682d8b7 Mon Sep 17 00:00:00 2001 From: wangyu096 Date: Thu, 12 Dec 2024 09:22:42 +0800 Subject: [PATCH 8/9] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96=E5=88=86?= =?UTF-8?q?=E5=BA=93=E5=88=86=E8=A1=A8=E8=BF=81=E7=A7=BB=E8=BF=87=E7=A8=8B?= =?UTF-8?q?=E4=B8=AD=EF=BC=8Ctask=5Finstance=5Fid=20=E5=8A=A8=E6=80=81?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=E6=9D=A1=E4=BB=B6=E6=9E=84=E9=80=A0=E9=80=BB?= =?UTF-8?q?=E8=BE=91=20#3324?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../execute/dao/impl/TaskInstanceIdDynamicCondition.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/dao/impl/TaskInstanceIdDynamicCondition.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/dao/impl/TaskInstanceIdDynamicCondition.java index f6e72b29a8..8f82fbc9fa 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/dao/impl/TaskInstanceIdDynamicCondition.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/dao/impl/TaskInstanceIdDynamicCondition.java @@ -72,14 +72,15 @@ public static Condition build(Long taskInstanceId, if (taskInstanceId == null || taskInstanceId <= 0L) { log.info("TaskInstanceIdDynamicCondition : InvalidTaskInstanceId : {}", taskInstanceId); // 为了不影响兼容性,忽略错误 -// return DSL.trueCondition(); - throw new IllegalStateException("TaskInstanceId required"); + return DSL.trueCondition(); } else { - log.debug("TaskInstanceIdDynamicCondition: UseTaskInstanceIdCondition"); + // 为了便于观察和排查,暂时设定为 INFO 级别,等后续正式交付再改成 DEBUG + log.info("TaskInstanceIdDynamicCondition: UseTaskInstanceIdCondition"); return taskInstanceIdConditionBuilder.apply(taskInstanceId); } } else { - log.debug("TaskInstanceIdDynamicCondition: IgnoreTaskInstanceIdCondition"); + // 为了便于观察和排查,暂时设定为 INFO 级别,等后续正式交付再改成 DEBUG + log.info("TaskInstanceIdDynamicCondition: IgnoreTaskInstanceIdCondition"); return DSL.trueCondition(); } } From 83828ebccade74d6e820e63a79653bf3388fd093 Mon Sep 17 00:00:00 2001 From: wangyu096 Date: Thu, 12 Dec 2024 11:58:01 +0800 Subject: [PATCH 9/9] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96=E5=88=86?= =?UTF-8?q?=E5=BA=93=E5=88=86=E8=A1=A8=E8=BF=81=E7=A7=BB=E8=BF=87=E7=A8=8B?= =?UTF-8?q?=E4=B8=AD=EF=BC=8Ctask=5Finstance=5Fid=20=E5=8A=A8=E6=80=81?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=E6=9D=A1=E4=BB=B6=E6=9E=84=E9=80=A0=E9=80=BB?= =?UTF-8?q?=E8=BE=91=20#3324?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/ai/context/impl/TaskContextServiceImpl.java | 1 + .../execute/api/inner/ServiceStepInstanceResource.java | 5 ++++- .../api/inner/ServiceStepInstanceResourceImpl.java | 8 ++++++-- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/backend/job-analysis/service-job-analysis/src/main/java/com/tencent/bk/job/analysis/service/ai/context/impl/TaskContextServiceImpl.java b/src/backend/job-analysis/service-job-analysis/src/main/java/com/tencent/bk/job/analysis/service/ai/context/impl/TaskContextServiceImpl.java index 05c90495fd..5e13d289d8 100644 --- a/src/backend/job-analysis/service-job-analysis/src/main/java/com/tencent/bk/job/analysis/service/ai/context/impl/TaskContextServiceImpl.java +++ b/src/backend/job-analysis/service-job-analysis/src/main/java/com/tencent/bk/job/analysis/service/ai/context/impl/TaskContextServiceImpl.java @@ -62,6 +62,7 @@ public TaskContext getTaskContext(String username, TaskContextQuery contextQuery InternalResponse resp = serviceStepInstanceResource.getStepInstance( username, contextQuery.getAppId(), + contextQuery.getTaskInstanceId(), contextQuery.getStepInstanceId() ); if (resp.isSuccess()) { diff --git a/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/api/inner/ServiceStepInstanceResource.java b/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/api/inner/ServiceStepInstanceResource.java index cd75c5aaf2..396edc5efa 100644 --- a/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/api/inner/ServiceStepInstanceResource.java +++ b/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/api/inner/ServiceStepInstanceResource.java @@ -41,13 +41,16 @@ @SmartFeignClient(value = "job-execute", contextId = "stepInstanceResource") @InternalAPI public interface ServiceStepInstanceResource { - @GetMapping("/service/stepInstance/app/{appId}/stepInstanceId/{stepInstanceId}") + @GetMapping("/service/app/{appId}/taskInstance/{taskInstanceId}/stepInstance/{stepInstanceId}") InternalResponse getStepInstance( @RequestHeader("username") String username, @ApiParam(value = "作业平台业务ID", required = true) @PathVariable(value = "appId") Long appId, + @ApiParam(value = "作业实例ID", name = "taskInstanceId", required = true) + @PathVariable("taskInstanceId") + Long taskInstanceId, @ApiParam(value = "步骤实例ID", name = "stepInstanceId", required = true) @PathVariable("stepInstanceId") Long stepInstanceId); diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/api/inner/ServiceStepInstanceResourceImpl.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/api/inner/ServiceStepInstanceResourceImpl.java index 1b9fb10d5b..4b0177b1ea 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/api/inner/ServiceStepInstanceResourceImpl.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/api/inner/ServiceStepInstanceResourceImpl.java @@ -52,9 +52,13 @@ public ServiceStepInstanceResourceImpl(TaskInstanceAccessProcessor taskInstanceA } @Override - public InternalResponse getStepInstance(String username, Long appId, Long stepInstanceId) { + public InternalResponse getStepInstance(String username, + Long appId, + Long taskInstanceId, + Long stepInstanceId) { try { - StepInstanceDTO stepInstance = stepInstanceService.getStepInstanceDetail(appId, stepInstanceId); + StepInstanceDTO stepInstance = stepInstanceService.getStepInstanceDetail(appId, + taskInstanceId, stepInstanceId); taskInstanceAccessProcessor.processBeforeAccess(username, appId, stepInstance.getTaskInstanceId()); return InternalResponse.buildSuccessResp(stepInstance.toServiceStepInstanceDTO()); } catch (NotFoundException e) {