-
Notifications
You must be signed in to change notification settings - Fork 21
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added least connection algorithm for selecting service nodes
- Loading branch information
Anand Pandey
committed
Nov 7, 2017
1 parent
1157df8
commit ff394c5
Showing
11 changed files
with
424 additions
and
21 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,7 +7,7 @@ | |
<groupId>com.flipkart.ranger</groupId> | ||
<artifactId>ranger</artifactId> | ||
<packaging>jar</packaging> | ||
<version>0.3.0</version> | ||
<version>0.3.1.15-connections</version> | ||
|
||
<distributionManagement> | ||
<repository> | ||
|
@@ -44,11 +44,16 @@ | |
<name>Tushar Naik</name> | ||
<email>[email protected]</email> | ||
</developer> | ||
<developer> | ||
<id>asp0585</id> | ||
<name>Anand Pandey</name> | ||
<email>[email protected]</email> | ||
</developer> | ||
</developers> | ||
|
||
<properties> | ||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> | ||
<curator.version>3.1.0</curator.version> | ||
<curator.version>2.12.0</curator.version> | ||
</properties> | ||
|
||
<dependencies> | ||
|
@@ -100,11 +105,25 @@ | |
<artifactId>guava-retrying</artifactId> | ||
<version>2.0.0</version> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.apache.httpcomponents</groupId> | ||
<artifactId>httpclient</artifactId> | ||
<version>4.5.1</version> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.powermock</groupId> | ||
<artifactId>powermock-module-junit4</artifactId> | ||
<version>1.7.3</version> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.powermock</groupId> | ||
<artifactId>powermock-api-mockito</artifactId> | ||
<version>1.7.3</version> | ||
<scope>test</scope> | ||
</dependency> | ||
</dependencies> | ||
|
||
<build> | ||
|
59 changes: 59 additions & 0 deletions
59
src/main/java/com/flipkart/ranger/finder/ActiveConnectionMetrics.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
/** | ||
* Copyright 2015 Flipkart Internet Pvt. Ltd. | ||
* <p> | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* <p> | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* <p> | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.flipkart.ranger.finder; | ||
|
||
import com.google.common.base.Preconditions; | ||
import com.google.common.collect.ImmutableMap; | ||
import com.google.common.collect.Maps; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.Map; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
|
||
public class ActiveConnectionMetrics { | ||
|
||
private static final Logger log = LoggerFactory.getLogger(ActiveConnectionMetrics.class); | ||
|
||
// this map stores the number of active connections to various replicas at any point of time | ||
private static final Map<ConnectionRequest, AtomicLong> activeConnections = Maps.newConcurrentMap(); | ||
|
||
public static void incrementConnectionCount(ConnectionRequest connectionRequest, String requestId) { | ||
Preconditions.checkNotNull(connectionRequest, "connectionRequestCannot be null"); | ||
if (!activeConnections.containsKey(connectionRequest)) { | ||
activeConnections.put(connectionRequest, new AtomicLong(1)); | ||
return; | ||
} | ||
log.debug("RequestId = {}, INCREMENTED connection count for {} = {}", requestId, connectionRequest, | ||
activeConnections.get(connectionRequest).incrementAndGet()); | ||
} | ||
|
||
public static void decrementConnectionCount(ConnectionRequest connectionRequest, String requestId) { | ||
Preconditions.checkNotNull(connectionRequest, "connectionRequestCannot be null"); | ||
if (!activeConnections.containsKey(connectionRequest)) { | ||
String errorMsg = "connectionRequest " + connectionRequest + " should be already present in activeConnections map"; | ||
log.error(errorMsg); | ||
return; | ||
} | ||
log.debug("RequestId = {}, DECREMENTED connection count for {} = {}", requestId, connectionRequest, | ||
activeConnections.get(connectionRequest).decrementAndGet()); | ||
} | ||
|
||
public static Map<ConnectionRequest, AtomicLong> getActiveConnections() { | ||
return ImmutableMap.copyOf(activeConnections); | ||
} | ||
} |
68 changes: 68 additions & 0 deletions
68
src/main/java/com/flipkart/ranger/finder/ConnectionRequest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
/** | ||
* Copyright 2015 Flipkart Internet Pvt. Ltd. | ||
* <p> | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* <p> | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* <p> | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.flipkart.ranger.finder; | ||
|
||
|
||
public class ConnectionRequest { | ||
|
||
private final String shardName; | ||
private final String replicaHostName; | ||
private final int port; | ||
|
||
public ConnectionRequest(String shardName, String replicaHostName, int port) { | ||
this.shardName = shardName; | ||
this.replicaHostName = replicaHostName; | ||
this.port = port; | ||
} | ||
|
||
public String getShardName() { | ||
return shardName; | ||
} | ||
|
||
public String getReplicaHostName() { | ||
return replicaHostName; | ||
} | ||
|
||
public int getPort() { | ||
return port; | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) return true; | ||
if (o == null || getClass() != o.getClass()) return false; | ||
|
||
ConnectionRequest that = (ConnectionRequest) o; | ||
|
||
if (getPort() != that.getPort()) return false; | ||
if (!getShardName().equals(that.getShardName())) return false; | ||
return getReplicaHostName().equals(that.getReplicaHostName()); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
int result = getShardName().hashCode(); | ||
result = 31 * result + getReplicaHostName().hashCode(); | ||
result = 31 * result + getPort(); | ||
return result; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return shardName + ":" + replicaHostName + ":" + port; | ||
} | ||
} |
120 changes: 120 additions & 0 deletions
120
src/main/java/com/flipkart/ranger/finder/LeastConnectionServiceNodeSelector.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
/** | ||
* Copyright 2015 Flipkart Internet Pvt. Ltd. | ||
* <p> | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* <p> | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* <p> | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package com.flipkart.ranger.finder; | ||
|
||
import com.flipkart.ranger.model.ServiceNode; | ||
import com.flipkart.ranger.model.ServiceNodeSelector; | ||
import com.google.common.collect.Lists; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.UUID; | ||
import java.util.concurrent.ThreadLocalRandom; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
|
||
/** | ||
* This class returns the service node having least number of active connections currently assigned to it. | ||
* It also takes care that nodes do not suffer from starvation. | ||
*/ | ||
public class LeastConnectionServiceNodeSelector<T> implements ServiceNodeSelector<T> { | ||
|
||
private static final Logger log = LoggerFactory.getLogger(LeastConnectionServiceNodeSelector.class.getSimpleName()); | ||
|
||
/** | ||
* ActiveNode is a ServiceNode to which at least one request has been sent in the past along with its current connection count | ||
*/ | ||
private static class ActiveNode<T> implements Comparable { | ||
|
||
private ServiceNode<T> serviceNode; | ||
private AtomicLong connectionCount; | ||
|
||
ActiveNode(ServiceNode<T> serviceNode, AtomicLong connectionCount) { | ||
this.serviceNode = serviceNode; | ||
this.connectionCount = connectionCount; | ||
} | ||
|
||
AtomicLong getConnectionCount() { | ||
return connectionCount; | ||
} | ||
|
||
ServiceNode<T> getServiceNode() { | ||
return serviceNode; | ||
} | ||
|
||
@Override | ||
public int compareTo(Object o) { | ||
if (connectionCount.get() < ((ActiveNode) o).getConnectionCount().get()) { | ||
return -1; | ||
} | ||
return 1; | ||
} | ||
|
||
public String toString() { | ||
return serviceNode.getHost() + ":" + serviceNode.getPort(); | ||
} | ||
} | ||
|
||
@Override | ||
public ServiceNode<T> select(List<ServiceNode<T>> serviceNodes) { | ||
Map<ConnectionRequest, AtomicLong> activeConnections = ActiveConnectionMetrics.getActiveConnections(); | ||
String correlationId = UUID.randomUUID().toString(); | ||
log.debug("CorrelationId = {}, Active Connections {} = {}", correlationId, activeConnections); | ||
final List<ActiveNode<T>> activeNodes = Lists.newArrayList(); | ||
for (ConnectionRequest connectionRequest : activeConnections.keySet()) { | ||
for (ServiceNode<T> serviceNode : serviceNodes) { | ||
if (serviceNode.getHost().equals(connectionRequest.getReplicaHostName()) && | ||
serviceNode.getPort() == connectionRequest.getPort()) { | ||
ActiveNode<T> activeNode = new ActiveNode<>(serviceNode, activeConnections.get(connectionRequest)); | ||
activeNodes.add(activeNode); | ||
} | ||
} | ||
} | ||
|
||
// List of those service nodes to which request has never been sent. | ||
List<ServiceNode<T>> newServiceNodes = Lists.newLinkedList(); | ||
for (ServiceNode<T> serviceNode : serviceNodes) { | ||
boolean found = false; | ||
for (ActiveNode<T> activeNode : activeNodes) { | ||
if (activeNode.getServiceNode().equals(serviceNode)) { | ||
found = true; | ||
break; | ||
} | ||
} | ||
if (!found) { | ||
newServiceNodes.add(serviceNode); | ||
} | ||
} | ||
log.debug("CorrelationId = {}, ActiveNodes = {}, ServiceNodes = {}", correlationId, activeNodes, serviceNodes); | ||
log.debug("CorrelationId={}, ActiveNodesCount = {}, ServiceNodesCount = {}, NewNodesCount = {}", | ||
correlationId, activeNodes.size(), serviceNodes.size(), newServiceNodes.size()); | ||
/* | ||
If there are a few service nodes to which connection request has never been sent in the past, we should first try | ||
to pick amongst those nodes before kicking off Least connection algorithm. This will ensure starvation of those nodes never happens. | ||
*/ | ||
if (!newServiceNodes.isEmpty()) { | ||
ServiceNode<T> serviceNode = newServiceNodes.get(ThreadLocalRandom.current().nextInt(newServiceNodes.size())); | ||
log.info("CorrelationId = {}, Randomly selected serviceNode = {}", correlationId, serviceNode); | ||
return serviceNode; | ||
} | ||
Collections.sort(activeNodes); | ||
ServiceNode<T> serviceNode = activeNodes.get(0).getServiceNode(); | ||
log.info("CorrelationId = {}. Selected Node with Least connection = {}", correlationId, serviceNode); | ||
return serviceNode; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,5 +18,6 @@ | |
|
||
public enum HealthcheckStatus { | ||
healthy, | ||
unhealthy | ||
unhealthy, | ||
down | ||
} |
Oops, something went wrong.