Skip to content

Commit

Permalink
minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
santanusinha committed Sep 21, 2021
1 parent 52c9f70 commit 459e774
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 59 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@
target
*.ipr
*.iml
*.iws
*.iws
out
dep
23 changes: 14 additions & 9 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,18 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<guava.version>26.0-jre</guava.version>
<guava.version>30.1.1-jre</guava.version>
<curator.version>4.0.1</curator.version>
<jackson.version>2.10.1</jackson.version>
<slf4j.version>1.7.25</slf4j.version>
<jackson.version>2.12.5</jackson.version>
<slf4j.version>1.7.32</slf4j.version>

<maven.compiler.version>3.8.0</maven.compiler.version>
<java.version>1.8</java.version>
<lombok.version>1.18.4</lombok.version>
<lombok.version>1.18.20</lombok.version>

<junit.version>4.13.2</junit.version>
<awaitility.version>4.1.0</awaitility.version>
<guava-retrying.version>2.0.0</guava-retrying.version>
</properties>

<dependencies>
Expand All @@ -75,7 +78,7 @@
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>annotations</artifactId>
<version>3.0.1</version>
<version>3.0.1u2</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
Expand All @@ -85,7 +88,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.26</version>
<version>${slf4j.version}</version>
</dependency>

<dependency>
Expand All @@ -103,18 +106,18 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<version>${junit.version}</version>
</dependency>

<dependency>
<groupId>com.github.rholder</groupId>
<artifactId>guava-retrying</artifactId>
<version>2.0.0</version>
<version>${guava-retrying.version}</version>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.0.2</version>
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down Expand Up @@ -152,6 +155,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<id>attach-sources</id>
Expand All @@ -164,6 +168,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.3.0</version>
<executions>
<execution>
<id>attach-javadocs</id>
Expand Down
13 changes: 5 additions & 8 deletions ranger-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,16 @@

<artifactId>ranger-core</artifactId>

<properties>
<httpclient.version>4.5.13</httpclient.version>
</properties>

<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.11</version>
<version>${httpclient.version}</version>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,11 @@ protected F buildFinder() {
serviceName, nodeRefreshIntervalMs);
nodeRefreshIntervalMs = 1000;
}
Service service = new Service(namespace, serviceName);
val service = new Service(namespace, serviceName);
val finder = buildFinder(service, shardSelector, nodeSelector);
val registry = finder.getServiceRegistry();
List<Signal<T>> signalGenerators = new ArrayList<>();
final NodeDataSource<T, D> nodeDataSource = dataSource(service);
val signalGenerators = new ArrayList<Signal<T>>();
val nodeDataSource = dataSource(service);

signalGenerators.add(new ScheduledRegistryUpdateSignal<>(service, nodeRefreshIntervalMs));
additionalRefreshSignals.addAll(implementationSpecificRefreshSignals(service, nodeDataSource));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,14 @@ public void start() {
try {
RetryerBuilder.<Boolean>newBuilder()
.retryIfResult(r -> null == r || !r)
.build()
.call(initialRefreshCompleted::get);
.build()
.call(initialRefreshCompleted::get);
}
catch (Exception e) {
Exceptions.illegalState("Could not perform initial state for service: " + serviceName, e);
}
log.info("Initial node list updated for service: {} in {}ms",
serviceName, stopwatch.elapsed(TimeUnit.MILLISECONDS));
serviceName, stopwatch.elapsed(TimeUnit.MILLISECONDS));
}

public void stop() {
Expand Down Expand Up @@ -114,6 +114,7 @@ private Void queryExecutor() {
catch (InterruptedException e) {
log.info("Updater thread interrupted");
Thread.currentThread().interrupt();
return null;
}
catch (Exception e) {
log.error("Registry update failed for service: " + serviceRegistry.getService().name(), e);
Expand Down Expand Up @@ -141,8 +142,8 @@ private void updateRegistry() {
initialRefreshCompleted.compareAndSet(false, true);
}
else {
log.warn("Null list returned from node data source. We are in a weird state. Keeping old list for {}",
serviceRegistry.getService().getServiceName());
log.warn("Empty list returned from node data source. We are in a weird state. Keeping old list for {}",
serviceRegistry.getService().getServiceName());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import com.flipkart.ranger.core.signals.ExternalTriggeredSignal;
import com.flipkart.ranger.core.signals.ScheduledSignal;
import com.flipkart.ranger.core.signals.Signal;
import com.flipkart.ranger.core.util.Exceptions;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
Expand Down Expand Up @@ -111,7 +110,8 @@ private void monitor() {
}
catch (InterruptedException e) {
log.info("Updater thread interrupted");
Exceptions.illegalState(e);
Thread.currentThread().interrupt();
break;
}
finally {
updateAvailable = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,20 @@
import com.flipkart.ranger.http.config.HttpClientConfig;
import com.flipkart.ranger.http.serde.HTTPResponseDataDeserializer;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import okhttp3.HttpUrl;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

/**
*
*/
@Slf4j
public class HttpNodeDataSource<T, D extends HTTPResponseDataDeserializer<T>> extends HttpNodeDataStoreConnector<T> implements NodeDataSource<T, D> {

private final AtomicBoolean firstCall = new AtomicBoolean(false);

public HttpNodeDataSource(
Service service,
final HttpClientConfig config,
Expand All @@ -37,7 +33,7 @@ public HttpNodeDataSource(

@Override
public Optional<List<ServiceNode<T>>> refresh(D deserializer) {
final HttpUrl httpUrl = new HttpUrl.Builder()
val httpUrl = new HttpUrl.Builder()
.scheme(config.isSecure()
? "https"
: "http")
Expand All @@ -47,42 +43,34 @@ public Optional<List<ServiceNode<T>>> refresh(D deserializer) {
: config.getPort())
.encodedPath(String.format("/ranger/nodes/v1/%s/%s", service.getNamespace(), service.getServiceName()))
.build();

try {
final Response response = httpClient.newCall(new Request.Builder()
.url(httpUrl)
.get()
.build())
.execute();
val request = new Request.Builder()
.url(httpUrl)
.get()
.build();
try (val response = httpClient.newCall(request).execute()) {
if (response.isSuccessful()) {
final ResponseBody body = response.body();
if (null == body) {
log.warn("HTTP call to {} returned empty body", httpUrl.toString());
}
else {
final byte[] bytes;
try {
bytes = body.bytes();
try (final ResponseBody body = response.body()) {
if (null == body) {
log.warn("HTTP call to {} returned empty body", httpUrl);
}
finally {
if(null != body) {
body.close();
}
else {
final byte[] bytes = body.bytes();
return Optional.of(FinderUtils.filterValidNodes(
service,
deserializer.deserialize(bytes),
healthcheckZombieCheckThresholdTime(service)));
}
return Optional.of(FinderUtils.filterValidNodes(
service,
deserializer.deserialize(bytes),
healthcheckZombieCheckThresholdTime(service)));
}
}
else {
log.warn("HTTP call to {} returned: {}", httpUrl.toString(), response.code());
}
}
catch (IOException e) {
Exceptions.illegalState(e);
Exceptions.illegalState("Error fetching data from server: " + httpUrl, e);
}
throw new IllegalStateException("No data received from server");
log.error("No data returned from server: " + httpUrl);
return Optional.empty();
}

@Override
Expand Down

0 comments on commit 459e774

Please sign in to comment.