Skip to content

Commit

Permalink
Code cleanup and removals of exists() check before node data read
Browse files Browse the repository at this point in the history
  • Loading branch information
santanusinha committed Feb 29, 2020
1 parent 1c56a3d commit 1830115
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public ZkNodeDataSink(

@Override
public void updateState(S serializer, ServiceNode<T> serviceNode) {
if (stopped.get()) {
if (isStopped()) {
log.warn("Node has been stopped already for service: {}. No update will be possible.",
service.getServiceName());
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ public Optional<List<ServiceNode<T>>> refresh(D deserializer) {
}

private Optional<List<ServiceNode<T>>> checkForUpdateOnZookeeper(D deserializer) {
if (!started.get()) {
if (!isStarted()) {
log.warn("Data source is not yet started for service: {}. No nodes will be returned.",
service.getServiceName());
return Optional.empty();
}
if (stopped.get()) {
if (isStopped()) {
log.warn("Data source is stopped already for service: {}. No nodes will be returned.",
service.getServiceName());
return Optional.empty();
Expand All @@ -58,25 +58,8 @@ private Optional<List<ServiceNode<T>>> checkForUpdateOnZookeeper(D deserializer)
List<ServiceNode<T>> nodes = Lists.newArrayListWithCapacity(children.size());
log.debug("Found {} nodes for [{}]", children.size(), serviceName);
for (String child : children) {
final String path = String.format("%s/%s", parentPath, child);
boolean hasChild = null != curatorFramework.checkExists().forPath(path);
byte[] data = null;
boolean skipNode = false;
try {
data = hasChild
? curatorFramework.getData().forPath(path)
: null;
}
catch (KeeperException e) {
log.error("Could not get data for node: " + path, e);
skipNode = true;
}
if (null == data) {
log.warn("No data present for node: {} of [{}]", path, serviceName);
skipNode = true;
}
if (skipNode) {
log.debug("Skipping node: {}", path);
byte[] data = readChild(parentPath, child).orElse(null);
if (data == null) {
continue;
}
final ServiceNode<T> node = deserializer.deserialize(data);
Expand All @@ -92,4 +75,19 @@ private Optional<List<ServiceNode<T>>> checkForUpdateOnZookeeper(D deserializer)
return Optional.empty();
}

private Optional<byte[]> readChild(String parentPath, String child) throws Exception {
final String path = String.format("%s/%s", parentPath, child);
try {
return Optional.ofNullable(curatorFramework.getData().forPath(path));
}
catch (KeeperException.NoNodeException e) {
log.warn("Node not found for path {}", path);
return Optional.empty();
}
catch (KeeperException e) {
log.error("Could not get data for node: " + path, e);
return Optional.empty();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import com.flipkart.ranger.core.util.Exceptions;
import com.flipkart.ranger.zookeeper.util.PathBuilder;
import com.github.rholder.retry.*;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
Expand All @@ -19,9 +21,15 @@
@Slf4j
public class ZkNodeDataStoreConnector<T> implements NodeDataStoreConnector<T> {

@Getter(AccessLevel.PROTECTED)
protected final Service service;
@Getter(AccessLevel.PROTECTED)
protected final CuratorFramework curatorFramework;
protected final Retryer<Boolean> discoveryRetrier = RetryerBuilder.<Boolean>newBuilder()

private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean stopped = new AtomicBoolean(false);

private final Retryer<Boolean> discoveryRetrier = RetryerBuilder.<Boolean>newBuilder()
.retryIfException(e -> IllegalStateException.class.isAssignableFrom(e.getClass()))
.retryIfResult(aBoolean -> false)
.withAttemptTimeLimiter(AttemptTimeLimiters.noTimeLimit())
Expand All @@ -34,8 +42,6 @@ public <V> void onRetry(Attempt<V> attempt) {
}
})
.build();
protected final AtomicBoolean started = new AtomicBoolean(false);
protected final AtomicBoolean stopped = new AtomicBoolean(false);

protected ZkNodeDataStoreConnector(
final Service service,
Expand Down Expand Up @@ -100,4 +106,12 @@ public boolean isActive() {
return curatorFramework != null
&& (curatorFramework.getState() == CuratorFrameworkState.STARTED);
}

protected boolean isStarted() {
return started.get();
}

protected boolean isStopped() {
return stopped.get();
}
}

0 comments on commit 1830115

Please sign in to comment.