diff --git a/src/main/java/com/flipkart/ranger/finder/ActiveConnectionMetrics.java b/src/main/java/com/flipkart/ranger/finder/ActiveConnectionMetrics.java
new file mode 100644
index 00000000..fb210d90
--- /dev/null
+++ b/src/main/java/com/flipkart/ranger/finder/ActiveConnectionMetrics.java
@@ -0,0 +1,59 @@
+/**
+ * Copyright 2015 Flipkart Internet Pvt. Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.flipkart.ranger.finder;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ActiveConnectionMetrics {
+
+ private static final Logger log = LoggerFactory.getLogger(ActiveConnectionMetrics.class);
+
+ // this map stores the number of active connections to various replicas at any point of time
+ private static final Map activeConnections = Maps.newConcurrentMap();
+
+ public static void incrementConnectionCount(ConnectionRequest connectionRequest, String requestId) {
+ Preconditions.checkNotNull(connectionRequest, "connectionRequestCannot be null");
+ if (!activeConnections.containsKey(connectionRequest)) {
+ activeConnections.put(connectionRequest, new AtomicLong(1));
+ return;
+ }
+ log.debug("RequestId = {}, INCREMENTED connection count for {} = {}", requestId, connectionRequest,
+ activeConnections.get(connectionRequest).incrementAndGet());
+ }
+
+ public static void decrementConnectionCount(ConnectionRequest connectionRequest, String requestId) {
+ Preconditions.checkNotNull(connectionRequest, "connectionRequestCannot be null");
+ if (!activeConnections.containsKey(connectionRequest)) {
+ String errorMsg = "connectionRequest " + connectionRequest + " should be already present in activeConnections map";
+ log.error(errorMsg);
+ return;
+ }
+ log.debug("RequestId = {}, DECREMENTED connection count for {} = {}", requestId, connectionRequest,
+ activeConnections.get(connectionRequest).decrementAndGet());
+ }
+
+ public static Map getActiveConnections() {
+ return ImmutableMap.copyOf(activeConnections);
+ }
+}
diff --git a/src/main/java/com/flipkart/ranger/finder/ConnectionRequest.java b/src/main/java/com/flipkart/ranger/finder/ConnectionRequest.java
new file mode 100644
index 00000000..805cd9eb
--- /dev/null
+++ b/src/main/java/com/flipkart/ranger/finder/ConnectionRequest.java
@@ -0,0 +1,68 @@
+/**
+ * Copyright 2015 Flipkart Internet Pvt. Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.flipkart.ranger.finder;
+
+
+public class ConnectionRequest {
+
+ private final String shardName;
+ private final String replicaHostName;
+ private final int port;
+
+ public ConnectionRequest(String shardName, String replicaHostName, int port) {
+ this.shardName = shardName;
+ this.replicaHostName = replicaHostName;
+ this.port = port;
+ }
+
+ public String getShardName() {
+ return shardName;
+ }
+
+ public String getReplicaHostName() {
+ return replicaHostName;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ ConnectionRequest that = (ConnectionRequest) o;
+
+ if (getPort() != that.getPort()) return false;
+ if (!getShardName().equals(that.getShardName())) return false;
+ return getReplicaHostName().equals(that.getReplicaHostName());
+ }
+
+ @Override
+ public int hashCode() {
+ int result = getShardName().hashCode();
+ result = 31 * result + getReplicaHostName().hashCode();
+ result = 31 * result + getPort();
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return shardName + ":" + replicaHostName + ":" + port;
+ }
+}
diff --git a/src/main/java/com/flipkart/ranger/finder/LeastConnectionServiceNodeSelector.java b/src/main/java/com/flipkart/ranger/finder/LeastConnectionServiceNodeSelector.java
new file mode 100644
index 00000000..5f1b0dbf
--- /dev/null
+++ b/src/main/java/com/flipkart/ranger/finder/LeastConnectionServiceNodeSelector.java
@@ -0,0 +1,120 @@
+/**
+ * Copyright 2015 Flipkart Internet Pvt. Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.flipkart.ranger.finder;
+
+import com.flipkart.ranger.model.ServiceNode;
+import com.flipkart.ranger.model.ServiceNodeSelector;
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This class returns the service node having least number of active connections currently assigned to it.
+ * It also takes care that nodes do not suffer from starvation.
+ */
+public class LeastConnectionServiceNodeSelector implements ServiceNodeSelector {
+
+ private static final Logger log = LoggerFactory.getLogger(LeastConnectionServiceNodeSelector.class.getSimpleName());
+
+ /**
+ * ActiveNode is a ServiceNode to which at least one request has been sent in the past along with its current connection count
+ */
+ private static class ActiveNode implements Comparable {
+
+ private ServiceNode serviceNode;
+ private AtomicLong connectionCount;
+
+ ActiveNode(ServiceNode serviceNode, AtomicLong connectionCount) {
+ this.serviceNode = serviceNode;
+ this.connectionCount = connectionCount;
+ }
+
+ AtomicLong getConnectionCount() {
+ return connectionCount;
+ }
+
+ ServiceNode getServiceNode() {
+ return serviceNode;
+ }
+
+ @Override
+ public int compareTo(Object o) {
+ if (connectionCount.get() < ((ActiveNode) o).getConnectionCount().get()) {
+ return -1;
+ }
+ return 1;
+ }
+
+ public String toString() {
+ return serviceNode.getHost() + ":" + serviceNode.getPort();
+ }
+ }
+
+ @Override
+ public ServiceNode select(List> serviceNodes) {
+ Map activeConnections = ActiveConnectionMetrics.getActiveConnections();
+ String correlationId = UUID.randomUUID().toString();
+ log.debug("CorrelationId = {}, Active Connections {} = {}", correlationId, activeConnections);
+ final List> activeNodes = Lists.newArrayList();
+ for (ConnectionRequest connectionRequest : activeConnections.keySet()) {
+ for (ServiceNode serviceNode : serviceNodes) {
+ if (serviceNode.getHost().equals(connectionRequest.getReplicaHostName()) &&
+ serviceNode.getPort() == connectionRequest.getPort()) {
+ ActiveNode activeNode = new ActiveNode<>(serviceNode, activeConnections.get(connectionRequest));
+ activeNodes.add(activeNode);
+ }
+ }
+ }
+
+ // List of those service nodes to which request has never been sent.
+ List> newServiceNodes = Lists.newLinkedList();
+ for (ServiceNode serviceNode : serviceNodes) {
+ boolean found = false;
+ for (ActiveNode activeNode : activeNodes) {
+ if (activeNode.getServiceNode().equals(serviceNode)) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ newServiceNodes.add(serviceNode);
+ }
+ }
+ log.debug("CorrelationId = {}, ActiveNodes = {}, ServiceNodes = {}", correlationId, activeNodes, serviceNodes);
+ log.debug("CorrelationId={}, ActiveNodesCount = {}, ServiceNodesCount = {}, NewNodesCount = {}",
+ correlationId, activeNodes.size(), serviceNodes.size(), newServiceNodes.size());
+ /*
+ If there are a few service nodes to which connection request has never been sent in the past, we should first try
+ to pick amongst those nodes before kicking off Least connection algorithm. This will ensure starvation of those nodes never happens.
+ */
+ if (!newServiceNodes.isEmpty()) {
+ ServiceNode serviceNode = newServiceNodes.get(ThreadLocalRandom.current().nextInt(newServiceNodes.size()));
+ log.info("CorrelationId = {}, Randomly selected serviceNode = {}", correlationId, serviceNode);
+ return serviceNode;
+ }
+ Collections.sort(activeNodes);
+ ServiceNode serviceNode = activeNodes.get(0).getServiceNode();
+ log.info("CorrelationId = {}. Selected Node with Least connection = {}", correlationId, serviceNode);
+ return serviceNode;
+ }
+}
diff --git a/src/main/java/com/flipkart/ranger/finder/RoundRobinServiceNodeSelector.java b/src/main/java/com/flipkart/ranger/finder/RoundRobinServiceNodeSelector.java
index 9e0d7fac..c2bf32b4 100644
--- a/src/main/java/com/flipkart/ranger/finder/RoundRobinServiceNodeSelector.java
+++ b/src/main/java/com/flipkart/ranger/finder/RoundRobinServiceNodeSelector.java
@@ -1,12 +1,12 @@
/**
* Copyright 2015 Flipkart Internet Pvt. Ltd.
- *
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,19 +20,20 @@
import com.flipkart.ranger.model.ServiceNodeSelector;
import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
public class RoundRobinServiceNodeSelector implements ServiceNodeSelector {
private static final ThreadLocal index =
new ThreadLocal() {
- @Override protected Integer initialValue() {
+ @Override
+ protected Integer initialValue() {
return 0;
}
};
@Override
public ServiceNode select(List> serviceNodes) {
+ ServiceNode serviceNode = serviceNodes.get(index.get() % serviceNodes.size());
index.set((index.get() + 1) % serviceNodes.size());
- return serviceNodes.get(index.get());
+ return serviceNode;
}
}
diff --git a/src/main/java/com/flipkart/ranger/finder/sharded/MapBasedServiceRegistry.java b/src/main/java/com/flipkart/ranger/finder/sharded/MapBasedServiceRegistry.java
index b544f372..452ef1d0 100644
--- a/src/main/java/com/flipkart/ranger/finder/sharded/MapBasedServiceRegistry.java
+++ b/src/main/java/com/flipkart/ranger/finder/sharded/MapBasedServiceRegistry.java
@@ -20,9 +20,7 @@
import com.flipkart.ranger.finder.Service;
import com.flipkart.ranger.model.Deserializer;
import com.flipkart.ranger.model.ServiceNode;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ImmutableListMultimap;
-import com.google.common.collect.ListMultimap;
+import com.google.common.collect.*;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
@@ -35,7 +33,7 @@ public MapBasedServiceRegistry(Service service, Deserializer deserializer, in
}
public ListMultimap> nodes() {
- return nodes.get();
+ return ImmutableListMultimap.copyOf(nodes.get());
}
@Override
diff --git a/src/main/java/com/flipkart/ranger/finder/unsharded/UnshardedClusterServiceRegistry.java b/src/main/java/com/flipkart/ranger/finder/unsharded/UnshardedClusterServiceRegistry.java
index 438e2785..cf9c06e8 100644
--- a/src/main/java/com/flipkart/ranger/finder/unsharded/UnshardedClusterServiceRegistry.java
+++ b/src/main/java/com/flipkart/ranger/finder/unsharded/UnshardedClusterServiceRegistry.java
@@ -36,7 +36,7 @@ protected UnshardedClusterServiceRegistry(Service service,
}
public List> nodes() {
- return nodes.get();
+ return ImmutableList.copyOf(nodes.get());
}
@Override
diff --git a/src/main/java/com/flipkart/ranger/healthcheck/HealthcheckStatus.java b/src/main/java/com/flipkart/ranger/healthcheck/HealthcheckStatus.java
index 56e433c0..9245915e 100644
--- a/src/main/java/com/flipkart/ranger/healthcheck/HealthcheckStatus.java
+++ b/src/main/java/com/flipkart/ranger/healthcheck/HealthcheckStatus.java
@@ -18,5 +18,6 @@
public enum HealthcheckStatus {
healthy,
- unhealthy
+ unhealthy,
+ down
}
diff --git a/src/main/java/com/flipkart/ranger/model/ServiceNode.java b/src/main/java/com/flipkart/ranger/model/ServiceNode.java
index b9afdc50..e911614a 100644
--- a/src/main/java/com/flipkart/ranger/model/ServiceNode.java
+++ b/src/main/java/com/flipkart/ranger/model/ServiceNode.java
@@ -22,6 +22,7 @@ public class ServiceNode {
private String host;
private int port;
private T nodeData;
+
private HealthcheckStatus healthcheckStatus = HealthcheckStatus.healthy;
private long lastUpdatedTimeStamp = Long.MIN_VALUE;
@@ -77,4 +78,32 @@ public long getLastUpdatedTimeStamp() {
public void setLastUpdatedTimeStamp(long lastUpdatedTimeStamp) {
this.lastUpdatedTimeStamp = lastUpdatedTimeStamp;
}
+
+ @Override
+ public String toString() {
+ return "ServiceNode{" +
+ "host='" + host + '\'' +
+ ", port=" + port +
+ ", nodeData=" + nodeData +
+ ", healthcheckStatus=" + healthcheckStatus +
+ ", lastUpdatedTimeStamp=" + lastUpdatedTimeStamp +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ServiceNode> that = (ServiceNode>) o;
+ if (getPort() != that.getPort()) return false;
+ return getHost().equals(that.getHost());
+ }
+
+ @Override
+ public int hashCode() {
+ int result = getHost().hashCode();
+ result = 31 * result + getPort();
+ return result;
+ }
+
}
diff --git a/src/test/java/com/flipkart/ranger/finder/LeastConnectionServiceNodeSelectorTest.java b/src/test/java/com/flipkart/ranger/finder/LeastConnectionServiceNodeSelectorTest.java
new file mode 100644
index 00000000..dd4de426
--- /dev/null
+++ b/src/test/java/com/flipkart/ranger/finder/LeastConnectionServiceNodeSelectorTest.java
@@ -0,0 +1,102 @@
+package com.flipkart.ranger.finder;
+
+import com.flipkart.ranger.model.ServiceNode;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.MockitoAnnotations;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static junit.framework.TestCase.assertEquals;
+import static junit.framework.TestCase.assertNotNull;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ActiveConnectionMetrics.class)
+public class LeastConnectionServiceNodeSelectorTest {
+
+ private List> serviceNodes;
+ private LeastConnectionServiceNodeSelector serviceNodeSelector;
+ private Map emptyMap = Maps.newConcurrentMap();
+
+ @Before
+ public void setUp() throws Exception {
+ mockStatic(ActiveConnectionMetrics.class);
+ MockitoAnnotations.initMocks(this);
+ serviceNodes = getServiceNodes();
+ serviceNodeSelector = new LeastConnectionServiceNodeSelector<>();
+ }
+
+ private List> getServiceNodes() {
+ ServiceNode a = new ServiceNode<>("10.85.23.1", 28231, "Node A");
+ ServiceNode b = new ServiceNode<>("10.85.24.1", 28241, "Node B");
+ ServiceNode c = new ServiceNode<>("10.85.25.1", 28251, "Node C");
+ return Lists.newArrayList(a, b, c);
+ }
+
+ private Map getActiveConnections() {
+ Map activeConnections = Maps.newConcurrentMap();
+ ConnectionRequest c1 = new ConnectionRequest("electronicsLow-shard3", "10.85.23.1", 28231);
+ ConnectionRequest c2 = new ConnectionRequest("electronicsLow-shard3", "10.85.24.1", 28241);
+ ConnectionRequest c3 = new ConnectionRequest("electronicsLow-shard3", "10.85.25.1", 28251);
+ activeConnections.put(c1, new AtomicLong(10));
+ activeConnections.put(c2, new AtomicLong(5));
+ activeConnections.put(c3, new AtomicLong(15));
+
+ ConnectionRequest c4 = new ConnectionRequest("lifestyleCore-shard1", "10.85.23.2", 28232);
+ ConnectionRequest c5 = new ConnectionRequest("lifestyleCore-shard1", "10.85.24.2", 28242);
+ ConnectionRequest c6 = new ConnectionRequest("lifestyleCore-shard1", "10.85.25.2", 28252);
+ activeConnections.put(c4, new AtomicLong(23));
+ activeConnections.put(c5, new AtomicLong(26));
+ activeConnections.put(c6, new AtomicLong(10));
+
+ ConnectionRequest c7 = new ConnectionRequest("booksCore-shard4", "10.85.23.3", 28233);
+ ConnectionRequest c8 = new ConnectionRequest("lifestyleCore-shard5", "10.85.24.3", 28243);
+ ConnectionRequest c9 = new ConnectionRequest("electronicsHigh-shard4", "10.85.25.3", 28253);
+ activeConnections.put(c7, new AtomicLong(11));
+ activeConnections.put(c8, new AtomicLong(3));
+ activeConnections.put(c9, new AtomicLong(6));
+
+ return activeConnections;
+ }
+
+ /**
+ * Case: This test case simulates cases when activeConnections map has only one entry for respective service node
+ * and there as multiple service nodes to select from.
+ * Output: Nodes should be selected using Random algorithm.
+ */
+ @Test
+ public void selectServiceNodeWhenOnlyOneActiveConnection() throws Exception {
+ Map activeConnections = Maps.newConcurrentMap();
+ ConnectionRequest c1 = new ConnectionRequest("electronicsLow-shard3", "10.85.25.1", 28251);
+ activeConnections.put(c1, new AtomicLong(10));
+
+ when(ActiveConnectionMetrics.getActiveConnections()).thenReturn(activeConnections, activeConnections, activeConnections,
+ activeConnections);
+
+ ServiceNode serviceNode;
+ serviceNode = serviceNodeSelector.select(serviceNodes);
+ assertNotNull(serviceNode);
+ }
+
+ @Test
+ /**
+ * Output: Service node having least number of connections.
+ */
+ public void selectLeastConnectionServiceNode() throws Exception {
+ when(ActiveConnectionMetrics.getActiveConnections()).thenReturn(getActiveConnections());
+ ServiceNode serviceNode = serviceNodeSelector.select(serviceNodes);
+ assertEquals(28241, serviceNode.getPort());
+ assertEquals("10.85.24.1", serviceNode.getHost());
+ }
+
+
+}
\ No newline at end of file
diff --git a/src/test/java/com/flipkart/ranger/model/ServiceProviderTest.java b/src/test/java/com/flipkart/ranger/model/ServiceProviderTest.java
index cf4cc7d9..30127759 100644
--- a/src/test/java/com/flipkart/ranger/model/ServiceProviderTest.java
+++ b/src/test/java/com/flipkart/ranger/model/ServiceProviderTest.java
@@ -150,7 +150,7 @@ public ServiceNode deserialize(byte[] data) {
@Test
public void testBasicDiscoveryRR() throws Exception {
- SimpleShardedServiceFinder serviceFinder = ServiceFinderBuilders.shardedFinderBuilder()
+ final SimpleShardedServiceFinder serviceFinder = ServiceFinderBuilders.shardedFinderBuilder()
.withConnectionString(testingCluster.getConnectString())
.withNamespace("test")
.withServiceName("test-service")
@@ -183,12 +183,18 @@ public ServiceNode deserialize(byte[] data) {
System.out.println(node.getHost());
}
long startTime = System.currentTimeMillis();
- for(long i = 0; i <1000000; i++)
- {
- ServiceNode node = serviceFinder.get(new TestShardInfo(2));
- Assert.assertNotNull(node);
- Assert.assertEquals(2, node.getNodeData().getShardId());
- }
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ for(long i = 0; i <1000000; i++) {
+ ServiceNode node = serviceFinder.get(new TestShardInfo(2));
+ Assert.assertNotNull(node);
+ Assert.assertEquals(2, node.getNodeData().getShardId());
+ }
+ }
+ };
+ Thread t = new Thread(runnable);
+ t.start();
logger.info("PERF::RoundRobinSelector::Took (ms):" + (System.currentTimeMillis() - startTime));
{
ServiceNode node = serviceFinder.get(new TestShardInfo(99));