diff --git a/microservices/starters/datawave b/microservices/starters/datawave
index 2baa5f42a43..27a6063348c 160000
--- a/microservices/starters/datawave
+++ b/microservices/starters/datawave
@@ -1 +1 @@
-Subproject commit 2baa5f42a4369ba583c359c5e2cbf8e38f1a59b6
+Subproject commit 27a6063348cadb190bfe15282c3d1444dfb5b2fa
diff --git a/pom.xml b/pom.xml
index dfd2d6ba3db..10770bef1d7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -38,7 +38,7 @@
11
UTF-8
1C
- 2.1.2
+ 2.1.4-97e4684860
1.4.1.Final
1.0.0.Final
3.20.2
diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java
index b95a7a6a177..936e5174395 100755
--- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java
+++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java
@@ -88,7 +88,6 @@
*/
public final class BulkIngestMapFileLoader implements Runnable {
private static final Logger log = Logger.getLogger(BulkIngestMapFileLoader.class);
- private static final Gson gson = new GsonBuilder().registerTypeHierarchyAdapter(byte[].class, new ByteArrayToBase64TypeAdapter()).create();
private static int SLEEP_TIME = 30000;
private static int FAILURE_SLEEP_TIME = 10 * 60 * 1000; // 10 minutes
private static int MAX_DIRECTORIES = 1;
@@ -1014,13 +1013,13 @@ private LoadPlan getLoadPlan() throws IOException {
in.readFully(0, buffer);
String s = new String(buffer, StandardCharsets.UTF_8);
// TODO: Use Gson streaming api instead to minimize impact on heap and cpu
- builder.addPlan(gson.fromJson(s, LoadPlan.class));
+ builder.addPlan(LoadPlan.fromJson(s));
}
}
LoadPlan lp = builder.build();
log.debug("Completed deserializing load plan for " + tableDir);
if (log.isTraceEnabled()) {
- log.trace("Consolidated LoadPlan for " + tableDir + ": " + gson.toJson(lp));
+ log.trace("Consolidated LoadPlan for " + tableDir + ": " + lp.toJson());
}
return lp;
}
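
Note on the change above: the hand-rolled Gson mapping is replaced by the load-plan JSON helpers that ship with the Accumulo client version bumped in pom.xml below. A minimal, illustrative round-trip sketch (not part of the patch; it assumes the LoadPlan.toJson()/fromJson() helpers and the getDestinations() accessor available in the 2.1.4 line, and the RFile name is made up):

import org.apache.accumulo.core.data.LoadPlan;
import org.apache.accumulo.core.data.LoadPlan.RangeType;
import org.apache.hadoop.io.Text;

public class LoadPlanJsonRoundTrip {
    public static void main(String[] args) {
        // Build a plan that maps one RFile to the tablet ("20170601_0", "20170601_1"]
        LoadPlan original = LoadPlan.builder()
                        .loadFileTo("I0000abc.rf", RangeType.TABLE, new Text("20170601_0"), new Text("20170601_1"))
                        .build();

        // Serialize with the built-in helper, as MultiRFileOutputFormatter now does for its loadplan .json files
        String json = original.toJson();

        // BulkIngestMapFileLoader reads those files back and merges them per table
        LoadPlan merged = LoadPlan.builder().addPlan(LoadPlan.fromJson(json)).build();

        System.out.println(merged.getDestinations().size()); // 1
    }
}
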
diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatter.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatter.java
index 965d71eec09..c063c927bb2 100644
--- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatter.java
+++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatter.java
@@ -3,27 +3,24 @@
import static datawave.ingest.mapreduce.job.BulkIngestMapFileLoader.BULK_IMPORT_MODE_CONFIG;
import static org.apache.accumulo.core.conf.Property.TABLE_CRYPTO_PREFIX;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Objects;
import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.rfile.RFileWriter;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.Property;
@@ -32,11 +29,9 @@
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.LoadPlan;
-import org.apache.accumulo.core.data.LoadPlan.RangeType;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVIterator;
-import org.apache.accumulo.core.file.FileSKVWriter;
import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
import org.apache.accumulo.core.spi.crypto.CryptoService;
@@ -60,8 +55,6 @@
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
import datawave.ingest.data.config.ingest.AccumuloHelper;
import datawave.ingest.mapreduce.handler.shard.ShardedDataTypeHandler;
@@ -73,9 +66,6 @@ public class MultiRFileOutputFormatter extends FileOutputFormat<BulkIngestKey,Value> {
protected Map<String,SizeTrackingWriter> writers = null;
protected boolean loadPlanningEnabled = false;
protected Map<String,List<LoadPlan>> loadPlans = null;
@@ -264,7 +254,7 @@ protected void createAndRegisterWriter(String key, String table, Path filename,
filename = insertFileCount(filename, count.intValue());
// now create and register the writer
- SizeTrackingWriter writer = openWriter(filename.toString(), tableConf);
+ SizeTrackingWriter writer = openWriter(filename.toString(), tableConf, table);
writer.startDefaultLocalityGroup();
writers.put(key, writer);
unusedWriterPaths.put(key, filename);
@@ -275,14 +265,20 @@ protected void createAndRegisterWriter(String key, String table, Path filename,
}
}
- protected SizeTrackingWriter openWriter(String filename, AccumuloConfiguration tableConf) throws IOException {
+ protected SizeTrackingWriter openWriter(String filename, AccumuloConfiguration tableConf, String table) throws IOException {
startWriteTime = System.currentTimeMillis();
// @formatter:off
- CryptoService cs = CryptoFactoryLoader.getServiceForClient(
- CryptoEnvironment.Scope.TABLE, tableConf.getAllCryptoProperties());
- return new SizeTrackingWriter(
- FileOperations.getInstance().newWriterBuilder().forFile(filename, fs, conf, cs).withTableConfiguration(tableConf).build(),
- this.loadPlanningEnabled);
+
+ var builder = org.apache.accumulo.core.client.rfile.RFile.newWriter().to(filename).withFileSystem(fs).withTableProperties(tableConf);
+ if(this.loadPlanningEnabled) {
+ var splits = SplitsFile.getSplits(conf, table).get();
+ if(splits != null && !splits.isEmpty()) {
+ LoadPlan.SplitResolver splitResolver = row->findContainingSplits(row, splits);
+ builder = builder.withSplitResolver(splitResolver);
+ }
+ }
+
+ return new SizeTrackingWriter(builder.build());
// @formatter:on
}
@@ -304,10 +300,10 @@ protected void closeAndUpdateWriter(String key) throws IOException, AccumuloExce
Path file = usedWriterPaths.get(key);
// don't bother if this writer has not been used yet
if (file != null) {
- if (writer.isLoadPlanning()) {
- computeLoadPlan(writer, table, file);
- }
writer.close();
+ if (loadPlanningEnabled) {
+ addLoadPlan(table, writer.getLoadPlan(file.getName()));
+ }
// pull the index off the filename
file = removeFileCount(file);
createAndRegisterWriter(key, table, file, tableConfigs.get(table));
@@ -315,29 +311,8 @@ protected void closeAndUpdateWriter(String key) throws IOException, AccumuloExce
}
}
- /**
- * Init table's LoadPlan list and compute the plan for the given rfile asynchronously
- *
- * @param writer
- * RFile writer
- * @param tableName
- * table name
- * @param rfile
- * rfile path
- */
- protected void computeLoadPlan(SizeTrackingWriter writer, String tableName, Path rfile) {
- if (!loadPlans.containsKey(tableName)) {
- loadPlans.put(tableName, new LinkedList<>());
- }
- // @formatter:off
- loadPlanFutures.add(CompletableFuture
- .supplyAsync(() -> compute(rfile, writer.rows, tableName, conf))
- .thenAccept(plan -> {
- if (plan != null) {
- loadPlans.get(tableName).add(plan);
- }
- }));
- // @formatter:on
+ private void addLoadPlan(String tableName, LoadPlan loadPlan) {
+ loadPlans.computeIfAbsent(tableName, t -> new ArrayList<>()).add(loadPlan);
}
protected void commitLoadPlans(TaskAttemptContext context) {
@@ -362,12 +337,11 @@ private void writeLoadPlans(TaskAttemptContext context) {
var table = entry.getKey();
var path = new Path(String.format("%s/%s", workDir, table), getUniqueFile(context, "loadplan", ".json"));
var loadPlan = builder.build();
- // TODO: Use Gson streaming api instead (JsonWriter) to minimize impact on heap
//@formatter:off
futures.add(CompletableFuture.runAsync(() -> {
try (FSDataOutputStream out = fs.create(path)) {
log.debug("Begin writing load plan for " + path);
- out.write(gson.toJson(loadPlan).getBytes(StandardCharsets.UTF_8));
+ out.write(loadPlan.toJson().getBytes(StandardCharsets.UTF_8));
log.debug("Completed writing load plan for " + path);
} catch (IOException ioe) {
log.error("Failed to write plan for " + path, ioe);
@@ -387,67 +361,7 @@ private void writeLoadPlans(TaskAttemptContext context) {
}
/**
- * Creates a {@link LoadPlan} for the given RFile by mapping its row values to the relevant KeyExtents from the given table.
- *
- * @param rfile
- * RFile path
- * @param rfileRows
- * Set of rows contained in the RFile
- * @param tableName
- * Table whose splits are to be examined to create the mapping
- * @param conf
- * The configuration required to retrieve table splits
- * @return LoadPlan for the RFile
- */
- static LoadPlan compute(Path rfile, SortedSet<Text> rfileRows, String tableName, Configuration conf) {
- if (rfileRows != null && !rfileRows.isEmpty()) {
- try {
- var splits = SplitsFile.getSplits(conf, tableName);
- return compute(rfile, rfileRows, splits);
- } catch (Exception e) {
- throw new RuntimeException("Failed to retrieve splits!", e);
- }
- }
- return null;
- }
-
- /**
- * Creates a {@link LoadPlan} for the given RFile by mapping its row values to the relevant KeyExtents from the given table.
- *
- * @param rfile
- * RFile path
- * @param rfileRows
- * Set of rows contained in the RFile
- * @param tableSplits
- * Splits for the table being targeted
- * @return LoadPlan for the RFile.
- */
- static LoadPlan compute(Path rfile, SortedSet<Text> rfileRows, List<Text> tableSplits) {
- // @formatter:off
- var builder = LoadPlan.builder();
- if (tableSplits == null || tableSplits.isEmpty()) {
- // RFile examination and mapping handled later by bulk import client
- log.debug("Calculating FILE load plan for " + rfile);
- var ke = new KeyExtent(rfileRows.first(), rfileRows.last());
- builder.addPlan(LoadPlan.builder().loadFileTo(
- rfile.getName(), RangeType.FILE, ke.prevEndRow, ke.endRow).build());
- } else {
- // Compute extent mapping so that we can skip examination at load time
- log.debug("Calculating TABLE load plan for " + rfile);
- rfileRows.stream()
- .map(row -> findKeyExtent(row, tableSplits))
- .collect(Collectors.toCollection(HashSet::new))
- .forEach(ke -> builder.addPlan(LoadPlan.builder().loadFileTo(
- rfile.getName(), RangeType.TABLE, ke.prevEndRow, ke.endRow).build())
- );
- log.debug("Table load plan completed for file: " + rfile);
- }
- // @formatter:on
- return builder.build();
- }
-
- /**
- * Finds the KeyExtent where the specified row should reside
+ * Finds the two contiguous table splits that contain the specified row
*
* @param lookupRow
* Row value to be mapped
@@ -455,51 +369,20 @@ static LoadPlan compute(Path rfile, SortedSet<Text> rfileRows, List<Text> tableSplits) {
* Splits for the table in question
* @return KeyExtent mapping for the given row
*/
- static KeyExtent findKeyExtent(Text lookupRow, List<Text> tableSplits) {
- var ke = new KeyExtent();
- var ceilingIdx = findCeiling(lookupRow, tableSplits);
- if (ceilingIdx == -1) {
- // Last tablet (endRow remains null)
- ke.prevEndRow = tableSplits.get(tableSplits.size() - 1);
- } else {
- ke.endRow = tableSplits.get(ceilingIdx);
- // If ceiling == 0, then first tablet (prevEndRow remains null)
- if (ceilingIdx > 0) {
- ke.prevEndRow = tableSplits.get(ceilingIdx - 1);
- }
+ static LoadPlan.TableSplits findContainingSplits(Text lookupRow, List<Text> tableSplits) {
+ int position = Collections.binarySearch(tableSplits, lookupRow);
+ if (position < 0) {
+ position = -1 * (position + 1);
}
- return ke;
- }
- /**
- * Performs binary search on tableSplits to find the index of the first (least) split ≥ lookupRow
- *
- * @param lookupRow
- * Row for which we want to find the ceiling
- * @param tableSplits
- * Sorted table tableSplits list whose implementation is assumed to provide fast random access (i.e., {@link java.util.RandomAccess})
- * @return index of the first split ≥ lookupRow, or -1 if (lookupRow > tableSplits.get(tableSplits.size()-1)
- */
- static int findCeiling(Text lookupRow, List<Text> tableSplits) {
- int begin = 0;
- int end = tableSplits.size() - 1;
- int ceiling = -1;
- while (begin <= end) {
- int middle = (begin + end) / 2;
- if (tableSplits.get(middle).compareTo(lookupRow) >= 0) {
- end = middle - 1;
- ceiling = middle;
- } else {
- begin = middle + 1;
- }
- }
- return ceiling;
+ Text prevRow = position == 0 ? null : tableSplits.get(position - 1);
+ Text endRow = position == tableSplits.size() ? null : tableSplits.get(position);
+
+ return new LoadPlan.TableSplits(prevRow, endRow);
}
- public static class SizeTrackingWriter implements FileSKVWriter {
- private FileSKVWriter delegate;
- private boolean loadPlanning;
- SortedSet<Text> rows;
+ public static class SizeTrackingWriter {
+ private RFileWriter delegate;
long size = 0;
int entries = 0;
@@ -511,12 +394,9 @@ public int getNumEntries() {
return entries;
}
- public boolean supportsLocalityGroups() {
- return delegate.supportsLocalityGroups();
- }
-
public void startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies) throws IOException {
- delegate.startNewLocalityGroup(name, columnFamilies);
+ var fams = columnFamilies.stream().map(bs -> bs.toArray()).collect(Collectors.toList());
+ delegate.startNewLocalityGroup(name, fams);
}
public void startDefaultLocalityGroup() throws IOException {
@@ -527,34 +407,22 @@ public void append(Key key, Value value) throws IOException {
entries++;
size += key.getLength() + (value == null ? 0 : value.getSize());
delegate.append(key, value);
- if (loadPlanning) {
- rows.add(key.getRow());
- }
- }
-
- public DataOutputStream createMetaStore(String name) throws IOException {
- return delegate.createMetaStore(name);
}
public void close() throws IOException {
delegate.close();
}
- @Override
public long getLength() throws IOException {
return getSize();
}
- public boolean isLoadPlanning() {
- return loadPlanning;
+ public LoadPlan getLoadPlan(String filename) {
+ return delegate.getLoadPlan(filename);
}
- public SizeTrackingWriter(FileSKVWriter delegate, boolean loadPlanning) {
+ public SizeTrackingWriter(RFileWriter delegate) {
this.delegate = delegate;
- this.loadPlanning = loadPlanning;
- if (this.loadPlanning) {
- rows = new TreeSet<>();
- }
}
}
@@ -797,12 +665,13 @@ public void write(BulkIngestKey key, Value value) throws IOException {
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
for (Map.Entry<String,SizeTrackingWriter> entry : writers.entrySet()) {
var writer = entry.getValue();
- if (writer.isLoadPlanning()) {
+ writer.close();
+ if (loadPlanningEnabled) {
var tableName = writerTableNames.get(entry.getKey());
var rfilePath = usedWriterPaths.get(entry.getKey());
- computeLoadPlan(writer, tableName, rfilePath);
+ addLoadPlan(tableName, writer.getLoadPlan(rfilePath.getName()));
}
- writer.close();
+
}
if (loadPlanningEnabled) {
@@ -921,56 +790,4 @@ protected Map getShardLocations(String tableName) throws IOExceptio
return tableShardLocations.get(tableName);
}
-
- /**
- * Simplified representation of an Accumulo KeyExtent, used here to track mapped tablets during LoadPlan creation (and to avoid yet another Accumulo
- * "disallowed import")
- */
- static class KeyExtent implements Comparable<KeyExtent> {
- Text prevEndRow = null;
- Text endRow = null;
-
- KeyExtent(Text prevEndRow, Text endRow) {
- this.prevEndRow = prevEndRow;
- this.endRow = endRow;
- }
-
- KeyExtent() {}
-
- private static final Comparator<KeyExtent> COMPARATOR = Comparator.comparing(KeyExtent::endRow, Comparator.nullsLast(Text::compareTo))
- .thenComparing(KeyExtent::prevEndRow, Comparator.nullsFirst(Text::compareTo));
-
- public Text endRow() {
- return endRow;
- }
-
- public Text prevEndRow() {
- return prevEndRow;
- }
-
- @Override
- public int compareTo(KeyExtent other) {
- return COMPARATOR.compare(this, other);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
- KeyExtent keyExtent = (KeyExtent) o;
- return Objects.equals(endRow, keyExtent.endRow) && Objects.equals(prevEndRow, keyExtent.prevEndRow);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(endRow, prevEndRow);
- }
-
- @Override
- public String toString() {
- return (prevEndRow == null ? "null" : prevEndRow.toString()) + ";" + (endRow == null ? "null" : endRow.toString());
- }
- }
}
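
For readers unfamiliar with the new split-resolution path, here is a rough, standalone sketch (not part of the patch) of how findContainingSplits turns a binary-search insertion point into a LoadPlan.TableSplits pair. It assumes the package-private signature shown above, so it would have to live in the same datawave.ingest.mapreduce.job package, and the demo class name is invented:

package datawave.ingest.mapreduce.job;

import java.util.List;

import org.apache.accumulo.core.data.LoadPlan;
import org.apache.hadoop.io.Text;

public class FindContainingSplitsDemo {
    public static void main(String[] args) {
        // Sorted splits define the tablets (-inf,"b"], ("b","d"], ("d","f"], ("f",+inf)
        List<Text> splits = List.of(new Text("b"), new Text("d"), new Text("f"));

        // Row before the first split -> first tablet, prevEndRow is null
        LoadPlan.TableSplits first = MultiRFileOutputFormatter.findContainingSplits(new Text("a"), splits);

        // Row equal to a split -> the tablet that ends at that split, here ("b","d"]
        LoadPlan.TableSplits exact = MultiRFileOutputFormatter.findContainingSplits(new Text("d"), splits);

        // Row after the last split -> last tablet, endRow is null
        LoadPlan.TableSplits last = MultiRFileOutputFormatter.findContainingSplits(new Text("g"), splits);

        System.out.println(first + " | " + exact + " | " + last);
    }
}
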
diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/SortedList.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/SortedList.java
new file mode 100644
index 00000000000..41703ec13da
--- /dev/null
+++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/SortedList.java
@@ -0,0 +1,77 @@
+package datawave.ingest.mapreduce.job;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Wraps a list that is immutable and verified as sorted.
+ */
+public class SortedList<T> {
+
+ private static final Logger log = Logger.getLogger(SortedList.class);
+
+ private final List<T> list;
+
+ private SortedList(List<T> list) {
+ this.list = list;
+ }
+
+ public List<T> get() {
+ return list;
+ }
+
+ private static final SortedList<?> EMPTY = new SortedList<>(List.of());
+
+ @SuppressWarnings("unchecked")
+ public static <T2> SortedList<T2> empty() {
+ return (SortedList<T2>) EMPTY;
+ }
+
+ /**
+ * For a list that is expected to be sorted this will verify it is sorted and if so return an immutable copy of it. If this list is not sorted it will log a
+ * warning, copy it, sort the copy, and return an immutable version of the copy.
+ */
+ public static <T2> SortedList<T2> fromSorted(List<T2> list) {
+ if (list.isEmpty()) {
+ return empty();
+ }
+
+ var copy = List.copyOf(list);
+
+ // verify after copying because nothing can change at this point
+ boolean isSorted = true;
+ for (int i = 1; i < copy.size(); i++) {
+ @SuppressWarnings("unchecked")
+ var prev = (Comparable<? super T2>) copy.get(i - 1);
+ if (prev.compareTo(copy.get(i)) > 0) {
+ isSorted = false;
+ }
+ }
+
+ if (isSorted) {
+ return new SortedList<>(copy);
+ } else {
+ log.warn("Input list of size " + copy.size() + " was expected to be sorted but was not", new IllegalArgumentException());
+ return fromUnsorted(copy);
+ }
+ }
+
+ /**
+ * Copies a list and sorts the copy returning an immutable version of the copy.
+ */
+ public static <T2> SortedList<T2> fromUnsorted(List<T2> list) {
+ if (list.isEmpty()) {
+ return empty();
+ }
+
+ var copy = new ArrayList<>(list);
+ @SuppressWarnings("unchecked")
+ var comparator = (Comparator<? super T2>) Comparator.naturalOrder();
+ copy.sort(comparator);
+ return new SortedList<>(Collections.unmodifiableList(copy));
+ }
+}
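
A short usage sketch for the new wrapper (illustrative only; the example class name is invented): fromSorted validates and wraps an already-sorted list, fromUnsorted sorts a copy without touching the caller's list, and get() exposes the immutable backing list for binary searches like those in the partitioners below:

import java.util.Collections;
import java.util.List;

import org.apache.hadoop.io.Text;

import datawave.ingest.mapreduce.job.SortedList;

public class SortedListExample {
    public static void main(String[] args) {
        // Already-sorted input is verified and wrapped as-is
        SortedList<Text> splits = SortedList.fromSorted(List.of(new Text("a"), new Text("m"), new Text("z")));

        // Unsorted input is copied and the copy is sorted; the input list is not modified
        SortedList<Text> repaired = SortedList.fromUnsorted(List.of(new Text("z"), new Text("a")));

        // Binary search is safe because the wrapped list is guaranteed to be sorted
        int idx = Collections.binarySearch(splits.get(), new Text("m"));

        System.out.println(idx + " " + repaired.get()); // 1 [a, z]
    }
}
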
diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/SplitsFile.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/SplitsFile.java
index 28283f940b9..00a84f088d5 100644
--- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/SplitsFile.java
+++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/SplitsFile.java
@@ -186,7 +186,7 @@ private static boolean shardsAreBalanced(Map locations, String date
return dateIsBalanced;
}
- public static Map<String,List<Text>> getSplits(Configuration conf) throws IOException {
+ public static Map<String,SortedList<Text>> getSplits(Configuration conf) throws IOException {
return TableSplitsCache.getCurrentCache(conf).getSplits();
}
@@ -195,7 +195,7 @@ public static Map getSplitsAndLocations(Configuration conf, String
return TableSplitsCache.getCurrentCache(conf).getSplitsAndLocationByTable(tableName);
}
- public static List<Text> getSplits(Configuration conf, String tableName) throws IOException {
+ public static SortedList<Text> getSplits(Configuration conf, String tableName) throws IOException {
return TableSplitsCache.getCurrentCache(conf).getSplits(tableName);
}
}
diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java
index ab901845675..32337c3cfa5 100644
--- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java
+++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java
@@ -69,7 +69,7 @@ public class TableSplitsCache extends BaseHdfsFileCacheUtil {
private Path splitsPath = null;
private Map<String,Map<Text,String>> splitLocations = new HashMap<>();
- private Map<String,List<Text>> splits = new HashMap<>();
+ private Map<String,SortedList<Text>> splits = new HashMap<>();
private PartitionerCache partitionerCache;
@@ -116,6 +116,7 @@ public static void clear() {
* maximum number of splits to return
* @return split points grouped into fewer evenly grouped splits so as not to exceed maxSplits
*/
+ // TODO seems like this code is only used in test
public static List<Text> trimSplits(List<Text> tableSplits, int maxSplits) {
if (tableSplits.size() <= maxSplits) {
return tableSplits;
@@ -363,6 +364,7 @@ protected void readCache(BufferedReader in) throws IOException {
String line;
String tableName = null;
Map<Text,String> tmpSplitLocations = new ShardLocationTrieMap();
+ Map<String,List<Text>> tableSplits = new HashMap<>();
List<Text> tmpSplits = null;
while ((line = in.readLine()) != null) {
@@ -374,7 +376,7 @@ protected void readCache(BufferedReader in) throws IOException {
tableName = parts[0];
tmpSplitLocations = new ShardLocationTrieMap();
tmpSplits = new ArrayList<>();
- this.splits.put(tableName, Collections.unmodifiableList(tmpSplits));
+ tableSplits.put(tableName, tmpSplits);
}
if (parts.length >= 2) {
Text split = new Text(Base64.decodeBase64(parts[1]));
@@ -387,6 +389,11 @@ protected void readCache(BufferedReader in) throws IOException {
if (!tmpSplitLocations.isEmpty()) {
this.splitLocations.put(tableName, tmpSplitLocations);
}
+
+ tableSplits.forEach((table, splits) -> {
+ this.splits.put(table, SortedList.fromSorted(splits));
+ });
+
in.close();
}
@@ -407,12 +414,12 @@ private String dedup(Map dedupMap, String value) {
* @throws IOException
* for issues with read or write
*/
- public List<Text> getSplits(String table) throws IOException {
+ public SortedList<Text> getSplits(String table) throws IOException {
if (this.splits.isEmpty()) {
read();
}
- List<Text> splitList = this.splits.get(table);
- return (splitList == null ? Collections.emptyList() : splitList);
+ SortedList<Text> splitList = this.splits.get(table);
+ return (splitList == null ? SortedList.empty() : splitList);
}
/**
@@ -425,8 +432,9 @@ public List getSplits(String table) throws IOException {
* @throws IOException
* for issues with read or write
*/
+ // TODO seems like this code is only used in test, can it be removed? Looked into making it return SortedList
public List<Text> getSplits(String table, int maxSplits) throws IOException {
- return trimSplits(getSplits(table), maxSplits);
+ return trimSplits(getSplits(table).get(), maxSplits);
}
/**
@@ -434,7 +442,7 @@ public List getSplits(String table, int maxSplits) throws IOException {
* @throws IOException
* for issues with read or write
*/
- public Map<String,List<Text>> getSplits() throws IOException {
+ public Map<String,SortedList<Text>> getSplits() throws IOException {
if (this.splits.isEmpty())
read();
return Collections.unmodifiableMap(splits);
diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/BalancedShardPartitioner.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/BalancedShardPartitioner.java
index 4eb8b2b43fe..f09618a0585 100755
--- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/BalancedShardPartitioner.java
+++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/BalancedShardPartitioner.java
@@ -23,6 +23,7 @@
import datawave.ingest.mapreduce.handler.shard.ShardIdFactory;
import datawave.ingest.mapreduce.handler.shard.ShardedDataTypeHandler;
import datawave.ingest.mapreduce.job.BulkIngestKey;
+import datawave.ingest.mapreduce.job.SortedList;
import datawave.ingest.mapreduce.job.SplitsFile;
import datawave.util.time.DateHelper;
@@ -134,7 +135,7 @@ private HashMap getPartitionsByShardId(String tableName) throws IO
if (log.isDebugEnabled())
log.debug("Loading splits data for " + tableName);
- List<Text> sortedSplits = SplitsFile.getSplits(conf, tableName);
+ SortedList<Text> sortedSplits = SplitsFile.getSplits(conf, tableName);
Map<Text,String> shardIdToLocation = SplitsFile.getSplitsAndLocations(conf, tableName);
if (log.isDebugEnabled())
@@ -158,7 +159,7 @@ private HashMap getPartitionsByShardId(String tableName) throws IO
* the map of shard ids and their location
* @return shardId to
*/
- private HashMap<Text,Integer> assignPartitionsForEachShard(List<Text> sortedShardIds, Map<Text,String> shardIdToLocations) {
+ private HashMap<Text,Integer> assignPartitionsForEachShard(SortedList<Text> sortedShardIds, Map<Text,String> shardIdToLocations) {
int totalNumUniqueTServers = calculateNumberOfUniqueTservers(shardIdToLocations);
HashMap partitionsByTServer = getTServerAssignments(totalNumUniqueTServers, sortedShardIds, shardIdToLocations);
@@ -178,11 +179,11 @@ private int calculateNumberOfUniqueTservers(Map shardIdToLocations)
return totalNumUniqueTServers;
}
- private HashMap<String,Integer> getTServerAssignments(int totalNumTServers, List<Text> sortedShardIds, Map<Text,String> shardIdsToTservers) {
+ private HashMap<String,Integer> getTServerAssignments(int totalNumTServers, SortedList<Text> sortedShardIds, Map<Text,String> shardIdsToTservers) {
HashMap<String,Integer> partitionsByTServer = new HashMap<>(totalNumTServers);
int nextAvailableSlot = 0;
boolean alreadySkippedFutureShards = false;
- for (Text shard : sortedShardIds) {
+ for (Text shard : sortedShardIds.get()) {
if (alreadySkippedFutureShards || !isFutureShard(shard)) { // short circuiting for performance
alreadySkippedFutureShards = true;
String location = shardIdsToTservers.get(shard);
diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/MultiTableRangePartitioner.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/MultiTableRangePartitioner.java
index dba0e0547fd..5cee4854234 100644
--- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/MultiTableRangePartitioner.java
+++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/MultiTableRangePartitioner.java
@@ -18,6 +18,7 @@
import org.apache.log4j.Logger;
import datawave.ingest.mapreduce.job.BulkIngestKey;
+import datawave.ingest.mapreduce.job.SortedList;
import datawave.ingest.mapreduce.job.SplitsFile;
/**
@@ -82,7 +83,7 @@ public int getPartition(BulkIngestKey key, Value value, int numPartitions) {
String tableName = key.getTableName().toString();
- List<Text> cutPointArray = null;
+ SortedList<Text> cutPointArray = null;
try {
cutPointArray = SplitsFile.getSplits(conf, tableName);
} catch (IOException e) {
@@ -92,8 +93,8 @@ public int getPartition(BulkIngestKey key, Value value, int numPartitions) {
return (tableName.hashCode() & Integer.MAX_VALUE) % numPartitions;
}
key.getKey().getRow(holder);
- int index = Collections.binarySearch(cutPointArray, holder);
- index = calculateIndex(index, numPartitions, tableName, cutPointArray.size());
+ int index = Collections.binarySearch(cutPointArray.get(), holder);
+ index = calculateIndex(index, numPartitions, tableName, cutPointArray.get().size());
index = partitionLimiter.limit(numPartitions, index);
diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatterTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatterTest.java
index be7ea9df7c4..68ac0368e31 100644
--- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatterTest.java
+++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatterTest.java
@@ -1,7 +1,7 @@
package datawave.ingest.mapreduce.job;
import static datawave.ingest.mapreduce.job.BulkIngestMapFileLoader.BULK_IMPORT_MODE_CONFIG;
-import static datawave.ingest.mapreduce.job.MultiRFileOutputFormatter.findKeyExtent;
+import static datawave.ingest.mapreduce.job.MultiRFileOutputFormatter.findContainingSplits;
import static org.junit.Assert.assertEquals;
import java.io.DataOutputStream;
@@ -28,7 +28,6 @@
import org.apache.accumulo.core.file.FileSKVWriter;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.RecordWriter;
@@ -49,7 +48,6 @@
import datawave.ingest.data.config.ingest.AccumuloHelper;
import datawave.ingest.mapreduce.job.BulkIngestMapFileLoader.ImportMode;
-import datawave.ingest.mapreduce.job.MultiRFileOutputFormatter.KeyExtent;
import datawave.util.TableName;
public class MultiRFileOutputFormatterTest {
@@ -235,19 +233,20 @@ public void testPlanning() {
rfileRows.add(new Text("20170601_9"));
rfileRows.add(new Text("20200601_9"));
- Set<KeyExtent> expectedExtents = new HashSet<>();
- expectedExtents.add(new KeyExtent(new Text("20170601_0"), new Text("20170601_1")));
- expectedExtents.add(new KeyExtent(new Text("20170601_8"), new Text("20170601_9")));
- expectedExtents.add(new KeyExtent(new Text("20170602_0"), new Text("20170602_1")));
- expectedExtents.add(new KeyExtent(new Text("20170603_9"), null));
- expectedExtents.add(new KeyExtent(new Text("20170603_0c"), new Text("20170603_1")));
- expectedExtents.add(new KeyExtent(null, new Text("20170601_0")));
- expectedExtents.add(new KeyExtent(new Text("20170602_9c"), new Text("20170603_0")));
- expectedExtents.add(new KeyExtent(new Text("20170603_0a"), new Text("20170603_0b")));
- expectedExtents.add(new KeyExtent(new Text("20170603_0b"), new Text("20170603_0c")));
+ Set<LoadPlan.TableSplits> expectedExtents = new HashSet<>();
+ expectedExtents.add(new LoadPlan.TableSplits(new Text("20170601_0"), new Text("20170601_1")));
+ expectedExtents.add(new LoadPlan.TableSplits(new Text("20170601_8"), new Text("20170601_9")));
+ expectedExtents.add(new LoadPlan.TableSplits(new Text("20170602_0"), new Text("20170602_1")));
+ expectedExtents.add(new LoadPlan.TableSplits(new Text("20170603_9"), null));
+ expectedExtents.add(new LoadPlan.TableSplits(new Text("20170603_0c"), new Text("20170603_1")));
+ expectedExtents.add(new LoadPlan.TableSplits(null, new Text("20170601_0")));
+ expectedExtents.add(new LoadPlan.TableSplits(new Text("20170602_9c"), new Text("20170603_0")));
+ expectedExtents.add(new LoadPlan.TableSplits(new Text("20170603_0a"), new Text("20170603_0b")));
+ expectedExtents.add(new LoadPlan.TableSplits(new Text("20170603_0b"), new Text("20170603_0c")));
List<Text> tableSplits = getSplits();
- Set<KeyExtent> extents = rfileRows.stream().map(row -> findKeyExtent(row, tableSplits)).collect(Collectors.toCollection(HashSet::new));
+ Set<LoadPlan.TableSplits> extents = rfileRows.stream().map(row -> findContainingSplits(row, tableSplits))
+ .collect(Collectors.toCollection(HashSet::new));
assertEquals(expectedExtents, extents);
}
@@ -479,37 +478,28 @@ protected Map getShardLocations(String tableName) throws IOExceptio
}
@Override
- protected SizeTrackingWriter openWriter(String filename, AccumuloConfiguration tableConf) {
+ protected SizeTrackingWriter openWriter(String filename, AccumuloConfiguration tableConf, String table) {
filenames.add(filename);
- return new SizeTrackingWriter(new FileSKVWriter() {
+ return new SizeTrackingWriter(null) {
+ public void startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies) throws IOException {
- @Override
- public boolean supportsLocalityGroups() {
- return false;
}
- @Override
- public void startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies) throws IOException {}
+ public void startDefaultLocalityGroup() throws IOException {
- @Override
- public void startDefaultLocalityGroup() throws IOException {}
+ }
- @Override
- public DataOutputStream createMetaStore(String name) throws IOException {
- return null;
+ public void append(Key key, Value value) throws IOException {
+ entries++;
+ size += key.getLength() + (value == null ? 0 : value.getSize());
}
- @Override
public void close() throws IOException {}
- @Override
- public long getLength() throws IOException {
- return 0;
+ public LoadPlan getLoadPlan(String filename) {
+ return LoadPlan.builder().build();
}
-
- @Override
- public void append(Key key, Value value) throws IOException {}
- }, false);
+ };
}
};
diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/TableSplitsCacheTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/TableSplitsCacheTest.java
index d9672d6b28c..db3c705d616 100644
--- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/TableSplitsCacheTest.java
+++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/TableSplitsCacheTest.java
@@ -532,18 +532,18 @@ public void testGetSplitsNoArgument() throws IOException {
Assert.assertNotNull("TableSplitsCache constructor failed to construct an instance.", uut);
- Map<String,List<Text>> resultsSet = uut.getSplits();
+ Map<String,SortedList<Text>> resultsSet = uut.getSplits();
Assert.assertNotNull("TableSplitsCache#getSplits() failed created a map of tables and their splits", resultsSet);
Assert.assertFalse("TableSplitsCache#getSplits() incorrectly populated map of tables and their splits", resultsSet.isEmpty());
Assert.assertEquals("TableSplitsCache#getSplits() incorrectly populated map of tables and their splits", 3, resultsSet.size());
- List<Text> listings = new ArrayList<>(resultsSet.get("shard"));
+ List<Text> listings = new ArrayList<>(resultsSet.get("shard").get());
Assert.assertNotNull("TableSplitsCache#getSplits() failed to a list of splits", listings);
Assert.assertFalse("TableSplitsCache#getSplits() incorrectly populated the list of splits", listings.isEmpty());
Assert.assertEquals("TableSplitsCache#getSplits() incorrectly populated the list of splits", 5, listings.size());
- listings = new ArrayList(resultsSet.get("shard1"));
+ listings = new ArrayList(resultsSet.get("shard1").get());
Assert.assertNotNull("TableSplitsCache#getSplits() failed to a list of splits", listings);
Assert.assertFalse("TableSplitsCache#getSplits() incorrectly populated the list of splits", listings.isEmpty());
Assert.assertEquals("TableSplitsCache#getSplits() incorrectly populated the list of splits", 1, listings.size());
@@ -566,7 +566,7 @@ public void testGetSplitsWithArgumentThatMatches() throws IOException {
Assert.assertNotNull("TableSplitsCache constructor failed to construct an instance.", uut);
- List<Text> resultsSet = uut.getSplits().get("shard");
+ List<Text> resultsSet = uut.getSplits().get("shard").get();
Assert.assertNotNull("TableSplitsCache#getSplits() failed to a list of splits", resultsSet);
Assert.assertFalse("TableSplitsCache#getSplits() incorrectly populated the list of splits", resultsSet.isEmpty());
@@ -589,7 +589,7 @@ public void testGetSplitsWithArgumentThatDoesNotMatch() throws IOException {
Assert.assertNotNull("TableSplitsCache constructor failed to construct an instance.", uut);
- List<Text> resultsSet = uut.getSplits("bad-table");
+ List<Text> resultsSet = uut.getSplits("bad-table").get();
Assert.assertNotNull("TableSplitsCache#getSplits() failed to a list of splits", resultsSet);
Assert.assertTrue("TableSplitsCache#getSplits() incorrectly populated the list of splits", resultsSet.isEmpty());