Skip to content

Commit

Permalink
HTTP Service finder
Browse files Browse the repository at this point in the history
* Added data source
* Added data sync
* Added HTTP client
  • Loading branch information
santanusinha committed Feb 29, 2020
1 parent 3ed5df7 commit 745846f
Show file tree
Hide file tree
Showing 45 changed files with 850 additions and 536 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@
<artifactId>guava-retrying</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.0.2</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
23 changes: 23 additions & 0 deletions ranger-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,28 @@
<version>${slf4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.11</version>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ public abstract class BaseServiceFinderBuilder
T,
R extends ServiceRegistry<T>,
F extends ServiceFinder<T, R>,
B extends BaseServiceFinderBuilder<T, R, F, B>> {
B extends BaseServiceFinderBuilder<T, R, F, B, D>,
D extends Deserializer<T>> {

protected String namespace;
protected String serviceName;
protected int nodeRefreshIntervalMs;
protected boolean disablePushUpdaters;
protected Deserializer<T> deserializer;
protected D deserializer;
protected ShardSelector<T, R> shardSelector;
protected ServiceNodeSelector<T> nodeSelector = new RandomServiceNodeSelector<>();
protected final List<Signal<T>> additionalRefreshSignals = new ArrayList<>();
Expand All @@ -59,7 +60,7 @@ public B withServiceName(final String serviceName) {
return (B)this;
}

public B withDeserializer(Deserializer<T> deserializer) {
public B withDeserializer(D deserializer) {
this.deserializer = deserializer;
return (B)this;
}
Expand Down Expand Up @@ -140,7 +141,7 @@ protected F buildFinder() {
val finder = buildFinder(service, shardSelector, nodeSelector);
val registry = finder.getServiceRegistry();
List<Signal<T>> signalGenerators = new ArrayList<>();
final NodeDataSource<T> nodeDataSource = dataSource(service);
final NodeDataSource<T, D> nodeDataSource = dataSource(service);

signalGenerators.add(new ScheduledRegistryUpdateSignal<>(service, nodeRefreshIntervalMs));
additionalRefreshSignals.addAll(implementationSpecificRefreshSignals(service, nodeDataSource));
Expand All @@ -149,7 +150,7 @@ protected F buildFinder() {
log.debug("Added additional signal handlers");
}

val updater = new ServiceRegistryUpdater<T>(registry, nodeDataSource, signalGenerators, deserializer);
val updater = new ServiceRegistryUpdater<T, D>(registry, nodeDataSource, signalGenerators, deserializer);
finder.getStartSignal()
.registerConsumers(startSignalHandlers)
.registerConsumer(x -> nodeDataSource.start())
Expand All @@ -164,11 +165,11 @@ protected F buildFinder() {
return finder;
}

protected List<Signal<T>> implementationSpecificRefreshSignals(Service service, NodeDataSource<T> nodeDataSource) {
protected List<Signal<T>> implementationSpecificRefreshSignals(Service service, NodeDataSource<T, D> nodeDataSource) {
return Collections.emptyList();
}

protected abstract NodeDataSource<T> dataSource(Service service);
protected abstract NodeDataSource<T, D> dataSource(Service service);

protected abstract F buildFinder(
Service service,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@
import java.util.concurrent.locks.ReentrantLock;

@Slf4j
public class ServiceRegistryUpdater<T> {
public class ServiceRegistryUpdater<T, D extends Deserializer<T>> {

private final ServiceRegistry<T> serviceRegistry;
private final NodeDataSource<T> nodeDataSource;
private final Deserializer<T> deserializer;
private final NodeDataSource<T,D> nodeDataSource;
private final D deserializer;

private Lock checkLock = new ReentrantLock();
private Condition checkCondition = checkLock.newCondition();
Expand All @@ -54,8 +54,9 @@ public class ServiceRegistryUpdater<T> {

public ServiceRegistryUpdater(
ServiceRegistry<T> serviceRegistry,
NodeDataSource<T> nodeDataSource,
List<Signal<T>> signalGenerators, Deserializer<T> deserializer) {
NodeDataSource<T,D> nodeDataSource,
List<Signal<T>> signalGenerators,
D deserializer) {
this.serviceRegistry = serviceRegistry;
this.nodeDataSource = nodeDataSource;
this.deserializer = deserializer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
package com.flipkart.ranger.core.finder.sharded;

import com.flipkart.ranger.core.finder.BaseServiceFinderBuilder;
import com.flipkart.ranger.core.model.Deserializer;
import com.flipkart.ranger.core.model.Service;
import com.flipkart.ranger.core.model.ServiceNodeSelector;
import com.flipkart.ranger.core.model.ShardSelector;

public abstract class SimpleShardedServiceFinderBuilder<T, B extends SimpleShardedServiceFinderBuilder<T,B>>
extends BaseServiceFinderBuilder<T, MapBasedServiceRegistry<T>, SimpleShardedServiceFinder<T>, B> {
public abstract class SimpleShardedServiceFinderBuilder<T, B extends SimpleShardedServiceFinderBuilder<T,B, D>, D extends Deserializer<T>>
extends BaseServiceFinderBuilder<T, MapBasedServiceRegistry<T>, SimpleShardedServiceFinder<T>, B, D> {

@Override
protected SimpleShardedServiceFinder<T> buildFinder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@

package com.flipkart.ranger.core.finder.unsharded;

import com.flipkart.ranger.core.model.Deserializer;
import com.flipkart.ranger.core.model.Service;
import com.flipkart.ranger.core.model.ShardSelector;
import com.flipkart.ranger.core.finder.BaseServiceFinderBuilder;
import com.flipkart.ranger.core.model.ServiceNodeSelector;

public abstract class UnshardedFinderBuilder<B extends UnshardedFinderBuilder<B>>
extends BaseServiceFinderBuilder<UnshardedClusterInfo, UnshardedClusterServiceRegistry, UnshardedClusterFinder, B> {
public abstract class UnshardedFinderBuilder<B extends UnshardedFinderBuilder<B, D>, D extends Deserializer<UnshardedClusterInfo>>
extends BaseServiceFinderBuilder<UnshardedClusterInfo, UnshardedClusterServiceRegistry, UnshardedClusterFinder, B, D> {

@Override
protected UnshardedClusterFinder buildFinder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,5 @@

package com.flipkart.ranger.core.model;

@FunctionalInterface
public interface Deserializer<T> {
ServiceNode<T> deserialize(final byte[] data);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.flipkart.ranger.core.model;

/**
*
*/
public interface NodeDataSink<T, S extends Serializer<T>> extends NodeDataStoreConnector<T> {
void updateState(S serializer, ServiceNode<T> serviceNode);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,10 @@
/**
*
*/
public interface NodeDataSource<T> {
void start();
public interface NodeDataSource<T, D extends Deserializer<T>> extends NodeDataStoreConnector<T> {
Optional<List<ServiceNode<T>>> refresh(D deserializer);

void ensureConnected();

void stop();

Optional<List<ServiceNode<T>>> refresh(Deserializer<T> deserializer);

boolean isActive();

void updateState(Serializer<T> serializer, ServiceNode<T> serviceNode);
default long healthcheckZombieCheckThresholdTime(Service service) {
return System.currentTimeMillis() - 60000; //1 Minute
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.flipkart.ranger.core.model;

/**
*
*/
public interface NodeDataStoreConnector<T> {
void start();

void ensureConnected();

void stop();

boolean isActive();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.flipkart.ranger.core.model;

@FunctionalInterface
public interface Serializer<T> {
byte[] serialize(final ServiceNode<T> data);
// byte[] serialize(final ServiceNode<T> data);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@
import com.flipkart.ranger.core.healthservice.HealthService;
import com.flipkart.ranger.core.healthservice.ServiceHealthAggregator;
import com.flipkart.ranger.core.healthservice.monitor.IsolatedHealthMonitor;
import com.flipkart.ranger.core.model.NodeDataSource;
import com.flipkart.ranger.core.model.Serializer;
import com.flipkart.ranger.core.model.Service;
import com.flipkart.ranger.core.model.ServiceNode;
import com.flipkart.ranger.core.model.*;
import com.flipkart.ranger.core.signals.ScheduledSignal;
import com.flipkart.ranger.core.signals.Signal;
import com.google.common.base.Preconditions;
Expand All @@ -41,18 +38,18 @@

@Slf4j
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public abstract class BaseServiceProviderBuilder<T, B extends BaseServiceProviderBuilder<T, B>> {
public abstract class BaseServiceProviderBuilder<T, B extends BaseServiceProviderBuilder<T, B, S>, S extends Serializer<T>> {

protected String namespace;
protected String serviceName;
protected Serializer<T> serializer;
protected S serializer;
protected String hostname;
protected int port;
protected T nodeData;
protected int healthUpdateIntervalMs;
protected int staleUpdateThresholdMs;
protected List<Healthcheck> healthchecks = Lists.newArrayList();
protected NodeDataSource<T> nodeDataSource = null;
protected NodeDataSink<T, S> nodeDataSource = null;
protected final List<Consumer<Void>> startSignalHandlers = Lists.newArrayList();
protected final List<Consumer<Void>> stopSignalHandlers = Lists.newArrayList();
protected final List<Signal<HealthcheckResult>> extraRefreshSignals = Lists.newArrayList();
Expand All @@ -70,8 +67,8 @@ public B withServiceName(final String serviceName) {
return (B)this;
}

public B withSerializer(Serializer<T> deserializer) {
this.serializer = deserializer;
public B withSerializer(S serializer) {
this.serializer = serializer;
return (B)this;
}

Expand Down Expand Up @@ -119,7 +116,7 @@ public B withIsolatedHealthMonitor(IsolatedHealthMonitor monitor) {
return (B)this;
}

public B withNodeDataSource(NodeDataSource<T> nodeDataSource) {
public B withNodeDataSource(NodeDataSink<T, S> nodeDataSource) {
this.nodeDataSource = nodeDataSource;
return (B)this;
}
Expand Down Expand Up @@ -154,7 +151,7 @@ public B withExtraRefreshSignals(List<Signal<HealthcheckResult>> extraRefreshSig
return (B)this;
}

protected final ServiceProvider<T> buildProvider() {
protected final ServiceProvider<T, S> buildProvider() {
Preconditions.checkNotNull(namespace);
Preconditions.checkNotNull(serviceName);
Preconditions.checkNotNull(serializer);
Expand Down Expand Up @@ -186,16 +183,17 @@ protected final ServiceProvider<T> buildProvider() {
new HealthChecker(healthchecks, staleUpdateThresholdMs),
Collections.emptyList(),
healthUpdateIntervalMs);
final NodeDataSource<T> usableNodeDataSource = dataSource(service);
final NodeDataSink<T,S> usableNodeDataSource = dataSink(service);
final List<HealthService> healthServices = Collections.singletonList(serviceHealthAggregator);

final List<Signal<HealthcheckResult>> signalGenerators
= ImmutableList.<Signal<HealthcheckResult>>builder()
.add(healthcheckUpdateSignalGenerator)
.addAll(extraRefreshSignals)
.build();
final ServiceProvider<T> serviceProvider = new ServiceProvider<>(service,
final ServiceProvider<T, S> serviceProvider = new ServiceProvider<>(service,
new ServiceNode<>(hostname, port, nodeData),
serializer,
usableNodeDataSource,
signalGenerators);
final Signal<Void> startSignal = serviceProvider.getStartSignal();
Expand All @@ -215,7 +213,7 @@ protected final ServiceProvider<T> buildProvider() {
return serviceProvider;
}

public abstract ServiceProvider<T> build();
public abstract ServiceProvider<T,S> build();

protected abstract NodeDataSource<T> dataSource(final Service service);
protected abstract NodeDataSink<T,S> dataSink(final Service service);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,23 @@

package com.flipkart.ranger.core.serviceprovider;

import com.flipkart.ranger.core.model.NodeDataSource;
import com.flipkart.ranger.core.model.Serializer;
import com.flipkart.ranger.core.model.Service;
import com.flipkart.ranger.core.model.*;
import com.flipkart.ranger.core.signals.ExternalTriggeredSignal;
import com.flipkart.ranger.core.signals.Signal;
import com.flipkart.ranger.core.healthcheck.HealthcheckResult;
import com.flipkart.ranger.core.model.ServiceNode;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.Collections;
import java.util.List;

@Slf4j
public class ServiceProvider<T> {
public class ServiceProvider<T, S extends Serializer<T>> {

private final Service service;
private final ServiceNode<T> serviceNode;
private final Serializer<T> serializer;
private final NodeDataSource<T> dataSource;
private final S serializer;
private final NodeDataSink<T, S> dataSink;
@Getter
private final ExternalTriggeredSignal<Void> startSignal = new ExternalTriggeredSignal<>(() -> null, Collections.emptyList());
@Getter
Expand All @@ -44,19 +41,19 @@ public class ServiceProvider<T> {
public ServiceProvider(
Service service,
ServiceNode<T> serviceNode,
Serializer<T> serializer,
NodeDataSource<T> dataSource,
S serializer,
NodeDataSink<T, S> dataSink,
List<Signal<HealthcheckResult>> signalGenerators) {
this.service = service;
this.serviceNode = serviceNode;
this.serializer = serializer;
this.dataSource = dataSource;
this.dataSink = dataSink;
signalGenerators.forEach(signalGenerator -> signalGenerator.registerConsumer(this::handleHealthUpdate));
}

public void start() {
startSignal.trigger();
dataSource.updateState(serializer, serviceNode);
dataSink.updateState(serializer, serviceNode);
log.debug("Set initial node data on zookeeper for {}", service.getServiceName());
}

Expand All @@ -71,7 +68,7 @@ private void handleHealthUpdate(HealthcheckResult result) {
}
serviceNode.setHealthcheckStatus(result.getStatus());
serviceNode.setLastUpdatedTimeStamp(result.getUpdatedTime());
dataSource.updateState(serializer, serviceNode);
dataSink.updateState(serializer, serviceNode);
log.debug("Updated node with health check result: {}", result);
}

Expand Down
Loading

0 comments on commit 745846f

Please sign in to comment.