diff --git a/pom.xml b/pom.xml index 22b95392..896407b8 100644 --- a/pom.xml +++ b/pom.xml @@ -110,6 +110,12 @@ guava-retrying 2.0.0 + + org.awaitility + awaitility + 4.0.2 + test + diff --git a/ranger-core/pom.xml b/ranger-core/pom.xml index 7897bdbe..28ececcd 100644 --- a/ranger-core/pom.xml +++ b/ranger-core/pom.xml @@ -18,5 +18,28 @@ ${slf4j.version} runtime + + org.apache.httpcomponents + httpclient + 4.5.11 + + + + + + + org.apache.maven.plugins + maven-jar-plugin + 3.2.0 + + + + test-jar + + + + + + \ No newline at end of file diff --git a/ranger-core/src/main/java/com/flipkart/ranger/core/finder/BaseServiceFinderBuilder.java b/ranger-core/src/main/java/com/flipkart/ranger/core/finder/BaseServiceFinderBuilder.java index e588872e..21a3379f 100644 --- a/ranger-core/src/main/java/com/flipkart/ranger/core/finder/BaseServiceFinderBuilder.java +++ b/ranger-core/src/main/java/com/flipkart/ranger/core/finder/BaseServiceFinderBuilder.java @@ -36,13 +36,14 @@ public abstract class BaseServiceFinderBuilder T, R extends ServiceRegistry, F extends ServiceFinder, - B extends BaseServiceFinderBuilder> { + B extends BaseServiceFinderBuilder, + D extends Deserializer> { protected String namespace; protected String serviceName; protected int nodeRefreshIntervalMs; protected boolean disablePushUpdaters; - protected Deserializer deserializer; + protected D deserializer; protected ShardSelector shardSelector; protected ServiceNodeSelector nodeSelector = new RandomServiceNodeSelector<>(); protected final List> additionalRefreshSignals = new ArrayList<>(); @@ -59,7 +60,7 @@ public B withServiceName(final String serviceName) { return (B)this; } - public B withDeserializer(Deserializer deserializer) { + public B withDeserializer(D deserializer) { this.deserializer = deserializer; return (B)this; } @@ -140,7 +141,7 @@ protected F buildFinder() { val finder = buildFinder(service, shardSelector, nodeSelector); val registry = finder.getServiceRegistry(); List> signalGenerators = new ArrayList<>(); - final NodeDataSource nodeDataSource = dataSource(service); + final NodeDataSource nodeDataSource = dataSource(service); signalGenerators.add(new ScheduledRegistryUpdateSignal<>(service, nodeRefreshIntervalMs)); additionalRefreshSignals.addAll(implementationSpecificRefreshSignals(service, nodeDataSource)); @@ -149,7 +150,7 @@ protected F buildFinder() { log.debug("Added additional signal handlers"); } - val updater = new ServiceRegistryUpdater(registry, nodeDataSource, signalGenerators, deserializer); + val updater = new ServiceRegistryUpdater(registry, nodeDataSource, signalGenerators, deserializer); finder.getStartSignal() .registerConsumers(startSignalHandlers) .registerConsumer(x -> nodeDataSource.start()) @@ -164,11 +165,11 @@ protected F buildFinder() { return finder; } - protected List> implementationSpecificRefreshSignals(Service service, NodeDataSource nodeDataSource) { + protected List> implementationSpecificRefreshSignals(Service service, NodeDataSource nodeDataSource) { return Collections.emptyList(); } - protected abstract NodeDataSource dataSource(Service service); + protected abstract NodeDataSource dataSource(Service service); protected abstract F buildFinder( Service service, diff --git a/ranger-core/src/main/java/com/flipkart/ranger/core/finder/ServiceRegistryUpdater.java b/ranger-core/src/main/java/com/flipkart/ranger/core/finder/ServiceRegistryUpdater.java index b7f3c014..4c361a1e 100644 --- a/ranger-core/src/main/java/com/flipkart/ranger/core/finder/ServiceRegistryUpdater.java +++ b/ranger-core/src/main/java/com/flipkart/ranger/core/finder/ServiceRegistryUpdater.java @@ -38,11 +38,11 @@ import java.util.concurrent.locks.ReentrantLock; @Slf4j -public class ServiceRegistryUpdater { +public class ServiceRegistryUpdater> { private final ServiceRegistry serviceRegistry; - private final NodeDataSource nodeDataSource; - private final Deserializer deserializer; + private final NodeDataSource nodeDataSource; + private final D deserializer; private Lock checkLock = new ReentrantLock(); private Condition checkCondition = checkLock.newCondition(); @@ -54,8 +54,9 @@ public class ServiceRegistryUpdater { public ServiceRegistryUpdater( ServiceRegistry serviceRegistry, - NodeDataSource nodeDataSource, - List> signalGenerators, Deserializer deserializer) { + NodeDataSource nodeDataSource, + List> signalGenerators, + D deserializer) { this.serviceRegistry = serviceRegistry; this.nodeDataSource = nodeDataSource; this.deserializer = deserializer; diff --git a/ranger-core/src/main/java/com/flipkart/ranger/core/finder/sharded/SimpleShardedServiceFinderBuilder.java b/ranger-core/src/main/java/com/flipkart/ranger/core/finder/sharded/SimpleShardedServiceFinderBuilder.java index 04b604b3..25a2a596 100644 --- a/ranger-core/src/main/java/com/flipkart/ranger/core/finder/sharded/SimpleShardedServiceFinderBuilder.java +++ b/ranger-core/src/main/java/com/flipkart/ranger/core/finder/sharded/SimpleShardedServiceFinderBuilder.java @@ -17,12 +17,13 @@ package com.flipkart.ranger.core.finder.sharded; import com.flipkart.ranger.core.finder.BaseServiceFinderBuilder; +import com.flipkart.ranger.core.model.Deserializer; import com.flipkart.ranger.core.model.Service; import com.flipkart.ranger.core.model.ServiceNodeSelector; import com.flipkart.ranger.core.model.ShardSelector; -public abstract class SimpleShardedServiceFinderBuilder> - extends BaseServiceFinderBuilder, SimpleShardedServiceFinder, B> { +public abstract class SimpleShardedServiceFinderBuilder, D extends Deserializer> + extends BaseServiceFinderBuilder, SimpleShardedServiceFinder, B, D> { @Override protected SimpleShardedServiceFinder buildFinder( diff --git a/ranger-core/src/main/java/com/flipkart/ranger/core/finder/unsharded/UnshardedFinderBuilder.java b/ranger-core/src/main/java/com/flipkart/ranger/core/finder/unsharded/UnshardedFinderBuilder.java index 1d40632e..b9af94aa 100644 --- a/ranger-core/src/main/java/com/flipkart/ranger/core/finder/unsharded/UnshardedFinderBuilder.java +++ b/ranger-core/src/main/java/com/flipkart/ranger/core/finder/unsharded/UnshardedFinderBuilder.java @@ -16,13 +16,14 @@ package com.flipkart.ranger.core.finder.unsharded; +import com.flipkart.ranger.core.model.Deserializer; import com.flipkart.ranger.core.model.Service; import com.flipkart.ranger.core.model.ShardSelector; import com.flipkart.ranger.core.finder.BaseServiceFinderBuilder; import com.flipkart.ranger.core.model.ServiceNodeSelector; -public abstract class UnshardedFinderBuilder> - extends BaseServiceFinderBuilder { +public abstract class UnshardedFinderBuilder, D extends Deserializer> + extends BaseServiceFinderBuilder { @Override protected UnshardedClusterFinder buildFinder( diff --git a/ranger-core/src/main/java/com/flipkart/ranger/core/model/Deserializer.java b/ranger-core/src/main/java/com/flipkart/ranger/core/model/Deserializer.java index 03eeb1a1..c4acf561 100644 --- a/ranger-core/src/main/java/com/flipkart/ranger/core/model/Deserializer.java +++ b/ranger-core/src/main/java/com/flipkart/ranger/core/model/Deserializer.java @@ -16,7 +16,5 @@ package com.flipkart.ranger.core.model; -@FunctionalInterface public interface Deserializer { - ServiceNode deserialize(final byte[] data); } diff --git a/ranger-core/src/main/java/com/flipkart/ranger/core/model/NodeDataSink.java b/ranger-core/src/main/java/com/flipkart/ranger/core/model/NodeDataSink.java new file mode 100644 index 00000000..92684451 --- /dev/null +++ b/ranger-core/src/main/java/com/flipkart/ranger/core/model/NodeDataSink.java @@ -0,0 +1,8 @@ +package com.flipkart.ranger.core.model; + +/** + * + */ +public interface NodeDataSink> extends NodeDataStoreConnector { + void updateState(S serializer, ServiceNode serviceNode); +} diff --git a/ranger-core/src/main/java/com/flipkart/ranger/core/model/NodeDataSource.java b/ranger-core/src/main/java/com/flipkart/ranger/core/model/NodeDataSource.java index 0d62fcd6..e11ccac9 100644 --- a/ranger-core/src/main/java/com/flipkart/ranger/core/model/NodeDataSource.java +++ b/ranger-core/src/main/java/com/flipkart/ranger/core/model/NodeDataSource.java @@ -6,16 +6,10 @@ /** * */ -public interface NodeDataSource { - void start(); +public interface NodeDataSource> extends NodeDataStoreConnector { + Optional>> refresh(D deserializer); - void ensureConnected(); - - void stop(); - - Optional>> refresh(Deserializer deserializer); - - boolean isActive(); - - void updateState(Serializer serializer, ServiceNode serviceNode); + default long healthcheckZombieCheckThresholdTime(Service service) { + return System.currentTimeMillis() - 60000; //1 Minute + } } diff --git a/ranger-core/src/main/java/com/flipkart/ranger/core/model/NodeDataStoreConnector.java b/ranger-core/src/main/java/com/flipkart/ranger/core/model/NodeDataStoreConnector.java new file mode 100644 index 00000000..e2cab33a --- /dev/null +++ b/ranger-core/src/main/java/com/flipkart/ranger/core/model/NodeDataStoreConnector.java @@ -0,0 +1,14 @@ +package com.flipkart.ranger.core.model; + +/** + * + */ +public interface NodeDataStoreConnector { + void start(); + + void ensureConnected(); + + void stop(); + + boolean isActive(); +} diff --git a/ranger-core/src/main/java/com/flipkart/ranger/core/model/Serializer.java b/ranger-core/src/main/java/com/flipkart/ranger/core/model/Serializer.java index f357107b..c4e8d401 100644 --- a/ranger-core/src/main/java/com/flipkart/ranger/core/model/Serializer.java +++ b/ranger-core/src/main/java/com/flipkart/ranger/core/model/Serializer.java @@ -16,7 +16,6 @@ package com.flipkart.ranger.core.model; -@FunctionalInterface public interface Serializer { - byte[] serialize(final ServiceNode data); +// byte[] serialize(final ServiceNode data); } diff --git a/ranger-core/src/main/java/com/flipkart/ranger/core/serviceprovider/BaseServiceProviderBuilder.java b/ranger-core/src/main/java/com/flipkart/ranger/core/serviceprovider/BaseServiceProviderBuilder.java index 8e1e2bd5..38db70af 100644 --- a/ranger-core/src/main/java/com/flipkart/ranger/core/serviceprovider/BaseServiceProviderBuilder.java +++ b/ranger-core/src/main/java/com/flipkart/ranger/core/serviceprovider/BaseServiceProviderBuilder.java @@ -22,10 +22,7 @@ import com.flipkart.ranger.core.healthservice.HealthService; import com.flipkart.ranger.core.healthservice.ServiceHealthAggregator; import com.flipkart.ranger.core.healthservice.monitor.IsolatedHealthMonitor; -import com.flipkart.ranger.core.model.NodeDataSource; -import com.flipkart.ranger.core.model.Serializer; -import com.flipkart.ranger.core.model.Service; -import com.flipkart.ranger.core.model.ServiceNode; +import com.flipkart.ranger.core.model.*; import com.flipkart.ranger.core.signals.ScheduledSignal; import com.flipkart.ranger.core.signals.Signal; import com.google.common.base.Preconditions; @@ -41,18 +38,18 @@ @Slf4j @NoArgsConstructor(access = AccessLevel.PROTECTED) -public abstract class BaseServiceProviderBuilder> { +public abstract class BaseServiceProviderBuilder, S extends Serializer> { protected String namespace; protected String serviceName; - protected Serializer serializer; + protected S serializer; protected String hostname; protected int port; protected T nodeData; protected int healthUpdateIntervalMs; protected int staleUpdateThresholdMs; protected List healthchecks = Lists.newArrayList(); - protected NodeDataSource nodeDataSource = null; + protected NodeDataSink nodeDataSource = null; protected final List> startSignalHandlers = Lists.newArrayList(); protected final List> stopSignalHandlers = Lists.newArrayList(); protected final List> extraRefreshSignals = Lists.newArrayList(); @@ -70,8 +67,8 @@ public B withServiceName(final String serviceName) { return (B)this; } - public B withSerializer(Serializer deserializer) { - this.serializer = deserializer; + public B withSerializer(S serializer) { + this.serializer = serializer; return (B)this; } @@ -119,7 +116,7 @@ public B withIsolatedHealthMonitor(IsolatedHealthMonitor monitor) { return (B)this; } - public B withNodeDataSource(NodeDataSource nodeDataSource) { + public B withNodeDataSource(NodeDataSink nodeDataSource) { this.nodeDataSource = nodeDataSource; return (B)this; } @@ -154,7 +151,7 @@ public B withExtraRefreshSignals(List> extraRefreshSig return (B)this; } - protected final ServiceProvider buildProvider() { + protected final ServiceProvider buildProvider() { Preconditions.checkNotNull(namespace); Preconditions.checkNotNull(serviceName); Preconditions.checkNotNull(serializer); @@ -186,7 +183,7 @@ protected final ServiceProvider buildProvider() { new HealthChecker(healthchecks, staleUpdateThresholdMs), Collections.emptyList(), healthUpdateIntervalMs); - final NodeDataSource usableNodeDataSource = dataSource(service); + final NodeDataSink usableNodeDataSource = dataSink(service); final List healthServices = Collections.singletonList(serviceHealthAggregator); final List> signalGenerators @@ -194,8 +191,9 @@ protected final ServiceProvider buildProvider() { .add(healthcheckUpdateSignalGenerator) .addAll(extraRefreshSignals) .build(); - final ServiceProvider serviceProvider = new ServiceProvider<>(service, + final ServiceProvider serviceProvider = new ServiceProvider<>(service, new ServiceNode<>(hostname, port, nodeData), + serializer, usableNodeDataSource, signalGenerators); final Signal startSignal = serviceProvider.getStartSignal(); @@ -215,7 +213,7 @@ protected final ServiceProvider buildProvider() { return serviceProvider; } - public abstract ServiceProvider build(); + public abstract ServiceProvider build(); - protected abstract NodeDataSource dataSource(final Service service); + protected abstract NodeDataSink dataSink(final Service service); } diff --git a/ranger-core/src/main/java/com/flipkart/ranger/core/serviceprovider/ServiceProvider.java b/ranger-core/src/main/java/com/flipkart/ranger/core/serviceprovider/ServiceProvider.java index ddbb161f..2c19997e 100644 --- a/ranger-core/src/main/java/com/flipkart/ranger/core/serviceprovider/ServiceProvider.java +++ b/ranger-core/src/main/java/com/flipkart/ranger/core/serviceprovider/ServiceProvider.java @@ -16,13 +16,10 @@ package com.flipkart.ranger.core.serviceprovider; -import com.flipkart.ranger.core.model.NodeDataSource; -import com.flipkart.ranger.core.model.Serializer; -import com.flipkart.ranger.core.model.Service; +import com.flipkart.ranger.core.model.*; import com.flipkart.ranger.core.signals.ExternalTriggeredSignal; import com.flipkart.ranger.core.signals.Signal; import com.flipkart.ranger.core.healthcheck.HealthcheckResult; -import com.flipkart.ranger.core.model.ServiceNode; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -30,12 +27,12 @@ import java.util.List; @Slf4j -public class ServiceProvider { +public class ServiceProvider> { private final Service service; private final ServiceNode serviceNode; - private final Serializer serializer; - private final NodeDataSource dataSource; + private final S serializer; + private final NodeDataSink dataSink; @Getter private final ExternalTriggeredSignal startSignal = new ExternalTriggeredSignal<>(() -> null, Collections.emptyList()); @Getter @@ -44,19 +41,19 @@ public class ServiceProvider { public ServiceProvider( Service service, ServiceNode serviceNode, - Serializer serializer, - NodeDataSource dataSource, + S serializer, + NodeDataSink dataSink, List> signalGenerators) { this.service = service; this.serviceNode = serviceNode; this.serializer = serializer; - this.dataSource = dataSource; + this.dataSink = dataSink; signalGenerators.forEach(signalGenerator -> signalGenerator.registerConsumer(this::handleHealthUpdate)); } public void start() { startSignal.trigger(); - dataSource.updateState(serializer, serviceNode); + dataSink.updateState(serializer, serviceNode); log.debug("Set initial node data on zookeeper for {}", service.getServiceName()); } @@ -71,7 +68,7 @@ private void handleHealthUpdate(HealthcheckResult result) { } serviceNode.setHealthcheckStatus(result.getStatus()); serviceNode.setLastUpdatedTimeStamp(result.getUpdatedTime()); - dataSource.updateState(serializer, serviceNode); + dataSink.updateState(serializer, serviceNode); log.debug("Updated node with health check result: {}", result); } diff --git a/ranger-core/src/main/java/com/flipkart/ranger/core/util/FinderUtils.java b/ranger-core/src/main/java/com/flipkart/ranger/core/util/FinderUtils.java new file mode 100644 index 00000000..204dd4b5 --- /dev/null +++ b/ranger-core/src/main/java/com/flipkart/ranger/core/util/FinderUtils.java @@ -0,0 +1,45 @@ +package com.flipkart.ranger.core.util; + +import com.flipkart.ranger.core.healthcheck.HealthcheckStatus; +import com.flipkart.ranger.core.model.Service; +import com.flipkart.ranger.core.model.ServiceNode; +import lombok.extern.slf4j.Slf4j; + +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + +/** + * + */ +@Slf4j +public class FinderUtils { + + private FinderUtils() {} + + public static List> filterValidNodes( + final Service service, + final Collection> serviceNodes, + long healthcheckZombieCheckThresholdTime) { + return serviceNodes.stream() + .filter(serviceNode -> isValidNode(service, healthcheckZombieCheckThresholdTime, serviceNode)) + .collect(Collectors.toList()); + } + + public static boolean isValidNode( + Service service, + long healthcheckZombieCheckThresholdTime, + ServiceNode serviceNode) { + if(HealthcheckStatus.healthy != serviceNode.getHealthcheckStatus()) { + log.debug("Unhealthy node [{}:{}] found for [{}]", + serviceNode.getHost(), serviceNode.getPort(), service.getServiceName()); + return false; + } + if(serviceNode.getLastUpdatedTimeStamp() < healthcheckZombieCheckThresholdTime) { + log.warn("Zombie node [{}:{}] found for [{}]", + serviceNode.getHost(), serviceNode.getPort(), service.getServiceName()); + return false; + } + return true; + } +} diff --git a/ranger-core/src/test/java/com/flipkart/ranger/core/utils/TestUtils.java b/ranger-core/src/test/java/com/flipkart/ranger/core/utils/TestUtils.java new file mode 100644 index 00000000..cb98cc62 --- /dev/null +++ b/ranger-core/src/test/java/com/flipkart/ranger/core/utils/TestUtils.java @@ -0,0 +1,14 @@ +package com.flipkart.ranger.core.utils; + +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; + +/** + * + */ +public class TestUtils { + public static void sleepForSeconds(int numSeconds) { + await().pollDelay(numSeconds, TimeUnit.SECONDS).until(() -> true); + } +} diff --git a/ranger-http/pom.xml b/ranger-http/pom.xml index 6c32ba75..65c92be9 100644 --- a/ranger-http/pom.xml +++ b/ranger-http/pom.xml @@ -36,8 +36,13 @@ com.github.tomakehurst wiremock 2.25.1 - pom test + + com.flipkart.ranger + ranger-core + ${project.version} + test-jar + \ No newline at end of file diff --git a/ranger-http/src/main/java/com/flipkart/ranger/http/HttpNodeDataSource.java b/ranger-http/src/main/java/com/flipkart/ranger/http/HttpNodeDataSource.java index 261f1ef6..b002f4c0 100644 --- a/ranger-http/src/main/java/com/flipkart/ranger/http/HttpNodeDataSource.java +++ b/ranger-http/src/main/java/com/flipkart/ranger/http/HttpNodeDataSource.java @@ -1,67 +1,42 @@ package com.flipkart.ranger.http; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.flipkart.ranger.core.model.*; +import com.flipkart.ranger.core.model.NodeDataSource; +import com.flipkart.ranger.core.model.Service; +import com.flipkart.ranger.core.model.ServiceNode; import com.flipkart.ranger.core.util.Exceptions; +import com.flipkart.ranger.core.util.FinderUtils; import com.flipkart.ranger.http.config.HttpClientConfig; -import com.flipkart.ranger.http.model.ServiceNodesResponse; +import com.flipkart.ranger.http.serde.HTTPResponseDataDeserializer; import lombok.extern.slf4j.Slf4j; -import lombok.val; -import okhttp3.*; +import okhttp3.HttpUrl; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.ResponseBody; import java.io.IOException; import java.util.List; import java.util.Optional; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * */ @Slf4j -public class HttpNodeDataSource implements NodeDataSource { +public class HttpNodeDataSource> extends HttpNodeDataStoreConnector implements NodeDataSource { - private final Service service; - private final HttpClientConfig config; - private final OkHttpClient httpClient; - private final ObjectMapper mapper; + private final AtomicBoolean firstCall = new AtomicBoolean(false); public HttpNodeDataSource( Service service, final HttpClientConfig config, ObjectMapper mapper) { - this.service = service; - this.httpClient = new OkHttpClient.Builder() - .callTimeout(config.getOperationTimeoutMs() == 0 - ? 3000 - : config.getOperationTimeoutMs(), TimeUnit.MILLISECONDS) - .connectTimeout(config.getConnectionTimeoutMs() == 0 - ? 3000 - : config.getConnectionTimeoutMs(), TimeUnit.MILLISECONDS) - .followRedirects(true) - .connectionPool(new ConnectionPool(1, 30, TimeUnit.SECONDS)) - .build(); - this.config = config; - this.mapper = mapper; - } - - - @Override - public void start() { + super(service, config, mapper); } - @Override - public void ensureConnected() { - - } @Override - public void stop() { - - } - - @Override - public Optional>> refresh(Deserializer deserializer) { + public Optional>> refresh(D deserializer) { final HttpUrl httpUrl = new HttpUrl.Builder() .scheme(config.isSecure() ? "https" @@ -75,23 +50,29 @@ public Optional>> refresh(Deserializer deserializer) { try { final Response response = httpClient.newCall(new Request.Builder() - .url(httpUrl) - .get() - .build()) + .url(httpUrl) + .get() + .build()) .execute(); - if(response.isSuccessful()) { + if (response.isSuccessful()) { final ResponseBody body = response.body(); - if(null == body) { + if (null == body) { log.warn("HTTP call to {} returned empty body", httpUrl.toString()); } else { - val serviceResponse = translateResponse(body.bytes()); - if (serviceResponse.isSuccess()) { - return Optional.of(serviceResponse.getNodes()); + final byte[] bytes; + try { + bytes = body.bytes(); } - else { - log.warn("HTTP call to {} is not successful", httpUrl.toString()); + finally { + if(null != body) { + body.close(); + } } + return Optional.of(FinderUtils.filterValidNodes( + service, + deserializer.deserialize(bytes), + healthcheckZombieCheckThresholdTime(service))); } } else { @@ -104,23 +85,9 @@ public Optional>> refresh(Deserializer deserializer) { throw new IllegalStateException("No data received from server"); } - private int defaultPort() { - return config.isSecure() - ? 443 - : 80; - } - @Override public boolean isActive() { - return httpClient.connectionPool().connectionCount() > 0; - } - - @Override - public void updateState(Serializer serializer, ServiceNode serviceNode) { - throw new UnsupportedOperationException("State update is not supported on HTTP yet."); - } - - protected ServiceNodesResponse translateResponse(final byte data[]) throws IOException { - return mapper.readValue(data, new TypeReference>() {}); +// return httpClient.connectionPool().connectionCount() > 0; + return true; } } diff --git a/ranger-http/src/main/java/com/flipkart/ranger/http/HttpNodeDataStoreConnector.java b/ranger-http/src/main/java/com/flipkart/ranger/http/HttpNodeDataStoreConnector.java new file mode 100644 index 00000000..0c7690a6 --- /dev/null +++ b/ranger-http/src/main/java/com/flipkart/ranger/http/HttpNodeDataStoreConnector.java @@ -0,0 +1,71 @@ +package com.flipkart.ranger.http; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.flipkart.ranger.core.model.NodeDataStoreConnector; +import com.flipkart.ranger.core.model.Service; +import com.flipkart.ranger.http.config.HttpClientConfig; +import lombok.extern.slf4j.Slf4j; +import okhttp3.ConnectionPool; +import okhttp3.OkHttpClient; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * + */ +@Slf4j +public class HttpNodeDataStoreConnector implements NodeDataStoreConnector { + + protected final Service service; + protected final HttpClientConfig config; + protected final OkHttpClient httpClient; + protected final ObjectMapper mapper; + protected final AtomicBoolean firstCall = new AtomicBoolean(false); + + public HttpNodeDataStoreConnector( + Service service, + final HttpClientConfig config, + ObjectMapper mapper) { + this.service = service; + this.httpClient = new OkHttpClient.Builder() + .callTimeout(config.getOperationTimeoutMs() == 0 + ? 3000 + : config.getOperationTimeoutMs(), TimeUnit.MILLISECONDS) + .connectTimeout(config.getConnectionTimeoutMs() == 0 + ? 3000 + : config.getConnectionTimeoutMs(), TimeUnit.MILLISECONDS) + .followRedirects(true) + .connectionPool(new ConnectionPool(1, 30, TimeUnit.SECONDS)) + .build(); + this.config = config; + this.mapper = mapper; + } + + + @Override + public void start() { + } + + @Override + public void ensureConnected() { + + } + + @Override + public void stop() { + + } + + protected int defaultPort() { + return config.isSecure() + ? 443 + : 80; + } + + @Override + public boolean isActive() { +// return httpClient.connectionPool().connectionCount() > 0; + return true; + } +} diff --git a/ranger-http/src/main/java/com/flipkart/ranger/http/HttpShardedServiceFinderBuilder.java b/ranger-http/src/main/java/com/flipkart/ranger/http/HttpShardedServiceFinderBuilder.java index fc37df59..042f56e2 100644 --- a/ranger-http/src/main/java/com/flipkart/ranger/http/HttpShardedServiceFinderBuilder.java +++ b/ranger-http/src/main/java/com/flipkart/ranger/http/HttpShardedServiceFinderBuilder.java @@ -6,15 +6,16 @@ import com.flipkart.ranger.core.model.NodeDataSource; import com.flipkart.ranger.core.model.Service; import com.flipkart.ranger.http.config.HttpClientConfig; +import com.flipkart.ranger.http.serde.HTTPResponseDataDeserializer; /** * */ -public class HttpShardedServiceFinderBuilder extends SimpleShardedServiceFinderBuilder> { +public class HttpShardedServiceFinderBuilder> extends SimpleShardedServiceFinderBuilder, D> { private HttpClientConfig clientConfig; - public HttpShardedServiceFinderBuilder withClientConfig(final HttpClientConfig clientConfig) { + public HttpShardedServiceFinderBuilder withClientConfig(final HttpClientConfig clientConfig) { this.clientConfig = clientConfig; return this; } @@ -25,8 +26,7 @@ public SimpleShardedServiceFinder build() { } @Override - protected NodeDataSource dataSource( - Service service) { + protected NodeDataSource dataSource(Service service) { return new HttpNodeDataSource<>(service, clientConfig, new ObjectMapper()); } } diff --git a/ranger-http/src/main/java/com/flipkart/ranger/http/model/ServiceNodesResponse.java b/ranger-http/src/main/java/com/flipkart/ranger/http/model/ServiceNodesResponse.java index 6b52a250..2de6cc0d 100644 --- a/ranger-http/src/main/java/com/flipkart/ranger/http/model/ServiceNodesResponse.java +++ b/ranger-http/src/main/java/com/flipkart/ranger/http/model/ServiceNodesResponse.java @@ -1,5 +1,6 @@ package com.flipkart.ranger.http.model; +import com.fasterxml.jackson.annotation.JsonProperty; import com.flipkart.ranger.core.model.ServiceNode; import lombok.Builder; import lombok.Data; @@ -17,8 +18,8 @@ public class ServiceNodesResponse { @Builder public ServiceNodesResponse( - boolean success, - @Singular List> nodes) { + @JsonProperty("success") boolean success, + @JsonProperty("nodes") @Singular List> nodes) { this.success = success; this.nodes = nodes; } diff --git a/ranger-http/src/main/java/com/flipkart/ranger/http/serde/HTTPResponseDataDeserializer.java b/ranger-http/src/main/java/com/flipkart/ranger/http/serde/HTTPResponseDataDeserializer.java new file mode 100644 index 00000000..3ece5323 --- /dev/null +++ b/ranger-http/src/main/java/com/flipkart/ranger/http/serde/HTTPResponseDataDeserializer.java @@ -0,0 +1,14 @@ +package com.flipkart.ranger.http.serde; + +import com.flipkart.ranger.core.model.Deserializer; +import com.flipkart.ranger.core.model.ServiceNode; + +import java.util.Collection; + +/** + * + */ +@FunctionalInterface +public interface HTTPResponseDataDeserializer extends Deserializer { + Collection> deserialize(byte []data); +} diff --git a/ranger-http/src/main/java/com/flipkart/ranger/http/serde/JacksonHTTPResponseDataDeserializer.java b/ranger-http/src/main/java/com/flipkart/ranger/http/serde/JacksonHTTPResponseDataDeserializer.java new file mode 100644 index 00000000..64ff3600 --- /dev/null +++ b/ranger-http/src/main/java/com/flipkart/ranger/http/serde/JacksonHTTPResponseDataDeserializer.java @@ -0,0 +1,33 @@ +package com.flipkart.ranger.http.serde; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.flipkart.ranger.core.model.ServiceNode; +import com.flipkart.ranger.http.model.ServiceNodesResponse; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; + +/** + * + */ +public class JacksonHTTPResponseDataDeserializer implements HTTPResponseDataDeserializer { + private final ObjectMapper mapper; + + public JacksonHTTPResponseDataDeserializer(ObjectMapper mapper) { + this.mapper = mapper; + } + + @Override + public Collection> deserialize(byte[] data) { + try { + final ServiceNodesResponse response + = mapper.readValue(data, new TypeReference>() {}); + return response.isSuccess() ? response.getNodes() : Collections.emptyList(); + } + catch (IOException e) { + throw new IllegalArgumentException(e); + } + } +} diff --git a/ranger-http/src/test/java/com/flipkart/ranger/http/HttpShardedServiceFinderBuilderTest.java b/ranger-http/src/test/java/com/flipkart/ranger/http/HttpShardedServiceFinderBuilderTest.java index 9d6cfed6..7415aaa4 100644 --- a/ranger-http/src/test/java/com/flipkart/ranger/http/HttpShardedServiceFinderBuilderTest.java +++ b/ranger-http/src/test/java/com/flipkart/ranger/http/HttpShardedServiceFinderBuilderTest.java @@ -1,15 +1,94 @@ package com.flipkart.ranger.http; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.flipkart.ranger.core.finder.sharded.SimpleShardedServiceFinder; +import com.flipkart.ranger.core.healthcheck.HealthcheckStatus; +import com.flipkart.ranger.core.model.ServiceNode; +import com.flipkart.ranger.core.utils.TestUtils; +import com.flipkart.ranger.http.config.HttpClientConfig; +import com.flipkart.ranger.http.model.ServiceNodesResponse; +import com.flipkart.ranger.http.serde.HTTPResponseDataDeserializer; +import com.github.tomakehurst.wiremock.junit.WireMockRule; +import lombok.Data; +import lombok.val; +import org.junit.Assert; import org.junit.Rule; +import org.junit.Test; -import static org.junit.Assert.*; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.stream.Collectors; + +import static com.github.tomakehurst.wiremock.client.WireMock.*; /** * */ public class HttpShardedServiceFinderBuilderTest { + @Data + private static final class NodeData { + private final String name; + + public NodeData(@JsonProperty("name") String name) { + this.name = name; + } + } + + private static final ObjectMapper MAPPER = new ObjectMapper(); + @Rule - private WireMockRule + public WireMockRule server = new WireMockRule(8888); + + @Test + public void testFinder() throws Exception { + final NodeData testNode = new NodeData("testNode"); + final ServiceNode node = new ServiceNode<>("127.0.0.1", 80, testNode); + node.setHealthcheckStatus(HealthcheckStatus.healthy); + node.setLastUpdatedTimeStamp(System.currentTimeMillis()); + val payload = MAPPER.writeValueAsBytes( + ServiceNodesResponse.builder() + .node(node) + .success(true) + .build()); + server.stubFor(get(urlEqualTo("/ranger/nodes/v1/testns/test")) + .willReturn(aResponse() + .withBody(payload) + .withStatus(200))); + val clientConfig = HttpClientConfig.builder() + .host("127.0.0.1") + .port(server.port()) + .connectionTimeoutMs(30_000) + .operationTimeoutMs(30_000) + .build(); + final SimpleShardedServiceFinder finder = new HttpShardedServiceFinderBuilder>() + .withClientConfig(clientConfig) + .withNamespace("testns") + .withServiceName("test") + .withDeserializer(data -> { + final ServiceNodesResponse response; + try { + response = MAPPER.readValue(data, new TypeReference>() {}); + } + catch (IOException e) { + throw new IllegalArgumentException(e); + } + return response.isSuccess() ? response.getNodes() : Collections.emptyList(); + }) + .withShardSelector((criteria, registry) -> registry + .nodes() + .entries() + .stream() + .filter(e -> e.getKey().getName().equals(criteria.getName())) + .map(Map.Entry::getValue) + .collect(Collectors.toList())) + .build(); + finder.start(); + TestUtils.sleepForSeconds(3); + Assert.assertNotNull(finder.get(testNode)); + } } \ No newline at end of file diff --git a/ranger-http/src/test/resources/log4j.properties b/ranger-http/src/test/resources/log4j.properties new file mode 100644 index 00000000..a20f35db --- /dev/null +++ b/ranger-http/src/test/resources/log4j.properties @@ -0,0 +1,16 @@ +# Root logger option +log4j.rootLogger=FATAL, ranger + +# Direct log messages to stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p %c{1}:%L - %m%n + +log4j.appender.ranger=org.apache.log4j.ConsoleAppender +log4j.appender.ranger.Target=System.out +log4j.appender.ranger.layout=org.apache.log4j.PatternLayout +log4j.appender.ranger.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p %c{1}:%L - %m%n + + +log4j.logger.com.flipkart=DEBUG \ No newline at end of file diff --git a/ranger-zookeeper/pom.xml b/ranger-zookeeper/pom.xml index 5d8b7fa9..1bcb3f28 100644 --- a/ranger-zookeeper/pom.xml +++ b/ranger-zookeeper/pom.xml @@ -36,7 +36,12 @@ ${curator.version} test - + + com.flipkart.ranger + ranger-core + ${project.version} + test-jar + \ No newline at end of file diff --git a/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/serde/ZkNodeDataDeserializer.java b/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/serde/ZkNodeDataDeserializer.java new file mode 100644 index 00000000..11b22821 --- /dev/null +++ b/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/serde/ZkNodeDataDeserializer.java @@ -0,0 +1,12 @@ +package com.flipkart.ranger.zookeeper.serde; + +import com.flipkart.ranger.core.model.Deserializer; +import com.flipkart.ranger.core.model.ServiceNode; + +/** + * + */ +@FunctionalInterface +public interface ZkNodeDataDeserializer extends Deserializer { + ServiceNode deserialize(final byte[] data); +} diff --git a/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/serde/ZkNodeDataSerializer.java b/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/serde/ZkNodeDataSerializer.java new file mode 100644 index 00000000..182f001b --- /dev/null +++ b/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/serde/ZkNodeDataSerializer.java @@ -0,0 +1,12 @@ +package com.flipkart.ranger.zookeeper.serde; + +import com.flipkart.ranger.core.model.Serializer; +import com.flipkart.ranger.core.model.ServiceNode; + +/** + * + */ +@FunctionalInterface +public interface ZkNodeDataSerializer extends Serializer { + byte[] serialize(final ServiceNode node); +} diff --git a/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkNodeDataSink.java b/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkNodeDataSink.java new file mode 100644 index 00000000..cc39913a --- /dev/null +++ b/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkNodeDataSink.java @@ -0,0 +1,76 @@ +package com.flipkart.ranger.zookeeper.zk; + +import com.flipkart.ranger.core.model.NodeDataSink; +import com.flipkart.ranger.core.model.Service; +import com.flipkart.ranger.core.model.ServiceNode; +import com.flipkart.ranger.core.util.Exceptions; +import com.flipkart.ranger.zookeeper.serde.ZkNodeDataSerializer; +import com.flipkart.ranger.zookeeper.util.PathBuilder; +import com.google.common.base.Preconditions; +import lombok.extern.slf4j.Slf4j; +import org.apache.curator.framework.CuratorFramework; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; + +/** + * + */ +@Slf4j +public class ZkNodeDataSink> extends ZkNodeDataStoreConnector implements NodeDataSink { + public ZkNodeDataSink( + Service service, + CuratorFramework curatorFramework) { + super(service, curatorFramework); + } + + @Override + public void updateState(S serializer, ServiceNode serviceNode) { + if (stopped.get()) { + log.warn("Node has been stopped already for service: {}. No update will be possible.", + service.getServiceName()); + return; + } + Preconditions.checkNotNull(serializer, "Serializer has not been set for node data"); + final String path = PathBuilder.instancePath(service, serviceNode); + try { + if (null == curatorFramework.checkExists().forPath(path)) { + log.info("No node exists for path: {}. Will create now.", path); + createPath(serviceNode, serializer); + } + else { + curatorFramework.setData().forPath(path, serializer.serialize(serviceNode)); + } + } + catch (Exception e) { + log.error("Error updating node data at path " + path, e); + Exceptions.illegalState(e); + } + } + + private synchronized void createPath( + ServiceNode serviceNode, + S serializer) { + final String instancePath = PathBuilder.instancePath(service, serviceNode); + try { + if (null == curatorFramework.checkExists().forPath(instancePath)) { + curatorFramework.create() + .creatingParentContainersIfNeeded() + .withMode(CreateMode.EPHEMERAL) + .forPath(instancePath, serializer.serialize(serviceNode)); + log.info("Created instance path: {}", instancePath); + } + } + catch (KeeperException.NodeExistsException e) { + log.warn("Node already exists.. Race condition?", e); + } + catch (Exception e) { + final String message = String.format( + "Could not create node for %s after 60 retries (1 min). " + + "This service will not be discoverable. Retry after some time.", service.getServiceName()); + log.error(message, e); + Exceptions.illegalState(message, e); + } + } + + +} diff --git a/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkNodeDataSource.java b/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkNodeDataSource.java index 90dd7d17..da78a2fc 100644 --- a/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkNodeDataSource.java +++ b/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkNodeDataSource.java @@ -2,169 +2,37 @@ import com.flipkart.ranger.core.model.NodeDataSource; import com.flipkart.ranger.core.model.Service; -import com.flipkart.ranger.core.healthcheck.HealthcheckStatus; -import com.flipkart.ranger.core.model.Deserializer; -import com.flipkart.ranger.zookeeper.util.PathBuilder; -import com.flipkart.ranger.core.model.Serializer; import com.flipkart.ranger.core.model.ServiceNode; -import com.flipkart.ranger.core.util.Exceptions; -import com.github.rholder.retry.*; +import com.flipkart.ranger.core.util.FinderUtils; +import com.flipkart.ranger.zookeeper.serde.ZkNodeDataDeserializer; +import com.flipkart.ranger.zookeeper.util.PathBuilder; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.imps.CuratorFrameworkState; -import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import java.util.List; import java.util.Optional; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; /** * */ @Slf4j -public class ZkNodeDataSource implements NodeDataSource { - - private final Service service; - private final CuratorFramework curatorFramework; - private final Retryer discoveryRetrier = RetryerBuilder.newBuilder() - .retryIfException(e -> IllegalStateException.class.isAssignableFrom(e.getClass())) - .retryIfResult(aBoolean -> false) - .withAttemptTimeLimiter(AttemptTimeLimiters.noTimeLimit()) - .withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS)) - .withBlockStrategy(BlockStrategies.threadSleepStrategy()) - .withRetryListener(new RetryListener() { - @Override - public void onRetry(Attempt attempt) { - log.debug("Retrying with attempt: {}", attempt); - } - }) - .build(); - private final AtomicBoolean started = new AtomicBoolean(false); - private final AtomicBoolean stopped = new AtomicBoolean(false); +public class ZkNodeDataSource> extends ZkNodeDataStoreConnector implements NodeDataSource { public ZkNodeDataSource( - final Service service, - final CuratorFramework curatorFramework) { - this.service = service; - this.curatorFramework = curatorFramework; + Service service, + CuratorFramework curatorFramework) { + super(service, curatorFramework); } @Override - public void start() { - if (started.get()) { - log.info("Start called on already initialized data source for service {}. Ignoring.", - service.getServiceName()); - return; - } - final String path = PathBuilder.servicePath(service); - try { - curatorFramework.blockUntilConnected(); - log.info("Connected to zookeeper cluster for {}", service.getServiceName()); - curatorFramework - .create() - .creatingParentContainersIfNeeded() - .forPath(path); - } - catch (KeeperException e) { - if (e.code() == KeeperException.Code.NODEEXISTS) { - log.info("Service node {} already exists for service: {}", path, service.getServiceName()); - } - } - catch (Exception e) { - Exceptions.illegalState("Could not start ZK data source for service: " + service.getServiceName(), e); - } - started.set(true); - } - - @Override - public void ensureConnected() { - try { - discoveryRetrier.call(this::isActive); - } - catch (Exception e) { - Exceptions.illegalState("Could not get zk connection", e); - } - } - - @Override - public void stop() { - if (stopped.get()) { - log.info("Data source has already been stopped for: {}. Ignoring.", service.getServiceName()); - return; - } - if (!started.get()) { - log.warn("Shutdown called for service: {}, but data source is not started.", service.getServiceName()); - } - log.info("Shutting down data source for service: {}. (It's a no-op.)", - service.getServiceName()); - stopped.set(true); - } - - @Override - public Optional>> refresh(Deserializer deserializer) { + public Optional>> refresh(D deserializer) { return checkForUpdateOnZookeeper(deserializer); } - @Override - public boolean isActive() { - return curatorFramework != null - && (curatorFramework.getState() == CuratorFrameworkState.STARTED); - } - - @Override - public void updateState(Serializer serializer, ServiceNode serviceNode) { - if (stopped.get()) { - log.warn("Node has been stopped already for service: {}. No update will be possible.", - service.getServiceName()); - return; - } - Preconditions.checkNotNull(serializer, "Serializer has not been set for node data"); - final String path = PathBuilder.instancePath(service, serviceNode); - try { - if (null == curatorFramework.checkExists().forPath(path)) { - log.info("No node exists for path: {}. Will create now.", path); - createPath(serviceNode, serializer); - } - else { - curatorFramework.setData().forPath(path, serializer.serialize(serviceNode)); - } - } - catch (Exception e) { - log.error("Error updating node data at path " + path, e); - Exceptions.illegalState(e); - } - } - - private synchronized void createPath( - ServiceNode serviceNode, - Serializer serializer) { - final String instancePath = PathBuilder.instancePath(service, serviceNode); - try { - if (null == curatorFramework.checkExists().forPath(instancePath)) { - curatorFramework.create() - .creatingParentContainersIfNeeded() - .withMode(CreateMode.EPHEMERAL) - .forPath(instancePath, serializer.serialize(serviceNode)); - log.info("Created instance path: {}", instancePath); - } - } - catch (KeeperException.NodeExistsException e) { - log.warn("Node already exists.. Race condition?", e); - } - catch (Exception e) { - final String message = String.format( - "Could not create node for %s after 60 retries (1 min). " + - "This service will not be discoverable. Retry after some time.", service.getServiceName()); - log.error(message, e); - Exceptions.illegalState(message, e); - } - } - - private Optional>> checkForUpdateOnZookeeper(Deserializer deserializer) { + private Optional>> checkForUpdateOnZookeeper(D deserializer) { if (!started.get()) { log.warn("Data source is not yet started for service: {}. No nodes will be returned.", service.getServiceName()); @@ -177,7 +45,7 @@ private Optional>> checkForUpdateOnZookeeper(Deserializer } Preconditions.checkNotNull(deserializer, "Deserializer has not been set for node data"); try { - final long healthcheckZombieCheckThresholdTime = System.currentTimeMillis() - 60000; //1 Minute + final long healthcheckZombieCheckThresholdTime = healthcheckZombieCheckThresholdTime(service); //1 Minute final String serviceName = service.getServiceName(); if (!isActive()) { log.warn("ZK connection is not active. Ignoring refresh request for service: {}", @@ -211,14 +79,9 @@ private Optional>> checkForUpdateOnZookeeper(Deserializer log.debug("Skipping node: {}", path); continue; } - final ServiceNode key = deserializer.deserialize(data); - if (HealthcheckStatus.healthy == key.getHealthcheckStatus()) { - if (key.getLastUpdatedTimeStamp() > healthcheckZombieCheckThresholdTime) { - nodes.add(key); - } - else { - log.warn("Zombie node [{}:{}] found for [{}]", key.getHost(), key.getPort(), serviceName); - } + final ServiceNode node = deserializer.deserialize(data); + if(FinderUtils.isValidNode(service, healthcheckZombieCheckThresholdTime, node)) { + nodes.add(node); } } return Optional.of(nodes); @@ -228,4 +91,5 @@ private Optional>> checkForUpdateOnZookeeper(Deserializer } return Optional.empty(); } + } diff --git a/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkNodeDataStoreConnector.java b/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkNodeDataStoreConnector.java new file mode 100644 index 00000000..14cc6228 --- /dev/null +++ b/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkNodeDataStoreConnector.java @@ -0,0 +1,103 @@ +package com.flipkart.ranger.zookeeper.zk; + +import com.flipkart.ranger.core.model.NodeDataStoreConnector; +import com.flipkart.ranger.core.model.Service; +import com.flipkart.ranger.core.util.Exceptions; +import com.flipkart.ranger.zookeeper.util.PathBuilder; +import com.github.rholder.retry.*; +import lombok.extern.slf4j.Slf4j; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.zookeeper.KeeperException; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * + */ +@Slf4j +public class ZkNodeDataStoreConnector implements NodeDataStoreConnector { + + protected final Service service; + protected final CuratorFramework curatorFramework; + protected final Retryer discoveryRetrier = RetryerBuilder.newBuilder() + .retryIfException(e -> IllegalStateException.class.isAssignableFrom(e.getClass())) + .retryIfResult(aBoolean -> false) + .withAttemptTimeLimiter(AttemptTimeLimiters.noTimeLimit()) + .withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS)) + .withBlockStrategy(BlockStrategies.threadSleepStrategy()) + .withRetryListener(new RetryListener() { + @Override + public void onRetry(Attempt attempt) { + log.debug("Retrying with attempt: {}", attempt); + } + }) + .build(); + protected final AtomicBoolean started = new AtomicBoolean(false); + protected final AtomicBoolean stopped = new AtomicBoolean(false); + + protected ZkNodeDataStoreConnector( + final Service service, + final CuratorFramework curatorFramework) { + this.service = service; + this.curatorFramework = curatorFramework; + } + + @Override + public void start() { + if (started.get()) { + log.info("Start called on already initialized data source for service {}. Ignoring.", + service.getServiceName()); + return; + } + final String path = PathBuilder.servicePath(service); + try { + curatorFramework.blockUntilConnected(); + log.info("Connected to zookeeper cluster for {}", service.getServiceName()); + curatorFramework + .create() + .creatingParentContainersIfNeeded() + .forPath(path); + } + catch (KeeperException e) { + if (e.code() == KeeperException.Code.NODEEXISTS) { + log.info("Service node {} already exists for service: {}", path, service.getServiceName()); + } + } + catch (Exception e) { + Exceptions.illegalState("Could not start ZK data source for service: " + service.getServiceName(), e); + } + started.set(true); + } + + @Override + public void ensureConnected() { + try { + discoveryRetrier.call(this::isActive); + } + catch (Exception e) { + Exceptions.illegalState("Could not get zk connection", e); + } + } + + @Override + public void stop() { + if (stopped.get()) { + log.info("Data source has already been stopped for: {}. Ignoring.", service.getServiceName()); + return; + } + if (!started.get()) { + log.warn("Shutdown called for service: {}, but data source is not started.", service.getServiceName()); + } + log.info("Shutting down data source for service: {}. (It's a no-op.)", + service.getServiceName()); + stopped.set(true); + } + + @Override + public boolean isActive() { + return curatorFramework != null + && (curatorFramework.getState() == CuratorFrameworkState.STARTED); + } +} diff --git a/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkServiceProviderBuilder.java b/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkServiceProviderBuilder.java index 06fbfc35..cb6a47f8 100644 --- a/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkServiceProviderBuilder.java +++ b/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkServiceProviderBuilder.java @@ -1,9 +1,10 @@ package com.flipkart.ranger.zookeeper.zk; -import com.flipkart.ranger.core.model.NodeDataSource; +import com.flipkart.ranger.core.model.NodeDataSink; import com.flipkart.ranger.core.model.Service; import com.flipkart.ranger.core.serviceprovider.BaseServiceProviderBuilder; import com.flipkart.ranger.core.serviceprovider.ServiceProvider; +import com.flipkart.ranger.zookeeper.serde.ZkNodeDataSerializer; import com.google.common.base.Preconditions; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; @@ -14,7 +15,7 @@ * */ @Slf4j -public class ZkServiceProviderBuilder extends BaseServiceProviderBuilder> { +public class ZkServiceProviderBuilder extends BaseServiceProviderBuilder, ZkNodeDataSerializer> { private CuratorFramework curatorFramework; private String connectionString; @@ -30,7 +31,7 @@ public ZkServiceProviderBuilder withConnectionString(final String connectionS } @Override - public ServiceProvider build() { + public ServiceProvider> build() { if (null == curatorFramework) { Preconditions.checkNotNull(connectionString); log.info("Building custom curator framework"); @@ -45,7 +46,7 @@ public ServiceProvider build() { } @Override - protected NodeDataSource dataSource(Service service) { - return new ZkNodeDataSource<>(service, curatorFramework); + protected NodeDataSink> dataSink(Service service) { + return new ZkNodeDataSink<>(service, curatorFramework); } } diff --git a/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkShardedServiceFinderFactory.java b/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkShardedServiceFinderFactory.java index 0bf5547c..f6feca8d 100644 --- a/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkShardedServiceFinderFactory.java +++ b/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkShardedServiceFinderFactory.java @@ -3,10 +3,10 @@ import com.flipkart.ranger.core.finder.sharded.MapBasedServiceRegistry; import com.flipkart.ranger.core.finder.sharded.SimpleShardedServiceFinder; import com.flipkart.ranger.core.finderhub.ServiceFinderFactory; -import com.flipkart.ranger.core.model.Deserializer; import com.flipkart.ranger.core.model.Service; import com.flipkart.ranger.core.model.ServiceNodeSelector; import com.flipkart.ranger.core.model.ShardSelector; +import com.flipkart.ranger.zookeeper.serde.ZkNodeDataDeserializer; import lombok.Builder; import lombok.val; import org.apache.curator.framework.CuratorFramework; @@ -19,7 +19,7 @@ public class ZkShardedServiceFinderFactory implements ServiceFinderFactory deserializer; + private final ZkNodeDataDeserializer deserializer; private final ShardSelector> shardSelector; private final ServiceNodeSelector nodeSelector; @@ -29,7 +29,7 @@ public ZkShardedServiceFinderFactory( String connectionString, int nodeRefreshIntervalMs, boolean disablePushUpdaters, - Deserializer deserializer, + ZkNodeDataDeserializer deserializer, ShardSelector> shardSelector, ServiceNodeSelector nodeSelector) { this.curatorFramework = curatorFramework; diff --git a/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkSimpleShardedServiceFinderBuilder.java b/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkSimpleShardedServiceFinderBuilder.java index 5cf00b04..bfb6345d 100644 --- a/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkSimpleShardedServiceFinderBuilder.java +++ b/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkSimpleShardedServiceFinderBuilder.java @@ -5,6 +5,7 @@ import com.flipkart.ranger.core.model.NodeDataSource; import com.flipkart.ranger.core.model.Service; import com.flipkart.ranger.core.signals.Signal; +import com.flipkart.ranger.zookeeper.serde.ZkNodeDataDeserializer; import com.google.common.base.Preconditions; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; @@ -18,7 +19,7 @@ * */ @Slf4j -public class ZkSimpleShardedServiceFinderBuilder extends SimpleShardedServiceFinderBuilder> { +public class ZkSimpleShardedServiceFinderBuilder extends SimpleShardedServiceFinderBuilder, ZkNodeDataDeserializer> { protected CuratorFramework curatorFramework; protected String connectionString; @@ -49,13 +50,12 @@ public SimpleShardedServiceFinder build() { } @Override - protected NodeDataSource dataSource( - Service service) { + protected NodeDataSource> dataSource(Service service) { return new ZkNodeDataSource<>(service, curatorFramework); } @Override - protected List> implementationSpecificRefreshSignals(final Service service, final NodeDataSource nodeDataSource) { + protected List> implementationSpecificRefreshSignals(final Service service, final NodeDataSource> nodeDataSource) { if (!disablePushUpdaters) { return Collections.singletonList( new ZkWatcherRegistryUpdateSignal<>(service, nodeDataSource, curatorFramework)); diff --git a/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkUnshardedFinderBuilder.java b/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkUnshardedFinderBuilder.java index badabc7e..1e0259d3 100644 --- a/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkUnshardedFinderBuilder.java +++ b/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkUnshardedFinderBuilder.java @@ -1,11 +1,12 @@ package com.flipkart.ranger.zookeeper.zk; -import com.flipkart.ranger.core.model.NodeDataSource; -import com.flipkart.ranger.core.model.Service; import com.flipkart.ranger.core.finder.unsharded.UnshardedClusterFinder; import com.flipkart.ranger.core.finder.unsharded.UnshardedClusterInfo; import com.flipkart.ranger.core.finder.unsharded.UnshardedFinderBuilder; +import com.flipkart.ranger.core.model.NodeDataSource; +import com.flipkart.ranger.core.model.Service; import com.flipkart.ranger.core.signals.Signal; +import com.flipkart.ranger.zookeeper.serde.ZkNodeDataDeserializer; import com.google.common.base.Preconditions; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; @@ -19,7 +20,7 @@ * */ @Slf4j -public class ZkUnshardedFinderBuilder extends UnshardedFinderBuilder { +public class ZkUnshardedFinderBuilder extends UnshardedFinderBuilder> { private CuratorFramework curatorFramework; private String connectionString; @@ -49,7 +50,7 @@ public UnshardedClusterFinder build() { } @Override - protected NodeDataSource dataSource( + protected NodeDataSource> dataSource( Service service) { return new ZkNodeDataSource<>(service, curatorFramework); } @@ -57,7 +58,7 @@ protected NodeDataSource dataSource( @Override protected List> implementationSpecificRefreshSignals( - final Service service, final NodeDataSource nodeDataSource) { + final Service service, final NodeDataSource> nodeDataSource) { if (!disablePushUpdaters) { return Collections.singletonList( new ZkWatcherRegistryUpdateSignal<>(service, nodeDataSource, curatorFramework)); diff --git a/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkWatcherRegistryUpdateSignal.java b/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkWatcherRegistryUpdateSignal.java index 2253922b..e4d9c3dc 100644 --- a/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkWatcherRegistryUpdateSignal.java +++ b/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkWatcherRegistryUpdateSignal.java @@ -1,6 +1,6 @@ package com.flipkart.ranger.zookeeper.zk; -import com.flipkart.ranger.core.model.NodeDataSource; +import com.flipkart.ranger.core.model.NodeDataStoreConnector; import com.flipkart.ranger.core.model.Service; import com.flipkart.ranger.zookeeper.util.PathBuilder; import com.flipkart.ranger.core.signals.Signal; @@ -21,12 +21,12 @@ @ToString(callSuper = true) public class ZkWatcherRegistryUpdateSignal extends Signal { private final Service service; - private final NodeDataSource dataSource; + private final NodeDataStoreConnector dataSource; private final CuratorFramework curatorFramework; public ZkWatcherRegistryUpdateSignal( Service service, - NodeDataSource dataSource, + NodeDataStoreConnector dataSource, CuratorFramework curatorFramework) { super(() -> null, Collections.emptyList()); this.service = service; diff --git a/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/healthservice/ServiceHealthAggregatorTest.java b/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/healthservice/ServiceHealthAggregatorTest.java index 55167e67..6f52e2eb 100644 --- a/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/healthservice/ServiceHealthAggregatorTest.java +++ b/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/healthservice/ServiceHealthAggregatorTest.java @@ -21,6 +21,7 @@ import com.flipkart.ranger.core.healthservice.TimeEntity; import com.flipkart.ranger.core.healthservice.monitor.IsolatedHealthMonitor; import com.flipkart.ranger.core.healthservice.monitor.Monitor; +import com.flipkart.ranger.core.utils.TestUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -48,7 +49,7 @@ public boolean isDisabled() { }); serviceHealthAggregator.start(); - Thread.sleep(1000); + TestUtils.sleepForSeconds(1); } @After @@ -62,7 +63,7 @@ public void testStaleRun() throws Exception { testMonitor.run(); testMonitor.setThreadSleep(2000); - Thread.sleep(4000); + TestUtils.sleepForSeconds(4); /* in the TestMonitor, thread was sleeping for 2 seconds, */ /* so its state is supposed to be stale (>1 second) and service has to be unhealthy */ @@ -70,7 +71,7 @@ public void testStaleRun() throws Exception { testMonitor.setThreadSleep(10); - Thread.sleep(4000); + TestUtils.sleepForSeconds(4); /* in the TestMonitor, thread is sleeping only for 10 milliseconds, */ /* so its state is supposed to be NOT stale (>1 second) and service has to be healthy */ diff --git a/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/healthservice/ServiceProviderIntegrationTest.java b/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/healthservice/ServiceProviderIntegrationTest.java index b0cdef98..64ed9401 100644 --- a/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/healthservice/ServiceProviderIntegrationTest.java +++ b/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/healthservice/ServiceProviderIntegrationTest.java @@ -19,17 +19,17 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.flipkart.ranger.zookeeper.ServiceFinderBuilders; -import com.flipkart.ranger.zookeeper.ServiceProviderBuilders; import com.flipkart.ranger.core.finder.unsharded.UnshardedClusterFinder; import com.flipkart.ranger.core.finder.unsharded.UnshardedClusterInfo; import com.flipkart.ranger.core.healthcheck.Healthchecks; import com.flipkart.ranger.core.healthservice.TimeEntity; import com.flipkart.ranger.core.healthservice.monitor.sample.RotationStatusMonitor; -import com.flipkart.ranger.core.model.Serializer; import com.flipkart.ranger.core.model.ServiceNode; -import com.flipkart.ranger.core.serviceprovider.ServiceProvider; import com.flipkart.ranger.core.util.Exceptions; +import com.flipkart.ranger.core.utils.TestUtils; +import com.flipkart.ranger.zookeeper.ServiceFinderBuilders; +import com.flipkart.ranger.zookeeper.ServiceProviderBuilders; +import lombok.val; import org.apache.curator.test.TestingCluster; import org.junit.After; import org.junit.Assert; @@ -87,7 +87,7 @@ public void stopTestCluster() throws Exception { testingCluster.close(); } serviceFinder.stop(); - Thread.sleep(1000); + TestUtils.sleepForSeconds(1); } @Test @@ -100,7 +100,7 @@ public void testBasicDiscovery() throws Exception { /* with file existing, 3 nodes should be healthy */ boolean filecreate = file.createNewFile(); System.out.println("created file"); - Thread.sleep(8000); + TestUtils.sleepForSeconds(8); List> all = serviceFinder.getAll(null); System.out.println("all = " + all); Assert.assertEquals(3, all.size()); @@ -108,7 +108,7 @@ public void testBasicDiscovery() throws Exception { /* with file deleted, all 3 nodes should be unhealthy */ delete = file.delete(); System.out.println("deleted file"); - Thread.sleep(8000); + TestUtils.sleepForSeconds(8); all = serviceFinder.getAll(null); System.out.println("all = " + all); Assert.assertEquals(0, all.size()); @@ -116,7 +116,7 @@ public void testBasicDiscovery() throws Exception { /* with anotherFile created, the 4th node should become healthy and discoverable */ filecreate = anotherFile.createNewFile(); System.out.println("created anotherFile"); - Thread.sleep(6000); + TestUtils.sleepForSeconds(6); all = serviceFinder.getAll(null); System.out.println("all = " + all); Assert.assertEquals(1, all.size()); @@ -127,20 +127,17 @@ public void testBasicDiscovery() throws Exception { } private void registerService(String host, int port, int shardId, File file) throws Exception { - ServiceProvider serviceProvider = ServiceProviderBuilders.unshardedServiceProviderBuilder() + val serviceProvider = ServiceProviderBuilders.unshardedServiceProviderBuilder() .withConnectionString(testingCluster.getConnectString()) .withNamespace("test") .withServiceName("test-service") - .withSerializer(new Serializer() { - @Override - public byte[] serialize(ServiceNode data) { - try { - return objectMapper.writeValueAsBytes(data); - } catch (JsonProcessingException e) { - e.printStackTrace(); - } - return null; + .withSerializer(data -> { + try { + return objectMapper.writeValueAsBytes(data); + } catch (JsonProcessingException e) { + e.printStackTrace(); } + return null; }) .withHostname(host) .withPort(port) diff --git a/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/model/CustomShardSelectorTest.java b/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/model/CustomShardSelectorTest.java index d0b46801..36587d52 100644 --- a/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/model/CustomShardSelectorTest.java +++ b/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/model/CustomShardSelectorTest.java @@ -19,16 +19,15 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.flipkart.ranger.zookeeper.ServiceFinderBuilders; -import com.flipkart.ranger.zookeeper.ServiceProviderBuilders; import com.flipkart.ranger.core.finder.sharded.MapBasedServiceRegistry; import com.flipkart.ranger.core.finder.sharded.SimpleShardedServiceFinder; import com.flipkart.ranger.core.healthcheck.Healthchecks; -import com.flipkart.ranger.core.model.Deserializer; -import com.flipkart.ranger.core.model.Serializer; import com.flipkart.ranger.core.model.ServiceNode; import com.flipkart.ranger.core.model.ShardSelector; import com.flipkart.ranger.core.serviceprovider.ServiceProvider; +import com.flipkart.ranger.zookeeper.ServiceFinderBuilders; +import com.flipkart.ranger.zookeeper.ServiceProviderBuilders; +import com.flipkart.ranger.zookeeper.serde.ZkNodeDataSerializer; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.apache.curator.test.TestingCluster; @@ -45,7 +44,7 @@ public class CustomShardSelectorTest { private TestingCluster testingCluster; private ObjectMapper objectMapper; - private List> serviceProviders = Lists.newArrayList(); + private List>> serviceProviders = Lists.newArrayList(); @Before public void startTestCluster() throws Exception { @@ -59,7 +58,7 @@ public void startTestCluster() throws Exception { @After public void stopTestCluster() throws Exception { - for(ServiceProvider serviceProvider : serviceProviders) { + for(ServiceProvider> serviceProvider : serviceProviders) { serviceProvider.stop(); } if(null != testingCluster) { @@ -128,6 +127,7 @@ public List> nodes(TestShardInfo criteria, MapBasedSe return nodes; } } + @Test public void testBasicDiscovery() throws Exception { SimpleShardedServiceFinder serviceFinder = ServiceFinderBuilders.shardedFinderBuilder() @@ -135,18 +135,14 @@ public void testBasicDiscovery() throws Exception { .withNamespace("test") .withServiceName("test-service") .withShardSelector(new TestShardSelector()) - .withDeserializer(new Deserializer() { - @Override - public ServiceNode deserialize(byte[] data) { - try { - return objectMapper.readValue(data, - new TypeReference>() { - }); - } catch (IOException e) { - e.printStackTrace(); - } - return null; + .withDeserializer(data -> { + try { + return objectMapper.readValue(data, + new TypeReference>() {}); + } catch (IOException e) { + e.printStackTrace(); } + return null; }) .build(); serviceFinder.start(); @@ -163,20 +159,17 @@ public ServiceNode deserialize(byte[] data) { } private void registerService(String host, int port, int a, int b) throws Exception { - final ServiceProvider serviceProvider = ServiceProviderBuilders.shardedServiceProviderBuilder() + final ServiceProvider> serviceProvider = ServiceProviderBuilders.shardedServiceProviderBuilder() .withConnectionString(testingCluster.getConnectString()) .withNamespace("test") .withServiceName("test-service") - .withSerializer(new Serializer() { - @Override - public byte[] serialize(ServiceNode data) { - try { - return objectMapper.writeValueAsBytes(data); - } catch (JsonProcessingException e) { - e.printStackTrace(); - } - return null; + .withSerializer(data -> { + try { + return objectMapper.writeValueAsBytes(data); + } catch (JsonProcessingException e) { + e.printStackTrace(); } + return null; }) .withHostname(host) .withPort(port) diff --git a/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/model/ServiceNoProviderTest.java b/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/model/ServiceNoProviderTest.java index 3e6c373b..0bb56679 100644 --- a/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/model/ServiceNoProviderTest.java +++ b/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/model/ServiceNoProviderTest.java @@ -1,12 +1,12 @@ /** * Copyright 2015 Flipkart Internet Pvt. Ltd. - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,11 +18,11 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.flipkart.ranger.zookeeper.ServiceFinderBuilders; import com.flipkart.ranger.core.finder.RoundRobinServiceNodeSelector; import com.flipkart.ranger.core.finder.sharded.SimpleShardedServiceFinder; -import com.flipkart.ranger.core.model.Deserializer; import com.flipkart.ranger.core.model.ServiceNode; +import com.flipkart.ranger.zookeeper.ServiceFinderBuilders; +import lombok.val; import org.apache.curator.test.TestingCluster; import org.junit.After; import org.junit.Assert; @@ -48,7 +48,7 @@ public void startTestCluster() throws Exception { @After public void stopTestCluster() throws Exception { - if(null != testingCluster) { + if (null != testingCluster) { testingCluster.close(); } } @@ -73,12 +73,18 @@ public void setShardId(int shardId) { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } TestShardInfo that = (TestShardInfo) o; - if (shardId != that.shardId) return false; + if (shardId != that.shardId) { + return false; + } return true; } @@ -95,18 +101,16 @@ public void testBasicDiscovery() throws Exception { .withConnectionString(testingCluster.getConnectString()) .withNamespace("test") .withServiceName("test-service") - .withDeserializer(new Deserializer() { - @Override - public ServiceNode deserialize(byte[] data) { - try { - return objectMapper.readValue(data, - new TypeReference>() { - }); - } catch (IOException e) { - e.printStackTrace(); - } - return null; + .withDeserializer(data -> { + try { + return objectMapper.readValue(data, + new TypeReference>() { + }); + } + catch (IOException e) { + e.printStackTrace(); } + return null; }) .build(); serviceFinder.start(); @@ -118,25 +122,23 @@ public ServiceNode deserialize(byte[] data) { @Test public void testBasicDiscoveryRR() throws Exception { - SimpleShardedServiceFinder serviceFinder = ServiceFinderBuilders.shardedFinderBuilder() - .withConnectionString(testingCluster.getConnectString()) - .withNamespace("test") - .withServiceName("test-service") - .withNodeSelector(new RoundRobinServiceNodeSelector()) - .withDeserializer(new Deserializer() { - @Override - public ServiceNode deserialize(byte[] data) { - try { - return objectMapper.readValue(data, - new TypeReference>() { - }); - } catch (IOException e) { - e.printStackTrace(); - } - return null; - } - }) - .build(); + val serviceFinder = ServiceFinderBuilders.shardedFinderBuilder() + .withConnectionString(testingCluster.getConnectString()) + .withNamespace("test") + .withServiceName("test-service") + .withNodeSelector(new RoundRobinServiceNodeSelector<>()) + .withDeserializer(data -> { + try { + return objectMapper.readValue(data, + new TypeReference>() { + }); + } + catch (IOException e) { + e.printStackTrace(); + } + return null; + }) + .build(); serviceFinder.start(); ServiceNode node = serviceFinder.get(new TestShardInfo(1)); Assert.assertNull(node); diff --git a/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/model/ServiceProviderExtCuratorTest.java b/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/model/ServiceProviderExtCuratorTest.java index f46d1d32..5c8b1183 100644 --- a/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/model/ServiceProviderExtCuratorTest.java +++ b/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/model/ServiceProviderExtCuratorTest.java @@ -19,14 +19,13 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.flipkart.ranger.zookeeper.ServiceFinderBuilders; -import com.flipkart.ranger.zookeeper.ServiceProviderBuilders; import com.flipkart.ranger.core.finder.sharded.SimpleShardedServiceFinder; import com.flipkart.ranger.core.healthcheck.Healthchecks; -import com.flipkart.ranger.core.model.Deserializer; -import com.flipkart.ranger.core.model.Serializer; import com.flipkart.ranger.core.model.ServiceNode; import com.flipkart.ranger.core.serviceprovider.ServiceProvider; +import com.flipkart.ranger.zookeeper.ServiceFinderBuilders; +import com.flipkart.ranger.zookeeper.ServiceProviderBuilders; +import com.flipkart.ranger.zookeeper.serde.ZkNodeDataSerializer; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; @@ -46,7 +45,7 @@ public class ServiceProviderExtCuratorTest { private TestingCluster testingCluster; private ObjectMapper objectMapper; - private List> serviceProviders = Lists.newArrayList(); + private List>> serviceProviders = Lists.newArrayList(); private CuratorFramework curatorFramework; @Before @@ -66,7 +65,7 @@ public void startTestCluster() throws Exception { @After public void stopTestCluster() throws Exception { - for(ServiceProvider serviceProvider : serviceProviders) { + for(ServiceProvider> serviceProvider : serviceProviders) { serviceProvider.stop(); } curatorFramework.close(); @@ -117,18 +116,15 @@ public void testBasicDiscovery() throws Exception { .withCuratorFramework(curatorFramework) .withNamespace("test") .withServiceName("test-service") - .withDeserializer(new Deserializer() { - @Override - public ServiceNode deserialize(byte[] data) { - try { - return objectMapper.readValue(data, - new TypeReference>() { - }); - } catch (IOException e) { - e.printStackTrace(); - } - return null; + .withDeserializer(data -> { + try { + return objectMapper.readValue(data, + new TypeReference>() { + }); + } catch (IOException e) { + e.printStackTrace(); } + return null; }) .build(); serviceFinder.start(); @@ -158,21 +154,18 @@ public ServiceNode deserialize(byte[] data) { //while (true); } - private void registerService(String host, int port, int shardId) throws Exception { - final ServiceProvider serviceProvider = ServiceProviderBuilders.shardedServiceProviderBuilder() + private void registerService(String host, int port, int shardId) { + final ServiceProvider> serviceProvider = ServiceProviderBuilders.shardedServiceProviderBuilder() .withCuratorFramework(curatorFramework) .withNamespace("test") .withServiceName("test-service") - .withSerializer(new Serializer() { - @Override - public byte[] serialize(ServiceNode data) { - try { - return objectMapper.writeValueAsBytes(data); - } catch (JsonProcessingException e) { - e.printStackTrace(); - } - return null; + .withSerializer(data -> { + try { + return objectMapper.writeValueAsBytes(data); + } catch (JsonProcessingException e) { + e.printStackTrace(); } + return null; }) .withHostname(host) .withPort(port) diff --git a/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/model/ServiceProviderHealthcheckTest.java b/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/model/ServiceProviderHealthcheckTest.java index c3b6a793..3e31e1e9 100644 --- a/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/model/ServiceProviderHealthcheckTest.java +++ b/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/model/ServiceProviderHealthcheckTest.java @@ -19,15 +19,15 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.flipkart.ranger.zookeeper.ServiceFinderBuilders; -import com.flipkart.ranger.zookeeper.ServiceProviderBuilders; import com.flipkart.ranger.core.finder.sharded.SimpleShardedServiceFinder; import com.flipkart.ranger.core.healthcheck.Healthcheck; import com.flipkart.ranger.core.healthcheck.HealthcheckStatus; -import com.flipkart.ranger.core.model.Deserializer; -import com.flipkart.ranger.core.model.Serializer; import com.flipkart.ranger.core.model.ServiceNode; import com.flipkart.ranger.core.serviceprovider.ServiceProvider; +import com.flipkart.ranger.core.utils.TestUtils; +import com.flipkart.ranger.zookeeper.ServiceFinderBuilders; +import com.flipkart.ranger.zookeeper.ServiceProviderBuilders; +import com.flipkart.ranger.zookeeper.serde.ZkNodeDataSerializer; import com.google.common.collect.Maps; import org.apache.curator.test.TestingCluster; import org.junit.After; @@ -103,18 +103,15 @@ public void testBasicDiscovery() throws Exception { .withConnectionString(testingCluster.getConnectString()) .withNamespace("test") .withServiceName("test-service") - .withDeserializer(new Deserializer() { - @Override - public ServiceNode deserialize(byte[] data) { - try { - return objectMapper.readValue(data, - new TypeReference>() { - }); - } catch (IOException e) { - e.printStackTrace(); - } - return null; + .withDeserializer(data -> { + try { + return objectMapper.readValue(data, + new TypeReference>() { + }); + } catch (IOException e) { + e.printStackTrace(); } + return null; }) .withNodeRefreshIntervalMs(1000) .build(); @@ -124,7 +121,7 @@ public ServiceNode deserialize(byte[] data) { Assert.assertEquals("localhost-1", node.getHost()); TestServiceProvider testServiceProvider = serviceProviders.get(node.getHost()); testServiceProvider.oor(); - Thread.sleep(6000); + TestUtils.sleepForSeconds(6); Assert.assertNull(serviceFinder.get(new TestShardInfo(1))); serviceFinder.stop(); } @@ -172,20 +169,17 @@ public void oor() { } public void start() throws Exception { - final ServiceProvider serviceProvider = ServiceProviderBuilders.shardedServiceProviderBuilder() + final ServiceProvider> serviceProvider = ServiceProviderBuilders.shardedServiceProviderBuilder() .withConnectionString(connectionString) .withNamespace("test") .withServiceName("test-service") - .withSerializer(new Serializer() { - @Override - public byte[] serialize(ServiceNode data) { - try { - return objectMapper.writeValueAsBytes(data); - } catch (JsonProcessingException e) { - e.printStackTrace(); - } - return null; + .withSerializer(data -> { + try { + return objectMapper.writeValueAsBytes(data); + } catch (JsonProcessingException e) { + e.printStackTrace(); } + return null; }) .withHostname(host) .withPort(port) diff --git a/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/model/ServiceProviderTest.java b/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/model/ServiceProviderTest.java index a5b0a514..23432125 100644 --- a/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/model/ServiceProviderTest.java +++ b/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/model/ServiceProviderTest.java @@ -19,15 +19,14 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.flipkart.ranger.zookeeper.ServiceFinderBuilders; -import com.flipkart.ranger.zookeeper.ServiceProviderBuilders; import com.flipkart.ranger.core.finder.RoundRobinServiceNodeSelector; import com.flipkart.ranger.core.finder.sharded.SimpleShardedServiceFinder; import com.flipkart.ranger.core.healthcheck.Healthchecks; -import com.flipkart.ranger.core.model.Deserializer; -import com.flipkart.ranger.core.model.Serializer; import com.flipkart.ranger.core.model.ServiceNode; import com.flipkart.ranger.core.serviceprovider.ServiceProvider; +import com.flipkart.ranger.zookeeper.ServiceFinderBuilders; +import com.flipkart.ranger.zookeeper.ServiceProviderBuilders; +import com.flipkart.ranger.zookeeper.serde.ZkNodeDataSerializer; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.apache.curator.test.TestingCluster; @@ -44,7 +43,7 @@ public class ServiceProviderTest { private TestingCluster testingCluster; private ObjectMapper objectMapper; - private List> serviceProviders = Lists.newArrayList(); + private List>> serviceProviders = Lists.newArrayList(); @Before public void startTestCluster() throws Exception { @@ -59,7 +58,7 @@ public void startTestCluster() throws Exception { @After public void stopTestCluster() throws Exception { - for (ServiceProvider serviceProvider : serviceProviders) { + for (ServiceProvider> serviceProvider : serviceProviders) { serviceProvider.stop(); } if (null != testingCluster) { @@ -159,19 +158,16 @@ public void testBasicDiscoveryRR() throws Exception { .withNamespace("test") .withServiceName("test-service") .withNodeSelector(new RoundRobinServiceNodeSelector()) - .withDeserializer(new Deserializer() { - @Override - public ServiceNode deserialize(byte[] data) { - try { - return objectMapper.readValue(data, - new TypeReference>() { - }); - } - catch (IOException e) { - e.printStackTrace(); - } - return null; + .withDeserializer(data -> { + try { + return objectMapper.readValue(data, + new TypeReference>() { + }); + } + catch (IOException e) { + e.printStackTrace(); } + return null; }) .build(); serviceFinder.start(); @@ -207,18 +203,15 @@ public void testVisibility() throws Exception { .withNamespace("test") .withServiceName("test-service") .withNodeSelector(new RoundRobinServiceNodeSelector()) - .withDeserializer(new Deserializer() { - @Override - public ServiceNode deserialize(byte[] data) { - try { - return objectMapper.readValue(data, - new TypeReference>() { - }); - } catch (IOException e) { - e.printStackTrace(); - } - return null; + .withDeserializer(data -> { + try { + return objectMapper.readValue(data, + new TypeReference>() { + }); + } catch (IOException e) { + e.printStackTrace(); } + return null; }) .build(); serviceFinder.start(); @@ -233,21 +226,18 @@ public ServiceNode deserialize(byte[] data) { } private void registerService(String host, int port, int shardId) throws Exception { - final ServiceProvider serviceProvider = ServiceProviderBuilders.shardedServiceProviderBuilder() + final ServiceProvider> serviceProvider = ServiceProviderBuilders.shardedServiceProviderBuilder() .withConnectionString(testingCluster.getConnectString()) .withNamespace("test") .withServiceName("test-service") - .withSerializer(new Serializer() { - @Override - public byte[] serialize(ServiceNode data) { - try { - return objectMapper.writeValueAsBytes(data); - } - catch (JsonProcessingException e) { - e.printStackTrace(); - } - return null; + .withSerializer(data -> { + try { + return objectMapper.writeValueAsBytes(data); + } + catch (JsonProcessingException e) { + e.printStackTrace(); } + return null; }) .withHostname(host) .withPort(port) diff --git a/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/model/SimpleServiceProviderTest.java b/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/model/SimpleServiceProviderTest.java index 751b5efe..71d163bb 100644 --- a/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/model/SimpleServiceProviderTest.java +++ b/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/model/SimpleServiceProviderTest.java @@ -19,17 +19,15 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.flipkart.ranger.zookeeper.ServiceFinderBuilders; -import com.flipkart.ranger.zookeeper.ServiceProviderBuilders; import com.flipkart.ranger.core.finder.unsharded.UnshardedClusterFinder; import com.flipkart.ranger.core.finder.unsharded.UnshardedClusterInfo; import com.flipkart.ranger.core.healthcheck.Healthchecks; -import com.flipkart.ranger.core.model.Deserializer; -import com.flipkart.ranger.core.model.Serializer; import com.flipkart.ranger.core.model.ServiceNode; -import com.flipkart.ranger.core.serviceprovider.ServiceProvider; +import com.flipkart.ranger.zookeeper.ServiceFinderBuilders; +import com.flipkart.ranger.zookeeper.ServiceProviderBuilders; import com.google.common.collect.HashMultiset; import com.google.common.collect.Multiset; +import lombok.val; import org.apache.curator.test.TestingCluster; import org.junit.After; import org.junit.Assert; @@ -66,18 +64,15 @@ public void testBasicDiscovery() throws Exception { .withConnectionString(testingCluster.getConnectString()) .withNamespace("test") .withServiceName("test-service") - .withDeserializer(new Deserializer() { - @Override - public ServiceNode deserialize(byte[] data) { - try { - return objectMapper.readValue(data, - new TypeReference>() { - }); - } catch (IOException e) { - e.printStackTrace(); - } - return null; + .withDeserializer(data -> { + try { + return objectMapper.readValue(data, + new TypeReference>() { + }); + } catch (IOException e) { + e.printStackTrace(); } + return null; }) .withDisableWatchers() .build(); @@ -101,20 +96,17 @@ public ServiceNode deserialize(byte[] data) { } private void registerService(String host, int port, int shardId) throws Exception { - ServiceProvider serviceProvider = ServiceProviderBuilders.unshardedServiceProviderBuilder() + val serviceProvider = ServiceProviderBuilders.unshardedServiceProviderBuilder() .withConnectionString(testingCluster.getConnectString()) .withNamespace("test") .withServiceName("test-service") - .withSerializer(new Serializer() { - @Override - public byte[] serialize(ServiceNode data) { - try { - return objectMapper.writeValueAsBytes(data); - } catch (JsonProcessingException e) { - e.printStackTrace(); - } - return null; + .withSerializer(data -> { + try { + return objectMapper.writeValueAsBytes(data); + } catch (JsonProcessingException e) { + e.printStackTrace(); } + return null; }) .withHostname(host) .withPort(port) diff --git a/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/servicehub/ServiceHubTest.java b/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/servicehub/ServiceHubTest.java index 085f3942..6292b505 100644 --- a/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/servicehub/ServiceHubTest.java +++ b/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/servicehub/ServiceHubTest.java @@ -10,6 +10,7 @@ import com.flipkart.ranger.core.model.ServiceNode; import com.flipkart.ranger.core.signals.ExternalTriggeredSignal; import com.flipkart.ranger.core.util.Exceptions; +import com.flipkart.ranger.core.utils.TestUtils; import com.flipkart.ranger.zookeeper.ServiceProviderBuilders; import com.flipkart.ranger.zookeeper.zk.ZkServiceDataSource; import com.flipkart.ranger.zookeeper.zk.ZkShardedServiceFinderFactory; @@ -103,7 +104,7 @@ public void testHub() throws InterruptedException { refreshProviderSignal.trigger(); refreshHubSignal.trigger(); - Thread.sleep(3_000); + TestUtils.sleepForSeconds(3); val node = hub.finder(new Service(NAMESPACE, "s1")) .map(finder -> finder.get(new TestShardInfo("prod"))) .orElse(null); diff --git a/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/serviceprovider/BaseServiceProviderBuilderTest.java b/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/serviceprovider/BaseServiceProviderBuilderTest.java index 5f6b6134..110f9dd7 100644 --- a/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/serviceprovider/BaseServiceProviderBuilderTest.java +++ b/ranger-zookeeper/src/test/java/com/flipkart/ranger/zookeeper/serviceprovider/BaseServiceProviderBuilderTest.java @@ -18,12 +18,9 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.flipkart.ranger.zookeeper.ServiceProviderBuilders; -import com.flipkart.ranger.core.finder.unsharded.UnshardedClusterInfo; import com.flipkart.ranger.core.healthcheck.Healthchecks; -import com.flipkart.ranger.core.model.Serializer; -import com.flipkart.ranger.core.model.ServiceNode; -import com.flipkart.ranger.core.serviceprovider.ServiceProvider; +import com.flipkart.ranger.zookeeper.ServiceProviderBuilders; +import lombok.val; import org.apache.curator.test.TestingCluster; import org.junit.After; import org.junit.Assert; @@ -60,20 +57,17 @@ public void testbuilder() throws Exception { final int port = 9000; Exception exception = null; try { - ServiceProvider serviceProvider = ServiceProviderBuilders.unshardedServiceProviderBuilder() + val serviceProvider = ServiceProviderBuilders.unshardedServiceProviderBuilder() .withConnectionString(testingCluster.getConnectString()) .withNamespace("test") .withServiceName("test-service") - .withSerializer(new Serializer() { - @Override - public byte[] serialize(ServiceNode data) { - try { - return objectMapper.writeValueAsBytes(data); - } catch (JsonProcessingException e) { - e.printStackTrace(); - } - return null; + .withSerializer(data -> { + try { + return objectMapper.writeValueAsBytes(data); + } catch (JsonProcessingException e) { + e.printStackTrace(); } + return null; }) .withHostname(host) .withPort(port) @@ -84,20 +78,17 @@ public byte[] serialize(ServiceNode data) { } Assert.assertTrue(exception instanceof IllegalArgumentException); - ServiceProvider serviceProvider = ServiceProviderBuilders.unshardedServiceProviderBuilder() + val serviceProvider = ServiceProviderBuilders.unshardedServiceProviderBuilder() .withConnectionString(testingCluster.getConnectString()) .withNamespace("test") .withServiceName("test-service") - .withSerializer(new Serializer() { - @Override - public byte[] serialize(ServiceNode data) { - try { - return objectMapper.writeValueAsBytes(data); - } catch (JsonProcessingException e) { - e.printStackTrace(); - } - return null; + .withSerializer(data -> { + try { + return objectMapper.writeValueAsBytes(data); + } catch (JsonProcessingException e) { + e.printStackTrace(); } + return null; }) .withHostname(host) .withHealthcheck(Healthchecks.defaultHealthyCheck())