diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java index 332eb765eb..cb54578075 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java @@ -56,11 +56,9 @@ public class YarnResourceRequester implements ExternalResourceRequester { private final String HASTATE_ACTIVE = "active"; private static final ObjectMapper objectMapper = new ObjectMapper(); - - private ExternalResourceProvider provider = null; private final Map rmAddressMap = new ConcurrentHashMap<>(); - private String getAuthorizationStr() { + private String getAuthorizationStr(ExternalResourceProvider provider) { String user = (String) provider.getConfigMap().getOrDefault("user", ""); String pwd = (String) provider.getConfigMap().getOrDefault("pwd", ""); String authKey = user + ":" + pwd; @@ -70,18 +68,16 @@ private String getAuthorizationStr() { @Override public NodeResource requestResourceInfo( ExternalResourceIdentifier identifier, ExternalResourceProvider provider) { - String rmWebHaAddress = (String) provider.getConfigMap().get("rmWebAddress"); - this.provider = provider; - String rmWebAddress = getAndUpdateActiveRmWebAddress(rmWebHaAddress); + String rmWebAddress = getAndUpdateActiveRmWebAddress(provider); logger.info("rmWebAddress: " + rmWebAddress); - String queueName = ((YarnResourceIdentifier) identifier).getQueueName(); + String queueName = ((YarnResourceIdentifier) identifier).getQueueName(); String realQueueName = "root." + queueName; return LinkisUtils.tryCatch( () -> { Pair yarnResource = - getResources(rmWebAddress, realQueueName, queueName); + getResources(rmWebAddress, realQueueName, queueName, provider); CommonNodeResource nodeResource = new CommonNodeResource(); nodeResource.setMaxResource(yarnResource.getKey()); @@ -95,9 +91,12 @@ public NodeResource requestResourceInfo( } public Optional maxEffectiveHandle( - Optional queueValue, String rmWebAddress, String queueName) { + Optional queueValue, + String rmWebAddress, + String queueName, + ExternalResourceProvider provider) { try { - JsonNode metrics = getResponseByUrl("metrics", rmWebAddress); + JsonNode metrics = getResponseByUrl("metrics", rmWebAddress, provider); JsonNode clusterMetrics = metrics.path("clusterMetrics"); long totalMemory = clusterMetrics.path("totalMB").asLong(); long totalCores = clusterMetrics.path("totalVirtualCores").asLong(); @@ -201,8 +200,11 @@ static JsonNode getChildQueuesOfCapacity(JsonNode resp) { } public Pair getResources( - String rmWebAddress, String realQueueName, String queueName) { - JsonNode resp = getResponseByUrl("scheduler", rmWebAddress); + String rmWebAddress, + String realQueueName, + String queueName, + ExternalResourceProvider provider) { + JsonNode resp = getResponseByUrl("scheduler", rmWebAddress, provider); JsonNode schedulerInfo = resp.path("scheduler").path("schedulerInfo"); String schedulerType = schedulerInfo.path("type").asText(); if ("capacityScheduler".equals(schedulerType)) { @@ -217,7 +219,7 @@ public Pair getResources( MessageFormat.format(YARN_NOT_EXISTS_QUEUE.getErrorDesc(), queueName)); } return Pair.of( - maxEffectiveHandle(queue, rmWebAddress, queueName).get(), + maxEffectiveHandle(queue, rmWebAddress, queueName, provider).get(), getYarnResource(queue.map(node -> node.path("resourcesUsed")), queueName).get()); } else if ("fairScheduler".equals(schedulerType)) { @@ -277,13 +279,13 @@ public static Optional getAllocatedYarnResource( @Override public List requestAppInfo( ExternalResourceIdentifier identifier, ExternalResourceProvider provider) { - String rmWebHaAddress = (String) provider.getConfigMap().get("rmWebAddress"); - String rmWebAddress = getAndUpdateActiveRmWebAddress(rmWebHaAddress); - String queueName = ((YarnResourceIdentifier) identifier).getQueueName(); + String rmWebAddress = getAndUpdateActiveRmWebAddress(provider); + + String queueName = ((YarnResourceIdentifier) identifier).getQueueName(); String realQueueName = "root." + queueName; - JsonNode resp = getResponseByUrl("apps", rmWebAddress).path("apps").path("app"); + JsonNode resp = getResponseByUrl("apps", rmWebAddress, provider).path("apps").path("app"); if (resp.isMissingNode()) { return new ArrayList<>(); } @@ -317,22 +319,24 @@ public ResourceType getResourceType() { return ResourceType.Yarn; } - private JsonNode getResponseByUrl(String url, String rmWebAddress) { + private JsonNode getResponseByUrl( + String url, String rmWebAddress, ExternalResourceProvider provider) { + HttpGet httpGet = new HttpGet(rmWebAddress + "/ws/v1/cluster/" + url); httpGet.addHeader("Accept", "application/json"); - Object authorEnable = this.provider.getConfigMap().get("authorEnable"); + Object authorEnable = provider.getConfigMap().get("authorEnable"); HttpResponse httpResponse = null; if (authorEnable instanceof Boolean) { if ((Boolean) authorEnable) { - httpGet.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + getAuthorizationStr()); + httpGet.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + getAuthorizationStr(provider)); } } - Object kerberosEnable = this.provider.getConfigMap().get("kerberosEnable"); + Object kerberosEnable = provider.getConfigMap().get("kerberosEnable"); if (kerberosEnable instanceof Boolean) { if ((Boolean) kerberosEnable) { - String principalName = (String) this.provider.getConfigMap().get("principalName"); - String keytabPath = (String) this.provider.getConfigMap().get("keytabPath"); - String krb5Path = (String) this.provider.getConfigMap().get("krb5Path"); + String principalName = (String) provider.getConfigMap().get("principalName"); + String keytabPath = (String) provider.getConfigMap().get("keytabPath"); + String krb5Path = (String) provider.getConfigMap().get("krb5Path"); if (StringUtils.isNotBlank(krb5Path)) { logger.warn( "krb5Path: {} has been specified, but not allow to be set to avoid conflict", @@ -388,8 +392,9 @@ private JsonNode getResponseByUrl(String url, String rmWebAddress) { return jsonNode; } - public String getAndUpdateActiveRmWebAddress(String haAddress) { + public String getAndUpdateActiveRmWebAddress(ExternalResourceProvider provider) { // todo check if it will stuck for many requests + String haAddress = (String) provider.getConfigMap().get("rmWebAddress"); String activeAddress = rmAddressMap.get(haAddress); if (StringUtils.isBlank(activeAddress)) { synchronized (haAddress.intern()) { @@ -406,7 +411,7 @@ public String getAndUpdateActiveRmWebAddress(String haAddress) { haAddress.split(RMConfiguration.DEFAULT_YARN_RM_WEB_ADDRESS_DELIMITER.getValue()); for (String address : addresses) { try { - JsonNode response = getResponseByUrl("info", address); + JsonNode response = getResponseByUrl("info", address, provider); JsonNode haStateValue = response.path("clusterInfo").path("haState"); if (!haStateValue.isMissingNode() && haStateValue.isTextual()) { String haState = haStateValue.asText(); @@ -442,12 +447,10 @@ public String getAndUpdateActiveRmWebAddress(String haAddress) { @Override public Boolean reloadExternalResourceAddress(ExternalResourceProvider provider) { - if (null == provider) { - rmAddressMap.clear(); - } else { + if (null != provider) { String rmWebHaAddress = (String) provider.getConfigMap().get("rmWebAddress"); rmAddressMap.remove(rmWebHaAddress); - getAndUpdateActiveRmWebAddress(rmWebHaAddress); + getAndUpdateActiveRmWebAddress(provider); } return true; }