-
Notifications
You must be signed in to change notification settings - Fork 21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Http service #22
base: master
Are you sure you want to change the base?
Http service #22
Changes from 7 commits
2b74f82
1f8c065
998dbe3
b1bc7f1
e018009
3eea49a
7f55e47
d836f1f
9a8ad22
45902a0
dc12a40
a6d049a
bc77425
5dab01a
2c90b0a
ae6a6f7
023d9f5
07940bc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
package com.flipkart.ranger.finder; | ||
|
||
import com.flipkart.ranger.model.ServiceNode; | ||
import com.flipkart.ranger.model.ServiceRegistry; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.List; | ||
import java.util.concurrent.Callable; | ||
import java.util.concurrent.locks.Condition; | ||
import java.util.concurrent.locks.Lock; | ||
import java.util.concurrent.locks.ReentrantLock; | ||
|
||
public abstract class AbstractServiceRegistryUpdater<T> implements Callable<Void> { | ||
private static final Logger logger = LoggerFactory.getLogger(AbstractServiceRegistryUpdater.class); | ||
|
||
public abstract void start() throws Exception; | ||
public abstract void stop() throws Exception; | ||
|
||
protected ServiceRegistry<T> serviceRegistry; | ||
|
||
private Lock checkLock = new ReentrantLock(); | ||
private Condition checkCondition = checkLock.newCondition(); | ||
private boolean checkForUpdate = false; | ||
|
||
public void setServiceRegistry(ServiceRegistry<T> serviceRegistry) { | ||
this.serviceRegistry = serviceRegistry; | ||
} | ||
|
||
@Override | ||
public Void call() throws Exception { | ||
//Start checking for updates | ||
while (true) { | ||
try { | ||
checkLock.lock(); | ||
while (!checkForUpdate) { | ||
checkCondition.await(); | ||
} | ||
List<ServiceNode<T>> nodes = getHealthyServiceNodes(); | ||
if(null != nodes) { | ||
logger.debug("Setting nodelist of size: " + nodes.size()); | ||
serviceRegistry.nodes(nodes); | ||
} | ||
else { | ||
logger.warn("No service shards/nodes found. We are disconnected from zookeeper. Keeping old list."); | ||
} | ||
checkForUpdate =false; | ||
} finally { | ||
checkLock.unlock(); | ||
} | ||
} | ||
} | ||
|
||
//TODO: rename this method | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. remove todo, add documentation to explain what the method is intended to do |
||
protected abstract List<ServiceNode<T>> getHealthyServiceNodes(); | ||
|
||
public void checkForUpdate() { | ||
try { | ||
checkLock.lock(); | ||
checkForUpdate = true; | ||
checkCondition.signalAll(); | ||
} finally { | ||
checkLock.unlock(); | ||
} | ||
} | ||
} |
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,32 +16,30 @@ | |
|
||
package com.flipkart.ranger.finder; | ||
|
||
import com.flipkart.ranger.model.Deserializer; | ||
import com.flipkart.ranger.model.ServiceNodeSelector; | ||
import com.flipkart.ranger.model.ServiceRegistry; | ||
import com.flipkart.ranger.model.ShardSelector; | ||
import com.flipkart.ranger.model.*; | ||
import com.google.common.base.Preconditions; | ||
import org.apache.curator.framework.CuratorFramework; | ||
import org.apache.curator.framework.CuratorFrameworkFactory; | ||
import org.apache.curator.retry.ExponentialBackoffRetry; | ||
|
||
public abstract class BaseServiceFinderBuilder<T, RegistryType extends ServiceRegistry<T>, FinderType extends ServiceFinder<T, RegistryType>> { | ||
private String namespace; | ||
private String serviceName; | ||
private CuratorFramework curatorFramework; | ||
private String connectionString; | ||
private CuratorSourceConfig curatorConfig; | ||
private HttpSourceConfig httpConfig; | ||
private String serviceName; | ||
|
||
private int healthcheckRefreshTimeMillis; | ||
private Deserializer<T> deserializer; | ||
private ShardSelector<T, RegistryType> shardSelector; | ||
private ServiceNodeSelector<T> nodeSelector = new RandomServiceNodeSelector<T>(); | ||
|
||
public BaseServiceFinderBuilder<T, RegistryType, FinderType> withNamespace(final String namespace) { | ||
this.namespace = namespace; | ||
public BaseServiceFinderBuilder<T, RegistryType, FinderType> withHttpSourceConfig(HttpSourceConfig httpConfig) { | ||
this.httpConfig = httpConfig; | ||
return this; | ||
} | ||
|
||
public BaseServiceFinderBuilder<T, RegistryType, FinderType> withServiceName(final String serviceName) { | ||
this.serviceName = serviceName; | ||
public BaseServiceFinderBuilder<T, RegistryType, FinderType> withCuratorSourceConfig(CuratorSourceConfig curatorConfig) { | ||
this.curatorConfig = curatorConfig; | ||
return this; | ||
} | ||
|
||
|
@@ -50,8 +48,8 @@ public BaseServiceFinderBuilder<T, RegistryType, FinderType> withCuratorFramewor | |
return this; | ||
} | ||
|
||
public BaseServiceFinderBuilder<T, RegistryType, FinderType> withConnectionString(final String connectionString) { | ||
this.connectionString = connectionString; | ||
public BaseServiceFinderBuilder<T, RegistryType, FinderType> withServiceName(final String serviceName) { | ||
this.serviceName = serviceName; | ||
return this; | ||
} | ||
|
||
|
@@ -75,28 +73,35 @@ public BaseServiceFinderBuilder<T, RegistryType, FinderType> witHhealthcheckRefr | |
return this; | ||
} | ||
|
||
|
||
|
||
public FinderType build() throws Exception { | ||
Preconditions.checkNotNull(namespace); | ||
Preconditions.checkNotNull(serviceName); | ||
Preconditions.checkNotNull(deserializer); | ||
if( null == curatorFramework) { | ||
Preconditions.checkNotNull(connectionString); | ||
curatorFramework = CuratorFrameworkFactory.builder() | ||
.namespace(namespace) | ||
.connectString(connectionString) | ||
.retryPolicy(new ExponentialBackoffRetry(1000, 100)).build(); | ||
curatorFramework.start(); | ||
} | ||
if( 0 == healthcheckRefreshTimeMillis) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
healthcheckRefreshTimeMillis = 1000; | ||
} | ||
Service service = new Service(curatorFramework, namespace, serviceName); | ||
return buildFinder(service, deserializer, shardSelector, nodeSelector, healthcheckRefreshTimeMillis); | ||
if (curatorFramework != null) { | ||
CuratorSourceConfig curatorConfig = new CuratorSourceConfig(null,curatorFramework.getNamespace()); | ||
CuratorServiceRegistryUpdater<T> registryUpdater = new CuratorServiceRegistryUpdater<T>(curatorConfig, deserializer, curatorFramework, serviceName); | ||
return buildFinder(curatorConfig, registryUpdater, deserializer, shardSelector, nodeSelector, healthcheckRefreshTimeMillis); | ||
} | ||
if (null != httpConfig){ | ||
Preconditions.checkNotNull(deserializer); | ||
HttpServiceRegistryUpdater<T> registryUpdater = new HttpServiceRegistryUpdater<T>(httpConfig, deserializer); | ||
return buildFinder(httpConfig, registryUpdater, deserializer, shardSelector, nodeSelector, healthcheckRefreshTimeMillis); | ||
} | ||
if (null != curatorConfig) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
Preconditions.checkNotNull(deserializer); | ||
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder() | ||
.namespace(curatorConfig.getNamespace()) | ||
.connectString(curatorConfig.getConnectionString()) | ||
.retryPolicy(new ExponentialBackoffRetry(1000, 100)).build(); | ||
CuratorServiceRegistryUpdater<T> registryUpdater = new CuratorServiceRegistryUpdater<T>(curatorConfig, deserializer, curatorFramework, serviceName); | ||
return buildFinder(curatorConfig, registryUpdater, deserializer, shardSelector, nodeSelector, healthcheckRefreshTimeMillis); | ||
} | ||
//TODO: what should be the default case? | ||
return buildFinder(null, null, deserializer, shardSelector, nodeSelector, healthcheckRefreshTimeMillis); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. make the code look like before if( null == curatorFramework) {
Preconditions.checkNotNull(connectionString);
curatorFramework = CuratorFrameworkFactory.builder()
.namespace(namespace)
.connectString(connectionString)
.retryPolicy(new ExponentialBackoffRetry(1000, 100)).build();
curatorFramework.start();
}
CuratorServiceRegistryUpdater<T> registryUpdater = new CuratorServiceRegistryUpdater<T>(curatorConfig, deserializer, curatorFramework);
return buildFinder(curatorConfig, registryUpdater, deserializer, shardSelector, nodeSelector, healthcheckRefreshTimeMillis); |
||
} | ||
|
||
protected abstract FinderType buildFinder(Service service, | ||
protected abstract FinderType buildFinder(SourceConfig config, | ||
AbstractServiceRegistryUpdater<T> registryUpdater, | ||
Deserializer<T> deserializer, | ||
ShardSelector<T, RegistryType> shardSelector, | ||
ServiceNodeSelector<T> nodeSelector, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
package com.flipkart.ranger.finder; | ||
|
||
import com.flipkart.ranger.healthcheck.HealthcheckStatus; | ||
import com.flipkart.ranger.model.Deserializer; | ||
import com.flipkart.ranger.model.PathBuilder; | ||
import com.flipkart.ranger.model.ServiceNode; | ||
import com.google.common.collect.Lists; | ||
import org.apache.curator.framework.CuratorFramework; | ||
import org.apache.curator.framework.api.CuratorWatcher; | ||
import org.apache.curator.framework.imps.CuratorFrameworkState; | ||
import org.apache.zookeeper.WatchedEvent; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.List; | ||
|
||
public class CuratorServiceRegistryUpdater<T> extends AbstractServiceRegistryUpdater<T> { | ||
private static final Logger logger = LoggerFactory.getLogger(AbstractServiceRegistryUpdater.class); | ||
|
||
private CuratorSourceConfig config; | ||
private Deserializer<T> deserializer; | ||
private CuratorFramework curatorFramework; | ||
private String serviceName; | ||
|
||
protected CuratorServiceRegistryUpdater(CuratorSourceConfig config, Deserializer<T> deserializer, CuratorFramework curatorFramework, String serviceName){ | ||
this.config = config; | ||
this.deserializer = deserializer; | ||
this.curatorFramework = curatorFramework; | ||
this.serviceName = serviceName; | ||
} | ||
|
||
@Override | ||
public void start() throws Exception { | ||
curatorFramework.start(); | ||
|
||
//zookeeper cluster connection | ||
curatorFramework.blockUntilConnected(); | ||
logger.debug("Connected to zookeeper cluster"); | ||
curatorFramework.newNamespaceAwareEnsurePath(PathBuilder.path(serviceName)) | ||
.ensure(curatorFramework.getZookeeperClient()); | ||
|
||
curatorFramework.getChildren().usingWatcher(new CuratorWatcher() { | ||
@Override | ||
public void process(WatchedEvent event) throws Exception { | ||
switch (event.getType()) { | ||
|
||
case NodeChildrenChanged: { | ||
checkForUpdate(); | ||
break; | ||
} | ||
case None: | ||
case NodeCreated: | ||
case NodeDeleted: | ||
case NodeDataChanged: | ||
break; | ||
default: | ||
break; | ||
} | ||
} | ||
}).forPath(PathBuilder.path(serviceName)); //Start watcher on service node | ||
serviceRegistry.nodes(getHealthyServiceNodes()); | ||
logger.info("Started polling zookeeper for changes"); | ||
} | ||
|
||
public boolean isRunning() { | ||
return curatorFramework != null | ||
&& (curatorFramework.getState() == CuratorFrameworkState.STARTED); | ||
} | ||
|
||
@Override | ||
protected List<ServiceNode<T>> getHealthyServiceNodes() { | ||
try { | ||
final long healthcheckZombieCheckThresholdTime = System.currentTimeMillis() - 60000; //1 Minute | ||
|
||
if(!isRunning()) { | ||
return null; | ||
} | ||
|
||
final String parentPath = PathBuilder.path(serviceName); | ||
List<String> children = curatorFramework.getChildren().forPath(parentPath); | ||
List<ServiceNode<T>> nodes = Lists.newArrayListWithCapacity(children.size()); | ||
for(String child : children) { | ||
final String path = String.format("%s/%s", parentPath, child); | ||
if(null == curatorFramework.checkExists().forPath(path)) { | ||
continue; | ||
} | ||
byte[] data = curatorFramework.getData().forPath(path); | ||
if(null == data) { | ||
logger.warn("Not data present for node: " + path); | ||
continue; | ||
} | ||
ServiceNode<T> key = deserializer.deserialize(data); | ||
if(HealthcheckStatus.healthy == key.getHealthcheckStatus() | ||
&& key.getLastUpdatedTimeStamp() > healthcheckZombieCheckThresholdTime) { | ||
nodes.add(key); | ||
} | ||
} | ||
return nodes; | ||
} catch (Exception e) { | ||
logger.error("Error getting service data from zookeeper: ", e); | ||
} | ||
return null; | ||
} | ||
|
||
@Override | ||
public void stop() throws Exception { | ||
curatorFramework.close(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You cannot unconditionally close this. You did not start this. Why close this here? |
||
logger.debug("Stopped updater"); | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename this to ServiceRegistryUpdater