Skip to content

Commit

Permalink
feat: YarnResourceRequester supports multiple providers (#4852)
Browse files Browse the repository at this point in the history
* feat: YarnResourceRequester supports multiple providers

* fix: format code according to scala style
  • Loading branch information
lenoxzhao authored Aug 5, 2023
1 parent 5e50196 commit bbcb518
Showing 1 changed file with 33 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,9 @@ public class YarnResourceRequester implements ExternalResourceRequester {

private final String HASTATE_ACTIVE = "active";
private static final ObjectMapper objectMapper = new ObjectMapper();

private ExternalResourceProvider provider = null;
private final Map<String, String> rmAddressMap = new ConcurrentHashMap<>();

private String getAuthorizationStr() {
private String getAuthorizationStr(ExternalResourceProvider provider) {
String user = (String) provider.getConfigMap().getOrDefault("user", "");
String pwd = (String) provider.getConfigMap().getOrDefault("pwd", "");
String authKey = user + ":" + pwd;
Expand All @@ -70,18 +68,16 @@ private String getAuthorizationStr() {
@Override
public NodeResource requestResourceInfo(
ExternalResourceIdentifier identifier, ExternalResourceProvider provider) {
String rmWebHaAddress = (String) provider.getConfigMap().get("rmWebAddress");
this.provider = provider;
String rmWebAddress = getAndUpdateActiveRmWebAddress(rmWebHaAddress);
String rmWebAddress = getAndUpdateActiveRmWebAddress(provider);
logger.info("rmWebAddress: " + rmWebAddress);
String queueName = ((YarnResourceIdentifier) identifier).getQueueName();

String queueName = ((YarnResourceIdentifier) identifier).getQueueName();
String realQueueName = "root." + queueName;

return LinkisUtils.tryCatch(
() -> {
Pair<YarnResource, YarnResource> yarnResource =
getResources(rmWebAddress, realQueueName, queueName);
getResources(rmWebAddress, realQueueName, queueName, provider);

CommonNodeResource nodeResource = new CommonNodeResource();
nodeResource.setMaxResource(yarnResource.getKey());
Expand All @@ -95,9 +91,12 @@ public NodeResource requestResourceInfo(
}

public Optional<YarnResource> maxEffectiveHandle(
Optional<JsonNode> queueValue, String rmWebAddress, String queueName) {
Optional<JsonNode> queueValue,
String rmWebAddress,
String queueName,
ExternalResourceProvider provider) {
try {
JsonNode metrics = getResponseByUrl("metrics", rmWebAddress);
JsonNode metrics = getResponseByUrl("metrics", rmWebAddress, provider);
JsonNode clusterMetrics = metrics.path("clusterMetrics");
long totalMemory = clusterMetrics.path("totalMB").asLong();
long totalCores = clusterMetrics.path("totalVirtualCores").asLong();
Expand Down Expand Up @@ -201,8 +200,11 @@ static JsonNode getChildQueuesOfCapacity(JsonNode resp) {
}

public Pair<YarnResource, YarnResource> getResources(
String rmWebAddress, String realQueueName, String queueName) {
JsonNode resp = getResponseByUrl("scheduler", rmWebAddress);
String rmWebAddress,
String realQueueName,
String queueName,
ExternalResourceProvider provider) {
JsonNode resp = getResponseByUrl("scheduler", rmWebAddress, provider);
JsonNode schedulerInfo = resp.path("scheduler").path("schedulerInfo");
String schedulerType = schedulerInfo.path("type").asText();
if ("capacityScheduler".equals(schedulerType)) {
Expand All @@ -217,7 +219,7 @@ public Pair<YarnResource, YarnResource> getResources(
MessageFormat.format(YARN_NOT_EXISTS_QUEUE.getErrorDesc(), queueName));
}
return Pair.of(
maxEffectiveHandle(queue, rmWebAddress, queueName).get(),
maxEffectiveHandle(queue, rmWebAddress, queueName, provider).get(),
getYarnResource(queue.map(node -> node.path("resourcesUsed")), queueName).get());

} else if ("fairScheduler".equals(schedulerType)) {
Expand Down Expand Up @@ -277,13 +279,13 @@ public static Optional<YarnResource> getAllocatedYarnResource(
@Override
public List<ExternalAppInfo> requestAppInfo(
ExternalResourceIdentifier identifier, ExternalResourceProvider provider) {
String rmWebHaAddress = (String) provider.getConfigMap().get("rmWebAddress");
String rmWebAddress = getAndUpdateActiveRmWebAddress(rmWebHaAddress);
String queueName = ((YarnResourceIdentifier) identifier).getQueueName();

String rmWebAddress = getAndUpdateActiveRmWebAddress(provider);

String queueName = ((YarnResourceIdentifier) identifier).getQueueName();
String realQueueName = "root." + queueName;

JsonNode resp = getResponseByUrl("apps", rmWebAddress).path("apps").path("app");
JsonNode resp = getResponseByUrl("apps", rmWebAddress, provider).path("apps").path("app");
if (resp.isMissingNode()) {
return new ArrayList<>();
}
Expand Down Expand Up @@ -317,22 +319,24 @@ public ResourceType getResourceType() {
return ResourceType.Yarn;
}

private JsonNode getResponseByUrl(String url, String rmWebAddress) {
private JsonNode getResponseByUrl(
String url, String rmWebAddress, ExternalResourceProvider provider) {

HttpGet httpGet = new HttpGet(rmWebAddress + "/ws/v1/cluster/" + url);
httpGet.addHeader("Accept", "application/json");
Object authorEnable = this.provider.getConfigMap().get("authorEnable");
Object authorEnable = provider.getConfigMap().get("authorEnable");
HttpResponse httpResponse = null;
if (authorEnable instanceof Boolean) {
if ((Boolean) authorEnable) {
httpGet.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + getAuthorizationStr());
httpGet.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + getAuthorizationStr(provider));
}
}
Object kerberosEnable = this.provider.getConfigMap().get("kerberosEnable");
Object kerberosEnable = provider.getConfigMap().get("kerberosEnable");
if (kerberosEnable instanceof Boolean) {
if ((Boolean) kerberosEnable) {
String principalName = (String) this.provider.getConfigMap().get("principalName");
String keytabPath = (String) this.provider.getConfigMap().get("keytabPath");
String krb5Path = (String) this.provider.getConfigMap().get("krb5Path");
String principalName = (String) provider.getConfigMap().get("principalName");
String keytabPath = (String) provider.getConfigMap().get("keytabPath");
String krb5Path = (String) provider.getConfigMap().get("krb5Path");
if (StringUtils.isNotBlank(krb5Path)) {
logger.warn(
"krb5Path: {} has been specified, but not allow to be set to avoid conflict",
Expand Down Expand Up @@ -388,8 +392,9 @@ private JsonNode getResponseByUrl(String url, String rmWebAddress) {
return jsonNode;
}

public String getAndUpdateActiveRmWebAddress(String haAddress) {
public String getAndUpdateActiveRmWebAddress(ExternalResourceProvider provider) {
// todo check if it will stuck for many requests
String haAddress = (String) provider.getConfigMap().get("rmWebAddress");
String activeAddress = rmAddressMap.get(haAddress);
if (StringUtils.isBlank(activeAddress)) {
synchronized (haAddress.intern()) {
Expand All @@ -406,7 +411,7 @@ public String getAndUpdateActiveRmWebAddress(String haAddress) {
haAddress.split(RMConfiguration.DEFAULT_YARN_RM_WEB_ADDRESS_DELIMITER.getValue());
for (String address : addresses) {
try {
JsonNode response = getResponseByUrl("info", address);
JsonNode response = getResponseByUrl("info", address, provider);
JsonNode haStateValue = response.path("clusterInfo").path("haState");
if (!haStateValue.isMissingNode() && haStateValue.isTextual()) {
String haState = haStateValue.asText();
Expand Down Expand Up @@ -442,12 +447,10 @@ public String getAndUpdateActiveRmWebAddress(String haAddress) {

@Override
public Boolean reloadExternalResourceAddress(ExternalResourceProvider provider) {
if (null == provider) {
rmAddressMap.clear();
} else {
if (null != provider) {
String rmWebHaAddress = (String) provider.getConfigMap().get("rmWebAddress");
rmAddressMap.remove(rmWebHaAddress);
getAndUpdateActiveRmWebAddress(rmWebHaAddress);
getAndUpdateActiveRmWebAddress(provider);
}
return true;
}
Expand Down

0 comments on commit bbcb518

Please sign in to comment.