diff --git a/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkNodeDataSink.java b/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkNodeDataSink.java index cc39913a..3e5d27ab 100644 --- a/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkNodeDataSink.java +++ b/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkNodeDataSink.java @@ -25,7 +25,7 @@ public ZkNodeDataSink( @Override public void updateState(S serializer, ServiceNode serviceNode) { - if (stopped.get()) { + if (isStopped()) { log.warn("Node has been stopped already for service: {}. No update will be possible.", service.getServiceName()); return; diff --git a/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkNodeDataSource.java b/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkNodeDataSource.java index da78a2fc..9d701053 100644 --- a/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkNodeDataSource.java +++ b/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkNodeDataSource.java @@ -33,12 +33,12 @@ public Optional>> refresh(D deserializer) { } private Optional>> checkForUpdateOnZookeeper(D deserializer) { - if (!started.get()) { + if (!isStarted()) { log.warn("Data source is not yet started for service: {}. No nodes will be returned.", service.getServiceName()); return Optional.empty(); } - if (stopped.get()) { + if (isStopped()) { log.warn("Data source is stopped already for service: {}. No nodes will be returned.", service.getServiceName()); return Optional.empty(); @@ -58,25 +58,8 @@ private Optional>> checkForUpdateOnZookeeper(D deserializer) List> nodes = Lists.newArrayListWithCapacity(children.size()); log.debug("Found {} nodes for [{}]", children.size(), serviceName); for (String child : children) { - final String path = String.format("%s/%s", parentPath, child); - boolean hasChild = null != curatorFramework.checkExists().forPath(path); - byte[] data = null; - boolean skipNode = false; - try { - data = hasChild - ? curatorFramework.getData().forPath(path) - : null; - } - catch (KeeperException e) { - log.error("Could not get data for node: " + path, e); - skipNode = true; - } - if (null == data) { - log.warn("No data present for node: {} of [{}]", path, serviceName); - skipNode = true; - } - if (skipNode) { - log.debug("Skipping node: {}", path); + byte[] data = readChild(parentPath, child).orElse(null); + if (data == null) { continue; } final ServiceNode node = deserializer.deserialize(data); @@ -92,4 +75,19 @@ private Optional>> checkForUpdateOnZookeeper(D deserializer) return Optional.empty(); } + private Optional readChild(String parentPath, String child) throws Exception { + final String path = String.format("%s/%s", parentPath, child); + try { + return Optional.ofNullable(curatorFramework.getData().forPath(path)); + } + catch (KeeperException.NoNodeException e) { + log.warn("Node not found for path {}", path); + return Optional.empty(); + } + catch (KeeperException e) { + log.error("Could not get data for node: " + path, e); + return Optional.empty(); + } + } + } diff --git a/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkNodeDataStoreConnector.java b/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkNodeDataStoreConnector.java index 14cc6228..2ad0c2f4 100644 --- a/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkNodeDataStoreConnector.java +++ b/ranger-zookeeper/src/main/java/com/flipkart/ranger/zookeeper/zk/ZkNodeDataStoreConnector.java @@ -5,6 +5,8 @@ import com.flipkart.ranger.core.util.Exceptions; import com.flipkart.ranger.zookeeper.util.PathBuilder; import com.github.rholder.retry.*; +import lombok.AccessLevel; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.imps.CuratorFrameworkState; @@ -19,9 +21,15 @@ @Slf4j public class ZkNodeDataStoreConnector implements NodeDataStoreConnector { + @Getter(AccessLevel.PROTECTED) protected final Service service; + @Getter(AccessLevel.PROTECTED) protected final CuratorFramework curatorFramework; - protected final Retryer discoveryRetrier = RetryerBuilder.newBuilder() + + private final AtomicBoolean started = new AtomicBoolean(false); + private final AtomicBoolean stopped = new AtomicBoolean(false); + + private final Retryer discoveryRetrier = RetryerBuilder.newBuilder() .retryIfException(e -> IllegalStateException.class.isAssignableFrom(e.getClass())) .retryIfResult(aBoolean -> false) .withAttemptTimeLimiter(AttemptTimeLimiters.noTimeLimit()) @@ -34,8 +42,6 @@ public void onRetry(Attempt attempt) { } }) .build(); - protected final AtomicBoolean started = new AtomicBoolean(false); - protected final AtomicBoolean stopped = new AtomicBoolean(false); protected ZkNodeDataStoreConnector( final Service service, @@ -100,4 +106,12 @@ public boolean isActive() { return curatorFramework != null && (curatorFramework.getState() == CuratorFrameworkState.STARTED); } + + protected boolean isStarted() { + return started.get(); + } + + protected boolean isStopped() { + return stopped.get(); + } }