Skip to content

Commit

Permalink
Added least connection algorithm for selecting service nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
Anand Pandey committed Nov 7, 2017
1 parent 1157df8 commit 1d40cae
Show file tree
Hide file tree
Showing 11 changed files with 423 additions and 21 deletions.
23 changes: 21 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<groupId>com.flipkart.ranger</groupId>
<artifactId>ranger</artifactId>
<packaging>jar</packaging>
<version>0.3.0</version>
<version>0.3.1.16-connections</version>

<distributionManagement>
<repository>
Expand Down Expand Up @@ -44,11 +44,16 @@
<name>Tushar Naik</name>
<email>[email protected]</email>
</developer>
<developer>
<id>asp0585</id>
<name>Anand Pandey</name>
<email>[email protected]</email>
</developer>
</developers>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<curator.version>3.1.0</curator.version>
<curator.version>2.12.0</curator.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -100,11 +105,25 @@
<artifactId>guava-retrying</artifactId>
<version>2.0.0</version>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.1</version>
</dependency>

<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<version>1.7.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito</artifactId>
<version>1.7.3</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Copyright 2015 Flipkart Internet Pvt. Ltd.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.flipkart.ranger.finder;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class ActiveConnectionMetrics {

private static final Logger log = LoggerFactory.getLogger(ActiveConnectionMetrics.class);

// this map stores the number of active connections to various replicas at any point of time
private static final Map<ConnectionRequest, AtomicLong> activeConnections = Maps.newConcurrentMap();

public static void incrementConnectionCount(ConnectionRequest connectionRequest, String requestId) {
Preconditions.checkNotNull(connectionRequest, "connectionRequestCannot be null");
if (!activeConnections.containsKey(connectionRequest)) {
activeConnections.put(connectionRequest, new AtomicLong(1));
return;
}
log.debug("RequestId = {}, INCREMENTED connection count for {} = {}", requestId, connectionRequest,
activeConnections.get(connectionRequest).incrementAndGet());
}

public static void decrementConnectionCount(ConnectionRequest connectionRequest, String requestId) {
Preconditions.checkNotNull(connectionRequest, "connectionRequestCannot be null");
if (!activeConnections.containsKey(connectionRequest)) {
String errorMsg = "connectionRequest " + connectionRequest + " should be already present in activeConnections map";
log.error(errorMsg);
return;
}
log.debug("RequestId = {}, DECREMENTED connection count for {} = {}", requestId, connectionRequest,
activeConnections.get(connectionRequest).decrementAndGet());
}

public static Map<ConnectionRequest, AtomicLong> getActiveConnections() {
return ImmutableMap.copyOf(activeConnections);
}
}
68 changes: 68 additions & 0 deletions src/main/java/com/flipkart/ranger/finder/ConnectionRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* Copyright 2015 Flipkart Internet Pvt. Ltd.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.flipkart.ranger.finder;


public class ConnectionRequest {

private final String shardName;
private final String replicaHostName;
private final int port;

public ConnectionRequest(String shardName, String replicaHostName, int port) {
this.shardName = shardName;
this.replicaHostName = replicaHostName;
this.port = port;
}

public String getShardName() {
return shardName;
}

public String getReplicaHostName() {
return replicaHostName;
}

public int getPort() {
return port;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

ConnectionRequest that = (ConnectionRequest) o;

if (getPort() != that.getPort()) return false;
if (!getShardName().equals(that.getShardName())) return false;
return getReplicaHostName().equals(that.getReplicaHostName());
}

@Override
public int hashCode() {
int result = getShardName().hashCode();
result = 31 * result + getReplicaHostName().hashCode();
result = 31 * result + getPort();
return result;
}

@Override
public String toString() {
return shardName + ":" + replicaHostName + ":" + port;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/**
* Copyright 2015 Flipkart Internet Pvt. Ltd.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.flipkart.ranger.finder;

import com.flipkart.ranger.model.ServiceNode;
import com.flipkart.ranger.model.ServiceNodeSelector;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;

/**
* This class returns the service node having least number of active connections currently assigned to it.
* It also takes care that nodes do not suffer from starvation.
*/
public class LeastConnectionServiceNodeSelector<T> implements ServiceNodeSelector<T> {

private static final Logger log = LoggerFactory.getLogger(LeastConnectionServiceNodeSelector.class.getSimpleName());

/**
* ActiveNode is a ServiceNode to which at least one request has been sent in the past along with its current connection count
*/
private static class ActiveNode<T> implements Comparable {

private ServiceNode<T> serviceNode;
private AtomicLong connectionCount;

ActiveNode(ServiceNode<T> serviceNode, AtomicLong connectionCount) {
this.serviceNode = serviceNode;
this.connectionCount = connectionCount;
}

AtomicLong getConnectionCount() {
return connectionCount;
}

ServiceNode<T> getServiceNode() {
return serviceNode;
}

@Override
public int compareTo(Object o) {
if (connectionCount.get() < ((ActiveNode) o).getConnectionCount().get()) {
return -1;
}
return 1;
}

public String toString() {
return serviceNode.getHost() + ":" + serviceNode.getPort();
}
}

@Override
public ServiceNode<T> select(List<ServiceNode<T>> serviceNodes) {
Map<ConnectionRequest, AtomicLong> activeConnections = ActiveConnectionMetrics.getActiveConnections();
String correlationId = UUID.randomUUID().toString();
log.debug("CorrelationId = {}, Active Connections {} = {}", correlationId, activeConnections);
final List<ActiveNode<T>> activeNodes = Lists.newArrayList();
for (ConnectionRequest connectionRequest : activeConnections.keySet()) {
for (ServiceNode<T> serviceNode : serviceNodes) {
if (serviceNode.getHost().equals(connectionRequest.getReplicaHostName()) &&
serviceNode.getPort() == connectionRequest.getPort()) {
ActiveNode<T> activeNode = new ActiveNode<>(serviceNode, activeConnections.get(connectionRequest));
activeNodes.add(activeNode);
}
}
}

// List of those service nodes to which request has never been sent.
List<ServiceNode<T>> newServiceNodes = Lists.newLinkedList();
for (ServiceNode<T> serviceNode : serviceNodes) {
boolean found = false;
for (ActiveNode<T> activeNode : activeNodes) {
if (activeNode.getServiceNode().equals(serviceNode)) {
found = true;
break;
}
}
if (!found) {
newServiceNodes.add(serviceNode);
}
}
log.debug("CorrelationId = {}, ActiveNodes = {}, ServiceNodes = {}", correlationId, activeNodes, serviceNodes);
log.debug("CorrelationId={}, ActiveNodesCount = {}, ServiceNodesCount = {}, NewNodesCount = {}",
correlationId, activeNodes.size(), serviceNodes.size(), newServiceNodes.size());
/*
If there are a few service nodes to which connection request has never been sent in the past, we should first try
to pick amongst those nodes before kicking off Least connection algorithm. This will ensure starvation of those nodes never happens.
*/
if (!newServiceNodes.isEmpty()) {
ServiceNode<T> serviceNode = newServiceNodes.get(ThreadLocalRandom.current().nextInt(newServiceNodes.size()));
log.info("CorrelationId = {}, Randomly selected serviceNode = {}", correlationId, serviceNode);
return serviceNode;
}
Collections.sort(activeNodes);
ServiceNode<T> serviceNode = activeNodes.get(0).getServiceNode();
log.info("CorrelationId = {}. Selected Node with Least connection = {}", correlationId, serviceNode);
return serviceNode;
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright 2015 Flipkart Internet Pvt. Ltd.
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -20,19 +20,20 @@
import com.flipkart.ranger.model.ServiceNodeSelector;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinServiceNodeSelector<T> implements ServiceNodeSelector<T> {
private static final ThreadLocal<Integer> index =
new ThreadLocal<Integer>() {
@Override protected Integer initialValue() {
@Override
protected Integer initialValue() {
return 0;
}
};

@Override
public ServiceNode<T> select(List<ServiceNode<T>> serviceNodes) {
ServiceNode<T> serviceNode = serviceNodes.get(index.get() % serviceNodes.size());
index.set((index.get() + 1) % serviceNodes.size());
return serviceNodes.get(index.get());
return serviceNode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
import com.flipkart.ranger.finder.Service;
import com.flipkart.ranger.model.Deserializer;
import com.flipkart.ranger.model.ServiceNode;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.*;

import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.flipkart.ranger.finder.Service;
import com.flipkart.ranger.model.Deserializer;
import com.flipkart.ranger.model.ServiceNode;
import com.google.common.collect.ImmutableList;

import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -41,6 +40,6 @@ public List<ServiceNode<UnshardedClusterInfo>> nodes() {

@Override
public void nodes(List<ServiceNode<UnshardedClusterInfo>> serviceNodes) {
nodes.set(ImmutableList.copyOf(serviceNodes));
nodes.set(serviceNodes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@

public enum HealthcheckStatus {
healthy,
unhealthy
unhealthy,
down
}
Loading

0 comments on commit 1d40cae

Please sign in to comment.