Skip to content

Commit

Permalink
Revert "Replace deprecated INSTANCE_DFS_URI/DIR with INSTANCE_VOLUMES (
Browse files Browse the repository at this point in the history
…#2491)" (#2571)

This reverts commit 2673204.
  • Loading branch information
ivakegg authored Sep 19, 2024
1 parent bf1630d commit ddc514c
Showing 1 changed file with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class MultiRfileInputformat extends RFileInputFormat {
private static LoadingCache<Range,Set<Tuple2<String,Set<String>>>> locationMap = null;

protected static final Map<String,String> dfsUriMap = new ConcurrentHashMap<>();
protected static final Map<String,String> dfsDirMap = new ConcurrentHashMap<>();

@Override
public RecordReader<Key,Value> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
Expand Down Expand Up @@ -147,11 +148,12 @@ public static List<InputSplit> computeSplitPoints(AccumuloClient client, Configu
/**
* Attempt the following 1) try to get the default namespace from accumulo 2) Use the custom config option 3) use default name in the hdfs configuration
*/
if (dfsUriMap.get(tableId) == null) {
if (dfsUriMap.get(tableId) == null || dfsDirMap.get(tableId) == null) {

synchronized (MultiRfileInputformat.class) {
final InstanceOperations instOps = client.instanceOperations();
dfsUriMap.put(tableId, instOps.getSystemConfiguration().get(Property.INSTANCE_VOLUMES.getKey()));
dfsUriMap.put(tableId, instOps.getSystemConfiguration().get(Property.INSTANCE_DFS_URI.getKey()));
dfsDirMap.put(tableId, instOps.getSystemConfiguration().get(Property.INSTANCE_DFS_DIR.getKey()));
}
}

Expand All @@ -165,7 +167,7 @@ public static List<InputSplit> computeSplitPoints(AccumuloClient client, Configu
}
}

basePath = dfsUriMap.get(tableId);
basePath = dfsDirMap.get(tableId);

if (StringUtils.isEmpty(basePath)) {
basePath = ACCUMULO_BASE_PATH;
Expand Down

0 comments on commit ddc514c

Please sign in to comment.