Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Http service #22

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<groupId>com.flipkart.ranger</groupId>
<artifactId>ranger</artifactId>
<packaging>jar</packaging>
<version>0.3.0</version>
<version>0.4.0</version>

<distributionManagement>
<repository>
Expand Down Expand Up @@ -94,6 +94,11 @@
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock-standalone</artifactId>
<version>2.18.0</version>
Tushar-Naik marked this conversation as resolved.
Show resolved Hide resolved
</dependency>

<dependency>
<groupId>com.github.rholder</groupId>
Expand All @@ -114,8 +119,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<source>8</source>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1.8

<target>8</target>
</configuration>
</plugin>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/**
* Copyright 2015 Flipkart Internet Pvt. Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.flipkart.ranger.finder;

import com.flipkart.ranger.model.ServiceNode;
import com.flipkart.ranger.model.ServiceRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public abstract class AbstractServiceRegistryUpdater<T> implements Callable<Void> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename this to ServiceRegistryUpdater

private static final Logger logger = LoggerFactory.getLogger(AbstractServiceRegistryUpdater.class);

public abstract void start() throws Exception;
public abstract void stop() throws Exception;

protected ServiceRegistry<T> serviceRegistry;

private Lock checkLock = new ReentrantLock();
private Condition checkCondition = checkLock.newCondition();
private boolean checkForUpdate = false;

public void setServiceRegistry(ServiceRegistry<T> serviceRegistry) {
this.serviceRegistry = serviceRegistry;
}

@Override
public Void call() throws Exception {
while (true) {
try {
checkLock.lock();
while (!checkForUpdate) {
checkCondition.await();
}
List<ServiceNode<T>> nodes = getHealthyServiceNodes();
if(null != nodes) {
logger.debug("Setting nodelist of size: " + nodes.size());
serviceRegistry.nodes(nodes);
}
else {
logger.warn("No service shards/nodes found. We are disconnected from zookeeper/http server. Keeping old list.");
}
checkForUpdate =false;
} finally {
checkLock.unlock();
}
}
}

protected abstract List<ServiceNode<T>> getHealthyServiceNodes();

public void checkForUpdate() {
try {
checkLock.lock();
checkForUpdate = true;
checkCondition.signalAll();
} finally {
checkLock.unlock();
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright 2015 Flipkart Internet Pvt. Ltd.
*
* <p>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

para not required

* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -16,47 +16,33 @@

package com.flipkart.ranger.finder;

import com.flipkart.ranger.model.Deserializer;
import com.flipkart.ranger.model.ServiceNodeSelector;
import com.flipkart.ranger.model.ServiceRegistry;
import com.flipkart.ranger.model.ShardSelector;
import com.flipkart.ranger.model.*;
import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public abstract class BaseServiceFinderBuilder<T, RegistryType extends ServiceRegistry<T>, FinderType extends ServiceFinder<T, RegistryType>> {
private String namespace;
private String serviceName;
private CuratorFramework curatorFramework;
private String connectionString;
private CuratorSourceConfig<T> curatorSourceConfig;
private HttpSourceConfig<T> httpSourceConfig;
private CuratorFrameworkConfig<T> curatorFrameworkConfig;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not do this.

Take only one source config. You do not handle the source specific build here.

  • Define interface for factory
  • Derive the abstract factory interface and from a source specific visitor to create a concrete factory
  • Pass that as the default value for factory
  • Let user do the following:
    • Pass the SourceConfig
    • Pass in an override of the FinderBuilderFactory
      The above will let people completely replace and add more implementations based on hz, consul, etcd etc without bringing all that junk into ranger itself.


private int healthcheckRefreshTimeMillis;
private Deserializer<T> deserializer;
private ShardSelector<T, RegistryType> shardSelector;
private ServiceNodeSelector<T> nodeSelector = new RandomServiceNodeSelector<T>();

public BaseServiceFinderBuilder<T, RegistryType, FinderType> withNamespace(final String namespace) {
this.namespace = namespace;
return this;
}

public BaseServiceFinderBuilder<T, RegistryType, FinderType> withServiceName(final String serviceName) {
this.serviceName = serviceName;
return this;
}

public BaseServiceFinderBuilder<T, RegistryType, FinderType> withCuratorFramework(CuratorFramework curatorFramework) {
this.curatorFramework = curatorFramework;
public BaseServiceFinderBuilder<T, RegistryType, FinderType> withHttpSourceConfig(HttpSourceConfig<T> httpSourceConfig) {
this.httpSourceConfig = httpSourceConfig;
return this;
}

public BaseServiceFinderBuilder<T, RegistryType, FinderType> withConnectionString(final String connectionString) {
this.connectionString = connectionString;
public BaseServiceFinderBuilder<T, RegistryType, FinderType> withCuratorSourceConfig(CuratorSourceConfig<T> curatorSourceConfig) {
this.curatorSourceConfig = curatorSourceConfig;
return this;
}

public BaseServiceFinderBuilder<T, RegistryType, FinderType> withDeserializer(Deserializer<T> deserializer) {
this.deserializer = deserializer;
public BaseServiceFinderBuilder<T, RegistryType, FinderType> withCuratorFrameworkConfig(CuratorFrameworkConfig<T> curatorFrameworkConfig) {
this.curatorFrameworkConfig = curatorFrameworkConfig;
return this;
}

Expand All @@ -75,29 +61,40 @@ public BaseServiceFinderBuilder<T, RegistryType, FinderType> witHhealthcheckRefr
return this;
}



public FinderType build() throws Exception {
Preconditions.checkNotNull(namespace);
Preconditions.checkNotNull(serviceName);
Preconditions.checkNotNull(deserializer);
if( null == curatorFramework) {
Preconditions.checkNotNull(connectionString);
curatorFramework = CuratorFrameworkFactory.builder()
.namespace(namespace)
.connectString(connectionString)
if (0 == healthcheckRefreshTimeMillis) {
healthcheckRefreshTimeMillis = 1000;
}
if (null != httpSourceConfig) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor to private method

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assert to make sure at-least one of http or ZK source config is provided.

Preconditions.checkNotNull(httpSourceConfig.getHost());
Preconditions.checkNotNull(httpSourceConfig.getPort());
Preconditions.checkNotNull(httpSourceConfig.getListDeserializer());
Preconditions.checkArgument((httpSourceConfig.getPort() > 0 && httpSourceConfig.getPort() < 65535));
HttpServiceRegistryUpdater<T> registryUpdater = new HttpServiceRegistryUpdater<T>(httpSourceConfig);
return buildFinder(httpSourceConfig, registryUpdater, shardSelector, nodeSelector, healthcheckRefreshTimeMillis);
}
if (curatorFrameworkConfig == null) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor to private method

Preconditions.checkNotNull(curatorSourceConfig.getNamespace());
Preconditions.checkNotNull(curatorSourceConfig.getConnectionString());
Preconditions.checkNotNull(curatorSourceConfig.getDeserializer());
Preconditions.checkNotNull(curatorSourceConfig.getServiceName());
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.namespace(curatorSourceConfig.getNamespace())
.connectString(curatorSourceConfig.getConnectionString())
.retryPolicy(new ExponentialBackoffRetry(1000, 100)).build();
curatorFramework.start();
CuratorServiceRegistryUpdater<T> registryUpdater = new CuratorServiceRegistryUpdater<T>(curatorSourceConfig.getDeserializer(), curatorFramework, curatorSourceConfig.getServiceName());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not build this here. User an interface for building the updater. Change design to handle building the updater using a source specific builder like above with a default impl.

Again because we need to get out of the backend building exercise for good.

return buildFinder(curatorSourceConfig, registryUpdater, shardSelector, nodeSelector, healthcheckRefreshTimeMillis);
}
if( 0 == healthcheckRefreshTimeMillis) {
healthcheckRefreshTimeMillis = 1000;
}
Service service = new Service(curatorFramework, namespace, serviceName);
return buildFinder(service, deserializer, shardSelector, nodeSelector, healthcheckRefreshTimeMillis);
Preconditions.checkNotNull(curatorFrameworkConfig.getCuratorFramework());
Preconditions.checkNotNull(curatorFrameworkConfig.getDeserializer());
Preconditions.checkNotNull(curatorFrameworkConfig.getServiceName());
CuratorServiceRegistryUpdater<T> registryUpdater = new CuratorServiceRegistryUpdater<T>(curatorFrameworkConfig.getDeserializer(), curatorFrameworkConfig.getCuratorFramework(), curatorFrameworkConfig.getServiceName());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Break this line.

return buildFinder(curatorFrameworkConfig, registryUpdater, shardSelector, nodeSelector, healthcheckRefreshTimeMillis);
}

protected abstract FinderType buildFinder(Service service,
Deserializer<T> deserializer,
protected abstract FinderType buildFinder(SourceConfig config,
AbstractServiceRegistryUpdater<T> registryUpdater,
ShardSelector<T, RegistryType> shardSelector,
ServiceNodeSelector<T> nodeSelector,
int healthcheckRefreshTimeMillis);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,30 @@

package com.flipkart.ranger.finder;

import com.flipkart.ranger.model.Deserializer;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;

public class Service {
public class CuratorFrameworkConfig<T> extends SourceConfig {
private CuratorFramework curatorFramework;
private String namespace;
private Deserializer<T> deserializer;
private String serviceName;

public Service(CuratorFramework curatorFramework, String namespace, String serviceName) {
public CuratorFrameworkConfig(CuratorFramework curatorFramework, String serviceName, Deserializer<T> deserializer) {
super(SourceConfig.ServiceType.CURATOR);
this.curatorFramework = curatorFramework;
this.namespace = namespace;
this.deserializer = deserializer;
this.serviceName = serviceName;
}

public CuratorFramework getCuratorFramework() {
return curatorFramework;
}

public String getNamespace() {
return namespace;
}

public String getServiceName() {
return serviceName;
}

public boolean isRunning() {
return curatorFramework != null
&& (curatorFramework.getState() == CuratorFrameworkState.STARTED);
public Deserializer<T> getDeserializer() {
return deserializer;
}

public CuratorFramework getCuratorFramework() {
return curatorFramework;
}
}
Loading