From 603bfd263c55fcaf1100226fc1a4508b8d8964da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cv=5Fkkhuang=E2=80=9D?= <“420895376@qq.com”> Date: Thu, 8 Aug 2024 15:00:42 +0800 Subject: [PATCH 01/16] Resource reset code optimization --- .../manager/am/restful/EMRestfulApi.java | 13 +- .../am/service/em/DefaultEMInfoService.scala | 142 +++++++++++------- .../manager/label/utils/LabelUtil.scala | 21 +++ .../package/admin/clear_resource_set.sh | 55 +++++++ .../monitor/scheduled/ResourceClear.java | 29 ++++ 5 files changed, 199 insertions(+), 61 deletions(-) create mode 100644 linkis-dist/package/admin/clear_resource_set.sh create mode 100644 linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/ResourceClear.java diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EMRestfulApi.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EMRestfulApi.java index 02573a8482..e5d6f9d25e 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EMRestfulApi.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EMRestfulApi.java @@ -577,16 +577,23 @@ public Message taskprediction( .data("yarnResource", canCreateECRes.getYarnResource()) .data("checkResult", canCreateECRes.isCanCreateEC()); } - + @ApiOperation( + value = "reset resource", + notes = "ecm & user resource reset", + response = Message.class) + @ApiImplicitParams({ + @ApiImplicitParam(name = "serviceInstance", dataType = "String", example = "gz.bdz.bdplxxxxx.webank:9102"), + @ApiImplicitParam(name = "username", dataType = "String", example = "hadoop") + }) @RequestMapping(path = "/reset-resource", method = RequestMethod.GET) public Message resetResource( HttpServletRequest req, @RequestParam(value = "serviceInstance", required = false) String serviceInstance, @RequestParam(value = "username", required = false) String username) { - String loginUser = ModuleUserUtils.getOperationUser(req, "taskprediction"); + String loginUser = ModuleUserUtils.getOperationUser(req, "reset resource"); if (Configuration.isNotAdmin(loginUser)) { - return Message.error("Only admin can use reset resource (重置资源仅管理员使用)"); + return Message.error("Only Admin Can Use Reset Resource (重置资源仅管理员使用)"); } emInfoService.resetResource(serviceInstance, username); return Message.ok(); diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMInfoService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMInfoService.scala index 298a05246b..f098256821 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMInfoService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMInfoService.scala @@ -155,24 +155,40 @@ class DefaultEMInfoService extends EMInfoService with Logging { } // 遍历处理ECM filteredECMs.foreach { ecmInstance => - // 获取ecm下所有node - val nodeResource = - engineInfoService.listEMEngines(ecmInstance).asScala.map(_.getNodeResource).toArray - // 收集所有node所使用的资源(汇总、已使用、上锁) - val (realSumResource, useResource, lockResource) = - collectResource(nodeResource, ResourceType.LoadInstance) - // 收集ECM资源 - val ecmResource = - ecmInstance.getNodeResource.getUsedResource + ecmInstance.getNodeResource.getLockedResource - // 资源对比,资源重置 - if (!(ecmResource == realSumResource)) { - // lock ECMInstance - val lock = - resourceManager.tryLockOneLabel(ecmInstance.getLabels.head, -1, Utils.getJvmUser) - // set unhealthy - engineInfoService - .updateEngineHealthyStatus(ecmInstance.getServiceInstance, NodeHealthy.UnHealthy) - Utils.tryFinally { + // lock ECMInstance && set unhealthy + logger.info( + MessageFormat.format( + s"ECM:{0} will be marked as unhealthy and locked", + ecmInstance.getServiceInstance.getInstance + ) + ) + val lock = + resourceManager.tryLockOneLabel(ecmInstance.getLabels.head, -1, Utils.getJvmUser) + engineInfoService + .updateEngineHealthyStatus(ecmInstance.getServiceInstance, NodeHealthy.UnHealthy) + if (StringUtils.isNotBlank(serviceInstance)) { + Thread.sleep(180000) + } + Utils.tryFinally { + // 获取ecm下所有node + val nodeResource = + engineInfoService.listEMEngines(ecmInstance).asScala.map(_.getNodeResource).toArray + // 收集所有node所使用的资源(汇总、已使用、上锁) + val (realSumResource, useResource, lockResource) = + collectResource(nodeResource, ResourceType.LoadInstance) + // 收集ECM资源 + val ecmResource = + ecmInstance.getNodeResource.getUsedResource + ecmInstance.getNodeResource.getLockedResource + // 资源对比,资源重置 + if (!(ecmResource == realSumResource)) { + logger.info( + MessageFormat.format( + "ECM:{0} resources will be reset, Record Resources:{1} ,Real Resources:{2}", + ecmInstance.getServiceInstance.getInstance, + ecmResource, + realSumResource + ) + ) val ecmNodeResource = ecmInstance.getNodeResource ecmNodeResource.setLockedResource(lockResource) ecmNodeResource.setLeftResource(ecmNodeResource.getMaxResource - realSumResource) @@ -180,11 +196,17 @@ class DefaultEMInfoService extends EMInfoService with Logging { val persistence = ResourceUtils.toPersistenceResource(ecmInstance.getNodeResource) val resourceLabel = labelManagerPersistence.getLabelByResource(persistence) resourceManager.resetResource(resourceLabel.head, ecmNodeResource) - } { - resourceManager.unLock(lock) - engineInfoService - .updateEngineHealthyStatus(ecmInstance.getServiceInstance, NodeHealthy.Healthy) } + } { + logger.info( + MessageFormat.format( + s"ECM:{0} will be marked as healthy and the lock will be released", + ecmInstance.getServiceInstance.getInstance + ) + ) + resourceManager.unLock(lock) + engineInfoService + .updateEngineHealthyStatus(ecmInstance.getServiceInstance, NodeHealthy.Healthy) } } } @@ -209,52 +231,56 @@ class DefaultEMInfoService extends EMInfoService with Logging { val userLabelResources = resourceManagerPersistence.getResourceByLabels(userLabels).asScala // 遍历用户标签资源 userLabelResources.foreach { userLabelResource => - val userPersistenceResource = ResourceUtils.fromPersistenceResource(userLabelResource) - val userLabelResourceSum = - userPersistenceResource.getUsedResource + userPersistenceResource.getLockedResource - val (userResourceType, matchResult) = userLabelResource.getResourceType match { - case "LoadInstance" => - ( - ResourceType.LoadInstance, - !(userLabelResourceSum == Resource.initResource(ResourceType.LoadInstance)) - ) - case "DriverAndYarn" => - ( - ResourceType.DriverAndYarn, - userLabelResourceSum.moreThan(Resource.initResource(ResourceType.DriverAndYarn)) - ) + val labelUser = LabelUtil.getFromLabelStr(userLabelResource.getCreator, "user") + val creator = LabelUtil.getFromLabelStr(userLabelResource.getCreator, "creator") + val engine = LabelUtil.getFromLabelStr(userLabelResource.getCreator, "engine") + val resourceLabel = labelManagerPersistence.getLabelByResource(userLabelResource) + resourceLabel.head.setStringValue(userLabelResource.getCreator) + // lock userCreatorEngineTypeLabel + val lock = resourceManager.tryLockOneLabel(resourceLabel.head, -1, labelUser) + if (StringUtils.isNotBlank(username) && creator.equals("IDE") && engine.equals("hive")) { + Thread.sleep(180000) } - if (matchResult) { - val labelUser = userLabelResource.getCreator.split("-").head - val userEngineNodes = nodeLabelService.getEngineNodesWithResourceByUser(labelUser, true) - val userEngineNodeFilter = userEngineNodes - .filter { node => - val userCreatorLabelStr = LabelUtil.getUserCreatorLabel(node.getLabels).getStringValue - val engineTypeLabelStr = LabelUtil.getEngineTypeLabel(node.getLabels).getStringValue - userLabelResource.getCreator.equalsIgnoreCase( - s"${userCreatorLabelStr},${engineTypeLabelStr}" + Utils.tryFinally { + val userPersistenceResource = ResourceUtils.fromPersistenceResource(userLabelResource) + val userLabelResourceSum = + userPersistenceResource.getUsedResource + userPersistenceResource.getLockedResource + val userResourceType = ResourceType.valueOf(userLabelResource.getResourceType) + val matchResult = userLabelResourceSum.caseMore(Resource.initResource(userResourceType)) + if (matchResult) { + val userEngineNodes = nodeLabelService.getEngineNodesWithResourceByUser(labelUser, true) + val userEngineNodeFilter = userEngineNodes + .filter { node => + val userCreatorLabelStr = + LabelUtil.getUserCreatorLabel(node.getLabels).getStringValue + val engineTypeLabelStr = LabelUtil.getEngineTypeLabel(node.getLabels).getStringValue + userLabelResource.getCreator.equalsIgnoreCase( + s"${userCreatorLabelStr},${engineTypeLabelStr}" + ) + } + .map(_.getNodeResource) + // 收集所有node所使用的资源(汇总、已使用、上锁) + val (sumResource, uedResource, lockResource) = + collectResource(userEngineNodeFilter, userResourceType) + if (!(sumResource == userLabelResourceSum)) { + logger.info( + MessageFormat.format( + "LabelUser:{0} resources will be reset, Record Resources:{1} ,Real Resources:{2}", + labelUser, + userLabelResourceSum, + sumResource + ) ) - } - .map(_.getNodeResource) - // 收集所有node所使用的资源(汇总、已使用、上锁) - val (sumResource, uedResource, lockResource) = - collectResource(userEngineNodeFilter, userResourceType) - if (!(sumResource == userLabelResourceSum)) { - logger.info("正在执行用户资源重置") - val resourceLabel = labelManagerPersistence.getLabelByResource(userLabelResource) - // lock userCreatorEngineTypeLabel - val lock = resourceManager.tryLockOneLabel(resourceLabel.head, -1, labelUser) - Utils.tryFinally { userPersistenceResource.setLeftResource( userPersistenceResource.getMaxResource - sumResource ) userPersistenceResource.setUsedResource(uedResource) userPersistenceResource.setLockedResource(lockResource) resourceManager.resetResource(resourceLabel.head, userPersistenceResource) - } { - resourceManager.unLock(lock) } } + } { + resourceManager.unLock(lock) } } } diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala index 63fa377d0b..557104ff32 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala @@ -31,6 +31,8 @@ import org.apache.linkis.manager.label.entity.entrance.{ } import org.apache.linkis.manager.label.entity.route.RouteLabel +import org.apache.commons.lang3.StringUtils + import java.util import scala.collection.JavaConverters.asScalaBufferConverter @@ -144,4 +146,23 @@ object LabelUtil { null.asInstanceOf[A] } + def getFromLabelStr(labelStr: String, key: String): String = { + // hadoop-IDE,hive-2.3.3 or hadoop-IDE or hive-2.3.3 + if (StringUtils.isNotBlank(labelStr)) { + val labelArray = labelStr.split(",") + (labelArray.length, key.toLowerCase()) match { + case (1, "user") => labelStr.split("-")(0) + case (1, "creator") => labelStr.split("-")(1) + case (1, "engine") => labelStr.split("-")(0) + case (1, "version") => labelStr.split("-")(1) + case (2, "user") => labelArray(0).split("-")(0) + case (2, "creator") => labelArray(0).split("-")(1) + case (2, "engine") => labelArray(1).split("-")(0) + case (2, "version") => labelArray(1).split("-")(1) + } + } else { + "" + } + } + } diff --git a/linkis-dist/package/admin/clear_resource_set.sh b/linkis-dist/package/admin/clear_resource_set.sh new file mode 100644 index 0000000000..b8976e3327 --- /dev/null +++ b/linkis-dist/package/admin/clear_resource_set.sh @@ -0,0 +1,55 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# description: +# Clean up the resource script, call linkismanager to reset the resource interface, +# parameters: serviceInstance, username, +# demo: sh clear_resource_set.sh bdpdws110002:9102 hadoop +# demo: sh clear_resource_set.sh "" hduser0102 +# + +cd "$(dirname "$0")" +cd .. +INSTALL_HOME="$(pwd)" + +# Set LINKIS_HOME +if [ -z "$LINKIS_HOME" ]; then + export LINKIS_HOME="$INSTALL_HOME" +fi + +# Set LINKIS_CONF_DIR +if [ -z "$LINKIS_CONF_DIR" ]; then + export LINKIS_CONF_DIR="$LINKIS_HOME/conf" +fi + +# Read configuration +linkisMainConf="$LINKIS_CONF_DIR/linkis.properties" +gatewayUrl=$(grep '^wds.linkis.gateway.url' "$linkisMainConf" | cut -d "=" -f2) + +echo "gatewayUrl: $gatewayUrl" + +# Parse command-line arguments +serviceInstance=$1 +username=$2 + +# Construct request URL +requestUrl="$gatewayUrl/api/rest_j/v1/linkisManager/reset-resource?serviceInstance=$serviceInstance&username=$username" +echo "requestUrl: $requestUrl" +# Execute request +response=$(curl --silent --location --request GET "$requestUrl" -H "Token-Code:LINKIS-AUTH-eTaYLbQpmIulPyrXcMl" -H "Token-User:hadoop") + +# Print response +echo "Response: $response" diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/ResourceClear.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/ResourceClear.java new file mode 100644 index 0000000000..6bbd07935e --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/ResourceClear.java @@ -0,0 +1,29 @@ +package org.apache.linkis.monitor.scheduled; + +import org.apache.linkis.monitor.config.MonitorConfig; +import org.apache.linkis.monitor.until.ThreadUtils; +import org.apache.linkis.monitor.utils.log.LogUtils; +import org.slf4j.Logger; +import org.springframework.context.annotation.PropertySource; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; + +@Component +@PropertySource(value = "classpath:linkis-et-monitor.properties", encoding = "UTF-8") +public class ResourceClear { + private static final Logger logger = LogUtils.stdOutLogger(); + @Scheduled(cron = "${linkis.monitor.clear.resource.reset.cron:0 30 18 * * ?}") + public void ResourceReset() { + logger.info("Start to clear_resource_set shell"); + List cmdlist = new ArrayList<>(); + cmdlist.add("sh"); + cmdlist.add(MonitorConfig.shellPath + "clear_resource_set.sh"); + logger.info("clear_resource_set shell command {}", cmdlist); + String exec = ThreadUtils.run(cmdlist, "clear_resource_set.sh"); + logger.info("shell log {}", exec); + logger.info("End to clear_resource_set shell "); + } +} From 1ccabc2af073434f3ed856c7183af6bbd1c9d332 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cv=5Fkkhuang=E2=80=9D?= <“420895376@qq.com”> Date: Thu, 8 Aug 2024 15:06:22 +0800 Subject: [PATCH 02/16] Resource reset code optimization --- .../manager/am/service/em/DefaultEMInfoService.scala | 8 -------- 1 file changed, 8 deletions(-) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMInfoService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMInfoService.scala index f098256821..b5a71a8c2d 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMInfoService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMInfoService.scala @@ -166,9 +166,6 @@ class DefaultEMInfoService extends EMInfoService with Logging { resourceManager.tryLockOneLabel(ecmInstance.getLabels.head, -1, Utils.getJvmUser) engineInfoService .updateEngineHealthyStatus(ecmInstance.getServiceInstance, NodeHealthy.UnHealthy) - if (StringUtils.isNotBlank(serviceInstance)) { - Thread.sleep(180000) - } Utils.tryFinally { // 获取ecm下所有node val nodeResource = @@ -232,15 +229,10 @@ class DefaultEMInfoService extends EMInfoService with Logging { // 遍历用户标签资源 userLabelResources.foreach { userLabelResource => val labelUser = LabelUtil.getFromLabelStr(userLabelResource.getCreator, "user") - val creator = LabelUtil.getFromLabelStr(userLabelResource.getCreator, "creator") - val engine = LabelUtil.getFromLabelStr(userLabelResource.getCreator, "engine") val resourceLabel = labelManagerPersistence.getLabelByResource(userLabelResource) resourceLabel.head.setStringValue(userLabelResource.getCreator) // lock userCreatorEngineTypeLabel val lock = resourceManager.tryLockOneLabel(resourceLabel.head, -1, labelUser) - if (StringUtils.isNotBlank(username) && creator.equals("IDE") && engine.equals("hive")) { - Thread.sleep(180000) - } Utils.tryFinally { val userPersistenceResource = ResourceUtils.fromPersistenceResource(userLabelResource) val userLabelResourceSum = From 7ef81d40d786ad51ee216d24aba6dc6365302c5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cv=5Fkkhuang=E2=80=9D?= <“420895376@qq.com”> Date: Thu, 8 Aug 2024 20:33:02 +0800 Subject: [PATCH 03/16] Resource johhistory code optimization --- .../jobhistory/dao/JobHistoryMapper.java | 2 ++ .../linkis/jobhistory/entity/QueryTaskVO.java | 16 +++++++++--- .../restful/api/QueryRestfulApi.java | 26 ++++++++++++------- .../jobhistory/util/JobhistoryUtils.java | 15 ++++++++--- .../mapper/common/JobHistoryMapper.xml | 23 ++++++++++++++-- .../conversions/TaskConversions.scala | 15 ++++++----- .../service/JobHistoryQueryService.java | 2 ++ .../impl/JobHistoryQueryServiceImpl.scala | 17 ++++++++++++ 8 files changed, 90 insertions(+), 26 deletions(-) diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java index a6c58f5c27..a0d27ed480 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java @@ -127,4 +127,6 @@ void updateJobHistoryCancelById( List selectJobHistoryByTaskidList( @Param("idList") List idList, @Param("umUser") String username); + + List selectJobHistoryNoMetrics(JobHistory jobReq); } diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/entity/QueryTaskVO.java b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/entity/QueryTaskVO.java index 90007bece8..6cc9714f63 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/entity/QueryTaskVO.java +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/entity/QueryTaskVO.java @@ -59,11 +59,13 @@ public class QueryTaskVO { private boolean canRetry; private String observeInfo; - private String isReuse; + private Boolean isReuse; private Date requestStartTime; private Date requestEndTime; private Long requestSpendTime; + private String metrics; + public List getSubJobs() { return subJobs; } @@ -298,11 +300,11 @@ public void setExecuteUser(String executeUser) { this.executeUser = executeUser; } - public String getIsReuse() { + public Boolean getIsReuse() { return isReuse; } - public void setIsReuse(String isReuse) { + public void setIsReuse(Boolean isReuse) { this.isReuse = isReuse; } @@ -329,4 +331,12 @@ public Long getRequestSpendTime() { public void setRequestSpendTime(Long requestSpendTime) { this.requestSpendTime = requestSpendTime; } + + public String getMetrics() { + return metrics; + } + + public void setMetrics(String metrics) { + this.metrics = metrics; + } } diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java index 8f5cdedd60..efd27e32a7 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java @@ -106,10 +106,11 @@ public Message getTaskByID(HttpServletRequest req, @PathVariable("id") Long jobI || Configuration.isDepartmentAdmin(username)) { username = null; } - JobHistory jobHistory = jobHistoryQueryService.getJobHistoryByIdAndName(jobId, username); + JobHistory jobHistory = + jobHistoryQueryService.getJobHistoryByIdAndNameNoMetrics(jobId, username); try { - if (JobhistoryConfiguration.JOB_HISTORY_QUERY_EXECUTION_CODE_SWITCH() && null != jobHistory) { + if (null != jobHistory) { QueryUtils.exchangeExecutionCode(jobHistory); } } catch (Exception e) { @@ -502,17 +503,22 @@ public Message jobeExtraInfo( username = null; } JobHistory jobHistory = jobHistoryQueryService.getJobHistoryByIdAndName(jobId, username); - String runtime = TaskConversions.getJobRuntime(jobHistory); - try { - if (null != jobHistory) { + if (jobHistory == null) { + return Message.error( + "The corresponding job was not found, or there may be no permission to view the job" + + "(没有找到对应的job,也可能是没有查看该job的权限)"); + } else { + try { QueryUtils.exchangeExecutionCode(jobHistory); + } catch (Exception e) { + log.error("Exchange executionCode for job with id : {} failed, {}", jobHistory.getId(), e); } - } catch (Exception e) { - log.error("Exchange executionCode for job with id : {} failed, {}", jobHistory.getId(), e); } - return Message.ok() - .data("runtime", runtime) - .data("executionCode", jobHistory.getExecutionCode()); + Map metricsMap = + BDPJettyServerHelper.gson().fromJson(jobHistory.getMetrics(), Map.class); + metricsMap.put("executionCode", jobHistory.getExecutionCode()); + metricsMap.put("runtime", TaskConversions.getJobRuntime(metricsMap)); + return Message.ok().data("metricsMap", metricsMap); } @ApiOperation( diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/util/JobhistoryUtils.java b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/util/JobhistoryUtils.java index abf1191430..4b6bef2fb7 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/util/JobhistoryUtils.java +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/util/JobhistoryUtils.java @@ -31,8 +31,7 @@ public class JobhistoryUtils { - public static String headersStr = - "任务ID,来源,查询语句,状态,已耗时,关键信息,应用/引擎,创建时间,是否复用,申请开始时间,申请结束时间,申请花费时间"; + public static String headersStr = "任务ID,来源,查询语句,状态,已耗时,关键信息,应用/引擎,创建时间,是否复用,申请开始时间,申请结束时间,申请花费时间"; public static String headersEnStr = "JobID,Source,Execution Code,Status,Time Elapsed,Key Information,App / Engine,Created at,IsRuse,Application Start Time,Application End Time,Application Takes Time"; @@ -62,7 +61,11 @@ public static byte[] downLoadJobToExcel(List jobHistoryList, String Row row = sheet.createRow(rowNum++); row.createCell(0).setCellValue(queryTaskVO.getTaskID()); row.createCell(1).setCellValue(queryTaskVO.getSourceTailor()); - row.createCell(2).setCellValue(queryTaskVO.getExecutionCode()); + String executionCode = queryTaskVO.getExecutionCode(); + if (executionCode.length() >= 32767) { + executionCode = executionCode.substring(0, 32767); + } + row.createCell(2).setCellValue(executionCode); row.createCell(3).setCellValue(queryTaskVO.getStatus()); if (null == queryTaskVO.getCostTime()) { queryTaskVO.setCostTime(0L); @@ -75,7 +78,11 @@ public static byte[] downLoadJobToExcel(List jobHistoryList, String + "/" + queryTaskVO.getRequestApplicationName()); row.createCell(7).setCellValue(TaskConversions.dateFomat(queryTaskVO.getCreatedTime())); - row.createCell(8).setCellValue(queryTaskVO.getIsReuse()); + if (null == queryTaskVO.getIsReuse()) { + row.createCell(8).setCellValue(""); + } else { + row.createCell(8).setCellValue(queryTaskVO.getIsReuse()); + } row.createCell(9).setCellValue(TaskConversions.dateFomat(queryTaskVO.getRequestStartTime())); row.createCell(10).setCellValue(TaskConversions.dateFomat(queryTaskVO.getRequestEndTime())); if (null == queryTaskVO.getRequestSpendTime()) { diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/common/JobHistoryMapper.xml b/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/common/JobHistoryMapper.xml index 64f2a025e2..3a4913dfda 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/common/JobHistoryMapper.xml +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/common/JobHistoryMapper.xml @@ -46,8 +46,8 @@ `id`, `job_req_id`, `submit_user`, `execute_user`, `source`, `labels`, `params`, - `progress`, `status`, `log_path`, `error_code`, `error_desc`, `created_time`, `updated_time`, - `instances`, `metrics`,`engine_type`, `execution_code`, `result_location` + `status`, `log_path`, `error_code`, `error_desc`, `created_time`, `updated_time`, + `instances`,`engine_type`, `result_location`, `observe_info` @@ -80,6 +80,24 @@ + + + diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/conversions/TaskConversions.scala b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/conversions/TaskConversions.scala index 6efab76e1e..dee1d2c3f7 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/conversions/TaskConversions.scala +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/conversions/TaskConversions.scala @@ -31,13 +31,13 @@ import org.apache.linkis.protocol.constants.TaskConstant import org.apache.linkis.protocol.utils.ZuulEntranceUtils import org.apache.linkis.server.{toScalaBuffer, toScalaMap, BDPJettyServerHelper} -import org.apache.commons.lang3.StringUtils +import org.apache.commons.lang3.{BooleanUtils, StringUtils} import org.springframework.beans.BeanUtils import java.text.SimpleDateFormat import java.util -import java.util.Date +import java.util.{Date, Map} import scala.collection.JavaConverters.{asScalaBufferConverter, mapAsScalaMapConverter} @@ -278,7 +278,9 @@ object TaskConversions extends Logging { null != metrics && metrics.containsKey(TaskConstant.JOB_IS_REUSE) && metrics .get(TaskConstant.JOB_IS_REUSE) != null ) { - taskVO.setIsReuse(metrics.get(TaskConstant.JOB_IS_REUSE).toString) + + taskVO.setIsReuse(BooleanUtils.toBoolean(metrics.get(TaskConstant.JOB_IS_REUSE).toString)) + } var requestStartTime: Date = null @@ -313,7 +315,7 @@ object TaskConversions extends Logging { taskVO.setCostTime(System.currentTimeMillis() - createTime.getTime) } } - if (metrics.containsKey(TaskConstant.ENGINE_INSTANCE)) { + if (null != metrics && metrics.containsKey(TaskConstant.ENGINE_INSTANCE)) { taskVO.setEngineInstance(metrics.get(TaskConstant.ENGINE_INSTANCE).toString) } else if (TaskStatus.Failed.toString.equals(job.getStatus)) { taskVO.setCanRetry(true) @@ -342,6 +344,7 @@ object TaskConversions extends Logging { } } taskVO.setObserveInfo(job.getObserveInfo) + taskVO.setMetrics(job.getMetrics) taskVO } @@ -377,9 +380,7 @@ object TaskConversions extends Logging { } } - def getJobRuntime(jobHistory: JobHistory): String = { - val metricsMap = - BDPJettyServerHelper.gson.fromJson((jobHistory.getMetrics), classOf[util.Map[String, Object]]) + def getJobRuntime(metricsMap: util.Map[String, String]): String = { var runTime = "" if (metricsMap.containsKey(TaskConstant.JOB_COMPLETE_TIME)) { val completeTime = dealString2Date( diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/JobHistoryQueryService.java b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/JobHistoryQueryService.java index b859009ba3..fde82b0ed2 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/JobHistoryQueryService.java +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/JobHistoryQueryService.java @@ -40,6 +40,8 @@ public interface JobHistoryQueryService { JobHistory getJobHistoryByIdAndName(Long jobID, String userName); + JobHistory getJobHistoryByIdAndNameNoMetrics(Long jobID, String userName); + List search(Long jobId, String username, String creator, String status, Date sDate, Date eDate, String engineType, Long startJobId, String instance, String departmentId, String engineInstance); Integer countUndoneTasks(String username, String creator, Date sDate, Date eDate, String engineType, Long startJobId); diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala index 4a73fa1f09..a9fe5a5f1e 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala @@ -28,6 +28,7 @@ import org.apache.linkis.governance.common.entity.job.{ } import org.apache.linkis.governance.common.protocol.conf.EntranceInstanceConfRequest import org.apache.linkis.governance.common.protocol.job._ +import org.apache.linkis.jobhistory.conf.JobhistoryConfiguration import org.apache.linkis.jobhistory.conversions.TaskConversions._ import org.apache.linkis.jobhistory.dao.JobHistoryMapper import org.apache.linkis.jobhistory.entity.{JobHistory, QueryJobHistory} @@ -255,6 +256,22 @@ class JobHistoryQueryServiceImpl extends JobHistoryQueryService with Logging { if (jobHistoryList.isEmpty) null else jobHistoryList.get(0) } + override def getJobHistoryByIdAndNameNoMetrics( + jobId: java.lang.Long, + userName: String + ): JobHistory = { + val jobReq = new JobHistory + jobReq.setId(jobId) + jobReq.setSubmitUser(userName) + if (JobhistoryConfiguration.JOB_HISTORY_QUERY_EXECUTION_CODE_SWITCH) { + val jobHistoryList = jobHistoryMapper.selectJobHistory(jobReq) + if (jobHistoryList.isEmpty) null else jobHistoryList.get(0) + } else { + val jobHistoryList = jobHistoryMapper.selectJobHistoryNoMetrics(jobReq) + if (jobHistoryList.isEmpty) null else jobHistoryList.get(0) + } + } + override def search( jobId: lang.Long, username: String, From 69d83f11131774e209927b99992ca494b13e42b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cv=5Fkkhuang=E2=80=9D?= <“420895376@qq.com”> Date: Fri, 9 Aug 2024 11:13:16 +0800 Subject: [PATCH 04/16] Resource johhistory code optimization --- .../restful/api/QueryRestfulApi.java | 43 +++++++----- .../jobhistory/util/JobhistoryUtils.java | 69 ++++++++++++++----- 2 files changed, 77 insertions(+), 35 deletions(-) diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java index efd27e32a7..eccaa16686 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java @@ -20,8 +20,6 @@ import org.apache.linkis.common.conf.Configuration; import org.apache.linkis.governance.common.constant.job.JobRequestConstants; import org.apache.linkis.governance.common.entity.job.QueryException; -import org.apache.linkis.governance.common.protocol.conf.DepartmentRequest; -import org.apache.linkis.governance.common.protocol.conf.DepartmentResponse; import org.apache.linkis.jobhistory.cache.impl.DefaultQueryCacheManager; import org.apache.linkis.jobhistory.conf.JobhistoryConfiguration; import org.apache.linkis.jobhistory.conversions.TaskConversions; @@ -211,17 +209,11 @@ public Message list( username = null; } } else if (null != isDeptView && isDeptView) { - Object responseObject = sender.ask(new DepartmentRequest(username)); - if (responseObject instanceof DepartmentResponse) { - DepartmentResponse departmentResponse = (DepartmentResponse) responseObject; - if (StringUtils.isNotBlank(departmentResponse.departmentId())) { - departmentId = departmentResponse.departmentId(); - if (StringUtils.isNotBlank(proxyUser)) { - username = proxyUser; - } else { - username = null; - } - } + departmentId = JobhistoryUtils.getDepartmentByuser(username); + if (StringUtils.isNotBlank(departmentId)) { + username = proxyUser; + } else { + username = null; } } if (StringUtils.isBlank(instance)) { @@ -497,12 +489,26 @@ public Message jobeExtraInfo( if (null == jobId) { return Message.error("Invalid jobId cannot be empty"); } - if (Configuration.isJobHistoryAdmin(username) - || Configuration.isAdmin(username) - || Configuration.isDepartmentAdmin(username)) { + JobHistory jobHistory = null; + if (Configuration.isJobHistoryAdmin(username) || Configuration.isAdmin(username)) { username = null; + jobHistory = jobHistoryQueryService.getJobHistoryByIdAndName(jobId, username); + } else { + if (Configuration.isDepartmentAdmin(username)) { + String departmentId = JobhistoryUtils.getDepartmentByuser(username); + if (StringUtils.isNotBlank(departmentId)) { + List list = + jobHistoryQueryService.search( + jobId, null, null, null, null, null, null, null, null, departmentId, null); + if (!CollectionUtils.isEmpty(list)) { + jobHistory = list.get(0); + } + } + } else { + jobHistory = jobHistoryQueryService.getJobHistoryByIdAndName(jobId, username); + } } - JobHistory jobHistory = jobHistoryQueryService.getJobHistoryByIdAndName(jobId, username); + if (jobHistory == null) { return Message.error( "The corresponding job was not found, or there may be no permission to view the job" @@ -569,7 +575,8 @@ public void downloadJobList( List queryTaskVOList = BDPJettyServerHelper.gson() .fromJson(jsonStr, new TypeToken>() {}.getType()); - byte[] bytes = JobhistoryUtils.downLoadJobToExcel(queryTaskVOList, language); + byte[] bytes = + JobhistoryUtils.downLoadJobToExcel(queryTaskVOList, language, isAdminView, isDeptView); response.setCharacterEncoding(Consts.UTF_8.toString()); response.addHeader("Content-Type", "application/json;charset=UTF-8"); response.setContentType("application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"); diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/util/JobhistoryUtils.java b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/util/JobhistoryUtils.java index 4b6bef2fb7..fffa6a5536 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/util/JobhistoryUtils.java +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/util/JobhistoryUtils.java @@ -17,11 +17,16 @@ package org.apache.linkis.jobhistory.util; +import org.apache.linkis.common.conf.Configuration; import org.apache.linkis.common.utils.Utils; +import org.apache.linkis.governance.common.protocol.conf.DepartmentRequest; +import org.apache.linkis.governance.common.protocol.conf.DepartmentResponse; import org.apache.linkis.jobhistory.conversions.TaskConversions; import org.apache.linkis.jobhistory.entity.QueryTaskVO; +import org.apache.linkis.rpc.Sender; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.poi.ss.usermodel.*; import org.apache.poi.xssf.usermodel.XSSFWorkbook; @@ -31,24 +36,37 @@ public class JobhistoryUtils { - public static String headersStr = "任务ID,来源,查询语句,状态,已耗时,关键信息,应用/引擎,创建时间,是否复用,申请开始时间,申请结束时间,申请花费时间"; + public static String headersStr = + "任务ID,来源,查询语句,状态,已耗时,关键信息,是否复用,申请开始时间,申请结束时间,申请耗时,应用/引擎,用户,创建时间"; public static String headersEnStr = - "JobID,Source,Execution Code,Status,Time Elapsed,Key Information,App / Engine,Created at,IsRuse,Application Start Time,Application End Time,Application Takes Time"; + "JobID,Source,Execution Code,Status,Time Elapsed,Key Information,IsRuse,Application Start Time,Application End Time,Application Takes Time,App / Engine,User,Created at"; + private static Sender sender = + Sender.getSender( + Configuration.CLOUD_CONSOLE_CONFIGURATION_SPRING_APPLICATION_NAME().getValue());; - public static byte[] downLoadJobToExcel(List jobHistoryList, String language) + public static byte[] downLoadJobToExcel( + List jobHistoryList, String language, Boolean isAdminView, Boolean isDeptView) throws IOException { ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); Workbook workbook = new XSSFWorkbook(); byte[] byteArray = new byte[0]; - Sheet sheet = workbook.createSheet("任务信息表"); // Create header row Row headerRow = sheet.createRow(0); String headers = ""; + Boolean viewResult = isAdminView || isDeptView; if (!"en".equals(language)) { - headers = headersStr; + if (viewResult) { + headers = headersStr; + } else { + headers = headersStr.replace(",用户", ""); + } } else { - headers = headersEnStr; + if (viewResult) { + headers = headersEnStr; + } else { + headers = headersEnStr.replace(",User", ""); + } } String[] headersArray = headers.split(","); for (int i = 0; i < headersArray.length; i++) { @@ -72,23 +90,28 @@ public static byte[] downLoadJobToExcel(List jobHistoryList, String } row.createCell(4).setCellValue(Utils.msDurationToString(queryTaskVO.getCostTime())); row.createCell(5).setCellValue(queryTaskVO.getErrDesc()); - row.createCell(6) - .setCellValue( - queryTaskVO.getExecuteApplicationName() - + "/" - + queryTaskVO.getRequestApplicationName()); - row.createCell(7).setCellValue(TaskConversions.dateFomat(queryTaskVO.getCreatedTime())); if (null == queryTaskVO.getIsReuse()) { - row.createCell(8).setCellValue(""); + row.createCell(6).setCellValue(""); } else { - row.createCell(8).setCellValue(queryTaskVO.getIsReuse()); + row.createCell(6).setCellValue(queryTaskVO.getIsReuse()); } - row.createCell(9).setCellValue(TaskConversions.dateFomat(queryTaskVO.getRequestStartTime())); - row.createCell(10).setCellValue(TaskConversions.dateFomat(queryTaskVO.getRequestEndTime())); + row.createCell(7).setCellValue(TaskConversions.dateFomat(queryTaskVO.getRequestStartTime())); + row.createCell(8).setCellValue(TaskConversions.dateFomat(queryTaskVO.getRequestEndTime())); if (null == queryTaskVO.getRequestSpendTime()) { queryTaskVO.setRequestSpendTime(0L); } - row.createCell(11).setCellValue(Utils.msDurationToString(queryTaskVO.getRequestSpendTime())); + row.createCell(9).setCellValue(Utils.msDurationToString(queryTaskVO.getRequestSpendTime())); + row.createCell(10) + .setCellValue( + queryTaskVO.getExecuteApplicationName() + + "/" + + queryTaskVO.getRequestApplicationName()); + if (viewResult) { + row.createCell(11).setCellValue(queryTaskVO.getUmUser()); + row.createCell(12).setCellValue(TaskConversions.dateFomat(queryTaskVO.getCreatedTime())); + } else { + row.createCell(11).setCellValue(TaskConversions.dateFomat(queryTaskVO.getCreatedTime())); + } } try { workbook.write(outputStream); @@ -101,4 +124,16 @@ public static byte[] downLoadJobToExcel(List jobHistoryList, String } return byteArray; } + + public static String getDepartmentByuser(String username) { + String departmentId = ""; + Object responseObject = sender.ask(new DepartmentRequest(username)); + if (responseObject instanceof DepartmentResponse) { + DepartmentResponse departmentResponse = (DepartmentResponse) responseObject; + if (StringUtils.isNotBlank(departmentResponse.departmentId())) { + departmentId = departmentResponse.departmentId(); + } + } + return departmentId; + } } From 22217c7f3e01ffd180f89049a1a8d6459ab46751 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cv=5Fkkhuang=E2=80=9D?= <“420895376@qq.com”> Date: Mon, 12 Aug 2024 10:00:32 +0800 Subject: [PATCH 05/16] revent jobhistory: get data --- .../linkis/manager/am/restful/EMRestfulApi.java | 14 +++++++++----- .../jobhistory/restful/api/QueryRestfulApi.java | 5 ++--- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EMRestfulApi.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EMRestfulApi.java index e5d6f9d25e..c0b481d318 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EMRestfulApi.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EMRestfulApi.java @@ -577,13 +577,17 @@ public Message taskprediction( .data("yarnResource", canCreateECRes.getYarnResource()) .data("checkResult", canCreateECRes.isCanCreateEC()); } + @ApiOperation( - value = "reset resource", - notes = "ecm & user resource reset", - response = Message.class) + value = "reset resource", + notes = "ecm & user resource reset", + response = Message.class) @ApiImplicitParams({ - @ApiImplicitParam(name = "serviceInstance", dataType = "String", example = "gz.bdz.bdplxxxxx.webank:9102"), - @ApiImplicitParam(name = "username", dataType = "String", example = "hadoop") + @ApiImplicitParam( + name = "serviceInstance", + dataType = "String", + example = "gz.bdz.bdplxxxxx.webank:9102"), + @ApiImplicitParam(name = "username", dataType = "String", example = "hadoop") }) @RequestMapping(path = "/reset-resource", method = RequestMethod.GET) public Message resetResource( diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java index eccaa16686..446dbb7edd 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java @@ -104,11 +104,10 @@ public Message getTaskByID(HttpServletRequest req, @PathVariable("id") Long jobI || Configuration.isDepartmentAdmin(username)) { username = null; } - JobHistory jobHistory = - jobHistoryQueryService.getJobHistoryByIdAndNameNoMetrics(jobId, username); + JobHistory jobHistory = jobHistoryQueryService.getJobHistoryByIdAndName(jobId, username); try { - if (null != jobHistory) { + if (JobhistoryConfiguration.JOB_HISTORY_QUERY_EXECUTION_CODE_SWITCH()&&null != jobHistory) { QueryUtils.exchangeExecutionCode(jobHistory); } } catch (Exception e) { From 33bd43e20acaab3ca52362928e9fa7f2b97a5e3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cv=5Fkkhuang=E2=80=9D?= <“420895376@qq.com”> Date: Mon, 12 Aug 2024 17:37:08 +0800 Subject: [PATCH 06/16] update isreused --- .../java/org/apache/linkis/jobhistory/util/JobhistoryUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/util/JobhistoryUtils.java b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/util/JobhistoryUtils.java index fffa6a5536..ea6e188367 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/util/JobhistoryUtils.java +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/util/JobhistoryUtils.java @@ -93,7 +93,7 @@ public static byte[] downLoadJobToExcel( if (null == queryTaskVO.getIsReuse()) { row.createCell(6).setCellValue(""); } else { - row.createCell(6).setCellValue(queryTaskVO.getIsReuse()); + row.createCell(6).setCellValue(queryTaskVO.getIsReuse() ? "是" : "否"); } row.createCell(7).setCellValue(TaskConversions.dateFomat(queryTaskVO.getRequestStartTime())); row.createCell(8).setCellValue(TaskConversions.dateFomat(queryTaskVO.getRequestEndTime())); From 651d51fdc9c1fa80caae68b9b87e85c7cd9be183 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cv=5Fkkhuang=E2=80=9D?= <“420895376@qq.com”> Date: Mon, 12 Aug 2024 19:26:55 +0800 Subject: [PATCH 07/16] fix executionCode error --- .../apache/linkis/jobhistory/util/QueryUtils.scala | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/util/QueryUtils.scala b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/util/QueryUtils.scala index b2cb25051d..48beb0d09e 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/util/QueryUtils.scala +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/util/QueryUtils.scala @@ -30,7 +30,7 @@ import org.apache.linkis.storage.utils.{FileSystemUtils, StorageUtils} import org.apache.commons.io.IOUtils import org.apache.commons.lang3.time.DateFormatUtils -import java.io.{InputStream, OutputStream} +import java.io.{BufferedReader, InputStream, InputStreamReader, OutputStream} import java.text.SimpleDateFormat import java.util import java.util.{Arrays, Date} @@ -115,28 +115,31 @@ object QueryUtils extends Logging { val infos: Array[String] = codeInfo.split(LENGTH_SPLIT) val position = infos(0).toInt var lengthLeft = infos(1).toInt - val tub = new Array[Byte](1024) + val tub = new Array[Char](1024) val executionCode: StringBuilder = new StringBuilder val fsPath: FsPath = new FsPath(path) val fileSystem = FSFactory.getFsByProxyUser(fsPath, queryTask.getExecuteUser).asInstanceOf[FileSystem] fileSystem.init(null) var is: InputStream = null + var bufferedReader: BufferedReader = null if (!fileSystem.exists(fsPath)) return Utils.tryFinally { is = fileSystem.read(fsPath) - if (position > 0) is.skip(position) + bufferedReader = new BufferedReader(new InputStreamReader(is, CHARSET)) + if (position > 0) bufferedReader.skip(position) breakable { while (lengthLeft > 0) { - val readed = is.read(tub) + val readed = bufferedReader.read(tub) val useful = Math.min(readed, lengthLeft) if (useful < 0) break() lengthLeft -= useful - executionCode.append(new String(tub, 0, useful, CHARSET)) + executionCode.append(new String(tub)) } } } { IOUtils.closeQuietly(is) + IOUtils.closeQuietly(bufferedReader) if (fileSystem != null) Utils.tryAndWarn(fileSystem.close()) } queryTask.setExecutionCode(executionCode.toString()) From 4c973690254233733bc8b99c8651e60db016a869 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cv=5Fkkhuang=E2=80=9D?= <“420895376@qq.com”> Date: Tue, 13 Aug 2024 10:55:23 +0800 Subject: [PATCH 08/16] Code optimization --- .../linkis/jobhistory/restful/api/QueryRestfulApi.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java index 446dbb7edd..d9fc189f00 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java @@ -107,8 +107,12 @@ public Message getTaskByID(HttpServletRequest req, @PathVariable("id") Long jobI JobHistory jobHistory = jobHistoryQueryService.getJobHistoryByIdAndName(jobId, username); try { - if (JobhistoryConfiguration.JOB_HISTORY_QUERY_EXECUTION_CODE_SWITCH()&&null != jobHistory) { - QueryUtils.exchangeExecutionCode(jobHistory); + if (null != jobHistory) { + if (JobhistoryConfiguration.JOB_HISTORY_QUERY_EXECUTION_CODE_SWITCH()) { + QueryUtils.exchangeExecutionCode(jobHistory); + } else { + jobHistory.setExecutionCode(null); + } } } catch (Exception e) { log.error("Exchange executionCode for job with id : {} failed, {}", jobHistory.getId(), e); From 74e9abcffbca697e35ab96c995e395367c9006be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cv=5Fkkhuang=E2=80=9D?= <“420895376@qq.com”> Date: Tue, 13 Aug 2024 12:01:49 +0800 Subject: [PATCH 09/16] Code optimization --- .../scala/org/apache/linkis/jobhistory/util/QueryUtils.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/util/QueryUtils.scala b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/util/QueryUtils.scala index 48beb0d09e..c109f24ab2 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/util/QueryUtils.scala +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/util/QueryUtils.scala @@ -134,7 +134,9 @@ object QueryUtils extends Logging { val useful = Math.min(readed, lengthLeft) if (useful < 0) break() lengthLeft -= useful - executionCode.append(new String(tub)) + val usefulChars = new Array[Char](useful) + System.arraycopy(tub, 0, usefulChars, 0, useful) + executionCode.append(new String(usefulChars)) } } } { From d0aec92d9724729ca573d548945c617e985e6e5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cv=5Fkkhuang=E2=80=9D?= <“420895376@qq.com”> Date: Tue, 13 Aug 2024 16:18:32 +0800 Subject: [PATCH 10/16] Code optimization --- .../apache/linkis/jobhistory/restful/api/QueryRestfulApi.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java index d9fc189f00..6c069efaa7 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java @@ -213,7 +213,7 @@ public Message list( } } else if (null != isDeptView && isDeptView) { departmentId = JobhistoryUtils.getDepartmentByuser(username); - if (StringUtils.isNotBlank(departmentId)) { + if (StringUtils.isNotBlank(departmentId) && StringUtils.isNotBlank(proxyUser)) { username = proxyUser; } else { username = null; From f9579ba93220c178bef5e82e0022f6eb55d239e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cv=5Fkkhuang=E2=80=9D?= <“420895376@qq.com”> Date: Wed, 14 Aug 2024 15:15:38 +0800 Subject: [PATCH 11/16] Code optimization --- .../am/service/em/DefaultEMInfoService.scala | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMInfoService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMInfoService.scala index b5a71a8c2d..fc4a24ab89 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMInfoService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMInfoService.scala @@ -30,6 +30,7 @@ import org.apache.linkis.manager.common.entity.resource.{NodeResource, Resource, import org.apache.linkis.manager.common.protocol.em.GetEMInfoRequest import org.apache.linkis.manager.common.protocol.node.NodeHealthyRequest import org.apache.linkis.manager.common.utils.ResourceUtils +import org.apache.linkis.manager.label.entity.em.EMInstanceLabel import org.apache.linkis.manager.label.entity.node.AliasServiceInstanceLabel import org.apache.linkis.manager.label.service.NodeLabelService import org.apache.linkis.manager.label.utils.LabelUtil @@ -162,8 +163,9 @@ class DefaultEMInfoService extends EMInfoService with Logging { ecmInstance.getServiceInstance.getInstance ) ) + val eMInstanceLabel = ecmInstance.getLabels.filter(_.isInstanceOf[EMInstanceLabel]).head val lock = - resourceManager.tryLockOneLabel(ecmInstance.getLabels.head, -1, Utils.getJvmUser) + resourceManager.tryLockOneLabel(eMInstanceLabel, -1, Utils.getJvmUser) engineInfoService .updateEngineHealthyStatus(ecmInstance.getServiceInstance, NodeHealthy.UnHealthy) Utils.tryFinally { @@ -174,23 +176,23 @@ class DefaultEMInfoService extends EMInfoService with Logging { val (realSumResource, useResource, lockResource) = collectResource(nodeResource, ResourceType.LoadInstance) // 收集ECM资源 - val ecmResource = - ecmInstance.getNodeResource.getUsedResource + ecmInstance.getNodeResource.getLockedResource + val ecmNodeResource = ecmInstance.getNodeResource // 资源对比,资源重置 - if (!(ecmResource == realSumResource)) { + if ( + (!(useResource == ecmNodeResource.getUsedResource)) || (!(lockResource == ecmNodeResource.getLockedResource)) + ) { logger.info( MessageFormat.format( "ECM:{0} resources will be reset, Record Resources:{1} ,Real Resources:{2}", ecmInstance.getServiceInstance.getInstance, - ecmResource, + ecmNodeResource.getUsedResource + ecmNodeResource.getLockedResource, realSumResource ) ) - val ecmNodeResource = ecmInstance.getNodeResource ecmNodeResource.setLockedResource(lockResource) ecmNodeResource.setLeftResource(ecmNodeResource.getMaxResource - realSumResource) ecmNodeResource.setUsedResource(useResource) - val persistence = ResourceUtils.toPersistenceResource(ecmInstance.getNodeResource) + val persistence = ResourceUtils.toPersistenceResource(ecmNodeResource) val resourceLabel = labelManagerPersistence.getLabelByResource(persistence) resourceManager.resetResource(resourceLabel.head, ecmNodeResource) } @@ -238,7 +240,9 @@ class DefaultEMInfoService extends EMInfoService with Logging { val userLabelResourceSum = userPersistenceResource.getUsedResource + userPersistenceResource.getLockedResource val userResourceType = ResourceType.valueOf(userLabelResource.getResourceType) - val matchResult = userLabelResourceSum.caseMore(Resource.initResource(userResourceType)) + val initResource = Resource.initResource(userResourceType) + val matchResult = + (!(userPersistenceResource.getUsedResource == initResource)) || (!(userPersistenceResource.getLockedResource == initResource)) if (matchResult) { val userEngineNodes = nodeLabelService.getEngineNodesWithResourceByUser(labelUser, true) val userEngineNodeFilter = userEngineNodes @@ -254,12 +258,14 @@ class DefaultEMInfoService extends EMInfoService with Logging { // 收集所有node所使用的资源(汇总、已使用、上锁) val (sumResource, uedResource, lockResource) = collectResource(userEngineNodeFilter, userResourceType) - if (!(sumResource == userLabelResourceSum)) { + if ( + (!(uedResource == userPersistenceResource.getUsedResource)) || (!(lockResource == userPersistenceResource.getLockedResource)) + ) { logger.info( MessageFormat.format( "LabelUser:{0} resources will be reset, Record Resources:{1} ,Real Resources:{2}", labelUser, - userLabelResourceSum, + userPersistenceResource.getUsedResource + userPersistenceResource.getLockedResource, sumResource ) ) From 5c03d62cbb67e2548f55121fe9538eff6a1ae707 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cv=5Fkkhuang=E2=80=9D?= <“420895376@qq.com”> Date: Wed, 14 Aug 2024 15:52:40 +0800 Subject: [PATCH 12/16] Code optimization --- .../am/service/em/DefaultEMInfoService.scala | 65 +++++++++---------- 1 file changed, 29 insertions(+), 36 deletions(-) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMInfoService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMInfoService.scala index fc4a24ab89..c9dbb39ace 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMInfoService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMInfoService.scala @@ -237,45 +237,38 @@ class DefaultEMInfoService extends EMInfoService with Logging { val lock = resourceManager.tryLockOneLabel(resourceLabel.head, -1, labelUser) Utils.tryFinally { val userPersistenceResource = ResourceUtils.fromPersistenceResource(userLabelResource) - val userLabelResourceSum = - userPersistenceResource.getUsedResource + userPersistenceResource.getLockedResource val userResourceType = ResourceType.valueOf(userLabelResource.getResourceType) - val initResource = Resource.initResource(userResourceType) - val matchResult = - (!(userPersistenceResource.getUsedResource == initResource)) || (!(userPersistenceResource.getLockedResource == initResource)) - if (matchResult) { - val userEngineNodes = nodeLabelService.getEngineNodesWithResourceByUser(labelUser, true) - val userEngineNodeFilter = userEngineNodes - .filter { node => - val userCreatorLabelStr = - LabelUtil.getUserCreatorLabel(node.getLabels).getStringValue - val engineTypeLabelStr = LabelUtil.getEngineTypeLabel(node.getLabels).getStringValue - userLabelResource.getCreator.equalsIgnoreCase( - s"${userCreatorLabelStr},${engineTypeLabelStr}" - ) - } - .map(_.getNodeResource) - // 收集所有node所使用的资源(汇总、已使用、上锁) - val (sumResource, uedResource, lockResource) = - collectResource(userEngineNodeFilter, userResourceType) - if ( - (!(uedResource == userPersistenceResource.getUsedResource)) || (!(lockResource == userPersistenceResource.getLockedResource)) - ) { - logger.info( - MessageFormat.format( - "LabelUser:{0} resources will be reset, Record Resources:{1} ,Real Resources:{2}", - labelUser, - userPersistenceResource.getUsedResource + userPersistenceResource.getLockedResource, - sumResource - ) + val userEngineNodes = nodeLabelService.getEngineNodesWithResourceByUser(labelUser, true) + val userEngineNodeFilter = userEngineNodes + .filter { node => + val userCreatorLabelStr = + LabelUtil.getUserCreatorLabel(node.getLabels).getStringValue + val engineTypeLabelStr = LabelUtil.getEngineTypeLabel(node.getLabels).getStringValue + userLabelResource.getCreator.equalsIgnoreCase( + s"${userCreatorLabelStr},${engineTypeLabelStr}" ) - userPersistenceResource.setLeftResource( - userPersistenceResource.getMaxResource - sumResource - ) - userPersistenceResource.setUsedResource(uedResource) - userPersistenceResource.setLockedResource(lockResource) - resourceManager.resetResource(resourceLabel.head, userPersistenceResource) } + .map(_.getNodeResource) + // 收集所有node所使用的资源(汇总、已使用、上锁) + val (sumResource, uedResource, lockResource) = + collectResource(userEngineNodeFilter, userResourceType) + if ( + (!(uedResource == userPersistenceResource.getUsedResource)) || (!(lockResource == userPersistenceResource.getLockedResource)) + ) { + logger.info( + MessageFormat.format( + "LabelUser:{0} resources will be reset, Record Resources:{1} ,Real Resources:{2}", + labelUser, + userPersistenceResource.getUsedResource + userPersistenceResource.getLockedResource, + sumResource + ) + ) + userPersistenceResource.setLeftResource( + userPersistenceResource.getMaxResource - sumResource + ) + userPersistenceResource.setUsedResource(uedResource) + userPersistenceResource.setLockedResource(lockResource) + resourceManager.resetResource(resourceLabel.head, userPersistenceResource) } } { resourceManager.unLock(lock) From f8e95c6017899558bfc303d4e4dc78e61453f48c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cv=5Fkkhuang=E2=80=9D?= <“420895376@qq.com”> Date: Wed, 14 Aug 2024 19:30:43 +0800 Subject: [PATCH 13/16] code review fix --- .../linkis/server/PerformanceInterceptor.java | 10 ++-- .../am/service/em/DefaultEMInfoService.scala | 34 +++++++----- .../package/admin/clear_resource_set.sh | 55 ------------------- .../jobhistory/dao/JobHistoryMapper.java | 2 +- .../restful/api/QueryRestfulApi.java | 18 +++--- .../mapper/common/JobHistoryMapper.xml | 6 +- .../service/JobHistoryQueryService.java | 2 +- .../impl/JobHistoryQueryServiceImpl.scala | 11 +--- .../linkis/jobhistory/util/QueryUtils.scala | 1 - 9 files changed, 43 insertions(+), 96 deletions(-) delete mode 100644 linkis-dist/package/admin/clear_resource_set.sh diff --git a/linkis-commons/linkis-module/src/main/java/org/apache/linkis/server/PerformanceInterceptor.java b/linkis-commons/linkis-module/src/main/java/org/apache/linkis/server/PerformanceInterceptor.java index de5d367c53..2a9cb2dd02 100644 --- a/linkis-commons/linkis-module/src/main/java/org/apache/linkis/server/PerformanceInterceptor.java +++ b/linkis-commons/linkis-module/src/main/java/org/apache/linkis/server/PerformanceInterceptor.java @@ -47,11 +47,11 @@ public void afterCompletion( long endTime = System.currentTimeMillis(); long executeTime = endTime - startTime; logger.info( - "Request client address:{} request URL: {} Method: {} taken: {} ms", - LinkisSpringUtils.getClientIP(request), - request.getRequestURI(), - request.getMethod(), - executeTime); + "Request client address:{} request URL: {} Method: {} taken: {} ms", + LinkisSpringUtils.getClientIP(request), + request.getRequestURI(), + request.getMethod(), + executeTime); } } } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMInfoService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMInfoService.scala index c9dbb39ace..ea641b92eb 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMInfoService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMInfoService.scala @@ -150,9 +150,13 @@ class DefaultEMInfoService extends EMInfoService with Logging { // ECM开关 if (AMConfiguration.AM_ECM_RESET_RESOURCE) { val filteredECMs = if (StringUtils.isNotBlank(serviceInstance)) { - getAllEM().filter(_.getServiceInstance.getInstance.equals(serviceInstance)) + if (serviceInstance.equals("*")) { + getAllEM() + } else { + getAllEM().filter(_.getServiceInstance.getInstance.equals(serviceInstance)) + } } else { - getAllEM() + null } // 遍历处理ECM filteredECMs.foreach { ecmInstance => @@ -213,19 +217,23 @@ class DefaultEMInfoService extends EMInfoService with Logging { // 用户资源重置 if (AMConfiguration.AM_USER_RESET_RESOURCE) { // 获取用户的标签 - val user = if (StringUtils.isNotBlank(username)) { - username + val userLabels = if (StringUtils.isNotBlank(username)) { + val user = if (username.equals("*")) { + "" + } else { + username + } + val labelValuePattern = + MessageFormat.format("%{0}%,%{1}%,%{2}%,%", "", user, "") + labelManagerPersistence.getLabelByPattern( + labelValuePattern, + "combined_userCreator_engineType", + null, + null + ) } else { - "" - } - val labelValuePattern = - MessageFormat.format("%{0}%,%{1}%,%{2}%,%", "", user, "") - val userLabels = labelManagerPersistence.getLabelByPattern( - labelValuePattern, - "combined_userCreator_engineType", - null, null - ) + } // 获取与这些标签关联的资源 val userLabelResources = resourceManagerPersistence.getResourceByLabels(userLabels).asScala // 遍历用户标签资源 diff --git a/linkis-dist/package/admin/clear_resource_set.sh b/linkis-dist/package/admin/clear_resource_set.sh deleted file mode 100644 index b8976e3327..0000000000 --- a/linkis-dist/package/admin/clear_resource_set.sh +++ /dev/null @@ -1,55 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# -# description: -# Clean up the resource script, call linkismanager to reset the resource interface, -# parameters: serviceInstance, username, -# demo: sh clear_resource_set.sh bdpdws110002:9102 hadoop -# demo: sh clear_resource_set.sh "" hduser0102 -# - -cd "$(dirname "$0")" -cd .. -INSTALL_HOME="$(pwd)" - -# Set LINKIS_HOME -if [ -z "$LINKIS_HOME" ]; then - export LINKIS_HOME="$INSTALL_HOME" -fi - -# Set LINKIS_CONF_DIR -if [ -z "$LINKIS_CONF_DIR" ]; then - export LINKIS_CONF_DIR="$LINKIS_HOME/conf" -fi - -# Read configuration -linkisMainConf="$LINKIS_CONF_DIR/linkis.properties" -gatewayUrl=$(grep '^wds.linkis.gateway.url' "$linkisMainConf" | cut -d "=" -f2) - -echo "gatewayUrl: $gatewayUrl" - -# Parse command-line arguments -serviceInstance=$1 -username=$2 - -# Construct request URL -requestUrl="$gatewayUrl/api/rest_j/v1/linkisManager/reset-resource?serviceInstance=$serviceInstance&username=$username" -echo "requestUrl: $requestUrl" -# Execute request -response=$(curl --silent --location --request GET "$requestUrl" -H "Token-Code:LINKIS-AUTH-eTaYLbQpmIulPyrXcMl" -H "Token-User:hadoop") - -# Print response -echo "Response: $response" diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java index a0d27ed480..ebd7cdf31b 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java @@ -128,5 +128,5 @@ void updateJobHistoryCancelById( List selectJobHistoryByTaskidList( @Param("idList") List idList, @Param("umUser") String username); - List selectJobHistoryNoMetrics(JobHistory jobReq); + List selectJobHistoryNoCode(JobHistory jobReq); } diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java index 6c069efaa7..d1e202a338 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java @@ -104,18 +104,18 @@ public Message getTaskByID(HttpServletRequest req, @PathVariable("id") Long jobI || Configuration.isDepartmentAdmin(username)) { username = null; } - JobHistory jobHistory = jobHistoryQueryService.getJobHistoryByIdAndName(jobId, username); - - try { - if (null != jobHistory) { - if (JobhistoryConfiguration.JOB_HISTORY_QUERY_EXECUTION_CODE_SWITCH()) { + JobHistory jobHistory = null; + if (JobhistoryConfiguration.JOB_HISTORY_QUERY_EXECUTION_CODE_SWITCH()) { + jobHistory = jobHistoryQueryService.getJobHistoryByIdAndNameNoCode(jobId, username); + } else { + jobHistory = jobHistoryQueryService.getJobHistoryByIdAndName(jobId, username); + try { + if (null != jobHistory) { QueryUtils.exchangeExecutionCode(jobHistory); - } else { - jobHistory.setExecutionCode(null); } + } catch (Exception e) { + log.error("Exchange executionCode for job with id : {} failed, {}", jobHistory.getId(), e); } - } catch (Exception e) { - log.error("Exchange executionCode for job with id : {} failed, {}", jobHistory.getId(), e); } QueryTaskVO taskVO = TaskConversions.jobHistory2TaskVO(jobHistory, null); // todo check diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/common/JobHistoryMapper.xml b/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/common/JobHistoryMapper.xml index 3a4913dfda..524f6905f6 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/common/JobHistoryMapper.xml +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/common/JobHistoryMapper.xml @@ -46,8 +46,8 @@ `id`, `job_req_id`, `submit_user`, `execute_user`, `source`, `labels`, `params`, - `status`, `log_path`, `error_code`, `error_desc`, `created_time`, `updated_time`, - `instances`,`engine_type`, `result_location`, `observe_info` + `progress`, `status`, `log_path`, `error_code`, `error_desc`, `created_time`, `updated_time`, + `instances`, `metrics`,`engine_type`, `result_location` @@ -80,7 +80,7 @@ - SELECT FROM linkis_ps_job_history_group_history diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/JobHistoryQueryService.java b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/JobHistoryQueryService.java index fde82b0ed2..158457d79a 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/JobHistoryQueryService.java +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/JobHistoryQueryService.java @@ -40,7 +40,7 @@ public interface JobHistoryQueryService { JobHistory getJobHistoryByIdAndName(Long jobID, String userName); - JobHistory getJobHistoryByIdAndNameNoMetrics(Long jobID, String userName); + JobHistory getJobHistoryByIdAndNameNoCode(Long jobID, String userName); List search(Long jobId, String username, String creator, String status, Date sDate, Date eDate, String engineType, Long startJobId, String instance, String departmentId, String engineInstance); diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala index a9fe5a5f1e..470c40b83f 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala @@ -256,20 +256,15 @@ class JobHistoryQueryServiceImpl extends JobHistoryQueryService with Logging { if (jobHistoryList.isEmpty) null else jobHistoryList.get(0) } - override def getJobHistoryByIdAndNameNoMetrics( + override def getJobHistoryByIdAndNameNoCode( jobId: java.lang.Long, userName: String ): JobHistory = { val jobReq = new JobHistory jobReq.setId(jobId) jobReq.setSubmitUser(userName) - if (JobhistoryConfiguration.JOB_HISTORY_QUERY_EXECUTION_CODE_SWITCH) { - val jobHistoryList = jobHistoryMapper.selectJobHistory(jobReq) - if (jobHistoryList.isEmpty) null else jobHistoryList.get(0) - } else { - val jobHistoryList = jobHistoryMapper.selectJobHistoryNoMetrics(jobReq) - if (jobHistoryList.isEmpty) null else jobHistoryList.get(0) - } + val jobHistoryList = jobHistoryMapper.selectJobHistoryNoCode(jobReq) + if (jobHistoryList.isEmpty) null else jobHistoryList.get(0) } override def search( diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/util/QueryUtils.scala b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/util/QueryUtils.scala index c109f24ab2..6e866df77a 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/util/QueryUtils.scala +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/util/QueryUtils.scala @@ -140,7 +140,6 @@ object QueryUtils extends Logging { } } } { - IOUtils.closeQuietly(is) IOUtils.closeQuietly(bufferedReader) if (fileSystem != null) Utils.tryAndWarn(fileSystem.close()) } From 619ac14aef7093ebdb13239a811e8b9c96cb472f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cv=5Fkkhuang=E2=80=9D?= <“420895376@qq.com”> Date: Wed, 14 Aug 2024 19:56:37 +0800 Subject: [PATCH 14/16] code review fix --- .../am/service/em/DefaultEMInfoService.scala | 40 ++++++++----------- 1 file changed, 16 insertions(+), 24 deletions(-) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMInfoService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMInfoService.scala index ea641b92eb..4f0f5f6aa5 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMInfoService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMInfoService.scala @@ -148,15 +148,11 @@ class DefaultEMInfoService extends EMInfoService with Logging { override def resetResource(serviceInstance: String, username: String): Unit = { // ECM开关 - if (AMConfiguration.AM_ECM_RESET_RESOURCE) { - val filteredECMs = if (StringUtils.isNotBlank(serviceInstance)) { - if (serviceInstance.equals("*")) { - getAllEM() - } else { - getAllEM().filter(_.getServiceInstance.getInstance.equals(serviceInstance)) - } + if (AMConfiguration.AM_ECM_RESET_RESOURCE && StringUtils.isNotBlank(serviceInstance)) { + val filteredECMs = if (serviceInstance.equals("*")) { + getAllEM() } else { - null + getAllEM().filter(_.getServiceInstance.getInstance.equals(serviceInstance)) } // 遍历处理ECM filteredECMs.foreach { ecmInstance => @@ -215,25 +211,21 @@ class DefaultEMInfoService extends EMInfoService with Logging { } // 用户资源重置 - if (AMConfiguration.AM_USER_RESET_RESOURCE) { + if (AMConfiguration.AM_USER_RESET_RESOURCE && StringUtils.isNotBlank(username)) { // 获取用户的标签 - val userLabels = if (StringUtils.isNotBlank(username)) { - val user = if (username.equals("*")) { - "" - } else { - username - } - val labelValuePattern = - MessageFormat.format("%{0}%,%{1}%,%{2}%,%", "", user, "") - labelManagerPersistence.getLabelByPattern( - labelValuePattern, - "combined_userCreator_engineType", - null, - null - ) + val user = if (username.equals("*")) { + "" } else { - null + username } + val labelValuePattern = + MessageFormat.format("%{0}%,%{1}%,%{2}%,%", "", user, "") + val userLabels = labelManagerPersistence.getLabelByPattern( + labelValuePattern, + "combined_userCreator_engineType", + null, + null + ) // 获取与这些标签关联的资源 val userLabelResources = resourceManagerPersistence.getResourceByLabels(userLabels).asScala // 遍历用户标签资源 From 132898560fefff507af96606557693f41149d2c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cv=5Fkkhuang=E2=80=9D?= <“420895376@qq.com”> Date: Thu, 15 Aug 2024 14:42:05 +0800 Subject: [PATCH 15/16] Fix Security Work Order: Upgrade Spring Version:5.3.27 -> 5.3.34 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index d47464ebc0..6bb0bdfd17 100644 --- a/pom.xml +++ b/pom.xml @@ -169,7 +169,7 @@ 1.4.200 - 5.3.27 + 5.3.34 5.7.8 2.7.11 3.1.7 From 0c38a9211eed555a24415a406342058d020a1bfc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cv=5Fkkhuang=E2=80=9D?= <“420895376@qq.com”> Date: Thu, 15 Aug 2024 14:45:49 +0800 Subject: [PATCH 16/16] Optimize script log address --- linkis-dist/package/sbin/kill-process-by-pid.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linkis-dist/package/sbin/kill-process-by-pid.sh b/linkis-dist/package/sbin/kill-process-by-pid.sh index 275fe88702..a90e7febbf 100644 --- a/linkis-dist/package/sbin/kill-process-by-pid.sh +++ b/linkis-dist/package/sbin/kill-process-by-pid.sh @@ -22,7 +22,7 @@ if [ "$LINKIS_HOME" = "" ]; then export LINKIS_HOME=$INSTALL_HOME fi if [ "$LINKIS_LOG_DIR" = "" ]; then - export LINKIS_LOG_DIR="/data/bdp/logs/linkis" + export LINKIS_LOG_DIR="/data/logs/bdpe-ujes" fi ecmPid=`cat $LINKIS_HOME/pid/linkis_cg-engineconnmanager.pid` month=`date '+%Y-%m'`