diff --git a/pom.xml b/pom.xml index ec9c3e3f..e683c142 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ com.flipkart.ranger ranger jar - 0.3.0 + 0.4.0 @@ -94,6 +94,12 @@ junit 4.12 + + com.github.tomakehurst + wiremock-standalone + 2.18.0 + test + com.github.rholder @@ -114,8 +120,8 @@ maven-compiler-plugin 3.1 - 1.7 - 1.7 + 1.8 + 1.8 diff --git a/src/main/java/com/flipkart/ranger/finder/AbstractZookeeperServiceRegistry.java b/src/main/java/com/flipkart/ranger/finder/AbstractZookeeperServiceRegistry.java deleted file mode 100644 index 4c7bd8ce..00000000 --- a/src/main/java/com/flipkart/ranger/finder/AbstractZookeeperServiceRegistry.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Copyright 2015 Flipkart Internet Pvt. Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.flipkart.ranger.finder; - -import com.flipkart.ranger.model.Deserializer; -import com.flipkart.ranger.model.PathBuilder; -import com.flipkart.ranger.model.ServiceRegistry; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.*; - -public abstract class AbstractZookeeperServiceRegistry extends ServiceRegistry { - private static final Logger logger = LoggerFactory.getLogger(AbstractZookeeperServiceRegistry.class); - private int refreshIntervalMillis; - private ServiceRegistryUpdater updater; - private ExecutorService executorService = Executors.newFixedThreadPool(1); - private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - private ScheduledFuture scheduledFuture; - - protected AbstractZookeeperServiceRegistry(Service service, Deserializer deserializer, int refreshIntervalMillis) { - super(service, deserializer); - this.refreshIntervalMillis = refreshIntervalMillis; - } - - @Override - public void start() throws Exception { - final Service service = getService(); - service.getCuratorFramework().blockUntilConnected(); - logger.debug("Connected to zookeeper cluster"); - service.getCuratorFramework().newNamespaceAwareEnsurePath(PathBuilder.path(service)) - .ensure(service.getCuratorFramework().getZookeeperClient()); - updater = new ServiceRegistryUpdater(this); - updater.start(); - executorService.submit(updater); - scheduledFuture = scheduler.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - try { - updater.checkForUpdate(); - } catch (Exception e) { - e.printStackTrace(); - } - } - }, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS); - logger.debug("Service Registry Started"); - } - - @Override - public void stop() throws Exception { - try { - if( null != scheduledFuture ) { - scheduledFuture.cancel(true); - } - updater.stop(); - } catch (Exception e) { - logger.error("Error stopping ZK poller: ", e); - } - getService().getCuratorFramework().close(); - //TODO - logger.debug("Service Registry stopped"); - } - -} diff --git a/src/main/java/com/flipkart/ranger/finder/BaseServiceFinderBuilder.java b/src/main/java/com/flipkart/ranger/finder/BaseServiceFinderBuilder.java index 594fe02f..104d3f11 100644 --- a/src/main/java/com/flipkart/ranger/finder/BaseServiceFinderBuilder.java +++ b/src/main/java/com/flipkart/ranger/finder/BaseServiceFinderBuilder.java @@ -16,47 +16,18 @@ package com.flipkart.ranger.finder; -import com.flipkart.ranger.model.Deserializer; -import com.flipkart.ranger.model.ServiceNodeSelector; -import com.flipkart.ranger.model.ServiceRegistry; -import com.flipkart.ranger.model.ShardSelector; -import com.google.common.base.Preconditions; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.retry.ExponentialBackoffRetry; +import com.flipkart.ranger.model.*; public abstract class BaseServiceFinderBuilder, FinderType extends ServiceFinder> { - private String namespace; - private String serviceName; - private CuratorFramework curatorFramework; - private String connectionString; + private SourceConfig sourceConfig; + private ServiceRegistryUpdater serviceRegistryUpdater; + private int healthcheckRefreshTimeMillis; - private Deserializer deserializer; private ShardSelector shardSelector; private ServiceNodeSelector nodeSelector = new RandomServiceNodeSelector(); - public BaseServiceFinderBuilder withNamespace(final String namespace) { - this.namespace = namespace; - return this; - } - - public BaseServiceFinderBuilder withServiceName(final String serviceName) { - this.serviceName = serviceName; - return this; - } - - public BaseServiceFinderBuilder withCuratorFramework(CuratorFramework curatorFramework) { - this.curatorFramework = curatorFramework; - return this; - } - - public BaseServiceFinderBuilder withConnectionString(final String connectionString) { - this.connectionString = connectionString; - return this; - } - - public BaseServiceFinderBuilder withDeserializer(Deserializer deserializer) { - this.deserializer = deserializer; + public BaseServiceFinderBuilder withSourceConfig(SourceConfig sourceConfig) { + this.sourceConfig = sourceConfig; return this; } @@ -70,34 +41,30 @@ public BaseServiceFinderBuilder withNodeSelector(Se return this; } - public BaseServiceFinderBuilder witHhealthcheckRefreshTimeMillis(int healthcheckRefreshTimeMillis) { + public BaseServiceFinderBuilder withHealthcheckRefreshTimeMillis(int healthcheckRefreshTimeMillis) { this.healthcheckRefreshTimeMillis = healthcheckRefreshTimeMillis; return this; } - + public BaseServiceFinderBuilder withServiceRegistryUpdater(ServiceRegistryUpdater serviceRegistryUpdater) { + this.serviceRegistryUpdater = serviceRegistryUpdater; + return this; + } public FinderType build() throws Exception { - Preconditions.checkNotNull(namespace); - Preconditions.checkNotNull(serviceName); - Preconditions.checkNotNull(deserializer); - if( null == curatorFramework) { - Preconditions.checkNotNull(connectionString); - curatorFramework = CuratorFrameworkFactory.builder() - .namespace(namespace) - .connectString(connectionString) - .retryPolicy(new ExponentialBackoffRetry(1000, 100)).build(); - curatorFramework.start(); - } - if( 0 == healthcheckRefreshTimeMillis) { + if (0 == healthcheckRefreshTimeMillis) { healthcheckRefreshTimeMillis = 1000; } - Service service = new Service(curatorFramework, namespace, serviceName); - return buildFinder(service, deserializer, shardSelector, nodeSelector, healthcheckRefreshTimeMillis); + + if(serviceRegistryUpdater == null) { + ServiceRegistryUpdaterFactory serviceRegistryUpdaterFactory = new ServiceRegistryUpdaterFactory(); + serviceRegistryUpdater = serviceRegistryUpdaterFactory.getServiceRegistryUpdater(sourceConfig); + } + return buildFinder(sourceConfig, serviceRegistryUpdater, shardSelector, nodeSelector, healthcheckRefreshTimeMillis); } - protected abstract FinderType buildFinder(Service service, - Deserializer deserializer, + protected abstract FinderType buildFinder(SourceConfig config, + ServiceRegistryUpdater registryUpdater, ShardSelector shardSelector, ServiceNodeSelector nodeSelector, int healthcheckRefreshTimeMillis); diff --git a/src/main/java/com/flipkart/ranger/finder/Service.java b/src/main/java/com/flipkart/ranger/finder/CuratorFrameworkConfig.java similarity index 64% rename from src/main/java/com/flipkart/ranger/finder/Service.java rename to src/main/java/com/flipkart/ranger/finder/CuratorFrameworkConfig.java index 9300b8dd..0f27b81f 100644 --- a/src/main/java/com/flipkart/ranger/finder/Service.java +++ b/src/main/java/com/flipkart/ranger/finder/CuratorFrameworkConfig.java @@ -16,34 +16,34 @@ package com.flipkart.ranger.finder; +import com.flipkart.ranger.model.Deserializer; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.imps.CuratorFrameworkState; -public class Service { +public class CuratorFrameworkConfig extends SourceConfig { private CuratorFramework curatorFramework; - private String namespace; + private Deserializer deserializer; private String serviceName; - public Service(CuratorFramework curatorFramework, String namespace, String serviceName) { + public CuratorFrameworkConfig(CuratorFramework curatorFramework, String serviceName, Deserializer deserializer) { + super(ServiceType.CURATORSOURCE); this.curatorFramework = curatorFramework; - this.namespace = namespace; + this.deserializer = deserializer; this.serviceName = serviceName; } - public CuratorFramework getCuratorFramework() { - return curatorFramework; + public String getServiceName() { + return serviceName; } - public String getNamespace() { - return namespace; + public Deserializer getDeserializer() { + return deserializer; } - public String getServiceName() { - return serviceName; + public CuratorFramework getCuratorFramework() { + return curatorFramework; } - public boolean isRunning() { - return curatorFramework != null - && (curatorFramework.getState() == CuratorFrameworkState.STARTED); + public ServiceRegistryUpdater accept(SourceConfigVisitor sourceConfigVisitor) throws Exception { + return sourceConfigVisitor.visit(this); } } diff --git a/src/main/java/com/flipkart/ranger/finder/CuratorServiceRegistryUpdater.java b/src/main/java/com/flipkart/ranger/finder/CuratorServiceRegistryUpdater.java new file mode 100644 index 00000000..e422ed85 --- /dev/null +++ b/src/main/java/com/flipkart/ranger/finder/CuratorServiceRegistryUpdater.java @@ -0,0 +1,124 @@ +/** + * Copyright 2015 Flipkart Internet Pvt. Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.flipkart.ranger.finder; + +import com.flipkart.ranger.healthcheck.HealthcheckStatus; +import com.flipkart.ranger.model.Deserializer; +import com.flipkart.ranger.model.PathBuilder; +import com.flipkart.ranger.model.ServiceNode; +import com.google.common.collect.Lists; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.zookeeper.WatchedEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class CuratorServiceRegistryUpdater extends ServiceRegistryUpdater { + private static final Logger logger = LoggerFactory.getLogger(ServiceRegistryUpdater.class); + + private Deserializer deserializer; + private CuratorFramework curatorFramework; + private String serviceName; + + protected CuratorServiceRegistryUpdater(Deserializer deserializer, CuratorFramework curatorFramework, String serviceName){ + this.deserializer = deserializer; + this.curatorFramework = curatorFramework; + this.serviceName = serviceName; + } + + @Override + public void start() throws Exception { + logger.info("Starting curator framework..."); + + curatorFramework.blockUntilConnected(); + logger.debug("Connected to zookeeper cluster"); + curatorFramework.newNamespaceAwareEnsurePath(PathBuilder.path(serviceName)) + .ensure(curatorFramework.getZookeeperClient()); + + curatorFramework.getChildren().usingWatcher(new CuratorWatcher() { + @Override + public void process(WatchedEvent event) throws Exception { + switch (event.getType()) { + + case NodeChildrenChanged: { + checkForUpdate(); + break; + } + case None: + case NodeCreated: + case NodeDeleted: + case NodeDataChanged: + break; + default: + break; + } + } + }).forPath(PathBuilder.path(serviceName)); //Start watcher on service node + serviceRegistry.nodes(getHealthyServiceNodes()); + logger.info("Started polling zookeeper for changes"); + } + + public boolean isRunning() { + return curatorFramework != null + && (curatorFramework.getState() == CuratorFrameworkState.STARTED); + } + + @Override + protected List> getHealthyServiceNodes() { + try { + final long healthcheckZombieCheckThresholdTime = System.currentTimeMillis() - 60000; //1 Minute + + if(!isRunning()) { + return null; + } + + final String parentPath = PathBuilder.path(serviceName); + List children = curatorFramework.getChildren().forPath(parentPath); + List> nodes = Lists.newArrayListWithCapacity(children.size()); + for(String child : children) { + final String path = String.format("%s/%s", parentPath, child); + if(null == curatorFramework.checkExists().forPath(path)) { + continue; + } + byte[] data = curatorFramework.getData().forPath(path); + if(null == data) { + logger.warn("Not data present for node: " + path); + continue; + } + ServiceNode key = deserializer.deserialize(data); + if(HealthcheckStatus.healthy == key.getHealthcheckStatus() + && key.getLastUpdatedTimeStamp() > healthcheckZombieCheckThresholdTime) { + nodes.add(key); + } + } + return nodes; + } catch (Exception e) { + logger.error("Error getting service data from zookeeper: ", e); + } + return null; + } + + @Override + public void stop() throws Exception { + curatorFramework.close(); + logger.debug("Stopped updater"); + } + +} diff --git a/src/main/java/com/flipkart/ranger/finder/HttpServiceRegistryUpdater.java b/src/main/java/com/flipkart/ranger/finder/HttpServiceRegistryUpdater.java new file mode 100644 index 00000000..91f1f4db --- /dev/null +++ b/src/main/java/com/flipkart/ranger/finder/HttpServiceRegistryUpdater.java @@ -0,0 +1,114 @@ +/** + * Copyright 2015 Flipkart Internet Pvt. Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.flipkart.ranger.finder; + +import com.flipkart.ranger.finder.HttpVerb.HttpVerbFactory; +import com.flipkart.ranger.healthcheck.HealthcheckStatus; +import com.flipkart.ranger.model.ServiceNode; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpRequestBase; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; +import java.util.List; +import java.util.stream.Collectors; + +public class HttpServiceRegistryUpdater extends ServiceRegistryUpdater { + private static final Logger logger = LoggerFactory.getLogger(HttpServiceRegistryUpdater.class); + + private HttpSourceConfig httpSourceConfig; + private CloseableHttpClient httpclient; + private URI uri; + private HttpVerbFactory httpVerbFactory; + private String scheme; + + protected HttpServiceRegistryUpdater(HttpSourceConfig httpSourceConfig) throws Exception { + this.httpSourceConfig = httpSourceConfig; + final String host = this.httpSourceConfig.getHost(); + final int port = this.httpSourceConfig.getPort(); + final String path = this.httpSourceConfig.getPath(); + + if(httpSourceConfig.isSecure()) { + this.scheme = "https"; + } else { + this.scheme = "http"; + } + this.uri = new URIBuilder() + .setScheme(scheme) + .setHost(host) + .setPort(port) + .setPath(path) + .build(); + this.httpVerbFactory = new HttpVerbFactory(); + } + + @Override + public void start() throws Exception { + if(this.scheme == "https" && this.httpSourceConfig.isSuppressHostCheck()){ + httpclient = HttpClients.custom().setSSLHostnameVerifier((s, sslSession) -> true).build(); + } else { + httpclient = HttpClients.createDefault(); + } + httpclient = HttpClients.custom().setSSLHostnameVerifier((s, sslSession) -> true).build(); + + serviceRegistry.nodes(getHealthyServiceNodes()); + logger.info("Started http updater"); + } + + @Override + public void stop() throws Exception { + httpclient.close(); + logger.debug("Stopped http updater"); + } + + @Override + protected List> getHealthyServiceNodes() { + try { + final long healthcheckZombieCheckThresholdTime = System.currentTimeMillis() - 60000; //1 Minute + + HttpRequestBase httpRequestBase = httpVerbFactory.getHttpVerb(httpSourceConfig.getHttpVerb(), uri); + + try (CloseableHttpResponse response = httpclient.execute(httpRequestBase)) { + int status = response.getStatusLine().getStatusCode(); + if (status < 200 && status >= 300) { + logger.error("Error in Http get, Status Code: " + response.getStatusLine().getStatusCode() + " received Response: " + response); + return null; + } + + byte[] data = EntityUtils.toByteArray(response.getEntity()); + if (null == data) { + logger.warn("No data present"); + return null; + } + + List> serviceNodes = httpSourceConfig.getHttpResponseDecoder().deserialize(data); + return serviceNodes.stream() + .filter(node -> (node.getHealthcheckStatus() == HealthcheckStatus.healthy && + node.getLastUpdatedTimeStamp() > healthcheckZombieCheckThresholdTime)) + .collect(Collectors.toList()); + } + } catch (Exception e) { + logger.error("Error getting service data from http: ", e); + } + return null; + } +} diff --git a/src/main/java/com/flipkart/ranger/finder/HttpSourceConfig.java b/src/main/java/com/flipkart/ranger/finder/HttpSourceConfig.java new file mode 100644 index 00000000..443074e4 --- /dev/null +++ b/src/main/java/com/flipkart/ranger/finder/HttpSourceConfig.java @@ -0,0 +1,75 @@ +/** + * Copyright 2015 Flipkart Internet Pvt. Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.flipkart.ranger.finder; + +import com.flipkart.ranger.finder.HttpVerb.HttpVerb; +import com.flipkart.ranger.model.HttpResponseDecoder; + +public class HttpSourceConfig extends SourceConfig { + private String host; + private int port = 80; + private String path; + private boolean secure = false; + + private boolean suppressHostCheck = false; + private HttpVerb httpVerb; + + private HttpResponseDecoder httpResponseDecoder; + + public HttpSourceConfig(String host, int port, String path, HttpResponseDecoder httpResponseDecoder, boolean secure, boolean suppressHostCheck, HttpVerb httpVerb) { + super(ServiceType.HTTPSOURCE); + this.host = host; + this.port = port; + this.path = path; + this.httpResponseDecoder = httpResponseDecoder; + this.secure = secure; + this.httpVerb = httpVerb; + this.suppressHostCheck = suppressHostCheck; + } + + public HttpResponseDecoder getHttpResponseDecoder() { + return httpResponseDecoder; + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + public String getPath() { + return path; + } + + public boolean isSecure() { + return secure; + } + + public boolean isSuppressHostCheck() { + return suppressHostCheck; + } + + public HttpVerb getHttpVerb() { + return httpVerb; + } + + public ServiceRegistryUpdater accept(SourceConfigVisitor sourceConfigVisitor) throws Exception { + return sourceConfigVisitor.visit(this); + } +} diff --git a/src/main/java/com/flipkart/ranger/finder/HttpVerb/GetHttpVerb.java b/src/main/java/com/flipkart/ranger/finder/HttpVerb/GetHttpVerb.java new file mode 100644 index 00000000..c2eacc57 --- /dev/null +++ b/src/main/java/com/flipkart/ranger/finder/HttpVerb/GetHttpVerb.java @@ -0,0 +1,11 @@ +package com.flipkart.ranger.finder.HttpVerb; + +public class GetHttpVerb extends HttpVerb { + public GetHttpVerb() { + super(HttpVerbType.GET); + } + + public T accept(HttpVerbVisitor httpVerbVisitor) throws Exception{ + return httpVerbVisitor.visit(this); + } +} diff --git a/src/main/java/com/flipkart/ranger/finder/HttpVerb/HttpVerb.java b/src/main/java/com/flipkart/ranger/finder/HttpVerb/HttpVerb.java new file mode 100644 index 00000000..361e35ae --- /dev/null +++ b/src/main/java/com/flipkart/ranger/finder/HttpVerb/HttpVerb.java @@ -0,0 +1,20 @@ +package com.flipkart.ranger.finder.HttpVerb; + +public abstract class HttpVerb { + public enum HttpVerbType { + GET, + POST + } + + private HttpVerbType httpVerbType; + + public HttpVerb(HttpVerbType httpVerbType) { + this.httpVerbType = httpVerbType; + } + + public HttpVerbType getHttpVerbType() { + return httpVerbType; + } + + public abstract T accept(HttpVerbVisitor httpVerbVisitor) throws Exception; +} diff --git a/src/main/java/com/flipkart/ranger/finder/HttpVerb/HttpVerbFactory.java b/src/main/java/com/flipkart/ranger/finder/HttpVerb/HttpVerbFactory.java new file mode 100644 index 00000000..17aabe5e --- /dev/null +++ b/src/main/java/com/flipkart/ranger/finder/HttpVerb/HttpVerbFactory.java @@ -0,0 +1,26 @@ +package com.flipkart.ranger.finder.HttpVerb; + +import org.apache.http.client.entity.UrlEncodedFormEntity; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpRequestBase; + +import java.net.URI; + +public class HttpVerbFactory { + public HttpRequestBase getHttpVerb(HttpVerb httpVerb, URI uri) throws Exception{ + return httpVerb.accept(new HttpVerbVisitor() { + @Override + public HttpRequestBase visit(GetHttpVerb getHttpVerb) throws Exception{ + return new HttpGet(uri); + } + + @Override + public HttpRequestBase visit(PostHttpVerb postHttpVerb) throws Exception{ + HttpPost httpPost = new HttpPost(uri); + httpPost.setEntity(new UrlEncodedFormEntity(postHttpVerb.getBody())); + return httpPost; + } + }); + } +} diff --git a/src/main/java/com/flipkart/ranger/finder/HttpVerb/HttpVerbVisitor.java b/src/main/java/com/flipkart/ranger/finder/HttpVerb/HttpVerbVisitor.java new file mode 100644 index 00000000..d0daea61 --- /dev/null +++ b/src/main/java/com/flipkart/ranger/finder/HttpVerb/HttpVerbVisitor.java @@ -0,0 +1,6 @@ +package com.flipkart.ranger.finder.HttpVerb; + +public interface HttpVerbVisitor { + T visit(GetHttpVerb getHttpVerb) throws Exception; + T visit(PostHttpVerb postHttpVerb) throws Exception; +} diff --git a/src/main/java/com/flipkart/ranger/finder/HttpVerb/PostHttpVerb.java b/src/main/java/com/flipkart/ranger/finder/HttpVerb/PostHttpVerb.java new file mode 100644 index 00000000..921fbd69 --- /dev/null +++ b/src/main/java/com/flipkart/ranger/finder/HttpVerb/PostHttpVerb.java @@ -0,0 +1,19 @@ +package com.flipkart.ranger.finder.HttpVerb; + +import java.util.List; + +public class PostHttpVerb extends HttpVerb { + List body; + public PostHttpVerb(List body) { + super(HttpVerbType.POST); + this.body = body; + } + + public List getBody() { + return body; + } + + public T accept(HttpVerbVisitor httpVerbVisitor) throws Exception{ + return httpVerbVisitor.visit(this); + } +} diff --git a/src/main/java/com/flipkart/ranger/finder/RoundRobinServiceNodeSelector.java b/src/main/java/com/flipkart/ranger/finder/RoundRobinServiceNodeSelector.java index 9e0d7fac..dc428a0a 100644 --- a/src/main/java/com/flipkart/ranger/finder/RoundRobinServiceNodeSelector.java +++ b/src/main/java/com/flipkart/ranger/finder/RoundRobinServiceNodeSelector.java @@ -20,7 +20,6 @@ import com.flipkart.ranger.model.ServiceNodeSelector; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; public class RoundRobinServiceNodeSelector implements ServiceNodeSelector { private static final ThreadLocal index = diff --git a/src/main/java/com/flipkart/ranger/finder/ServiceFinder.java b/src/main/java/com/flipkart/ranger/finder/ServiceFinder.java index 0e1d8719..4760c8ea 100644 --- a/src/main/java/com/flipkart/ranger/finder/ServiceFinder.java +++ b/src/main/java/com/flipkart/ranger/finder/ServiceFinder.java @@ -20,18 +20,31 @@ import com.flipkart.ranger.model.ServiceNodeSelector; import com.flipkart.ranger.model.ServiceRegistry; import com.flipkart.ranger.model.ShardSelector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.List; +import java.util.concurrent.*; public class ServiceFinder> { + private static final Logger logger = LoggerFactory.getLogger(ServiceFinder.class); + private final ServiceRegistryType serviceRegistry; private final ShardSelector shardSelector; + private ServiceRegistryUpdater updater; private final ServiceNodeSelector nodeSelector; + private int refreshIntervalMillis; + + private ExecutorService executorService = Executors.newFixedThreadPool(1); + private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + private ScheduledFuture scheduledFuture; - public ServiceFinder(ServiceRegistryType serviceRegistry, ShardSelector shardSelector, ServiceNodeSelector nodeSelector) { + public ServiceFinder(ServiceRegistryType serviceRegistry, ServiceRegistryUpdater updater, ShardSelector shardSelector, ServiceNodeSelector nodeSelector, int refreshIntervalMillis) { this.serviceRegistry = serviceRegistry; this.shardSelector = shardSelector; this.nodeSelector = nodeSelector; + this.updater = updater; + this.refreshIntervalMillis = refreshIntervalMillis; } public ServiceNode get(T criteria) { @@ -47,10 +60,33 @@ public List> getAll(T criteria) { } public void start() throws Exception { - serviceRegistry.start(); + updater.setServiceRegistry(serviceRegistry); + + updater.start(); + + executorService.submit(updater); + scheduledFuture = scheduler.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + try { + updater.checkForUpdate(); + } catch (Exception e) { + logger.warn("Error checking for update ZK poller: ", e); + } + } + }, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS); + logger.debug("ServiceFinder Scheduler Started"); } public void stop() throws Exception { - serviceRegistry.stop(); + try { + if( null != scheduledFuture ) { + scheduledFuture.cancel(true); + } + updater.stop(); + } catch (Exception e) { + logger.error("Error stopping ZK poller: ", e); + } + logger.info("ServiceFinder Scheduler stopped"); } } diff --git a/src/main/java/com/flipkart/ranger/finder/ServiceRegistryUpdater.java b/src/main/java/com/flipkart/ranger/finder/ServiceRegistryUpdater.java index a5b0bec5..eac7edd9 100644 --- a/src/main/java/com/flipkart/ranger/finder/ServiceRegistryUpdater.java +++ b/src/main/java/com/flipkart/ranger/finder/ServiceRegistryUpdater.java @@ -16,15 +16,8 @@ package com.flipkart.ranger.finder; -import com.flipkart.ranger.healthcheck.HealthcheckStatus; -import com.flipkart.ranger.model.Deserializer; -import com.flipkart.ranger.model.PathBuilder; import com.flipkart.ranger.model.ServiceNode; import com.flipkart.ranger.model.ServiceRegistry; -import com.google.common.collect.Lists; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.api.CuratorWatcher; -import org.apache.zookeeper.WatchedEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,60 +27,37 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -public class ServiceRegistryUpdater implements Callable { - private static final Logger logger = LoggerFactory.getLogger(ServiceRegistry.class); +public abstract class ServiceRegistryUpdater implements Callable { + private static final Logger logger = LoggerFactory.getLogger(ServiceRegistryUpdater.class); - private ServiceRegistry serviceRegistry; + public abstract void start() throws Exception; + public abstract void stop() throws Exception; + + protected ServiceRegistry serviceRegistry; private Lock checkLock = new ReentrantLock(); private Condition checkCondition = checkLock.newCondition(); private boolean checkForUpdate = false; - public ServiceRegistryUpdater(ServiceRegistry serviceRegistry) { + public void setServiceRegistry(ServiceRegistry serviceRegistry) { this.serviceRegistry = serviceRegistry; } - public void start() throws Exception { - CuratorFramework curatorFramework = serviceRegistry.getService().getCuratorFramework(); - curatorFramework.getChildren().usingWatcher(new CuratorWatcher() { - @Override - public void process(WatchedEvent event) throws Exception { - switch (event.getType()) { - - case NodeChildrenChanged: { - checkForUpdate(); - break; - } - case None: - case NodeCreated: - case NodeDeleted: - case NodeDataChanged: - break; - default: - break; - } - } - }).forPath(PathBuilder.path(serviceRegistry.getService())); //Start watcher on service node - serviceRegistry.nodes(checkForUpdateOnZookeeper()); - logger.info("Started polling zookeeper for changes"); - } - @Override public Void call() throws Exception { - //Start checking for updates while (true) { try { checkLock.lock(); while (!checkForUpdate) { checkCondition.await(); } - List> nodes = checkForUpdateOnZookeeper(); + List> nodes = getHealthyServiceNodes(); if(null != nodes) { logger.debug("Setting nodelist of size: " + nodes.size()); serviceRegistry.nodes(nodes); } else { - logger.warn("No service shards/nodes found. We are disconnected from zookeeper. Keeping old list."); + logger.warn("No service shards/nodes found. We are disconnected from zookeeper/http server. Keeping old list."); } checkForUpdate =false; } finally { @@ -96,6 +66,8 @@ public Void call() throws Exception { } } + protected abstract List> getHealthyServiceNodes(); + public void checkForUpdate() { try { checkLock.lock(); @@ -105,43 +77,4 @@ public void checkForUpdate() { checkLock.unlock(); } } - - private List> checkForUpdateOnZookeeper() { - try { - final long healthcheckZombieCheckThresholdTime = System.currentTimeMillis() - 60000; //1 Minute - final Service service = serviceRegistry.getService(); - if(!service.isRunning()) { - return null; - } - final Deserializer deserializer = serviceRegistry.getDeserializer(); - final CuratorFramework curatorFramework = service.getCuratorFramework(); - final String parentPath = PathBuilder.path(service); - List children = curatorFramework.getChildren().forPath(parentPath); - List> nodes = Lists.newArrayListWithCapacity(children.size()); - for(String child : children) { - final String path = String.format("%s/%s", parentPath, child); - if(null == curatorFramework.checkExists().forPath(path)) { - continue; - } - byte[] data = curatorFramework.getData().forPath(path); - if(null == data) { - logger.warn("Not data present for node: " + path); - continue; - } - ServiceNode key = deserializer.deserialize(data); - if(HealthcheckStatus.healthy == key.getHealthcheckStatus() - && key.getLastUpdatedTimeStamp() > healthcheckZombieCheckThresholdTime) { - nodes.add(key); - } - } - return nodes; - } catch (Exception e) { - logger.error("Error getting service data from zookeeper: ", e); - } - return null; - } - - public void stop() { - logger.debug("Stopped updater"); - } } diff --git a/src/main/java/com/flipkart/ranger/finder/ServiceRegistryUpdaterFactory.java b/src/main/java/com/flipkart/ranger/finder/ServiceRegistryUpdaterFactory.java new file mode 100644 index 00000000..fcdb8acd --- /dev/null +++ b/src/main/java/com/flipkart/ranger/finder/ServiceRegistryUpdaterFactory.java @@ -0,0 +1,47 @@ +package com.flipkart.ranger.finder; + +import com.google.common.base.Preconditions; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; + +public class ServiceRegistryUpdaterFactory { + + public ServiceRegistryUpdater getServiceRegistryUpdater(SourceConfig sourceConfig) throws Exception{ + return sourceConfig.accept(new SourceConfigVisitor() { + @Override + public ServiceRegistryUpdater visit(CuratorFrameworkConfig curatorFrameworkConfig) throws Exception{ + Preconditions.checkNotNull(curatorFrameworkConfig.getCuratorFramework()); + Preconditions.checkNotNull(curatorFrameworkConfig.getDeserializer()); + Preconditions.checkNotNull(curatorFrameworkConfig.getServiceName()); + return new CuratorServiceRegistryUpdater(curatorFrameworkConfig.getDeserializer(), + curatorFrameworkConfig.getCuratorFramework(), curatorFrameworkConfig.getServiceName()); + } + + @Override + public ServiceRegistryUpdater visit(HttpSourceConfig httpSourceConfig) throws Exception{ + Preconditions.checkNotNull(httpSourceConfig.getHost()); + Preconditions.checkNotNull(httpSourceConfig.getPort()); + Preconditions.checkNotNull(httpSourceConfig.getHttpResponseDecoder()); + Preconditions.checkArgument((httpSourceConfig.getPort() > 0 && httpSourceConfig.getPort() < 65535)); + return new HttpServiceRegistryUpdater(httpSourceConfig); + } + + @Override + public ServiceRegistryUpdater visit(ZookeeperSourceConfig zookeeperSourceConfig) throws Exception{ + Preconditions.checkNotNull(zookeeperSourceConfig.getNamespace()); + Preconditions.checkNotNull(zookeeperSourceConfig.getConnectionString()); + Preconditions.checkNotNull(zookeeperSourceConfig.getZookeeperNodeDataDecoder()); + Preconditions.checkNotNull(zookeeperSourceConfig.getServiceName()); + CuratorFramework curatorFramework = CuratorFrameworkFactory.builder() + .namespace(zookeeperSourceConfig.getNamespace()) + .connectString(zookeeperSourceConfig.getConnectionString()) + .retryPolicy(new ExponentialBackoffRetry(1000, 100)).build(); + curatorFramework.start(); + return new CuratorServiceRegistryUpdater(zookeeperSourceConfig.getZookeeperNodeDataDecoder(), + curatorFramework, zookeeperSourceConfig.getServiceName()); + } + }); + + } +} diff --git a/src/main/java/com/flipkart/ranger/finder/SourceConfig.java b/src/main/java/com/flipkart/ranger/finder/SourceConfig.java new file mode 100644 index 00000000..2af3966b --- /dev/null +++ b/src/main/java/com/flipkart/ranger/finder/SourceConfig.java @@ -0,0 +1,38 @@ +/** + * Copyright 2015 Flipkart Internet Pvt. Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.flipkart.ranger.finder; + +public abstract class SourceConfig { + + public enum ServiceType { + CURATORSOURCE, + ZOOKEEPERSOURCE, + HTTPSOURCE + } + + private ServiceType serviceType; + + public SourceConfig(ServiceType serviceType) { + this.serviceType = serviceType; + } + + public ServiceType getServiceType() { + return serviceType; + } + + public abstract ServiceRegistryUpdater accept(SourceConfigVisitor sourceConfigVisitor) throws Exception; +} diff --git a/src/main/java/com/flipkart/ranger/finder/SourceConfigVisitor.java b/src/main/java/com/flipkart/ranger/finder/SourceConfigVisitor.java new file mode 100644 index 00000000..f09b0642 --- /dev/null +++ b/src/main/java/com/flipkart/ranger/finder/SourceConfigVisitor.java @@ -0,0 +1,7 @@ +package com.flipkart.ranger.finder; + +public interface SourceConfigVisitor { + ServiceRegistryUpdater visit(HttpSourceConfig httpSourceConfig) throws Exception; + ServiceRegistryUpdater visit(ZookeeperSourceConfig zookeeperSourceConfig) throws Exception; + ServiceRegistryUpdater visit(CuratorFrameworkConfig curatorFrameworkConfig) throws Exception; +} \ No newline at end of file diff --git a/src/main/java/com/flipkart/ranger/finder/ZookeeperSourceConfig.java b/src/main/java/com/flipkart/ranger/finder/ZookeeperSourceConfig.java new file mode 100644 index 00000000..e317222d --- /dev/null +++ b/src/main/java/com/flipkart/ranger/finder/ZookeeperSourceConfig.java @@ -0,0 +1,54 @@ +/** + * Copyright 2015 Flipkart Internet Pvt. Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.flipkart.ranger.finder; + +import com.flipkart.ranger.model.Deserializer; + +public class ZookeeperSourceConfig extends SourceConfig{ + private String connectionString; + private String namespace; + private Deserializer zookeeperNodeDataDecoder; + private String serviceName; + + public ZookeeperSourceConfig(String connectionString, String namespace, String serviceName, Deserializer deserializer) { + super(ServiceType.ZOOKEEPERSOURCE); + this.connectionString = connectionString; + this.namespace = namespace; + this.zookeeperNodeDataDecoder = deserializer; + this.serviceName = serviceName; + } + + public String getServiceName() { + return serviceName; + } + + public Deserializer getZookeeperNodeDataDecoder() { + return zookeeperNodeDataDecoder; + } + + public String getNamespace() { + return namespace; + } + + public String getConnectionString() { + return connectionString; + } + + public ServiceRegistryUpdater accept(SourceConfigVisitor sourceConfigVisitor) throws Exception{ + return sourceConfigVisitor.visit(this); + } +} diff --git a/src/main/java/com/flipkart/ranger/finder/sharded/MapBasedServiceRegistry.java b/src/main/java/com/flipkart/ranger/finder/sharded/MapBasedServiceRegistry.java index b544f372..366c67ba 100644 --- a/src/main/java/com/flipkart/ranger/finder/sharded/MapBasedServiceRegistry.java +++ b/src/main/java/com/flipkart/ranger/finder/sharded/MapBasedServiceRegistry.java @@ -16,10 +16,8 @@ package com.flipkart.ranger.finder.sharded; -import com.flipkart.ranger.finder.AbstractZookeeperServiceRegistry; -import com.flipkart.ranger.finder.Service; -import com.flipkart.ranger.model.Deserializer; import com.flipkart.ranger.model.ServiceNode; +import com.flipkart.ranger.model.ServiceRegistry; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableListMultimap; import com.google.common.collect.ListMultimap; @@ -27,13 +25,9 @@ import java.util.List; import java.util.concurrent.atomic.AtomicReference; -public class MapBasedServiceRegistry extends AbstractZookeeperServiceRegistry { +public class MapBasedServiceRegistry implements ServiceRegistry { private AtomicReference>> nodes = new AtomicReference>>(); - public MapBasedServiceRegistry(Service service, Deserializer deserializer, int refreshInterval) { - super(service, deserializer, refreshInterval); - } - public ListMultimap> nodes() { return nodes.get(); } diff --git a/src/main/java/com/flipkart/ranger/finder/sharded/SimpleShardedServiceFinder.java b/src/main/java/com/flipkart/ranger/finder/sharded/SimpleShardedServiceFinder.java index 9d229705..fe0d1fdd 100644 --- a/src/main/java/com/flipkart/ranger/finder/sharded/SimpleShardedServiceFinder.java +++ b/src/main/java/com/flipkart/ranger/finder/sharded/SimpleShardedServiceFinder.java @@ -16,14 +16,15 @@ package com.flipkart.ranger.finder.sharded; +import com.flipkart.ranger.finder.ServiceRegistryUpdater; import com.flipkart.ranger.finder.ServiceFinder; import com.flipkart.ranger.model.ServiceNodeSelector; import com.flipkart.ranger.model.ShardSelector; public class SimpleShardedServiceFinder extends ServiceFinder> { - public SimpleShardedServiceFinder(MapBasedServiceRegistry serviceRegistry, + public SimpleShardedServiceFinder(MapBasedServiceRegistry serviceRegistry, ServiceRegistryUpdater updater, ShardSelector> shardSelector, - ServiceNodeSelector nodeSelector) { - super(serviceRegistry, shardSelector, nodeSelector); + ServiceNodeSelector nodeSelector, int healthcheckRefreshTimeMillis) { + super(serviceRegistry, updater, shardSelector, nodeSelector, healthcheckRefreshTimeMillis); } } diff --git a/src/main/java/com/flipkart/ranger/finder/sharded/SimpleShardedServiceFinderBuilder.java b/src/main/java/com/flipkart/ranger/finder/sharded/SimpleShardedServiceFinderBuilder.java index b153f82e..3349a70a 100644 --- a/src/main/java/com/flipkart/ranger/finder/sharded/SimpleShardedServiceFinderBuilder.java +++ b/src/main/java/com/flipkart/ranger/finder/sharded/SimpleShardedServiceFinderBuilder.java @@ -16,23 +16,23 @@ package com.flipkart.ranger.finder.sharded; +import com.flipkart.ranger.finder.ServiceRegistryUpdater; import com.flipkart.ranger.finder.BaseServiceFinderBuilder; -import com.flipkart.ranger.finder.Service; -import com.flipkart.ranger.model.Deserializer; +import com.flipkart.ranger.finder.SourceConfig; import com.flipkart.ranger.model.ServiceNodeSelector; import com.flipkart.ranger.model.ShardSelector; public class SimpleShardedServiceFinderBuilder extends BaseServiceFinderBuilder, SimpleShardedServiceFinder> { @Override - protected SimpleShardedServiceFinder buildFinder(Service service, - Deserializer deserializer, + protected SimpleShardedServiceFinder buildFinder(SourceConfig config, + ServiceRegistryUpdater registryUpdater, ShardSelector> shardSelector, ServiceNodeSelector nodeSelector, int healthcheckRefreshTimeMillis) { if(null == shardSelector) { shardSelector = new MatchingShardSelector(); } - MapBasedServiceRegistry serviceRegistry = new MapBasedServiceRegistry(service, deserializer, healthcheckRefreshTimeMillis); - return new SimpleShardedServiceFinder(serviceRegistry, shardSelector, nodeSelector); + MapBasedServiceRegistry serviceRegistry = new MapBasedServiceRegistry(); + return new SimpleShardedServiceFinder(serviceRegistry, registryUpdater, shardSelector, nodeSelector, healthcheckRefreshTimeMillis); } } diff --git a/src/main/java/com/flipkart/ranger/finder/unsharded/UnshardedClusterFinder.java b/src/main/java/com/flipkart/ranger/finder/unsharded/UnshardedClusterFinder.java index c932034b..2367e4fc 100644 --- a/src/main/java/com/flipkart/ranger/finder/unsharded/UnshardedClusterFinder.java +++ b/src/main/java/com/flipkart/ranger/finder/unsharded/UnshardedClusterFinder.java @@ -16,14 +16,15 @@ package com.flipkart.ranger.finder.unsharded; +import com.flipkart.ranger.finder.ServiceRegistryUpdater; import com.flipkart.ranger.finder.ServiceFinder; import com.flipkart.ranger.model.ServiceNodeSelector; import com.flipkart.ranger.model.ShardSelector; public class UnshardedClusterFinder extends ServiceFinder { - public UnshardedClusterFinder(UnshardedClusterServiceRegistry serviceRegistry, + public UnshardedClusterFinder(UnshardedClusterServiceRegistry serviceRegistry, ServiceRegistryUpdater updater, ShardSelector shardSelector, - ServiceNodeSelector nodeSelector) { - super(serviceRegistry, shardSelector, nodeSelector); + ServiceNodeSelector nodeSelector, int healthcheckRefreshTimeMillis) { + super(serviceRegistry, updater, shardSelector, nodeSelector, healthcheckRefreshTimeMillis); } } diff --git a/src/main/java/com/flipkart/ranger/finder/unsharded/UnshardedClusterServiceRegistry.java b/src/main/java/com/flipkart/ranger/finder/unsharded/UnshardedClusterServiceRegistry.java index 438e2785..e19935fc 100644 --- a/src/main/java/com/flipkart/ranger/finder/unsharded/UnshardedClusterServiceRegistry.java +++ b/src/main/java/com/flipkart/ranger/finder/unsharded/UnshardedClusterServiceRegistry.java @@ -16,30 +16,21 @@ package com.flipkart.ranger.finder.unsharded; -import com.flipkart.ranger.finder.AbstractZookeeperServiceRegistry; -import com.flipkart.ranger.finder.Service; -import com.flipkart.ranger.model.Deserializer; import com.flipkart.ranger.model.ServiceNode; +import com.flipkart.ranger.model.ServiceRegistry; import com.google.common.collect.ImmutableList; import java.util.List; import java.util.concurrent.atomic.AtomicReference; -public class UnshardedClusterServiceRegistry extends AbstractZookeeperServiceRegistry { +public class UnshardedClusterServiceRegistry implements ServiceRegistry { private AtomicReference>> nodes = new AtomicReference>>(); - protected UnshardedClusterServiceRegistry(Service service, - Deserializer deserializer, - int refreshInterval) { - super(service, deserializer, refreshInterval); - } - public List> nodes() { return nodes.get(); } - @Override public void nodes(List> serviceNodes) { nodes.set(ImmutableList.copyOf(serviceNodes)); } diff --git a/src/main/java/com/flipkart/ranger/finder/unsharded/UnshardedFinderBuilder.java b/src/main/java/com/flipkart/ranger/finder/unsharded/UnshardedFinderBuilder.java index 63dc21ce..14977fbe 100644 --- a/src/main/java/com/flipkart/ranger/finder/unsharded/UnshardedFinderBuilder.java +++ b/src/main/java/com/flipkart/ranger/finder/unsharded/UnshardedFinderBuilder.java @@ -16,22 +16,21 @@ package com.flipkart.ranger.finder.unsharded; +import com.flipkart.ranger.finder.ServiceRegistryUpdater; import com.flipkart.ranger.finder.BaseServiceFinderBuilder; -import com.flipkart.ranger.finder.Service; -import com.flipkart.ranger.model.Deserializer; +import com.flipkart.ranger.finder.SourceConfig; import com.flipkart.ranger.model.ServiceNodeSelector; import com.flipkart.ranger.model.ShardSelector; public class UnshardedFinderBuilder extends BaseServiceFinderBuilder { @Override - protected UnshardedClusterFinder buildFinder(Service service, - Deserializer deserializer, + protected UnshardedClusterFinder buildFinder(SourceConfig config, + ServiceRegistryUpdater registryUpdater, ShardSelector shardSelector, ServiceNodeSelector nodeSelector, int healthcheckRefreshTimeMillis) { - UnshardedClusterServiceRegistry unshardedClusterServiceRegistry - = new UnshardedClusterServiceRegistry(service, deserializer, healthcheckRefreshTimeMillis); - return new UnshardedClusterFinder(unshardedClusterServiceRegistry, new NoOpShardSelector(), nodeSelector); + UnshardedClusterServiceRegistry unshardedClusterServiceRegistry = new UnshardedClusterServiceRegistry(); + return new UnshardedClusterFinder(unshardedClusterServiceRegistry, registryUpdater, new NoOpShardSelector(), nodeSelector, healthcheckRefreshTimeMillis); } } diff --git a/src/main/java/com/flipkart/ranger/healthservice/ServiceHealthAggregator.java b/src/main/java/com/flipkart/ranger/healthservice/ServiceHealthAggregator.java index 3ec27e8f..eef4c463 100644 --- a/src/main/java/com/flipkart/ranger/healthservice/ServiceHealthAggregator.java +++ b/src/main/java/com/flipkart/ranger/healthservice/ServiceHealthAggregator.java @@ -40,7 +40,7 @@ public class ServiceHealthAggregator implements HealthService, Healthcheck { /* Logger */ - private static final Logger logger = LoggerFactory.getLogger(ServiceHealthAggregator.class.getSimpleName()); + private static final Logger logger = LoggerFactory.getLogger(ServiceHealthAggregator.class); /* An atomic reference of the aggregated health */ private AtomicReference healthcheckStatusAtomicReference; @@ -153,12 +153,15 @@ public HealthcheckStatus getServiceHealth() { /* check health status of isolated monitors */ for (IsolatedHealthMonitor isolatedHealthMonitor : isolatedHealthMonitorList) { if (isolatedHealthMonitor.isDisabled()) { + logger.debug("isolatedHealthMonitor is disabled"); continue; } Long timeDifference; if (null != isolatedHealthMonitor.getLastStatusUpdateTime()) { timeDifference = currentTime.getTime() - isolatedHealthMonitor.getLastStatusUpdateTime().getTime(); + logger.debug("time difference: " + timeDifference.toString()); } else { + logger.debug("time difference is set to null"); timeDifference = null; } /* check if the monitor and its last updated time is stale, if so, mark status as unhealthy */ @@ -167,6 +170,7 @@ public HealthcheckStatus getServiceHealth() { logger.error("Monitor: {} is stuck and its status is stale. Marking service as unhealthy", isolatedHealthMonitor.getName()); healthcheckStatusAtomicReference.set(HealthcheckStatus.unhealthy); } else if (HealthcheckStatus.unhealthy == isolatedHealthMonitor.getHealthStatus()) { + logger.debug("isolatedHealthMonitor is unhealthy"); healthcheckStatusAtomicReference.set(HealthcheckStatus.unhealthy); } } diff --git a/src/main/java/com/flipkart/ranger/model/HttpResponseDecoder.java b/src/main/java/com/flipkart/ranger/model/HttpResponseDecoder.java new file mode 100644 index 00000000..96e3fe1f --- /dev/null +++ b/src/main/java/com/flipkart/ranger/model/HttpResponseDecoder.java @@ -0,0 +1,23 @@ +/** + * Copyright 2015 Flipkart Internet Pvt. Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.flipkart.ranger.model; + +import java.util.List; + +public interface HttpResponseDecoder { + List> deserialize(final byte[] data); +} diff --git a/src/main/java/com/flipkart/ranger/model/PathBuilder.java b/src/main/java/com/flipkart/ranger/model/PathBuilder.java index 35725ed3..93d42f20 100644 --- a/src/main/java/com/flipkart/ranger/model/PathBuilder.java +++ b/src/main/java/com/flipkart/ranger/model/PathBuilder.java @@ -16,15 +16,13 @@ package com.flipkart.ranger.model; -import com.flipkart.ranger.finder.Service; - public class PathBuilder { private PathBuilder() { throw new InstantiationError("Must not instantiate this class"); } - public static String path(final Service service) { - return String.format("/%s", service.getServiceName()); + public static String path(final String serviceName) { + return String.format("/%s", serviceName); } } diff --git a/src/main/java/com/flipkart/ranger/model/ServiceRegistry.java b/src/main/java/com/flipkart/ranger/model/ServiceRegistry.java index a5851c23..36d08901 100644 --- a/src/main/java/com/flipkart/ranger/model/ServiceRegistry.java +++ b/src/main/java/com/flipkart/ranger/model/ServiceRegistry.java @@ -16,30 +16,8 @@ package com.flipkart.ranger.model; -import com.flipkart.ranger.finder.Service; - import java.util.List; -public abstract class ServiceRegistry { - private final Service service; - private final Deserializer deserializer; - - protected ServiceRegistry(Service service, Deserializer deserializer) { - this.service = service; - this.deserializer = deserializer; - } - - public abstract void start() throws Exception; - - public abstract void stop() throws Exception; - - abstract public void nodes(List> nodes); - - public Service getService() { - return service; - } - - public Deserializer getDeserializer() { - return deserializer; - } +public interface ServiceRegistry { + public void nodes(List> nodes); } diff --git a/src/test/java/com/flipkart/ranger/healthservice/ServiceHealthAggregatorTest.java b/src/test/java/com/flipkart/ranger/healthservice/ServiceHealthAggregatorTest.java index 4e6b010d..c7a873bf 100644 --- a/src/test/java/com/flipkart/ranger/healthservice/ServiceHealthAggregatorTest.java +++ b/src/test/java/com/flipkart/ranger/healthservice/ServiceHealthAggregatorTest.java @@ -24,9 +24,9 @@ import org.junit.Before; import org.junit.Test; -public class ServiceHealthAggregatorTest { - +import java.util.concurrent.atomic.AtomicInteger; +public class ServiceHealthAggregatorTest { ServiceHealthAggregator serviceHealthAggregator = new ServiceHealthAggregator(); TestMonitor testMonitor; @Before @@ -56,28 +56,25 @@ public void tearDown() throws Exception { @Test public void testStaleRun() throws Exception { - testMonitor.run(); - testMonitor.setThreadSleep(2000); + testMonitor.setThreadSleep(6000); Thread.sleep(4000); - /* in the TestMonitor, thread was sleeping for 2 seconds, */ + /* in the TestMonitor, thread was sleeping for 6 seconds, */ /* so its state is supposed to be stale (>1 second) and service has to be unhealthy */ Assert.assertEquals(HealthcheckStatus.unhealthy, serviceHealthAggregator.getServiceHealth()); - testMonitor.setThreadSleep(10); Thread.sleep(4000); /* in the TestMonitor, thread is sleeping only for 10 milliseconds, */ /* so its state is supposed to be NOT stale (>1 second) and service has to be healthy */ Assert.assertEquals(HealthcheckStatus.healthy, serviceHealthAggregator.getServiceHealth()); - } private class TestMonitor extends IsolatedHealthMonitor { - int threadSleep = 2000; + AtomicInteger threadSleep = new AtomicInteger(2000); public TestMonitor(String name, TimeEntity timeEntity) { super(name, timeEntity); @@ -88,13 +85,13 @@ public TestMonitor(String name, TimeEntity timeEntity, long stalenessAllowedInMi } public void setThreadSleep(int threadSleep) { - this.threadSleep = threadSleep; + this.threadSleep.set(threadSleep); } @Override public synchronized HealthcheckStatus monitor() { try { - Thread.sleep(threadSleep); + Thread.sleep(threadSleep.get()); } catch (InterruptedException e) { } return HealthcheckStatus.healthy; diff --git a/src/test/java/com/flipkart/ranger/healthservice/ServiceProviderIntegrationTest.java b/src/test/java/com/flipkart/ranger/healthservice/ServiceProviderIntegrationTest.java index e704322e..6ae46c7f 100644 --- a/src/test/java/com/flipkart/ranger/healthservice/ServiceProviderIntegrationTest.java +++ b/src/test/java/com/flipkart/ranger/healthservice/ServiceProviderIntegrationTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.flipkart.ranger.ServiceFinderBuilders; import com.flipkart.ranger.ServiceProviderBuilders; +import com.flipkart.ranger.finder.ZookeeperSourceConfig; import com.flipkart.ranger.finder.unsharded.UnshardedClusterFinder; import com.flipkart.ranger.finder.unsharded.UnshardedClusterInfo; import com.flipkart.ranger.healthcheck.Healthchecks; @@ -64,23 +65,23 @@ public void startTestCluster() throws Exception { registerService("localhost-4", 9000, 2, anotherFile); + Deserializer deserializer = new Deserializer() { + @Override + public ServiceNode deserialize(byte[] data) { + try { + return objectMapper.readValue(data, + new TypeReference>() { + }); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + }; + ZookeeperSourceConfig curatorSourceConfig = new ZookeeperSourceConfig(testingCluster.getConnectString(), "test", "test-service", deserializer); + serviceFinder = ServiceFinderBuilders.unshardedFinderBuilder() - .withConnectionString(testingCluster.getConnectString()) - .withNamespace("test") - .withServiceName("test-service") - .withDeserializer(new Deserializer() { - @Override - public ServiceNode deserialize(byte[] data) { - try { - return objectMapper.readValue(data, - new TypeReference>() { - }); - } catch (IOException e) { - e.printStackTrace(); - } - return null; - } - }) + .withSourceConfig(curatorSourceConfig) .build(); serviceFinder.start(); } diff --git a/src/test/java/com/flipkart/ranger/httpservicefinder/ShardedBasicTest.java b/src/test/java/com/flipkart/ranger/httpservicefinder/ShardedBasicTest.java new file mode 100644 index 00000000..d83fc61c --- /dev/null +++ b/src/test/java/com/flipkart/ranger/httpservicefinder/ShardedBasicTest.java @@ -0,0 +1,126 @@ +/** + * Copyright 2015 Flipkart Internet Pvt. Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.flipkart.ranger.httpservicefinder; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.flipkart.ranger.ServiceFinderBuilders; +import com.flipkart.ranger.finder.HttpVerb.GetHttpVerb; +import com.flipkart.ranger.finder.HttpSourceConfig; +import com.flipkart.ranger.finder.sharded.SimpleShardedServiceFinder; +import com.flipkart.ranger.model.HttpResponseDecoder; +import com.flipkart.ranger.model.ServiceNode; +import com.github.tomakehurst.wiremock.WireMockServer; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Instant; +import java.util.List; + +import static com.github.tomakehurst.wiremock.client.WireMock.*; + +public class ShardedBasicTest { + private static final Logger logger = LoggerFactory.getLogger(ShardedBasicTest.class); + private ObjectMapper objectMapper; + private WireMockServer wireMockServer; + + @Before + public void startTestCluster() throws Exception{ + objectMapper = new ObjectMapper(); + wireMockServer = new WireMockServer(); + wireMockServer.start(); + configureFor("localhost", 8080); + Instant instant = Instant.now(); + long timeStampMillis = instant.toEpochMilli(); + String json = "[{\"host\":\"localhost\",\"port\":31649,\"nodeData\":{\"shardId\":1},\"healthcheckStatus\":\"healthy\",\"lastUpdatedTimeStamp\":"+ Long.toString(timeStampMillis) + "}, {\"host\":\"localhost\",\"port\":31648,\"nodeData\":{\"shardId\":1},\"healthcheckStatus\":\"healthy\",\"lastUpdatedTimeStamp\":"+ Long.toString(timeStampMillis) + "}, {\"host\":\"localhost\",\"port\":31647,\"nodeData\":{\"shardId\":1},\"healthcheckStatus\":\"unhealthy\",\"lastUpdatedTimeStamp\":"+ Long.toString(timeStampMillis) + "}]"; + stubFor(get(urlEqualTo("/testsharded")).willReturn(aResponse().withBody(json))); + } + + @After + public void stopTestCluster() throws Exception { + wireMockServer.stop(); + } + + private static final class TestShardInfo { + private int shardId; + + public TestShardInfo(int shardId) { + this.shardId = shardId; + } + + public TestShardInfo() { + } + + public int getshardId() { + return shardId; + } + + public void setEnvironment(String environment) { + this.shardId = shardId; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ShardedBasicTest.TestShardInfo that = (ShardedBasicTest.TestShardInfo) o; + + if (shardId != that.shardId) return false; + + return true; + } + + @Override + public int hashCode() { + return shardId; + } + } + + @Test + public void testBasicDiscovery() throws Exception { + HttpResponseDecoder deserializer = new HttpResponseDecoder() { + @Override + public List> deserialize(byte[] data) { + try { + return objectMapper.readValue(data, + new TypeReference>>() { + }); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + }; + + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("localhost", 8080, "/testsharded", deserializer, false, false, new GetHttpVerb()); + + SimpleShardedServiceFinder serviceFinder = ServiceFinderBuilders.shardedFinderBuilder() + .withSourceConfig(httpSourceConfig) + .build(); + + serviceFinder.start(); + + List> nodesList = serviceFinder.getAll(new TestShardInfo(1)); + Assert.assertEquals(2, nodesList.size()); + } +} diff --git a/src/test/java/com/flipkart/ranger/httpservicefinder/ShardedTimerTest.java b/src/test/java/com/flipkart/ranger/httpservicefinder/ShardedTimerTest.java new file mode 100644 index 00000000..ba980579 --- /dev/null +++ b/src/test/java/com/flipkart/ranger/httpservicefinder/ShardedTimerTest.java @@ -0,0 +1,144 @@ +/** + * Copyright 2015 Flipkart Internet Pvt. Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.flipkart.ranger.httpservicefinder; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.flipkart.ranger.ServiceFinderBuilders; +import com.flipkart.ranger.finder.HttpVerb.GetHttpVerb; +import com.flipkart.ranger.finder.HttpSourceConfig; +import com.flipkart.ranger.finder.sharded.SimpleShardedServiceFinder; +import com.flipkart.ranger.healthcheck.HealthcheckStatus; +import com.flipkart.ranger.model.HttpResponseDecoder; +import com.flipkart.ranger.model.ServiceNode; +import com.github.tomakehurst.wiremock.WireMockServer; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Instant; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.github.tomakehurst.wiremock.client.WireMock.*; +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; + +public class ShardedTimerTest { + private static final Logger logger = LoggerFactory.getLogger(ShardedTimerTest.class); + private ObjectMapper objectMapper; + private WireMockServer wireMockServer; + + @Before + public void startTestCluster() throws Exception{ + objectMapper = new ObjectMapper(); + wireMockServer = new WireMockServer(); + wireMockServer.start(); + configureFor("localhost", 8080); + Instant instant = Instant.now(); + long timeStampMillis = instant.toEpochMilli(); + String json = "[{\"host\":\"localhost\",\"port\":31649,\"nodeData\":{\"shardId\":1},\"healthcheckStatus\":\"healthy\",\"lastUpdatedTimeStamp\":"+ Long.toString(timeStampMillis) + "}, {\"host\":\"localhost\",\"port\":31648,\"nodeData\":{\"shardId\":1},\"healthcheckStatus\":\"healthy\",\"lastUpdatedTimeStamp\":"+ Long.toString(timeStampMillis) + "}, {\"host\":\"localhost\",\"port\":31647,\"nodeData\":{\"shardId\":1},\"healthcheckStatus\":\"unhealthy\",\"lastUpdatedTimeStamp\":"+ Long.toString(timeStampMillis) + "}]"; + stubFor(get(urlEqualTo("/testsharded")).willReturn(aResponse().withBody(json))); + } + + @After + public void stopTestCluster() throws Exception { + wireMockServer.stop(); + } + + private static final class TestShardInfo { + private int shardId; + + public TestShardInfo(int shardId) { + this.shardId = shardId; + } + + public TestShardInfo() { + } + + public int getshardId() { + return shardId; + } + + public void setEnvironment(String environment) { + this.shardId = shardId; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ShardedTimerTest.TestShardInfo that = (ShardedTimerTest.TestShardInfo) o; + + if (shardId != that.shardId) return false; + + return true; + } + + @Override + public int hashCode() { + return shardId; + } + } + + @Test + public void testBasicDiscovery() throws Exception { + HttpResponseDecoder deserializer = new HttpResponseDecoder() { + private AtomicInteger integer = new AtomicInteger(0); + + @Override + public List> deserialize(byte[] data) { + try { + List> serviceNodes= + objectMapper.readValue(data, + new TypeReference>>() { + }); + //marking all the ServiceNodes unhealthy after few calls to deserialize + if (integer.incrementAndGet() > 2) { + for (ServiceNode serviceNode : serviceNodes) { + serviceNode.setHealthcheckStatus(HealthcheckStatus.unhealthy); + } + } + return serviceNodes; + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + }; + + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("localhost", 8080, "/testsharded", deserializer, false, false, new GetHttpVerb()); + + SimpleShardedServiceFinder serviceFinder = ServiceFinderBuilders.shardedFinderBuilder() + .withSourceConfig(httpSourceConfig) + .build(); + + serviceFinder.start(); + + List> nodesList = serviceFinder.getAll(new TestShardInfo(1)); + Assert.assertEquals(2, nodesList.size()); + + Thread.sleep(3000); + + //NodesList must be empty as all the ServiceNodes are marked unhealthy + List> secondNodesList = serviceFinder.getAll(new TestShardInfo(1)); + Assert.assertEquals(0, secondNodesList.size()); + } +} diff --git a/src/test/java/com/flipkart/ranger/httpservicefinder/UnshardedBasicTest.java b/src/test/java/com/flipkart/ranger/httpservicefinder/UnshardedBasicTest.java new file mode 100644 index 00000000..e1719715 --- /dev/null +++ b/src/test/java/com/flipkart/ranger/httpservicefinder/UnshardedBasicTest.java @@ -0,0 +1,107 @@ +/** + * Copyright 2015 Flipkart Internet Pvt. Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.flipkart.ranger.httpservicefinder; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.flipkart.ranger.ServiceFinderBuilders; +import com.flipkart.ranger.finder.HttpVerb.GetHttpVerb; +import com.flipkart.ranger.finder.HttpSourceConfig; +import com.flipkart.ranger.finder.unsharded.UnshardedClusterFinder; +import com.flipkart.ranger.finder.unsharded.UnshardedClusterInfo; +import com.flipkart.ranger.model.HttpResponseDecoder; +import com.flipkart.ranger.model.ServiceNode; +import com.github.tomakehurst.wiremock.WireMockServer; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Instant; +import java.util.List; + +import static com.github.tomakehurst.wiremock.client.WireMock.*; + +public class UnshardedBasicTest { + private static final Logger logger = LoggerFactory.getLogger(UnshardedBasicTest.class); + private ObjectMapper objectMapper; + private WireMockServer wireMockServer; + + @Before + public void startTestCluster() throws Exception{ + objectMapper = new ObjectMapper(); + wireMockServer = new WireMockServer(); + wireMockServer.start(); + configureFor("localhost", 8080); + Instant instant = Instant.now(); + long timeStampMillis = instant.toEpochMilli(); + String json = "[{\"host\":\"localhost\",\"port\":31649,\"nodeData\":{\"environment\":\"stage\"},\"healthcheckStatus\":\"healthy\",\"lastUpdatedTimeStamp\":"+ Long.toString(timeStampMillis) + "}, {\"host\":\"localhost\",\"port\":31648,\"nodeData\":{\"environment\":\"stage\"},\"healthcheckStatus\":\"healthy\",\"lastUpdatedTimeStamp\":"+ Long.toString(timeStampMillis) + "}, {\"host\":\"localhost\",\"port\":31647,\"nodeData\":{\"environment\":\"stage\"},\"healthcheckStatus\":\"unhealthy\",\"lastUpdatedTimeStamp\":"+ Long.toString(timeStampMillis) + "}]"; + logger.debug(json); + stubFor(get(urlEqualTo("/test")).willReturn(aResponse().withBody(json))); + } + + @After + public void stopTestCluster() throws Exception { + wireMockServer.stop(); + } + + static class EnvNodeData extends UnshardedClusterInfo{ + private String environment; + + public EnvNodeData() { + } + + public String getEnvironment() { + return environment; + } + + public void setEnvironment(String environment) { + this.environment = environment; + } + } + + @Test + public void testBasicDiscovery() throws Exception { + HttpResponseDecoder deserializer = new HttpResponseDecoder() { + @Override + public List> deserialize(byte[] data) { + try { + return objectMapper.readValue(data, + new TypeReference>>() { + }); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + }; + + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("localhost", 8080, "/test", deserializer, false, false, new GetHttpVerb()); + + UnshardedClusterFinder serviceFinder = ServiceFinderBuilders.unshardedFinderBuilder() + .withSourceConfig(httpSourceConfig) + .build(); + + serviceFinder.start(); + + List> nodesList = serviceFinder.getAll(null); + + Assert.assertEquals(2, nodesList.size()); + } +} diff --git a/src/test/java/com/flipkart/ranger/httpservicefinder/UnshardedTimerTest.java b/src/test/java/com/flipkart/ranger/httpservicefinder/UnshardedTimerTest.java new file mode 100644 index 00000000..c8769a3e --- /dev/null +++ b/src/test/java/com/flipkart/ranger/httpservicefinder/UnshardedTimerTest.java @@ -0,0 +1,126 @@ +/** + * Copyright 2015 Flipkart Internet Pvt. Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.flipkart.ranger.httpservicefinder; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.flipkart.ranger.ServiceFinderBuilders; +import com.flipkart.ranger.finder.HttpVerb.GetHttpVerb; +import com.flipkart.ranger.finder.HttpSourceConfig; +import com.flipkart.ranger.finder.unsharded.UnshardedClusterFinder; +import com.flipkart.ranger.finder.unsharded.UnshardedClusterInfo; +import com.flipkart.ranger.healthcheck.HealthcheckStatus; +import com.flipkart.ranger.model.HttpResponseDecoder; +import com.flipkart.ranger.model.ServiceNode; +import com.github.tomakehurst.wiremock.WireMockServer; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Instant; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.github.tomakehurst.wiremock.client.WireMock.*; +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; + +public class UnshardedTimerTest { + private static final Logger logger = LoggerFactory.getLogger(UnshardedTimerTest.class); + private ObjectMapper objectMapper; + private WireMockServer wireMockServer; + + @Before + public void startTestCluster() throws Exception{ + objectMapper = new ObjectMapper(); + wireMockServer = new WireMockServer(); + wireMockServer.start(); + configureFor("localhost", 8080); + Instant instant = Instant.now(); + long timeStampMillis = instant.toEpochMilli(); + String json = "[{\"host\":\"localhost\",\"port\":31649,\"nodeData\":{\"environment\":\"stage\"},\"healthcheckStatus\":\"healthy\",\"lastUpdatedTimeStamp\":"+ Long.toString(timeStampMillis) + "}, {\"host\":\"localhost\",\"port\":31648,\"nodeData\":{\"environment\":\"stage\"},\"healthcheckStatus\":\"healthy\",\"lastUpdatedTimeStamp\":"+ Long.toString(timeStampMillis) + "}, {\"host\":\"localhost\",\"port\":31647,\"nodeData\":{\"environment\":\"stage\"},\"healthcheckStatus\":\"unhealthy\",\"lastUpdatedTimeStamp\":"+ Long.toString(timeStampMillis) + "}]"; + logger.debug(json); + stubFor(get(urlEqualTo("/test")).willReturn(aResponse().withBody(json))); + } + + @After + public void stopTestCluster() throws Exception { + wireMockServer.stop(); + } + + @Test + public void testBasicDiscovery() throws Exception { + HttpResponseDecoder listDeserializer = new HttpResponseDecoder() { + private AtomicInteger integer = new AtomicInteger(0); + + @Override + public List> deserialize(byte[] data) { + try { + List> serviceNodes= + objectMapper.readValue(data, + new TypeReference>>() { + }); + //marking all the ServiceNodes unhealthy after few calls to deserialize + if (integer.incrementAndGet() > 2) { + for (ServiceNode serviceNode : serviceNodes) { + serviceNode.setHealthcheckStatus(HealthcheckStatus.unhealthy); + } + } + return serviceNodes; + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + }; + + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("localhost", 8080, "/test", listDeserializer, false, false, new GetHttpVerb()); + + UnshardedClusterFinder serviceFinder = ServiceFinderBuilders.unshardedFinderBuilder() + .withSourceConfig(httpSourceConfig) + .build(); + + serviceFinder.start(); + + List> nodesList = serviceFinder.getAll(null); + Assert.assertEquals(2, nodesList.size()); + + Thread.sleep(3000); + + //NodesList must be empty as all the ServiceNodes are marked unhealthy + List> secondNodesList = serviceFinder.getAll(null); + Assert.assertEquals(0, secondNodesList.size()); + + } + + static class EnvNodeData extends UnshardedClusterInfo{ + private String environment; + + public EnvNodeData() { + } + + public String getEnvironment() { + return environment; + } + + public void setEnvironment(String environment) { + this.environment = environment; + } + } +} diff --git a/src/test/java/com/flipkart/ranger/model/CustomShardSelectorTest.java b/src/test/java/com/flipkart/ranger/model/CustomShardSelectorTest.java index 67be4c2e..05e8caf3 100644 --- a/src/test/java/com/flipkart/ranger/model/CustomShardSelectorTest.java +++ b/src/test/java/com/flipkart/ranger/model/CustomShardSelectorTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.flipkart.ranger.ServiceFinderBuilders; import com.flipkart.ranger.ServiceProviderBuilders; +import com.flipkart.ranger.finder.ZookeeperSourceConfig; import com.flipkart.ranger.finder.sharded.MapBasedServiceRegistry; import com.flipkart.ranger.finder.sharded.SimpleShardedServiceFinder; import com.flipkart.ranger.healthcheck.Healthchecks; @@ -128,24 +129,23 @@ public List> nodes(TestShardInfo criteria, MapBasedSe } @Test public void testBasicDiscovery() throws Exception { + Deserializer deserializer = new Deserializer() { + @Override + public ServiceNode deserialize(byte[] data) { + try { + return objectMapper.readValue(data, + new TypeReference>() { + }); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + }; + ZookeeperSourceConfig curatorSourceConfig = new ZookeeperSourceConfig(testingCluster.getConnectString(), "test", "test-service", deserializer); SimpleShardedServiceFinder serviceFinder = ServiceFinderBuilders.shardedFinderBuilder() - .withConnectionString(testingCluster.getConnectString()) - .withNamespace("test") - .withServiceName("test-service") + .withSourceConfig(curatorSourceConfig) .withShardSelector(new TestShardSelector()) - .withDeserializer(new Deserializer() { - @Override - public ServiceNode deserialize(byte[] data) { - try { - return objectMapper.readValue(data, - new TypeReference>() { - }); - } catch (IOException e) { - e.printStackTrace(); - } - return null; - } - }) .build(); serviceFinder.start(); { diff --git a/src/test/java/com/flipkart/ranger/model/ServiceNoProviderTest.java b/src/test/java/com/flipkart/ranger/model/ServiceNoProviderTest.java index 83babe40..c0119473 100644 --- a/src/test/java/com/flipkart/ranger/model/ServiceNoProviderTest.java +++ b/src/test/java/com/flipkart/ranger/model/ServiceNoProviderTest.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.flipkart.ranger.ServiceFinderBuilders; +import com.flipkart.ranger.finder.ZookeeperSourceConfig; import com.flipkart.ranger.finder.RoundRobinServiceNodeSelector; import com.flipkart.ranger.finder.sharded.SimpleShardedServiceFinder; import org.apache.curator.test.TestingCluster; @@ -89,23 +90,23 @@ public int hashCode() { @Test public void testBasicDiscovery() throws Exception { + Deserializer deserializer = new Deserializer() { + @Override + public ServiceNode deserialize(byte[] data) { + try { + return objectMapper.readValue(data, + new TypeReference>() { + }); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + }; + ZookeeperSourceConfig curatorSourceConfig = new ZookeeperSourceConfig(testingCluster.getConnectString(), "test", "test-service", deserializer); + SimpleShardedServiceFinder serviceFinder = ServiceFinderBuilders.shardedFinderBuilder() - .withConnectionString(testingCluster.getConnectString()) - .withNamespace("test") - .withServiceName("test-service") - .withDeserializer(new Deserializer() { - @Override - public ServiceNode deserialize(byte[] data) { - try { - return objectMapper.readValue(data, - new TypeReference>() { - }); - } catch (IOException e) { - e.printStackTrace(); - } - return null; - } - }) + .withSourceConfig(curatorSourceConfig) .build(); serviceFinder.start(); ServiceNode node = serviceFinder.get(new TestShardInfo(1)); @@ -116,24 +117,23 @@ public ServiceNode deserialize(byte[] data) { @Test public void testBasicDiscoveryRR() throws Exception { + Deserializer deserializer = new Deserializer() { + @Override + public ServiceNode deserialize(byte[] data) { + try { + return objectMapper.readValue(data, + new TypeReference>() { + }); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + }; + ZookeeperSourceConfig curatorSourceConfig = new ZookeeperSourceConfig(testingCluster.getConnectString(), "test", "test-service", deserializer); SimpleShardedServiceFinder serviceFinder = ServiceFinderBuilders.shardedFinderBuilder() - .withConnectionString(testingCluster.getConnectString()) - .withNamespace("test") - .withServiceName("test-service") + .withSourceConfig(curatorSourceConfig) .withNodeSelector(new RoundRobinServiceNodeSelector()) - .withDeserializer(new Deserializer() { - @Override - public ServiceNode deserialize(byte[] data) { - try { - return objectMapper.readValue(data, - new TypeReference>() { - }); - } catch (IOException e) { - e.printStackTrace(); - } - return null; - } - }) .build(); serviceFinder.start(); ServiceNode node = serviceFinder.get(new TestShardInfo(1)); diff --git a/src/test/java/com/flipkart/ranger/model/ServiceProviderExtCuratorTest.java b/src/test/java/com/flipkart/ranger/model/ServiceProviderExtCuratorTest.java index f0783ac6..396b6d17 100644 --- a/src/test/java/com/flipkart/ranger/model/ServiceProviderExtCuratorTest.java +++ b/src/test/java/com/flipkart/ranger/model/ServiceProviderExtCuratorTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.flipkart.ranger.ServiceFinderBuilders; import com.flipkart.ranger.ServiceProviderBuilders; +import com.flipkart.ranger.finder.CuratorFrameworkConfig; import com.flipkart.ranger.finder.sharded.SimpleShardedServiceFinder; import com.flipkart.ranger.healthcheck.Healthchecks; import com.flipkart.ranger.serviceprovider.ServiceProvider; @@ -60,6 +61,7 @@ public void startTestCluster() throws Exception { registerService("localhost-1", 9000, 1); registerService("localhost-2", 9000, 1); registerService("localhost-3", 9000, 2); + Thread.sleep(1000); } @After @@ -111,23 +113,23 @@ public int hashCode() { @Test public void testBasicDiscovery() throws Exception { + Deserializer deserializer = new Deserializer() { + @Override + public ServiceNode deserialize(byte[] data) { + try { + return objectMapper.readValue(data, + new TypeReference>() { + }); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + }; + CuratorFrameworkConfig curatorFrameworkConfig = new CuratorFrameworkConfig(curatorFramework, "test-service", deserializer); + SimpleShardedServiceFinder serviceFinder = ServiceFinderBuilders.shardedFinderBuilder() - .withCuratorFramework(curatorFramework) - .withNamespace("test") - .withServiceName("test-service") - .withDeserializer(new Deserializer() { - @Override - public ServiceNode deserialize(byte[] data) { - try { - return objectMapper.readValue(data, - new TypeReference>() { - }); - } catch (IOException e) { - e.printStackTrace(); - } - return null; - } - }) + .withSourceConfig(curatorFrameworkConfig) .build(); serviceFinder.start(); { @@ -157,6 +159,7 @@ public ServiceNode deserialize(byte[] data) { } private void registerService(String host, int port, int shardId) throws Exception { + logger.debug("Registering Service: " + host + ", " + port + ". ShardId: " + shardId); final ServiceProvider serviceProvider = ServiceProviderBuilders.shardedServiceProviderBuilder() .withCuratorFramework(curatorFramework) .withNamespace("test") @@ -179,5 +182,6 @@ public byte[] serialize(ServiceNode data) { .buildServiceDiscovery(); serviceProvider.start(); serviceProviders.add(serviceProvider); + logger.debug("Service registered: " + host + ", " + port + ". ShardId: " + shardId); } } \ No newline at end of file diff --git a/src/test/java/com/flipkart/ranger/model/ServiceProviderHealthcheckTest.java b/src/test/java/com/flipkart/ranger/model/ServiceProviderHealthcheckTest.java index 326679f6..6b4b445c 100644 --- a/src/test/java/com/flipkart/ranger/model/ServiceProviderHealthcheckTest.java +++ b/src/test/java/com/flipkart/ranger/model/ServiceProviderHealthcheckTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.flipkart.ranger.ServiceFinderBuilders; import com.flipkart.ranger.ServiceProviderBuilders; +import com.flipkart.ranger.finder.ZookeeperSourceConfig; import com.flipkart.ranger.finder.sharded.SimpleShardedServiceFinder; import com.flipkart.ranger.healthcheck.Healthcheck; import com.flipkart.ranger.healthcheck.HealthcheckStatus; @@ -96,24 +97,24 @@ public int hashCode() { @Test public void testBasicDiscovery() throws Exception { + Deserializer deserializer = new Deserializer() { + @Override + public ServiceNode deserialize(byte[] data) { + try { + return objectMapper.readValue(data, + new TypeReference>() { + }); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + }; + ZookeeperSourceConfig curatorSourceConfig = new ZookeeperSourceConfig(testingCluster.getConnectString(), "test", "test-service", deserializer); + SimpleShardedServiceFinder serviceFinder = ServiceFinderBuilders.shardedFinderBuilder() - .withConnectionString(testingCluster.getConnectString()) - .withNamespace("test") - .withServiceName("test-service") - .withDeserializer(new Deserializer() { - @Override - public ServiceNode deserialize(byte[] data) { - try { - return objectMapper.readValue(data, - new TypeReference>() { - }); - } catch (IOException e) { - e.printStackTrace(); - } - return null; - } - }) - .witHhealthcheckRefreshTimeMillis(10) + .withSourceConfig(curatorSourceConfig) + .withHealthcheckRefreshTimeMillis(10) .build(); serviceFinder.start(); ServiceNode node = serviceFinder.get(new TestShardInfo(1)); diff --git a/src/test/java/com/flipkart/ranger/model/ServiceProviderTest.java b/src/test/java/com/flipkart/ranger/model/ServiceProviderTest.java index cf4cc7d9..1fed81c8 100644 --- a/src/test/java/com/flipkart/ranger/model/ServiceProviderTest.java +++ b/src/test/java/com/flipkart/ranger/model/ServiceProviderTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.flipkart.ranger.ServiceFinderBuilders; import com.flipkart.ranger.ServiceProviderBuilders; +import com.flipkart.ranger.finder.ZookeeperSourceConfig; import com.flipkart.ranger.finder.RoundRobinServiceNodeSelector; import com.flipkart.ranger.finder.sharded.SimpleShardedServiceFinder; import com.flipkart.ranger.healthcheck.Healthchecks; @@ -103,23 +104,23 @@ public int hashCode() { @Test public void testBasicDiscovery() throws Exception { + Deserializer deserializer = new Deserializer() { + @Override + public ServiceNode deserialize(byte[] data) { + try { + return objectMapper.readValue(data, + new TypeReference>() { + }); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + }; + ZookeeperSourceConfig curatorSourceConfig = new ZookeeperSourceConfig(testingCluster.getConnectString(), "test", "test-service", deserializer); + SimpleShardedServiceFinder serviceFinder = ServiceFinderBuilders.shardedFinderBuilder() - .withConnectionString(testingCluster.getConnectString()) - .withNamespace("test") - .withServiceName("test-service") - .withDeserializer(new Deserializer() { - @Override - public ServiceNode deserialize(byte[] data) { - try { - return objectMapper.readValue(data, - new TypeReference>() { - }); - } catch (IOException e) { - e.printStackTrace(); - } - return null; - } - }) + .withSourceConfig(curatorSourceConfig) .build(); serviceFinder.start(); { @@ -150,24 +151,24 @@ public ServiceNode deserialize(byte[] data) { @Test public void testBasicDiscoveryRR() throws Exception { + Deserializer deserializer = new Deserializer() { + @Override + public ServiceNode deserialize(byte[] data) { + try { + return objectMapper.readValue(data, + new TypeReference>() { + }); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + }; + ZookeeperSourceConfig curatorSourceConfig = new ZookeeperSourceConfig(testingCluster.getConnectString(), "test", "test-service", deserializer); + SimpleShardedServiceFinder serviceFinder = ServiceFinderBuilders.shardedFinderBuilder() - .withConnectionString(testingCluster.getConnectString()) - .withNamespace("test") - .withServiceName("test-service") + .withSourceConfig(curatorSourceConfig) .withNodeSelector(new RoundRobinServiceNodeSelector()) - .withDeserializer(new Deserializer() { - @Override - public ServiceNode deserialize(byte[] data) { - try { - return objectMapper.readValue(data, - new TypeReference>() { - }); - } catch (IOException e) { - e.printStackTrace(); - } - return null; - } - }) .build(); serviceFinder.start(); { @@ -200,24 +201,24 @@ public ServiceNode deserialize(byte[] data) { @Test public void testVisibility() throws Exception { + Deserializer deserializer = new Deserializer() { + @Override + public ServiceNode deserialize(byte[] data) { + try { + return objectMapper.readValue(data, + new TypeReference>() { + }); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + }; + ZookeeperSourceConfig curatorSourceConfig = new ZookeeperSourceConfig(testingCluster.getConnectString(), "test", "test-service", deserializer); + SimpleShardedServiceFinder serviceFinder = ServiceFinderBuilders.shardedFinderBuilder() - .withConnectionString(testingCluster.getConnectString()) - .withNamespace("test") - .withServiceName("test-service") + .withSourceConfig(curatorSourceConfig) .withNodeSelector(new RoundRobinServiceNodeSelector()) - .withDeserializer(new Deserializer() { - @Override - public ServiceNode deserialize(byte[] data) { - try { - return objectMapper.readValue(data, - new TypeReference>() { - }); - } catch (IOException e) { - e.printStackTrace(); - } - return null; - } - }) .build(); serviceFinder.start(); List> all = serviceFinder.getAll(new TestShardInfo(1)); diff --git a/src/test/java/com/flipkart/ranger/model/SimpleServiceProviderTest.java b/src/test/java/com/flipkart/ranger/model/SimpleServiceProviderTest.java index 664969b3..05f14e61 100644 --- a/src/test/java/com/flipkart/ranger/model/SimpleServiceProviderTest.java +++ b/src/test/java/com/flipkart/ranger/model/SimpleServiceProviderTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.flipkart.ranger.ServiceFinderBuilders; import com.flipkart.ranger.ServiceProviderBuilders; +import com.flipkart.ranger.finder.ZookeeperSourceConfig; import com.flipkart.ranger.finder.unsharded.UnshardedClusterFinder; import com.flipkart.ranger.finder.unsharded.UnshardedClusterInfo; import com.flipkart.ranger.healthcheck.Healthchecks; @@ -59,23 +60,23 @@ public void stopTestCluster() throws Exception { @Test public void testBasicDiscovery() throws Exception { + Deserializer deserializer = new Deserializer() { + @Override + public ServiceNode deserialize(byte[] data) { + try { + return objectMapper.readValue(data, + new TypeReference>() { + }); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + }; + ZookeeperSourceConfig curatorSourceConfig = new ZookeeperSourceConfig(testingCluster.getConnectString(), "test", "test-service", deserializer); + UnshardedClusterFinder serviceFinder = ServiceFinderBuilders.unshardedFinderBuilder() - .withConnectionString(testingCluster.getConnectString()) - .withNamespace("test") - .withServiceName("test-service") - .withDeserializer(new Deserializer() { - @Override - public ServiceNode deserialize(byte[] data) { - try { - return objectMapper.readValue(data, - new TypeReference>() { - }); - } catch (IOException e) { - e.printStackTrace(); - } - return null; - } - }) + .withSourceConfig(curatorSourceConfig) .build(); serviceFinder.start(); {