Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Http service #22

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<groupId>com.flipkart.ranger</groupId>
<artifactId>ranger</artifactId>
<packaging>jar</packaging>
<version>0.3.0</version>
<version>0.4.0</version>

<distributionManagement>
<repository>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.flipkart.ranger.finder;

import com.flipkart.ranger.model.ServiceNode;
import com.flipkart.ranger.model.ServiceRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public abstract class AbstractServiceRegistryUpdater<T> implements Callable<Void> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename this to ServiceRegistryUpdater

private static final Logger logger = LoggerFactory.getLogger(AbstractServiceRegistryUpdater.class);

public abstract void start() throws Exception;
public abstract void stop() throws Exception;

protected ServiceRegistry<T> serviceRegistry;

private Lock checkLock = new ReentrantLock();
private Condition checkCondition = checkLock.newCondition();
private boolean checkForUpdate = false;

public void setServiceRegistry(ServiceRegistry<T> serviceRegistry) {
this.serviceRegistry = serviceRegistry;
}

@Override
public Void call() throws Exception {
//Start checking for updates
while (true) {
try {
checkLock.lock();
while (!checkForUpdate) {
checkCondition.await();
}
List<ServiceNode<T>> nodes = getHealthyServiceNodes();
if(null != nodes) {
logger.debug("Setting nodelist of size: " + nodes.size());
serviceRegistry.nodes(nodes);
}
else {
logger.warn("No service shards/nodes found. We are disconnected from zookeeper. Keeping old list.");
}
checkForUpdate =false;
} finally {
checkLock.unlock();
}
}
}

//TODO: rename this method
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove todo, add documentation to explain what the method is intended to do

protected abstract List<ServiceNode<T>> getHealthyServiceNodes();

public void checkForUpdate() {
try {
checkLock.lock();
checkForUpdate = true;
checkCondition.signalAll();
} finally {
checkLock.unlock();
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,44 +16,53 @@

package com.flipkart.ranger.finder;

import com.flipkart.ranger.model.Deserializer;
import com.flipkart.ranger.model.ServiceNodeSelector;
import com.flipkart.ranger.model.ServiceRegistry;
import com.flipkart.ranger.model.ShardSelector;
import com.flipkart.ranger.model.*;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public abstract class BaseServiceFinderBuilder<T, RegistryType extends ServiceRegistry<T>, FinderType extends ServiceFinder<T, RegistryType>> {
private String namespace;
private String serviceName;
private CuratorFramework curatorFramework;
private String connectionString;
private CuratorSourceConfig curatorConfig;
private HttpSourceConfig httpConfig;

private int healthcheckRefreshTimeMillis;
private Deserializer<T> deserializer;
private ShardSelector<T, RegistryType> shardSelector;
private ServiceNodeSelector<T> nodeSelector = new RandomServiceNodeSelector<T>();

public BaseServiceFinderBuilder<T, RegistryType, FinderType> withNamespace(final String namespace) {
this.namespace = namespace;
public BaseServiceFinderBuilder<T, RegistryType, FinderType> withHttpSourceConfig(final String host, final Integer port, final String path) {
Preconditions.checkNotNull(host);
Preconditions.checkNotNull(port);
Preconditions.checkNotNull(path);
httpConfig = new HttpSourceConfig(host, port, path);
return this;
}

public BaseServiceFinderBuilder<T, RegistryType, FinderType> withServiceName(final String serviceName) {
this.serviceName = serviceName;
return this;
}

public BaseServiceFinderBuilder<T, RegistryType, FinderType> withCuratorFramework(CuratorFramework curatorFramework) {
this.curatorFramework = curatorFramework;
public BaseServiceFinderBuilder<T, RegistryType, FinderType> withCuratorSourceConfig(final String connectionString, final String namespace, final String serviceName) {
Preconditions.checkNotNull(connectionString);
Preconditions.checkNotNull(namespace);
Preconditions.checkNotNull(serviceName);
curatorConfig = new CuratorSourceConfig(connectionString, namespace, serviceName);
return this;
}

public BaseServiceFinderBuilder<T, RegistryType, FinderType> withConnectionString(final String connectionString) {
this.connectionString = connectionString;
return this;
}
// public BaseServiceFinderBuilder<T, RegistryType, FinderType> withNamespace(final String namespace) {
// this.namespace = namespace;
// return this;
// }
//
// public BaseServiceFinderBuilder<T, RegistryType, FinderType> withServiceName(final String serviceName) {
// this.serviceName = serviceName;
// return this;
// }
//
// public BaseServiceFinderBuilder<T, RegistryType, FinderType> withConnectionString(final String connectionString) {
// this.connectionString = connectionString;
// return this;
// }

public BaseServiceFinderBuilder<T, RegistryType, FinderType> withDeserializer(Deserializer<T> deserializer) {
this.deserializer = deserializer;
Expand All @@ -75,28 +84,40 @@ public BaseServiceFinderBuilder<T, RegistryType, FinderType> witHhealthcheckRefr
return this;
}



public FinderType build() throws Exception {
Preconditions.checkNotNull(namespace);
Preconditions.checkNotNull(serviceName);
Preconditions.checkNotNull(deserializer);
if( null == curatorFramework) {
Preconditions.checkNotNull(connectionString);
curatorFramework = CuratorFrameworkFactory.builder()
.namespace(namespace)
.connectString(connectionString)
.retryPolicy(new ExponentialBackoffRetry(1000, 100)).build();
curatorFramework.start();
}
if( 0 == healthcheckRefreshTimeMillis) {
Copy link
Contributor

@Tushar-Naik Tushar-Naik Jul 26, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Preconditions.checkNotNull(serviceName); required

healthcheckRefreshTimeMillis = 1000;
}
Service service = new Service(curatorFramework, namespace, serviceName);
return buildFinder(service, deserializer, shardSelector, nodeSelector, healthcheckRefreshTimeMillis);
if (null != httpConfig){
Preconditions.checkNotNull(deserializer);
HttpServiceRegistryUpdater<T> registryUpdater = new HttpServiceRegistryUpdater<T>(httpConfig, deserializer);
return buildFinder(httpConfig, registryUpdater, deserializer, shardSelector, nodeSelector, healthcheckRefreshTimeMillis);
}
if (null != curatorConfig) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (curatorFramework !=null) use the curatorFramework instead of building one

Preconditions.checkNotNull(deserializer);
CuratorFramework curatorFramework = buildCuratorFramework(curatorConfig);
CuratorServiceRegistryUpdater<T> registryUpdater = new CuratorServiceRegistryUpdater<T>(curatorConfig, deserializer, curatorFramework);
return buildFinder(curatorConfig, registryUpdater, deserializer, shardSelector, nodeSelector, healthcheckRefreshTimeMillis);
}
//TODO: what should be the default case?
return buildFinder(null, null, deserializer, shardSelector, nodeSelector, healthcheckRefreshTimeMillis);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make the code look like before

if( null == curatorFramework) {	
            Preconditions.checkNotNull(connectionString);	
            curatorFramework = CuratorFrameworkFactory.builder()	
                    .namespace(namespace)	
                    .connectString(connectionString)	
                    .retryPolicy(new ExponentialBackoffRetry(1000, 100)).build();	
            curatorFramework.start();	
        }
CuratorServiceRegistryUpdater<T> registryUpdater = new CuratorServiceRegistryUpdater<T>(curatorConfig, deserializer, curatorFramework);
return buildFinder(curatorConfig, registryUpdater, deserializer, shardSelector, nodeSelector, healthcheckRefreshTimeMillis);

}

private CuratorFramework buildCuratorFramework(CuratorSourceConfig curatorConfig) {
curatorFramework = CuratorFrameworkFactory.builder()
.namespace(curatorConfig.getNamespace())
.connectString(curatorConfig.getConnectionString())
.retryPolicy(new ExponentialBackoffRetry(1000, 100)).build();

// CuratorSourceConfig config = new CuratorSourceConfig(connectionString, namespace, serviceName);
// config.setConnectionString(connectionString);
// config.setNamespace(namespace);
// config.setServiceName(serviceName);
return curatorFramework;
}

protected abstract FinderType buildFinder(Service service,
protected abstract FinderType buildFinder(SourceConfig config,
AbstractServiceRegistryUpdater<T> registryUpdater,
Deserializer<T> deserializer,
ShardSelector<T, RegistryType> shardSelector,
ServiceNodeSelector<T> nodeSelector,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package com.flipkart.ranger.finder;

import com.flipkart.ranger.healthcheck.HealthcheckStatus;
import com.flipkart.ranger.model.Deserializer;
import com.flipkart.ranger.model.PathBuilder;
import com.flipkart.ranger.model.ServiceNode;
import com.google.common.collect.Lists;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.zookeeper.WatchedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class CuratorServiceRegistryUpdater<T> extends AbstractServiceRegistryUpdater<T> {
private static final Logger logger = LoggerFactory.getLogger(AbstractServiceRegistryUpdater.class);

private CuratorSourceConfig config;
private Deserializer<T> deserializer;
private CuratorFramework curatorFramework;

protected CuratorServiceRegistryUpdater(CuratorSourceConfig config, Deserializer<T> deserializer, CuratorFramework curatorFramework){
this.config = config;
this.deserializer = deserializer;
this.curatorFramework = curatorFramework;
}

@Override
public void start() throws Exception {
curatorFramework.start();

//zookeeper cluster connection
curatorFramework.blockUntilConnected();
logger.debug("Connected to zookeeper cluster");
curatorFramework.newNamespaceAwareEnsurePath(PathBuilder.path(config))
.ensure(curatorFramework.getZookeeperClient());

//CuratorFramework curatorFramework = serviceRegistry.getService().getCuratorFramework();
curatorFramework.getChildren().usingWatcher(new CuratorWatcher() {
@Override
public void process(WatchedEvent event) throws Exception {
switch (event.getType()) {

case NodeChildrenChanged: {
checkForUpdate();
break;
}
case None:
case NodeCreated:
case NodeDeleted:
case NodeDataChanged:
break;
default:
break;
}
}
}).forPath(PathBuilder.path(config)); //Start watcher on service node
serviceRegistry.nodes(getHealthyServiceNodes());
logger.info("Started polling zookeeper for changes");
}

public boolean isRunning() {
return curatorFramework != null
&& (curatorFramework.getState() == CuratorFrameworkState.STARTED);
}

@Override
protected List<ServiceNode<T>> getHealthyServiceNodes() {
try {
final long healthcheckZombieCheckThresholdTime = System.currentTimeMillis() - 60000; //1 Minute
//final Service service = serviceRegistry.getService();
if(!isRunning()) {
return null;
}

// final Deserializer<T> deserializer = serviceRegistry.getDeserializer();
// final CuratorFramework curatorFramework = config.getCuratorFramework();
final String parentPath = PathBuilder.path(config);
List<String> children = curatorFramework.getChildren().forPath(parentPath);
List<ServiceNode<T>> nodes = Lists.newArrayListWithCapacity(children.size());
for(String child : children) {
final String path = String.format("%s/%s", parentPath, child);
if(null == curatorFramework.checkExists().forPath(path)) {
continue;
}
byte[] data = curatorFramework.getData().forPath(path);
if(null == data) {
logger.warn("Not data present for node: " + path);
continue;
}
ServiceNode<T> key = deserializer.deserialize(data);
if(HealthcheckStatus.healthy == key.getHealthcheckStatus()
&& key.getLastUpdatedTimeStamp() > healthcheckZombieCheckThresholdTime) {
nodes.add(key);
}
}
return nodes;
} catch (Exception e) {
logger.error("Error getting service data from zookeeper: ", e);
}
return null;
}

@Override
public void stop() throws Exception {
curatorFramework.close();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You cannot unconditionally close this. You did not start this. Why close this here?

logger.debug("Stopped updater");
}

}
Loading